Skip to content

Commit 8f57a30

Browse files
Use a separate db for the tx archive and address CR comments
1 parent c302af5 commit 8f57a30

File tree

3 files changed

+67
-54
lines changed

3 files changed

+67
-54
lines changed

yarn-project/p2p/src/client/factory.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,10 @@ export const createP2PClient = async <T extends P2PClientType>(
4444
let config = { ..._config };
4545
const logger = createLogger('p2p');
4646
const store = deps.store ?? (await createStore('p2p', config, createLogger('p2p:lmdb')));
47+
const archive = await createStore('p2p-archive', config, createLogger('p2p-archive:lmdb'));
4748

4849
const mempools: MemPools<T> = {
49-
txPool: deps.txPool ?? new AztecKVTxPool(store, telemetry),
50+
txPool: deps.txPool ?? new AztecKVTxPool(store, archive, telemetry, config.archivedTxLimit),
5051
epochProofQuotePool: deps.epochProofQuotePool ?? new MemoryEpochProofQuotePool(telemetry),
5152
attestationPool:
5253
clientType === P2PClientType.Full

yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,29 @@ import { describeTxPool } from './tx_pool_test_suite.js';
77
describe('KV TX pool', () => {
88
let txPool: AztecKVTxPool;
99
beforeEach(() => {
10-
txPool = new AztecKVTxPool(openTmpStore());
10+
txPool = new AztecKVTxPool(openTmpStore(), openTmpStore());
1111
});
1212

1313
describeTxPool(() => txPool);
1414

1515
it('Returns archived txs and purges archived txs once the archived tx limit is reached', async () => {
1616
// set the archived tx limit to 2
17-
txPool = new AztecKVTxPool(openTmpStore(), new NoopTelemetryClient(), 2);
17+
txPool = new AztecKVTxPool(openTmpStore(), openTmpStore(), undefined, 2);
1818

1919
const tx1 = mockTx(1);
2020
const tx2 = mockTx(2);
2121
const tx3 = mockTx(3);
22+
await txPool.addTxs([tx1, tx2, tx3]);
2223

23-
// add two txs and assert that they are properly archived
24-
await txPool.addTxs([tx1, tx2]);
25-
expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toEqual(tx1.getTxHash());
26-
expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2.getTxHash());
24+
// delete two txs and assert that they are properly archived
25+
await txPool.deleteTxs([tx1.getTxHash(), tx2.getTxHash()]);
26+
expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toEqual(tx1);
27+
expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2);
2728

28-
// add another tx and assert that the first tx is purged and the new tx is archived
29-
await txPool.addTxs([tx3]);
29+
// delete another tx and assert that the first tx is purged and the new tx is archived
30+
await txPool.deleteTxs([tx3.getTxHash()]);
3031
expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toBeUndefined();
31-
expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2.getTxHash());
32-
expect(txPool.getArchivedTxByHash(tx3.getTxHash())).toEqual(tx3.getTxHash());
32+
expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2);
33+
expect(txPool.getArchivedTxByHash(tx3.getTxHash())).toEqual(tx3);
3334
});
3435
});

yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts

Lines changed: 54 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,9 @@ import { Tx, TxHash } from '@aztec/circuit-types';
22
import { type TxAddedToPoolStats } from '@aztec/circuit-types/stats';
33
import { ClientIvcProof } from '@aztec/circuits.js';
44
import { type Logger, createLogger } from '@aztec/foundation/log';
5-
import {
6-
type AztecKVStore,
7-
type AztecMap,
8-
type AztecMapWithSize,
9-
type AztecMultiMap,
10-
type AztecSingleton,
11-
} from '@aztec/kv-store';
5+
import { type AztecKVStore, type AztecMap, type AztecMultiMap, type AztecSingleton } from '@aztec/kv-store';
126
import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client';
137

14-
import { getP2PConfigFromEnv } from '../../config.js';
158
import { PoolInstrumentation, PoolName } from '../instrumentation.js';
169
import { getPendingTxPriority } from './priority.js';
1710
import { type TxPool } from './tx_pool.js';
@@ -31,8 +24,11 @@ export class AztecKVTxPool implements TxPool {
3124
/** Index from tx priority (stored as hex) to its tx hash, filtered by pending txs. */
3225
#pendingTxPriorityToHash: AztecMultiMap<string, string>;
3326

27+
/** KV store for archived txs. */
28+
#archive: AztecKVStore;
29+
3430
/** Archived txs map for future lookup. */
35-
#archivedTxs: AztecMapWithSize<string, Buffer>;
31+
#archivedTxs: AztecMap<string, Buffer>;
3632

3733
/** Indexes of the archived txs by insertion order. */
3834
#archivedTxIndices: AztecMap<number, string>;
@@ -52,26 +48,30 @@ export class AztecKVTxPool implements TxPool {
5248

5349
/**
5450
* Class constructor for KV TxPool. Initiates our transaction pool as an AztecMap.
55-
* @param store - A KV store.
51+
* @param store - A KV store for live txs in the pool.
52+
* @param archive - A KV store for archived txs.
53+
* @param telemetry - A telemetry client.
5654
* @param log - A logger.
5755
*/
5856
constructor(
5957
store: AztecKVStore,
58+
archive: AztecKVStore,
6059
telemetry: TelemetryClient = getTelemetryClient(),
61-
archivedTxLimit = getP2PConfigFromEnv().archivedTxLimit,
60+
archivedTxLimit: number = 0,
6261
log = createLogger('p2p:tx_pool'),
6362
) {
6463
this.#txs = store.openMap('txs');
6564
this.#minedTxHashToBlock = store.openMap('txHashToBlockMined');
6665
this.#pendingTxPriorityToHash = store.openMultiMap('pendingTxFeeToHash');
6766

68-
this.#archivedTxs = store.openMapWithSize('archivedTxs');
69-
this.#archivedTxIndices = store.openMap('archivedTxIndicies');
70-
this.#archivedTxHead = store.openSingleton('archivedTxHead');
71-
this.#archivedTxTail = store.openSingleton('archivedTxTail');
67+
this.#archivedTxs = archive.openMap('archivedTxs');
68+
this.#archivedTxIndices = archive.openMap('archivedTxIndices');
69+
this.#archivedTxHead = archive.openSingleton('archivedTxHead');
70+
this.#archivedTxTail = archive.openSingleton('archivedTxTail');
7271
this.#archivedTxLimit = archivedTxLimit;
7372

7473
this.#store = store;
74+
this.#archive = archive;
7575
this.#log = log;
7676
this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL, () => store.estimateSize());
7777
}
@@ -198,10 +198,6 @@ export class AztecKVTxPool implements TxPool {
198198
void this.#pendingTxPriorityToHash.set(getPendingTxPriority(tx), key);
199199
this.#metrics.recordSize(tx);
200200
}
201-
202-
if (this.#archivedTxLimit) {
203-
void this.archiveTx(tx);
204-
}
205201
}
206202

