Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulSnow committed Jun 7, 2024
1 parent e034ff0 commit 3d7e13a
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 59 deletions.
6 changes: 1 addition & 5 deletions internal/database/blockchainDB/bfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
//

const (
BufferSize = 1024 * 1024 * 1 // N MB, i.e. N *(1024^2)
BufferSize = 64 * 1024 * 1 // N MB, i.e. N *(1024^2)

BFilePerm = iota // Key/Value pairs where the key is a function of the Value (can't change)
BFileDynamic // Key/Value pair where the value can be updated
Expand Down Expand Up @@ -170,10 +170,6 @@ func NewBFile(Filename string, BufferCnt int) (bFile *BFile, err error) {
}
bFile.CreateBuffers()

if bFile.File, err = os.Create(Filename); err != nil {
return nil, err
}

var offsetB [8]byte // Offset to end of file (8, the length of the offset)
if err := bFile.Write(offsetB[:]); err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions internal/database/blockchainDB/blocklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (b *BlockList) NextBlockFile() (err error) {
b.BlockHeight++

filename := b.GetFilename(b.BlockHeight)
if b.BFile, err = NewBFile( filename,b.BufferCnt); err != nil {
if b.BFile, err = NewBFile(filename, b.BufferCnt); err != nil {
return err
}

Expand All @@ -113,19 +113,19 @@ func NewBlockFile(Directory string, BufferCnt int) (blockFile *BlockList, err er
blockFile.LoadState()

filename := blockFile.GetFilename(blockFile.BlockHeight)
if blockFile.BFile, err = NewBFile(filename,BufferCnt); err != nil {
if blockFile.BFile, err = NewBFile(filename, BufferCnt); err != nil {
return nil, err
}

return blockFile, nil
}

// OpenBFile
// OpenBList
// Open a particular BFile in a BlockList at a given height. If a BFile is
// currently opened, then it is closed. If the BFile being opened does not
// exist (has a height > b.BlockHeight) then the provided Height must be
// b.BlockHeight+1
func (b *BlockList) OpenBFile(Height int, BufferCnt int) (bFile *BFile, err error) {
func (b *BlockList) OpenBList(Height int, BufferCnt int) (bFile *BFile, err error) {
if Height > b.BlockHeight+1 {
return nil, fmt.Errorf("height %d is invalid. current BlockList height is: %d",
Height, b.BlockHeight)
Expand Down
11 changes: 8 additions & 3 deletions internal/database/blockchainDB/blocklist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@ import (
// Create and close a BlockFile
func TestCreateBlockFile(t *testing.T) {
Directory := filepath.Join(os.TempDir(), "BFTest")
os.RemoveAll(Directory)

bf, err := NewBlockList(Directory, 1, 5)
assert.NoError(t, err, "failed to create a BlockFile")
assert.NoError(t, err, "error creating BlockList")
assert.NotNil(t,bf,"failed to create BlockList")
bf.Close()

os.RemoveAll(Directory)
}

func TestOpenBlockFile(t *testing.T) {
Directory := filepath.Join(os.TempDir(), "BFTest")
os.RemoveAll(Directory)

bf, err := NewBlockList(Directory, 1, 5)
assert.NoError(t, err, "failed to create a BlockFile")
bf.Close()
Expand Down Expand Up @@ -60,14 +65,14 @@ func TestBlockFileLoad(t *testing.T) {
fr = NewFastRandom([32]byte{1, 2, 3})
for i := 0; i < 2; i++ {
fmt.Printf("%3d ", i)
_, err := bf.OpenBFile(i, 5)
_, err := bf.OpenBList(i, 5)
assert.NoErrorf(t, err, "failed to open block file %d", i)
for j := 0; j < 10; j++ {
hash := fr.NextHash()
value := fr.RandBuff(100, 300)
v, err := bf.Get(hash)
assert.NoError(t, err, "failed to get value for key")
assert.Equalf(t, value, v, "blk %d pair %d value was not the value expected",j,j)
assert.Equalf(t, value, v, "blk %d pair %d value was not the value expected", j, j)
}
}
fmt.Print("\nDone\n")
Expand Down
25 changes: 20 additions & 5 deletions internal/database/blockchainDB/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ type Shard struct {
Mutex sync.Mutex // Keeps compression from conflict with access
}

// OpenShard
// Open an existing shard
func OpenShard(BufferCnt int, Filename string) (shard *Shard, err error) {
shard = new(Shard)
shard.BufferCnt = BufferCnt
shard.Filename = Filename
shard.Cache = make(map[[32]byte][]byte)
if shard.BFile, err = OpenBFile(BufferCnt,Filename); err != nil {
return nil, err
}
go shard.process()
return shard, err
}

// NewShard
// Create and open a new Shard
func NewShard(BufferCnt int, Filename string) (shard *Shard, err error) {
Expand All @@ -37,9 +51,8 @@ func NewShard(BufferCnt int, Filename string) (shard *Shard, err error) {
return shard, err
}


// process
// Note that process calls rand.Intn() which isn't randomized without a call to
// Note that process calls rand.Intn() which isn't randomized without a call to
// rand.Seed()
func (s *Shard) process() {
for {
Expand Down Expand Up @@ -82,10 +95,12 @@ func (s *Shard) Open() (err error) {
return
}
if s.BFile, err = OpenBFile(s.BufferCnt, s.Filename); err != nil {
if os.IsNotExist(err) {
s.BFile, err = NewBFile(s.Filename, s.BufferCnt)
if !os.IsNotExist(err) { // Can't deal with errors other than does not exist
return err
}
if s.BFile, err = NewBFile(s.Filename, s.BufferCnt); err != nil {
return err // If file creation fails, return that error
}
return err
}
s.KeyCount = len(s.BFile.Keys)
s.KeyWrites = 0
Expand Down
2 changes: 1 addition & 1 deletion internal/database/blockchainDB/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestShard(t *testing.T) {
writes :=0
reads :=0
start := time.Now()
for i := 0; i < 10000; i++ {
for i := 0; i < 1000; i++ {
if i%100 == 0 && i != 0 {
fmt.Printf("Writes: %10d Reads %10d %13.0f/s \n",writes,reads,
float64(writes+reads)/time.Since(start).Seconds())
Expand Down
101 changes: 61 additions & 40 deletions internal/database/blockchainDB/sharddb.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,51 +7,50 @@ import (
"path/filepath"
)

const (
ShardBits = 8
Shards = 256 // Number of shards in bits
)

// ShardDB
// Maintains shards of key value pairs to allow reading and writing of
// key value pairs even during compression and eventually multi-thread
// transactions.
type ShardDB struct {
PermBFile *BlockList // The BFile has the directory and file
BufferCnt int // Buffer count used for BFiles
Shards [Shards]*Shard // List of all the Shards
PermBFile *BlockList // The BFile has the directory and file
BufferCnt int // Buffer count used for BFiles
Shards []*Shard // List of all the Shards
}

func NewShardDB(Directory string, Partition, BufferCnt int) (SDB *ShardDB, err error) {
_, err = os.Stat(Directory)
if err == nil {
return nil, fmt.Errorf("cannot create ShardDB; directory %s exists", Directory)
}
if !os.IsNotExist(err) {
return nil, fmt.Errorf("error getting status on directory '%s': %v", Directory, err)
}
// NewShardDB
// Creates a ShardDB directory and structures. Will overwrite an existing
// file or directory if it exists.
func NewShardDB(Directory string, Partition, ShardCnt, BufferCnt int) (SDB *ShardDB, err error) {
os.RemoveAll(Directory)

SDB = new(ShardDB)
SDB.BufferCnt = BufferCnt
SDB.Shards = make([]*Shard, ShardCnt)
err = os.Mkdir(Directory, os.ModePerm)
if err != nil {
return nil, err
}
SDB.PermBFile, err = NewBlockList(filepath.Join(Directory, "PermBFile"), Partition, BufferCnt)
f, e := os.Create(filepath.Join(Directory, "state.dat"))
if e != nil {
return nil, err
}
defer f.Close()

f.Write([]byte{byte(ShardCnt >> 8), byte(ShardCnt)}) // Write big endian 16 bit shard cnt

SDB.PermBFile, err = NewBlockList(filepath.Join(Directory, "PermBFile.dat"), Partition, BufferCnt)
if err != nil {
return nil, err
}

for i := 0; i < Shards; i++ {
for i := 0; i < len(SDB.Shards); i++ {
sDir := filepath.Join(Directory, fmt.Sprintf("shard%03d-%03d", Partition, i))
err = os.Mkdir(sDir, os.ModePerm)
if err != nil {
os.RemoveAll(Directory)
return nil, err
}
SDB.Shards[i] = new(Shard)
SDB.Shards[i].Filename = filepath.Join(sDir, "shard.dat")
SDB.Shards[i].BufferCnt = BufferCnt
err = SDB.Shards[i].Open()
SDB.Shards[i], err = NewShard(BufferCnt, filepath.Join(sDir, "shard.dat"))
if err != nil {
os.RemoveAll(Directory)
return nil, err
Expand All @@ -61,6 +60,36 @@ func NewShardDB(Directory string, Partition, BufferCnt int) (SDB *ShardDB, err e
return SDB, nil
}

// OpenShardDB
// Opens an existing ShardDB.
func OpenShardDB(Directory string, Partition, BufferCnt int) (SDB *ShardDB, err error) {

// Get the ShardCnt from the ShardDB state.dat file.
var ShardCntBuff [2]byte
f, e1 := os.Open(filepath.Join(Directory, "state.dat"))
defer f.Close()
_, e2 := f.Read(ShardCntBuff[:])
switch {
case e1 != nil:
return nil, e1
case e2 != nil:
return nil, e2
}
ShardCnt := int(binary.BigEndian.Uint16(ShardCntBuff[:]))
_ = ShardCnt
// Open the shards
SDB = new(ShardDB)
//SDB.PermBFile, err = Open(BufferCnt, filepath.Join(Directory,"PermBFile.dat"))
//SDB.Shards = make([]*Shards,ShardCnt)
for i := 0; i < len(SDB.Shards); i++ {
sDir := filepath.Join(Directory, fmt.Sprintf("shard%03d-%03d", Partition, i))
if SDB.Shards[i], err = OpenShard(BufferCnt, filepath.Join(Directory, sDir, "shard.dat")); err != nil {
return nil, err
}
}
return nil, nil
}

func (s *ShardDB) Close() {
if s.PermBFile != nil {
s.PermBFile.Close()
Expand All @@ -72,37 +101,29 @@ func (s *ShardDB) Close() {
}
}

// GetShard
// Get the shard responsible for a given key
func (s *ShardDB) GetShard(key [32]byte) *Shard {
v := int(binary.BigEndian.Uint16(key[:2]))
i := v % len(s.Shards)
return s.Shards[i]
}

func (s *ShardDB) PutH(scratch bool, key [32]byte, value []byte) error {
k := binary.BigEndian.Uint16(key[:]) >> (16 - ShardBits)
shard := s.Shards[k]
if shard == nil {
shard = new(Shard)
s.Shards[k] = shard
}
return s.PermBFile.Put(key, value)
}

// Put
// Put a key into the database
func (s *ShardDB) Put(key [32]byte, value []byte) error {
k := binary.BigEndian.Uint16(key[:]) >> (16 - ShardBits)
shard := s.Shards[k]
if shard == nil {
shard = new(Shard)
shard.Open()
s.Shards[k] = shard
}
shard := s.GetShard(key)
return shard.BFile.Put(key, value)
}

// Get
// Get a key from the DB
func (s *ShardDB) Get(key [32]byte) (value []byte) {
k := binary.BigEndian.Uint16(key[:]) >> (16 - ShardBits)
shard := s.Shards[k]
if shard == nil {
return nil
}
shard := s.GetShard(key)
v, err := shard.BFile.Get(key)
if err != nil && v == nil {
v, _ = s.PermBFile.Get(key) // If the err is not nil, v will be, so no need to check err
Expand Down
19 changes: 18 additions & 1 deletion internal/database/blockchainDB/sharddb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,29 @@ import (
"gitlab.com/accumulatenetwork/accumulate/internal/database/smt/common"
)

func TestAShard(t *testing.T) {
shardDB, err := NewShardDB(Directory,Partition,3,3)
assert.NoError(t,err,"failed to create a shardDB")
shard := shardDB.Shards[0]
fr := NewFastRandom([32]byte{1,2,3})
for i:=0; i< 100000; i++{
shard.Put(fr.NextHash(),fr.RandBuff(100,500))
}
shardDB.Close()
shardDB, err = OpenShardDB(Directory,Partition,3)
assert.NoError(t,err,"failed to open shardDB")
}


func TestShardDB(t *testing.T) {

shardDB,err := NewShardDB(Directory, Partition,5)
shardDB,err := NewShardDB(Directory, Partition,3,5)
defer os.RemoveAll(Directory)

assert.NoError(t,err,"failed to create directory")
if err != nil {
return
}
var r common.RandHash
for i := 0; i < 3; i++ {
key := r.NextA()
Expand Down

0 comments on commit 3d7e13a

Please sign in to comment.