From ea230062a2cc21aa22bb2c3eb9e8d0c0f7e7bf6c Mon Sep 17 00:00:00 2001 From: Bui Quang Minh Date: Mon, 7 Oct 2024 17:59:06 +0700 Subject: [PATCH 1/3] trie: parallel insert trie when root node is a full node MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This creates a new TryBatchInsert function in trie. All key-value insertion pairs are passed together. If the resolved root node is a full node, the insertion pairs are partitioned by the first byte in their keys. Children subtries of the full node are inserted in parallel. If the resolved root node is not a full node, the insertion pairs are inserted sequentially. Result comparing BenchmarkNormalInsert to BenchmarkBatchInsert goos: linux goarch: amd64 pkg: github.com/ethereum/go-ethereum/trie cpu: 11th Gen Intel(R) Core(TM) i7-1165G7 @ 2.80GHz │ normal.txt │ batch.txt │ │ sec/op │ sec/op vs base │ NormalInsert-8 6.823m ± 7% 3.716m ± 18% -45.54% (p=0.000 n=10) --- trie/secure_trie.go | 22 +++++++++ trie/trie.go | 82 ++++++++++++++++++++++++++++++++ trie/trie_test.go | 111 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 215 insertions(+) diff --git a/trie/secure_trie.go b/trie/secure_trie.go index 18be12d34a..12f8431cbb 100644 --- a/trie/secure_trie.go +++ b/trie/secure_trie.go @@ -132,6 +132,28 @@ func (t *SecureTrie) TryUpdate(key, value []byte) error { return nil } +// TryBatchInsert batches multiple insert together. 
+func (t *SecureTrie) TryBatchInsert(keys, values [][]byte) error { + hashKeys := make([][]byte, 0, len(keys)) + for i := range keys { + hk := t.hashKey(keys[i]) + // t.hashKey does not return a new slice but a shared internal slice, + // so we must copy here + hashKeys = append(hashKeys, common.CopyBytes(hk)) + } + + err := t.trie.TryBatchInsert(hashKeys, values) + if err != nil { + return err + } + + for i, hashKey := range hashKeys { + t.getSecKeyCache()[string(hashKey)] = common.CopyBytes(keys[i]) + } + + return nil +} + // Delete removes any existing value for key from the trie. func (t *SecureTrie) Delete(key []byte) { if err := t.TryDelete(key); err != nil { diff --git a/trie/trie.go b/trie/trie.go index 13343112b8..55bdfadfe7 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -283,6 +283,88 @@ func (t *Trie) TryUpdate(key, value []byte) error { return nil } +// TryBatchInsert batches multiple inserts together. +// +// When the root node after resolving is a fullnode, TryBatchInsert will split +// the key, value list based on the first byte of key and spawn multiple +// goroutines to insert these lists in parallel. 
+func (t *Trie) TryBatchInsert(keys, values [][]byte) error { + t.unhashed += len(keys) + + var ( + resolvedNode node = t.root + err error + ) + if node, ok := t.root.(hashNode); ok { + resolvedNode, err = t.resolveHash(node, nil) + if err != nil { + return err + } + } + + if fnode, ok := resolvedNode.(*fullNode); ok { + type insertTask struct { + key []byte + value []byte + } + + var insertTasks [17][]insertTask + for i := range keys { + k := keybytesToHex(keys[i]) + insertTasks[k[0]] = append(insertTasks[k[0]], insertTask{ + key: k, + value: values[i], + }) + } + + var ( + wg sync.WaitGroup + returnedNodes [17]node + errors [17]error + ) + wg.Add(17) + for i, tasks := range insertTasks { + go func(index int, tasks []insertTask) { + defer wg.Done() + + var err error + taskNode := fnode.Children[index] + for _, task := range tasks { + _, taskNode, err = t.insert(taskNode, []byte{byte(index)}, task.key[1:], valueNode(task.value)) + if err != nil { + break + } + } + + errors[index] = err + returnedNodes[index] = taskNode + }(i, tasks) + } + + wg.Wait() + for _, err := range errors { + if err != nil { + return err + } + } + var newFullNode fullNode + copy(newFullNode.Children[:], returnedNodes[:]) + newFullNode.flags = t.newFlag() + t.root = &newFullNode + } else { + for i := range keys { + k := keybytesToHex(keys[i]) + _, n, err := t.insert(t.root, nil, k, valueNode(values[i])) + if err != nil { + return err + } + t.root = n + } + } + + return nil +} + func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) { if len(key) == 0 { if v, ok := n.(valueNode); ok { diff --git a/trie/trie_test.go b/trie/trie_test.go index 806a8cc634..6df5a38b81 100644 --- a/trie/trie_test.go +++ b/trie/trie_test.go @@ -182,6 +182,79 @@ func TestInsert(t *testing.T) { } } +func TestBatchUpdate(t *testing.T) { + trie := newEmpty() + + trie.TryBatchInsert([][]byte{[]byte("doe")}, [][]byte{[]byte("reindeer")}) + trie.TryBatchInsert( + [][]byte{[]byte("dog"), 
[]byte("dogglesworth")}, + [][]byte{[]byte("puppy"), []byte("cat")}, + ) + + exp := common.HexToHash("8aad789dff2f538bca5d8ea56e8abe10f4c7ba3a5dea95fea4cd6e7c3a1168d3") + root := trie.Hash() + if root != exp { + t.Errorf("case 1: exp %x got %x", exp, root) + } + + trie = newEmpty() + // Make root node a fullnode + trie.TryBatchInsert( + [][]byte{[]byte("doe"), []byte("cat")}, + [][]byte{[]byte("reindeer"), []byte("reindeer")}, + ) + + trie.TryBatchInsert( + [][]byte{[]byte("wolf"), []byte("dog"), []byte("dogglesworth"), []byte("mouse")}, + [][]byte{[]byte("reindeer"), []byte("reindeer"), []byte("cat"), []byte("reindeer")}, + ) + + exp = common.HexToHash("96baa01a6376b285252d202de862d889ca05de308ed1c68cf442ffc4b9036988") + root = trie.Hash() + if root != exp { + t.Errorf("case 2: exp %x got %x", exp, root) + } +} + +func FuzzBatchUpdate(f *testing.F) { + // Every 64-byte chunk will be used as a key-value pair + // Key is the first 32 bytes, value is the remaining 32 bytes + f.Add([]byte("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")) + input := append( + []byte("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"), + []byte("CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")..., + ) + input = append(input, []byte("DAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")...) 
+ f.Add(input) + f.Fuzz(func(t *testing.T, fuzzInput []byte) { + if len(fuzzInput) <= 64 || len(fuzzInput)%64 != 0 { + return + } + + keys := make([][]byte, 0) + values := make([][]byte, 0) + for i := 0; i < len(fuzzInput); i += 64 { + keys = append(keys, fuzzInput[i:i+32]) + values = append(values, fuzzInput[i+32:i+64]) + } + + trie := newEmpty() + for i := range keys { + trie.Update(keys[i], values[i]) + } + + expectedHash := trie.Hash() + + trie = newEmpty() + trie.TryBatchInsert(keys, values) + gotHash := trie.Hash() + + if gotHash != expectedHash { + t.Fatalf("Trie hash mismatches, exp: %s got %s", expectedHash, gotHash) + } + }) +} + func TestGet(t *testing.T) { trie := newEmpty() updateString(trie, "doe", "reindeer") @@ -519,6 +592,44 @@ func benchUpdate(b *testing.B, e binary.ByteOrder) *Trie { return trie } +func benchmarkManyInserts(b *testing.B, f func(trie *Trie, keys [][]byte, values [][]byte)) { + numUpdates := 10000 + + k := make([]byte, 32) + + keys := make([][]byte, 0, numUpdates) + for i := 0; i < numUpdates; i++ { + binary.BigEndian.PutUint64(k, uint64(i)) + hashKey := crypto.Keccak256Hash(k) + keys = append(keys, hashKey[:]) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + trie := newEmpty() + // Make trie root a fullnode + updateString(trie, "AAA", "BBB") + updateString(trie, "ZAA", "BBB") + b.StartTimer() + f(trie, keys, keys) + } +} + +func BenchmarkNormalInsert(b *testing.B) { + benchmarkManyInserts(b, func(trie *Trie, keys, values [][]byte) { + for i := range keys { + trie.Update(keys[i], values[i]) + } + }) +} + +func BenchmarkBatchInsert(b *testing.B) { + benchmarkManyInserts(b, func(trie *Trie, keys, values [][]byte) { + trie.TryBatchInsert(keys, values) + }) +} + // Benchmarks the trie hashing. Since the trie caches the result of any operation, // we cannot use b.N as the number of hashing rouns, since all rounds apart from // the first one will be NOOP. 
As such, we'll use b.N as the number of account to From 6896d7c338f5586a6ed85e6ced7edafa4ca6d310 Mon Sep 17 00:00:00 2001 From: Bui Quang Minh Date: Tue, 8 Oct 2024 14:18:48 +0700 Subject: [PATCH 2/3] core/state: use parallel trie insert to update storage trie When there are more than parallelInsertThreshold (currently set to 500) pending storages update to storage trie, we will use the new TryBatchInsert to parallel insert these storages if possible. --- core/blockchain_test.go | 87 ++++++++++++++++++++++++++++++++++++++ core/state/state_object.go | 24 ++++++++++- 2 files changed, 110 insertions(+), 1 deletion(-) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 804919abd2..6359bb6873 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -17,6 +17,7 @@ package core import ( + "encoding/binary" "errors" "fmt" "io/ioutil" @@ -4312,3 +4313,89 @@ func TestSidecarsPruning(t *testing.T) { } } } + +func TestBlockChain_2000StorageUpdate(t *testing.T) { + var ( + numTxs = 2000 + signer = types.HomesteadSigner{} + testBankKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey) + bankFunds = big.NewInt(100000000000000000) + contractAddress = common.HexToAddress("0x1234") + gspec = Genesis{ + Config: params.TestChainConfig, + Alloc: GenesisAlloc{ + testBankAddress: {Balance: bankFunds}, + contractAddress: { + Nonce: 1, + Balance: common.Big0, + // Store 1 into slot passed by calldata + Code: []byte{ + byte(vm.PUSH0), + byte(vm.CALLDATALOAD), + byte(vm.PUSH1), + byte(0x1), + byte(vm.SWAP1), + byte(vm.SSTORE), + byte(vm.STOP), + }, + Storage: make(map[common.Hash]common.Hash), + }, + }, + GasLimit: 100e6, // 100 M + } + ) + + for i := 0; i < 1000; i++ { + gspec.Alloc[contractAddress].Storage[common.BigToHash(big.NewInt(int64(i)))] = common.BigToHash(big.NewInt(0x100)) + } + + // Generate the original common chain segment and the two 
competing forks + engine := ethash.NewFaker() + db := rawdb.NewMemoryDatabase() + genesis := gspec.MustCommit(db) + + blockGenerator := func(i int, block *BlockGen) { + block.SetCoinbase(common.Address{1}) + for txi := 0; txi < numTxs; txi++ { + var calldata [32]byte + binary.BigEndian.PutUint64(calldata[:], uint64(txi)) + tx, err := types.SignTx( + types.NewTransaction(uint64(txi), contractAddress, common.Big0, 100_000, + block.header.BaseFee, calldata[:]), + signer, + testBankKey) + if err != nil { + t.Error(err) + } + block.AddTx(tx) + } + } + + shared, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 1, blockGenerator, true) + err := os.Mkdir("./pebble", 0775) + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll("./pebble") + // Import the shared chain and the original canonical one + diskdb, err := rawdb.NewPebbleDBDatabase("./pebble", 1024, 500000, "", false, false) + if err != nil { + t.Fatal(err) + } + defer diskdb.Close() + gspec.MustCommit(diskdb) + + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil) + if err != nil { + t.Fatalf("failed to create tester chain: %v", err) + } + if _, err := chain.InsertChain(shared, nil); err != nil { + t.Fatalf("failed to insert shared chain: %v", err) + } + + blockHash := chain.CurrentBlock().Hash() + if blockHash != (common.HexToHash("0x684f656efba5a77f0e8b4c768a2b3479b28250fd7b81dbb9a888abf6180b01bd")) { + t.Fatalf("Block hash mismatches, exp %s got %s", common.Hash{}, blockHash) + } +} diff --git a/core/state/state_object.go b/core/state/state_object.go index 22d90b8420..7aa4d7a556 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -28,10 +28,13 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" ) var emptyCodeHash = crypto.Keccak256(nil) +const ParallelInsertThreshold = 500 + type Code []byte func (c Code) 
String() string { @@ -338,6 +341,16 @@ func (s *stateObject) updateTrie(db Database) Trie { tr := s.getTrie(db) hasher := s.db.hasher + var ( + parallelInsert, ok bool + secureTrie *trie.SecureTrie + keys, values [][]byte + ) + if len(s.pendingStorage) > ParallelInsertThreshold { + if secureTrie, ok = tr.(*trie.SecureTrie); ok { + parallelInsert = true + } + } usedStorage := make([][]byte, 0, len(s.pendingStorage)) for key, value := range s.pendingStorage { // Skip noop changes, persist actual changes @@ -353,8 +366,14 @@ func (s *stateObject) updateTrie(db Database) Trie { } else { // Encoding []byte cannot fail, ok to ignore the error. v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:])) - s.setError(tr.TryUpdate(key[:], v)) s.db.StorageUpdated += 1 + if parallelInsert { + key := key + keys = append(keys, key[:]) + values = append(values, v) + } else { + s.setError(tr.TryUpdate(key[:], v)) + } } // If state snapshotting is active, cache the data til commit if s.db.snap != nil { @@ -369,6 +388,9 @@ func (s *stateObject) updateTrie(db Database) Trie { } usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure } + if parallelInsert && len(keys) > 0 { + s.setError(secureTrie.TryBatchInsert(keys, values)) + } if s.db.prefetcher != nil { s.db.prefetcher.used(s.data.Root, usedStorage) } From a7952056361db63504ec59424934ca0395b57b68 Mon Sep 17 00:00:00 2001 From: Bui Quang Minh Date: Thu, 10 Oct 2024 15:43:23 +0700 Subject: [PATCH 3/3] core: add many storage updates benchmark This benchmark is intended to be used with mainnet data. The benchmark includes many storage updates to Axie contract's storage. 
--- core/blockchain_test.go | 83 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 6359bb6873..1e84868e9a 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -24,6 +24,7 @@ import ( "math/big" "math/rand" "os" + "os/exec" "sync" "testing" "time" @@ -41,6 +42,7 @@ import ( "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" @@ -4399,3 +4401,84 @@ func TestBlockChain_2000StorageUpdate(t *testing.T) { t.Fatalf("Block hash mismatches, exp %s got %s", common.Hash{}, blockHash) } } + +// This benchmark is intended to be used with mainnet data, so mainnet chaindata's directory +// is needed to run this benchmark +func BenchmarkManyStorageUpdate(b *testing.B) { + const ( + // Fill the chaindata's parent directory + datadir = "" + numInsert = state.ParallelInsertThreshold + 1 + ) + + var ( + diskdb ethdb.Database + err error + axieContract = common.HexToAddress("0x32950db2a7164ae833121501c797d79e7b79d74c") + value = common.HexToHash("0x11") + ) + defer func() { + if diskdb != nil { + diskdb.Close() + cmd := exec.Command("../script/overlayfs_chaindata.sh", "-d", datadir, "-c") + if err := cmd.Run(); err != nil { + b.Fatal(err) + } + } + }() + + keys := make([]common.Hash, 0, numInsert) + for i := 0; i < numInsert; i++ { + hash := crypto.Keccak256Hash(big.NewInt(int64(i)).Bytes()) + keys = append(keys, hash) + } + + b.StopTimer() + b.ResetTimer() + for i := 0; i < b.N; i++ { + cmd := exec.Command("../script/overlayfs_chaindata.sh", "-d", datadir) + if err := cmd.Run(); err != nil { + b.Fatal(err) + } + + diskdb, err = rawdb.NewPebbleDBDatabase(datadir+"/chaindata", 1024, 500000, "", false, false) + 
if err != nil { + b.Fatal(err) + } + + engine := ethash.NewFaker() + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil) + if err != nil { + b.Fatalf("failed to create tester chain: %v", err) + } + headBlock := chain.CurrentBlock() + + database := state.NewDatabase(diskdb) + snapshot, err := snapshot.New(diskdb, database.TrieDB(), 256, headBlock.Root(), true, true, false) + if err != nil { + b.Fatal(err) + } + + statedb, err := state.New(headBlock.Root(), database, snapshot) + if err != nil { + b.Fatal(err) + } + + b.StartTimer() + for i := 0; i < numInsert; i++ { + statedb.SetState(axieContract, keys[i], value) + } + _, err = statedb.Commit(true) + if err != nil { + b.Fatal(err) + } + b.StopTimer() + + diskdb.Close() + cmd = exec.Command("../script/overlayfs_chaindata.sh", "-d", datadir, "-c") + if err := cmd.Run(); err != nil { + b.Fatal(err) + } + diskdb = nil + } +}