Skip to content

Commit

Permalink
feat(squid) add id generation for every message from user (#1639)
Browse files Browse the repository at this point in the history
Co-authored-by: sergey filyanin <[email protected]>
  • Loading branch information
Zewasik and sergeyfilyanin authored Oct 3, 2024
1 parent ce5e449 commit 34c0ed1
Show file tree
Hide file tree
Showing 16 changed files with 168 additions and 24 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/CI-CD-Squid-Explorer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
with:
file: idea/explorer/Dockerfile
push: true
tags: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}-explorer:${{ env.tag }}
tags: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}-explorer-latest:${{ env.tag }}

build-squid-image:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -87,7 +87,7 @@ jobs:
with:
file: idea/squid/Dockerfile
push: true
tags: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}-squid:${{ env.tag }}
tags: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}-squid-latest:${{ env.tag }}

deploy-to-k8s:
needs:
Expand All @@ -108,10 +108,10 @@ jobs:
run: |
if [ "${{ github.ref }}" == "refs/heads/idea-release" ]; then
echo "namespace=prod-idea" >> $GITHUB_ENV
echo "deployments=explorer squid-testnet squid-mainnet" >> $GITHUB_ENV
echo "deployments=explorer-latest squid-testnet-latest squid-mainnet-latest" >> $GITHUB_ENV
else
echo "namespace=dev-1" >> $GITHUB_ENV
echo "deployments=explorer squid-testnet" >> $GITHUB_ENV
echo "deployments=explorer-latest squid-testnet-latest" >> $GITHUB_ENV
fi
- name: Deploy to k8s
Expand Down
6 changes: 5 additions & 1 deletion idea/explorer/src/services/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ export class EventService {
}

