Skip to content

Commit

Permalink
Add onlineaccounts and onlineroundparamstail to catchpoint files
Browse files Browse the repository at this point in the history
  • Loading branch information
cce committed Nov 25, 2024
1 parent 0bc3d7e commit d775659
Show file tree
Hide file tree
Showing 32 changed files with 1,679 additions and 240 deletions.
4 changes: 2 additions & 2 deletions catchup/catchpointService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func (m *catchpointCatchupAccessorMock) Ledger() (l ledger.CatchupAccessorClient
}

// GetVerifyData returns the balances hash, spver hash and totals used by VerifyCatchpoint
func (m *catchpointCatchupAccessorMock) GetVerifyData(ctx context.Context) (balancesHash crypto.Digest, spverHash crypto.Digest, totals ledgercore.AccountTotals, err error) {
return crypto.Digest{}, crypto.Digest{}, ledgercore.AccountTotals{}, nil
func (m *catchpointCatchupAccessorMock) GetVerifyData(ctx context.Context) (balancesHash, spverHash, onlineAccountsHash, onlineRoundParamsHash crypto.Digest, totals ledgercore.AccountTotals, err error) {
return crypto.Digest{}, crypto.Digest{}, crypto.Digest{}, crypto.Digest{}, ledgercore.AccountTotals{}, nil
}

// TestCatchpointServicePeerRank ensures CatchpointService does not crash when a block fetched
Expand Down
7 changes: 4 additions & 3 deletions cmd/catchpointdump/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,13 @@ func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.Catc
if err != nil {
return fileHeader, err
}
var balanceHash, spverHash crypto.Digest
balanceHash, spverHash, _, err = catchupAccessor.GetVerifyData(ctx)
var balanceHash, spverHash, onlineAccountsHash, onlineRoundParamsHash crypto.Digest
balanceHash, spverHash, onlineAccountsHash, onlineRoundParamsHash, _, err = catchupAccessor.GetVerifyData(ctx)

Check warning on line 218 in cmd/catchpointdump/file.go

View check run for this annotation

Codecov / codecov/patch

cmd/catchpointdump/file.go#L217-L218

Added lines #L217 - L218 were not covered by tests
if err != nil {
return fileHeader, err
}
fmt.Printf("accounts digest=%s, spver digest=%s\n\n", balanceHash, spverHash)
fmt.Printf("accounts digest=%s, spver digest=%s, onlineaccounts digest=%s onlineroundparams digest=%s\n\n",
balanceHash, spverHash, onlineAccountsHash, onlineRoundParamsHash)

Check warning on line 223 in cmd/catchpointdump/file.go

View check run for this annotation

Codecov / codecov/patch

cmd/catchpointdump/file.go#L222-L223

Added lines #L222 - L223 were not covered by tests
}
return fileHeader, nil
}
Expand Down
4 changes: 2 additions & 2 deletions components/mocks/mockCatchpointCatchupAccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func (m *MockCatchpointCatchupAccessor) GetCatchupBlockRound(ctx context.Context
}

