Skip to content
This repository has been archived by the owner on Jul 25, 2024. It is now read-only.

Commit

Permalink
fix: extrinsic monitoring and capacity accounting (#147)
Browse files Browse the repository at this point in the history
# Description
This PR fixes 2 bugs in the Worker related to extrinsic completion
monitoring and capacity accounting:
- Ensures that ONLY the events associated with the target extrinsic are
processed in a block for both capacity accounting and extrinsic
success/failure/timeout determination
- Ensures the correct capacity epoch is used when recording the
withdrawn capacity for an extrinsic

Closes #146
  • Loading branch information
JoeCap08055 authored Jul 12, 2024
1 parent 9ede276 commit 96cd94c
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 41 deletions.
1 change: 0 additions & 1 deletion apps/worker/src/interfaces/status-monitor.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { IPublisherJob } from './publisher-job.interface';
export interface ITxMonitorJob {
id: string;
txHash: Hash;
epoch: string;
lastFinalizedBlockHash: BlockHash;
referencePublishJob: IPublisherJob;
}
6 changes: 6 additions & 0 deletions apps/worker/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { NestFactory } from '@nestjs/core';
import { WorkerModule } from './worker.module';

// Monkey-patch BigInt so that JSON.stringify will work
// eslint-disable-next-line
BigInt.prototype['toJSON'] = function () {
return this.toString();
};

async function bootstrap() {
const app = await NestFactory.createApplicationContext(WorkerModule);
app.enableShutdownHooks();
Expand Down
8 changes: 3 additions & 5 deletions apps/worker/src/monitor/tx.status.monitor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import { IPublisherJob } from '../interfaces/publisher-job.interface';
export class TxStatusMonitoringService extends BaseConsumer {
constructor(
@InjectRedis() private cacheManager: Redis,
@InjectQueue(TRANSACTION_RECEIPT_QUEUE_NAME) private txReceiptQueue: Queue,
@InjectQueue(PUBLISH_QUEUE_NAME) private publishQueue: Queue,
private blockchainService: BlockchainService,
) {
Expand All @@ -31,7 +30,6 @@ export class TxStatusMonitoringService extends BaseConsumer {
this.logger.log(`Monitoring job ${job.id} of type ${job.name}`);
try {
const numberBlocksToParse = NUMBER_BLOCKS_TO_CRAWL;
const txCapacityEpoch = job.data.epoch;
const previousKnownBlockNumber = (await this.blockchainService.getBlock(job.data.lastFinalizedBlockHash)).block.header.number.toBigInt();
const currentFinalizedBlockNumber = await this.blockchainService.getLatestFinalizedBlockNumber();
const blockList: bigint[] = [];
Expand All @@ -52,7 +50,7 @@ export class TxStatusMonitoringService extends BaseConsumer {
}
} else {
// found the tx
await this.setEpochCapacity(txCapacityEpoch, BigInt(txResult.capacityWithDrawn ?? 0n));
await this.setEpochCapacity(txResult.capacityEpoch ?? 0, txResult.capacityWithdrawn ?? 0n);
if (txResult.error) {
this.logger.debug(`Error found in tx result: ${JSON.stringify(txResult.error)}`);
const errorReport = await this.handleMessagesFailure(job.data.id, txResult.error);
Expand Down Expand Up @@ -107,13 +105,13 @@ export class TxStatusMonitoringService extends BaseConsumer {
return { pause: false, retry: false };
}

private async setEpochCapacity(epoch: string, capacityWithdrew: bigint): Promise<void> {
private async setEpochCapacity(epoch: number, capacityWithdrawn: bigint): Promise<void> {
const epochCapacityKey = `epochCapacity:${epoch}`;

try {
const savedCapacity = await this.cacheManager.get(epochCapacityKey);
const epochCapacity = BigInt(savedCapacity ?? 0);
const newEpochCapacity = epochCapacity + capacityWithdrew;
const newEpochCapacity = epochCapacity + capacityWithdrawn;

const epochDurationBlocks = await this.blockchainService.getCurrentEpochLength();
const epochDuration = epochDurationBlocks * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND;
Expand Down
6 changes: 2 additions & 4 deletions apps/worker/src/publisher/publishing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ export class PublishingService extends BaseConsumer implements OnApplicationBoot
this.logger.log(`Processing job ${job.id} of type ${job.name}`);
try {
const currentBlockHash = await this.blockchainService.getLatestFinalizedBlockHash();
const currentCapacityEpoch = await this.blockchainService.getCurrentCapacityEpoch();
const txHash = await this.ipfsPublisher.publish(job.data);
await this.sendJobToTxReceiptQueue(job.data, txHash, currentBlockHash, currentCapacityEpoch.toString());
await this.sendJobToTxReceiptQueue(job.data, txHash, currentBlockHash);
this.logger.verbose(`Successfully completed job ${job.id}`);
return { success: true };
} catch (e) {
Expand All @@ -70,10 +69,9 @@ export class PublishingService extends BaseConsumer implements OnApplicationBoot
}
}

async sendJobToTxReceiptQueue(jobData: IPublisherJob, txHash: Hash, lastFinalizedBlockHash: BlockHash, epoch: string): Promise<void> {
async sendJobToTxReceiptQueue(jobData: IPublisherJob, txHash: Hash, lastFinalizedBlockHash: BlockHash): Promise<void> {
const job: ITxMonitorJob = {
id: txHash.toString(),
epoch,
lastFinalizedBlockHash,
txHash,
referencePublishJob: jobData,
Expand Down
34 changes: 16 additions & 18 deletions libs/common/src/blockchain/blockchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
nextEpochStart: number;
remainingCapacity: bigint;
totalCapacityIssued: bigint;
currentEpoch: bigint;
currentEpoch: number;
}> {
const providerU64 = this.api.createType('u64', providerId);
const { epochStart }: PalletCapacityEpochInfo = await this.query('capacity', 'currentEpochInfo');
Expand All @@ -138,9 +138,9 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
};
}

public async getCurrentCapacityEpoch(): Promise<bigint> {
public async getCurrentCapacityEpoch(): Promise<number> {
const currentEpoch: u32 = await this.query('capacity', 'currentEpoch');
return typeof currentEpoch === 'number' ? BigInt(currentEpoch) : currentEpoch.toBigInt();
return currentEpoch.toNumber();
}

public async getCurrentEpochLength(): Promise<number> {
Expand All @@ -165,27 +165,28 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
txHash: Hash,
blockList: bigint[],
successEvents: [{ pallet: string; event: string }],
): Promise<{ found: boolean; success: boolean; blockHash?: BlockHash; capacityWithDrawn?: string; error?: RegistryError }> {
const txReceiptPromises: Promise<{ found: boolean; success: boolean; blockHash?: BlockHash; capacityWithDrawn?: string; error?: RegistryError }>[] = blockList.map(
): Promise<{ found: boolean; success: boolean; blockHash?: BlockHash; capacityEpoch?: number; capacityWithdrawn?: bigint; error?: RegistryError }> {
const txReceiptPromises: Promise<{ found: boolean; success: boolean; blockHash?: BlockHash; capacityWithdrawn?: bigint; error?: RegistryError }>[] = blockList.map(
async (blockNumber) => {
const blockHash = await this.getBlockHash(blockNumber);
const block = await this.getBlock(blockHash);
const txInfo = block.block.extrinsics.find((extrinsic) => extrinsic.hash.toString() === txHash.toString());
const txIndex = block.block.extrinsics.findIndex((extrinsic) => extrinsic.hash.toString() === txHash.toString());

if (!txInfo) {
if (txIndex === -1) {
return { found: false, success: false };
}

this.logger.verbose(`Found tx ${txHash} in block ${blockNumber}`);
const at = await this.api.at(blockHash.toHex());
const eventsPromise = firstValueFrom(at.query.system.events());
const at = await this.apiPromise.at(blockHash.toHex());
const capacityEpoch = (await at.query.capacity.currentEpoch()).toNumber();
const eventsPromise = at.query.system.events();

let isTxSuccess = false;
let totalBlockCapacity = 0n;
let txError: RegistryError | undefined;

try {
const events = await eventsPromise;
const events = (await eventsPromise).filter(({ phase }) => phase.isApplyExtrinsic && phase.asApplyExtrinsic.eq(txIndex));

events.forEach((record) => {
const { event } = record;
Expand All @@ -195,11 +196,8 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
this.logger.debug(`Received event: ${eventName} ${method} ${data}`);

// find capacity withdrawn event
if (eventName.search('capacity') !== -1 && method.search('Withdrawn') !== -1) {
// allow lowercase constructor for eslint
// eslint-disable-next-line new-cap
const currentCapacity: u128 = new u128(this.api.registry, data[1]);
totalBlockCapacity += currentCapacity.toBigInt();
if (at.events.capacity.CapacityWithdrawn.is(event)) {
totalBlockCapacity += event.data.amount.toBigInt();
}

// check custom success events
Expand All @@ -209,8 +207,8 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
}

// check for system extrinsic failure
if (eventName.search('system') !== -1 && method.search('ExtrinsicFailed') !== -1) {
const dispatchError = data[0] as DispatchError;
if (at.events.system.ExtrinsicFailed.is(event)) {
const { dispatchError } = event.data;
const moduleThatErrored = dispatchError.asModule;
const moduleError = dispatchError.registry.findMetaError(moduleThatErrored);
txError = moduleError;
Expand All @@ -221,7 +219,7 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS
this.logger.error(error);
}
this.logger.debug(`Total capacity withdrawn in block: ${totalBlockCapacity.toString()}`);
return { found: true, success: isTxSuccess, blockHash, capacityWithDrawn: totalBlockCapacity.toString(), error: txError };
return { found: true, success: isTxSuccess, blockHash, capacityEpoch, capacityWithDrawn: totalBlockCapacity, error: txError };
},
);
const results = await Promise.all(txReceiptPromises);
Expand Down
26 changes: 13 additions & 13 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 96cd94c

Please sign in to comment.