From b598cf8e8e2ce668702bee776cc5c8d98c25e9b3 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Fri, 20 Dec 2024 20:07:37 -0300 Subject: [PATCH] wip --- .../aztec-node/src/aztec-node/server.ts | 4 +- .../circuit-types/src/interfaces/configs.ts | 6 + yarn-project/circuit-types/src/tx/tx.ts | 16 ++ .../src/tx/validator/empty_validator.ts | 10 +- .../src/tx/validator/tx_validator.ts | 8 +- yarn-project/circuit-types/src/tx_effect.ts | 5 + yarn-project/circuits.js/src/structs/gas.ts | 5 + .../end-to-end/src/e2e_block_building.test.ts | 2 +- yarn-project/foundation/src/config/env_var.ts | 2 + yarn-project/p2p/src/client/p2p_client.ts | 20 ++ .../tx_validator/aggregate_tx_validator.ts | 33 ++- .../tx_validator/data_validator.test.ts | 4 +- .../tx_validator/data_validator.ts | 29 +-- .../tx_validator/double_spend_validator.ts | 54 +---- .../tx_validator/metadata_validator.ts | 13 +- .../tx_validator/tx_proof_validator.ts | 29 +-- .../p2p/src/services/libp2p/libp2p_service.ts | 7 +- .../prover-client/src/mocks/test_context.ts | 6 +- .../prover-node/src/job/epoch-proving-job.ts | 12 +- yarn-project/sequencer-client/src/config.ts | 10 + .../src/sequencer/sequencer.test.ts | 6 +- .../src/sequencer/sequencer.ts | 199 +++++------------- .../sequencer-client/src/sequencer/utils.ts | 4 +- .../src/tx_validator/gas_validator.ts | 19 +- .../src/tx_validator/tx_validator_factory.ts | 4 +- .../src/public/public_processor.test.ts | 35 ++- .../simulator/src/public/public_processor.ts | 129 +++++++++--- .../validator-client/src/validator.ts | 18 +- 28 files changed, 344 insertions(+), 345 deletions(-) diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index a801fbabf285..457c71878ceb 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -877,10 +877,8 @@ export class AztecNodeService implements AztecNode, Traceable { const txValidator = new AggregateTxValidator(...txValidators); - const [_, invalidTxs] = await txValidator.validateTxs([tx]); - if (invalidTxs.length > 0) { + if (!(await txValidator.validateTx(tx))) { this.log.warn(`Rejecting tx ${tx.getTxHash()} because of validation errors`); - return false; } diff --git a/yarn-project/circuit-types/src/interfaces/configs.ts b/yarn-project/circuit-types/src/interfaces/configs.ts index baafd3642945..0c18743a2d6f 100644 --- a/yarn-project/circuit-types/src/interfaces/configs.ts +++ b/yarn-project/circuit-types/src/interfaces/configs.ts @@ -20,6 +20,10 @@ export interface SequencerConfig { maxTxsPerBlock?: number; /** The minimum number of txs to include in a block. */ minTxsPerBlock?: number; + /** The maximum L2 block gas. */ + maxL2BlockGas?: number; + /** The maximum DA block gas. */ + maxDABlockGas?: number; /** Recipient of block reward. */ coinbase?: EthAddress; /** Address to receive fees. */ @@ -53,6 +57,8 @@ export const SequencerConfigSchema = z.object({ transactionPollingIntervalMS: z.number().optional(), maxTxsPerBlock: z.number().optional(), minTxsPerBlock: z.number().optional(), + maxL2BlockGas: z.number().optional(), + maxDABlockGas: z.number().optional(), coinbase: schemas.EthAddress.optional(), feeRecipient: schemas.AztecAddress.optional(), acvmWorkingDirectory: z.string().optional(), diff --git a/yarn-project/circuit-types/src/tx/tx.ts b/yarn-project/circuit-types/src/tx/tx.ts index 1e8a03107d19..a907d7b0a4e8 100644 --- a/yarn-project/circuit-types/src/tx/tx.ts +++ b/yarn-project/circuit-types/src/tx/tx.ts @@ -1,6 +1,8 @@ import { ClientIvcProof, + Fr, PrivateKernelTailCircuitPublicInputs, + PrivateLog, type PrivateToPublicAccumulatedData, type ScopedLogHash, } from '@aztec/circuits.js'; @@ -230,6 +232,20 @@ export class Tx extends Gossipable { ); } + /** + * Estimates the tx size based on its private effects. Note that the actual size of the tx + * after processing will probably be larger, as public execution would generate more data. + */ + getEstimatedPrivateTxEffectsSize() { + return ( + this.unencryptedLogs.getSerializedLength() + + this.contractClassLogs.getSerializedLength() + + this.data.getNonEmptyNoteHashes().length * Fr.SIZE_IN_BYTES + + this.data.getNonEmptyNullifiers().length * Fr.SIZE_IN_BYTES + + this.data.getNonEmptyPrivateLogs().length * PrivateLog.SIZE_IN_BYTES + ); + } + /** * Convenience function to get a hash out of a tx or a tx-like. * @param tx - Tx-like object. diff --git a/yarn-project/circuit-types/src/tx/validator/empty_validator.ts b/yarn-project/circuit-types/src/tx/validator/empty_validator.ts index 2ea10e7a55ab..ccb15a050721 100644 --- a/yarn-project/circuit-types/src/tx/validator/empty_validator.ts +++ b/yarn-project/circuit-types/src/tx/validator/empty_validator.ts @@ -1,11 +1,7 @@ -import { type AnyTx, type TxValidator } from './tx_validator.js'; +import { type AnyTx, type TxValidationResult, type TxValidator } from './tx_validator.js'; export class EmptyTxValidator implements TxValidator { - public validateTxs(txs: T[]): Promise<[validTxs: T[], invalidTxs: T[], skippedTxs: T[]]> { - return Promise.resolve([txs, [], []]); - } - - public validateTx(_tx: T): Promise { - return Promise.resolve(true); + public validateTx(_tx: T): Promise { + return Promise.resolve({ result: 'valid' }); } } diff --git a/yarn-project/circuit-types/src/tx/validator/tx_validator.ts b/yarn-project/circuit-types/src/tx/validator/tx_validator.ts index 040d764cf3d5..a39216a7ea39 100644 --- a/yarn-project/circuit-types/src/tx/validator/tx_validator.ts +++ b/yarn-project/circuit-types/src/tx/validator/tx_validator.ts @@ -3,7 +3,11 @@ import { type Tx } from '../tx.js'; export type AnyTx = Tx | ProcessedTx; +export type TxValidationResult = + | { result: 'valid' } + | { result: 'invalid'; reason: string[] } + | { result: 'skipped'; reason: string[] }; + export interface TxValidator { - validateTx(tx: T): Promise; - validateTxs(txs: T[]): Promise<[validTxs: T[], invalidTxs: T[], skippedTxs?: T[]]>; + validateTx(tx: T): Promise; } diff --git a/yarn-project/circuit-types/src/tx_effect.ts b/yarn-project/circuit-types/src/tx_effect.ts index 924f2e5bc95c..1a459801267b 100644 --- a/yarn-project/circuit-types/src/tx_effect.ts +++ b/yarn-project/circuit-types/src/tx_effect.ts @@ -152,6 +152,11 @@ export class TxEffect { ]); } + /** Returns the size of this tx effect in bytes as serialized onto DA. */ + getDASize() { + return this.toBlobFields().length * Fr.SIZE_IN_BYTES; + } + /** * Deserializes the TxEffect object from a Buffer. * @param buffer - Buffer or BufferReader object to deserialize. diff --git a/yarn-project/circuits.js/src/structs/gas.ts b/yarn-project/circuits.js/src/structs/gas.ts index 7952b2cbbbda..956c2b5052d4 100644 --- a/yarn-project/circuits.js/src/structs/gas.ts +++ b/yarn-project/circuits.js/src/structs/gas.ts @@ -78,6 +78,11 @@ export class Gas { return new Gas(Math.ceil(this.daGas * scalar), Math.ceil(this.l2Gas * scalar)); } + /** Returns true if any of this instance's dimensions is greater than the corresponding on the other. */ + gtAny(other: Gas) { + return this.daGas > other.daGas || this.l2Gas > other.l2Gas; + } + computeFee(gasFees: GasFees) { return GasDimensions.reduce( (acc, dimension) => acc.add(gasFees.get(dimension).mul(new Fr(this.get(dimension)))), diff --git a/yarn-project/end-to-end/src/e2e_block_building.test.ts b/yarn-project/end-to-end/src/e2e_block_building.test.ts index 70f1d36420e8..7854362a1d93 100644 --- a/yarn-project/end-to-end/src/e2e_block_building.test.ts +++ b/yarn-project/end-to-end/src/e2e_block_building.test.ts @@ -171,7 +171,7 @@ describe('e2e_block_building', () => { // This will leave the sequencer with just 2s to build the block, so it shouldn't be // able to squeeze in more than 10 txs in each. This is sensitive to the time it takes // to pick up and validate the txs, so we may need to bump it to work on CI. - sequencer.sequencer.timeTable[SequencerState.WAITING_FOR_TXS] = 2; + sequencer.sequencer.timeTable[SequencerState.INITIALIZING_PROPOSAL] = 2; sequencer.sequencer.timeTable[SequencerState.CREATING_BLOCK] = 2; sequencer.sequencer.processTxTime = 1; diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index cf8d34bfd05e..bd686606619a 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -139,6 +139,8 @@ export type EnvVar = | 'SEQ_MAX_BLOCK_SIZE_IN_BYTES' | 'SEQ_MAX_TX_PER_BLOCK' | 'SEQ_MIN_TX_PER_BLOCK' + | 'SEQ_MAX_DA_BLOCK_GAS' + | 'SEQ_MAX_L2_BLOCK_GAS' | 'SEQ_PUBLISH_RETRY_INTERVAL_MS' | 'SEQ_PUBLISHER_PRIVATE_KEY' | 'SEQ_REQUIRED_CONFIRMATIONS' diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index 0db273a1bd68..4ab855f48476 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -142,6 +142,12 @@ export type P2P = P2PApi & { */ getTxStatus(txHash: TxHash): 'pending' | 'mined' | undefined; + /** Returns an iterator over pending txs on the mempool. */ + iteratePendingTxs(): Iterable; + + /** Returns the number of pending txs in the mempool. */ + getPendingTxCount(): number; + /** * Starts the p2p client. * @returns A promise signalling the completion of the block sync. @@ -460,6 +466,20 @@ export class P2PClient return Promise.resolve(this.getTxs('pending')); } + public getPendingTxCount(): number { + return this.txPool.getPendingTxHashes().length; + } + + public *iteratePendingTxs() { + const pendingTxHashes = this.txPool.getPendingTxHashes(); + for (const txHash of pendingTxHashes) { + const tx = this.txPool.getTxByHash(txHash); + if (tx) { + yield tx; + } + } + } + /** * Returns all transactions in the transaction pool. * @returns An array of Txs. diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/aggregate_tx_validator.ts b/yarn-project/p2p/src/msg_validators/tx_validator/aggregate_tx_validator.ts index 21bf24ddb8db..8941aef57333 100644 --- a/yarn-project/p2p/src/msg_validators/tx_validator/aggregate_tx_validator.ts +++ b/yarn-project/p2p/src/msg_validators/tx_validator/aggregate_tx_validator.ts @@ -1,4 +1,4 @@ -import { type ProcessedTx, type Tx, type TxValidator } from '@aztec/circuit-types'; +import { type ProcessedTx, type Tx, type TxValidationResult, type TxValidator } from '@aztec/circuit-types'; export class AggregateTxValidator implements TxValidator { #validators: TxValidator[]; @@ -10,27 +10,20 @@ export class AggregateTxValidator implements TxValid this.#validators = validators; } - async validateTxs(txs: T[]): Promise<[validTxs: T[], invalidTxs: T[], skippedTxs: T[]]> { - const invalidTxs: T[] = []; - const skippedTxs: T[] = []; - let txPool = txs; + async validateTx(tx: T): Promise { + const aggregate: { result: string; reason: string[] } = { result: 'valid', reason: [] }; for (const validator of this.#validators) { - const [valid, invalid, skipped] = await validator.validateTxs(txPool); - invalidTxs.push(...invalid); - skippedTxs.push(...(skipped ?? [])); - txPool = valid; - } - - return [txPool, invalidTxs, skippedTxs]; - } - - async validateTx(tx: T): Promise { - for (const validator of this.#validators) { - const valid = await validator.validateTx(tx); - if (!valid) { - return false; + const result = await validator.validateTx(tx); + if (result.result === 'invalid') { + aggregate.result = 'invalid'; + aggregate.reason.push(...result.reason); + } else if (result.result === 'skipped') { + if (aggregate.result === 'valid') { + aggregate.result = 'skipped'; + } + aggregate.reason.push(...result.reason); } } - return true; + return aggregate as TxValidationResult; } } diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/data_validator.test.ts b/yarn-project/p2p/src/msg_validators/tx_validator/data_validator.test.ts index 6b7f42859f62..eebe11264cfd 100644 --- a/yarn-project/p2p/src/msg_validators/tx_validator/data_validator.test.ts +++ b/yarn-project/p2p/src/msg_validators/tx_validator/data_validator.test.ts @@ -22,8 +22,8 @@ describe('TxDataValidator', () => { }); it('allows transactions with the correct data', async () => { - const txs = mockTxs(3); - await expect(validator.validateTxs(txs)).resolves.toEqual([txs, []]); + const [tx] = mockTxs(1); + await expect(validator.validateTx(tx)).resolves.toEqual({ result: 'valid' }); }); it('rejects txs with mismatch non revertible execution requests', async () => { diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/data_validator.ts b/yarn-project/p2p/src/msg_validators/tx_validator/data_validator.ts index 143713cc2801..ddc5d43ca87c 100644 --- a/yarn-project/p2p/src/msg_validators/tx_validator/data_validator.ts +++ b/yarn-project/p2p/src/msg_validators/tx_validator/data_validator.ts @@ -1,29 +1,14 @@ -import { Tx, type TxValidator } from '@aztec/circuit-types'; +import { Tx, type TxValidationResult, type TxValidator } from '@aztec/circuit-types'; import { createLogger } from '@aztec/foundation/log'; export class DataTxValidator implements TxValidator { #log = createLogger('p2p:tx_validator:tx_data'); - validateTxs(txs: Tx[]): Promise<[validTxs: Tx[], invalidTxs: Tx[]]> { - const validTxs: Tx[] = []; - const invalidTxs: Tx[] = []; - for (const tx of txs) { - if (!this.#hasCorrectExecutionRequests(tx)) { - invalidTxs.push(tx); - continue; - } - - validTxs.push(tx); - } - - return Promise.resolve([validTxs, invalidTxs]); - } - - validateTx(tx: Tx): Promise { + validateTx(tx: Tx): Promise { return Promise.resolve(this.#hasCorrectExecutionRequests(tx)); } - #hasCorrectExecutionRequests(tx: Tx): boolean { + #hasCorrectExecutionRequests(tx: Tx): TxValidationResult { const callRequests = [ ...tx.data.getRevertiblePublicCallRequests(), ...tx.data.getNonRevertiblePublicCallRequests(), @@ -34,7 +19,7 @@ export class DataTxValidator implements TxValidator { callRequests.length }. Got ${tx.enqueuedPublicFunctionCalls.length}.`, ); - return false; + return { result: 'invalid', reason: ['Wrong number of execution requests for public calls'] }; } const invalidExecutionRequestIndex = tx.enqueuedPublicFunctionCalls.findIndex( @@ -46,7 +31,7 @@ export class DataTxValidator implements TxValidator { tx, )} because of incorrect execution requests for public call at index ${invalidExecutionRequestIndex}.`, ); - return false; + return { result: 'invalid', reason: ['Incorrect execution request for public call'] }; } const teardownCallRequest = tx.data.getTeardownPublicCallRequest(); @@ -55,10 +40,10 @@ export class DataTxValidator implements TxValidator { (teardownCallRequest && !tx.publicTeardownFunctionCall.isForCallRequest(teardownCallRequest)); if (isInvalidTeardownExecutionRequest) { this.#log.warn(`Rejecting tx ${Tx.getHash(tx)} because of incorrect teardown execution requests.`); - return false; + return { result: 'invalid', reason: ['Incorrect teardown execution request'] }; } - return true; + return { result: 'valid' }; } // TODO: Check logs. diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/double_spend_validator.ts b/yarn-project/p2p/src/msg_validators/tx_validator/double_spend_validator.ts index 9f735e197b02..69f5c83fc830 100644 --- a/yarn-project/p2p/src/msg_validators/tx_validator/double_spend_validator.ts +++ b/yarn-project/p2p/src/msg_validators/tx_validator/double_spend_validator.ts @@ -1,69 +1,35 @@ -import { type AnyTx, Tx, type TxValidator } from '@aztec/circuit-types'; +import { type AnyTx, Tx, type TxValidationResult, type TxValidator } from '@aztec/circuit-types'; import { createLogger } from '@aztec/foundation/log'; export interface NullifierSource { - getNullifierIndices: (nullifiers: Buffer[]) => Promise<(bigint | undefined)[]>; + nullifierExists: (nullifier: Buffer) => Promise; } export class DoubleSpendTxValidator implements TxValidator { #log = createLogger('p2p:tx_validator:tx_double_spend'); #nullifierSource: NullifierSource; - constructor(nullifierSource: NullifierSource, private readonly isValidatingBlock: boolean = true) { + constructor(nullifierSource: NullifierSource) { this.#nullifierSource = nullifierSource; } - async validateTxs(txs: T[]): Promise<[validTxs: T[], invalidTxs: T[]]> { - const validTxs: T[] = []; - const invalidTxs: T[] = []; - const thisBlockNullifiers = new Set(); - - for (const tx of txs) { - if (!(await this.#uniqueNullifiers(tx, thisBlockNullifiers))) { - invalidTxs.push(tx); - continue; - } - - validTxs.push(tx); - } - - return [validTxs, invalidTxs]; - } - - validateTx(tx: T): Promise { - return this.#uniqueNullifiers(tx, new Set()); - } - - async #uniqueNullifiers(tx: AnyTx, thisBlockNullifiers: Set): Promise { + async validateTx(tx: T): Promise { const nullifiers = tx instanceof Tx ? tx.data.getNonEmptyNullifiers() : tx.txEffect.nullifiers; // Ditch this tx if it has repeated nullifiers const uniqueNullifiers = new Set(nullifiers); if (uniqueNullifiers.size !== nullifiers.length) { this.#log.warn(`Rejecting tx ${Tx.getHash(tx)} for emitting duplicate nullifiers`); - return false; + return { result: 'invalid', reason: ['Duplicate nullifier in tx'] }; } - if (this.isValidatingBlock) { - for (const nullifier of nullifiers) { - const nullifierBigInt = nullifier.toBigInt(); - if (thisBlockNullifiers.has(nullifierBigInt)) { - this.#log.warn(`Rejecting tx ${Tx.getHash(tx)} for repeating a nullifier in the same block`); - return false; - } - - thisBlockNullifiers.add(nullifierBigInt); + for (const nullifier of nullifiers) { + if (await this.#nullifierSource.nullifierExists(nullifier.toBuffer())) { + this.#log.warn(`Rejecting tx ${Tx.getHash(tx)} for repeating a nullifier`); + return { result: 'invalid', reason: ['Repeated nullifier'] }; } } - const nullifierIndexes = await this.#nullifierSource.getNullifierIndices(nullifiers.map(n => n.toBuffer())); - - const hasDuplicates = nullifierIndexes.some(index => index !== undefined); - if (hasDuplicates) { - this.#log.warn(`Rejecting tx ${Tx.getHash(tx)} for repeating nullifiers present in state trees`); - return false; - } - - return true; + return { result: 'valid' }; } } diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/metadata_validator.ts b/yarn-project/p2p/src/msg_validators/tx_validator/metadata_validator.ts index fe3194a454ed..a079153a549b 100644 --- a/yarn-project/p2p/src/msg_validators/tx_validator/metadata_validator.ts +++ b/yarn-project/p2p/src/msg_validators/tx_validator/metadata_validator.ts @@ -1,4 +1,4 @@ -import { type AnyTx, Tx, type TxValidator } from '@aztec/circuit-types'; +import { type AnyTx, Tx, type TxValidationResult, type TxValidator } from '@aztec/circuit-types'; import { type Fr } from '@aztec/circuits.js'; import { createLogger } from '@aztec/foundation/log'; @@ -27,8 +27,15 @@ export class MetadataTxValidator implements TxValidator { return Promise.resolve([validTxs, invalidTxs]); } - validateTx(tx: T): Promise { - return Promise.resolve(this.#hasCorrectChainId(tx) && this.#isValidForBlockNumber(tx)); + validateTx(tx: T): Promise { + const errors = []; + if (!this.#hasCorrectChainId(tx)) { + errors.push('Incorrect chain id'); + } + if (!this.#isValidForBlockNumber(tx)) { + errors.push('Invalid block number'); + } + return Promise.resolve(errors.length > 0 ? { result: 'invalid', reason: errors } : { result: 'valid' }); } #hasCorrectChainId(tx: T): boolean { diff --git a/yarn-project/p2p/src/msg_validators/tx_validator/tx_proof_validator.ts b/yarn-project/p2p/src/msg_validators/tx_validator/tx_proof_validator.ts index 172234ce3bc8..5e6e2abaa8d5 100644 --- a/yarn-project/p2p/src/msg_validators/tx_validator/tx_proof_validator.ts +++ b/yarn-project/p2p/src/msg_validators/tx_validator/tx_proof_validator.ts @@ -1,4 +1,9 @@ -import { type ClientProtocolCircuitVerifier, Tx, type TxValidator } from '@aztec/circuit-types'; +import { + type ClientProtocolCircuitVerifier, + Tx, + type TxValidationResult, + type TxValidator, +} from '@aztec/circuit-types'; import { createLogger } from '@aztec/foundation/log'; export class TxProofValidator implements TxValidator { @@ -6,23 +11,11 @@ export class TxProofValidator implements TxValidator { constructor(private verifier: ClientProtocolCircuitVerifier) {} - async validateTxs(txs: Tx[]): Promise<[validTxs: Tx[], invalidTxs: Tx[]]> { - const validTxs: Tx[] = []; - const invalidTxs: Tx[] = []; - - for (const tx of txs) { - if (await this.verifier.verifyProof(tx)) { - validTxs.push(tx); - } else { - this.#log.warn(`Rejecting tx ${Tx.getHash(tx)} for invalid proof`); - invalidTxs.push(tx); - } + validateTx(tx: Tx): Promise { + if (!this.verifier.verifyProof(tx)) { + this.#log.warn(`Rejecting tx ${Tx.getHash(tx)} for invalid proof`); + return Promise.resolve({ result: 'invalid', reason: ['Invalid proof'] }); } - - return [validTxs, invalidTxs]; - } - - validateTx(tx: Tx): Promise { - return this.verifier.verifyProof(tx); + return Promise.resolve({ result: 'valid' }); } } diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index 6b052af024a2..a9f15d71d305 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -6,17 +6,18 @@ import { type Gossipable, type L2BlockSource, MerkleTreeId, + P2PClientType, PeerErrorSeverity, type PeerInfo, type RawGossipMessage, TopicTypeMap, Tx, TxHash, + TxValidationResult, type WorldStateSynchronizer, getTopicTypeForClientType, metricsTopicStrToLabels, } from '@aztec/circuit-types'; -import { P2PClientType } from '@aztec/circuit-types'; import { Fr } from '@aztec/circuits.js'; import { type EpochCache } from '@aztec/epoch-cache'; import { createLogger } from '@aztec/foundation/log'; @@ -73,14 +74,14 @@ import { GossipSubEvent } from '../types.js'; interface MessageValidator { validator: { - validateTx(tx: Tx): Promise; + validateTx(tx: Tx): Promise; }; severity: PeerErrorSeverity; } interface ValidationResult { name: string; - isValid: boolean; + isValid: TxValidationResult; severity: PeerErrorSeverity; } diff --git a/yarn-project/prover-client/src/mocks/test_context.ts b/yarn-project/prover-client/src/mocks/test_context.ts index 9ec68ce94ce0..a65934237261 100644 --- a/yarn-project/prover-client/src/mocks/test_context.ts +++ b/yarn-project/prover-client/src/mocks/test_context.ts @@ -195,7 +195,7 @@ export class TestContext { return { block, txs, msgs }; } - public async processPublicFunctions(txs: Tx[], maxTransactions: number, txValidator?: TxValidator) { + public async processPublicFunctions(txs: Tx[], maxTransactions: number) { const defaultExecutorImplementation = ( _stateManager: AvmPersistableStateManager, executionRequest: PublicExecutionRequest, @@ -220,7 +220,6 @@ export class TestContext { return await this.processPublicFunctionsWithMockExecutorImplementation( txs, maxTransactions, - txValidator, defaultExecutorImplementation, ); } @@ -244,7 +243,6 @@ export class TestContext { private async processPublicFunctionsWithMockExecutorImplementation( txs: Tx[], maxTransactions: number, - txValidator?: TxValidator, executorMock?: ( stateManager: AvmPersistableStateManager, executionRequest: PublicExecutionRequest, @@ -271,7 +269,7 @@ export class TestContext { if (executorMock) { simulateInternal.mockImplementation(executorMock); } - return await this.publicProcessor.process(txs, maxTransactions, txValidator); + return await this.publicProcessor.process(txs, { maxTransactions }); } } diff --git a/yarn-project/prover-node/src/job/epoch-proving-job.ts b/yarn-project/prover-node/src/job/epoch-proving-job.ts index 06e5d26fa82b..61f2ee23a110 100644 --- a/yarn-project/prover-node/src/job/epoch-proving-job.ts +++ b/yarn-project/prover-node/src/job/epoch-proving-job.ts @@ -1,5 +1,4 @@ import { - EmptyTxValidator, type EpochProver, type EpochProvingJobState, type ForkMerkleTreeOperations, @@ -93,7 +92,6 @@ export class EpochProvingJob implements Traceable { await asyncPool(this.config.parallelBlockLimit, this.blocks, async block => { const globalVariables = block.header.globalVariables; const txHashes = block.body.txEffects.map(tx => tx.txHash); - const txCount = block.body.numberOfTxsIncludingPadded; const l1ToL2Messages = await this.getL1ToL2Messages(block); const txs = await this.getTxs(txHashes, block.number); const previousHeader = await this.getBlockHeader(block.number - 1); @@ -115,7 +113,7 @@ export class EpochProvingJob implements Traceable { // Process public fns const db = await this.dbProvider.fork(block.number - 1); const publicProcessor = this.publicProcessorFactory.create(db, previousHeader, globalVariables, true); - const processed = await this.processTxs(publicProcessor, txs, txCount); + const processed = await this.processTxs(publicProcessor, txs); await this.prover.addTxs(processed); await db.close(); this.log.verbose(`Processed all ${txs.length} txs for block ${block.number}`, { @@ -179,12 +177,8 @@ export class EpochProvingJob implements Traceable { return this.l1ToL2MessageSource.getL1ToL2Messages(BigInt(block.number)); } - private async processTxs( - publicProcessor: PublicProcessor, - txs: Tx[], - totalNumberOfTxs: number, - ): Promise { - const [processedTxs, failedTxs] = await publicProcessor.process(txs, totalNumberOfTxs, new EmptyTxValidator()); + private async processTxs(publicProcessor: PublicProcessor, txs: Tx[]): Promise { + const [processedTxs, failedTxs] = await publicProcessor.process(txs); if (failedTxs.length) { throw new Error( diff --git a/yarn-project/sequencer-client/src/config.ts b/yarn-project/sequencer-client/src/config.ts index 10f714b6cf60..09064f254046 100644 --- a/yarn-project/sequencer-client/src/config.ts +++ b/yarn-project/sequencer-client/src/config.ts @@ -59,6 +59,16 @@ export const sequencerConfigMappings: ConfigMappingsType = { description: 'The minimum number of txs to include in a block.', ...numberConfigHelper(1), }, + maxL2BlockGas: { + env: 'SEQ_MAX_L2_BLOCK_GAS', + description: 'The maximum L2 block gas.', + ...numberConfigHelper(10e9), + }, + maxDABlockGas: { + env: 'SEQ_MAX_DA_BLOCK_GAS', + description: 'The maximum DA block gas.', + ...numberConfigHelper(10e9), + }, coinbase: { env: 'COINBASE', parseEnv: (val: string) => EthAddress.fromString(val), diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts index b1a024014505..be05f6bc747e 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts @@ -157,7 +157,9 @@ describe('sequencer', () => { publicProcessor = mock({ process: async txs => [ await Promise.all( - txs.map(tx => makeProcessedTxFromPrivateOnlyTx(tx, Fr.ZERO, undefined, block.header.globalVariables)), + Array.from(txs).map(tx => + makeProcessedTxFromPrivateOnlyTx(tx, Fr.ZERO, undefined, block.header.globalVariables), + ), ), [], [], @@ -237,7 +239,7 @@ describe('sequencer', () => { }); it.each([ - { delayedState: SequencerState.WAITING_FOR_TXS }, + { delayedState: SequencerState.INITIALIZING_PROPOSAL }, // It would be nice to add the other states, but we would need to inject delays within the `work` loop ])('does not build a block if it does not have enough time left in the slot', async ({ delayedState }) => { // trick the sequencer into thinking that we are just too far into slot 1 diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.ts index 9f758c456ccb..d89534fb7946 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.ts @@ -18,6 +18,7 @@ import { BlockHeader, ContentCommitment, GENESIS_ARCHIVE_ROOT, + Gas, type GlobalVariables, StateReference, } from '@aztec/circuits.js'; @@ -46,12 +47,6 @@ import { SequencerState, orderAttestations } from './utils.js'; export { SequencerState }; -export type ShouldProposeArgs = { - pendingTxsCount?: number; - validTxsCount?: number; - processedTxsCount?: number; -}; - export class SequencerTooSlowError extends Error { constructor( public readonly currentState: SequencerState, @@ -89,6 +84,7 @@ export class Sequencer { private state = SequencerState.STOPPED; private allowedInSetup: AllowedElement[] = getDefaultAllowedSetupFunctions(); private maxBlockSizeInBytes: number = 1024 * 1024; + private maxBlockGas: Gas = new Gas(10e9, 10e9); private processTxTime: number = 12; private metrics: SequencerMetrics; private isFlushing: boolean = false; @@ -144,6 +140,12 @@ export class Sequencer { if (config.minTxsPerBlock !== undefined) { this.minTxsPerBLock = config.minTxsPerBlock; } + if (config.maxDABlockGas !== undefined) { + this.maxBlockGas = new Gas(config.maxDABlockGas, this.maxBlockGas.l2Gas); + } + if (config.maxL2BlockGas !== undefined) { + this.maxBlockGas = new Gas(this.maxBlockGas.daGas, config.maxL2BlockGas); + } if (config.coinbase) { this._coinbase = config.coinbase; } @@ -172,10 +174,10 @@ export class Sequencer { private setTimeTable() { // How late into the slot can we be to start working - const initialTime = 1; + const initialTime = 2; - // How long it takes to validate the txs collected and get ready to start building - const blockPrepareTime = 2; + // How long it takes to get ready to start building + const blockPrepareTime = 1; // How long it takes to for attestations to travel across the p2p layer. const attestationPropagationTime = 2; @@ -213,9 +215,9 @@ export class Sequencer { [SequencerState.SYNCHRONIZING]: this.aztecSlotDuration, // We always want to allow the full slot to check if we are the proposer [SequencerState.PROPOSER_CHECK]: this.aztecSlotDuration, - // First transition towards building a block - [SequencerState.WAITING_FOR_TXS]: initialTime, - // We then validate the txs and prepare to start building the block + // How late we can start initializing a new block proposal + [SequencerState.INITIALIZING_PROPOSAL]: initialTime, + // When we start building a block [SequencerState.CREATING_BLOCK]: initialTime + blockPrepareTime, // We start collecting attestations after building the block [SequencerState.COLLECTING_ATTESTATIONS]: initialTime + blockPrepareTime + processTxsTime + blockValidationTime, @@ -316,25 +318,27 @@ export class Sequencer { void this.publisher.castVote(slot, newGlobalVariables.timestamp.toBigInt()); - if (!this.shouldProposeBlock(historicalHeader, {})) { + // Check the pool has enough txs to build a block + const pendingTxCount = this.p2pClient.getPendingTxCount(); + if (pendingTxCount < this.minTxsPerBLock && !this.isFlushing) { + this.log.verbose(`Not enough txs to propose block. Got ${pendingTxCount} min ${this.minTxsPerBLock}.`, { + slot, + blockNumber: newBlockNumber, + }); + await this.claimEpochProofRightIfAvailable(slot); return; } + this.setState(SequencerState.INITIALIZING_PROPOSAL, slot); this.log.verbose(`Preparing proposal for block ${newBlockNumber} at slot ${slot}`, { chainTipArchive: new Fr(chainTipArchive), blockNumber: newBlockNumber, slot, }); - this.setState(SequencerState.WAITING_FOR_TXS, slot); - - // Get txs to build the new block. - const pendingTxs = await this.p2pClient.getPendingTxs(); - - if (!this.shouldProposeBlock(historicalHeader, { pendingTxsCount: pendingTxs.length })) { - await this.claimEpochProofRightIfAvailable(slot); - return; - } + // We don't fetch exactly maxTxsPerBlock txs here because we may not need all of them if we hit a limit before, + // and also we may need to fetch more if we don't have enough valid txs. + const pendingTxs = this.p2pClient.iteratePendingTxs(); // If I created a "partial" header here that should make our job much easier. const proposalHeader = new BlockHeader( @@ -346,35 +350,12 @@ export class Sequencer { Fr.ZERO, ); - // TODO: It should be responsibility of the P2P layer to validate txs before passing them on here. - // TODO: We should validate only the number of txs we need to speed up this process. - const allValidTxs = await this.takeValidTxs( - pendingTxs, - this.txValidatorFactory.validatorForNewTxs(newGlobalVariables, this.allowedInSetup), - ); - - // TODO: We are taking the size of the tx from private-land, but we should be doing this after running - // public functions. Only reason why we do it here now is because the public processor and orchestrator - // are set up such that they require knowing the total number of txs in advance. Still, main reason for - // exceeding max block size in bytes is contract class registration, which happens in private-land. This - // may break if we start emitting lots of log data from public-land. - const validTxs = this.takeTxsWithinMaxSize(allValidTxs); - - this.log.verbose( - `Collected ${validTxs.length} txs out of ${allValidTxs.length} valid txs out of ${pendingTxs.length} total pending txs for block ${newBlockNumber}`, - ); - - // Bail if we don't have enough valid txs - if (!this.shouldProposeBlock(historicalHeader, { validTxsCount: validTxs.length })) { - await this.claimEpochProofRightIfAvailable(slot); - return; - } - try { + // TODO(palla/txs) Is the note below still valid? We don't seem to be doing any rollback in there. // @note It is very important that the following function will FAIL and not just return early // if it have made any state changes. If not, we won't rollback the state, and you will // be in for a world of pain. - await this.buildBlockAndAttemptToPublish(validTxs, proposalHeader, historicalHeader); + await this.buildBlockAndAttemptToPublish(pendingTxs, proposalHeader, historicalHeader); } catch (err) { this.log.error(`Error assembling block`, err, { blockNumber: newBlockNumber, slot }); } @@ -462,64 +443,20 @@ export class Sequencer { this.state = proposedState; } - shouldProposeBlock(historicalHeader: BlockHeader | undefined, args: ShouldProposeArgs): boolean { - if (this.isFlushing) { - this.log.verbose(`Flushing all pending txs in new block`); - return true; - } - - // Compute time elapsed since the previous block - const lastBlockTime = historicalHeader?.globalVariables.timestamp.toNumber() || 0; - const currentTime = Math.floor(Date.now() / 1000); - const elapsedSinceLastBlock = currentTime - lastBlockTime; - this.log.debug( - `Last block mined at ${lastBlockTime} current time is ${currentTime} (elapsed ${elapsedSinceLastBlock})`, - ); - - // We need to have at least minTxsPerBLock txs. - if (args.pendingTxsCount !== undefined && args.pendingTxsCount < this.minTxsPerBLock) { - this.log.verbose( - `Not creating block because not enough txs in the pool (got ${args.pendingTxsCount} min ${this.minTxsPerBLock})`, - ); - return false; - } - - // Bail if we don't have enough valid txs - if (args.validTxsCount !== undefined && args.validTxsCount < this.minTxsPerBLock) { - this.log.verbose( - `Not creating block because not enough valid txs loaded from the pool (got ${args.validTxsCount} min ${this.minTxsPerBLock})`, - ); - return false; - } - - // TODO: This check should be processedTxs.length < this.minTxsPerBLock, so we don't publish a block with - // less txs than the minimum. But that'd cause the entire block to be aborted and retried. Instead, we should - // go back to the p2p pool and load more txs until we hit our minTxsPerBLock target. Only if there are no txs - // we should bail. - if (args.processedTxsCount === 0 && this.minTxsPerBLock > 0) { - this.log.verbose('No txs processed correctly to build block.'); - return false; - } - - return true; - } - /** * Build a block * * Shared between the sequencer and the validator for re-execution * - * @param validTxs - The valid transactions to construct the block from + * @param pendingTxs - The pending transactions to construct the block from * @param newGlobalVariables - The global variables for the new block * @param historicalHeader - The historical header of the parent - * @param interrupt - The interrupt callback, used to validate the block for submission and check if we should propose the block * @param opts - Whether to just validate the block as a validator, as opposed to building it as a proposal */ private async buildBlock( - validTxs: Tx[], + pendingTxs: Iterable, newGlobalVariables: GlobalVariables, historicalHeader?: BlockHeader, - interrupt?: (processedTxs: ProcessedTx[]) => Promise, opts: { validateOnly?: boolean } = {}, ) { const blockNumber = newGlobalVariables.blockNumber.toBigInt(); @@ -527,19 +464,9 @@ export class Sequencer { this.log.debug(`Requesting L1 to L2 messages from contract for block ${blockNumber}`); const l1ToL2Messages = await this.l1ToL2MessageSource.getL1ToL2Messages(blockNumber); + const msgCount = l1ToL2Messages.length; - this.log.verbose( - `Building block ${blockNumber} with ${validTxs.length} txs and ${l1ToL2Messages.length} messages`, - { - msgCount: l1ToL2Messages.length, - txCount: validTxs.length, - slot, - blockNumber, - }, - ); - - const numRealTxs = validTxs.length; - const blockSize = Math.max(2, numRealTxs); + this.log.verbose(`Building block ${blockNumber} for slot ${slot}`, { slot, blockNumber, msgCount }); // Sync to the previous block at least await this.worldState.syncImmediate(newGlobalVariables.blockNumber.toNumber() - 1); @@ -563,18 +490,27 @@ export class Sequencer { // We set the deadline for tx processing to the start of the CREATING_BLOCK phase, plus the expected time for tx processing. // Deadline is only set if enforceTimeTable is enabled. const processingEndTimeWithinSlot = this.timeTable[SequencerState.CREATING_BLOCK] + this.processTxTime; - const processingDeadline = this.enforceTimeTable + const deadline = this.enforceTimeTable ? new Date((this.getSlotStartTimestamp(slot) + processingEndTimeWithinSlot) * 1000) : undefined; - this.log.verbose(`Processing ${validTxs.length} txs`, { + this.log.verbose(`Processing pending txs`, { slot, slotStart: new Date(this.getSlotStartTimestamp(slot) * 1000), now: new Date(this.dateProvider.now()), - deadline: processingDeadline, + deadline, }); + + // REFACTOR: Public processor should just handle processing, one tx at a time. It should be responsibility + // of the sequencer to update world state and iterate over txs. We should refactor this along with unifying the + // publicProcessorFork and orchestratorFork, to avoid doing tree insertions twice when building the block. + const newTxValidator = this.txValidatorFactory.validatorForNewTxs(newGlobalVariables, this.allowedInSetup); const processingTxValidator = this.txValidatorFactory.validatorForProcessedTxs(publicProcessorFork); const [publicProcessorDuration, [processedTxs, failedTxs]] = await elapsed(() => - processor.process(validTxs, blockSize, processingTxValidator, processingDeadline), + processor.process( + pendingTxs, + { deadline, maxTransactions: this.maxTxsPerBlock, maxBlockSize: this.maxBlockSizeInBytes }, + { preProcess: newTxValidator, postProcess: processingTxValidator }, + ), ); if (failedTxs.length > 0) { @@ -602,8 +538,6 @@ export class Sequencer { const duration = Number(end - start) / 1_000; this.metrics.recordBlockBuilderTreeInsertions(duration); - await interrupt?.(processedTxs); - // All real transactions have been added, set the block as full and pad if needed const block = await blockBuilder.setBlockCompleted(); @@ -611,7 +545,7 @@ export class Sequencer { block, publicProcessorDuration, numMsgs: l1ToL2Messages.length, - numProcessedTxs: processedTxs.length, + numTxs: processedTxs.length, blockBuildingTimer, }; } finally { @@ -635,7 +569,7 @@ export class Sequencer { * @dev MUST throw instead of exiting early to ensure that world-state * is being rolled back if the block is dropped. * - * @param validTxs - The valid transactions to construct the block from + * @param pendingTxs - Iterable of pending transactions to construct the block from * @param proposalHeader - The partial header constructed for the proposal * @param historicalHeader - The historical header of the parent */ @@ -643,7 +577,7 @@ export class Sequencer { [Attributes.BLOCK_NUMBER]: proposalHeader.globalVariables.blockNumber.toNumber(), })) private async buildBlockAndAttemptToPublish( - validTxs: Tx[], + pendingTxs: Iterable, proposalHeader: BlockHeader, historicalHeader: BlockHeader | undefined, ): Promise { @@ -653,40 +587,19 @@ export class Sequencer { const blockNumber = newGlobalVariables.blockNumber.toNumber(); const slot = newGlobalVariables.slotNumber.toBigInt(); - this.metrics.recordNewBlock(blockNumber, validTxs.length); + // this.metrics.recordNewBlock(blockNumber, validTxs.length); const workTimer = new Timer(); this.setState(SequencerState.CREATING_BLOCK, slot); - /** - * BuildBlock is shared between the sequencer and the validator for re-execution - * We use the interrupt callback to validate the block for submission and check if we should propose the block - * - * If we fail, we throw an error in order to roll back - */ - const interrupt = async (processedTxs: ProcessedTx[]) => { - await this.publisher.validateBlockForSubmission(proposalHeader); - - if ( - !this.shouldProposeBlock(historicalHeader, { - validTxsCount: validTxs.length, - processedTxsCount: processedTxs.length, - }) - ) { - // TODO: Roll back changes to world state - throw new Error('Should not propose the block'); - } - }; - // Start collecting proof quotes for the previous epoch if needed in the background const proofQuotePromise = this.createProofClaimForPreviousEpoch(slot); try { - const buildBlockRes = await this.buildBlock(validTxs, newGlobalVariables, historicalHeader, interrupt); - const { block, publicProcessorDuration, numProcessedTxs, numMsgs, blockBuildingTimer } = buildBlockRes; + const buildBlockRes = await this.buildBlock(pendingTxs, newGlobalVariables, historicalHeader); + const { block, publicProcessorDuration, numTxs, numMsgs, blockBuildingTimer } = buildBlockRes; // TODO(@PhilWindle) We should probably periodically check for things like another // block being published before ours instead of just waiting on our block - await this.publisher.validateBlockForSubmission(block.header); const workDuration = workTimer.ms(); @@ -700,8 +613,8 @@ export class Sequencer { }; const blockHash = block.hash(); - const txHashes = validTxs.map(tx => tx.getTxHash()); - this.log.info(`Built block ${block.number} with hash ${blockHash}`, { + const txHashes = block.body.txEffects.map(tx => tx.txHash); + this.log.info(`Built block ${block.number} for slot ${slot} with ${numTxs} txs`, { blockHash, globalVariables: block.header.globalVariables.toInspect(), txHashes, @@ -727,14 +640,12 @@ export class Sequencer { await this.publishL2Block(block, attestations, txHashes, proofQuote); this.metrics.recordPublishedBlock(workDuration); this.log.info( - `Published rollup block ${ - block.number - } with ${numProcessedTxs} transactions and ${numMsgs} messages in ${Math.ceil(workDuration)}ms`, + `Published block ${block.number} with ${numTxs} txs and ${numMsgs} messages in ${Math.ceil(workDuration)}ms`, { blockNumber: block.number, blockHash: blockHash, slot, - txCount: numProcessedTxs, + txCount: numTxs, msgCount: numMsgs, duration: Math.ceil(workDuration), submitter: this.publisher.getSenderAddress().toString(), diff --git a/yarn-project/sequencer-client/src/sequencer/utils.ts b/yarn-project/sequencer-client/src/sequencer/utils.ts index 5939b43c353c..af90e2f45ddd 100644 --- a/yarn-project/sequencer-client/src/sequencer/utils.ts +++ b/yarn-project/sequencer-client/src/sequencer/utils.ts @@ -19,9 +19,9 @@ export enum SequencerState { */ PROPOSER_CHECK = 'PROPOSER_CHECK', /** - * Polling the P2P module for txs to include in a block. Will move to CREATING_BLOCK if there are valid txs to include, or back to SYNCHRONIZING otherwise. + * Initializing the block proposal. Will move to CREATING_BLOCK if there are valid txs to include, or back to SYNCHRONIZING otherwise. */ - WAITING_FOR_TXS = 'WAITING_FOR_TXS', + INITIALIZING_PROPOSAL = 'INITIALIZING_PROPOSAL', /** * Creating a new L2 block. Includes processing public function calls and running rollup circuits. Will move to PUBLISHING_CONTRACT_DATA. */ diff --git a/yarn-project/sequencer-client/src/tx_validator/gas_validator.ts b/yarn-project/sequencer-client/src/tx_validator/gas_validator.ts index 8b4167543c9e..dc3cb1d2fb85 100644 --- a/yarn-project/sequencer-client/src/tx_validator/gas_validator.ts +++ b/yarn-project/sequencer-client/src/tx_validator/gas_validator.ts @@ -1,4 +1,4 @@ -import { type Tx, TxExecutionPhase, type TxValidator } from '@aztec/circuit-types'; +import { type Tx, TxExecutionPhase, type TxValidationResult, type TxValidator } from '@aztec/circuit-types'; import { type AztecAddress, type Fr, FunctionSelector, type GasFees } from '@aztec/circuits.js'; import { createLogger } from '@aztec/foundation/log'; import { computeFeePayerBalanceStorageSlot, getExecutionRequestsByPhase } from '@aztec/simulator'; @@ -45,7 +45,10 @@ export class GasTxValidator implements TxValidator { return [validTxs, invalidTxs, skippedTxs]; } - validateTx(tx: Tx): Promise { + validateTx(tx: Tx): Promise { + if (this.#shouldSkip(tx)) { + return Promise.resolve({ result: 'skipped', reason: ['Insufficient fee per gas'] }); + } return this.#validateTxFee(tx); } @@ -57,20 +60,22 @@ export class GasTxValidator implements TxValidator { const notEnoughMaxFees = maxFeesPerGas.feePerDaGas.lt(this.#gasFees.feePerDaGas) || maxFeesPerGas.feePerL2Gas.lt(this.#gasFees.feePerL2Gas); + if (notEnoughMaxFees) { this.#log.warn(`Skipping transaction ${tx.getTxHash()} due to insufficient fee per gas`); } return notEnoughMaxFees; } - async #validateTxFee(tx: Tx): Promise { + async #validateTxFee(tx: Tx): Promise { const feePayer = tx.data.feePayer; // TODO(@spalladino) Eventually remove the is_zero condition as we should always charge fees to every tx if (feePayer.isZero()) { if (this.#enforceFees) { this.#log.warn(`Rejecting transaction ${tx.getTxHash()} due to missing fee payer`); + return { result: 'invalid', reason: ['Missing fee payer'] }; } else { - return true; + return { result: 'valid' }; } } @@ -98,13 +103,13 @@ export class GasTxValidator implements TxValidator { const balance = claimFunctionCall ? initialBalance.add(claimFunctionCall.args[2]) : initialBalance; if (balance.lt(feeLimit)) { - this.#log.info(`Rejecting transaction due to not enough fee payer balance`, { + this.#log.warn(`Rejecting transaction due to not enough fee payer balance`, { feePayer, balance: balance.toBigInt(), feeLimit: feeLimit.toBigInt(), }); - return false; + return { result: 'invalid', reason: ['Insufficient fee payer balance'] }; } - return true; + return { result: 'valid' }; } } diff --git a/yarn-project/sequencer-client/src/tx_validator/tx_validator_factory.ts b/yarn-project/sequencer-client/src/tx_validator/tx_validator_factory.ts index 59b6baab1cf6..8769eb2e312a 100644 --- a/yarn-project/sequencer-client/src/tx_validator/tx_validator_factory.ts +++ b/yarn-project/sequencer-client/src/tx_validator/tx_validator_factory.ts @@ -44,10 +44,10 @@ export class TxValidatorFactory { validatorForNewTxs(globalVariables: GlobalVariables, setupAllowList: AllowedElement[]): TxValidator { return new AggregateTxValidator( - new DataTxValidator(), + new DataTxValidator(), // remove new MetadataTxValidator(globalVariables.chainId, globalVariables.blockNumber), new DoubleSpendTxValidator(this.nullifierSource), - new PhasesTxValidator(this.contractDataSource, setupAllowList), + new PhasesTxValidator(this.contractDataSource, setupAllowList), // what does this do? new GasTxValidator( this.publicStateSource, ProtocolContractAddress.FeeJuice, diff --git a/yarn-project/simulator/src/public/public_processor.test.ts b/yarn-project/simulator/src/public/public_processor.test.ts index d20435862881..340a601986ed 100644 --- a/yarn-project/simulator/src/public/public_processor.test.ts +++ b/yarn-project/simulator/src/public/public_processor.test.ts @@ -4,6 +4,7 @@ import { ProvingRequestType, SimulationError, type TreeInfo, + type Tx, type TxValidator, mockTx, } from '@aztec/circuit-types'; @@ -95,7 +96,7 @@ describe('public_processor', () => { it('process private-only txs', async function () { const tx = mockPrivateOnlyTx(); - const [processed, failed] = await processor.process([tx], 1); + const [processed, failed] = await processor.process([tx]); expect(processed.length).toBe(1); expect(processed[0].hash).toEqual(tx.getTxHash()); @@ -106,7 +107,7 @@ describe('public_processor', () => { it('runs a tx with enqueued public calls', async function () { const tx = mockTxWithPublicCalls(); - const [processed, failed] = await processor.process([tx], 1); + const [processed, failed] = await processor.process([tx]); expect(processed.length).toBe(1); expect(processed[0].hash).toEqual(tx.getTxHash()); @@ -122,7 +123,7 @@ describe('public_processor', () => { mockedEnqueuedCallsResult.revertCode = RevertCode.APP_LOGIC_REVERTED; mockedEnqueuedCallsResult.revertReason = new SimulationError(`Failed`, []); - const [processed, failed] = await processor.process([tx], 1); + const [processed, failed] = await processor.process([tx]); expect(processed.length).toBe(1); expect(processed[0].hash).toEqual(tx.getTxHash()); @@ -135,7 +136,7 @@ describe('public_processor', () => { publicTxSimulator.simulate.mockRejectedValue(new SimulationError(`Failed`, [])); const tx = mockTxWithPublicCalls(); - const [processed, failed] = await processor.process([tx], 1); + const [processed, failed] = await processor.process([tx]); expect(processed).toEqual([]); expect(failed.length).toBe(1); @@ -149,7 +150,7 @@ describe('public_processor', () => { const txs = Array.from([1, 2, 3], seed => mockPrivateOnlyTx({ seed })); // We are passing 3 txs but only 2 can fit in the block - const [processed, failed] = await processor.process(txs, 2); + const [processed, failed] = await processor.process(txs, { maxTransactions: 2 }); expect(processed.length).toBe(2); expect(processed[0].hash).toEqual(txs[0].getTxHash()); @@ -159,13 +160,25 @@ describe('public_processor', () => { expect(worldStateDB.commit).toHaveBeenCalledTimes(2); }); - it('does not send a transaction to the prover if validation fails', async function () { + it('does not send a transaction to the prover if pre validation fails', async function () { + const tx = mockPrivateOnlyTx(); + + const txValidator: MockProxy> = mock(); + txValidator.validateTx.mockResolvedValue({ result: 'invalid', reason: ['Invalid'] }); + + const [processed, failed] = await processor.process([tx], {}, { preProcess: txValidator }); + + expect(processed).toEqual([]); + expect(failed.length).toBe(1); + }); + + it('does not send a transaction to the prover if post validation fails', async function () { const tx = mockPrivateOnlyTx(); const txValidator: MockProxy> = mock(); - txValidator.validateTxs.mockRejectedValue([[], [tx]]); + txValidator.validateTx.mockResolvedValue({ result: 'invalid', reason: ['Invalid'] }); - const [processed, failed] = await processor.process([tx], 1, txValidator); + const [processed, failed] = await processor.process([tx], {}, { postProcess: txValidator }); expect(processed).toEqual([]); expect(failed.length).toBe(1); @@ -183,7 +196,7 @@ describe('public_processor', () => { // We allocate a deadline of 1s, so only one 2 txs should fit const deadline = new Date(Date.now() + 1000); - const [processed, failed] = await processor.process(txs, 3, undefined, deadline); + const [processed, failed] = await processor.process(txs, { deadline }); expect(processed.length).toBe(2); expect(processed[0].hash).toEqual(txs[0].getTxHash()); @@ -215,7 +228,7 @@ describe('public_processor', () => { const txFee = privateGasUsed.computeFee(globalVariables.gasFees); - const [processed, failed] = await processor.process([tx], 1); + const [processed, failed] = await processor.process([tx]); expect(processed).toHaveLength(1); expect(processed[0].data.feePayer).toEqual(feePayer); @@ -239,7 +252,7 @@ describe('public_processor', () => { } tx.data.gasUsed = privateGasUsed; - const [processed, failed] = await processor.process([tx], 1); + const [processed, failed] = await processor.process([tx]); expect(processed).toEqual([]); expect(failed).toHaveLength(1); diff --git a/yarn-project/simulator/src/public/public_processor.ts b/yarn-project/simulator/src/public/public_processor.ts index 5a5fc91d0e07..a09af00c457d 100644 --- a/yarn-project/simulator/src/public/public_processor.ts +++ b/yarn-project/simulator/src/public/public_processor.ts @@ -136,29 +136,99 @@ export class PublicProcessor implements Traceable { * @returns The list of processed txs with their circuit simulation outputs. */ public async process( - txs: Tx[], - maxTransactions = txs.length, - txValidator?: TxValidator, - deadline?: Date, + txs: Iterable, + limits: { + maxTransactions?: number; + maxBlockSize?: number; + maxBlockGas?: Gas; + deadline?: Date; + } = {}, + validators: { preProcess?: TxValidator; postProcess?: TxValidator } = {}, ): Promise<[ProcessedTx[], FailedTx[], NestedProcessReturnValues[]]> { - // The processor modifies the tx objects in place, so we need to clone them. - txs = txs.map(tx => Tx.clone(tx)); + const { maxTransactions, maxBlockSize, deadline, maxBlockGas } = limits; + const { preProcess: preProcessTxValidator, postProcess: postProcessTxValidator } = validators; const result: ProcessedTx[] = []; const failed: FailedTx[] = []; - let returns: NestedProcessReturnValues[] = []; - let totalGas = new Gas(0, 0); const timer = new Timer(); - for (const tx of txs) { - // only process up to the limit of the block - if (result.length >= maxTransactions) { + let totalSizeInBytes = 0; + let returns: NestedProcessReturnValues[] = []; + let totalPublicGas = new Gas(0, 0); + let totalBlockGas = new Gas(0, 0); + + for (const origTx of txs) { + // Only process up to the max tx limit + if (maxTransactions !== undefined && result.length >= maxTransactions) { + this.log.debug(`Stopping tx processing due to reaching the max tx limit.`); + break; + } + + // Bail if we've hit the deadline + if (deadline && this.dateProvider.now() > +deadline) { + this.log.warn(`Stopping tx processing due to timeout.`); break; } + + // Skip this tx if it'd exceed max block size + const txHash = origTx.getTxHash().toString(); + const preTxSizeInBytes = origTx.getEstimatedPrivateTxEffectsSize(); + if (maxBlockSize !== undefined && totalSizeInBytes + preTxSizeInBytes > maxBlockSize) { + this.log.warn(`Skipping processing of tx ${txHash} sized ${preTxSizeInBytes} bytes due to block size limit`, { + txHash, + sizeInBytes: preTxSizeInBytes, + totalSizeInBytes, + maxBlockSize, + }); + continue; + } + + // Skip this tx if its gas limit would exceed the block gas limit + const txGasLimit = origTx.data.constants.txContext.gasSettings.gasLimits; + if (maxBlockGas !== undefined && totalBlockGas.add(txGasLimit).gtAny(maxBlockGas)) { + this.log.warn(`Skipping processing of tx ${txHash} due to block gas limit`, { + txHash, + txGasLimit, + totalBlockGas, + maxBlockGas, + }); + continue; + } + + // The processor modifies the tx objects in place, so we need to clone them. + const tx = Tx.clone(origTx); + + // We validate the tx before processing it, to avoid unnecessary work. + if (preProcessTxValidator && !preProcessTxValidator.validateTx(tx)) { + this.log.warn(`Rejecting tx ${tx.getTxHash().toString()} due to pre-process validation failure`); + // REFACTOR: Validators should return a reason, not just a boolean. + failed.push({ tx, error: new Error('Tx failed preprocess validation') }); + returns.push(new NestedProcessReturnValues([])); + continue; + } + try { - const [processedTx, returnValues] = await this.processTx(tx, txValidator, deadline); + const [processedTx, returnValues] = await this.processTx(tx, deadline); + + // If the actual size of this tx would exceed block size, skip it + const txSize = processedTx.txEffect.getDASize(); + if (maxBlockSize !== undefined && totalSizeInBytes + txSize > maxBlockSize) { + this.log.warn(`Skipping processed tx ${txHash} sized ${txSize} due to max block size.`, { + txHash, + sizeInBytes: txSize, + totalSizeInBytes, + maxBlockSize, + }); + continue; + } + + // Otherwise, commit tx state for the next tx to be processed + await this.commitTxState(processedTx, postProcessTxValidator); result.push(processedTx); returns = returns.concat(returnValues); - totalGas = totalGas.add(processedTx.gasUsed.publicGas); + + totalPublicGas = totalPublicGas.add(processedTx.gasUsed.publicGas); + totalBlockGas = totalBlockGas.add(processedTx.gasUsed.totalGas); + totalSizeInBytes += txSize; } catch (err: any) { if (err?.name === 'PublicProcessorTimeoutError') { this.log.warn(`Stopping tx processing due to timeout.`); @@ -173,18 +243,22 @@ export class PublicProcessor implements Traceable { } const duration = timer.s(); - const rate = duration > 0 ? totalGas.l2Gas / duration : 0; - this.metrics.recordAllTxs(totalGas, rate); + const rate = duration > 0 ? totalPublicGas.l2Gas / duration : 0; + this.metrics.recordAllTxs(totalPublicGas, rate); + + this.log.info(`Processed ${result.length} succesful txs and ${failed.length} txs in ${duration}ms`, { + duration, + rate, + totalPublicGas, + totalBlockGas, + totalSizeInBytes, + }); return [result, failed, returns]; } @trackSpan('PublicProcessor.processTx', tx => ({ [Attributes.TX_HASH]: tx.tryGetTxHash()?.toString() })) - private async processTx( - tx: Tx, - txValidator?: TxValidator, - deadline?: Date, - ): Promise<[ProcessedTx, NestedProcessReturnValues[]]> { + private async processTx(tx: Tx, deadline?: Date): Promise<[ProcessedTx, NestedProcessReturnValues[]]> { const [time, [processedTx, returnValues]] = await elapsed(() => this.processTxWithinDeadline(tx, deadline)); this.log.verbose( @@ -208,7 +282,12 @@ export class PublicProcessor implements Traceable { }, ); + return [processedTx, returnValues ?? []]; + } + + private async commitTxState(processedTx: ProcessedTx, txValidator?: TxValidator): Promise { // Commit the state updates from this transaction + // TODO(palla/txs): Shouldn't we be doing this after validation? Either way, it seems like this doesn't do anything. await this.worldStateDB.commit(); // Re-validate the transaction @@ -217,9 +296,11 @@ export class PublicProcessor implements Traceable { // public functions emitting nullifiers would pass earlier check but fail here. // Note that we're checking all nullifiers generated in the private execution twice, // we could store the ones already checked and skip them here as an optimization. - const [_, invalid] = await txValidator.validateTxs([processedTx]); - if (invalid.length) { - throw new Error(`Transaction ${invalid[0].hash} invalid after processing public functions`); + const result = await txValidator.validateTx(processedTx); + if (result.result !== 'valid') { + throw new Error( + `Transaction ${processedTx.hash} invalid after processing public functions: ${result.reason.join(', ')}.`, + ); } } // Update the state so that the next tx in the loop has the correct .startState @@ -255,8 +336,6 @@ export class PublicProcessor implements Traceable { ); const treeInsertionEnd = process.hrtime.bigint(); this.metrics.recordTreeInsertions(Number(treeInsertionEnd - treeInsertionStart) / 1_000); - - return [processedTx, returnValues ?? []]; } /** Processes the given tx within deadline. Returns timeout if deadline is hit. */ diff --git a/yarn-project/validator-client/src/validator.ts b/yarn-project/validator-client/src/validator.ts index b7874b104cea..09c19eac82b5 100644 --- a/yarn-project/validator-client/src/validator.ts +++ b/yarn-project/validator-client/src/validator.ts @@ -1,11 +1,4 @@ -import { - type BlockAttestation, - type BlockProposal, - type L2Block, - type ProcessedTx, - type Tx, - type TxHash, -} from '@aztec/circuit-types'; +import { type BlockAttestation, type BlockProposal, type L2Block, type Tx, type TxHash } from '@aztec/circuit-types'; import { type BlockHeader, type GlobalVariables } from '@aztec/circuits.js'; import { type EpochCache } from '@aztec/epoch-cache'; import { Buffer32 } from '@aztec/foundation/buffer'; @@ -38,12 +31,11 @@ import { ValidatorMetrics } from './metrics.js'; * We reuse the sequencer's block building functionality for re-execution */ type BlockBuilderCallback = ( - txs: Tx[], + txs: Iterable, globalVariables: GlobalVariables, historicalHeader?: BlockHeader, - interrupt?: (processedTxs: ProcessedTx[]) => Promise, opts?: { validateOnly?: boolean }, -) => Promise<{ block: L2Block; publicProcessorDuration: number; numProcessedTxs: number; blockBuildingTimer: Timer }>; +) => Promise<{ block: L2Block; publicProcessorDuration: number; numTxs: number; blockBuildingTimer: Timer }>; export interface Validator { start(): Promise; @@ -243,9 +235,7 @@ export class ValidatorClient extends WithTracer implements Validator { // Use the sequencer's block building logic to re-execute the transactions const stopTimer = this.metrics.reExecutionTimer(); - const { block } = await this.blockBuilder(txs, header.globalVariables, undefined, undefined, { - validateOnly: true, - }); + const { block } = await this.blockBuilder(txs, header.globalVariables, undefined, { validateOnly: true }); stopTimer(); this.log.verbose(`Transaction re-execution complete`);