Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

state_object, trie: update storage trie of an account parallel #352

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 79 additions & 4 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,23 @@ import (
"fmt"
"io"
"math/big"
"runtime"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
triePkg "github.com/ethereum/go-ethereum/trie"
)

var emptyCodeHash = crypto.Keccak256(nil)

const skipParallelInsert = 100

type Code []byte

func (c Code) String() string {
Expand Down Expand Up @@ -317,6 +323,61 @@ func (s *stateObject) finalise(prefetch bool) {
}
}

// keyValuePair is one pending storage write queued onto the parallel-update
// channel: the storage slot key and its already RLP-encoded value (encoded by
// updateTrie before queueing).
type keyValuePair struct {
	key   common.Hash // storage slot key (pre-hash; the trie hashes it)
	value []byte      // RLP-encoded slot value
}

// updateStorage writes a single slot value into the given storage trie and
// bumps the database-level update counter.
//
// When parallel is true the caller runs several of these concurrently, so the
// write is routed through SecureTrie.TryUpdateLeaf, which hashes the key with
// a private buffer and only rewrites an existing leaf value node.
func (s *stateObject) updateStorage(trie Trie, key common.Hash, value []byte, parallel bool) {
	if secureTrie, ok := trie.(*triePkg.SecureTrie); parallel && ok {
		s.setError(secureTrie.TryUpdateLeaf(key[:], value))
	} else {
		// NOTE(review): if parallel is true but the trie is not a SecureTrie
		// we fall through to plain TryUpdate, which is not safe for
		// concurrent use — confirm parallel callers always hold a SecureTrie.
		s.setError(trie.TryUpdate(key[:], value))
	}

	// NOTE(review): in the parallel path this non-atomic increment runs from
	// multiple goroutines and is a data race — consider an atomic counter.
	s.db.StorageUpdated += 1
}

// doUpdateStorage is the worker-goroutine body of the parallel storage
// update: it drains the task channel, applying each pending key/value pair to
// the trie, and signals the wait group once the channel is closed.
func (s *stateObject) doUpdateStorage(storageTasks <-chan keyValuePair, trie Trie, wg *sync.WaitGroup) {
	defer wg.Done()
	for kv := range storageTasks {
		s.updateStorage(trie, kv.key, kv.value, true)
	}
}

// updateStorageParallel flushes the given batch of pending storage writes
// into the trie. Batches of at most skipParallelInsert entries are applied
// serially on the calling goroutine; larger batches are fanned out over one
// worker goroutine per CPU.
func (s *stateObject) updateStorageParallel(trie Trie, storageMap map[common.Hash][]byte) {
	// Below the threshold the goroutine/channel overhead is not worth it —
	// just walk the map right here.
	if len(storageMap) <= skipParallelInsert {
		for key, value := range storageMap {
			s.updateStorage(trie, key, value, false)
		}
		return
	}

	log.Info("Parallel insert", "num", len(storageMap))

	var (
		wg         sync.WaitGroup
		numWorkers = runtime.NumCPU()
		// Buffer the channel for the whole batch so the producer loop below
		// never blocks on a slow worker.
		storageTasks = make(chan keyValuePair, len(storageMap))
	)

	for worker := 0; worker < numWorkers; worker++ {
		wg.Add(1)
		go s.doUpdateStorage(storageTasks, trie, &wg)
	}

	for key, value := range storageMap {
		storageTasks <- keyValuePair{key: key, value: value}
	}
	close(storageTasks)

	wg.Wait()
}

