Skip to content

Commit

Permalink
feat(api): subscribe to blocks and events starting from a particular …
Browse files Browse the repository at this point in the history
…block (#1528)
  • Loading branch information
osipov-mit authored Apr 12, 2024
1 parent 825b6ae commit 59cff46
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 97 deletions.
8 changes: 8 additions & 0 deletions api/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## 0.37.2

_04/11/2024_

### Changes
https://github.com/gear-tech/gear-js/pull/1528
- Subscribe to blocks and gear events starting from a particular block

## 0.37.1

_04/10/2024_
Expand Down
2 changes: 1 addition & 1 deletion api/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@gear-js/api",
"version": "0.37.1",
"version": "0.37.2",
"description": "A JavaScript library that provides functionality to connect GEAR Component APIs.",
"main": "cjs/index.js",
"module": "index.js",
Expand Down
45 changes: 44 additions & 1 deletion api/src/Blocks.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { AnyNumber, AnyTuple } from '@polkadot/types/types';
import { BlockHash, BlockNumber, SignedBlock } from '@polkadot/types/interfaces';
import { BlockHash, BlockNumber, Header, SignedBlock } from '@polkadot/types/interfaces';
import { Compact, GenericExtrinsic, Vec, u64 } from '@polkadot/types';
import { isHex, isNumber, isU8a } from '@polkadot/util';
import { FrameSystemEventRecord } from '@polkadot/types/lookup';
Expand Down Expand Up @@ -132,4 +132,47 @@ export class GearBlock {
async getFinalizedHead(): Promise<BlockHash> {
return this.api.rpc.chain.getFinalizedHead();
}

async subscribeToHeadsFrom(
from: number | HexString,
cb: (header: Header) => Promise<void> | void,
blocks: 'finalized' | 'latest' = 'latest',
): Promise<() => void> {
let blockNumber = typeof from === 'string' ? (await this.getBlockNumber(from)).toNumber() : from;

const lastHeader =
blocks === 'finalized'
? await this.api.rpc.chain.getHeader(await this.getFinalizedHead())
: await this.api.rpc.chain.getHeader();

let lastHeadNumber = lastHeader.number.toNumber();

let unsubscribed = false;

const unsub = await this.api.rpc.chain[blocks === 'finalized' ? 'subscribeFinalizedHeads' : 'subscribeNewHeads'](
(header) => {
lastHeadNumber = header.number.toNumber();
if (blockNumber >= lastHeadNumber) {
cb(header);
}
},
);

let oldBlocksSub = async () => {
while (!unsubscribed && lastHeadNumber > blockNumber) {
const hash = await this.api.rpc.chain.getBlockHash(blockNumber);
const header = await this.api.rpc.chain.getHeader(hash);
await cb(header);
blockNumber++;
}
};

oldBlocksSub();

return () => {
unsubscribed = true;
oldBlocksSub = null;
unsub();
};
}
}
37 changes: 35 additions & 2 deletions api/src/Message.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { SubmittableExtrinsic, VoidFn } from '@polkadot/api/types';
import { SubmittableExtrinsic, UnsubscribePromise, VoidFn } from '@polkadot/api/types';
import { HexString } from '@polkadot/util/types';
import { ISubmittableResult } from '@polkadot/types/types';
import { ReplaySubject } from 'rxjs';

import { ICalculateReplyForHandleOptions, MessageSendOptions, MessageSendReplyOptions, ReplyInfo } from './types';
import { SendMessageError, SendReplyError } from './errors';
import { UserMessageSent, UserMessageSentData } from './events';
import {
decodeAddress,
encodePayload,
Expand All @@ -16,7 +17,6 @@ import {
import { GearTransaction } from './Transaction';
import { ProgramMetadata } from './metadata';
import { SPEC_VERSION } from './consts';
import { UserMessageSentData } from './events';

export class GearMessage extends GearTransaction {
/**
Expand Down Expand Up @@ -271,6 +271,39 @@ export class GearMessage extends GearTransaction {
}
}

/**
* ## Get event with reply message
* @param msgId - id of sent message
* @param txBlock - number or hash of block where the message was sent
* @returns UserMessageSent event
*/
async getReplyEvent(programId: HexString, msgId: HexString | null, txBlock: HexString | number) {
let unsub: UnsubscribePromise;

const replyEvent = new Promise<UserMessageSent>((resolve) => {
unsub = this._api.gearEvents.subscribeToGearEvent(
'UserMessageSent',
(event) => {
if (event.data.message.source.eq(programId) === false) return;

if (msgId === null) {
resolve(event);
}

if (event.data.message.details.isSome && event.data.message.details.unwrap().to.toHex() === msgId) {
resolve(event);
}
},
txBlock,
);
});

(await unsub)();

return replyEvent;
}

/** @deprecated */
listenToReplies(programId: HexString, bufferSize = 5) {
let unsub: VoidFn;
const subject = new ReplaySubject<[HexString, UserMessageSentData]>(bufferSize);
Expand Down
53 changes: 32 additions & 21 deletions api/src/events/Events.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { FrameSystemAccountInfo } from '@polkadot/types/lookup';
import { FrameSystemAccountInfo, FrameSystemEventRecord } from '@polkadot/types/lookup';
import { HexString } from '@polkadot/util/types';
import { UnsubscribePromise } from '@polkadot/api/types';

import { IBalanceCallback, IBlocksCallback } from '../types';
import { IGearEvent, IGearVoucherEvent } from './types';
import { Transfer, UserMessageSent } from './GearEvents';
import { GearApi } from '../GearApi';
import { Vec } from '@polkadot/types-codec';

export class GearEvents {
private api: GearApi;
Expand All @@ -17,30 +18,40 @@ export class GearEvents {
subscribeToGearEvent<M extends keyof IGearEvent>(
method: M,
callback: (event: IGearEvent[M]) => void | Promise<void>,
fromBlock?: number | HexString,
blocks: 'finalized' | 'latest' = 'latest',
) {
const handler = (events: Vec<FrameSystemEventRecord>) => {
events
.filter(({ event }) => event.method === method)
.forEach(({ event }) => {
callback(event as IGearEvent[M]);
});
};

if (fromBlock) {
return this.api.blocks.subscribeToHeadsFrom(
fromBlock,
(header) => {
this.api
.at(header.hash)
.then((apiAt) => apiAt.query.system.events())
.then(handler);
},
blocks,
);
}

if (blocks === 'latest') {
return this.api.query.system.events((events) => {
events
.filter(({ event }) => event.method === method)
.forEach(({ event }) => {
callback(event as IGearEvent[M]);
});
});
} else {
return this.api.rpc.chain.subscribeFinalizedHeads(async (header) => {
await this.api
.at(header.hash)
.then((apiAt) => apiAt.query.system.events())
.then((events) =>
events
.filter(({ event }) => event.method === method)
.forEach(({ event }) => {
callback(event as IGearEvent[M]);
}),
);
});
return this.api.query.system.events(handler);
}

return this.api.rpc.chain.subscribeFinalizedHeads((header) => {
this.api
.at(header.hash)
.then((apiAt) => apiAt.query.system.events())
.then(handler);
});
}

subscribeToGearVoucherEvent<M extends keyof IGearVoucherEvent>(
Expand Down
11 changes: 7 additions & 4 deletions api/test/Gas.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { u64 } from '@polkadot/types-codec';

import { GearApi, ProgramMetadata } from '../src';
import { TARGET, TEST_GAS_META, WS_ADDRESS } from './config';
import { checkInit, getAccount, listenToUserMessageSent, sendTransaction, sleep } from './utilsFunctions';
import { checkInit, getAccount, sendTransaction, sleep } from './utilsFunctions';
import { GasInfo } from '../src/types';
import { decodeAddress } from '../src/utils';

Expand Down Expand Up @@ -115,9 +115,12 @@ describe('Calculate gas', () => {
},
meta,
);
const waitForReply = listenToUserMessageSent(api, programId);
await sendTransaction(tx, alice, ['MessageQueued']);
const { message } = await waitForReply(null);
const [_, blockHash] = await sendTransaction(tx, alice, ['MessageQueued']);

const {
data: { message },
} = await api.message.getReplyEvent(programId, null, blockHash);

expect(message.id).toBeDefined();
messageId = message.id.toHex();
expect(message.details).toBeDefined();
Expand Down
17 changes: 17 additions & 0 deletions api/test/GearApi.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,23 @@ describe('Blocks', () => {
const blockNumber = await api.blocks.getBlockNumber(hash.toHex());
expect(blockNumber.toNumber()).toBe(1);
});

test('subscribe heads from', async () => {
const blocks: number[] = [];

const unsub = await api.blocks.subscribeToHeadsFrom(1, (header) => {
if (blocks.includes(header.number.toNumber())) throw new Error('Block already exists in the array');
blocks.push(header.number.toNumber());
});

await sleep(7_000);

unsub();

for (let i = 1; i < Math.max(...blocks); i++) {
expect(blocks).toContain(i);
}
});
});

describe('Runtime consts', () => {
Expand Down
13 changes: 6 additions & 7 deletions api/test/Message.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,14 @@ describe('Gear Message', () => {
metadata,
);

const waitForReply = api.message.listenToReplies(programId);

const [txData] = await sendTransaction(tx, alice, ['MessageQueued']);
const [txData, blockHash] = await sendTransaction(tx, alice, ['MessageQueued']);
expect(txData).toBeDefined();
expect(blockHash).toBeDefined();

const reply = await waitForReply(txData.id.toHex());
expect(reply?.message.details.isSome).toBeTruthy();
expect(reply?.message.details.unwrap().code.isSuccess).toBeTruthy();
expect(reply?.message.payload.toHex()).toBe(message.reply);
const reply = await api.message.getReplyEvent(programId, txData.id.toHex(), blockHash);
expect(reply.data.message.details.isSome).toBeTruthy();
expect(reply.data.message.details.unwrap().code.isSuccess).toBeTruthy();
expect(reply.data.message.payload.toHex()).toBe(message.reply);
}
});

Expand Down
25 changes: 13 additions & 12 deletions api/test/Program.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ describe('New Program', () => {
}
});

const waitForReply = api.message.listenToReplies(programId);

const [pcData, mqData] = await sendTransaction(program.extrinsic, alice, ['ProgramChanged', 'MessageQueued']);
const [pcData, mqData, blockHash] = await sendTransaction(program.extrinsic, alice, [
'ProgramChanged',
'MessageQueued',
]);

expect(pcData.id.toHex()).toBe(programId);
expect(pcData.change.isProgramSet).toBeTruthy();
Expand All @@ -75,12 +76,12 @@ describe('New Program', () => {

expect(await status).toBe('success');

const reply = await waitForReply(mqData.id.toHex());
expect(metadata.createType(metadata.types.init.output!, reply.message.payload).toJSON()).toMatchObject({ One: 1 });
const reply = await api.message.getReplyEvent(programId, mqData.id.toHex(), blockHash);
expect(metadata.createType(metadata.types.init.output!, reply.data.message.payload).toJSON()).toMatchObject({
One: 1,
});
expect(isProgramSetHappened).toBeTruthy();
expect(isActiveHappened).toBeTruthy();
// expect(programSetExpiration!).toBe(activeExpiration!);
// expiration = activeExpiration!;
});

test.skip('Wait when program will be paused', async () => {
Expand Down Expand Up @@ -113,18 +114,18 @@ describe('New Program', () => {
programChangedStatuses.push(st);
});

const waitForReply = api.message.listenToReplies(programId);

const [transactionData] = await sendTransaction(api.program, alice, ['MessageQueued']);
const [transactionData, blockHash] = await sendTransaction(api.program, alice, ['MessageQueued']);

expect(transactionData.destination.toHex()).toBe(programId);
expect(await status).toBe('success');

expect(programChangedStatuses).toContain('ProgramSet');
expect(programChangedStatuses).toContain('Active');

const reply = await waitForReply(transactionData.id.toHex());
expect(metadata.createType(metadata.types.init.output!, reply.message.payload).toJSON()).toMatchObject({ One: 1 });
const reply = await api.message.getReplyEvent(programId, transactionData.id.toHex(), blockHash);
expect(metadata.createType(metadata.types.init.output!, reply.data.message.payload).toJSON()).toMatchObject({
One: 1,
});
});

test('Throw error if value is incorrect', () => {
Expand Down
12 changes: 6 additions & 6 deletions api/test/Voucher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,14 @@ describe('Voucher', () => {
metadata.types.reply!,
);

const waitForReply = api.message.listenToReplies(programId);

const [txData] = await sendTransaction(api.voucher.call(voucher, { SendReply: tx }), charlie, ['MessageQueued']);
const [txData, blockHash] = await sendTransaction(api.voucher.call(voucher, { SendReply: tx }), charlie, [
'MessageQueued',
]);
expect(txData).toBeDefined();

const reply = await waitForReply(msgId);
expect(reply?.message.details.isSome).toBeTruthy();
expect(reply?.message.details.unwrap().code.isSuccess).toBeTruthy();
const reply = await api.message.getReplyEvent(programId, msgId, blockHash);
expect(reply.data.message.details.isSome).toBeTruthy();
expect(reply.data.message.details.unwrap().code.isSuccess).toBeTruthy();
});

test('Update voucher', async () => {
Expand Down
Loading

0 comments on commit 59cff46

Please sign in to comment.