diff --git a/apps/worker/src/interfaces/status-monitor.interface.ts b/apps/worker/src/interfaces/status-monitor.interface.ts index 69f88f9..2b8d6ed 100644 --- a/apps/worker/src/interfaces/status-monitor.interface.ts +++ b/apps/worker/src/interfaces/status-monitor.interface.ts @@ -4,7 +4,6 @@ import { IPublisherJob } from './publisher-job.interface'; export interface ITxMonitorJob { id: string; txHash: Hash; - epoch: string; lastFinalizedBlockHash: BlockHash; referencePublishJob: IPublisherJob; } diff --git a/apps/worker/src/main.ts b/apps/worker/src/main.ts index a99a8ec..6e7abe5 100644 --- a/apps/worker/src/main.ts +++ b/apps/worker/src/main.ts @@ -1,6 +1,12 @@ import { NestFactory } from '@nestjs/core'; import { WorkerModule } from './worker.module'; +// Monkey-patch BigInt so that JSON.stringify will work +// eslint-disable-next-line +BigInt.prototype['toJSON'] = function () { + return this.toString(); +}; + async function bootstrap() { const app = await NestFactory.createApplicationContext(WorkerModule); app.enableShutdownHooks(); diff --git a/apps/worker/src/monitor/tx.status.monitor.service.ts b/apps/worker/src/monitor/tx.status.monitor.service.ts index 256e30f..b1e31e8 100644 --- a/apps/worker/src/monitor/tx.status.monitor.service.ts +++ b/apps/worker/src/monitor/tx.status.monitor.service.ts @@ -20,7 +20,6 @@ import { IPublisherJob } from '../interfaces/publisher-job.interface'; export class TxStatusMonitoringService extends BaseConsumer { constructor( @InjectRedis() private cacheManager: Redis, - @InjectQueue(TRANSACTION_RECEIPT_QUEUE_NAME) private txReceiptQueue: Queue, @InjectQueue(PUBLISH_QUEUE_NAME) private publishQueue: Queue, private blockchainService: BlockchainService, ) { @@ -31,7 +30,6 @@ export class TxStatusMonitoringService extends BaseConsumer { this.logger.log(`Monitoring job ${job.id} of type ${job.name}`); try { const numberBlocksToParse = NUMBER_BLOCKS_TO_CRAWL; - const txCapacityEpoch = job.data.epoch; const previousKnownBlockNumber = (await this.blockchainService.getBlock(job.data.lastFinalizedBlockHash)).block.header.number.toBigInt(); const currentFinalizedBlockNumber = await this.blockchainService.getLatestFinalizedBlockNumber(); const blockList: bigint[] = []; @@ -52,7 +50,7 @@ export class TxStatusMonitoringService extends BaseConsumer { } } else { // found the tx - await this.setEpochCapacity(txCapacityEpoch, BigInt(txResult.capacityWithDrawn ?? 0n)); + await this.setEpochCapacity(txResult.capacityEpoch ?? 0, txResult.capacityWithdrawn ?? 0n); if (txResult.error) { this.logger.debug(`Error found in tx result: ${JSON.stringify(txResult.error)}`); const errorReport = await this.handleMessagesFailure(job.data.id, txResult.error); @@ -107,13 +105,13 @@ export class TxStatusMonitoringService extends BaseConsumer { return { pause: false, retry: false }; } - private async setEpochCapacity(epoch: string, capacityWithdrew: bigint): Promise { + private async setEpochCapacity(epoch: number, capacityWithdrawn: bigint): Promise { const epochCapacityKey = `epochCapacity:${epoch}`; try { const savedCapacity = await this.cacheManager.get(epochCapacityKey); const epochCapacity = BigInt(savedCapacity ?? 0); - const newEpochCapacity = epochCapacity + capacityWithdrew; + const newEpochCapacity = epochCapacity + capacityWithdrawn; const epochDurationBlocks = await this.blockchainService.getCurrentEpochLength(); const epochDuration = epochDurationBlocks * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND; diff --git a/apps/worker/src/publisher/publishing.service.ts b/apps/worker/src/publisher/publishing.service.ts index 008cb62..84972fa 100644 --- a/apps/worker/src/publisher/publishing.service.ts +++ b/apps/worker/src/publisher/publishing.service.ts @@ -54,9 +54,8 @@ export class PublishingService extends BaseConsumer implements OnApplicationBoot this.logger.log(`Processing job ${job.id} of type ${job.name}`); try { const currentBlockHash = await this.blockchainService.getLatestFinalizedBlockHash(); - const currentCapacityEpoch = await this.blockchainService.getCurrentCapacityEpoch(); const txHash = await this.ipfsPublisher.publish(job.data); - await this.sendJobToTxReceiptQueue(job.data, txHash, currentBlockHash, currentCapacityEpoch.toString()); + await this.sendJobToTxReceiptQueue(job.data, txHash, currentBlockHash); this.logger.verbose(`Successfully completed job ${job.id}`); return { success: true }; } catch (e) { @@ -70,10 +69,9 @@ export class PublishingService extends BaseConsumer implements OnApplicationBoot } } - async sendJobToTxReceiptQueue(jobData: IPublisherJob, txHash: Hash, lastFinalizedBlockHash: BlockHash, epoch: string): Promise { + async sendJobToTxReceiptQueue(jobData: IPublisherJob, txHash: Hash, lastFinalizedBlockHash: BlockHash): Promise { const job: ITxMonitorJob = { id: txHash.toString(), - epoch, lastFinalizedBlockHash, txHash, referencePublishJob: jobData, diff --git a/libs/common/src/blockchain/blockchain.service.ts b/libs/common/src/blockchain/blockchain.service.ts index bfde65b..c406bc0 100644 --- a/libs/common/src/blockchain/blockchain.service.ts +++ b/libs/common/src/blockchain/blockchain.service.ts @@ -119,7 +119,7 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS nextEpochStart: number; remainingCapacity: bigint; totalCapacityIssued: bigint; - currentEpoch: bigint; + currentEpoch: number; }> { const providerU64 = this.api.createType('u64', providerId); const { epochStart }: PalletCapacityEpochInfo = await this.query('capacity', 'currentEpochInfo'); @@ -138,9 +138,9 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS }; } - public async getCurrentCapacityEpoch(): Promise { + public async getCurrentCapacityEpoch(): Promise { const currentEpoch: u32 = await this.query('capacity', 'currentEpoch'); - return typeof currentEpoch === 'number' ? BigInt(currentEpoch) : currentEpoch.toBigInt(); + return currentEpoch.toNumber(); } public async getCurrentEpochLength(): Promise { @@ -165,27 +165,28 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS txHash: Hash, blockList: bigint[], successEvents: [{ pallet: string; event: string }], - ): Promise<{ found: boolean; success: boolean; blockHash?: BlockHash; capacityWithDrawn?: string; error?: RegistryError }> { - const txReceiptPromises: Promise<{ found: boolean; success: boolean; blockHash?: BlockHash; capacityWithDrawn?: string; error?: RegistryError }>[] = blockList.map( + ): Promise<{ found: boolean; success: boolean; blockHash?: BlockHash; capacityEpoch?: number; capacityWithdrawn?: bigint; error?: RegistryError }> { + const txReceiptPromises: Promise<{ found: boolean; success: boolean; blockHash?: BlockHash; capacityWithdrawn?: bigint; error?: RegistryError }>[] = blockList.map( async (blockNumber) => { const blockHash = await this.getBlockHash(blockNumber); const block = await this.getBlock(blockHash); - const txInfo = block.block.extrinsics.find((extrinsic) => extrinsic.hash.toString() === txHash.toString()); + const txIndex = block.block.extrinsics.findIndex((extrinsic) => extrinsic.hash.toString() === txHash.toString()); - if (!txInfo) { + if (txIndex === -1) { return { found: false, success: false }; } this.logger.verbose(`Found tx ${txHash} in block ${blockNumber}`); - const at = await this.api.at(blockHash.toHex()); - const eventsPromise = firstValueFrom(at.query.system.events()); + const at = await this.apiPromise.at(blockHash.toHex()); + const capacityEpoch = (await at.query.capacity.currentEpoch()).toNumber(); + const eventsPromise = at.query.system.events(); let isTxSuccess = false; let totalBlockCapacity = 0n; let txError: RegistryError | undefined; try { - const events = await eventsPromise; + const events = (await eventsPromise).filter(({ phase }) => phase.isApplyExtrinsic && phase.asApplyExtrinsic.eq(txIndex)); events.forEach((record) => { const { event } = record; @@ -195,11 +196,8 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS this.logger.debug(`Received event: ${eventName} ${method} ${data}`); // find capacity withdrawn event - if (eventName.search('capacity') !== -1 && method.search('Withdrawn') !== -1) { - // allow lowercase constructor for eslint - // eslint-disable-next-line new-cap - const currentCapacity: u128 = new u128(this.api.registry, data[1]); - totalBlockCapacity += currentCapacity.toBigInt(); + if (at.events.capacity.CapacityWithdrawn.is(event)) { + totalBlockCapacity += event.data.amount.toBigInt(); } // check custom success events @@ -209,8 +207,8 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS } // check for system extrinsic failure - if (eventName.search('system') !== -1 && method.search('ExtrinsicFailed') !== -1) { - const dispatchError = data[0] as DispatchError; + if (at.events.system.ExtrinsicFailed.is(event)) { + const { dispatchError } = event.data; const moduleThatErrored = dispatchError.asModule; const moduleError = dispatchError.registry.findMetaError(moduleThatErrored); txError = moduleError; @@ -221,7 +219,7 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS this.logger.error(error); } this.logger.debug(`Total capacity withdrawn in block: ${totalBlockCapacity.toString()}`); - return { found: true, success: isTxSuccess, blockHash, capacityWithDrawn: totalBlockCapacity.toString(), error: txError }; + return { found: true, success: isTxSuccess, blockHash, capacityEpoch, capacityWithDrawn: totalBlockCapacity, error: txError }; }, ); const results = await Promise.all(txReceiptPromises); diff --git a/package-lock.json b/package-lock.json index c709e4e..c60add3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5798,11 +5798,11 @@ } }, "node_modules/braces": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.2.tgz", - "integrity": "sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==", + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.3.tgz", + "integrity": "sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==", "dependencies": { - "fill-range": "^7.0.1" + "fill-range": "^7.1.1" }, "engines": { "node": ">=8" @@ -7499,9 +7499,9 @@ } }, "node_modules/fill-range": { - "version": "7.0.1", - "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", - "integrity": "sha512-qOo9F+dMUmC2Lcb4BbVvnKJxTPjCm+RRpe4gDuGrzkL7mEVl/djYSu2OdQ2Pa302N4oqkSg9ir6jaLWJ2USVpQ==", + "version": "7.1.1", + "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.1.1.tgz", + "integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==", "dependencies": { "to-regex-range": "^5.0.1" }, @@ -12203,9 +12203,9 @@ } }, "node_modules/thrift/node_modules/ws": { - "version": "5.2.3", - "resolved": "https://registry.npmjs.org/ws/-/ws-5.2.3.tgz", - "integrity": "sha512-jZArVERrMsKUatIdnLzqvcfydI85dvd/Fp1u/VOpfdDWQ4c9qWXe+VIeAbQ5FrDwciAkr+lzofXLz3Kuf26AOA==", + "version": "5.2.4", + "resolved": "https://registry.npmjs.org/ws/-/ws-5.2.4.tgz", + "integrity": "sha512-fFCejsuC8f9kOSu9FYaOw8CdO68O3h5v0lg4p74o8JqWpwTf9tniOD+nOB78aWoVSS6WptVUmDrp/KPsMVBWFQ==", "dependencies": { "async-limiter": "~1.0.0" } @@ -13018,9 +13018,9 @@ "dev": true }, "node_modules/ws": { - "version": "8.17.0", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.0.tgz", - "integrity": "sha512-uJq6108EgZMAl20KagGkzCKfMEjxmKvZHG7Tlq0Z6nOky7YF7aq4mOx6xK8TJ/i1LeK4Qus7INktacctDgY8Ow==", + "version": "8.18.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.0.tgz", + "integrity": "sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==", "engines": { "node": ">=10.0.0" },