Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: archive public testnet tx data #11192

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export type EnvVar =
| 'P2P_TX_PROTOCOL'
| 'P2P_UDP_ANNOUNCE_ADDR'
| 'P2P_UDP_LISTEN_ADDR'
| 'P2P_ARCHIVED_TX_LIMIT'
| 'PEER_ID_PRIVATE_KEY'
| 'PROVER_BLOB_SINK_URL'
| 'PROOF_VERIFIER_L1_START_BLOCK'
Expand Down
3 changes: 2 additions & 1 deletion yarn-project/p2p/src/client/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ export const createP2PClient = async <T extends P2PClientType>(
let config = { ..._config };
const logger = createLogger('p2p');
const store = deps.store ?? (await createStore('p2p', config, createLogger('p2p:lmdb')));
const archive = await createStore('p2p-archive', config, createLogger('p2p-archive:lmdb'));

const mempools: MemPools<T> = {
txPool: deps.txPool ?? new AztecKVTxPool(store, telemetry),
txPool: deps.txPool ?? new AztecKVTxPool(store, archive, telemetry, config.archivedTxLimit),
epochProofQuotePool: deps.epochProofQuotePool ?? new MemoryEpochProofQuotePool(telemetry),
attestationPool:
clientType === P2PClientType.Full
Expand Down
16 changes: 16 additions & 0 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ export type P2P<T extends P2PClientType = P2PClientType.Full> = P2PApi<T> & {
*/
getTxByHash(txHash: TxHash): Promise<Tx | undefined>;

/**
* Returns an archived transaction from the transaction pool by its hash.
* @param txHash - Hash of tx to return.
* @returns A single tx or undefined.
*/
getArchivedTxByHash(txHash: TxHash): Promise<Tx | undefined>;

/**
* Returns whether the given tx hash is flagged as pending or mined.
* @param txHash - Hash of the tx to query.
Expand Down Expand Up @@ -523,6 +530,15 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
return this.requestTxByHash(txHash);
}

/**
* Returns an archived transaction in the transaction pool by its hash.
* @param txHash - Hash of the archived transaction to look for.
* @returns A single tx or undefined.
*/
getArchivedTxByHash(txHash: TxHash): Promise<Tx | undefined> {
return Promise.resolve(this.txPool.getArchivedTxByHash(txHash));
}

/**
* Verifies the 'tx' and, if valid, adds it to local tx pool and forwards it to other peers.
* @param tx - The tx to verify.
Expand Down
9 changes: 9 additions & 0 deletions yarn-project/p2p/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ export interface P2PConfig extends P2PReqRespConfig {
* The chain id of the L1 chain.
*/
l1ChainId: number;

/** Limit of transactions to archive in the tx pool. Once the archived tx limit is reached, the oldest archived txs will be purged. */
archivedTxLimit: number;
}

export const p2pConfigMappings: ConfigMappingsType<P2PConfig> = {
Expand Down Expand Up @@ -305,6 +308,12 @@ export const p2pConfigMappings: ConfigMappingsType<P2PConfig> = {
description: 'The number of blocks to fetch in a single batch.',
...numberConfigHelper(20),
},
archivedTxLimit: {
env: 'P2P_ARCHIVED_TX_LIMIT',
description:
'The number of transactions that will be archived. If the limit is set to 0 then archiving will be disabled.',
...numberConfigHelper(0),
},
...p2pReqRespConfigMappings,
};

Expand Down
34 changes: 33 additions & 1 deletion yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { mockTx } from '@aztec/circuit-types';
import { openTmpStore } from '@aztec/kv-store/lmdb';

import { AztecKVTxPool } from './aztec_kv_tx_pool.js';
Expand All @@ -6,8 +7,39 @@ import { describeTxPool } from './tx_pool_test_suite.js';
describe('KV TX pool', () => {
let txPool: AztecKVTxPool;
beforeEach(() => {
txPool = new AztecKVTxPool(openTmpStore());
txPool = new AztecKVTxPool(openTmpStore(), openTmpStore());
});

describeTxPool(() => txPool);

it('Returns archived txs and purges archived txs once the archived tx limit is reached', async () => {
// set the archived tx limit to 2
txPool = new AztecKVTxPool(openTmpStore(), openTmpStore(), undefined, 2);

const tx1 = mockTx(1);
const tx2 = mockTx(2);
const tx3 = mockTx(3);
const tx4 = mockTx(4);
const tx5 = mockTx(5);
await txPool.addTxs([tx1, tx2, tx3, tx4, tx5]);

// delete two txs and assert that they are properly archived
await txPool.deleteTxs([tx1.getTxHash(), tx2.getTxHash()]);
expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toEqual(tx1);
expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2);

// delete a single tx and assert that the first tx is purged and the new tx is archived
await txPool.deleteTxs([tx3.getTxHash()]);
expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toBeUndefined();
expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toEqual(tx2);
expect(txPool.getArchivedTxByHash(tx3.getTxHash())).toEqual(tx3);

// delete multiple txs and assert that the old txs are purged and the new txs are archived
await txPool.deleteTxs([tx4.getTxHash(), tx5.getTxHash()]);
expect(txPool.getArchivedTxByHash(tx1.getTxHash())).toBeUndefined();
expect(txPool.getArchivedTxByHash(tx2.getTxHash())).toBeUndefined();
expect(txPool.getArchivedTxByHash(tx3.getTxHash())).toBeUndefined();
expect(txPool.getArchivedTxByHash(tx4.getTxHash())).toEqual(tx4);
expect(txPool.getArchivedTxByHash(tx5.getTxHash())).toEqual(tx5);
});
});
92 changes: 87 additions & 5 deletions yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Tx, TxHash } from '@aztec/circuit-types';
import { type TxAddedToPoolStats } from '@aztec/circuit-types/stats';
import { ClientIvcProof } from '@aztec/circuits.js';
import { type Logger, createLogger } from '@aztec/foundation/log';
import { type AztecKVStore, type AztecMap, type AztecMultiMap } from '@aztec/kv-store';
import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client';
Expand All @@ -9,7 +10,7 @@ import { getPendingTxPriority } from './priority.js';
import { type TxPool } from './tx_pool.js';

