From c302af5e1e729d2c505290a043fe687486fcfae3 Mon Sep 17 00:00:00 2001 From: Nate Beauregard Date: Mon, 13 Jan 2025 09:36:36 -0500 Subject: [PATCH 1/4] feat: archive public testnet tx data --- yarn-project/foundation/src/config/env_var.ts | 1 + yarn-project/p2p/src/client/p2p_client.ts | 16 ++++ yarn-project/p2p/src/config.ts | 9 ++ .../tx_pool/aztec_kv_tx_pool.test.ts | 21 +++++ .../src/mem_pools/tx_pool/aztec_kv_tx_pool.ts | 85 ++++++++++++++++++- .../src/mem_pools/tx_pool/memory_tx_pool.ts | 4 + .../p2p/src/mem_pools/tx_pool/tx_pool.ts | 7 ++ 7 files changed, 140 insertions(+), 3 deletions(-) diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index 1862daa6bea..b4d79ec20a3 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -100,6 +100,7 @@ export type EnvVar = | 'P2P_TX_PROTOCOL' | 'P2P_UDP_ANNOUNCE_ADDR' | 'P2P_UDP_LISTEN_ADDR' + | 'P2P_ARCHIVED_TX_LIMIT' | 'PEER_ID_PRIVATE_KEY' | 'PROVER_BLOB_SINK_URL' | 'PROOF_VERIFIER_L1_START_BLOCK' diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index 44010f75b07..de8b85cb2b8 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -135,6 +135,13 @@ export type P2P = P2PApi & { */ getTxByHash(txHash: TxHash): Promise; + /** + * Returns an archived transaction from the transaction pool by its hash. + * @param txHash - Hash of tx to return. + * @returns A single tx or undefined. + */ + getArchivedTxByHash(txHash: TxHash): Promise; + /** * Returns whether the given tx hash is flagged as pending or mined. * @param txHash - Hash of the tx to query. @@ -523,6 +530,15 @@ export class P2PClient return this.requestTxByHash(txHash); } + /** + * Returns an archived transaction in the transaction pool by its hash. + * @param txHash - Hash of the archived transaction to look for. + * @returns A single tx or undefined. + */ + getArchivedTxByHash(txHash: TxHash): Promise { + return Promise.resolve(this.txPool.getArchivedTxByHash(txHash)); + } + /** * Verifies the 'tx' and, if valid, adds it to local tx pool and forwards it to other peers. * @param tx - The tx to verify. diff --git a/yarn-project/p2p/src/config.ts b/yarn-project/p2p/src/config.ts index 5f5ea19258a..1835dc946a2 100644 --- a/yarn-project/p2p/src/config.ts +++ b/yarn-project/p2p/src/config.ts @@ -154,6 +154,9 @@ export interface P2PConfig extends P2PReqRespConfig { * The chain id of the L1 chain. */ l1ChainId: number; + + /** Limit of transactions to archive in the tx pool. Once the archived tx limit is reached, the oldest archived txs will be purged. */ + archivedTxLimit: number; } export const p2pConfigMappings: ConfigMappingsType = { @@ -305,6 +308,12 @@ export const p2pConfigMappings: ConfigMappingsType = { description: 'The number of blocks to fetch in a single batch.', ...numberConfigHelper(20), }, + archivedTxLimit: { + env: 'P2P_ARCHIVED_TX_LIMIT', + description: + 'The number of transactions that will be archived. 
If the limit is set to 0 then archiving will be disabled.', + ...numberConfigHelper(0), + }, ...p2pReqRespConfigMappings, }; diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts index bc9f91329e3..37a9cad9871 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts @@ -1,3 +1,4 @@ +import { mockTx } from '@aztec/circuit-types'; import { openTmpStore } from '@aztec/kv-store/lmdb'; import { AztecKVTxPool } from './aztec_kv_tx_pool.js'; @@ -10,4 +11,24 @@ describe('KV TX pool', () => { }); describeTxPool(() => txPool); + + it('Returns archived txs and purges archived txs once the archived tx limit is reached', async () => { + // set the archived tx limit to 2 + txPool = new AztecKVTxPool(openTmpStore(), new NoopTelemetryClient(), 2); + + const tx1 = mockTx(1); + const tx2 = mockTx(2); + const tx3 = mockTx(3); + + // add two txs and assert that they are properly archived + await txPool.addTxs([tx1, tx2]); + expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toEqual(tx1.getTxHash()); + expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2.getTxHash()); + + // add another tx and assert that the first tx is purged and the new tx is archived + await txPool.addTxs([tx3]); + expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toBeUndefined(); + expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2.getTxHash()); + expect(txPool.getArchivedTxByHash(tx3.getTxHash())).toEqual(tx3.getTxHash()); + }); }); diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts index da6ca6d526b..c7ff16e8383 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts @@ -1,15 +1,23 @@ import { Tx, TxHash } from '@aztec/circuit-types'; import { type TxAddedToPoolStats } from '@aztec/circuit-types/stats'; +import { ClientIvcProof } from '@aztec/circuits.js'; import { type Logger, createLogger } from '@aztec/foundation/log'; -import { type AztecKVStore, type AztecMap, type AztecMultiMap } from '@aztec/kv-store'; +import { + type AztecKVStore, + type AztecMap, + type AztecMapWithSize, + type AztecMultiMap, + type AztecSingleton, +} from '@aztec/kv-store'; import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client'; +import { getP2PConfigFromEnv } from '../../config.js'; import { PoolInstrumentation, PoolName } from '../instrumentation.js'; import { getPendingTxPriority } from './priority.js'; import { type TxPool } from './tx_pool.js'; /** - * In-memory implementation of the Transaction Pool. + * KV implementation of the Transaction Pool. */ export class AztecKVTxPool implements TxPool { #store: AztecKVStore; @@ -23,24 +31,46 @@ export class AztecKVTxPool implements TxPool { /** Index from tx priority (stored as hex) to its tx hash, filtered by pending txs. */ #pendingTxPriorityToHash: AztecMultiMap; + /** Archived txs map for future lookup. */ + #archivedTxs: AztecMapWithSize; + + /** Indexes of the archived txs by insertion order. */ + #archivedTxIndices: AztecMap; + + /** Index of the most recently inserted archived tx. */ + #archivedTxHead: AztecSingleton; + + /** Index of the oldest archived tx. */ + #archivedTxTail: AztecSingleton; + + /** Number of txs to archive. 
*/ + #archivedTxLimit: number; + #log: Logger; #metrics: PoolInstrumentation; /** - * Class constructor for in-memory TxPool. Initiates our transaction pool as a JS Map. + * Class constructor for KV TxPool. Initiates our transaction pool as an AztecMap. * @param store - A KV store. * @param log - A logger. */ constructor( store: AztecKVStore, telemetry: TelemetryClient = getTelemetryClient(), + archivedTxLimit = getP2PConfigFromEnv().archivedTxLimit, log = createLogger('p2p:tx_pool'), ) { this.#txs = store.openMap('txs'); this.#minedTxHashToBlock = store.openMap('txHashToBlockMined'); this.#pendingTxPriorityToHash = store.openMultiMap('pendingTxFeeToHash'); + this.#archivedTxs = store.openMapWithSize('archivedTxs'); + this.#archivedTxIndices = store.openMap('archivedTxIndicies'); + this.#archivedTxHead = store.openSingleton('archivedTxHead'); + this.#archivedTxTail = store.openSingleton('archivedTxTail'); + this.#archivedTxLimit = archivedTxLimit; + this.#store = store; this.#log = log; this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL, () => store.estimateSize()); @@ -129,6 +159,21 @@ export class AztecKVTxPool implements TxPool { return undefined; } + /** + * Checks if an archived tx exists and returns it. + * @param txHash - The tx hash. + * @returns The transaction metadata, if found, 'undefined' otherwise. + */ + public getArchivedTxByHash(txHash: TxHash): Tx | undefined { + const buffer = this.#archivedTxs.get(txHash.toString()); + if (buffer) { + const tx = Tx.fromBuffer(buffer); + tx.setTxHash(txHash); + return tx; + } + return undefined; + } + /** * Adds a list of transactions to the pool. Duplicates are ignored. * @param txs - An array of txs to be added to the pool. @@ -153,6 +198,10 @@ export class AztecKVTxPool implements TxPool { void this.#pendingTxPriorityToHash.set(getPendingTxPriority(tx), key); this.#metrics.recordSize(tx); } + + if (this.#archivedTxLimit) { + void this.archiveTx(tx); + } } this.#metrics.recordAddedObjects(pendingCount, 'pending'); @@ -213,4 +262,34 @@ export class AztecKVTxPool implements TxPool { public getAllTxHashes(): TxHash[] { return Array.from(this.#txs.keys()).map(x => TxHash.fromString(x)); } + + /** + * Archive a tx for future reference. + * @param tx - The transaction to archive. + */ + private archiveTx(tx: Tx) { + while (this.#archivedTxs.size() >= this.#archivedTxLimit) { + const tailIdx = this.#archivedTxTail.get() ?? 0; + const txHash = this.#archivedTxIndices.get(tailIdx); + if (txHash) { + void this.#archivedTxs.delete(txHash); + void this.#archivedTxIndices.delete(tailIdx); + } + void this.#archivedTxTail.set(tailIdx + 1); + } + + const archivedTx: Tx = new Tx( + tx.data, + ClientIvcProof.empty(), + tx.unencryptedLogs, + tx.contractClassLogs, + tx.enqueuedPublicFunctionCalls, + tx.publicTeardownFunctionCall, + ); + const txHash = tx.getTxHash().toString(); + void this.#archivedTxs.set(txHash, archivedTx.toBuffer()); + const headIdx = this.#archivedTxHead.get() ?? 0; + void this.#archivedTxIndices.set(headIdx, txHash); + void this.#archivedTxHead.set(headIdx + 1); + } } diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts index 7d0d87df675..2fc985a6a1f 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts @@ -100,6 +100,10 @@ export class InMemoryTxPool implements TxPool { return result === undefined ? 
undefined : Tx.clone(result); } + public getArchivedTxByHash(): Tx | undefined { + return undefined; + } + /** * Adds a list of transactions to the pool. Duplicates are ignored. * @param txs - An array of txs to be added to the pool. diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts index 173565c8293..6ad69b3c7de 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts @@ -17,6 +17,13 @@ export interface TxPool { */ getTxByHash(txHash: TxHash): Tx | undefined; + /** + * Checks if an archived transaction exists in the pool and returns it. + * @param txHash - The hash of the transaction, used as an ID. + * @returns The transaction, if found, 'undefined' otherwise. + */ + getArchivedTxByHash(txHash: TxHash): Tx | undefined; + /** * Marks the set of txs as mined, as opposed to pending. * @param txHashes - Hashes of the txs to flag as mined. From 8f57a30e1b29aca554c334d79bef3b0d7753c48d Mon Sep 17 00:00:00 2001 From: Nate Beauregard Date: Tue, 14 Jan 2025 22:58:05 -0500 Subject: [PATCH 2/4] Use a separate db for the tx archive and address CR comments --- yarn-project/p2p/src/client/factory.ts | 3 +- .../tx_pool/aztec_kv_tx_pool.test.ts | 21 ++-- .../src/mem_pools/tx_pool/aztec_kv_tx_pool.ts | 97 +++++++++++-------- 3 files changed, 67 insertions(+), 54 deletions(-) diff --git a/yarn-project/p2p/src/client/factory.ts b/yarn-project/p2p/src/client/factory.ts index e29c4b95206..954b584a91e 100644 --- a/yarn-project/p2p/src/client/factory.ts +++ b/yarn-project/p2p/src/client/factory.ts @@ -44,9 +44,10 @@ export const createP2PClient = async ( let config = { ..._config }; const logger = createLogger('p2p'); const store = deps.store ?? (await createStore('p2p', config, createLogger('p2p:lmdb'))); + const archive = await createStore('p2p-archive', config, createLogger('p2p-archive:lmdb')); const mempools: MemPools = { - txPool: deps.txPool ?? new AztecKVTxPool(store, telemetry), + txPool: deps.txPool ?? new AztecKVTxPool(store, archive, telemetry, config.archivedTxLimit), epochProofQuotePool: deps.epochProofQuotePool ?? 
new MemoryEpochProofQuotePool(telemetry), attestationPool: clientType === P2PClientType.Full diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts index 37a9cad9871..ef1d3f609e8 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts @@ -7,28 +7,29 @@ import { describeTxPool } from './tx_pool_test_suite.js'; describe('KV TX pool', () => { let txPool: AztecKVTxPool; beforeEach(() => { - txPool = new AztecKVTxPool(openTmpStore()); + txPool = new AztecKVTxPool(openTmpStore(), openTmpStore()); }); describeTxPool(() => txPool); it('Returns archived txs and purges archived txs once the archived tx limit is reached', async () => { // set the archived tx limit to 2 - txPool = new AztecKVTxPool(openTmpStore(), new NoopTelemetryClient(), 2); + txPool = new AztecKVTxPool(openTmpStore(), openTmpStore(), undefined, 2); const tx1 = mockTx(1); const tx2 = mockTx(2); const tx3 = mockTx(3); + await txPool.addTxs([tx1, tx2, tx3]); - // add two txs and assert that they are properly archived - await txPool.addTxs([tx1, tx2]); - expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toEqual(tx1.getTxHash()); - expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2.getTxHash()); + // delete two txs and assert that they are properly archived + await txPool.deleteTxs([tx1.getTxHash(), tx2.getTxHash()]); + expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toEqual(tx1); + expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2); - // add another tx and assert that the first tx is purged and the new tx is archived - await txPool.addTxs([tx3]); + // delete another tx and assert that the first tx is purged and the new tx is archived + await txPool.deleteTxs([tx3.getTxHash()]); expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toBeUndefined(); - expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2.getTxHash()); - expect(txPool.getArchivedTxByHash(tx3.getTxHash())).toEqual(tx3.getTxHash()); + expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2); + expect(txPool.getArchivedTxByHash(tx3.getTxHash())).toEqual(tx3); }); }); diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts index c7ff16e8383..446a86a3921 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts @@ -2,16 +2,9 @@ import { Tx, TxHash } from '@aztec/circuit-types'; import { type TxAddedToPoolStats } from '@aztec/circuit-types/stats'; import { ClientIvcProof } from '@aztec/circuits.js'; import { type Logger, createLogger } from '@aztec/foundation/log'; -import { - type AztecKVStore, - type AztecMap, - type AztecMapWithSize, - type AztecMultiMap, - type AztecSingleton, -} from '@aztec/kv-store'; +import { type AztecKVStore, type AztecMap, type AztecMultiMap, type AztecSingleton } from '@aztec/kv-store'; import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client'; -import { getP2PConfigFromEnv } from '../../config.js'; import { PoolInstrumentation, PoolName } from '../instrumentation.js'; import { getPendingTxPriority } from './priority.js'; import { type TxPool } from './tx_pool.js'; @@ -31,8 +24,11 @@ export class AztecKVTxPool implements TxPool { /** Index from tx priority (stored as hex) to its tx hash, filtered by pending txs. 
*/ #pendingTxPriorityToHash: AztecMultiMap; + /** KV store for archived txs. */ + #archive: AztecKVStore; + /** Archived txs map for future lookup. */ - #archivedTxs: AztecMapWithSize; + #archivedTxs: AztecMap; /** Indexes of the archived txs by insertion order. */ #archivedTxIndices: AztecMap; @@ -52,26 +48,30 @@ export class AztecKVTxPool implements TxPool { /** * Class constructor for KV TxPool. Initiates our transaction pool as an AztecMap. - * @param store - A KV store. + * @param store - A KV store for live txs in the pool. + * @param archive - A KV store for archived txs. + * @param telemetry - A telemetry client. * @param log - A logger. */ constructor( store: AztecKVStore, + archive: AztecKVStore, telemetry: TelemetryClient = getTelemetryClient(), - archivedTxLimit = getP2PConfigFromEnv().archivedTxLimit, + archivedTxLimit: number = 0, log = createLogger('p2p:tx_pool'), ) { this.#txs = store.openMap('txs'); this.#minedTxHashToBlock = store.openMap('txHashToBlockMined'); this.#pendingTxPriorityToHash = store.openMultiMap('pendingTxFeeToHash'); - this.#archivedTxs = store.openMapWithSize('archivedTxs'); - this.#archivedTxIndices = store.openMap('archivedTxIndicies'); - this.#archivedTxHead = store.openSingleton('archivedTxHead'); - this.#archivedTxTail = store.openSingleton('archivedTxTail'); + this.#archivedTxs = archive.openMap('archivedTxs'); + this.#archivedTxIndices = archive.openMap('archivedTxIndices'); + this.#archivedTxHead = archive.openSingleton('archivedTxHead'); + this.#archivedTxTail = archive.openSingleton('archivedTxTail'); this.#archivedTxLimit = archivedTxLimit; this.#store = store; + this.#archive = archive; this.#log = log; this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL, () => store.estimateSize()); } @@ -198,10 +198,6 @@ export class AztecKVTxPool implements TxPool { void this.#pendingTxPriorityToHash.set(getPendingTxPriority(tx), key); this.#metrics.recordSize(tx); } - - if (this.#archivedTxLimit) { - void this.archiveTx(tx); - } } this.#metrics.recordAddedObjects(pendingCount, 'pending'); @@ -217,7 +213,8 @@ export class AztecKVTxPool implements TxPool { let pendingDeleted = 0; let minedDeleted = 0; - return this.#store.transaction(() => { + const archiveTxs: Promise[] = []; + const poolTxs = this.#store.transaction(() => { for (const hash of txHashes) { const key = hash.toString(); const tx = this.getTxByHash(hash); @@ -233,6 +230,10 @@ export class AztecKVTxPool implements TxPool { pendingDeleted++; } + if (this.#archivedTxLimit) { + archiveTxs.push(this.archiveTx(tx)); + } + void this.#txs.delete(key); void this.#minedTxHashToBlock.delete(key); } @@ -241,6 +242,13 @@ export class AztecKVTxPool implements TxPool { this.#metrics.recordRemovedObjects(pendingDeleted, 'pending'); this.#metrics.recordRemovedObjects(minedDeleted, 'mined'); }); + + return poolTxs.then(() => + archiveTxs.reduce( + (archiveTx, remainingArchiveTxs) => archiveTx.then(() => remainingArchiveTxs), + Promise.resolve(), + ), + ); } /** @@ -264,32 +272,35 @@ export class AztecKVTxPool implements TxPool { } /** - * Archive a tx for future reference. + * Archives a tx for future reference. The number of archived txs is limited by the specified archivedTxLimit. * @param tx - The transaction to archive. */ - private archiveTx(tx: Tx) { - while (this.#archivedTxs.size() >= this.#archivedTxLimit) { - const tailIdx = this.#archivedTxTail.get() ?? 
0; - const txHash = this.#archivedTxIndices.get(tailIdx); - if (txHash) { - void this.#archivedTxs.delete(txHash); - void this.#archivedTxIndices.delete(tailIdx); + private archiveTx(tx: Tx): Promise { + return this.#archive.transaction(() => { + let headIdx = this.#archivedTxHead.get() ?? 0; + let tailIdx = this.#archivedTxTail.get() ?? 0; + + while (headIdx - tailIdx >= this.#archivedTxLimit) { + const txHash = this.#archivedTxIndices.get(tailIdx); + if (txHash) { + void this.#archivedTxs.delete(txHash); + void this.#archivedTxIndices.delete(tailIdx); + } + void this.#archivedTxTail.set(++tailIdx); } - void this.#archivedTxTail.set(tailIdx + 1); - } - const archivedTx: Tx = new Tx( - tx.data, - ClientIvcProof.empty(), - tx.unencryptedLogs, - tx.contractClassLogs, - tx.enqueuedPublicFunctionCalls, - tx.publicTeardownFunctionCall, - ); - const txHash = tx.getTxHash().toString(); - void this.#archivedTxs.set(txHash, archivedTx.toBuffer()); - const headIdx = this.#archivedTxHead.get() ?? 0; - void this.#archivedTxIndices.set(headIdx, txHash); - void this.#archivedTxHead.set(headIdx + 1); + const archivedTx: Tx = new Tx( + tx.data, + ClientIvcProof.empty(), + tx.unencryptedLogs, + tx.contractClassLogs, + tx.enqueuedPublicFunctionCalls, + tx.publicTeardownFunctionCall, + ); + const txHash = tx.getTxHash().toString(); + void this.#archivedTxs.set(txHash, archivedTx.toBuffer()); + void this.#archivedTxIndices.set(headIdx, txHash); + void this.#archivedTxHead.set(++headIdx); + }); } } From 8c1f5ad7a0e83e5d487a8270d00d7f5cde75e580 Mon Sep 17 00:00:00 2001 From: Nate Beauregard Date: Wed, 15 Jan 2025 10:23:00 -0500 Subject: [PATCH 3/4] Archive txs in a single DB write --- .../tx_pool/aztec_kv_tx_pool.test.ts | 10 ++- .../src/mem_pools/tx_pool/aztec_kv_tx_pool.ts | 63 ++++++++++--------- 2 files changed, 41 insertions(+), 32 deletions(-) diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts index ef1d3f609e8..3276852b37b 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts @@ -19,7 +19,8 @@ describe('KV TX pool', () => { const tx1 = mockTx(1); const tx2 = mockTx(2); const tx3 = mockTx(3); - await txPool.addTxs([tx1, tx2, tx3]); + const tx4 = mockTx(4); + await txPool.addTxs([tx1, tx2, tx3, tx4]); // delete two txs and assert that they are properly archived await txPool.deleteTxs([tx1.getTxHash(), tx2.getTxHash()]); @@ -31,5 +32,12 @@ describe('KV TX pool', () => { expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toBeUndefined(); expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2); expect(txPool.getArchivedTxByHash(tx3.getTxHash())).toEqual(tx3); + + // delete another tx and assert that the second tx is purged and the new tx is archived + await txPool.deleteTxs([tx4.getTxHash()]); + expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toBeUndefined(); + expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toBeUndefined(); + expect(txPool.getArchivedTxByHash(tx3.getTxHash())).toEqual(tx3); + expect(txPool.getArchivedTxByHash(tx4.getTxHash())).toEqual(tx4); }); }); diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts index 446a86a3921..a9a20581a3c 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts @@ -207,14 +207,14 @@ export class AztecKVTxPool 
implements TxPool { /** * Deletes transactions from the pool. Tx hashes that are not present are ignored. * @param txHashes - An array of tx hashes to be removed from the tx pool. - * @returns The number of transactions that was deleted from the pool. + * @returns Empty promise. */ public deleteTxs(txHashes: TxHash[]): Promise { let pendingDeleted = 0; let minedDeleted = 0; - const archiveTxs: Promise[] = []; - const poolTxs = this.#store.transaction(() => { + const deletedTxs: Tx[] = []; + const poolDbTx = this.#store.transaction(() => { for (const hash of txHashes) { const key = hash.toString(); const tx = this.getTxByHash(hash); @@ -231,7 +231,7 @@ export class AztecKVTxPool implements TxPool { } if (this.#archivedTxLimit) { - archiveTxs.push(this.archiveTx(tx)); + deletedTxs.push(tx); } void this.#txs.delete(key); @@ -243,12 +243,7 @@ export class AztecKVTxPool implements TxPool { this.#metrics.recordRemovedObjects(minedDeleted, 'mined'); }); - return poolTxs.then(() => - archiveTxs.reduce( - (archiveTx, remainingArchiveTxs) => archiveTx.then(() => remainingArchiveTxs), - Promise.resolve(), - ), - ); + return this.#archivedTxLimit ? poolDbTx.then(() => this.archiveTxs(deletedTxs)) : poolDbTx; } /** @@ -272,35 +267,41 @@ export class AztecKVTxPool implements TxPool { } /** - * Archives a tx for future reference. The number of archived txs is limited by the specified archivedTxLimit. - * @param tx - The transaction to archive. + * Archives a list of txs for future reference. The number of archived txs is limited by the specified archivedTxLimit. + * @param txs - The list of transactions to archive. + * @returns Empty promise. */ - private archiveTx(tx: Tx): Promise { + private archiveTxs(txs: Tx[]): Promise { return this.#archive.transaction(() => { let headIdx = this.#archivedTxHead.get() ?? 0; let tailIdx = this.#archivedTxTail.get() ?? 
0; - while (headIdx - tailIdx >= this.#archivedTxLimit) { - const txHash = this.#archivedTxIndices.get(tailIdx); - if (txHash) { - void this.#archivedTxs.delete(txHash); - void this.#archivedTxIndices.delete(tailIdx); + for (const tx of txs) { + while (headIdx - tailIdx >= this.#archivedTxLimit) { + const txHash = this.#archivedTxIndices.get(tailIdx); + if (txHash) { + void this.#archivedTxs.delete(txHash); + void this.#archivedTxIndices.delete(tailIdx); + } + tailIdx++; } - void this.#archivedTxTail.set(++tailIdx); + + const archivedTx: Tx = new Tx( + tx.data, + ClientIvcProof.empty(), + tx.unencryptedLogs, + tx.contractClassLogs, + tx.enqueuedPublicFunctionCalls, + tx.publicTeardownFunctionCall, + ); + const txHash = tx.getTxHash().toString(); + void this.#archivedTxs.set(txHash, archivedTx.toBuffer()); + void this.#archivedTxIndices.set(headIdx, txHash); + headIdx++; } - const archivedTx: Tx = new Tx( - tx.data, - ClientIvcProof.empty(), - tx.unencryptedLogs, - tx.contractClassLogs, - tx.enqueuedPublicFunctionCalls, - tx.publicTeardownFunctionCall, - ); - const txHash = tx.getTxHash().toString(); - void this.#archivedTxs.set(txHash, archivedTx.toBuffer()); - void this.#archivedTxIndices.set(headIdx, txHash); - void this.#archivedTxHead.set(++headIdx); + void this.#archivedTxHead.set(headIdx); + void this.#archivedTxTail.set(tailIdx); }); } } From cfd469d4553ef19719ca9c3fa52cd57c066f4b18 Mon Sep 17 00:00:00 2001 From: Nate Beauregard Date: Wed, 15 Jan 2025 13:45:03 -0500 Subject: [PATCH 4/4] Calculate head/tail indicies directly --- .../tx_pool/aztec_kv_tx_pool.test.ts | 12 +++++++----- .../src/mem_pools/tx_pool/aztec_kv_tx_pool.ts | 19 +++++-------------- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts index 3276852b37b..ea584401412 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts @@ -20,24 +20,26 @@ describe('KV TX pool', () => { const tx2 = mockTx(2); const tx3 = mockTx(3); const tx4 = mockTx(4); - await txPool.addTxs([tx1, tx2, tx3, tx4]); + const tx5 = mockTx(5); + await txPool.addTxs([tx1, tx2, tx3, tx4, tx5]); // delete two txs and assert that they are properly archived await txPool.deleteTxs([tx1.getTxHash(), tx2.getTxHash()]); expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toEqual(tx1); expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2); - // delete another tx and assert that the first tx is purged and the new tx is archived + // delete a single tx and assert that the first tx is purged and the new tx is archived await txPool.deleteTxs([tx3.getTxHash()]); expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toBeUndefined(); expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2); expect(txPool.getArchivedTxByHash(tx3.getTxHash())).toEqual(tx3); - // delete another tx and assert that the second tx is purged and the new tx is archived - await txPool.deleteTxs([tx4.getTxHash()]); + // delete multiple txs and assert that the old txs are purged and the new txs are archived + await txPool.deleteTxs([tx4.getTxHash(), tx5.getTxHash()]); expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toBeUndefined(); expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toBeUndefined(); - expect(txPool.getArchivedTxByHash(tx3.getTxHash())).toEqual(tx3); + expect(txPool.getArchivedTxByHash(tx3.getTxHash())).toBeUndefined(); 
     expect(txPool.getArchivedTxByHash(tx4.getTxHash())).toEqual(tx4);
+    expect(txPool.getArchivedTxByHash(tx5.getTxHash())).toEqual(tx5);
   });
 });
diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts
index a9a20581a3c..ad7e4981fcb 100644
--- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts
+++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts
@@ -2,7 +2,7 @@ import { Tx, TxHash } from '@aztec/circuit-types';
 import { type TxAddedToPoolStats } from '@aztec/circuit-types/stats';
 import { ClientIvcProof } from '@aztec/circuits.js';
 import { type Logger, createLogger } from '@aztec/foundation/log';
-import { type AztecKVStore, type AztecMap, type AztecMultiMap, type AztecSingleton } from '@aztec/kv-store';
+import { type AztecKVStore, type AztecMap, type AztecMultiMap } from '@aztec/kv-store';
 import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client';
 
 import { PoolInstrumentation, PoolName } from '../instrumentation.js';
@@ -33,12 +33,6 @@ export class AztecKVTxPool implements TxPool {
   /** Indexes of the archived txs by insertion order. */
   #archivedTxIndices: AztecMap;
 
-  /** Index of the most recently inserted archived tx. */
-  #archivedTxHead: AztecSingleton;
-
-  /** Index of the oldest archived tx. */
-  #archivedTxTail: AztecSingleton;
-
   /** Number of txs to archive. */
   #archivedTxLimit: number;
 
@@ -51,6 +45,7 @@ export class AztecKVTxPool implements TxPool {
    * @param store - A KV store for live txs in the pool.
    * @param archive - A KV store for archived txs.
    * @param telemetry - A telemetry client.
+   * @param archivedTxLimit - The number of txs to archive.
    * @param log - A logger.
    */
   constructor(
@@ -66,8 +61,6 @@ export class AztecKVTxPool implements TxPool {
 
     this.#archivedTxs = archive.openMap('archivedTxs');
     this.#archivedTxIndices = archive.openMap('archivedTxIndices');
-    this.#archivedTxHead = archive.openSingleton('archivedTxHead');
-    this.#archivedTxTail = archive.openSingleton('archivedTxTail');
     this.#archivedTxLimit = archivedTxLimit;
 
     this.#store = store;
@@ -273,8 +266,9 @@ export class AztecKVTxPool implements TxPool {
    */
   private archiveTxs(txs: Tx[]): Promise {
     return this.#archive.transaction(() => {
-      let headIdx = this.#archivedTxHead.get() ?? 0;
-      let tailIdx = this.#archivedTxTail.get() ?? 0;
+      // calculate the head and tail indices of the archived txs by insertion order.
+      let headIdx = (this.#archivedTxIndices.entries({ limit: 1, reverse: true }).next().value?.[0] ?? -1) + 1;
+      let tailIdx = this.#archivedTxIndices.entries({ limit: 1 }).next().value?.[0] ?? 0;
 
       for (const tx of txs) {
         while (headIdx - tailIdx >= this.#archivedTxLimit) {
@@ -299,9 +293,6 @@ export class AztecKVTxPool implements TxPool {
         void this.#archivedTxIndices.set(headIdx, txHash);
         headIdx++;
       }
-
-      void this.#archivedTxHead.set(headIdx);
-      void this.#archivedTxTail.set(tailIdx);
     });
   }
 }
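
For reference, below is a minimal standalone TypeScript sketch (illustration only, not part of the patches above) of the archiving scheme the series converges on: archived txs are keyed by an ever-increasing insertion index, head and tail are derived from that index map instead of being stored as separate singletons, and once head minus tail reaches the limit the oldest entries are evicted. Plain in-memory Maps stand in for the LMDB-backed AztecMap stores used by AztecKVTxPool, and the names FifoTxArchive and ArchivedTx are hypothetical.

// Archived tx payload; in the real pool the proof is stripped before archiving.
interface ArchivedTx {
  hash: string;
  payload: Uint8Array;
}

class FifoTxArchive {
  private txsByHash = new Map<string, ArchivedTx>(); // tx hash -> archived tx
  private hashByIndex = new Map<number, string>();   // insertion index -> tx hash

  // The pool only archives when the configured limit is greater than zero.
  constructor(private readonly limit: number) {}

  /** Archives a batch of txs, evicting the oldest entries once the limit is reached. */
  archive(batch: ArchivedTx[]): void {
    // Derive head (next index to write) and tail (oldest live index) directly
    // from the index map, mirroring patch 4's removal of the head/tail singletons.
    const indices = [...this.hashByIndex.keys()];
    let headIdx = indices.length ? Math.max(...indices) + 1 : 0;
    let tailIdx = indices.length ? Math.min(...indices) : 0;

    for (const tx of batch) {
      while (headIdx - tailIdx >= this.limit) {
        const oldestHash = this.hashByIndex.get(tailIdx);
        if (oldestHash !== undefined) {
          this.txsByHash.delete(oldestHash);
          this.hashByIndex.delete(tailIdx);
        }
        tailIdx++;
      }
      this.txsByHash.set(tx.hash, tx);
      this.hashByIndex.set(headIdx, tx.hash);
      headIdx++;
    }
  }

  /** Returns an archived tx by hash, or undefined if it was never archived or has been evicted. */
  get(hash: string): ArchivedTx | undefined {
    return this.txsByHash.get(hash);
  }
}

Deriving the indices from the map keeps the whole archive update inside one transaction on the archive store, which is the point of the "Archive txs in a single DB write" and "Calculate head/tail indices directly" commits.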