@@ -19,6 +19,7 @@ package trie
19
19
import (
20
20
"errors"
21
21
"fmt"
22
+ "sync"
22
23
23
24
"github.com/ethereum/go-ethereum/common"
24
25
"github.com/ethereum/go-ethereum/common/prque"
@@ -98,10 +99,9 @@ func NewSyncPath(path []byte) SyncPath {
98
99
99
100
// nodeRequest represents a scheduled or already in-flight trie node retrieval request.
100
101
type nodeRequest struct {
101
- hash common.Hash // Hash of the trie node to retrieve
102
- path []byte // Merkle path leading to this node for prioritization
103
- data []byte // Data content of the node, cached until all subtrees complete
104
- deletes [][]byte // List of internal path segments for trie nodes to delete
102
+ hash common.Hash // Hash of the trie node to retrieve
103
+ path []byte // Merkle path leading to this node for prioritization
104
+ data []byte // Data content of the node, cached until all subtrees complete
105
105
106
106
parent * nodeRequest // Parent state node referencing this entry
107
107
deps int // Number of dependencies before allowed to commit this node
@@ -128,37 +128,69 @@ type CodeSyncResult struct {
128
128
Data []byte // Data content of the retrieved bytecode
129
129
}
130
130
131
+ // nodeOp represents an operation upon the trie node. It can either represent a
132
+ // deletion to the specific node or a node write for persisting retrieved node.
133
+ type nodeOp struct {
134
+ owner common.Hash // identifier of the trie (empty for account trie)
135
+ path []byte // path from the root to the specified node.
136
+ blob []byte // the content of the node (nil for deletion)
137
+ hash common.Hash // hash of the node content (empty for node deletion)
138
+ }
139
+
140
+ // isDelete indicates if the operation is a database deletion.
141
+ func (op * nodeOp ) isDelete () bool {
142
+ return len (op .blob ) == 0
143
+ }
144
+
131
145
// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
132
146
// persisted data items.
133
147
type syncMemBatch struct {
134
- nodes map [string ][]byte // In-memory membatch of recently completed nodes
135
- hashes map [string ]common.Hash // Hashes of recently completed nodes
136
- deletes map [string ]struct {} // List of paths for trie node to delete
137
- codes map [common.Hash ][]byte // In-memory membatch of recently completed codes
148
+ scheme string // State scheme identifier
149
+ nodes []nodeOp // In-memory batch of recently completed/deleted nodes
150
+ codes map [common.Hash ][]byte // In-memory membatch of recently completed codes
138
151
}
139
152
140
153
// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
141
- func newSyncMemBatch () * syncMemBatch {
154
+ func newSyncMemBatch (scheme string ) * syncMemBatch {
142
155
return & syncMemBatch {
143
- nodes : make (map [string ][]byte ),
144
- hashes : make (map [string ]common.Hash ),
145
- deletes : make (map [string ]struct {}),
146
- codes : make (map [common.Hash ][]byte ),
156
+ scheme : scheme ,
157
+ codes : make (map [common.Hash ][]byte ),
147
158
}
148
159
}
149
160
150
- // hasNode reports the trie node with specific path is already cached.
151
- func (batch * syncMemBatch ) hasNode (path []byte ) bool {
152
- _ , ok := batch .nodes [string (path )]
153
- return ok
154
- }
155
-
156
161
// hasCode reports the contract code with specific hash is already cached.
157
162
func (batch * syncMemBatch ) hasCode (hash common.Hash ) bool {
158
163
_ , ok := batch .codes [hash ]
159
164
return ok
160
165
}
161
166
167
+ // addCode caches a contract code database write operation.
168
+ func (batch * syncMemBatch ) addCode (hash common.Hash , code []byte ) {
169
+ batch .codes [hash ] = code
170
+ }
171
+
172
+ // addNode caches a node database write operation.
173
+ func (batch * syncMemBatch ) addNode (owner common.Hash , path []byte , blob []byte , hash common.Hash ) {
174
+ batch .nodes = append (batch .nodes , nodeOp {
175
+ owner : owner ,
176
+ path : path ,
177
+ blob : blob ,
178
+ hash : hash ,
179
+ })
180
+ }
181
+
182
+ // delNode caches a node database delete operation.
183
+ func (batch * syncMemBatch ) delNode (owner common.Hash , path []byte ) {
184
+ if batch .scheme != rawdb .PathScheme {
185
+ log .Error ("Unexpected node deletion" , "owner" , owner , "path" , path , "scheme" , batch .scheme )
186
+ return // deletion is not supported in hash mode.
187
+ }
188
+ batch .nodes = append (batch .nodes , nodeOp {
189
+ owner : owner ,
190
+ path : path ,
191
+ })
192
+ }
193
+
162
194
// Sync is the main state trie synchronisation scheduler, which provides yet
163
195
// unknown trie hashes to retrieve, accepts node data associated with said hashes
164
196
// and reconstructs the trie step by step until all is done.
@@ -194,7 +226,7 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
194
226
ts := & Sync {
195
227
scheme : scheme ,
196
228
database : database ,
197
- membatch : newSyncMemBatch (),
229
+ membatch : newSyncMemBatch (scheme ),
198
230
nodeReqs : make (map [string ]* nodeRequest ),
199
231
codeReqs : make (map [common.Hash ]* codeRequest ),
200
232
queue : prque .New (nil ),
@@ -213,16 +245,18 @@ func (s *Sync) AddSubTrie(root common.Hash, path []byte, parent common.Hash, par
213
245
if root == emptyRoot {
214
246
return
215
247
}
216
- if s .membatch .hasNode (path ) {
217
- return
218
- }
219
248
if s .bloom == nil || s .bloom .Contains (root [:]) {
220
249
// Bloom filter says this might be a duplicate, double check.
221
250
// If database says yes, then at least the trie node is present
222
251
// and we hold the assumption that it's NOT legacy contract code.
223
252
owner , inner := ResolvePath (path )
224
- if rawdb .HasTrieNode (s .database , owner , inner , root , s .scheme ) {
253
+ exist , inconsistent := s .hasNode (owner , inner , root )
254
+ if exist {
255
+ // The entire subtrie is already present in the database.
225
256
return
257
+ } else if inconsistent {
258
+ // There is a pre-existing node with the wrong hash in DB, remove it.
259
+ s .membatch .delNode (owner , inner )
226
260
}
227
261
// False positive, bump fault meter
228
262
bloomFaultMeter .Mark (1 )
@@ -382,36 +416,39 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error {
382
416
}
383
417
384
418
// Commit flushes the data stored in the internal membatch out to persistent
385
- // storage, returning any occurred error.
419
+ // storage, returning any occurred error. The whole data set will be flushed
420
+ // in an atomic database batch.
386
421
func (s * Sync ) Commit (dbw ethdb.Batch ) error {
387
- // Flush the pending node writes into database batch.
388
422
var (
389
423
account int
390
424
storage int
391
425
)
392
- for path , value := range s .membatch .nodes {
393
- owner , inner := ResolvePath ([]byte (path ))
394
- if owner == (common.Hash {}) {
395
- account += 1
426
+ // Flush the pending node writes into database batch.
427
+ for _ , op := range s .membatch .nodes {
428
+ if op .isDelete () {
429
+ // node deletion is only supported in path mode.
430
+ if op .owner == (common.Hash {}) {
431
+ rawdb .DeleteAccountTrieNode (dbw , op .path )
432
+ } else {
433
+ rawdb .DeleteStorageTrieNode (dbw , op .owner , op .path )
434
+ }
435
+ deletionGauge .Inc (1 )
396
436
} else {
397
- storage += 1
437
+ if op .owner == (common.Hash {}) {
438
+ account += 1
439
+ } else {
440
+ storage += 1
441
+ }
442
+ rawdb .WriteTrieNode (dbw , op .owner , op .path , op .hash , op .blob , s .scheme )
398
443
}
399
- rawdb .WriteTrieNode (dbw , owner , inner , s .membatch .hashes [path ], value , s .scheme )
400
- hash := s .membatch .hashes [path ]
444
+ hash := op .hash
401
445
if s .bloom != nil {
402
446
s .bloom .Add (hash [:])
403
447
}
404
448
}
405
449
accountNodeSyncedGauge .Inc (int64 (account ))
406
450
storageNodeSyncedGauge .Inc (int64 (storage ))
407
451
408
- // Flush the pending node deletes into the database batch.
409
- // Please note that each written and deleted node has a
410
- // unique path, ensuring no duplication occurs.
411
- for path := range s .membatch .deletes {
412
- owner , inner := ResolvePath ([]byte (path ))
413
- rawdb .DeleteTrieNode (dbw , owner , inner , common.Hash {} /* unused */ , s .scheme )
414
- }
415
452
// Flush the pending code writes into database batch.
416
453
for hash , value := range s .membatch .codes {
417
454
rawdb .WriteCode (dbw , hash , value )
@@ -421,7 +458,7 @@ func (s *Sync) Commit(dbw ethdb.Batch) error {
421
458
}
422
459
codeSyncedGauge .Inc (int64 (len (s .membatch .codes )))
423
460
424
- s .membatch = newSyncMemBatch () // reset the batch
461
+ s .membatch = newSyncMemBatch (s . scheme ) // reset the batch
425
462
return nil
426
463
}
427
464
@@ -489,12 +526,15 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
489
526
// child as invalid. This is essential in the case of path mode
490
527
// scheme; otherwise, state healing might overwrite existing child
491
528
// nodes silently while leaving a dangling parent node within the
492
- // range of this internal path on disk. This would break the
493
- // guarantee for state healing.
529
+ // range of this internal path on disk and the persistent state
530
+ // ends up in a very weird situation where nodes on the same path
531
+ // are inconsistent even though they are all present on disk. This property
532
+ // would break the guarantee for state healing.
494
533
//
495
534
// While it's possible for this shortNode to overwrite a previously
496
535
// existing full node, the other branches of the fullNode can be
497
- // retained as they remain untouched and complete.
536
+ // retained as they are not accessible with the new shortNode, and
537
+ // also the whole sub-trie is still untouched and complete.
498
538
//
499
539
// This step is only necessary for path mode, as there is no deletion
500
540
// in hash mode at all.
@@ -511,8 +551,7 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
511
551
exists = rawdb .ExistsStorageTrieNode (s .database , owner , append (inner , key [:i ]... ))
512
552
}
513
553
if exists {
514
- req .deletes = append (req .deletes , key [:i ])
515
- deletionGauge .Inc (1 )
554
+ s .membatch .delNode (owner , append (inner , key [:i ]... ))
516
555
log .Debug ("Detected dangling node" , "owner" , owner , "path" , append (inner , key [:i ]... ))
517
556
}
518
557
}
@@ -532,6 +571,7 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
532
571
}
533
572
// Iterate over the children, and request all unknown ones
534
573
requests := make ([]* nodeRequest , 0 , len (children ))
574
+ var batchMu sync.Mutex
535
575
for _ , child := range children {
536
576
// Notify any external watcher of a new key/value node
537
577
if req .callback != nil {
@@ -548,29 +588,33 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
548
588
}
549
589
}
550
590
}
551
- // If the child references another node, resolve or schedule
591
+ // If the child references another node, resolve or schedule.
592
+ // We check all children concurrently.
552
593
if node , ok := (child .node ).(hashNode ); ok {
553
- // Try to resolve the node from the local database
554
- if s .membatch .hasNode (child .path ) {
555
- continue
556
- }
557
- chash := common .BytesToHash (node )
594
+ path := child .path
595
+ hash := common .BytesToHash (node )
558
596
if s .bloom == nil || s .bloom .Contains (node ) {
559
597
// Bloom filter says this might be a duplicate, double check.
560
598
// If database says yes, then at least the trie node is present
561
599
// and we hold the assumption that it's NOT legacy contract code.
562
- owner , inner := ResolvePath (child .path )
563
- if rawdb .HasTrieNode (s .database , owner , inner , chash , s .scheme ) {
600
+ owner , inner := ResolvePath (path )
601
+ exist , inconsistent := s .hasNode (owner , inner , hash )
602
+ if exist {
564
603
continue
604
+ } else if inconsistent {
605
+ // There is a pre-existing node with the wrong hash in DB, remove it.
606
+ batchMu .Lock ()
607
+ s .membatch .delNode (owner , inner )
608
+ batchMu .Unlock ()
565
609
}
566
610
567
611
// False positive, bump fault meter
568
612
bloomFaultMeter .Mark (1 )
569
613
}
570
614
// Locally unknown node, schedule for retrieval
571
615
requests = append (requests , & nodeRequest {
572
- path : child . path ,
573
- hash : chash ,
616
+ path : path ,
617
+ hash : hash ,
574
618
parent : req ,
575
619
callback : req .callback ,
576
620
})
@@ -584,14 +628,10 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
584
628
// committed themselves.
585
629
func (s * Sync ) commitNodeRequest (req * nodeRequest ) error {
586
630
// Write the node content to the membatch
587
- s . membatch . nodes [ string ( req . path )] = req .data
588
- s .membatch .hashes [ string ( req .path )] = req .hash
631
+ owner , path := ResolvePath ( req .path )
632
+ s .membatch .addNode ( owner , path , req .data , req .hash )
589
633
590
- // Delete the internal nodes which are marked as invalid
591
- for _ , segment := range req .deletes {
592
- path := append (req .path , segment ... )
593
- s .membatch .deletes [string (path )] = struct {}{}
594
- }
634
+ // Remove the completed node request
595
635
delete (s .nodeReqs , string (req .path ))
596
636
s .fetches [len (req .path )]--
597
637
@@ -612,7 +652,9 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error {
612
652
// committed themselves.
613
653
func (s * Sync ) commitCodeRequest (req * codeRequest ) error {
614
654
// Write the node content to the membatch
615
- s .membatch .codes [req .hash ] = req .data
655
+ s .membatch .addCode (req .hash , req .data )
656
+
657
+ // Remove the completed code request
616
658
delete (s .codeReqs , req .hash )
617
659
s .fetches [len (req .path )]--
618
660
@@ -628,6 +670,28 @@ func (s *Sync) commitCodeRequest(req *codeRequest) error {
628
670
return nil
629
671
}
630
672
673
+ // hasNode reports whether the specified trie node is present in the database.
674
+ // 'exists' is true when the node exists in the database and matches the given root
675
+ // hash. The 'inconsistent' return value is true when the node exists but does not
676
+ // match the expected hash.
677
+ func (s * Sync ) hasNode (owner common.Hash , path []byte , hash common.Hash ) (exists bool , inconsistent bool ) {
678
+ // If node is running with hash scheme, check the presence with node hash.
679
+ if s .scheme == rawdb .HashScheme {
680
+ return rawdb .HasLegacyTrieNode (s .database , hash ), false
681
+ }
682
+ // If node is running with path scheme, check the presence with node path.
683
+ var blob []byte
684
+ var dbHash common.Hash
685
+ if owner == (common.Hash {}) {
686
+ blob , dbHash = rawdb .ReadAccountTrieNode (s .database , path )
687
+ } else {
688
+ blob , dbHash = rawdb .ReadStorageTrieNode (s .database , owner , path )
689
+ }
690
+ exists = hash == dbHash
691
+ inconsistent = ! exists && len (blob ) != 0
692
+ return exists , inconsistent
693
+ }
694
+
631
695
// ResolvePath resolves the provided composite node path by separating the
632
696
// path in account trie if it's existent.
633
697
func ResolvePath (path []byte ) (common.Hash , []byte ) {
0 commit comments