// updateTrie writes cached storage modifications into the object's storage trie.
// It will return nil if the trie has not been loaded and no changes have been made
func (s *stateObject) updateTrie(db Database) Trie {
Expand All @@ -329,8 +390,12 @@ func (s *stateObject) updateTrie(db Database) Trie {
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageUpdates += time.Since(start) }(time.Now())
}
// The snapshot storage map for the object
var storage map[common.Hash][]byte
var (
// The snapshot storage map for the object
storage map[common.Hash][]byte

parallelInsertStorage = make(map[common.Hash][]byte)
)
// Insert all the pending updates into the trie
tr := s.getTrie(db)
hasher := s.db.hasher
Expand All @@ -341,6 +406,7 @@ func (s *stateObject) updateTrie(db Database) Trie {
if value == s.originStorage[key] {
continue
}
originValue := s.originStorage[key]
s.originStorage[key] = value

var v []byte
Expand All @@ -350,8 +416,14 @@ func (s *stateObject) updateTrie(db Database) Trie {
} else {
// Encoding []byte cannot fail, ok to ignore the error.
v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
s.setError(tr.TryUpdate(key[:], v))
s.db.StorageUpdated += 1
if originValue == (common.Hash{}) {
s.updateStorage(tr, key, v, false)
} else {
// This change does not insert a new node to trie,
// it just updates the leaf value node. It is possible
// to do these changes parallel to reduce the latency.
parallelInsertStorage[key] = v
}
}
// If state snapshotting is active, cache the data til commit
if s.db.snap != nil {
Expand All @@ -366,6 +438,9 @@ func (s *stateObject) updateTrie(db Database) Trie {
}
usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure
}

s.updateStorageParallel(tr, parallelInsertStorage)

if s.db.prefetcher != nil {
s.db.prefetcher.used(s.data.Root, usedStorage)
}
Expand Down
7 changes: 5 additions & 2 deletions trie/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"
"strings"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
Expand All @@ -36,11 +37,13 @@ type (
fullNode struct {
Children [17]node // Actual trie node data to encode/decode (needs custom encoder)
flags nodeFlag
lock [16]sync.RWMutex
}
shortNode struct {
Key []byte
Val node
flags nodeFlag
lock sync.RWMutex
}
hashNode []byte
valueNode []byte
Expand Down Expand Up @@ -147,13 +150,13 @@ func decodeShort(hash, elems []byte) (node, error) {
if err != nil {
return nil, fmt.Errorf("invalid value node: %v", err)
}
return &shortNode{key, append(valueNode{}, val...), flag}, nil
return &shortNode{Key: key, Val: append(valueNode{}, val...), flags: flag}, nil
}
r, _, err := decodeRef(rest)
if err != nil {
return nil, wrapError(err, "val")
}
return &shortNode{key, r, flag}, nil
return &shortNode{Key: key, Val: r, flags: flag}, nil
}

func decodeFull(hash, elems []byte) (*fullNode, error) {
Expand Down
24 changes: 24 additions & 0 deletions trie/secure_trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"golang.org/x/crypto/sha3"
)

// SecureTrie wraps a trie with key hashing. In a secure trie, all
Expand Down Expand Up @@ -132,6 +134,28 @@ func (t *SecureTrie) TryUpdate(key, value []byte) error {
return nil
}

// hashKeyParallel is the the same as hashKey but it creates a distinct buffer
// for storing the result instead of sharing the result buffer. This allows
// this function to run parallel
func hashKeyParallel(key []byte) []byte {
hash := make([]byte, common.HashLength)

hasher := sha3.NewLegacyKeccak256()
hasher.Write(key)
hasher.(crypto.KeccakState).Read(hash)
return hash
}

// TryUpdateLeaf updates an existing leaf value node under the hashed key.
// Unlike TryUpdate it hashes the key into a private buffer, so multiple
// goroutines may call it concurrently.
//
// NOTE(review): TryUpdate also records the key preimage in the secure-key
// cache; this path does not — confirm skipping the preimage record is
// intentional (the cache is not safe for concurrent writes).
func (t *SecureTrie) TryUpdateLeaf(key, value []byte) error {
	// Forward the error directly; the original if err != nil { return err };
	// return nil dance was redundant.
	return t.trie.TryUpdateLeaf(hashKeyParallel(key), value)
}

// Delete removes any existing value for key from the trie.
func (t *SecureTrie) Delete(key []byte) {
if err := t.TryDelete(key); err != nil {
Expand Down
109 changes: 101 additions & 8 deletions trie/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -65,7 +66,7 @@ type Trie struct {
// Keep track of the number leafs which have been inserted since the last
// hashing operation. This number will not directly map to the number of
// actually unhashed nodes
unhashed int
unhashed int32
}

// newFlag returns the cache flag value for a newly created node.
Expand Down Expand Up @@ -283,6 +284,98 @@ func (t *Trie) TryUpdate(key, value []byte) error {
return nil
}

// TryUpdateLeaf updates the value of an existing leaf node. Several of these
// calls may run in parallel, so shared bookkeeping must be synchronized.
func (t *Trie) TryUpdateLeaf(key, value []byte) error {
	// The unhashed counter is shared between concurrent TryUpdateLeaf
	// callers, hence the atomic increment.
	atomic.AddInt32(&t.unhashed, 1)

	hexKey := keybytesToHex(key)
	return t.updateLeaf(t.root, nil, hexKey, valueNode(value))
}

// resolveChildNode returns the child node behind childPointer, first
// resolving it from the database if it is still a hashNode. The parent-held
// lock guards the pointer: the fast path takes only the read lock, and the
// rare resolution takes the write lock with a re-check, so concurrent
// updateLeaf descents observe a consistent, expanded node.
func (t *Trie) resolveChildNode(childPointer *node, lock *sync.RWMutex, prefix []byte) (node, error) {
	lock.RLock()
	childNode := *childPointer
	lock.RUnlock()

	if _, ok := childNode.(hashNode); !ok {
		// Already expanded — nothing to resolve.
		return childNode, nil
	}

	lock.Lock()
	// Bug fix: the original returned the resolveHash error while still
	// holding the write lock, deadlocking every later caller; defer makes the
	// unlock unconditional.
	defer lock.Unlock()

	// Re-check under the write lock: another goroutine may have resolved the
	// node between our RUnlock and Lock.
	childNode = *childPointer
	if v, ok := childNode.(hashNode); ok {
		resolved, err := t.resolveHash(v, prefix)
		if err != nil {
			return nil, err
		}
		childNode = resolved
		*childPointer = childNode
	}

	return childNode, nil
}

// updateLeaf is called in parallel to update the leaf node of the trie.
// It descends along the hex-encoded key and overwrites the value node at the
// end of the path. It never inserts new nodes or restructures the trie — a
// key that does not already have a full path to a leaf is an error — which is
// what makes concurrent calls on disjoint keys tolerable.
func (t *Trie) updateLeaf(n node, prefix, key []byte, value node) error {
	switch n := n.(type) {
	case *shortNode:
		matchlen := prefixLen(key, n.Key)
		if matchlen == len(n.Key) {
			if matchlen == len(key) {
				// child node is the value node we want to update
				n.Val = value
				n.flags = t.newFlag()
			} else {
				// Resolve the child under the short node's lock in case it is
				// still an unexpanded hashNode.
				childNode, err := t.resolveChildNode(&n.Val, &n.lock, prefix)
				if err != nil {
					return err
				}

				if err := t.updateLeaf(childNode, append(prefix, key[:matchlen]...), key[matchlen:], value); err != nil {
					return err
				}

				// This can cause a data race as 2 writers can happen concurrently. However, we write the same
				// value in these 2 writers so it must be safe here.
				// NOTE(review): concurrent writes of "the same" flag value are
				// still a race under the Go memory model — confirm with -race.
				n.flags = t.newFlag()
			}
			return nil
		}

		// Partial prefix match would require splitting the short node, i.e. a
		// structural change, which this parallel path refuses to do.
		return fmt.Errorf("key does not match, key: %s short node's key: %s", common.Bytes2Hex(key), common.Bytes2Hex(n.Key))

	case *fullNode:
		if len(key) == 1 {
			// child node is the value node we want to update
			n.Children[key[0]] = value
			n.flags = t.newFlag()
		} else {
			// Each child slot has its own lock so siblings can be resolved
			// concurrently.
			childNode, err := t.resolveChildNode(&n.Children[key[0]], &n.lock[key[0]], prefix)
			if err != nil {
				return err
			}

			if err := t.updateLeaf(childNode, append(prefix, key[0]), key[1:], value); err != nil {
				return err
			}

			// This can cause a data race as 2 writers can happen concurrently. However, we write the same
			// value in these 2 writers so it must be safe here.
			// NOTE(review): same caveat as above — identical-value concurrent
			// writes are formally still a data race.
			n.flags = t.newFlag()
		}
		return nil

	default:
		// nil and valueNode (and any hashNode that escaped resolution) cannot
		// be descended into on this no-insert path.
		return fmt.Errorf("%T: invalid node: %v", n, n)
	}
}

func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) {
if len(key) == 0 {
if v, ok := n.(valueNode); ok {
Expand All @@ -300,7 +393,7 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error
if !dirty || err != nil {
return false, n, err
}
return true, &shortNode{n.Key, nn, t.newFlag()}, nil
return true, &shortNode{Key: n.Key, Val: nn, flags: t.newFlag()}, nil
}
// Otherwise branch out at the index where they differ.
branch := &fullNode{flags: t.newFlag()}
Expand All @@ -318,7 +411,7 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error
return true, branch, nil
}
// Otherwise, replace it with a short node leading up to the branch.
return true, &shortNode{key[:matchlen], branch, t.newFlag()}, nil
return true, &shortNode{Key: key[:matchlen], Val: branch, flags: t.newFlag()}, nil

case *fullNode:
dirty, nn, err := t.insert(n.Children[key[0]], append(prefix, key[0]), key[1:], value)
Expand All @@ -331,7 +424,7 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error
return true, n, nil

case nil:
return true, &shortNode{key, value, t.newFlag()}, nil
return true, &shortNode{Key: key, Val: value, flags: t.newFlag()}, nil

case hashNode:
// We've hit a part of the trie that isn't loaded yet. Load
Expand Down Expand Up @@ -401,9 +494,9 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) {
// always creates a new slice) instead of append to
// avoid modifying n.Key since it might be shared with
// other nodes.
return true, &shortNode{concat(n.Key, child.Key...), child.Val, t.newFlag()}, nil
return true, &shortNode{Key: concat(n.Key, child.Key...), Val: child.Val, flags: t.newFlag()}, nil
default:
return true, &shortNode{n.Key, child, t.newFlag()}, nil
return true, &shortNode{Key: n.Key, Val: child, flags: t.newFlag()}, nil
}

case *fullNode:
Expand Down Expand Up @@ -457,12 +550,12 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) {
}
if cnode, ok := cnode.(*shortNode); ok {
k := append([]byte{byte(pos)}, cnode.Key...)
return true, &shortNode{k, cnode.Val, t.newFlag()}, nil
return true, &shortNode{Key: k, Val: cnode.Val, flags: t.newFlag()}, nil
}
}
// Otherwise, n is replaced by a one-nibble short node
// containing the child.
return true, &shortNode{[]byte{byte(pos)}, n.Children[pos], t.newFlag()}, nil
return true, &shortNode{Key: []byte{byte(pos)}, Val: n.Children[pos], flags: t.newFlag()}, nil
}
// n still contains at least two values and cannot be reduced.
return true, n, nil
Expand Down