@Pagination()
async getEvents({ source, service, name, limit, offset }: ParamGetEvents): Promise<ResManyResult<Event>> {
async getEvents({ source, parentId, service, name, limit, offset }: ParamGetEvents): Promise<ResManyResult<Event>> {
const builder = this._repo.createQueryBuilder('event');

if (source) {
builder.andWhere('event.source = :source', { source });
}

if (parentId) {
builder.andWhere('event.parent_id = :parentId', { parentId });
}

if (service) {
builder.andWhere('event.service ILIKE :service', { service: `%${service.toLowerCase()}%` });
}
Expand Down
5 changes: 5 additions & 0 deletions idea/explorer/src/services/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ export class MessageService {
async getMsgsFrom({
source,
destination,
parentId,
isInMailbox,
limit,
offset,
Expand All @@ -105,6 +106,10 @@ export class MessageService {
qb.andWhere('msg.destination = :destination', { destination });
}

if (parentId) {
qb.andWhere('msg.parent_id = :parentId', { parentId });
}

if (isInMailbox) {
qb.andWhere('msg.readReson = NULL').andWhere('msg.expiration != NULL');
}
Expand Down
1 change: 1 addition & 0 deletions idea/explorer/src/types/requests/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ export interface ParamGetEvents extends ParamPagination, ParamGenesis {
service?: string;
name?: string;
source?: string;
parentId?: string;
}
1 change: 1 addition & 0 deletions idea/explorer/src/types/requests/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export class ParamGetMsgsToProgram extends ParamPagination {
export class ParamGetMsgsFromProgram extends ParamPagination {
readonly destination?: string;
readonly source?: string;
readonly parentId?: string;
readonly isInMailbox?: boolean;
readonly service?: string;
readonly fn?: string;
Expand Down
3 changes: 3 additions & 0 deletions idea/indexer-db/src/entities/event.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ export class Event extends BaseEntity {
@Column({ nullable: true })
public payload: string;

@Column({ nullable: true, name: 'parent_id' })
public parentId: string;

@Column({ nullable: true })
public service?: string;

Expand Down
3 changes: 3 additions & 0 deletions idea/indexer-db/src/entities/message-from-program.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ export class MessageFromProgram extends BaseEntity {
@Column({ nullable: true })
public payload: string;

@Column({ nullable: true, name: 'parent_id' })
public parentId: string;

@Column({ default: '0' })
public value: string;

Expand Down
13 changes: 13 additions & 0 deletions idea/squid/db/migrations/1725631344412-Data.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module.exports = class Data1725704609634 {
name = 'Data1725704609634'

async up(db) {
await db.query(`ALTER TABLE "message_from_program" ADD "parent_id" character varying`)
await db.query(`ALTER TABLE "event" ADD "parent_id" character varying`)
}

async down(db) {
await db.query(`ALTER TABLE "message_from_program" DROP COLUMN "parent_id"`)
await db.query(`ALTER TABLE "event" DROP COLUMN "parent_id"`)
}
}
2 changes: 2 additions & 0 deletions idea/squid/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"dotenv": "16.4.5",
"indexer-db": "workspace:^",
"pg": "8.11.5",
"redis": "^4.7.0",
"sails-js": "^0.1.9",
"typeorm": "0.3.20"
},
Expand All @@ -31,6 +32,7 @@
"@subsquid/substrate-typegen": "8.1.0",
"@subsquid/typeorm-codegen": "2.0.0",
"@types/node": "20.12.7",
"@types/redis": "^4.0.11",
"ts-node-dev": "^2.0.0",
"typescript": "5.4.5"
}
Expand Down
6 changes: 6 additions & 0 deletions idea/squid/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,10 @@ export const config = {
fromBlock: parseInt(process.env.FROM_BLOCK || '0'),
toBlock: parseInt(process.env.TO_BLOCK) || undefined,
},
redis: {
host: process.env.REDIS_HOST || '127.0.0.1',
port: parseInt(process.env.REDIS_PORT) || 6379,
user: process.env.REDIS_USER || '',
password: process.env.REDIS_PASSWORD || '',
},
};
17 changes: 14 additions & 3 deletions idea/squid/src/event.route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ export interface IHandleEventProps {
block: Block<Fields>;
}

const callHandlers: Array<{ pattern: (obj: any) => boolean; handler: (args: IHandleCallProps) => void }> = [
const callHandlers: Array<{
pattern: (obj: any) => boolean;
handler: (args: IHandleCallProps) => void;
}> = [
{ pattern: isUploadProgram, handler: handleUploadProgram },
{ pattern: isSendMessageCall, handler: handleSendMessageCall },
{ pattern: isVoucherCall, handler: handleVoucherCall },
Expand Down Expand Up @@ -72,8 +75,11 @@ export async function handleUserMessageSent({ event, common, tempState }: IHandl
value: event.args.message.value,
replyToMessageId: event.args.message.details?.to || null,
expiration: event.args.expirtaion || null,
exitCode: event.args.message.details?.code?.__kind === 'Success' ? 0 : 1,
exitCode: !event.args.message.details?.code ? null : event.args.message.details.code.__kind === 'Success' ? 0 : 1,
});

msg.parentId = msg.replyToMessageId ? msg.replyToMessageId : await tempState.getMessageId(msg.id);

if (event.args.message.destination === ZERO_ADDRESS) {
tempState.addEvent(msg);
} else {
Expand Down Expand Up @@ -140,7 +146,12 @@ export async function handleCodeChanged({ event, common, tempState }: IHandleEve
}

export async function handleMessagesDispatched({ event, tempState }: IHandleEventProps) {
await Promise.all(event.args.statuses.map((s) => tempState.setDispatchedStatus(s[0], s[1].__kind)));
await Promise.all(
event.args.statuses.map((s) => {
tempState.removeParentMsgId(s[0]);
return tempState.setDispatchedStatus(s[0], s[1].__kind);
}),
);
}

const reasons = {
Expand Down
34 changes: 26 additions & 8 deletions idea/squid/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import {
handleUserMessageSent,
IHandleEventProps,
} from './event.route';
import { createClient, RedisClientType } from 'redis';
import { config } from './config';
import { GearApi } from '@gear-js/api';

let tempState: TempState;

const callHandlers: Array<{ pattern: (obj: any) => boolean; handler: (args: IHandleEventProps) => Promise<void> }> = [
const eventHandlers: Array<{ pattern: (obj: any) => boolean; handler: (args: IHandleEventProps) => Promise<void> }> = [
{ pattern: isMessageQueued, handler: handleMessageQueued },
{ pattern: isUserMessageSent, handler: handleUserMessageSent },
{ pattern: isProgramChanged, handler: handleProgramChanged },
Expand All @@ -42,7 +45,7 @@ const handler = async (ctx: ProcessorContext<Store>) => {
};

for (const event of block.events) {
const { handler } = callHandlers.find(({ pattern }) => pattern(event));
const { handler } = eventHandlers.find(({ pattern }) => pattern(event));

if (!handler) {
continue;
Expand All @@ -55,12 +58,27 @@ const handler = async (ctx: ProcessorContext<Store>) => {
await tempState.save();
};

const main = async () => {
tempState = new TempState();
interface RedisClient extends RedisClientType<any, any, any> {}

const main = async (api: GearApi) => {
const redisClient: RedisClient = createClient({
username: config.redis.user,
password: config.redis.password,
socket: {
host: config.redis.host,
port: config.redis.port,
},
});
await redisClient.connect();

tempState = new TempState(redisClient, api.genesisHash.toHex());
api.disconnect();
processor.run(new TypeormDatabase({ supportHotBlocks: true }), handler);
};

main().catch((e) => {
console.error(e);
process.exit(1);
});
GearApi.create({ providerAddress: config.squid.rpc })
.then(main)
.catch((e) => {
console.error(e);
process.exit(1);
});
49 changes: 47 additions & 2 deletions idea/squid/src/temp-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import {
} from './model';
import { Block, ProcessorContext } from './processor';
import { MessageStatus } from './common';
import { RedisClientType } from '@redis/client';
import { findChildMessageId } from './util';

const gearProgramModule = xxhashAsHex('GearProgram', 128);
const programStorageMethod = xxhashAsHex('ProgramStorage', 128);
Expand Down Expand Up @@ -49,29 +51,43 @@ export class TempState {
private messagesFromProgram: Map<string, MessageFromProgram>;
private messagesToProgram: Map<string, MessageToProgram>;
private events: Map<string, Event>;
private cachedMessages: { [key: string]: number };
private genesisHash: string;
private _ctx: ProcessorContext<Store>;
private _metadata: Metadata;
private _registry: TypeRegistry;
private _specVersion: number;
private _programStorageTy: string;
private _redis: RedisClientType;

constructor() {
constructor(redisClient: RedisClientType, genesisHash: string) {
this._redis = redisClient;
this.genesisHash = genesisHash;
this.programs = new Map();
this.codes = new Map();
this.messagesFromProgram = new Map();
this.messagesToProgram = new Map();
this.events = new Map();
this.newPrograms = new Set();
this.cachedMessages = {};
}

newState(ctx: ProcessorContext<Store>) {
async newState(ctx: ProcessorContext<Store>) {
this._ctx = ctx;
this.programs.clear();
this.codes.clear();
this.messagesFromProgram.clear();
this.messagesToProgram.clear();
this.events.clear();
this.newPrograms.clear();

this._redis.on('error', (error) => this._ctx.log.error(error));

const temp = Object.entries(await this._redis.hGetAll(this.genesisHash));
this.cachedMessages = {};
temp.forEach(([key, value]) => {
this.cachedMessages[key] = Number(value);
});
}

addProgram(program: Program) {
Expand All @@ -89,6 +105,8 @@ export class TempState {
msg.service = service;
msg.fn = name;

this.saveParentMsgId(msg.id);

this.messagesToProgram.set(msg.id, msg);
}

Expand All @@ -114,6 +132,7 @@ export class TempState {
blockHash: msg.blockHash,
blockNumber: msg.blockNumber,
id: msg.id,
parentId: msg.parentId,
source: msg.source,
payload: msg.payload,
service,
Expand Down Expand Up @@ -296,11 +315,37 @@ export class TempState {
'Data saved',
);
}

await this._redis.del(this.genesisHash);
if (Object.keys(this.cachedMessages).length > 0) {
await this._redis.hSet(this.genesisHash, this.cachedMessages);
}
} catch (error) {
this._ctx.log.error({ error: error.message, stack: error.stack }, 'Failed to save data');
throw error;
}
}

saveParentMsgId(parentId: string, nonce: number = 0) {
this.cachedMessages[parentId] = nonce;
return parentId;
}

removeParentMsgId(parentId: string) {
delete this.cachedMessages[parentId];
}

async getMessageId(childId: string) {
const finder = Object.entries(this.cachedMessages).map(([parentId, nonce]) => {
return findChildMessageId(parentId, childId, Number(nonce));
});

return Promise.any(finder)
.then(({ parentId, nonce }) => {
return this.saveParentMsgId(parentId, nonce);
})
.catch<null>(() => null);
}
}

const getAllNeccesaryTypes = (metadata: Metadata, tyindex: SiLookupTypeId | number): Record<string, string> => {
Expand Down
26 changes: 22 additions & 4 deletions idea/squid/src/util.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { CUploadCode, CUploadProgram, CVoucherCall, isUploadCode, isUploadProgram } from './types/calls';
import { getGrReply } from '@gear-js/api';
import { u8aToHex } from '@polkadot/util';
import { getGrReply, CreateType } from '@gear-js/api';
import { u8aToHex, u8aToU8a, u8aConcat, stringToU8a } from '@polkadot/util';
import { blake2AsHex } from '@polkadot/util-crypto';

export async function getMetahash(call: CUploadCode | CUploadProgram | CVoucherCall): Promise<string | null> {
const code =
isUploadCode(call) || isUploadProgram(call)
? call.args.code
: call.args.call.__kind === 'UploadCode'
? call.args.call.code
: null;
? call.args.call.code
: null;

if (code) {
let metahash: Uint8Array;
Expand All @@ -24,3 +25,20 @@ export async function getMetahash(call: CUploadCode | CUploadProgram | CVoucherC

return null;
}

const prefix = stringToU8a('outgoing');
const nonces = Array.from({ length: 512 }, (_v, i) => CreateType.create('u32', i).toU8a());

export async function findChildMessageId(parentId: string, idToFind: string, startNonce: number = 0) {
const msgId = u8aToU8a(parentId);

for (let i = startNonce; i < nonces.length; i++) {
const childId = blake2AsHex(u8aConcat(prefix, msgId, nonces[i]));

if (childId === idToFind) {
return { parentId, nonce: i };
}
}

throw Error('Child id not found');
}
Loading

0 comments on commit 34c0ed1

Please sign in to comment.