/**
* In-memory implementation of the Transaction Pool.
* KV implementation of the Transaction Pool.
natebeauregard marked this conversation as resolved.
Show resolved Hide resolved
*/
export class AztecKVTxPool implements TxPool {
#store: AztecKVStore;
Expand All @@ -23,25 +24,47 @@ export class AztecKVTxPool implements TxPool {
/** Index from tx priority (stored as hex) to its tx hash, filtered by pending txs. */
#pendingTxPriorityToHash: AztecMultiMap<string, string>;

/** KV store for archived txs. */
#archive: AztecKVStore;

/** Archived txs map for future lookup. */
#archivedTxs: AztecMap<string, Buffer>;

/** Indexes of the archived txs by insertion order. */
#archivedTxIndices: AztecMap<number, string>;

/** Number of txs to archive. */
#archivedTxLimit: number;

#log: Logger;

#metrics: PoolInstrumentation<Tx>;

/**
* Class constructor for in-memory TxPool. Initiates our transaction pool as a JS Map.
* @param store - A KV store.
* Class constructor for KV TxPool. Initiates our transaction pool as an AztecMap.
* @param store - A KV store for live txs in the pool.
* @param archive - A KV store for archived txs.
* @param telemetry - A telemetry client.
* @param archivedTxLimit - The number of txs to archive.
* @param log - A logger.
*/
constructor(
store: AztecKVStore,
archive: AztecKVStore,
telemetry: TelemetryClient = getTelemetryClient(),
archivedTxLimit: number = 0,
log = createLogger('p2p:tx_pool'),
) {
this.#txs = store.openMap('txs');
this.#minedTxHashToBlock = store.openMap('txHashToBlockMined');
this.#pendingTxPriorityToHash = store.openMultiMap('pendingTxFeeToHash');

this.#archivedTxs = archive.openMap('archivedTxs');
this.#archivedTxIndices = archive.openMap('archivedTxIndices');
this.#archivedTxLimit = archivedTxLimit;

this.#store = store;
this.#archive = archive;
this.#log = log;
this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL, () => store.estimateSize());
}
Expand Down Expand Up @@ -129,6 +152,21 @@ export class AztecKVTxPool implements TxPool {
return undefined;
}

/**
* Checks if an archived tx exists and returns it.
* @param txHash - The tx hash.
* @returns The transaction metadata, if found, 'undefined' otherwise.
*/
public getArchivedTxByHash(txHash: TxHash): Tx | undefined {
const buffer = this.#archivedTxs.get(txHash.toString());
if (buffer) {
const tx = Tx.fromBuffer(buffer);
tx.setTxHash(txHash);
return tx;
}
return undefined;
}

/**
* Adds a list of transactions to the pool. Duplicates are ignored.
* @param txs - An array of txs to be added to the pool.
Expand Down Expand Up @@ -162,13 +200,14 @@ export class AztecKVTxPool implements TxPool {
/**
* Deletes transactions from the pool. Tx hashes that are not present are ignored.
* @param txHashes - An array of tx hashes to be removed from the tx pool.
* @returns The number of transactions that was deleted from the pool.
* @returns Empty promise.
*/
public deleteTxs(txHashes: TxHash[]): Promise<void> {
let pendingDeleted = 0;
let minedDeleted = 0;

return this.#store.transaction(() => {
const deletedTxs: Tx[] = [];
const poolDbTx = this.#store.transaction(() => {
for (const hash of txHashes) {
const key = hash.toString();
const tx = this.getTxByHash(hash);
Expand All @@ -184,6 +223,10 @@ export class AztecKVTxPool implements TxPool {
pendingDeleted++;
}

if (this.#archivedTxLimit) {
deletedTxs.push(tx);
}

void this.#txs.delete(key);
void this.#minedTxHashToBlock.delete(key);
}
Expand All @@ -192,6 +235,8 @@ export class AztecKVTxPool implements TxPool {
this.#metrics.recordRemovedObjects(pendingDeleted, 'pending');
this.#metrics.recordRemovedObjects(minedDeleted, 'mined');
});

return this.#archivedTxLimit ? poolDbTx.then(() => this.archiveTxs(deletedTxs)) : poolDbTx;
}

