Skip to content

Commit

Permalink
experiment: a new sync mode with DISCARD_COMMITMENT (#571)
Browse files Browse the repository at this point in the history
  • Loading branch information
blxdyx authored Dec 12, 2024
1 parent 9a177e1 commit 601a76c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 37 deletions.
20 changes: 13 additions & 7 deletions core/state/rw_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,19 @@ func (rs *StateV3) ApplyState4(ctx context.Context, txTask *TxTask) error {
}

if (txTask.TxNum+1)%rs.domains.StepSize() == 0 /*&& txTask.TxNum > 0 */ {
// We do not update txNum before commitment cuz otherwise committed state will be in the beginning of next file, not in the latest.
// That's why we need to make txnum++ on SeekCommitment to get exact txNum for the latest committed state.
//fmt.Printf("[commitment] running due to txNum reached aggregation step %d\n", txNum/rs.domains.StepSize())
_, err := rs.domains.ComputeCommitment(ctx, true, txTask.BlockNum,
fmt.Sprintf("applying step %d", txTask.TxNum/rs.domains.StepSize()))
if err != nil {
return fmt.Errorf("StateV3.ComputeCommitment: %w", err)
if dbg.DiscardCommitment() {
// Don't apply update just save root hash.
rs.domains.ResetCommitment()
_ = rs.domains.SaveCommitment(txTask.BlockNum, txTask.Header.Root.Bytes())
} else {
// We do not update txNum before commitment cuz otherwise committed state will be in the beginning of next file, not in the latest.
// That's why we need to make txnum++ on SeekCommitment to get exact txNum for the latest committed state.
//fmt.Printf("[commitment] running due to txNum reached aggregation step %d\n", txNum/rs.domains.StepSize())
_, err := rs.domains.ComputeCommitment(ctx, true, txTask.BlockNum,
fmt.Sprintf("applying step %d", txTask.TxNum/rs.domains.StepSize()))
if err != nil {
return fmt.Errorf("StateV3.ComputeCommitment: %w", err)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6
collateAndBuildWorkers: 1,
mergeWorkers: 1,

commitmentValuesTransform: AggregatorSqueezeCommitmentValues,
commitmentValuesTransform: AggregatorSqueezeCommitmentValues && !dbg.DiscardCommitment(),

produce: true,
}
Expand Down
43 changes: 28 additions & 15 deletions erigon-lib/state/domain_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,12 @@ func (sd *SharedDomains) SeekCommitment(ctx context.Context, tx kv.Tx) (txsFromB
if err != nil {
return 0, err
}
if dbg.DiscardCommitment() && bn == 0 {
txn = sd.aggTx.EndTxNumNoCommitment() - 1
sd.SetBlockNum(bn)
sd.SetTxNum(txn)
return 0, nil
}
if ok {
if bn > 0 {
lastBn, _, err := rawdbv3.TxNums.Last(tx)
Expand Down Expand Up @@ -385,6 +391,18 @@ func (sd *SharedDomains) ClearRam(resetCommitment bool) {
sd.estSize = 0
}

func (sd *SharedDomains) ResetCommitment() {
sd.sdCtx.updates.Reset()
}

func (sd *SharedDomains) SaveCommitment(blockNum uint64, rootHash []byte) error {
return sd.sdCtx.storeCommitmentState(blockNum, rootHash)
}

func (sd *SharedDomains) DiscardCommitment() {
sd.sdCtx.discard = dbg.DiscardCommitment()
}

func (sd *SharedDomains) put(domain kv.Domain, key string, val []byte) {
// disable mutex - because work on parallel execution postponed after E3 release.
//sd.muMaps.Lock()
Expand Down Expand Up @@ -907,13 +925,16 @@ func (sd *SharedDomains) Flush(ctx context.Context, tx kv.RwTx) error {
sd.pastChangesAccumulator = make(map[string]*StateChangeSet)

defer mxFlushTook.ObserveDuration(time.Now())
fh, err := sd.ComputeCommitment(ctx, true, sd.BlockNum(), "flush-commitment")
if err != nil {
return err
}
if sd.trace {
_, f, l, _ := runtime.Caller(1)
fmt.Printf("[SD aggTx=%d] FLUSHING at tx %d [%x], caller %s:%d\n", sd.aggTx.id, sd.TxNum(), fh, filepath.Base(f), l)
var err error
if !dbg.DiscardCommitment() {
fh, err := sd.ComputeCommitment(ctx, true, sd.BlockNum(), "flush-commitment")
if err != nil {
return err
}
if sd.trace {
_, f, l, _ := runtime.Caller(1)
fmt.Printf("[SD aggTx=%d] FLUSHING at tx %d [%x], caller %s:%d\n", sd.aggTx.id, sd.TxNum(), fh, filepath.Base(f), l)
}
}
for _, w := range sd.domainWriters {
if w == nil {
Expand Down Expand Up @@ -1119,7 +1140,6 @@ func (sdc *SharedDomainsCommitmentContext) SetLimitReadAsOfTxNum(txNum uint64) {
func NewSharedDomainsCommitmentContext(sd *SharedDomains, mode commitment.Mode, trieVariant commitment.TrieVariant) *SharedDomainsCommitmentContext {
ctx := &SharedDomainsCommitmentContext{
sharedDomains: sd,
discard: dbg.DiscardCommitment(),
branches: make(map[string]cachedBranch),
keccak: sha3.NewLegacyKeccak256().(cryptozerocopy.KeccakState),
}
Expand Down Expand Up @@ -1295,10 +1315,6 @@ func (sdc *SharedDomainsCommitmentContext) TouchKey(d kv.Domain, key string, val

// Evaluates commitment for processed state.
func (sdc *SharedDomainsCommitmentContext) ComputeCommitment(ctx context.Context, saveState bool, blockNum uint64, logPrefix string) (rootHash []byte, err error) {
if dbg.DiscardCommitment() {
sdc.updates.Reset()
return nil, nil
}
sdc.ResetBranchCache()
defer sdc.ResetBranchCache()

Expand Down Expand Up @@ -1399,9 +1415,6 @@ func _decodeTxBlockNums(v []byte) (txNum, blockNum uint64) {
// LatestCommitmentState searches for last encoded state for CommitmentContext.
// Found value does not become current state.
func (sdc *SharedDomainsCommitmentContext) LatestCommitmentState() (blockNum, txNum uint64, state []byte, err error) {
if dbg.DiscardCommitment() {
return 0, 0, nil, nil
}
if sdc.patriciaTrie.Variant() != commitment.VariantHexPatriciaTrie {
return 0, 0, nil, fmt.Errorf("state storing is only supported hex patricia trie")
}
Expand Down
38 changes: 24 additions & 14 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func ExecV3(ctx context.Context,
defer doms.Close()
}
txNumInDB := doms.TxNum()
doms.DiscardCommitment()

txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, cfg.blockReader))

Expand Down Expand Up @@ -687,8 +688,12 @@ Loop:
aggTx := applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx)
aggTx.RestrictSubsetFileDeletions(true)
start := time.Now()
if _, err := doms.ComputeCommitment(ctx, true, blockNum, execStage.LogPrefix()); err != nil {
return err
if dbg.DiscardCommitment() {
_ = doms.SaveCommitment(blockNum, b.Root().Bytes())
} else {
if _, err := doms.ComputeCommitment(ctx, true, blockNum, execStage.LogPrefix()); err != nil {
return err
}
}
ts += time.Since(start)
aggTx.RestrictSubsetFileDeletions(false)
Expand Down Expand Up @@ -790,6 +795,7 @@ Loop:
if err != nil {
return err
}
doms.DiscardCommitment()
doms.SetTxNum(inputTxNum)
rs = state.NewStateV3(doms, logger)

Expand Down Expand Up @@ -923,27 +929,31 @@ func flushAndCheckCommitmentV3(ctx context.Context, header *types.Header, applyT
return false, errors.New("header is nil")
}

if dbg.DiscardCommitment() {
return true, nil
}
if doms.BlockNum() != header.Number.Uint64() {
panic(fmt.Errorf("%d != %d", doms.BlockNum(), header.Number.Uint64()))
}

rh, err := doms.ComputeCommitment(ctx, true, header.Number.Uint64(), e.LogPrefix())
if err != nil {
return false, fmt.Errorf("StateV3.Apply: %w", err)
}
if cfg.blockProduction {
header.Root = common.BytesToHash(rh)
return true, nil
var rh []byte
var err error
if dbg.DiscardCommitment() {
doms.ResetCommitment()
_ = doms.SaveCommitment(doms.BlockNum(), header.Root.Bytes())
} else {
rh, err = doms.ComputeCommitment(ctx, true, header.Number.Uint64(), e.LogPrefix())
if err != nil {
return false, fmt.Errorf("StateV3.Apply: %w", err)
}
if cfg.blockProduction {
header.Root = common.BytesToHash(rh)
return true, nil
}
}
if bytes.Equal(rh, header.Root.Bytes()) {
if bytes.Equal(rh, header.Root.Bytes()) || dbg.DiscardCommitment() {
if !inMemExec {
if err := doms.Flush(ctx, applyTx); err != nil {
return false, err
}
if err = applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx).PruneCommitHistory(ctx, applyTx, nil); err != nil {
if err := applyTx.(state2.HasAggTx).AggTx().(*state2.AggregatorRoTx).PruneCommitHistory(ctx, applyTx, nil); err != nil {
return false, err
}
}
Expand Down

0 comments on commit 601a76c

Please sign in to comment.