207203
this.#metrics.recordAddedObjects(pendingCount, 'pending');
@@ -217,7 +213,8 @@ export class AztecKVTxPool implements TxPool {
217213
let pendingDeleted = 0;
218214
let minedDeleted = 0;
219215

220-
return this.#store.transaction(() => {
216+
const archiveTxs: Promise<void>[] = [];
217+
const poolTxs = this.#store.transaction(() => {
221218
for (const hash of txHashes) {
222219
const key = hash.toString();
223220
const tx = this.getTxByHash(hash);
@@ -233,6 +230,10 @@ export class AztecKVTxPool implements TxPool {
233230
pendingDeleted++;
234231
}
235232

233+
if (this.#archivedTxLimit) {
234+
archiveTxs.push(this.archiveTx(tx));
235+
}
236+
236237
void this.#txs.delete(key);
237238
void this.#minedTxHashToBlock.delete(key);
238239
}
@@ -241,6 +242,13 @@ export class AztecKVTxPool implements TxPool {
241242
this.#metrics.recordRemovedObjects(pendingDeleted, 'pending');
242243
this.#metrics.recordRemovedObjects(minedDeleted, 'mined');
243244
});
245+
246+
return poolTxs.then(() =>
247+
archiveTxs.reduce(
248+
(archiveTx, remainingArchiveTxs) => archiveTx.then(() => remainingArchiveTxs),
249+
Promise.resolve(),
250+
),
251+
);
244252
}
245253

246254
/**
@@ -264,32 +272,35 @@ export class AztecKVTxPool implements TxPool {
264272
}
265273

266274
/**
267-
* Archive a tx for future reference.
275+
* Archives a tx for future reference. The number of archived txs is limited by the specified archivedTxLimit.
268276
* @param tx - The transaction to archive.
269277
*/
270-
private archiveTx(tx: Tx) {
271-
while (this.#archivedTxs.size() >= this.#archivedTxLimit) {
272-
const tailIdx = this.#archivedTxTail.get() ?? 0;
273-
const txHash = this.#archivedTxIndices.get(tailIdx);
274-
if (txHash) {
275-
void this.#archivedTxs.delete(txHash);
276-
void this.#archivedTxIndices.delete(tailIdx);
278+
private archiveTx(tx: Tx): Promise<void> {
279+
return this.#archive.transaction(() => {
280+
let headIdx = this.#archivedTxHead.get() ?? 0;
281+
let tailIdx = this.#archivedTxTail.get() ?? 0;
282+
283+
while (headIdx - tailIdx >= this.#archivedTxLimit) {
284+
const txHash = this.#archivedTxIndices.get(tailIdx);
285+
if (txHash) {
286+
void this.#archivedTxs.delete(txHash);
287+
void this.#archivedTxIndices.delete(tailIdx);
288+
}
289+
void this.#archivedTxTail.set(++tailIdx);
277290
}
278-
void this.#archivedTxTail.set(tailIdx + 1);
279-
}
280291

281-
const archivedTx: Tx = new Tx(
282-
tx.data,
283-
ClientIvcProof.empty(),
284-
tx.unencryptedLogs,
285-
tx.contractClassLogs,
286-
tx.enqueuedPublicFunctionCalls,
287-
tx.publicTeardownFunctionCall,
288-
);
289-
const txHash = tx.getTxHash().toString();
290-
void this.#archivedTxs.set(txHash, archivedTx.toBuffer());
291-
const headIdx = this.#archivedTxHead.get() ?? 0;
292-
void this.#archivedTxIndices.set(headIdx, txHash);
293-
void this.#archivedTxHead.set(headIdx + 1);
292+
const archivedTx: Tx = new Tx(
293+
tx.data,
294+
ClientIvcProof.empty(),
295+
tx.unencryptedLogs,
296+
tx.contractClassLogs,
297+
tx.enqueuedPublicFunctionCalls,
298+
tx.publicTeardownFunctionCall,
299+
);
300+
const txHash = tx.getTxHash().toString();
301+
void this.#archivedTxs.set(txHash, archivedTx.toBuffer());
302+
void this.#archivedTxIndices.set(headIdx, txHash);
303+
void this.#archivedTxHead.set(++headIdx);
304+
});
294305
}
295306
}

0 commit comments

Comments
 (0)