Skip to content
5 changes: 4 additions & 1 deletion packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,10 @@ export function addAttestationPostElectra(
true
);
} else {
const committees = epochCtx.getBeaconCommittees(attestation.data.slot, committeeIndices);
const attSlot = attestation.data.slot;
const attEpoch = computeEpochAtSlot(attSlot);
const decisionRoot = epochCtx.getShufflingDecisionRoot(attEpoch);
const committees = this.shufflingCache.getBeaconCommittees(attEpoch, decisionRoot, attSlot, committeeIndices);
const aggregationBools = attestation.aggregationBits.toBoolArray();
let offset = 0;
for (let i = 0; i < committees.length; i++) {
Expand Down
12 changes: 9 additions & 3 deletions packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ export async function verifyBlocksInEpoch(
throw new BlockError(block0, {code: BlockErrorCode.PRESTATE_MISSING, error: e as Error});
});

// in forky condition, make sure to populate ShufflingCache with regened state
// otherwise it may fail to get indexed attestations from shuffling cache later
this.shufflingCache.processState(preState0);

if (!isStateValidatorsNodesPopulated(preState0)) {
this.logger.verbose("verifyBlocksInEpoch preState0 SSZ cache stats", {
slot: preState0.slot,
Expand Down Expand Up @@ -105,9 +109,11 @@ export async function verifyBlocksInEpoch(
// Store indexed attestations for each block to avoid recomputing them during import
const indexedAttestationsByBlock: IndexedAttestation[][] = [];
for (const [i, block] of blocks.entries()) {
indexedAttestationsByBlock[i] = block.message.body.attestations.map((attestation) =>
preState0.epochCtx.getIndexedAttestation(fork, attestation)
);
indexedAttestationsByBlock[i] = block.message.body.attestations.map((attestation) => {
const attEpoch = computeEpochAtSlot(attestation.data.slot);
const decisionRoot = preState0.epochCtx.getShufflingDecisionRoot(attEpoch);
return this.shufflingCache.getIndexedAttestation(attEpoch, decisionRoot, fork, attestation);
});
}

// batch all I/O operations to reduce overhead
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {MapDef, assert, toRootHex} from "@lodestar/utils";
import {Metrics} from "../../metrics/metrics.js";
import {IntersectResult, intersectUint8Arrays} from "../../util/bitArray.js";
import {getShufflingDependentRoot} from "../../util/dependentRoot.js";
import {ShufflingCache} from "../shufflingCache.js";
import {InsertOutcome} from "./types.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";

Expand Down Expand Up @@ -207,22 +208,18 @@ export class AggregatedAttestationPool {
this.lowestPermissibleSlot = Math.max(clockSlot - slotsToRetain, 0);
}

getAttestationsForBlock(fork: ForkName, forkChoice: IForkChoice, state: CachedBeaconStateAllForks): Attestation[] {
getAttestationsForBlock(
fork: ForkName,
forkChoice: IForkChoice,
shufflingCache: ShufflingCache,
state: CachedBeaconStateAllForks
): Attestation[] {
const forkSeq = ForkSeq[fork];
return forkSeq >= ForkSeq.electra
? this.getAttestationsForBlockElectra(fork, forkChoice, state)
: this.getAttestationsForBlockPreElectra(fork, forkChoice, state);
}
if (forkSeq < ForkSeq.electra) {
throw new Error("Does not support producing blocks for pre-electra forks anymore");
}

/**
* Get attestations to be included in a block pre-electra. Returns up to $MAX_ATTESTATIONS items
*/
getAttestationsForBlockPreElectra(
_fork: ForkName,
_forkChoice: IForkChoice,
_state: CachedBeaconStateAllForks
): phase0.Attestation[] {
throw new Error("Does not support producing blocks for pre-electra forks anymore");
return this.getAttestationsForBlockElectra(fork, forkChoice, shufflingCache, state);
}

/**
Expand All @@ -231,14 +228,15 @@ export class AggregatedAttestationPool {
getAttestationsForBlockElectra(
fork: ForkName,
forkChoice: IForkChoice,
shufflingCache: ShufflingCache,
state: CachedBeaconStateAllForks
): electra.Attestation[] {
const stateSlot = state.slot;
const stateEpoch = state.epochCtx.epoch;
const statePrevEpoch = stateEpoch - 1;
const rootCache = new RootCache(state);

const notSeenValidatorsFn = getNotSeenValidatorsFn(this.config, state);
const notSeenValidatorsFn = getNotSeenValidatorsFn(this.config, shufflingCache, state);
const validateAttestationDataFn = getValidateAttestationDataFn(forkChoice, state);

const slots = Array.from(this.attestationGroupByIndexByDataHexBySlot.keys()).sort((a, b) => b - a);
Expand Down Expand Up @@ -740,7 +738,11 @@ export function aggregateConsolidation({byCommittee, attData}: AttestationsConso
* Pre-compute participation from a CachedBeaconStateAllForks, for use to check if an attestation's committee
* has already attested or not.
*/
export function getNotSeenValidatorsFn(config: BeaconConfig, state: CachedBeaconStateAllForks): GetNotSeenValidatorsFn {
export function getNotSeenValidatorsFn(
config: BeaconConfig,
shufflingCache: ShufflingCache,
state: CachedBeaconStateAllForks
): GetNotSeenValidatorsFn {
const stateSlot = state.slot;
if (config.getForkName(stateSlot) === ForkName.phase0) {
throw new Error("getNotSeenValidatorsFn is not supported phase0 state");
Expand Down Expand Up @@ -772,7 +774,8 @@ export function getNotSeenValidatorsFn(config: BeaconConfig, state: CachedBeacon
return notSeenCommitteeMembers.size === 0 ? null : notSeenCommitteeMembers;
}

const committee = state.epochCtx.getBeaconCommittee(slot, committeeIndex);
const decisionRoot = state.epochCtx.getShufflingDecisionRoot(computeEpochAtSlot(slot));
const committee = shufflingCache.getBeaconCommittee(epoch, decisionRoot, slot, committeeIndex);
notSeenCommitteeMembers = new Set<number>();
for (const [i, validatorIndex] of committee.entries()) {
// no need to check flagIsTimelySource as if validator is not seen, it's participation status is 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,12 @@ export async function produceCommonBlockBody<T extends BlockType>(
this.opPool.getSlashingsAndExits(currentState, blockType, this.metrics);

const endAttestations = stepsMetrics?.startTimer();
const attestations = this.aggregatedAttestationPool.getAttestationsForBlock(fork, this.forkChoice, currentState);
const attestations = this.aggregatedAttestationPool.getAttestationsForBlock(
fork,
this.forkChoice,
this.shufflingCache,
currentState
);
endAttestations?.({
step: BlockProductionStep.attestations,
});
Expand Down
67 changes: 65 additions & 2 deletions packages/beacon-node/src/chain/shufflingCache.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import {CachedBeaconStateAllForks, EpochShuffling} from "@lodestar/state-transition";
import {Epoch, RootHex} from "@lodestar/types";
import {ForkSeq} from "@lodestar/params";
import {
CachedBeaconStateAllForks,
EpochShuffling,
getAttestingIndices,
getBeaconCommittees,
getIndexedAttestation,
} from "@lodestar/state-transition";
import {Attestation, CommitteeIndex, Epoch, IndexedAttestation, RootHex, Slot} from "@lodestar/types";
import {LodestarError, Logger, MapDef, pruneSetToMax} from "@lodestar/utils";
import {Metrics} from "../metrics/metrics.js";

Expand Down Expand Up @@ -128,6 +135,26 @@ export class ShufflingCache {
return cacheItem.promise;
}

/**
* Get a shuffling synchronously, return null if not present.
* The only time we have a promise cache item is when we regen shuffling for attestation, which never happens
* with default chain option.
*/
getSync(epoch: Epoch, decisionRoot: RootHex): EpochShuffling | null {
const cacheItem = this.itemsByDecisionRootByEpoch.getOrDefault(epoch).get(decisionRoot);
if (cacheItem === undefined) {
this.metrics?.shufflingCache.miss.inc();
return null;
}

if (isShufflingCacheItem(cacheItem)) {
this.metrics?.shufflingCache.hit.inc();
return cacheItem.shuffling;
}

return null;
}

/**
* Process a state to extract and cache all shufflings (previous, current, next).
* Uses the stored decision roots from epochCtx.
Expand All @@ -145,6 +172,42 @@ export class ShufflingCache {
this.set(epochCtx.nextShuffling, epochCtx.nextDecisionRoot);
}

getIndexedAttestation(
epoch: number,
decisionRoot: string,
fork: ForkSeq,
attestation: Attestation
): IndexedAttestation {
const shuffling = this.getShufflingOrThrow(epoch, decisionRoot);
return getIndexedAttestation(shuffling, fork, attestation);
}

getAttestingIndices(epoch: number, decisionRoot: string, fork: ForkSeq, attestation: Attestation): number[] {
const shuffling = this.getShufflingOrThrow(epoch, decisionRoot);
return getAttestingIndices(shuffling, fork, attestation);
}

getBeaconCommittee(epoch: number, decisionRoot: string, slot: Slot, index: CommitteeIndex): Uint32Array {
return this.getBeaconCommittees(epoch, decisionRoot, slot, [index])[0];
}

getBeaconCommittees(epoch: number, decisionRoot: string, slot: Slot, indices: CommitteeIndex[]): Uint32Array[] {
const shuffling = this.getShufflingOrThrow(epoch, decisionRoot);
return getBeaconCommittees(shuffling, slot, indices);
}

private getShufflingOrThrow(epoch: number, decisionRoot: string): EpochShuffling {
const shuffling = this.getSync(epoch, decisionRoot);
if (shuffling === null) {
throw new ShufflingCacheError({
code: ShufflingCacheErrorCode.NO_SHUFFLING_FOUND,
epoch,
decisionRoot,
});
}
return shuffling;
}

/**
* Add an EpochShuffling to the ShufflingCache. If a promise for the shuffling is present it will
* resolve the promise with the built shuffling
Expand Down
6 changes: 3 additions & 3 deletions packages/beacon-node/src/chain/validation/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import {
isForkPostElectra,
} from "@lodestar/params";
import {
EpochCacheError,
EpochCacheErrorCode,
EpochShuffling,
ShufflingError,
ShufflingErrorCode,
SingleSignatureSet,
computeEpochAtSlot,
computeSigningRoot,
Expand Down Expand Up @@ -224,7 +224,7 @@ export async function validateApiAttestation(
code: AttestationErrorCode.INVALID_SIGNATURE,
});
} catch (err) {
if (err instanceof EpochCacheError && err.type.code === EpochCacheErrorCode.COMMITTEE_INDEX_OUT_OF_RANGE) {
if (err instanceof ShufflingError && err.type.code === ShufflingErrorCode.COMMITTEE_INDEX_OUT_OF_RANGE) {
throw new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.BAD_TARGET_EPOCH,
});
Expand Down
3 changes: 3 additions & 0 deletions packages/beacon-node/src/chain/validation/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ export async function validateGossipBlock(
throw new BlockGossipError(GossipAction.IGNORE, {code: BlockErrorCode.PARENT_UNKNOWN, parentRoot});
});

// in forky condition, make sure to populate ShufflingCache with regened state
chain.shufflingCache.processState(blockState);

// Extra conditions for merge fork blocks
// [REJECT] The block's execution payload timestamp is correct with respect to the slot
// -- i.e. execution_payload.timestamp == compute_timestamp_at_slot(state, block.slot).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
import {ssz} from "@lodestar/types";
import {generatePerfTestCachedStateAltair} from "../../../../../state-transition/test/perf/util.js";
import {AggregatedAttestationPool} from "../../../../src/chain/opPools/aggregatedAttestationPool.js";
import {ShufflingCache} from "../../../../src/chain/shufflingCache.js";

const vc = 1_500_000;

Expand Down Expand Up @@ -163,10 +164,12 @@ describe(`getAttestationsForBlock vc=${vc}`, () => {
},
beforeEach: (state) => {
const pool = getAggregatedAttestationPool(state, numMissedVotes, numBadVotes);
return {state, pool};
const shufflingCache = new ShufflingCache();
shufflingCache.processState(state);
return {state, pool, shufflingCache};
},
fn: ({state, pool}) => {
pool.getAttestationsForBlock(state.config.getForkName(state.slot), forkchoice, state);
fn: ({state, pool, shufflingCache}) => {
pool.getAttestationsForBlock(state.config.getForkName(state.slot), forkchoice, shufflingCache, state);
},
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
aggregateInto,
} from "../../../../src/chain/opPools/aggregatedAttestationPool.js";
import {InsertOutcome} from "../../../../src/chain/opPools/types.js";
import {ShufflingCache} from "../../../../src/chain/shufflingCache.js";
import {ZERO_HASH_HEX} from "../../../../src/constants/constants.js";
import {linspace} from "../../../../src/util/numpy.js";
import {MockedForkChoice, getMockedForkChoice} from "../../../mocks/mockedBeaconChain.js";
Expand Down Expand Up @@ -248,7 +249,9 @@ describe("AggregatedAttestationPool - get packed attestations - Electra", () =>
forkchoiceStub.getBlockHex.mockReturnValue(generateProtoBlock());
forkchoiceStub.getDependentRoot.mockReturnValue(ZERO_HASH_HEX);

const blockAttestations = pool.getAttestationsForBlock(fork, forkchoiceStub, electraState);
const shufflingCache = new ShufflingCache();
shufflingCache.processState(electraState);
const blockAttestations = pool.getAttestationsForBlock(fork, forkchoiceStub, shufflingCache, electraState);
// make sure test data is correct
expect(packedCommitteeBits.length).toBe(packedAggregationBitsLen.length);
expect(blockAttestations.length).toBe(packedCommitteeBits.length);
Expand Down
Loading
Loading