Merge pull request #4500 from harmony-one/dev
* Fixed debug run for mac. (#4484)
* Fix debug run for mac.
* Next validator in view change. (#4492)
* NthNextValidatorHmy
* Fixed func usage.
* Additional checks.
* HIP-30 Boilerplate (#4495)
* HIP-30: sharding configuration boilerplate
* update test
* update comment
---------

Co-authored-by: Nita Neou (Soph) <[email protected]>
* HIP-30: minimum validator commission of 7% (#4496)
* HIP-30: sharding configuration boilerplate
* update comments
* goimports
* HIP-30: minimum validator commission of 7% (a sketch follows this PR entry)

Based on #4495, which must be merged before this PR. This PR should be
rebased with dev after #4495 is merged to retain atomicity of changes by
pull request.
* goimports
* update test
* update test

---------

Co-authored-by: Casey Gardiner <[email protected]>
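
A minimal, hypothetical sketch of the 7% commission floor introduced by #4496, assuming a rational-number rate and a boolean epoch gate; the names here are illustrative, not the actual harmony-one/harmony staking code, which uses its own decimal type.

package main

import (
	"fmt"
	"math/big"
)

// minCommissionRate is HIP-30's 7% floor expressed as a fraction.
var minCommissionRate = big.NewRat(7, 100)

// enforceMinCommission raises a validator's commission rate to the floor
// once the HIP-30 epoch is active; higher rates are left untouched.
func enforceMinCommission(rate *big.Rat, hip30Active bool) *big.Rat {
	if hip30Active && rate.Cmp(minCommissionRate) < 0 {
		return new(big.Rat).Set(minCommissionRate)
	}
	return rate
}

func main() {
	fmt.Println(enforceMinCommission(big.NewRat(5, 100), true)) // raised to 7/100
	fmt.Println(enforceMinCommission(big.NewRat(9, 100), true)) // stays 9/100
}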

* HIP-30: Emission split (#4497)
* HIP-30: sharding configuration boilerplate
* update comments
* goimports
* HIP-30: minimum validator commission of 7%

Based on #4495, which must be merged before this PR. This PR should be
rebased with dev after #4495 is merged to retain atomicity of changes by
pull request.

* goimports
* HIP-30: Emission split implementation (a sketch follows this PR entry)

Note that the allocated split of the emission goes directly to the
recipient (and not via the Reward). This is because rewards are indexed
by validator and not by delegator, and the recipient may/may not have
any delegations which we can reward. Even if one was guaranteed to
exist, it would mess up the math of the validator.

* set up mainnet recipient of emission split
* HIP-30: Emission split addresses for non mainnet
* update test
* Update mainnet.go

---------

Co-authored-by: Casey Gardiner <[email protected]>
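
A sketch of the emission split described above: the validator share keeps flowing through the normal reward path, while the remainder is credited straight to the recipient account. This is illustrative only; splitEmission is a hypothetical name and the 25% recipient share is an assumption, not the ratio fixed by #4497.

package main

import (
	"fmt"
	"math/big"
)

// splitEmission divides one block's emission between the reward pool and a
// direct-credit recipient, using integer math so no atto-ONE is lost.
func splitEmission(total *big.Int, recipientNum, den int64) (validators, recipient *big.Int) {
	recipient = new(big.Int).Mul(total, big.NewInt(recipientNum))
	recipient.Div(recipient, big.NewInt(den))
	validators = new(big.Int).Sub(total, recipient)
	return validators, recipient
}

func main() {
	total := big.NewInt(7_000_000_000_000_000_000) // 7 ONE in atto, illustrative
	v, r := splitEmission(total, 1, 4)             // assumed 25% recipient share
	fmt.Println(v, r)
}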

* HIP-30: Set up pre-image generation, recording, export and import (#4494) (a sketch follows this entry)
* flags: set up preimage flags
* hip30: set up preimage import, export, api
* save pre-images by default
* add pre images api
* goimports
* commit rpc preimages file
* preimages: re-generate them using CLI
* add metrics and numbers for pre-images
* automate generation after import
* move from rpc to core
* add back core/preimages.go file
* export prometheus metric when no error importing preimage
* add preimages flags to rootflags

---------

Co-authored-by: Nita Neou (Soph) <[email protected]>
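
Context for the pre-image work above: the state trie keys accounts by keccak256(address), so unless the hash-to-address pre-images are recorded somewhere, a trie key cannot be resolved back to an account. A hypothetical sketch of the recording side, assuming an in-memory store; the actual code persists pre-images to the chain database and exports/imports them via RPC and CLI.

package main

import (
	"fmt"

	"golang.org/x/crypto/sha3"
)

// preimageStore maps keccak256(address) back to the raw address bytes.
type preimageStore map[[32]byte][]byte

// record saves the pre-image of an account's state-trie key.
func (s preimageStore) record(addr []byte) {
	var key [32]byte
	h := sha3.NewLegacyKeccak256()
	h.Write(addr)
	copy(key[:], h.Sum(nil))
	s[key] = append([]byte(nil), addr...)
}

func main() {
	s := preimageStore{}
	s.record([]byte{0xde, 0xad, 0xbe, 0xef})
	fmt.Printf("%d pre-image(s) recorded\n", len(s))
}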

* HIP-30: Shard reduction (#4498)
* HIP-30: sharding configuration boilerplate
* update comments
* goimports
* HIP-30: minimum validator commission of 7%

Based on #4495, which must be merged before this PR. This PR should be
rebased with dev after #4495 is merged to retain atomicity of changes by
pull request.

* goimports
* HIP-30: Emission split implementation

Note that the allocated split of the emission goes directly to the
recipient (and not via the Reward). This is because rewards are indexed
by validator and not by delegator, and the recipient may/may not have
any delegations which we can reward. Even if one was guaranteed to
exist, it would mess up the math of the validator.

* set up mainnet recipient of emission split
* HIP-30: Emission split addresses for non mainnet
* HIP-30: deactivate shard 2 and 3 validators
* update test
* shard reduction: update block reward

---------

Co-authored-by: Casey Gardiner <[email protected]>

* Fix for index. (#4504)
* Small improvements. (#4477)
* HIP-30: Balance migration (#4499)
* flags: set up preimage flags
* hip30: set up preimage import, export, api
* save pre-images by default
* add pre images api
* goimports
* commit rpc preimages file
* preimages: re-generate them using CLI
* add metrics and numbers for pre-images
* automate generation after import
* move from rpc to core
* add back core/preimages.go file
* HIP-30: sharding configuration boilerplate
* update comments
* HIP-30: minimum validator commission of 7%

Based on #4495, which must be merged before this PR. This PR should be
rebased with dev after #4495 is merged to retain atomicity of changes by
pull request.

* goimports
* HIP-30: Emission split implementation

Note that the allocated split of the emission goes directly to the
recipient (and not via the Reward). This is because rewards are indexed
by validator and not by delegator, and the recipient may/may not have
any delegations which we can reward. Even if one was guaranteed to
exist, it would mess up the math of the validator.

* set up mainnet recipient of emission split
* HIP-30: Emission split addresses for non mainnet
* HIP-30: deactivate shard 2 and 3 validators
* goimports
* update test
* migrate balance during epoch T - 1 (a sketch follows this PR entry)

highly untested code. also missing is the ability to generate a
pre-migration report for future verification.

* update test
* export prometheus metric when no error importing preimage
* add comment
* test account migration in localnet
* add preimages flags to rootflags
* enable preimages on the whitelist
* add the generate method
* fix cropping log
* cropping startpoint when bigger than endpoint
* add support for the rpcblocknumber type
* enable import api
* use earliest block
* fix error catching
* make end optional for the command line
* fix cropping logic
* improve error when apply message fails
* add balance on the error
* fix importing
* remove unused imports

---------

Co-authored-by: Nita Neou (Soph) <[email protected]>
Co-authored-by: Soph <[email protected]>
Co-authored-by: Diego Nava <[email protected]>
Co-authored-by: Diego Nava <[email protected]>
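
A hypothetical sketch of the "migrate balance during epoch T - 1" step, assuming simplified in-memory accounts; the real implementation walks the state trie of the deprecated shard using the recorded pre-images and credits the balances on the surviving shard.

package main

import "fmt"

type account struct {
	addr    string
	balance uint64
}

// migrateBalances drains every account on a deprecated shard and returns the
// credits to apply on the surviving shard, leaving the sources at zero.
func migrateBalances(deprecated []account) map[string]uint64 {
	credits := make(map[string]uint64)
	for i := range deprecated {
		credits[deprecated[i].addr] += deprecated[i].balance
		deprecated[i].balance = 0
	}
	return credits
}

func main() {
	shard3 := []account{{"one1alpha", 42}, {"one1beta", 7}}
	fmt.Println(migrateBalances(shard3)) // map[one1alpha:42 one1beta:7]
}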

* Hip30 balance migration with fix. (#4502)
* flags: set up preimage flags
* hip30: set up preimage import, export, api
* preimages: re-generate them using CLI
* move from rpc to core
* migrate balance during epoch T - 1
* test account migration in localnet
* enable preimages on the whitelist
* add the generate method
* fix cropping log
* cropping startpoint when bigger than endpoint
* add support for the rpcblocknumber type
* enable import api
* Fixed stuck.
* Additional logs.
* Rebased on harmony-one:hip30/testing.
* Removed code duplicate.
* Fixed stuck.
* IsOneEpochBeforeHIP30 for only 1 epoch.

---------

Co-authored-by: MaxMustermann2 <[email protected]>
Co-authored-by: Nita Neou (Soph) <[email protected]>
Co-authored-by: Diego Nava <[email protected]>
Co-authored-by: Diego Nava <[email protected]>

* remove double import
* rename variable
* remove unused fmt
* Fixed imports. (#4507)
* Block gas 30m. (#4501)
* BlockGas30M renamed to BlockGas30MEpoch.
* Removed redundant code.
* Hip30: localnet account migration fix (#4508)
* flags: set up preimage flags
* hip30: set up preimage import, export, api
* save pre-images by default
* add pre images api
* goimports
* commit rpc preimages file
* preimages: re-generate them using CLI
* add metrics and numbers for pre-images
* automate generation after import
* add back core/preimages.go file
* HIP-30: sharding configuration boilerplate
* update comments
* goimports

* HIP-30: minimum validator commission of 7%

Based on #4495, which must be merged before this PR. This PR should be
rebased with dev after #4495 is merged to retain atomicity of changes by
pull request.

* goimports

* HIP-30: Emission split implementation

Note that the allocated split of the emission goes directly to the
recipient (and not via the Reward). This is because rewards are indexed
by validator and not by delegator, and the recipient may/may not have
any delegations which we can reward. Even if one was guaranteed to
exist, it would mess up the math of the validator.

* set up mainnet recipient of emission split
* HIP-30: Emission split addresses for non mainnet
* HIP-30: deactivate shard 2 and 3 validators
* goimports
* update test
* goimports
* migrate balance during epoch T - 1
highly untested code. also missing is the ability to generate a
pre-migration report for future verification.
* update test
* export prometheus metric when no error importing preimage
* test account migration in localnet
* add preimages flags to rootflags
* enable preimages on the whitelist
* add the generate method
* fix cropping log
* cropping startpoint when bigger than endpoint
* add support for the rpcblocknumber type
* enable import api
* use earliest block
* debug logs
* fix error catching
* fix error catching
* make end optional for the command line
* fix cropping logic
* improve error when apply message fails
* add balance on the error
* fix importing
* remove unused imports
* create preimage for genesis block
* fix consensus with No Migration Possible
* use correct header for migration
* process all txs in all blocks for non shard 0

---------

Co-authored-by: MaxMustermann2 <[email protected]>
Co-authored-by: Diego Nava <[email protected]>
Co-authored-by: Diego Nava <[email protected]>

* create the state as the insertchain does
* roll back changes
* use the updated state in case there is one
* use the updated state in case there is one
* add testing fmt
* fix getReceipts rpc issue (#4511)
* pass the correct config
* Fixes.
* reduce the block number count
* add verify preimages rpc method
* write preimages on process
* commit preimages
* commit preimages
* verify root hashes after commit
* send metrics on node start
* send the verified preimages
* correct the starting block
* register the verified address
* flush the db every export and verify
* add shard label
* aggregate the recovery multisig reward (#4514)

* 1) Removed unused worker (#4512)
2) Proper error checking
3) Tests for gas 30m

* Improvements of streamsync to deploy on mainnet (#4493)
* add faultRecoveryThreshold to reset stream failures
* increase MaxStreamFailures to let stream be longer in the list
* set Concurrency to 2 for devnet to be the same as MinStreams; otherwise it will rewrite MinStreams
* stream sync loop checks for ErrNotEnoughStreams and waits for enough streams in case there are not enough connected streams in the list
* fix fault recovery issue
* improve checkPrerequisites to be able to continue with minimum streams
* refactor fixValues function, put priority on MinStreams rather than Concurrency
* drop remote peer if sending empty blocks array
* goimports to fix build issue
* fix getReceipts array assignments
* fix getReceipts and add tests for it
* fix duplicate function def
* reset devnet and set 30M epoch for all network except mainnet/testnet (#4517)
* enable on devnet hip28 and hip30 together at hip30
* enable 30M epoch for all except mainnet/testnet, update devnet for restart
* remove unused var
* update partner/devnet feature activation
* Remove old devnet/partner instance config
* reduce the epoch time for devnet to 30 min (#4522)
* add GetNodeData tests for stream client, increase nodes and receipts cap (#4519)
* add tests for GetNodeData in stream protocol
* increase client cap for nodes and receipts requests
* use new(big.Int) so we don't modify the epoch value (#4523)
* add hip30 testing for devnet/partner network (#4525)
* enable hip30 epoch for testnet (#4526)
* enable hip30 epoch for testnet
* fix date comment
* set blockgas30M epoch
* enable hip30 and gas30m epoch for mainnet (#4528)
* fix preimage import bugs (#4529)
* fix preimage import bugs
* Fixed lru cache size. (#4535)
* fix decryptRaw issue for nil/empty data (#4532)
* update deprecated ioutil, improve local accounts (#4527)
* make peer connected/disconnected debug log level (#4537)
* Revert improvements. (#4520)
* Updated go lib p2p deps. (#4538)
* Flush data. (#4536)
* Rotation fix and update. (#4516)

---------

Co-authored-by: Konstantin <[email protected]>
Co-authored-by: Max <[email protected]>
Co-authored-by: Nita Neou (Soph) <[email protected]>
Co-authored-by: Soph <[email protected]>
Co-authored-by: Diego Nava <[email protected]>
Co-authored-by: Diego Nava <[email protected]>
Co-authored-by: Gheis Mohammadi <[email protected]>
Co-authored-by: “GheisMohammadi” <[email protected]>
Co-authored-by: Adam Androulidakis <[email protected]>
9 people authored Oct 18, 2023
2 parents 1b9614b + 1633656 commit 2bba333
Showing 126 changed files with 3,572 additions and 1,102 deletions.
13 changes: 13 additions & 0 deletions Makefile
@@ -56,6 +56,7 @@ trace-pointer:
 	bash ./scripts/go_executable_build.sh -t
 
 debug:
+	rm -rf .dht-127.0.0.1*
 	bash ./test/debug.sh
 
 debug-kill:
@@ -167,3 +168,15 @@ docker:
 
 travis_go_checker:
 	bash ./scripts/travis_go_checker.sh
+
+travis_rpc_checker:
+	bash ./scripts/travis_rpc_checker.sh
+
+travis_rosetta_checker:
+	bash ./scripts/travis_rosetta_checker.sh
+
+debug_external: clean
+	bash test/debug-external.sh
+
+build_localnet_validator:
+	bash test/build-localnet-validator.sh
5 changes: 2 additions & 3 deletions accounts/abi/bind/auth.go
@@ -20,7 +20,6 @@ import (
 	"crypto/ecdsa"
 	"errors"
 	"io"
-	"io/ioutil"
 	"math/big"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -44,7 +43,7 @@ var ErrNotAuthorized = errors.New("not authorized to sign this account")
 // Deprecated: Use NewTransactorWithChainID instead.
 func NewTransactor(keyin io.Reader, passphrase string) (*TransactOpts, error) {
 	log.Warn("WARNING: NewTransactor has been deprecated in favour of NewTransactorWithChainID")
-	json, err := ioutil.ReadAll(keyin)
+	json, err := io.ReadAll(keyin)
 	if err != nil {
 		return nil, err
 	}
@@ -103,7 +102,7 @@ func NewKeyedTransactor(key *ecdsa.PrivateKey) *TransactOpts {
 // NewTransactorWithChainID is a utility method to easily create a transaction signer from
 // an encrypted json key stream and the associated passphrase.
 func NewTransactorWithChainID(keyin io.Reader, passphrase string, chainID *big.Int) (*TransactOpts, error) {
-	json, err := ioutil.ReadAll(keyin)
+	json, err := io.ReadAll(keyin)
 	if err != nil {
 		return nil, err
 	}
9 changes: 4 additions & 5 deletions accounts/keystore/account_cache_test.go
@@ -18,7 +18,6 @@ package keystore
 
 import (
 	"fmt"
-	"io/ioutil"
 	"math/rand"
 	"os"
 	"path/filepath"
@@ -133,11 +132,11 @@ func TestUpdatedKeyfileContents(t *testing.T) {
 		return
 	}
 
-	// needed so that modTime of `file` is different to its current value after ioutil.WriteFile
+	// needed so that modTime of `file` is different to its current value after io.WriteFile
 	time.Sleep(1000 * time.Millisecond)
 
 	// Now replace file contents with crap
-	if err := ioutil.WriteFile(file, []byte("foo"), 0644); err != nil {
+	if err := os.WriteFile(file, []byte("foo"), 0644); err != nil {
 		t.Fatal(err)
 		return
 	}
@@ -150,9 +149,9 @@ func TestUpdatedKeyfileContents(t *testing.T) {
 
 // forceCopyFile is like cp.CopyFile, but doesn't complain if the destination exists.
 func forceCopyFile(dst, src string) error {
-	data, err := ioutil.ReadFile(src)
+	data, err := os.ReadFile(src)
 	if err != nil {
 		return err
 	}
-	return ioutil.WriteFile(dst, data, 0644)
+	return os.WriteFile(dst, data, 0644)
 }
30 changes: 19 additions & 11 deletions accounts/keystore/file_cache.go
@@ -17,7 +17,7 @@
 package keystore
 
 import (
-	"io/ioutil"
+	"io/fs"
 	"os"
 	"path/filepath"
 	"strings"
@@ -42,7 +42,7 @@ func (fc *fileCache) scan(keyDir string) (mapset.Set, mapset.Set, mapset.Set, er
 	t0 := time.Now()
 
 	// List all the failes from the keystore folder
-	files, err := ioutil.ReadDir(keyDir)
+	files, err := os.ReadDir(keyDir)
 	if err != nil {
 		return nil, nil, nil, err
 	}
@@ -63,15 +63,19 @@ func (fc *fileCache) scan(keyDir string) (mapset.Set, mapset.Set, mapset.Set, er
 			utils.Logger().Debug().Str("path", path).Msg("Ignoring file on account scan")
 			continue
 		}
-		// Gather the set of all and fresly modified files
+		// Gather the set of all and freshly modified files
 		all.Add(path)
 
-		modified := fi.ModTime()
-		if modified.After(fc.lastMod) {
-			mods.Add(path)
-		}
-		if modified.After(newLastMod) {
-			newLastMod = modified
+		if info, err := fi.Info(); err != nil {
+			continue
+		} else {
+			modified := info.ModTime()
+			if modified.After(fc.lastMod) {
+				mods.Add(path)
+			}
+			if modified.After(newLastMod) {
+				newLastMod = modified
+			}
 		}
 	}
 	t2 := time.Now()
@@ -94,14 +98,18 @@ func (fc *fileCache) scan(keyDir string) (mapset.Set, mapset.Set, mapset.Set, er
 }
 
 // nonKeyFile ignores editor backups, hidden files and folders/symlinks.
-func nonKeyFile(fi os.FileInfo) bool {
+func nonKeyFile(fi fs.DirEntry) bool {
 	// Skip editor backups and UNIX-style hidden files.
 	if strings.HasSuffix(fi.Name(), "~") || strings.HasPrefix(fi.Name(), ".") {
 		return true
 	}
 	// Skip misc special files, directories (yes, symlinks too).
-	if fi.IsDir() || fi.Mode()&os.ModeType != 0 {
+	if info, err := fi.Info(); err != nil {
 		return true
+	} else {
+		if fi.IsDir() || info.Mode()&os.ModeType != 0 {
+			return true
+		}
 	}
 	return false
 }
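
The keystore diffs above all apply the same Go 1.16+ migration off the deprecated io/ioutil package. A small self-contained example of the replacement calls, including the os.ReadDir detail that motivates the new fi.Info() branches (os.ReadDir returns fs.DirEntry values, which carry no ModTime or Mode until Info() is called):

package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

func main() {
	// ioutil.ReadAll -> io.ReadAll
	data, _ := io.ReadAll(strings.NewReader("key json"))

	// ioutil.TempDir -> os.MkdirTemp; ioutil.TempFile -> os.CreateTemp
	dir, _ := os.MkdirTemp("", "keystore-test")
	defer os.RemoveAll(dir)

	// ioutil.WriteFile -> os.WriteFile; ioutil.ReadFile -> os.ReadFile
	path := dir + "/key"
	_ = os.WriteFile(path, data, 0600)
	back, _ := os.ReadFile(path)

	// ioutil.ReadDir -> os.ReadDir, which yields []fs.DirEntry; call
	// entry.Info() whenever an fs.FileInfo (ModTime, Mode) is needed.
	entries, _ := os.ReadDir(dir)
	for _, e := range entries {
		info, err := e.Info()
		if err != nil {
			continue
		}
		fmt.Println(e.Name(), info.Mode(), len(back))
	}
}
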
3 changes: 1 addition & 2 deletions accounts/keystore/key.go
@@ -23,7 +23,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"os"
 	"path/filepath"
 	"strings"
@@ -195,7 +194,7 @@ func writeTemporaryKeyFile(file string, content []byte) (string, error) {
 	}
 	// Atomic write: create a temporary hidden file first
 	// then move it into place. TempFile assigns mode 0600.
-	f, err := ioutil.TempFile(filepath.Dir(file), "."+filepath.Base(file)+".tmp")
+	f, err := os.CreateTemp(filepath.Dir(file), "."+filepath.Base(file)+".tmp")
 	if err != nil {
 		return "", err
 	}
3 changes: 1 addition & 2 deletions accounts/keystore/keystore_test.go
@@ -17,7 +17,6 @@
 package keystore
 
 import (
-	"io/ioutil"
 	"os"
 	"runtime"
 	"strings"
@@ -213,7 +212,7 @@ func TestSignRace(t *testing.T) {
 }
 
 func tmpKeyStore(t *testing.T, encrypted bool) (string, *KeyStore) {
-	d, err := ioutil.TempDir("", "eth-keystore-test")
+	d, err := os.MkdirTemp("", "eth-keystore-test")
 	if err != nil {
 		t.Fatal(err)
 	}
3 changes: 1 addition & 2 deletions accounts/keystore/passphrase.go
@@ -34,7 +34,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"os"
 	"path/filepath"
 
@@ -82,7 +81,7 @@ type keyStorePassphrase struct {
 
 func (ks keyStorePassphrase) GetKey(addr common.Address, filename, auth string) (*Key, error) {
 	// Load the key from the keystore and decrypt its contents
-	keyjson, err := ioutil.ReadFile(filename)
+	keyjson, err := os.ReadFile(filename)
 	if err != nil {
 		return nil, err
 	}
4 changes: 2 additions & 2 deletions accounts/keystore/passphrase_test.go
@@ -17,7 +17,7 @@
 package keystore
 
 import (
-	"io/ioutil"
+	"os"
 	"testing"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -30,7 +30,7 @@ const (
 )
 
 // Tests that a json key file can be decrypted and encrypted in multiple rounds.
 func TestKeyEncryptDecrypt(t *testing.T) {
-	keyjson, err := ioutil.ReadFile("testdata/very-light-scrypt.json")
+	keyjson, err := os.ReadFile("testdata/very-light-scrypt.json")
 	if err != nil {
 		t.Fatal(err)
 	}
3 changes: 1 addition & 2 deletions accounts/keystore/plain_test.go
@@ -20,7 +20,6 @@ import (
 	"crypto/rand"
 	"encoding/hex"
 	"fmt"
-	"io/ioutil"
 	"os"
 	"path/filepath"
 	"reflect"
@@ -32,7 +31,7 @@ import (
 )
 
 func tmpKeyStoreIface(t *testing.T, encrypted bool) (dir string, ks keyStore) {
-	d, err := ioutil.TempDir("", "geth-keystore-test")
+	d, err := os.MkdirTemp("", "geth-keystore-test")
 	if err != nil {
 		t.Fatal(err)
 	}
11 changes: 5 additions & 6 deletions api/service/legacysync/syncing.go
@@ -25,7 +25,6 @@ import (
 	"github.com/harmony-one/harmony/internal/chain"
 	nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
 	"github.com/harmony-one/harmony/internal/utils"
-	"github.com/harmony-one/harmony/node/worker"
 	"github.com/harmony-one/harmony/p2p"
 	libp2p_peer "github.com/libp2p/go-libp2p/core/peer"
 	"github.com/pkg/errors"
@@ -932,7 +931,7 @@ func (ss *StateSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain
 }
 
 // generateNewState will construct most recent state from downloaded blocks
-func (ss *StateSync) generateNewState(bc core.BlockChain, worker *worker.Worker) error {
+func (ss *StateSync) generateNewState(bc core.BlockChain) error {
 	// update blocks created before node start sync
 	parentHash := bc.CurrentBlock().Hash()
 
@@ -995,7 +994,7 @@
 }
 
 // ProcessStateSync processes state sync from the blocks received but not yet processed so far
-func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc core.BlockChain, worker *worker.Worker) error {
+func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc core.BlockChain) error {
 	// Gets consensus hashes.
 	if err := ss.getConsensusHashes(startHash, size); err != nil {
 		return errors.Wrap(err, "getConsensusHashes")
@@ -1005,7 +1004,7 @@ func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc core.BlockChain) error {
 	if ss.stateSyncTaskQueue.Len() > 0 {
 		ss.downloadBlocks(bc)
 	}
-	return ss.generateNewState(bc, worker)
+	return ss.generateNewState(bc)
 }
 
 func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port string) error {
@@ -1076,7 +1075,7 @@ func (ss *StateSync) GetMaxPeerHeight() (uint64, error) {
 }
 
 // SyncLoop will keep syncing with peers until catches up
-func (ss *StateSync) SyncLoop(bc core.BlockChain, worker *worker.Worker, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) {
+func (ss *StateSync) SyncLoop(bc core.BlockChain, isBeacon bool, consensus *consensus.Consensus, loopMinTime time.Duration) {
 	utils.Logger().Info().Msgf("legacy sync is executing ...")
 	if !isBeacon {
 		ss.RegisterNodeInfo()
@@ -1110,7 +1109,7 @@
 	if size > SyncLoopBatchSize {
 		size = SyncLoopBatchSize
 	}
-	err := ss.ProcessStateSync(startHash[:], size, bc, worker)
+	err := ss.ProcessStateSync(startHash[:], size, bc)
 	if err != nil {
 		utils.Logger().Error().Err(err).
 			Msgf("[SYNC] ProcessStateSync failed (isBeacon: %t, ShardID: %d, otherHeight: %d, currentHeight: %d)",
7 changes: 2 additions & 5 deletions api/service/stagedstreamsync/const.go
@@ -23,9 +23,6 @@ const (
 	// no more request will be assigned to workers to wait for InsertChain to finish.
 	SoftQueueCap int = 100
 
-	// DefaultConcurrency is the default settings for concurrency
-	DefaultConcurrency int = 4
-
 	// ShortRangeTimeout is the timeout for each short range sync, which allow short range sync
 	// to restart automatically when stuck in `getBlockHashes`
 	ShortRangeTimeout time.Duration = 1 * time.Minute
@@ -74,10 +71,10 @@ type (
 
 func (c *Config) fixValues() {
 	if c.Concurrency == 0 {
-		c.Concurrency = DefaultConcurrency
+		c.Concurrency = c.MinStreams
 	}
 	if c.Concurrency > c.MinStreams {
-		c.MinStreams = c.Concurrency
+		c.Concurrency = c.MinStreams
 	}
 	if c.MinStreams > c.InitStreams {
 		c.InitStreams = c.MinStreams
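
The fixValues hunk above inverts the old priority: rather than raising MinStreams to match Concurrency, Concurrency now defaults to MinStreams and is capped at it, matching the #4493 changelog entry "put priority on MinStreams rather than Concurrency". A standalone reproduction with a simplified Config:

package main

import "fmt"

type Config struct {
	Concurrency int
	MinStreams  int
	InitStreams int
}

func (c *Config) fixValues() {
	if c.Concurrency == 0 {
		c.Concurrency = c.MinStreams
	}
	if c.Concurrency > c.MinStreams {
		c.Concurrency = c.MinStreams
	}
	if c.MinStreams > c.InitStreams {
		c.InitStreams = c.MinStreams
	}
}

func main() {
	c := Config{Concurrency: 4, MinStreams: 2, InitStreams: 1}
	c.fixValues()
	// Concurrency is reduced to MinStreams instead of inflating MinStreams.
	fmt.Printf("%+v\n", c) // {Concurrency:2 MinStreams:2 InitStreams:2}
}
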
21 changes: 17 additions & 4 deletions api/service/stagedstreamsync/downloader.go
@@ -153,6 +153,17 @@ func (d *Downloader) SubscribeDownloadFinished(ch chan struct{}) event.Subscript
 // waitForBootFinish waits for stream manager to finish the initial discovery and have
 // enough peers to start downloader
 func (d *Downloader) waitForBootFinish() {
+	bootCompleted, numStreams := d.waitForEnoughStreams(d.config.InitStreams)
+	if bootCompleted {
+		fmt.Printf("boot completed for shard %d ( %d streams are connected )\n",
+			d.bc.ShardID(), numStreams)
+	}
+}
+
+func (d *Downloader) waitForEnoughStreams(requiredStreams int) (bool, int) {
+	d.logger.Info().Int("requiredStreams", requiredStreams).
+		Msg("waiting for enough stream connections to continue syncing")
+
 	evtCh := make(chan streammanager.EvtStreamAdded, 1)
 	sub := d.syncProtocol.SubscribeAddStreamEvent(evtCh)
 	defer sub.Unsubscribe()
@@ -177,12 +188,11 @@
 			trigger()
 
 		case <-checkCh:
-			if d.syncProtocol.NumStreams() >= d.config.InitStreams {
-				fmt.Printf("boot completed for shard %d ( %d streams are connected )\n", d.bc.ShardID(), d.syncProtocol.NumStreams())
-				return
+			if d.syncProtocol.NumStreams() >= requiredStreams {
+				return true, d.syncProtocol.NumStreams()
 			}
 		case <-d.closeC:
-			return
+			return false, d.syncProtocol.NumStreams()
 		}
 	}
 }
@@ -212,6 +222,9 @@ func (d *Downloader) loop() {
 		case <-d.downloadC:
 			bnBeforeSync := d.bc.CurrentBlock().NumberU64()
 			estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync)
+			if err == ErrNotEnoughStreams {
+				d.waitForEnoughStreams(d.config.MinStreams)
+			}
 			if err != nil {
 				//TODO: if there is a bad block which can't be resolved
 				if d.stagedSyncInstance.invalidBlock.Active {
2 changes: 1 addition & 1 deletion api/service/stagedstreamsync/errors.go
@@ -14,7 +14,7 @@ var (
 	ErrUnexpectedNumberOfBlockHashes = WrapStagedSyncError("unexpected number of getBlocksByHashes result")
 	ErrUnexpectedBlockHashes         = WrapStagedSyncError("unexpected get block hashes result delivered")
 	ErrNilBlock                      = WrapStagedSyncError("nil block found")
-	ErrNotEnoughStreams              = WrapStagedSyncError("not enough streams")
+	ErrNotEnoughStreams              = WrapStagedSyncError("number of streams smaller than minimum required")
 	ErrParseCommitSigAndBitmapFail   = WrapStagedSyncError("parse commitSigAndBitmap failed")
 	ErrVerifyHeaderFail              = WrapStagedSyncError("verify header failed")
 	ErrInsertChainFail               = WrapStagedSyncError("insert to chain failed")
5 changes: 5 additions & 0 deletions api/service/stagedstreamsync/short_range_helper.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/harmony-one/harmony/core/types"
+	"github.com/harmony-one/harmony/internal/utils"
 	syncProto "github.com/harmony-one/harmony/p2p/stream/protocols/sync"
 	sttypes "github.com/harmony-one/harmony/p2p/stream/types"
 	"github.com/pkg/errors"
@@ -132,6 +133,10 @@ func (sh *srHelper) getBlocksByHashes(ctx context.Context, hashes []common.Hash,
 
 func (sh *srHelper) checkPrerequisites() error {
 	if sh.syncProtocol.NumStreams() < sh.config.Concurrency {
+		utils.Logger().Info().
+			Int("available streams", sh.syncProtocol.NumStreams()).
+			Interface("concurrency", sh.config.Concurrency).
+			Msg("not enough streams to do concurrent processes")
 		return ErrNotEnoughStreams
 	}
 	return nil