// GetVerifyData returns the balances hash, spver hash and totals used by VerifyCatchpoint
func (m *MockCatchpointCatchupAccessor) GetVerifyData(ctx context.Context) (balancesHash crypto.Digest, spverHash crypto.Digest, totals ledgercore.AccountTotals, err error) {
return crypto.Digest{}, crypto.Digest{}, ledgercore.AccountTotals{}, nil
func (m *MockCatchpointCatchupAccessor) GetVerifyData(ctx context.Context) (balancesHash, spverHash, onlineAccountsHash, onlineRoundParams crypto.Digest, totals ledgercore.AccountTotals, err error) {
return crypto.Digest{}, crypto.Digest{}, crypto.Digest{}, crypto.Digest{}, ledgercore.AccountTotals{}, nil

Check warning on line 75 in components/mocks/mockCatchpointCatchupAccessor.go

View check run for this annotation

Codecov / codecov/patch

components/mocks/mockCatchpointCatchupAccessor.go#L74-L75

Added lines #L74 - L75 were not covered by tests
}

// VerifyCatchpoint verifies that the catchpoint is valid by reconstructing the label.
Expand Down
6 changes: 6 additions & 0 deletions config/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,10 @@ type ConsensusParams struct {
// Version 7 includes state proof verification contexts
EnableCatchpointsWithSPContexts bool

// EnableCatchpointsWithOnlineAccounts specifies when to enable version 8 catchpoints.
// Version 8 includes onlineaccounts and onlineroundparams amounts, for historical stake lookups.
EnableCatchpointsWithOnlineAccounts bool

// AppForbidLowResources enforces a rule that prevents apps from accessing
// asas and apps below 256, in an effort to decrease the ambiguity of
// opcodes that accept IDs or slot indexes. Simultaneously, the first ID
Expand Down Expand Up @@ -1532,6 +1536,8 @@ func initConsensusProtocols() {
// 2.9 sec rounds gives about 10.8M rounds per year.
vFuture.Bonus.DecayInterval = 250_000 // .99^(10.8/0.25) ~ .648. So 35% decay per year

vFuture.EnableCatchpointsWithOnlineAccounts = true

Consensus[protocol.ConsensusFuture] = vFuture

// vAlphaX versions are an separate series of consensus parameters and versions for alphanet
Expand Down
20 changes: 11 additions & 9 deletions ledger/catchpointfileheader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import (
type CatchpointFileHeader struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`

Version uint64 `codec:"version"`
BalancesRound basics.Round `codec:"balancesRound"`
BlocksRound basics.Round `codec:"blocksRound"`
Totals ledgercore.AccountTotals `codec:"accountTotals"`
TotalAccounts uint64 `codec:"accountsCount"`
TotalChunks uint64 `codec:"chunksCount"`
TotalKVs uint64 `codec:"kvsCount"`
Catchpoint string `codec:"catchpoint"`
BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"`
Version uint64 `codec:"version"`
BalancesRound basics.Round `codec:"balancesRound"`
BlocksRound basics.Round `codec:"blocksRound"`
Totals ledgercore.AccountTotals `codec:"accountTotals"`
TotalAccounts uint64 `codec:"accountsCount"`
TotalChunks uint64 `codec:"chunksCount"`
TotalKVs uint64 `codec:"kvsCount"`
TotalOnlineAccounts uint64 `codec:"onlineAccountsCount"`
TotalOnlineRoundParams uint64 `codec:"onlineRoundParamsCount"`
Catchpoint string `codec:"catchpoint"`
BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"`
}
197 changes: 141 additions & 56 deletions ledger/catchpointfilewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,33 +51,29 @@ const (
// the writing is complete. It might take multiple steps until the operation is over, and the caller
// has the option of throttling the CPU utilization in between the calls.
type catchpointFileWriter struct {
ctx context.Context
tx trackerdb.SnapshotScope
filePath string
totalAccounts uint64
totalKVs uint64
file *os.File
tar *tar.Writer
compressor io.WriteCloser
chunk catchpointFileChunkV6
chunkNum uint64
writtenBytes int64
biggestChunkLen uint64
accountsIterator accountsBatchIter
maxResourcesPerChunk int
accountsDone bool
kvRows kvIter
}

type kvIter interface {
Next() bool
KeyValue() ([]byte, []byte, error)
Close()
}

type accountsBatchIter interface {
Next(ctx context.Context, accountCount int, resourceCount int) ([]encoded.BalanceRecordV6, uint64, error)
Close()
ctx context.Context
tx trackerdb.SnapshotScope
filePath string
totalAccounts uint64
totalKVs uint64
totalOnlineAccounts uint64
totalOnlineRoundParams uint64
file *os.File
tar *tar.Writer
compressor io.WriteCloser
chunk catchpointFileChunkV6
chunkNum uint64
writtenBytes int64
biggestChunkLen uint64
accountsIterator trackerdb.EncodedAccountsBatchIter
maxResourcesPerChunk int
accountsDone bool
kvRows trackerdb.KVsIter
kvDone bool
onlineAccountRows trackerdb.TableIterator[*encoded.OnlineAccountRecordV6]
onlineAccountsDone bool
onlineRoundParamsRows trackerdb.TableIterator[*encoded.OnlineRoundParamsRecordV6]
onlineRoundParamsDone bool
}

type catchpointFileBalancesChunkV5 struct {
Expand All @@ -88,13 +84,15 @@ type catchpointFileBalancesChunkV5 struct {
type catchpointFileChunkV6 struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`

Balances []encoded.BalanceRecordV6 `codec:"bl,allocbound=BalancesPerCatchpointFileChunk"`
numAccounts uint64
KVs []encoded.KVRecordV6 `codec:"kv,allocbound=BalancesPerCatchpointFileChunk"`
Balances []encoded.BalanceRecordV6 `codec:"bl,allocbound=BalancesPerCatchpointFileChunk"`
numAccounts uint64
KVs []encoded.KVRecordV6 `codec:"kv,allocbound=BalancesPerCatchpointFileChunk"`
OnlineAccounts []encoded.OnlineAccountRecordV6 `codec:"oa,allocbound=BalancesPerCatchpointFileChunk"`
OnlineRoundParams []encoded.OnlineRoundParamsRecordV6 `codec:"orp,allocbound=BalancesPerCatchpointFileChunk"`
}

func (chunk catchpointFileChunkV6) empty() bool {
return len(chunk.Balances) == 0 && len(chunk.KVs) == 0
return len(chunk.Balances) == 0 && len(chunk.KVs) == 0 && len(chunk.OnlineAccounts) == 0 && len(chunk.OnlineRoundParams) == 0
}

type catchpointStateProofVerificationContext struct {
Expand Down Expand Up @@ -122,6 +120,16 @@ func makeCatchpointFileWriter(ctx context.Context, filePath string, tx trackerdb
return nil, err
}

totalOnlineAccounts, err := aw.TotalOnlineAccountRows(ctx)
if err != nil {
return nil, err

Check warning on line 125 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L125

Added line #L125 was not covered by tests
}

totalOnlineRoundParams, err := aw.TotalOnlineRoundParams(ctx)
if err != nil {
return nil, err

Check warning on line 130 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L130

Added line #L130 was not covered by tests
}

err = os.MkdirAll(filepath.Dir(filePath), 0700)
if err != nil {
return nil, err
Expand All @@ -137,16 +145,18 @@ func makeCatchpointFileWriter(ctx context.Context, filePath string, tx trackerdb
tar := tar.NewWriter(compressor)

res := &catchpointFileWriter{
ctx: ctx,
tx: tx,
filePath: filePath,
totalAccounts: totalAccounts,
totalKVs: totalKVs,
file: file,
compressor: compressor,
tar: tar,
accountsIterator: tx.MakeEncodedAccoutsBatchIter(),
maxResourcesPerChunk: maxResourcesPerChunk,
ctx: ctx,
tx: tx,
filePath: filePath,
totalAccounts: totalAccounts,
totalKVs: totalKVs,
totalOnlineAccounts: totalOnlineAccounts,
totalOnlineRoundParams: totalOnlineRoundParams,
file: file,
compressor: compressor,
tar: tar,
accountsIterator: tx.MakeEncodedAccountsBatchIter(),
maxResourcesPerChunk: maxResourcesPerChunk,
}
return res, nil
}
Expand Down Expand Up @@ -233,6 +243,14 @@ func (cw *catchpointFileWriter) FileWriteStep(stepCtx context.Context) (more boo
cw.kvRows.Close()
cw.kvRows = nil
}
if cw.onlineAccountRows != nil {
cw.onlineAccountRows.Close()
cw.onlineAccountRows = nil
}
if cw.onlineRoundParamsRows != nil {
cw.onlineRoundParamsRows.Close()
cw.onlineRoundParamsRows = nil
}
}
}()

Expand Down Expand Up @@ -323,27 +341,94 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error {
cw.accountsDone = true
}

// Create the *Rows iterator JIT
if cw.kvRows == nil {
rows, err := cw.tx.MakeKVsIter(ctx)
if err != nil {
return err
// Create the kvRows iterator JIT
if !cw.kvDone {
if cw.kvRows == nil {
rows, err := cw.tx.MakeKVsIter(ctx)
if err != nil {
return err

Check warning on line 349 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L349

Added line #L349 was not covered by tests
}
cw.kvRows = rows
}

kvrs := make([]encoded.KVRecordV6, 0, BalancesPerCatchpointFileChunk)
for cw.kvRows.Next() {
k, v, err := cw.kvRows.KeyValue()
if err != nil {
return err

Check warning on line 358 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L358

Added line #L358 was not covered by tests
}
kvrs = append(kvrs, encoded.KVRecordV6{Key: k, Value: v})
if len(kvrs) == BalancesPerCatchpointFileChunk {
break

Check warning on line 362 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L362

Added line #L362 was not covered by tests
}
}
if len(kvrs) > 0 {
cw.chunk = catchpointFileChunkV6{KVs: kvrs}
return nil
}
cw.kvRows = rows
// Do not close kvRows here, or it will start over on the next iteration
cw.kvDone = true
}

kvrs := make([]encoded.KVRecordV6, 0, BalancesPerCatchpointFileChunk)
for cw.kvRows.Next() {
k, v, err := cw.kvRows.KeyValue()
if err != nil {
return err
if !cw.onlineAccountsDone {
// Create the OnlineAccounts iterator JIT
if cw.onlineAccountRows == nil {
rows, err := cw.tx.MakeOnlineAccountsIter(ctx)
if err != nil {
return err

Check warning on line 378 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L378

Added line #L378 was not covered by tests
}
cw.onlineAccountRows = rows
}
kvrs = append(kvrs, encoded.KVRecordV6{Key: k, Value: v})
if len(kvrs) == BalancesPerCatchpointFileChunk {
break

onlineAccts := make([]encoded.OnlineAccountRecordV6, 0, BalancesPerCatchpointFileChunk)
for cw.onlineAccountRows.Next() {
oa, err := cw.onlineAccountRows.GetItem()
if err != nil {
return err

Check warning on line 387 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L387

Added line #L387 was not covered by tests
}
onlineAccts = append(onlineAccts, *oa)
if len(onlineAccts) == BalancesPerCatchpointFileChunk {
break

Check warning on line 391 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L391

Added line #L391 was not covered by tests
}
}
if len(onlineAccts) > 0 {
cw.chunk = catchpointFileChunkV6{OnlineAccounts: onlineAccts}
return nil
}
// Do not close onlineAccountRows here, or it will start over on the next iteration
cw.onlineAccountsDone = true
}

if !cw.onlineRoundParamsDone {
// Create the OnlineRoundParams iterator JIT
if cw.onlineRoundParamsRows == nil {
rows, err := cw.tx.MakeOnlineRoundParamsIter(ctx)
if err != nil {
return err

Check warning on line 407 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L407

Added line #L407 was not covered by tests
}
cw.onlineRoundParamsRows = rows
}

onlineRndParams := make([]encoded.OnlineRoundParamsRecordV6, 0, BalancesPerCatchpointFileChunk)
for cw.onlineRoundParamsRows.Next() {
or, err := cw.onlineRoundParamsRows.GetItem()
if err != nil {
return err

Check warning on line 416 in ledger/catchpointfilewriter.go

View check run for this annotation

Codecov / codecov/patch

ledger/catchpointfilewriter.go#L416

Added line #L416 was not covered by tests
}
onlineRndParams = append(onlineRndParams, *or)
if len(onlineRndParams) == BalancesPerCatchpointFileChunk {
break
}
}
if len(onlineRndParams) > 0 {
cw.chunk = catchpointFileChunkV6{OnlineRoundParams: onlineRndParams}
return nil
}
// Do not close onlineRndParamsRows here, or it will start over on the next iteration
cw.onlineRoundParamsDone = true
}
cw.chunk = catchpointFileChunkV6{KVs: kvrs}

// Finished the last chunk
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions ledger/catchpointfilewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ func TestExactAccountChunk(t *testing.T) {
catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz")

cph := testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
require.EqualValues(t, cph.TotalChunks, 1)
require.EqualValues(t, cph.TotalChunks, 2)

l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath)
defer l.Close()
Expand Down Expand Up @@ -906,7 +906,7 @@ func TestCatchpointAfterTxns(t *testing.T) {
catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz")

cph := testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
require.EqualValues(t, 2, cph.TotalChunks)
require.EqualValues(t, 3, cph.TotalChunks)

l := testNewLedgerFromCatchpoint(t, dl.validator.trackerDB(), catchpointFilePath)
defer l.Close()
Expand All @@ -922,7 +922,7 @@ func TestCatchpointAfterTxns(t *testing.T) {

// Write and read back in, and ensure even the last effect exists.
cph = testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
require.EqualValues(t, cph.TotalChunks, 2) // Still only 2 chunks, as last was in a recent block
require.EqualValues(t, cph.TotalChunks, 3) // Still only 3 chunks, as last was in a recent block

// Drive home the point that `last` is _not_ included in the catchpoint by inspecting balance read from catchpoint.
{
Expand All @@ -938,7 +938,7 @@ func TestCatchpointAfterTxns(t *testing.T) {
}

cph = testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
require.EqualValues(t, cph.TotalChunks, 3)
require.EqualValues(t, cph.TotalChunks, 4)

l = testNewLedgerFromCatchpoint(t, dl.validator.trackerDB(), catchpointFilePath)
defer l.Close()
Expand Down Expand Up @@ -1028,7 +1028,7 @@ func TestCatchpointAfterBoxTxns(t *testing.T) {
catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz")

cph := testWriteCatchpoint(t, dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0)
require.EqualValues(t, 2, cph.TotalChunks)
require.EqualValues(t, 3, cph.TotalChunks)

l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath)
defer l.Close()
Expand Down
Loading

0 comments on commit d775659

Please sign in to comment.