Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi chain rewind service #2673

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions packages/cli/src/controller/build-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
// SPDX-License-Identifier: GPL-3.0

import assert from 'assert';
import { readFileSync } from 'fs';
import {readFileSync} from 'fs';
import path from 'path';
import { globSync } from 'glob';
import {globSync} from 'glob';
import TerserPlugin from 'terser-webpack-plugin';
import webpack, { Configuration } from 'webpack';
import { merge } from 'webpack-merge';
import webpack, {Configuration} from 'webpack';
import {merge} from 'webpack-merge';

const getBaseConfig = (
buildEntries: Configuration['entry'],
Expand Down Expand Up @@ -72,7 +72,7 @@ export async function runWebpack(
): Promise<void> {
const config = merge(
getBaseConfig(buildEntries, projectDir, outputDir, isDev),
{ output: { clean } }
{output: {clean}}
// Can allow projects to override webpack config here
);

Expand Down Expand Up @@ -121,7 +121,7 @@ export function getBuildEntries(directory: string): Record<string, string> {
acc[key] = path.resolve(directory, value);
return acc;
},
{ ...buildEntries }
{...buildEntries}
);
}

Expand Down
4 changes: 2 additions & 2 deletions packages/node-core/src/blockchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ export interface ICoreBlockchainService<

// Project service
onProjectChange(project: SubQueryProject): Promise<void> | void;
/* Not all networks have a block timestamp, e.g. Shiden */
getBlockTimestamp(height: number): Promise<Date | undefined>;
/* Not all networks have a block timestamp, e.g. Shiden need to request one more get */
getBlockTimestamp(height: number): Promise<Date>;
}

export interface IBlockchainService<
Expand Down
2 changes: 1 addition & 1 deletion packages/node-core/src/configure/SubqueryProject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export class BaseSubqueryProject<
return this.#dataSources;
}

async applyCronTimestamps(getTimestamp: (height: number) => Promise<Date | undefined>): Promise<void> {
async applyCronTimestamps(getTimestamp: (height: number) => Promise<Date>): Promise<void> {
this.#dataSources = await insertBlockFiltersCronSchedules(
this.dataSources,
getTimestamp,
Expand Down
1 change: 1 addition & 0 deletions packages/node-core/src/db/db.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import {DynamicModule, Global} from '@nestjs/common';
import {Sequelize, Options as SequelizeOption} from '@subql/x-sequelize';
import {PoolConfig} from 'pg';
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be removed.

import {NodeConfig} from '../configure/NodeConfig';
import {getLogger} from '../logger';
import {exitWithError} from '../process';
Expand Down
92 changes: 91 additions & 1 deletion packages/node-core/src/db/sync-helper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,19 @@
import {INestApplication} from '@nestjs/common';
import {Test} from '@nestjs/testing';
import {delay} from '@subql/common';
import {hashName} from '@subql/utils';
import {Sequelize} from '@subql/x-sequelize';
import {NodeConfig} from '../configure/NodeConfig';
import {MultiChainRewindEvent} from '../events';
import {RewindLockKey} from '../indexer';
import {DbModule} from './db.module';
import {createSendNotificationTriggerFunction, createNotifyTrigger, getDbSizeAndUpdateMetadata} from './sync-helper';
import {
createSendNotificationTriggerFunction,
createNotifyTrigger,
getDbSizeAndUpdateMetadata,
createRewindTriggerFunction,
createRewindTrigger,
} from './sync-helper';

const nodeConfig = new NodeConfig({subquery: 'packages/node-core/test/v1.0.0', subqueryName: 'test'});

Expand Down Expand Up @@ -185,3 +194,84 @@ describe('sync helper test', () => {
}, 10_000);
});
});

