From dde2d70e95a2d486e9de4c2a07d6a4f66fabe1f3 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 7 Apr 2020 11:44:36 -0700 Subject: [PATCH 1/7] Ensure genesis block is properly synced --- .gitignore | 1 + Makefile | 1 + go.mod | 2 +- go.sum | 2 ++ internal/reconciler/reconciler.go | 35 ++++++++++---------- internal/storage/block_storage.go | 1 + internal/syncer/syncer.go | 53 +++++++++++++++++++++---------- 7 files changed, 59 insertions(+), 36 deletions(-) diff --git a/.gitignore b/.gitignore index 1cec70fe..22672235 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ validator-data +rosetta-validator diff --git a/Makefile b/Makefile index c082bb3a..ea3340f3 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,7 @@ salus: validate: docker build -t rosetta-validator .; \ docker run \ + --rm \ -v ${PWD}/validator-data:/data \ -e DATA_DIR="/data" \ -e SERVER_URL="${SERVER_URL}" \ diff --git a/go.mod b/go.mod index 2b11adfc..378d953c 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.13 require ( github.com/caarlos0/env v3.5.0+incompatible - github.com/coinbase/rosetta-sdk-go v0.0.1 + github.com/coinbase/rosetta-sdk-go v0.0.3 github.com/davecgh/go-spew v1.1.1 github.com/dgraph-io/badger v1.6.0 github.com/stretchr/testify v1.5.1 diff --git a/go.sum b/go.sum index f60368c8..56ba7642 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEe github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/coinbase/rosetta-sdk-go v0.0.1 h1:s6oBsnXCEmTvZxNTHZ4+sjSSWEGCtCBO7kTcED3WILc= github.com/coinbase/rosetta-sdk-go v0.0.1/go.mod h1:T7kbh9AOzlxEITJGt2Fu854vxg/yEjy5MsR1woSM5aI= +github.com/coinbase/rosetta-sdk-go v0.0.3 h1:raFtDs4u0P7h7H+HzHpGQzDYctEpL80nx3e/EY4esXk= +github.com/coinbase/rosetta-sdk-go v0.0.3/go.mod h1:T7kbh9AOzlxEITJGt2Fu854vxg/yEjy5MsR1woSM5aI= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= diff --git a/internal/reconciler/reconciler.go b/internal/reconciler/reconciler.go index a972da62..c01f7051 100644 --- a/internal/reconciler/reconciler.go +++ b/internal/reconciler/reconciler.go @@ -69,7 +69,7 @@ const ( // inactiveReconciliationSleep is used as the time.Duration // to sleep when there are no seen accounts to reconcile. - inactiveReconciliationSleep = 30 * time.Second + inactiveReconciliationSleep = 5 * time.Second ) var ( @@ -421,26 +421,25 @@ func simpleAccountAndCurrency(acct *AccountAndCurrency) string { func (r *Reconciler) reconcileActiveAccounts( ctx context.Context, ) error { - for acctIndex := range r.acctQueue { - if ctx.Err() != nil { - return nil - } - - if acctIndex.blockIndex < r.highWaterMark { - continue - } + for { + select { + case <-ctx.Done(): + return ctx.Err() + case acctIndex := <-r.acctQueue: + if acctIndex.blockIndex < r.highWaterMark { + continue + } - err := r.accountReconciliation( - ctx, - acctIndex.accountAndCurrency, - false, - ) - if err != nil { - return err + err := r.accountReconciliation( + ctx, + acctIndex.accountAndCurrency, + false, + ) + if err != nil { + return err + } } } - - return nil } // reconcileInactiveAccounts selects a random account diff --git a/internal/storage/block_storage.go b/internal/storage/block_storage.go index 7f39cc51..84d14280 100644 --- a/internal/storage/block_storage.go +++ b/internal/storage/block_storage.go @@ -407,6 +407,7 @@ func (b *BlockStorage) UpdateBalance( if !ok { return fmt.Errorf("%s is not an integer", amount.Value) } + if newVal.Sign() == -1 { return fmt.Errorf( "%w %+v for %+v at %+v", diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go index 3e2d8d6c..2655612a 100644 --- a/internal/syncer/syncer.go +++ b/internal/syncer/syncer.go @@ -139,6 +139,7 @@ func (s *Syncer) storeBlockBalanceChanges( Account: op.Account, Currency: amount.Currency, } + if !reconciler.ContainsAccountAndCurrency(modifiedAccounts, accountAndCurrency) { modifiedAccounts = append(modifiedAccounts, accountAndCurrency) } @@ -333,6 +334,35 @@ func (s *Syncer) SyncBlockRange( return s.logger.BlockLatency(ctx, allBlocks) } +// nextSyncableRange returns the next range of indexes to sync +// based on what the last processed block in storage is and +// the contents of the network status response. +func (s *Syncer) nextSyncableRange( + ctx context.Context, + networkStatus *rosetta.NetworkStatusResponse, +) (int64, int64, error) { + tx := s.storage.NewDatabaseTransaction(ctx, false) + defer tx.Discard(ctx) + + var startIndex int64 + head, err := s.storage.GetHeadBlockIdentifier(ctx, tx) + if err == nil { + startIndex = head.Index + 1 + } else if err == storage.ErrHeadBlockNotFound { + head = networkStatus.NetworkStatus.NetworkInformation.GenesisBlockIdentifier + startIndex = head.Index + } else { + return -1, -1, err + } + + endIndex := networkStatus.NetworkStatus.NetworkInformation.CurrentBlockIdentifier.Index + if endIndex-startIndex > maxSync { + endIndex = startIndex + maxSync + } + + return startIndex, endIndex, nil +} + // SyncCycle is a single iteration of processing up to maxSync blocks. // SyncCycle is called repeatedly by Sync until there is an error. func (s *Syncer) SyncCycle(ctx context.Context, printNetwork bool) error { @@ -353,29 +383,18 @@ func (s *Syncer) SyncCycle(ctx context.Context, printNetwork bool) error { } } - tx := s.storage.NewDatabaseTransaction(ctx, false) - defer tx.Discard(ctx) - - head, err := s.storage.GetHeadBlockIdentifier(ctx, tx) - if err == storage.ErrHeadBlockNotFound { - head = networkStatus.NetworkStatus.NetworkInformation.GenesisBlockIdentifier - } else if err != nil { + startIndex, endIndex, err := s.nextSyncableRange(ctx, networkStatus) + if err != nil { return err } - currIndex := head.Index + 1 - endIndex := networkStatus.NetworkStatus.NetworkInformation.CurrentBlockIdentifier.Index - if endIndex-currIndex > maxSync { - endIndex = currIndex + maxSync - } - - if currIndex > endIndex { - log.Printf("Next block %d > Blockchain Head %d", currIndex, endIndex) + if startIndex > endIndex { + log.Printf("Next block %d > Blockchain Head %d", startIndex, endIndex) return nil } - log.Printf("Syncing blocks %d-%d\n", currIndex, endIndex) - return s.SyncBlockRange(ctx, currIndex, endIndex) + log.Printf("Syncing blocks %d-%d\n", startIndex, endIndex) + return s.SyncBlockRange(ctx, startIndex, endIndex) } // Sync cycles endlessly until there is an error. From 480cf5316eafe94362602622e0cb64c7f78f0996 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 7 Apr 2020 11:54:30 -0700 Subject: [PATCH 2/7] Ensure genesis block cannot be orphaned --- internal/syncer/syncer.go | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go index 2655612a..9efa04f9 100644 --- a/internal/syncer/syncer.go +++ b/internal/syncer/syncer.go @@ -275,9 +275,11 @@ func (s *Syncer) ProcessBlock( } // SyncBlockRange syncs blocks from startIndex to endIndex, inclusive. -// This function handles re-orgs that may occur while syncing. +// This function handles re-orgs that may occur while syncing as long +// as the genesisIndex is not orphaned. func (s *Syncer) SyncBlockRange( ctx context.Context, + genesisIndex int64, startIndex int64, endIndex int64, ) error { @@ -326,6 +328,10 @@ func (s *Syncer) SyncBlockRange( return err } + if newIndex < genesisIndex { + return errors.New("cannot orphan genesis block") + } + currIndex = newIndex allBlocks = append(allBlocks, block) s.reconciler.QueueAccounts(ctx, block.Block.BlockIdentifier.Index, modifiedAccounts) @@ -340,19 +346,21 @@ func (s *Syncer) SyncBlockRange( func (s *Syncer) nextSyncableRange( ctx context.Context, networkStatus *rosetta.NetworkStatusResponse, -) (int64, int64, error) { +) (int64, int64, int64, error) { tx := s.storage.NewDatabaseTransaction(ctx, false) defer tx.Discard(ctx) + genesisBlockIdentifier := networkStatus.NetworkStatus.NetworkInformation.GenesisBlockIdentifier + var startIndex int64 head, err := s.storage.GetHeadBlockIdentifier(ctx, tx) if err == nil { startIndex = head.Index + 1 } else if err == storage.ErrHeadBlockNotFound { - head = networkStatus.NetworkStatus.NetworkInformation.GenesisBlockIdentifier + head = genesisBlockIdentifier startIndex = head.Index } else { - return -1, -1, err + return -1, -1, -1, err } endIndex := networkStatus.NetworkStatus.NetworkInformation.CurrentBlockIdentifier.Index @@ -360,7 +368,7 @@ func (s *Syncer) nextSyncableRange( endIndex = startIndex + maxSync } - return startIndex, endIndex, nil + return genesisBlockIdentifier.Index, startIndex, endIndex, nil } // SyncCycle is a single iteration of processing up to maxSync blocks. @@ -383,7 +391,7 @@ func (s *Syncer) SyncCycle(ctx context.Context, printNetwork bool) error { } } - startIndex, endIndex, err := s.nextSyncableRange(ctx, networkStatus) + genesisIndex, startIndex, endIndex, err := s.nextSyncableRange(ctx, networkStatus) if err != nil { return err } @@ -394,7 +402,12 @@ func (s *Syncer) SyncCycle(ctx context.Context, printNetwork bool) error { } log.Printf("Syncing blocks %d-%d\n", startIndex, endIndex) - return s.SyncBlockRange(ctx, startIndex, endIndex) + return s.SyncBlockRange( + ctx, + genesisIndex, + startIndex, + endIndex, + ) } // Sync cycles endlessly until there is an error. From 281a62770032f064542567315eb09ba87ae915d1 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 7 Apr 2020 12:42:44 -0700 Subject: [PATCH 3/7] Fix genesis orphan --- internal/syncer/syncer.go | 12 ++++------ internal/syncer/syncer_test.go | 44 ++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go index 9efa04f9..208570fe 100644 --- a/internal/syncer/syncer.go +++ b/internal/syncer/syncer.go @@ -220,6 +220,7 @@ func (s *Syncer) AddBlock( // head should be orphaned. func (s *Syncer) ProcessBlock( ctx context.Context, + genesisIndex int64, currIndex int64, block *rosetta.Block, ) ([]*reconciler.AccountAndCurrency, int64, error) { @@ -234,8 +235,9 @@ func (s *Syncer) ProcessBlock( var modifiedAccounts []*reconciler.AccountAndCurrency var newIndex int64 if reorg { - if currIndex == 0 { - return nil, 0, errors.New("Can't reorg genesis block") + newIndex = currIndex - 1 + if newIndex == genesisIndex { + return nil, 0, errors.New("cannot orphan genesis block") } head, err := s.storage.GetHeadBlockIdentifier(ctx, tx) @@ -248,7 +250,6 @@ func (s *Syncer) ProcessBlock( return nil, currIndex, err } - newIndex = currIndex - 1 err = s.logger.BlockStream(ctx, block, true) if err != nil { log.Printf("Unable to log block %v\n", err) @@ -321,6 +322,7 @@ func (s *Syncer) SyncBlockRange( // Can't return modifiedAccounts without creating new variable modifiedAccounts, newIndex, err := s.ProcessBlock( ctx, + genesisIndex, currIndex, block.Block, ) @@ -328,10 +330,6 @@ func (s *Syncer) SyncBlockRange( return err } - if newIndex < genesisIndex { - return errors.New("cannot orphan genesis block") - } - currIndex = newIndex allBlocks = append(allBlocks, block) s.reconciler.QueueAccounts(ctx, block.Block.BlockIdentifier.Index, modifiedAccounts) diff --git a/internal/syncer/syncer_test.go b/internal/syncer/syncer_test.go index bb17de2b..f55e29f7 100644 --- a/internal/syncer/syncer_test.go +++ b/internal/syncer/syncer_test.go @@ -152,6 +152,18 @@ var ( }, } + orphanGenesis = &rosetta.Block{ + BlockIdentifier: &rosetta.BlockIdentifier{ + Hash: "1", + Index: 1, + }, + ParentBlockIdentifier: &rosetta.BlockIdentifier{ + Hash: "0a", + Index: 0, + }, + Transactions: []*rosetta.Transaction{}, + } + blockSequenceReorg = []*rosetta.Block{ &rosetta.Block{ // genesis BlockIdentifier: &rosetta.BlockIdentifier{ @@ -262,10 +274,12 @@ func TestNoReorgProcessBlock(t *testing.T) { rec := reconciler.New(ctx, nil, blockStorage, fetcher, logger, 1) syncer := New(ctx, nil, blockStorage, fetcher, logger, rec) currIndex := int64(0) + genesisIndex := blockSequenceNoReorg[0].BlockIdentifier.Index t.Run("No block exists", func(t *testing.T) { modifiedAccounts, newIndex, err := syncer.ProcessBlock( ctx, + genesisIndex, currIndex, blockSequenceNoReorg[0], ) @@ -284,6 +298,7 @@ func TestNoReorgProcessBlock(t *testing.T) { t.Run("Block exists, no reorg", func(t *testing.T) { modifiedAccounts, newIndex, err := syncer.ProcessBlock( ctx, + genesisIndex, currIndex, blockSequenceNoReorg[1], ) @@ -302,6 +317,7 @@ func TestNoReorgProcessBlock(t *testing.T) { t.Run("Block with transaction", func(t *testing.T) { modifiedAccounts, newIndex, err := syncer.ProcessBlock( ctx, + genesisIndex, currIndex, blockSequenceNoReorg[2], ) @@ -336,6 +352,7 @@ func TestNoReorgProcessBlock(t *testing.T) { t.Run("Block with invalid transaction", func(t *testing.T) { modifiedAccounts, newIndex, err := syncer.ProcessBlock( ctx, + genesisIndex, currIndex, blockSequenceNoReorg[3], ) @@ -380,10 +397,12 @@ func TestReorgProcessBlock(t *testing.T) { rec := reconciler.New(ctx, nil, blockStorage, fetcher, logger, 1) syncer := New(ctx, nil, blockStorage, fetcher, logger, rec) currIndex := int64(0) + genesisIndex := blockSequenceReorg[0].BlockIdentifier.Index t.Run("No block exists", func(t *testing.T) { modifiedAccounts, newIndex, err := syncer.ProcessBlock( ctx, + genesisIndex, currIndex, blockSequenceReorg[0], ) @@ -399,9 +418,29 @@ func TestReorgProcessBlock(t *testing.T) { assert.NoError(t, err) }) + t.Run("Orphan genesis", func(t *testing.T) { + modifiedAccounts, newIndex, err := syncer.ProcessBlock( + ctx, + genesisIndex, + currIndex, + orphanGenesis, + ) + + assert.Equal(t, int64(0), newIndex) + assert.Equal(t, 0, len(modifiedAccounts)) + assert.EqualError(t, err, "cannot orphan genesis block") + + tx := syncer.storage.NewDatabaseTransaction(ctx, false) + head, err := syncer.storage.GetHeadBlockIdentifier(ctx, tx) + tx.Discard(ctx) + assert.Equal(t, blockSequenceReorg[0].BlockIdentifier, head) + assert.NoError(t, err) + }) + t.Run("Block exists, no reorg", func(t *testing.T) { modifiedAccounts, newIndex, err := syncer.ProcessBlock( ctx, + genesisIndex, currIndex, blockSequenceReorg[1], ) @@ -435,6 +474,7 @@ func TestReorgProcessBlock(t *testing.T) { // Orphan block modifiedAccounts, newIndex, err := syncer.ProcessBlock( ctx, + genesisIndex, currIndex, blockSequenceReorg[2], ) @@ -482,6 +522,7 @@ func TestReorgProcessBlock(t *testing.T) { // Process new block modifiedAccounts, currIndex, err = syncer.ProcessBlock( ctx, + genesisIndex, currIndex, blockSequenceReorg[3], ) @@ -497,6 +538,7 @@ func TestReorgProcessBlock(t *testing.T) { modifiedAccounts, currIndex, err = syncer.ProcessBlock( ctx, + genesisIndex, currIndex, blockSequenceReorg[2], ) @@ -517,6 +559,7 @@ func TestReorgProcessBlock(t *testing.T) { modifiedAccounts, currIndex, err = syncer.ProcessBlock( ctx, + genesisIndex, currIndex, blockSequenceReorg[4], ) @@ -534,6 +577,7 @@ func TestReorgProcessBlock(t *testing.T) { t.Run("Out of order block", func(t *testing.T) { modifiedAccounts, newIndex, err := syncer.ProcessBlock( ctx, + genesisIndex, currIndex, blockSequenceReorg[5], ) From 465591b856928aaa5cf1d1b6dfbff3f18622a8e0 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 7 Apr 2020 13:00:13 -0700 Subject: [PATCH 4/7] Add tests for nextSyncableRange --- internal/syncer/syncer_test.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/internal/syncer/syncer_test.go b/internal/syncer/syncer_test.go index f55e29f7..22d681c5 100644 --- a/internal/syncer/syncer_test.go +++ b/internal/syncer/syncer_test.go @@ -247,6 +247,9 @@ var ( GenesisBlockIdentifier: &rosetta.BlockIdentifier{ Index: 0, }, + CurrentBlockIdentifier: &rosetta.BlockIdentifier{ + Index: 1000, + }, }, }, Options: &rosetta.Options{ @@ -378,6 +381,25 @@ func TestNoReorgProcessBlock(t *testing.T) { }) } +// assertNextSyncableRange is a helper function used to test +// the nextSyncableRange function during block processing. +func assertNextSyncableRange( + ctx context.Context, + t *testing.T, + syncer *Syncer, + currIndex int64, +) { + genesisIndex, startIndex, endIndex, err := syncer.nextSyncableRange( + ctx, + networkStatusResponse, + ) + + assert.Equal(t, int64(0), genesisIndex) + assert.Equal(t, currIndex, startIndex) + assert.Equal(t, currIndex+maxSync, endIndex) + assert.NoError(t, err) +} + func TestReorgProcessBlock(t *testing.T) { ctx := context.Background() @@ -400,6 +422,9 @@ func TestReorgProcessBlock(t *testing.T) { genesisIndex := blockSequenceReorg[0].BlockIdentifier.Index t.Run("No block exists", func(t *testing.T) { + assertNextSyncableRange(ctx, t, syncer, currIndex) + + // Add genesis block modifiedAccounts, newIndex, err := syncer.ProcessBlock( ctx, genesisIndex, @@ -416,6 +441,8 @@ func TestReorgProcessBlock(t *testing.T) { tx.Discard(ctx) assert.Equal(t, blockSequenceReorg[0].BlockIdentifier, head) assert.NoError(t, err) + + assertNextSyncableRange(ctx, t, syncer, currIndex) }) t.Run("Orphan genesis", func(t *testing.T) { @@ -468,6 +495,8 @@ func TestReorgProcessBlock(t *testing.T) { }, amounts) assert.Equal(t, blockSequenceReorg[1].BlockIdentifier, block) assert.NoError(t, err) + + assertNextSyncableRange(ctx, t, syncer, currIndex) }) t.Run("Orphan block", func(t *testing.T) { @@ -489,6 +518,7 @@ func TestReorgProcessBlock(t *testing.T) { }, }, modifiedAccounts) assert.NoError(t, err) + assertNextSyncableRange(ctx, t, syncer, currIndex) // Assert head is back to genesis tx := syncer.storage.NewDatabaseTransaction(ctx, false) @@ -529,6 +559,7 @@ func TestReorgProcessBlock(t *testing.T) { assert.Equal(t, int64(2), currIndex) assert.Equal(t, 0, len(modifiedAccounts)) assert.NoError(t, err) + assertNextSyncableRange(ctx, t, syncer, currIndex) tx = syncer.storage.NewDatabaseTransaction(ctx, false) head, err = syncer.storage.GetHeadBlockIdentifier(ctx, tx) From b268a50088728d5c177fd766c5346cd8e6721636 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 7 Apr 2020 14:04:05 -0700 Subject: [PATCH 5/7] Add ability to bootstrap balances in genesis --- Makefile | 2 + examples/bootstrap_balances.csv | 2 + internal/storage/block_storage.go | 103 ++++++++++++++++++++++++++++++ main.go | 12 ++++ 4 files changed, 119 insertions(+) create mode 100644 examples/bootstrap_balances.csv diff --git a/Makefile b/Makefile index ea3340f3..10514a1e 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,7 @@ LICENCE_SCRIPT=addlicense -c "Coinbase, Inc." -l "apache" -v SERVER_URL?=http://localhost:10000 LOG_TRANSACTIONS?=false LOG_BENCHMARKS?=true +BOOTSTRAP_BALANCES?=false deps: go get ./... @@ -41,6 +42,7 @@ validate: -e ACCOUNT_CONCURRENCY="8" \ -e LOG_TRANSACTIONS="${LOG_TRANSACTIONS}" \ -e LOG_BENCHMARKS="${LOG_BENCHMARKS}" \ + -e BOOTSTRAP_BALANCES="${BOOTSTRAP_BALANCES}" \ --network host \ rosetta-validator \ rosetta-validator; diff --git a/examples/bootstrap_balances.csv b/examples/bootstrap_balances.csv new file mode 100644 index 00000000..90400d92 --- /dev/null +++ b/examples/bootstrap_balances.csv @@ -0,0 +1,2 @@ +AccountIdentifier_address,Amount_value,Currency_symbol,Currency_decimals +1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa,5000000000,BTC,8 diff --git a/internal/storage/block_storage.go b/internal/storage/block_storage.go index 84d14280..f6156121 100644 --- a/internal/storage/block_storage.go +++ b/internal/storage/block_storage.go @@ -18,11 +18,17 @@ import ( "bytes" "context" "crypto/sha256" + "encoding/csv" "encoding/gob" "errors" "fmt" + "io" "log" "math/big" + "os" + "path" + "strconv" + "strings" rosetta "github.com/coinbase/rosetta-sdk-go/gen" @@ -73,6 +79,18 @@ const ( // balanceNamespace is prepended to any stored balance. balanceNamespace = "balance" + + // bootstrapBalancesFile is loaded to bootstrap the balance + // of a collection of accounts. + bootstrapBalancesFile = "bootstrap_balances.csv" + + // bootstrapBalancesHeader is used as the CSV header + // in the bootstrapBalancesFile. + bootstrapBalancesHeader = "AccountIdentifier_address,Amount_value,Currency_symbol,Currency_decimals" + bootstrapAddressIndex = 0 + bootstrapValueIndex = 1 + bootstrapSymbolIndex = 2 + bootstrapDecimalsIndex = 3 ) /* @@ -497,3 +515,88 @@ func (b *BlockStorage) GetBalance( return deserialBal.Amounts, deserialBal.Block, nil } + +// BootstrapBalances is utilized to set the balance of +// any number of AccountIdentifiers at the genesis blocks. +// This is particularly useful for setting the value of +// accounts that received an allocation in the genesis block. +func (b *BlockStorage) BootstrapBalances( + ctx context.Context, + dataDir string, + genesisBlockIdentifier *rosetta.BlockIdentifier, +) error { + f, err := os.Open(path.Join(dataDir, bootstrapBalancesFile)) + if err != nil { + return err + } + + dbTransaction := b.NewDatabaseTransaction(ctx, true) + defer dbTransaction.Discard(ctx) + + _, err = b.GetHeadBlockIdentifier(ctx, dbTransaction) + if err != ErrHeadBlockNotFound { + return errors.New("cannot bootstrap accounts already started syncing") + } + + csvReader := csv.NewReader(f) + rowsRead := 0 + for { + record, err := csvReader.Read() + if err == io.EOF { + break + } + rowsRead++ + + // Assert header is correct + if rowsRead == 1 { + if bootstrapBalancesHeader != strings.Join(record[:], ",") { + return errors.New("incorrect header on bootstrap file") + } + + continue + } + + account := &rosetta.AccountIdentifier{ + Address: record[bootstrapAddressIndex], + } + + currencyDecimals, err := strconv.Atoi(record[bootstrapDecimalsIndex]) + if err != nil { + return err + } + + amount := &rosetta.Amount{ + Value: record[bootstrapValueIndex], + Currency: &rosetta.Currency{ + Symbol: record[bootstrapSymbolIndex], + Decimals: int32(currencyDecimals), + }, + } + + log.Printf( + "Setting account %s balance to %s %+v\n", + account.Address, + amount.Value, + amount.Currency, + ) + + err = b.UpdateBalance( + ctx, + dbTransaction, + account, + amount, + genesisBlockIdentifier, + ) + if err != nil { + return err + } + } + + err = dbTransaction.Commit(ctx) + if err != nil { + return err + } + + log.Printf("%d Balances Bootstrapped\n", rowsRead-1) + return nil +} diff --git a/main.go b/main.go index c4376377..1b4b1e1e 100644 --- a/main.go +++ b/main.go @@ -40,6 +40,7 @@ type config struct { AccountConcurrency int `env:"ACCOUNT_CONCURRENCY,required"` LogTransactions bool `env:"LOG_TRANSACTIONS,required"` LogBenchmarks bool `env:"LOG_BENCHMARKS,required"` + BootstrapBalances bool `env:"BOOTSTRAP_BALANCES,required"` } func main() { @@ -78,6 +79,17 @@ func main() { } blockStorage := storage.NewBlockStorage(ctx, localStore) + if cfg.BootstrapBalances { + err = blockStorage.BootstrapBalances( + ctx, + cfg.DataDir, + networkResponse.NetworkStatus.NetworkInformation.GenesisBlockIdentifier, + ) + if err != nil { + log.Fatal(err) + } + } + logger := logger.NewLogger(cfg.DataDir, cfg.LogTransactions, cfg.LogBenchmarks) g, ctx := errgroup.WithContext(ctx) From 34b7591425b12f06363926de3ee3d9abd40d9d66 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 7 Apr 2020 14:17:34 -0700 Subject: [PATCH 6/7] Update readme to mention bootstrap functionality --- README.md | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index c4f960a6..8940d61f 100644 --- a/README.md +++ b/README.md @@ -18,11 +18,6 @@ and wallets to integrate with much less communication overhead and network-specific work. ## Run the Validator - -The validator needs the URL of the Rosetta server configured. This can be set -as an environment variable named `SERVER_URL`, passed as an argument to make eg `make SERVER_URL= validate` -or editing `Makefile` itself. - 1. Start your Rosetta Server (and the blockchain node it connects to if it is not a single binary. 2. Start the validator using `make SERVER_URL= validate`. @@ -30,10 +25,25 @@ not a single binary. by setting `LOG_TRANSACTIONS="true"` in the environment or as a `make` argument. 4. Watch for errors in the processing logs. Any error will cause the validator to stop. 5. Analyze benchmarks from `validator-data/block_benchmarks.csv` and - `validator-data/account_benchmarks.csv` by setting `LOG_BENCHMARKS="true"` in the environment or as a `make` argument. +`validator-data/account_benchmarks.csv` by setting `LOG_BENCHMARKS="true"` in +the environment or as a `make` argument. + +### Setting the Server URL +The validator needs the URL of the Rosetta server configured. This can be set +as an environment variable named `SERVER_URL`, passed as an argument to make +(ex: `make SERVER_URL= validate`) or editing `Makefile` itself. + +### Bootstrapping Balances +Blockchains that set balances in genesis must create a `bootstrap_balances.csv` +file in the `/validator-data` directory and pass `BOOTSTRAP_BALANCES=true` as an +argument to make. If balances are not bootsrapped and balances are set in genesis, +reconciliation will fail. + +There is an example file in `examples/bootstrap_balances.csv`. -_There is no additional setting required to support blockchains with reorgs. This -is handled automatically!_ +### Re-orgable Blockchains +There is no additional setting required to support blockchains with reorgs. This +is handled automatically! ## Development * `make deps` to install dependencies From c7acdd86a576137ae31977c955607fa3f522fe6c Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 7 Apr 2020 15:48:01 -0700 Subject: [PATCH 7/7] Add test for bootstrapping balances --- internal/storage/block_storage.go | 79 +++++++---- internal/storage/block_storage_test.go | 180 +++++++++++++++++++++++++ 2 files changed, 230 insertions(+), 29 deletions(-) diff --git a/internal/storage/block_storage.go b/internal/storage/block_storage.go index f6156121..260449ac 100644 --- a/internal/storage/block_storage.go +++ b/internal/storage/block_storage.go @@ -35,32 +35,6 @@ import ( "github.com/davecgh/go-spew/spew" ) -var ( - // ErrHeadBlockNotFound is returned when there is no - // head block found in BlockStorage. - ErrHeadBlockNotFound = errors.New("Head block not found") - - // ErrBlockNotFound is returned when a block is not - // found in BlockStorage. - ErrBlockNotFound = errors.New("Block not found") - - // ErrAccountNotFound is returned when an account - // is not found in BlockStorage. - ErrAccountNotFound = errors.New("Account not found") - - // ErrNegativeBalance is returned when an account - // balance goes negative as the result of an operation. - ErrNegativeBalance = errors.New("Negative balance") - - // ErrDuplicateBlockHash is returned when a block hash - // cannot be stored because it is a duplicate. - ErrDuplicateBlockHash = errors.New("Duplicate block hash") - - // ErrDuplicateTransactionHash is returned when a transaction - // hash cannot be stored because it is a duplicate. - ErrDuplicateTransactionHash = errors.New("Duplicate transaction hash") -) - const ( // headBlockKey is used to lookup the head block identifier. // The head block is the block with the largest index that is @@ -84,6 +58,10 @@ const ( // of a collection of accounts. bootstrapBalancesFile = "bootstrap_balances.csv" + // bootstrapBalancesPermissions specifies that the user can + // read and write the file. + bootstrapBalancesPermissions = 0600 + // bootstrapBalancesHeader is used as the CSV header // in the bootstrapBalancesFile. bootstrapBalancesHeader = "AccountIdentifier_address,Amount_value,Currency_symbol,Currency_decimals" @@ -93,6 +71,40 @@ const ( bootstrapDecimalsIndex = 3 ) +var ( + // ErrHeadBlockNotFound is returned when there is no + // head block found in BlockStorage. + ErrHeadBlockNotFound = errors.New("Head block not found") + + // ErrBlockNotFound is returned when a block is not + // found in BlockStorage. + ErrBlockNotFound = errors.New("Block not found") + + // ErrAccountNotFound is returned when an account + // is not found in BlockStorage. + ErrAccountNotFound = errors.New("Account not found") + + // ErrNegativeBalance is returned when an account + // balance goes negative as the result of an operation. + ErrNegativeBalance = errors.New("Negative balance") + + // ErrDuplicateBlockHash is returned when a block hash + // cannot be stored because it is a duplicate. + ErrDuplicateBlockHash = errors.New("Duplicate block hash") + + // ErrDuplicateTransactionHash is returned when a transaction + // hash cannot be stored because it is a duplicate. + ErrDuplicateTransactionHash = errors.New("Duplicate transaction hash") + + // ErrAlreadyStartedSyncing is returned when trying to bootstrap + // balances after syncing has started. + ErrAlreadyStartedSyncing = errors.New("already started syncing") + + // ErrIncorrectHeader is returned when a bootstrap file has an + // incorrect header. + ErrIncorrectHeader = errors.New("incorrect header") +) + /* Key Construction */ @@ -525,7 +537,11 @@ func (b *BlockStorage) BootstrapBalances( dataDir string, genesisBlockIdentifier *rosetta.BlockIdentifier, ) error { - f, err := os.Open(path.Join(dataDir, bootstrapBalancesFile)) + f, err := os.OpenFile( + path.Join(dataDir, bootstrapBalancesFile), + os.O_RDONLY, + bootstrapBalancesPermissions, + ) if err != nil { return err } @@ -535,7 +551,7 @@ func (b *BlockStorage) BootstrapBalances( _, err = b.GetHeadBlockIdentifier(ctx, dbTransaction) if err != ErrHeadBlockNotFound { - return errors.New("cannot bootstrap accounts already started syncing") + return ErrAlreadyStartedSyncing } csvReader := csv.NewReader(f) @@ -550,12 +566,17 @@ func (b *BlockStorage) BootstrapBalances( // Assert header is correct if rowsRead == 1 { if bootstrapBalancesHeader != strings.Join(record[:], ",") { - return errors.New("incorrect header on bootstrap file") + return ErrIncorrectHeader } continue } + // Assert row column length correct + if len(record) != len(strings.Split(bootstrapBalancesHeader, ",")) { + return fmt.Errorf("row %d does not have expected fields: %s", rowsRead, record) + } + account := &rosetta.AccountIdentifier{ Address: record[bootstrapAddressIndex], } diff --git a/internal/storage/block_storage_test.go b/internal/storage/block_storage_test.go index 821ae877..c20802db 100644 --- a/internal/storage/block_storage_test.go +++ b/internal/storage/block_storage_test.go @@ -17,6 +17,8 @@ package storage import ( "context" "fmt" + "os" + "path" "testing" rosetta "github.com/coinbase/rosetta-sdk-go/gen" @@ -615,3 +617,181 @@ func TestGetCurrencyKey(t *testing.T) { }) } } + +func createBootstrapBalancesFile(dataDir string) (*os.File, error) { + return os.OpenFile( + path.Join(dataDir, bootstrapBalancesFile), + os.O_CREATE|os.O_WRONLY, + 0600, + ) +} + +func TestBootstrapBalances(t *testing.T) { + var ( + genesisBlockIdentifier = &rosetta.BlockIdentifier{ + Index: 0, + Hash: "0", + } + + account = &rosetta.AccountIdentifier{ + Address: "hello", + } + ) + + ctx := context.Background() + + newDir, err := CreateTempDir() + assert.NoError(t, err) + defer RemoveTempDir(*newDir) + + database, err := NewBadgerStorage(ctx, *newDir) + assert.NoError(t, err) + defer database.Close(ctx) + + storage := NewBlockStorage(ctx, database) + + t.Run("File doesn't exist", func(t *testing.T) { + err = storage.BootstrapBalances( + ctx, + *newDir, + genesisBlockIdentifier, + ) + assert.EqualError(t, err, fmt.Sprintf( + "open %s: no such file or directory", + path.Join(*newDir, bootstrapBalancesFile), + )) + }) + + t.Run("Set balance successfully", func(t *testing.T) { + f, err := createBootstrapBalancesFile(*newDir) + defer f.Close() + assert.NoError(t, err) + + _, err = f.WriteString(fmt.Sprintf( + "%s\n", + bootstrapBalancesHeader, + )) + assert.NoError(t, err) + + amount := &rosetta.Amount{ + Value: "10", + Currency: &rosetta.Currency{ + Symbol: "BTC", + Decimals: 8, + }, + } + + _, err = f.WriteString(fmt.Sprintf( + "%s,%s,%s,%d\n", + account.Address, + amount.Value, + amount.Currency.Symbol, + amount.Currency.Decimals, + )) + assert.NoError(t, err) + + err = storage.BootstrapBalances( + ctx, + *newDir, + genesisBlockIdentifier, + ) + assert.NoError(t, err) + + tx := storage.NewDatabaseTransaction(ctx, false) + amountMap, blockIdentifier, err := storage.GetBalance( + ctx, + tx, + account, + ) + tx.Discard(ctx) + + assert.Equal(t, amount, amountMap[GetCurrencyKey(amount.Currency)]) + assert.Equal(t, genesisBlockIdentifier, blockIdentifier) + assert.NoError(t, err) + }) + + t.Run("Invalid file header", func(t *testing.T) { + f, err := createBootstrapBalancesFile(*newDir) + defer f.Close() + assert.NoError(t, err) + + _, err = f.WriteString("bad header") + assert.NoError(t, err) + + err = storage.BootstrapBalances( + ctx, + *newDir, + genesisBlockIdentifier, + ) + assert.EqualError(t, err, ErrIncorrectHeader.Error()) + }) + + t.Run("Invalid account row", func(t *testing.T) { + f, err := createBootstrapBalancesFile(*newDir) + defer f.Close() + assert.NoError(t, err) + + _, err = f.WriteString(fmt.Sprintf( + "%s\n", + bootstrapBalancesHeader, + )) + assert.NoError(t, err) + + _, err = f.WriteString("bad row\n") + assert.NoError(t, err) + + err = storage.BootstrapBalances( + ctx, + *newDir, + genesisBlockIdentifier, + ) + assert.EqualError(t, err, "row 2 does not have expected fields: [bad row]") + }) + + t.Run("Invalid account value", func(t *testing.T) { + f, err := createBootstrapBalancesFile(*newDir) + defer f.Close() + assert.NoError(t, err) + + _, err = f.WriteString(fmt.Sprintf( + "%s\n", + bootstrapBalancesHeader, + )) + assert.NoError(t, err) + + amount := &rosetta.Amount{ + Value: "goodbye", + Currency: &rosetta.Currency{ + Symbol: "BTC", + Decimals: 8, + }, + } + + _, err = f.WriteString(fmt.Sprintf( + "%s,%s,%s,%d\n", + account.Address, + amount.Value, + amount.Currency.Symbol, + amount.Currency.Decimals, + )) + assert.NoError(t, err) + + err = storage.BootstrapBalances( + ctx, + *newDir, + genesisBlockIdentifier, + ) + assert.EqualError(t, err, "goodbye is not an integer") + }) + + t.Run("Head block identifier already set", func(t *testing.T) { + tx := storage.NewDatabaseTransaction(ctx, true) + err := storage.StoreHeadBlockIdentifier(ctx, tx, genesisBlockIdentifier) + assert.NoError(t, err) + assert.NoError(t, tx.Commit(ctx)) + + // Use the created CSV file from the last test + err = storage.BootstrapBalances(ctx, *newDir, genesisBlockIdentifier) + assert.EqualError(t, err, ErrAlreadyStartedSyncing.Error()) + }) +}