/**
Expand All @@ -213,4 +258,41 @@ export class AztecKVTxPool implements TxPool {
public getAllTxHashes(): TxHash[] {
return Array.from(this.#txs.keys()).map(x => TxHash.fromString(x));
}

/**
* Archives a list of txs for future reference. The number of archived txs is limited by the specified archivedTxLimit.
* @param txs - The list of transactions to archive.
* @returns Empty promise.
*/
private archiveTxs(txs: Tx[]): Promise<void> {
return this.#archive.transaction(() => {
// calcualte the head and tail indices of the archived txs by insertion order.
let headIdx = (this.#archivedTxIndices.entries({ limit: 1, reverse: true }).next().value?.[0] ?? -1) + 1;
let tailIdx = this.#archivedTxIndices.entries({ limit: 1 }).next().value?.[0] ?? 0;

for (const tx of txs) {
while (headIdx - tailIdx >= this.#archivedTxLimit) {
const txHash = this.#archivedTxIndices.get(tailIdx);
if (txHash) {
void this.#archivedTxs.delete(txHash);
void this.#archivedTxIndices.delete(tailIdx);
}
tailIdx++;
}

const archivedTx: Tx = new Tx(
tx.data,
ClientIvcProof.empty(),
tx.unencryptedLogs,
tx.contractClassLogs,
tx.enqueuedPublicFunctionCalls,
tx.publicTeardownFunctionCall,
);
const txHash = tx.getTxHash().toString();
void this.#archivedTxs.set(txHash, archivedTx.toBuffer());
void this.#archivedTxIndices.set(headIdx, txHash);
headIdx++;
}
});
}
}
4 changes: 4 additions & 0 deletions yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ export class InMemoryTxPool implements TxPool {
return result === undefined ? undefined : Tx.clone(result);
}

public getArchivedTxByHash(): Tx | undefined {
return undefined;
}

/**
* Adds a list of transactions to the pool. Duplicates are ignored.
* @param txs - An array of txs to be added to the pool.
Expand Down
7 changes: 7 additions & 0 deletions yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ export interface TxPool {
*/
getTxByHash(txHash: TxHash): Tx | undefined;

/**
* Checks if an archived transaction exists in the pool and returns it.
* @param txHash - The hash of the transaction, used as an ID.
* @returns The transaction, if found, 'undefined' otherwise.
*/
getArchivedTxByHash(txHash: TxHash): Tx | undefined;

/**
* Marks the set of txs as mined, as opposed to pending.
* @param txHashes - Hashes of the txs to flag as mined.
Expand Down
Loading