describe('Multi-chain notification', () => {
let app: INestApplication;
let sequelize: Sequelize;
const schema = 'multi-chain-test';

let client: unknown;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please provide the type for this.


const listenerHash = hashName(schema, 'rewind_trigger', '_global');

afterAll(async () => {
await sequelize.dropSchema(schema, {});
await sequelize.close();
return app?.close();
});

afterEach(async () => {
if (client) {
await (client as any).query(`UNLISTEN "${listenerHash}"`);
(client as any).removeAllListeners('notification');
sequelize.connectionManager.releaseConnection(client);
}
});

it('can handle multiple rows in one transaction', async () => {
const module = await Test.createTestingModule({
imports: [DbModule.forRootWithConfig(nodeConfig)],
}).compile();
app = module.createNestApplication();
await app.init();
sequelize = app.get(Sequelize);
await sequelize.createSchema(schema, {});
// mock create global table
await sequelize.query(`
CREATE TABLE IF NOT EXISTS "${schema}"._global (
key VARCHAR(255) NOT NULL PRIMARY KEY,
value JSONB,
"createdAt" TIMESTAMP WITH TIME ZONE NOT NULL,
"updatedAt" TIMESTAMP WITH TIME ZONE NOT NULL
)`);

await sequelize.query(createRewindTriggerFunction(schema));
await sequelize.query(createRewindTrigger(schema));

client = await sequelize.connectionManager.getConnection({
type: 'read',
});
await (client as any).query(`LISTEN "${listenerHash}"`);

const listener = jest.fn();
(client as any).on('notification', (msg: any) => {
console.log('Payload:', msg.payload);
listener(msg.payload);
});

const rewindSqlFromTimestamp = (
timestamp: number
) => `INSERT INTO "${schema}"."_global" ( "key", "value", "createdAt", "updatedAt" )
VALUES
( 'rewindLock', '{"timestamp":${timestamp},"chainNum":1}', now(), now())
ON CONFLICT ( "key" )
DO UPDATE
SET "key" = EXCLUDED."key",
"value" = EXCLUDED."value",
"updatedAt" = EXCLUDED."updatedAt"
WHERE "_global"."key" = '${RewindLockKey}' AND ("_global"."value"->>'timestamp')::BIGINT > ${timestamp}`;
const rewindTimestamp = 1597669506000;
await sequelize.query(rewindSqlFromTimestamp(rewindTimestamp));
await delay(1);

await sequelize.query(rewindSqlFromTimestamp(rewindTimestamp - 1));
await delay(1);

await sequelize.query(`DELETE FROM "${schema}"."_global" WHERE "key" = '${RewindLockKey}'`);
await delay(1);
Comment on lines +265 to +271
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please try and remove the delays. Or reduce them to much smaller times

expect(listener).toHaveBeenCalledTimes(3);
expect(listener).toHaveBeenNthCalledWith(1, MultiChainRewindEvent.Rewind);
expect(listener).toHaveBeenNthCalledWith(2, MultiChainRewindEvent.RewindTimestampDecreased);
expect(listener).toHaveBeenNthCalledWith(3, MultiChainRewindEvent.RewindComplete);
}, 20_000);
});
53 changes: 53 additions & 0 deletions packages/node-core/src/db/sync-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import {
Utils,
} from '@subql/x-sequelize';
import {ModelAttributeColumnReferencesOptions, ModelIndexesOptions} from '@subql/x-sequelize/types/model';
import {MultiChainRewindEvent} from '../events';
import {RewindLockKey} from '../indexer';
import {EnumType} from '../utils';
import {formatAttributes, generateIndexName, modelToTableName} from './sequelizeUtil';
// eslint-disable-next-line @typescript-eslint/no-var-requires
Expand Down Expand Up @@ -297,6 +299,57 @@ export function createSchemaTriggerFunction(schema: string): string {
$$ LANGUAGE plpgsql;`;
}

export function createRewindTrigger(schema: string): string {
const triggerName = hashName(schema, 'rewind_trigger', '_global');

return `
CREATE TRIGGER "${triggerName}"
AFTER INSERT OR UPDATE OR DELETE
ON "${schema}"."_global"
FOR EACH ROW
EXECUTE FUNCTION "${schema}".rewind_notification();
`;
}

export function createRewindTriggerFunction(schema: string): string {
const triggerName = hashName(schema, 'rewind_trigger', '_global');

return `
CREATE OR REPLACE FUNCTION "${schema}".rewind_notification()
RETURNS trigger AS $$
DECLARE
key_value TEXT;
BEGIN
IF TG_OP = 'DELETE' THEN
key_value := OLD.value ->> 'key';
ELSE
key_value := NEW.value ->> 'key';
END IF;

-- Make sure it’s RewindLockKey
IF key_value <> '${RewindLockKey}' THEN
RETURN NULL;
END IF;

IF TG_OP = 'INSERT' THEN
PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.Rewind}');
END IF;

-- During a rollback, there is a chain that needs to be rolled back to an earlier height.
IF TG_OP = 'UPDATE' AND (NEW.value ->> 'timestamp')::BIGINT < (OLD.value ->> 'timestamp')::BIGINT THEN
PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.RewindTimestampDecreased}');
END IF;

IF TG_OP = 'DELETE' THEN
PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.RewindComplete}');
END IF;

RETURN NULL;
END;
$$ LANGUAGE plpgsql;
`;
}

export function getExistedIndexesQuery(schema: string): string {
return `SELECT indexname FROM pg_indexes WHERE schemaname = '${schema}'`;
}
Expand Down
10 changes: 10 additions & 0 deletions packages/node-core/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ export enum PoiEvent {
PoiTarget = 'poi_target',
}

export enum MultiChainRewindEvent {
Rewind = 'rewind',
RewindComplete = 'rewind_complete',
RewindTimestampDecreased = 'timestamp_decreased',
}

export interface RewindPayload {
success: boolean;
height: number;
Expand Down Expand Up @@ -61,3 +67,7 @@ export interface NetworkMetadataPayload {
specName: string;
genesisHash: string;
}

export interface MultiChainRewindPayload {
height: number;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import assert from 'assert';

import {EventEmitter2, OnEvent} from '@nestjs/event-emitter';
import {ICoreBlockchainService} from '@subql/node-core/blockchain.service';
import {hexToU8a, u8aEq} from '@subql/utils';
import {Transaction} from '@subql/x-sequelize';
import {NodeConfig, IProjectUpgradeService} from '../../configure';
Expand Down Expand Up @@ -33,6 +34,7 @@ export interface IBlockDispatcher<B> {
latestBufferedHeight: number;
batchSize: number;

setLatestProcessedHeight(height: number): void;
// Remove all enqueued blocks, used when a dynamic ds is created
flushQueue(height: number): void;
}
Expand Down Expand Up @@ -60,7 +62,8 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
protected queue: Q,
protected storeService: StoreService,
private storeModelProvider: IStoreModelProvider,
private poiSyncService: PoiSyncService
private poiSyncService: PoiSyncService,
private blockChainService: ICoreBlockchainService
) {}

abstract enqueueBlocks(heights: (IBlock<B> | number)[], latestBufferHeight?: number): void | Promise<void>;
Expand Down Expand Up @@ -214,7 +217,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
}

@OnEvent(AdminEvent.rewindTarget)
handleAdminRewind(blockPayload: TargetBlockPayload): void {
async handleAdminRewind(blockPayload: TargetBlockPayload) {
if (this.currentProcessingHeight < blockPayload.height) {
// this will throw back to admin controller, will NOT lead current indexing exit
throw new Error(
Expand All @@ -223,8 +226,10 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
}

// TODO can this work without
const timestamp = await this.blockChainService.getBlockTimestamp(blockPayload.height);
this._pendingRewindHeader = {
blockHeight: Number(blockPayload.height),
timestamp,
Comment on lines +229 to +232
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here now its easiest to call getHeaderForHeight

} as Header;
const message = `Received admin command to rewind to block ${blockPayload.height}`;
monitorWrite(`***** [ADMIN] ${message}`);
Expand Down
45 changes: 24 additions & 21 deletions packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
// Copyright 2020-2025 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import { OnApplicationShutdown } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Interval } from '@nestjs/schedule';
import { BaseDataSource } from '@subql/types-core';
import { IBlockchainService } from '../../blockchain.service';
import { NodeConfig } from '../../configure';
import { IProjectUpgradeService } from '../../configure/ProjectUpgrade.service';
import { IndexerEvent } from '../../events';
import { getBlockHeight, IBlock, PoiSyncService, StoreService } from '../../indexer';
import { getLogger } from '../../logger';
import { exitWithError, monitorWrite } from '../../process';
import { profilerWrap } from '../../profiler';
import { Queue, AutoQueue, RampQueue, delay, isTaskFlushedError } from '../../utils';
import { IStoreModelProvider } from '../storeModelProvider';
import { IIndexerManager, IProjectService, ISubqueryProject } from '../types';
import { BaseBlockDispatcher } from './base-block-dispatcher';
import {OnApplicationShutdown} from '@nestjs/common';
import {EventEmitter2} from '@nestjs/event-emitter';
import {Interval} from '@nestjs/schedule';
import {BaseDataSource} from '@subql/types-core';
import {IBlockchainService} from '../../blockchain.service';
import {NodeConfig} from '../../configure';
import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service';
import {IndexerEvent} from '../../events';
import {getBlockHeight, IBlock, PoiSyncService, StoreService} from '../../indexer';
import {getLogger} from '../../logger';
import {exitWithError, monitorWrite} from '../../process';
import {profilerWrap} from '../../profiler';
import {Queue, AutoQueue, RampQueue, delay, isTaskFlushedError} from '../../utils';
import {IStoreModelProvider} from '../storeModelProvider';
import {IIndexerManager, IProjectService, ISubqueryProject} from '../types';
import {BaseBlockDispatcher} from './base-block-dispatcher';

const logger = getLogger('BlockDispatcherService');

Expand All @@ -27,7 +27,8 @@ type BatchBlockFetcher<B> = (heights: number[]) => Promise<IBlock<B>[]>;
*/
export class BlockDispatcher<B, DS extends BaseDataSource>
extends BaseBlockDispatcher<Queue<IBlock<B> | number>, DS, B>
implements OnApplicationShutdown {
implements OnApplicationShutdown
{
private fetchQueue: AutoQueue<IBlock<B>>;
private processQueue: AutoQueue<void>;

Expand Down Expand Up @@ -57,7 +58,8 @@ export class BlockDispatcher<B, DS extends BaseDataSource>
new Queue(nodeConfig.batchSize * 3),
storeService,
storeModelProvider,
poiSyncService
poiSyncService,
blockchainService
);
this.processQueue = new AutoQueue(nodeConfig.batchSize * 3, 1, nodeConfig.timeout, 'Process');
this.fetchQueue = new RampQueue(
Expand Down Expand Up @@ -177,7 +179,8 @@ export class BlockDispatcher<B, DS extends BaseDataSource>
}
logger.error(
e,
`Failed to index block at height ${header.blockHeight} ${e.handler ? `${e.handler}(${e.stack ?? ''})` : ''
`Failed to index block at height ${header.blockHeight} ${
e.handler ? `${e.handler}(${e.stack ?? ''})` : ''
}`
);
throw e;
Expand All @@ -198,7 +201,7 @@ export class BlockDispatcher<B, DS extends BaseDataSource>
// Do nothing, fetching the block was flushed, this could be caused by forked blocks or dynamic datasources
return;
}
exitWithError(new Error(`Failed to enqueue fetched block to process`, { cause: e }), logger);
exitWithError(new Error(`Failed to enqueue fetched block to process`, {cause: e}), logger);
});

this.eventEmitter.emit(IndexerEvent.BlockQueueSize, {
Expand All @@ -207,7 +210,7 @@ export class BlockDispatcher<B, DS extends BaseDataSource>
}
} catch (e: any) {
if (!this.isShutdown) {
exitWithError(new Error(`Failed to process blocks from queue`, { cause: e }), logger);
exitWithError(new Error(`Failed to process blocks from queue`, {cause: e}), logger);
}
} finally {
this.fetching = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ export class WorkerBlockDispatcher<
initAutoQueue(nodeConfig.workers, nodeConfig.batchSize, nodeConfig.timeout, 'Worker'),
storeService,
storeModelProvider,
poiSyncService
poiSyncService,
blockchainService
);

this.createWorker = () =>
Expand Down
Loading
Loading