diff --git a/packages/node-core/src/blockchain.service.ts b/packages/node-core/src/blockchain.service.ts index 924cb49371..b9b4faf507 100644 --- a/packages/node-core/src/blockchain.service.ts +++ b/packages/node-core/src/blockchain.service.ts @@ -13,8 +13,10 @@ export interface ICoreBlockchainService< // Project service onProjectChange(project: SubQueryProject): Promise | void; - /* Not all networks have a block timestamp, e.g. Shiden */ - getBlockTimestamp(height: number): Promise; + /* Not all networks have a block timestamp, e.g. Shiden need to request one more get */ + getBlockTimestamp(height: number): Promise; + + getHeaderForHeight(height: number): Promise
; } export interface IBlockchainService< @@ -57,7 +59,6 @@ export interface IBlockchainService< // Unfinalized blocks getHeaderForHash(hash: string): Promise
; - getHeaderForHeight(height: number): Promise
; // Dynamic Ds sevice /** diff --git a/packages/node-core/src/configure/SubqueryProject.ts b/packages/node-core/src/configure/SubqueryProject.ts index e328a6d66d..e23b53c05d 100644 --- a/packages/node-core/src/configure/SubqueryProject.ts +++ b/packages/node-core/src/configure/SubqueryProject.ts @@ -124,7 +124,7 @@ export class BaseSubqueryProject< return this.#dataSources; } - async applyCronTimestamps(getTimestamp: (height: number) => Promise): Promise { + async applyCronTimestamps(getTimestamp: (height: number) => Promise): Promise { this.#dataSources = await insertBlockFiltersCronSchedules( this.dataSources, getTimestamp, diff --git a/packages/node-core/src/db/sync-helper.test.ts b/packages/node-core/src/db/sync-helper.test.ts index 1751cd7ca5..1820b1086f 100644 --- a/packages/node-core/src/db/sync-helper.test.ts +++ b/packages/node-core/src/db/sync-helper.test.ts @@ -4,7 +4,9 @@ import {INestApplication} from '@nestjs/common'; import {Test} from '@nestjs/testing'; import {delay} from '@subql/common'; +import {hashName} from '@subql/utils'; import {Sequelize} from '@subql/x-sequelize'; +import {PoolClient} from 'pg'; import {NodeConfig} from '../configure/NodeConfig'; import {DbModule} from './db.module'; import {createSendNotificationTriggerFunction, createNotifyTrigger, getDbSizeAndUpdateMetadata} from './sync-helper'; @@ -45,7 +47,7 @@ describe('sync helper test', () => { }, 50000); describe('has the correct notification trigger payload', () => { - let client: unknown; + let client: PoolClient; afterEach(async () => { if (client) { @@ -76,11 +78,11 @@ describe('sync helper test', () => { await sequelize.query(createSendNotificationTriggerFunction(schema)); await sequelize.query(createNotifyTrigger(schema, tableName)); - client = await sequelize.connectionManager.getConnection({ + client = (await sequelize.connectionManager.getConnection({ type: 'read', - }); + })) as PoolClient; - await (client as any).query('LISTEN "0xc4e66f9e1358fa3c"'); + await client.query('LISTEN "0xc4e66f9e1358fa3c"'); const listener = jest.fn(); (client as any).on('notification', (msg: any) => { @@ -130,9 +132,9 @@ describe('sync helper test', () => { await sequelize.query(createSendNotificationTriggerFunction(schema)); await sequelize.query(createNotifyTrigger(schema, tableName)); - client = await sequelize.connectionManager.getConnection({ + client = (await sequelize.connectionManager.getConnection({ type: 'read', - }); + })) as PoolClient; await (client as any).query('LISTEN "0xc4e66f9e1358fa3c"'); diff --git a/packages/node-core/src/db/sync-helper.ts b/packages/node-core/src/db/sync-helper.ts index bc3b6ac744..610005bf6c 100644 --- a/packages/node-core/src/db/sync-helper.ts +++ b/packages/node-core/src/db/sync-helper.ts @@ -16,6 +16,8 @@ import { Utils, } from '@subql/x-sequelize'; import {ModelAttributeColumnReferencesOptions, ModelIndexesOptions} from '@subql/x-sequelize/types/model'; +import {MultiChainRewindEvent} from '../events'; +import {MultiChainRewindStatus} from '../indexer'; import {EnumType} from '../utils'; import {formatAttributes, generateIndexName, modelToTableName} from './sequelizeUtil'; // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -297,6 +299,55 @@ export function createSchemaTriggerFunction(schema: string): string { $$ LANGUAGE plpgsql;`; } +export function createRewindTrigger(schema: string): string { + const triggerName = hashName(schema, 'rewind_trigger', '_global'); + + return ` + CREATE TRIGGER "${triggerName}" + AFTER INSERT OR UPDATE OR DELETE + ON "${schema}"."_global" + FOR EACH ROW + EXECUTE FUNCTION "${schema}".rewind_notification(); + `; +} + +export function createRewindTriggerFunction(schema: string): string { + const triggerName = hashName(schema, 'rewind_trigger', '_global'); + + return ` + CREATE OR REPLACE FUNCTION "${schema}".rewind_notification() + RETURNS trigger AS $$ + BEGIN + IF TG_OP = 'INSERT' THEN + PERFORM pg_notify( + '${triggerName}', + format('{"event":"%s","chainId":"%s"}', '${MultiChainRewindEvent.Rewind}', NEW."chainId") + ); + ELSIF TG_OP = 'UPDATE' THEN + IF NEW.status = '${MultiChainRewindStatus.Complete}' THEN + PERFORM pg_notify( + '${triggerName}', + format('{"event":"%s","chainId":"%s"}', '${MultiChainRewindEvent.RewindComplete}', NEW."chainId") + ); + ELSIF NEW."rewindTimestamp" < OLD."rewindTimestamp" THEN + PERFORM pg_notify( + '${triggerName}', + format('{"event":"%s","chainId":"%s"}', '${MultiChainRewindEvent.RewindTimestampDecreased}', NEW."chainId") + ); + END IF; + ELSIF TG_OP = 'DELETE' THEN + PERFORM pg_notify( + '${triggerName}', + format('{"event":"%s","chainId":"%s"}', '${MultiChainRewindEvent.FullyRewind}', OLD."chainId") + ); + END IF; + + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + `; +} + export function getExistedIndexesQuery(schema: string): string { return `SELECT indexname FROM pg_indexes WHERE schemaname = '${schema}'`; } @@ -702,3 +753,7 @@ create or replace function ${escapedName(schema, functionName)}(search text) export const commentSearchFunctionQuery = (schema: string, table: string): Query => commentOnFunction(schema, searchFunctionName(schema, table), `@name search_${table}`); + +export const tableExistsQuery = (schema: string): string => { + return `SELECT table_name FROM information_schema.tables where table_schema='${schema}'`; +}; diff --git a/packages/node-core/src/events.ts b/packages/node-core/src/events.ts index 63900a0de4..87fd94e1bf 100644 --- a/packages/node-core/src/events.ts +++ b/packages/node-core/src/events.ts @@ -29,6 +29,13 @@ export enum PoiEvent { PoiTarget = 'poi_target', } +export enum MultiChainRewindEvent { + Rewind = 'rewind', + RewindComplete = 'rewind_complete', + RewindTimestampDecreased = 'timestamp_decreased', + FullyRewind = 'fully_rewind', +} + export interface RewindPayload { success: boolean; height: number; diff --git a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts index 1ca33a13e2..5ca1d820b1 100644 --- a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts @@ -4,6 +4,7 @@ import assert from 'assert'; import {EventEmitter2, OnEvent} from '@nestjs/event-emitter'; +import {ICoreBlockchainService} from '@subql/node-core/blockchain.service'; import {hexToU8a, u8aEq} from '@subql/utils'; import {Transaction} from '@subql/x-sequelize'; import {NodeConfig, IProjectUpgradeService} from '../../configure'; @@ -11,6 +12,7 @@ import {AdminEvent, IndexerEvent, PoiEvent, TargetBlockPayload} from '../../even import {getLogger} from '../../logger'; import {exitWithError, monitorCreateBlockFork, monitorCreateBlockStart, monitorWrite} from '../../process'; import {AutoQueue, IQueue, isTaskFlushedError, mainThreadOnly} from '../../utils'; +import {MultiChainRewindService} from '../multiChainRewind.service'; import {PoiBlock, PoiSyncService} from '../poi'; import {StoreService} from '../store.service'; import {IStoreModelProvider, StoreCacheService} from '../storeModelProvider'; @@ -34,6 +36,7 @@ export interface IBlockDispatcher { latestBufferedHeight: number; batchSize: number; + setLatestProcessedHeight(height: number): void; // Remove all enqueued blocks, used when a dynamic ds is created flushQueue(height: number): void; } @@ -65,8 +68,10 @@ export abstract class BaseBlockDispatcher implements IB private projectUpgradeService: IProjectUpgradeService, protected queue: Q, protected storeService: StoreService, - protected storeModelProvider: IStoreModelProvider, - private poiSyncService: PoiSyncService + private storeModelProvider: IStoreModelProvider, + private poiSyncService: PoiSyncService, + private blockChainService: ICoreBlockchainService, + private multiChainRewindService: MultiChainRewindService ) {} abstract enqueueBlocks(heights: (IBlock | number)[], latestBufferHeight?: number): void | Promise; @@ -170,7 +175,8 @@ export abstract class BaseBlockDispatcher implements IB const {blockHash, blockHeight: height} = header; const {dynamicDsCreated, reindexBlockHeader: processReindexBlockHeader} = processBlockResponse; // Rewind height received from admin api have higher priority than processed reindexBlockHeight - const reindexBlockHeader = this._pendingRewindHeader ?? processReindexBlockHeader; + const reindexBlockHeader = + this._pendingRewindHeader ?? this.multiChainRewindService.waitRewindHeader ?? processReindexBlockHeader; monitorWrite(`Finished block ${height}`); if (reindexBlockHeader !== null && reindexBlockHeader !== undefined) { try { @@ -226,7 +232,7 @@ export abstract class BaseBlockDispatcher implements IB } @OnEvent(AdminEvent.rewindTarget) - handleAdminRewind(blockPayload: TargetBlockPayload): void { + async handleAdminRewind(blockPayload: TargetBlockPayload) { if (this.currentProcessingHeight < blockPayload.height) { // this will throw back to admin controller, will NOT lead current indexing exit throw new Error( @@ -235,9 +241,7 @@ export abstract class BaseBlockDispatcher implements IB } // TODO can this work without - this._pendingRewindHeader = { - blockHeight: Number(blockPayload.height), - } as Header; + this._pendingRewindHeader = await this.blockChainService.getHeaderForHeight(blockPayload.height); const message = `Received admin command to rewind to block ${blockPayload.height}`; monitorWrite(`***** [ADMIN] ${message}`); logger.warn(message); diff --git a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.spec.ts b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.spec.ts index 9c7cf7f820..8bd140967f 100644 --- a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.spec.ts +++ b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.spec.ts @@ -7,6 +7,7 @@ import {isTaskFlushedError, TaskFlushedError} from '@subql/node-core/utils'; import {NodeConfig} from '../../configure'; import {exitWithError} from '../../process'; import {ConnectionPoolStateManager} from '../connectionPoolState.manager'; +import {MultiChainRewindService} from '../multiChainRewind.service'; import {Header, IBlock} from '../types'; import {BaseWorkerService, BlockUnavailableError, IBaseIndexerWorker, WorkerStatusResponse} from '../worker'; import {IBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher'; @@ -82,7 +83,7 @@ const blockchainService = { }, fetchBlockWorker: (worker: TestWorker, height: number) => worker.fetchBlock(height), } as any; - +const multichainRewindService = {waitRewindHeader: undefined} as MultiChainRewindService; class TestWorkerService extends BaseWorkerService { async fetchChainBlock(height: number): Promise> { return fetchBlocksFunction(height); @@ -210,7 +211,8 @@ describe.each<[string, () => IBlockDispatcher]>([ id: 'id', } as any, // ISubqueryProject blockchainService, - indexerManager + indexerManager, + multichainRewindService ); }, ], @@ -235,6 +237,7 @@ describe.each<[string, () => IBlockDispatcher]>([ id: 'id', } as any, // ISubqueryProject blockchainService, + multichainRewindService, '', // workerPath [] // workerFns ); @@ -573,8 +576,6 @@ describe.each<[string, () => IBlockDispatcher]>([ it('should call _onDynamicDsCreated when dynamic datasource is created', async () => { dynamicDsCreatedBlock = [7]; const onDynamicDsCreatedSpy = jest.spyOn(blockDispatcher as any, '_onDynamicDsCreated'); - // await Promise.all([ - // ]); await blockDispatcher.enqueueBlocks([7], 7); await blockDispatcher.enqueueBlocks([8], 8); await delay(2); @@ -584,4 +585,28 @@ describe.each<[string, () => IBlockDispatcher]>([ expect(isTaskFlushedError).toHaveBeenCalledWith(new TaskFlushedError(queueName)); }); }); + + describe('Multi chain rewind', () => { + beforeEach(() => { + multichainRewindService.waitRewindHeader = { + blockHeight: 10, + blockHash: '0xhash', + parentHash: '0xparenthash', + timestamp: new Date(), + }; + jest.clearAllMocks(); + }); + afterAll(() => { + multichainRewindService.waitRewindHeader = undefined; + }); + it('Data before the rollback height is reached can be written normally', async () => { + await blockDispatcher.enqueueBlocks([7, 8, 9], 9); + await delay(2); + expect(projectService.reindex).toHaveBeenCalledTimes(0); + + await blockDispatcher.enqueueBlocks([10], 10); + await delay(2); + expect(projectService.reindex).toHaveBeenCalledWith(multichainRewindService.waitRewindHeader); + }); + }); }); diff --git a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts index f23a5c2bd9..1671f192a0 100644 --- a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts @@ -9,7 +9,7 @@ import {IBlockchainService} from '../../blockchain.service'; import {NodeConfig} from '../../configure'; import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service'; import {IndexerEvent} from '../../events'; -import {getBlockHeight, IBlock, PoiSyncService, StoreService} from '../../indexer'; +import {getBlockHeight, IBlock, MultiChainRewindService, PoiSyncService, StoreService} from '../../indexer'; import {getLogger} from '../../logger'; import {exitWithError, monitorWrite} from '../../process'; import {profilerWrap} from '../../profiler'; @@ -46,7 +46,8 @@ export class BlockDispatcher poiSyncService: PoiSyncService, project: ISubqueryProject, blockchainService: IBlockchainService, - private indexerManager: IIndexerManager + private indexerManager: IIndexerManager, + multiChainRewindService: MultiChainRewindService ) { super( nodeConfig, @@ -57,7 +58,9 @@ export class BlockDispatcher new Queue(nodeConfig.batchSize * 3), storeService, storeModelProvider, - poiSyncService + poiSyncService, + blockchainService, + multiChainRewindService ); this.processQueue = new AutoQueue(nodeConfig.batchSize * 3, 1, nodeConfig.timeout, 'Process'); this.fetchQueue = new RampQueue( diff --git a/packages/node-core/src/indexer/blockDispatcher/factory.ts b/packages/node-core/src/indexer/blockDispatcher/factory.ts index 2c02860d41..29b2a360f8 100644 --- a/packages/node-core/src/indexer/blockDispatcher/factory.ts +++ b/packages/node-core/src/indexer/blockDispatcher/factory.ts @@ -10,6 +10,7 @@ import {ConnectionPoolStateManager} from '../connectionPoolState.manager'; import {DynamicDsService} from '../dynamic-ds.service'; import {InMemoryCacheService} from '../inMemoryCache.service'; import {MonitorService} from '../monitor.service'; +import {MultiChainRewindService} from '../multiChainRewind.service'; import {PoiSyncService} from '../poi'; import {ProjectService} from '../project.service'; import {StoreService} from '../store.service'; @@ -41,6 +42,7 @@ export const blockDispatcherFactory = connectionPoolState: ConnectionPoolStateManager, blockchainService: IBlockchainService, indexerManager: IIndexerManager, + multiChainRewindService: MultiChainRewindService, monitorService?: MonitorService ): IBlockDispatcher => { return nodeConfig.workers @@ -58,6 +60,7 @@ export const blockDispatcherFactory = connectionPoolState, project, blockchainService, + multiChainRewindService, workerPath, workerFns, monitorService @@ -72,6 +75,7 @@ export const blockDispatcherFactory = poiSyncService, project, blockchainService, - indexerManager + indexerManager, + multiChainRewindService ); }; diff --git a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts index f30b4611de..7e72de0d1c 100644 --- a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts @@ -19,6 +19,7 @@ import { IBaseIndexerWorker, IBlock, InMemoryCacheService, + MultiChainRewindService, PoiSyncService, TerminateableWorker, UnfinalizedBlocksService, @@ -74,6 +75,7 @@ export class WorkerBlockDispatcher< connectionPoolState: ConnectionPoolStateManager, @Inject('ISubqueryProject') project: ISubqueryProject, @Inject('IBlockchainService') private blockchainService: IBlockchainService, + multiChainRewindService: MultiChainRewindService, workerPath: string, workerFns: Parameters>[1], monitorService?: MonitorServiceInterface, @@ -88,7 +90,9 @@ export class WorkerBlockDispatcher< initAutoQueue(nodeConfig.workers, nodeConfig.batchSize, nodeConfig.timeout, 'Fetch'), storeService, storeModelProvider, - poiSyncService + poiSyncService, + blockchainService, + multiChainRewindService ); this.processQueue = initAutoQueue(nodeConfig.workers, nodeConfig.batchSize, nodeConfig.timeout, 'Process'); diff --git a/packages/node-core/src/indexer/entities/GlobalData.entity.ts b/packages/node-core/src/indexer/entities/GlobalData.entity.ts new file mode 100644 index 0000000000..f200602b41 --- /dev/null +++ b/packages/node-core/src/indexer/entities/GlobalData.entity.ts @@ -0,0 +1,53 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import {BuildOptions, DataTypes, Model, Sequelize} from '@subql/x-sequelize'; + +export enum MultiChainRewindStatus { + /** Indicates a normal state. Each chain needs to register before starting to sync blocks. */ + Normal = 'normal', + /** The rewind task has been completed. The rollback height can be determined using rewindTimestamp. */ + Complete = 'complete', + /** Unprocessed rewind task. The rollback height can be determined using rewindTimestamp. */ + Incomplete = 'incomplete', +} + +export interface GlobalData { + chainId: string; + rewindTimestamp: Date; + status: MultiChainRewindStatus; + initiator: boolean; +} + +interface GlobalDataEntity extends Model, GlobalData {} + +export type GlobalDataRepo = typeof Model & { + new (values?: unknown, options?: BuildOptions): GlobalDataEntity; +}; + +export function GlobalDataFactory(sequelize: Sequelize, schema: string): GlobalDataRepo { + const tableName = '_global'; + + return sequelize.define( + tableName, + { + chainId: { + type: DataTypes.STRING, + primaryKey: true, + }, + rewindTimestamp: { + type: DataTypes.DATE, + defaultValue: new Date(0), + }, + status: { + type: DataTypes.ENUM, + values: [MultiChainRewindStatus.Complete, MultiChainRewindStatus.Incomplete], + }, + initiator: { + type: DataTypes.BOOLEAN, + defaultValue: false, + }, + }, + {freezeTableName: true, schema: schema} + ); +} diff --git a/packages/node-core/src/indexer/entities/index.ts b/packages/node-core/src/indexer/entities/index.ts index cf880d9f41..ef1988522a 100644 --- a/packages/node-core/src/indexer/entities/index.ts +++ b/packages/node-core/src/indexer/entities/index.ts @@ -3,3 +3,4 @@ export * from './Poi.entity'; export * from './Metadata.entity'; +export * from './GlobalData.entity'; diff --git a/packages/node-core/src/indexer/fetch.service.spec.ts b/packages/node-core/src/indexer/fetch.service.spec.ts index 247333d6fd..5accba0a6b 100644 --- a/packages/node-core/src/indexer/fetch.service.spec.ts +++ b/packages/node-core/src/indexer/fetch.service.spec.ts @@ -1,9 +1,9 @@ // Copyright 2020-2025 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { SchedulerRegistry } from '@nestjs/schedule'; -import { BaseCustomDataSource, BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry } from '@subql/types-core'; +import {EventEmitter2} from '@nestjs/event-emitter'; +import {SchedulerRegistry} from '@nestjs/schedule'; +import {BaseCustomDataSource, BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry} from '@subql/types-core'; import { UnfinalizedBlocksService, BlockDispatcher, @@ -18,10 +18,14 @@ import { DatasourceParams, IBaseIndexerWorker, BypassBlocks, + MultiChainRewindService, + MultiChainRewindStatus, + reindex, + getLogger, } from '../'; -import { BlockHeightMap } from '../utils/blockHeightMap'; -import { DictionaryService } from './dictionary/dictionary.service'; -import { FetchService } from './fetch.service'; +import {BlockHeightMap} from '../utils/blockHeightMap'; +import {DictionaryService} from './dictionary/dictionary.service'; +import {FetchService} from './fetch.service'; const CHAIN_INTERVAL = 100; // 100ms @@ -73,7 +77,7 @@ class TestBlockchainService implements IBlockchainService { fetchBlockWorker( worker: IBaseIndexerWorker, blockNum: number, - context: { workers: IBaseIndexerWorker[] } + context: {workers: IBaseIndexerWorker[]} ): Promise
{ throw new Error('Method not implemented.'); } @@ -82,7 +86,7 @@ class TestBlockchainService implements IBlockchainService { blockHeight: this.finalizedHeight, blockHash: '0xxx', parentHash: '0xxx', - timestamp: new Date() + timestamp: new Date(), }); } async getBestHeight(): Promise { @@ -124,9 +128,13 @@ class TestBlockchainService implements IBlockchainService { throw new Error('Method not implemented.'); } // eslint-disable-next-line @typescript-eslint/promise-function-async - getBlockTimestamp(height: number): Promise { + getBlockTimestamp(height: number): Promise { throw new Error('Method not implemented.'); } + + async getRequiredHeaderForHeight(height: number): Promise
{ + return (await this.getHeaderForHeight(height)) as any; + } } const nodeConfig = new NodeConfig({ @@ -161,7 +169,7 @@ function mockModuloDs(startBlock: number, endBlock: number, modulo: number): Bas { kind: 'mock/Handler', handler: 'mockFunction', - filter: { modulo: modulo }, + filter: {modulo: modulo}, }, ], }, @@ -208,6 +216,14 @@ const getBlockDispatcher = () => { return inst; }; +jest.mock('../utils/promise', () => { + const original = jest.requireActual('../utils/promise'); + return { + ...original, + delay: jest.fn(original.delay), + }; +}); + describe('Fetch Service', () => { let fetchService: TestFetchService; let blockDispatcher: IBlockDispatcher; @@ -215,6 +231,8 @@ describe('Fetch Service', () => { let dataSources: BaseDataSource[]; let unfinalizedBlocksService: UnfinalizedBlocksService; let blockchainService: TestBlockchainService; + const multichainRewindService: MultiChainRewindService = {} as MultiChainRewindService; + let projectService: IProjectService; let spyOnEnqueueSequential: jest.SpyInstance< void | Promise, @@ -231,7 +249,7 @@ describe('Fetch Service', () => { const eventEmitter = new EventEmitter2(); const schedulerRegistry = new SchedulerRegistry(); - const projectService = { + projectService = { getStartBlockFromDataSources: jest.fn(() => Math.min(...dataSources.map((ds) => ds.startBlock ?? 0))), getAllDataSources: jest.fn(() => dataSources), getDataSourcesMap: jest.fn(() => { @@ -246,6 +264,7 @@ describe('Fetch Service', () => { return new BlockHeightMap(x); }), bypassBlocks: [], + reindex: jest.fn(), } as any as IProjectService; blockDispatcher = getBlockDispatcher(); @@ -268,7 +287,8 @@ describe('Fetch Service', () => { set: jest.fn(), }, } as any, - blockchainService + blockchainService, + multichainRewindService ); spyOnEnqueueSequential = jest.spyOn(fetchService as any, 'enqueueSequential') as any; @@ -302,20 +322,20 @@ describe('Fetch Service', () => { const moduloBlockHeightMap = new BlockHeightMap( new Map([ - [1, [{ ...mockModuloDs(1, 100, 20), startBlock: 1, endBlock: 100 }]], + [1, [{...mockModuloDs(1, 100, 20), startBlock: 1, endBlock: 100}]], [ 101, // empty gap for discontinuous block [], ], - [201, [{ ...mockModuloDs(201, 500, 30), startBlock: 201, endBlock: 500 }]], + [201, [{...mockModuloDs(201, 500, 30), startBlock: 201, endBlock: 500}]], // to infinite - [500, [{ ...mockModuloDs(500, Number.MAX_SAFE_INTEGER, 99), startBlock: 500 }]], + [500, [{...mockModuloDs(500, Number.MAX_SAFE_INTEGER, 99), startBlock: 500}]], // multiple ds [ 600, [ - { ...mockModuloDs(500, 800, 99), startBlock: 600, endBlock: 800 }, - { ...mockModuloDs(700, Number.MAX_SAFE_INTEGER, 101), startBlock: 700 }, + {...mockModuloDs(500, 800, 99), startBlock: 600, endBlock: 800}, + {...mockModuloDs(700, Number.MAX_SAFE_INTEGER, 101), startBlock: 700}, ], ], ]) @@ -333,43 +353,43 @@ describe('Fetch Service', () => { [ 1, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 1, endBlock: 100 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 1, endBlock: 100}, ], ], [ 10, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 1, endBlock: 100 }, - { ...mockDs, startBlock: 10, endBlock: 20 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 1, endBlock: 100}, + {...mockDs, startBlock: 10, endBlock: 20}, ], ], [ 21, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 1, endBlock: 100 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 1, endBlock: 100}, ], ], [ 50, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 1, endBlock: 100 }, - { ...mockDs, startBlock: 50, endBlock: 200 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 1, endBlock: 100}, + {...mockDs, startBlock: 50, endBlock: 200}, ], ], [ 101, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 50, endBlock: 200 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 50, endBlock: 200}, ], ], - [201, [{ ...mockDs, startBlock: 1, endBlock: 300 }]], + [201, [{...mockDs, startBlock: 1, endBlock: 300}]], [301, []], - [500, [{ ...mockDs, startBlock: 500 }]], + [500, [{...mockDs, startBlock: 500}]], ]) ) ); @@ -505,7 +525,7 @@ describe('Fetch Service', () => { { kind: 'mock/BlockHandler', handler: 'mockFunction', - filter: { modulo: 3 }, + filter: {modulo: 3}, }, { kind: 'mock/CallHandler', @@ -638,7 +658,7 @@ describe('Fetch Service', () => { it('enqueues modulo blocks with furture dataSources', async () => { fetchService.mockGetModulos([3]); - dataSources.push({ ...mockDs, startBlock: 20 }); + dataSources.push({...mockDs, startBlock: 20}); await fetchService.init(1); @@ -651,7 +671,7 @@ describe('Fetch Service', () => { it('at the end of modulo block filter, enqueue END should be min of data source range end height and api last height', async () => { // So this will skip next data source fetchService.mockGetModulos([10]); - dataSources.push({ ...mockDs, startBlock: 200 }); + dataSources.push({...mockDs, startBlock: 200}); await fetchService.init(191); expect((fetchService as any).useDictionary).toBeFalsy(); @@ -796,4 +816,13 @@ describe('Fetch Service', () => { expect(spyOnEnqueueSequential).toHaveBeenCalledTimes(1); expect((fetchService as any).blockDispatcher.latestBufferedHeight).toEqual(910); }, 10000); + + it('MultiChainRewindStatus.Complete message', async () => { + const logger = getLogger('FetchService'); + const consoleSpy = jest.spyOn(logger, 'info'); + + (multichainRewindService as any).status = MultiChainRewindStatus.Complete; + await fetchService.init(10); + expect(consoleSpy).toHaveBeenCalledWith(expect.stringMatching(/Waiting for all chains to complete rewind/)); + }); }); diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index 62a25c3da3..b40a58775b 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -3,7 +3,7 @@ import assert from 'assert'; import {Inject, Injectable, OnApplicationShutdown} from '@nestjs/common'; -import {EventEmitter2} from '@nestjs/event-emitter'; +import {EventEmitter2, OnEvent} from '@nestjs/event-emitter'; import {SchedulerRegistry} from '@nestjs/schedule'; import {BaseDataSource} from '@subql/types-core'; import {range} from 'lodash'; @@ -16,12 +16,15 @@ import {IBlockDispatcher} from './blockDispatcher'; import {mergeNumAndBlocksToNums} from './dictionary'; import {DictionaryService} from './dictionary/dictionary.service'; import {mergeNumAndBlocks} from './dictionary/utils'; +import {MultiChainRewindStatus} from './entities'; +import {MultiChainRewindService} from './multiChainRewind.service'; import {IStoreModelProvider} from './storeModelProvider'; import {BypassBlocks, IBlock, IProjectService} from './types'; import {IUnfinalizedBlocksServiceUtil} from './unfinalizedBlocks.service'; const logger = getLogger('FetchService'); - +// Unit is ms +const multiChainRewindDelay = 3; @Injectable() export class FetchService, FB> implements OnApplicationShutdown @@ -39,7 +42,8 @@ export class FetchService + @Inject('IBlockchainService') private blockchainSevice: IBlockchainService, + private multiChainRewindService: MultiChainRewindService ) {} private get latestBestHeight(): number { @@ -197,6 +201,16 @@ export class FetchService { + const rewindTimestamp = (1740100000 + height) * 1000; + return {rewindTimestamp, rewindDate: new Date(rewindTimestamp)}; +}; + +const testSchemaName = 'test_multi_chain_rewind'; +const schema = buildSchemaFromString(` + type Account @entity { + id: ID! # Account address + balance: Int + } +`); + +async function createChainProject(chainId: string, mockBlockchainService: any, sequelize: Sequelize) { + const nodeConfig = new NodeConfig({ + subquery: 'test', + dbSchema: testSchemaName, + proofOfIndex: true, + enableCache: false, + multiChain: true, + }); + const project = {network: {chainId}, schema} as any; + const dbModel = new PlainStoreModelService(sequelize, nodeConfig); + const storeService = new StoreService(sequelize, nodeConfig, dbModel, project); + await storeService.initCoreTables(testSchemaName); + await storeService.init(testSchemaName); + await storeService.modelProvider.metadata.set('chain', chainId); + await storeService.modelProvider.metadata.set('startHeight', 1); + await storeService.modelProvider.metadata.set('lastProcessedHeight', 10000); + await storeService.modelProvider.metadata.set( + 'lastProcessedBlockTimestamp', + genBlockTimestamp(10000).rewindTimestamp + ); + + const multiChainRewindService = new MultiChainRewindService( + nodeConfig, + sequelize, + storeService, + mockBlockchainService + ); + + const reindex = jest.fn(); + + // Initialize the service + await multiChainRewindService.init(chainId, reindex); + + return {sequelize, storeService, multiChainRewindService}; +} + +describe('MultiChain Rewind Service', () => { + const notifyHandleDelay = 0.5; + const lockInfoSql = `SELECT "chainId","rewindTimestamp","status" FROM "${testSchemaName}"."_global";`; + + const chainId1 = 'chain1'; + const chainId2 = 'chain2'; + let sequelize: Sequelize; + let sequelize1: Sequelize; + let sequelize2: Sequelize; + let storeService1: StoreService, multiChainRewindService1: MultiChainRewindService; + let storeService2: StoreService, multiChainRewindService2: MultiChainRewindService; + + // Mock IBlockchainService + const mockBlockchainService = { + getHeaderForHeight: jest.fn((height: number) => ({ + blockHeight: height, + timestamp: genBlockTimestamp(height).rewindDate, + blockHash: `hash${height}`, + parentHash: height > 0 ? `hash${height - 1}` : '', + })), + }; + + beforeEach(async () => { + sequelize = new Sequelize( + `postgresql://${option.username}:${option.password}@${option.host}:${option.port}/${option.database}`, + option + ); + await sequelize.authenticate(); + await sequelize.query(`CREATE SCHEMA ${testSchemaName};`); + + sequelize1 = new Sequelize( + `postgresql://${option.username}:${option.password}@${option.host}:${option.port}/${option.database}`, + option + ); + const projectChain1 = await createChainProject(chainId1, mockBlockchainService, sequelize1); + storeService1 = projectChain1.storeService; + multiChainRewindService1 = projectChain1.multiChainRewindService; + + sequelize2 = new Sequelize( + `postgresql://${option.username}:${option.password}@${option.host}:${option.port}/${option.database}`, + option + ); + const projectChain2 = await createChainProject(chainId2, mockBlockchainService, sequelize2); + storeService2 = projectChain2.storeService; + multiChainRewindService2 = projectChain2.multiChainRewindService; + }); + + afterEach(async () => { + await multiChainRewindService1.onApplicationShutdown(); + await multiChainRewindService2.onApplicationShutdown(); + await sequelize.query(`DROP SCHEMA ${testSchemaName} CASCADE;`); + await sequelize.close(); + }); + + describe('acquireGlobalRewindLock', () => { + it('setGlobalRewindLock should set the lock timestamp', async () => { + const {rewindDate} = genBlockTimestamp(5); + const result = await multiChainRewindService1.acquireGlobalRewindLock(rewindDate); + expect(result).toEqual(true); + + const res = await sequelize.query(lockInfoSql, {type: QueryTypes.SELECT}); + expect(res).toEqual( + expect.arrayContaining([ + {chainId: chainId1, rewindTimestamp: rewindDate, status: MultiChainRewindStatus.Incomplete}, + {chainId: chainId2, rewindTimestamp: rewindDate, status: MultiChainRewindStatus.Incomplete}, + ]) + ); + }); + + it('A rewind can be moved further back when there is already one in progress.', async () => { + const {rewindDate: rewindDate10} = genBlockTimestamp(10); + const {rewindDate: rewindDate5} = genBlockTimestamp(5); + + await expect(multiChainRewindService1.acquireGlobalRewindLock(rewindDate10)).resolves.toBe(true); + let res = await sequelize.query(lockInfoSql, {type: QueryTypes.SELECT}); + expect(res).toEqual( + expect.arrayContaining([ + {chainId: chainId1, rewindTimestamp: rewindDate10, status: MultiChainRewindStatus.Incomplete}, + {chainId: chainId2, rewindTimestamp: rewindDate10, status: MultiChainRewindStatus.Incomplete}, + ]) + ); + + await expect(multiChainRewindService1.acquireGlobalRewindLock(rewindDate5)).resolves.toBe(true); + res = await sequelize.query(lockInfoSql, {type: QueryTypes.SELECT}); + expect(res).toEqual( + expect.arrayContaining([ + {chainId: chainId1, rewindTimestamp: rewindDate5, status: MultiChainRewindStatus.Incomplete}, + {chainId: chainId2, rewindTimestamp: rewindDate5, status: MultiChainRewindStatus.Incomplete}, + ]) + ); + }); + + it('Not allowed to lock further backward', async () => { + const {rewindDate: rewindDate10} = genBlockTimestamp(10); + const {rewindDate: rewindDate5} = genBlockTimestamp(5); + + await expect(multiChainRewindService1.acquireGlobalRewindLock(rewindDate5)).resolves.toBe(true); + let res = await sequelize.query(lockInfoSql, {type: QueryTypes.SELECT}); + expect(res).toEqual( + expect.arrayContaining([ + {chainId: chainId1, rewindTimestamp: rewindDate5, status: MultiChainRewindStatus.Incomplete}, + {chainId: chainId2, rewindTimestamp: rewindDate5, status: MultiChainRewindStatus.Incomplete}, + ]) + ); + + await expect(multiChainRewindService1.acquireGlobalRewindLock(rewindDate10)).resolves.toBe(false); + res = await sequelize.query(lockInfoSql, {type: QueryTypes.SELECT}); + expect(res).toEqual( + expect.arrayContaining([ + {chainId: chainId1, rewindTimestamp: rewindDate5, status: MultiChainRewindStatus.Incomplete}, + {chainId: chainId2, rewindTimestamp: rewindDate5, status: MultiChainRewindStatus.Incomplete}, + ]) + ); + }); + + it('Only one can successfully acquire the lock.', async () => { + const {rewindDate: rewindDate1} = genBlockTimestamp(5); + const {rewindDate: rewindDate2} = genBlockTimestamp(5); + + const results = await Promise.allSettled([ + multiChainRewindService1.acquireGlobalRewindLock(rewindDate1), + multiChainRewindService2.acquireGlobalRewindLock(rewindDate2), + ]); + + const success = results.filter((result) => result.status === 'fulfilled'); + const failures = results.filter((result) => result.status === 'rejected'); + + expect(success.length).toBe(1); + expect(failures.length).toBe(1); + }); + }); + + describe('releaseChainRewindLock', () => { + const {rewindDate} = genBlockTimestamp(5); + beforeEach(async () => { + await multiChainRewindService1.acquireGlobalRewindLock(rewindDate); + }); + it('Same height as the target, release lock', async () => { + const tx = await sequelize1.transaction(); + const remaining = await multiChainRewindService1.releaseChainRewindLock(tx, rewindDate); + await tx.commit(); + + expect(remaining).toBe(1); + }); + + it('Different height from the target, release failed.', async () => { + const tx = await sequelize1.transaction(); + await expect(multiChainRewindService1.releaseChainRewindLock(tx, new Date())).rejects.toThrow(); + await tx.rollback(); + }); + + it('The height of rewindLock is greater than or equal to lastProcessHeight, it can be forcibly released.', async () => { + const {rewindDate: allowLastDate} = genBlockTimestamp(5); + + const tx = await sequelize1.transaction(); + const remaining = await multiChainRewindService1.releaseChainRewindLock(tx, rewindDate, allowLastDate); + await tx.commit(); + expect(remaining).toBe(1); + }); + + it('rewindLock is less than lastProcessHeight, it cannot be forcibly released.', async () => { + const {rewindDate: allowLastDate} = genBlockTimestamp(6); + const tx = await sequelize1.transaction(); + await expect(multiChainRewindService1.releaseChainRewindLock(tx, rewindDate, allowLastDate)).rejects.toThrow(); + await tx.rollback(); + }); + }); + + describe('The situation where notifyHandle controls the state', () => { + it('A chain rollback has been completed.', async () => { + const {rewindDate} = genBlockTimestamp(5); + await multiChainRewindService1.acquireGlobalRewindLock(rewindDate); + await delay(notifyHandleDelay); + expect(multiChainRewindService1.status).toBe(MultiChainRewindStatus.Incomplete); + + const tx = await sequelize1.transaction(); + const remaining = await multiChainRewindService1.releaseChainRewindLock(tx, rewindDate); + await tx.commit(); + expect(multiChainRewindService1.status).toBe(MultiChainRewindStatus.Complete); + + await delay(notifyHandleDelay); + expect(remaining).toBe(1); + expect(multiChainRewindService2.status).toEqual(MultiChainRewindStatus.Incomplete); + expect(multiChainRewindService2.waitRewindHeader).toEqual({ + blockHash: 'hash5', + blockHeight: 5, + parentHash: 'hash4', + timestamp: rewindDate, + }); + }); + + it('should handle multiple concurrent rewind requests', async () => { + const {rewindDate: rewindDate1} = genBlockTimestamp(5); + const {rewindDate: rewindDate2} = genBlockTimestamp(3); // Earlier timestamp + + await multiChainRewindService1.acquireGlobalRewindLock(rewindDate1); + await delay(notifyHandleDelay); + expect(multiChainRewindService1.status).toBe(MultiChainRewindStatus.Incomplete); + + await multiChainRewindService2.acquireGlobalRewindLock(rewindDate2); + await delay(notifyHandleDelay); + expect(multiChainRewindService2.status).toBe(MultiChainRewindStatus.Incomplete); + + // The rewindTimestamp of the later chain will overwrite that of the earlier chain. + const res = await sequelize.query(lockInfoSql, {type: QueryTypes.SELECT}); + expect(res).toEqual([ + {chainId: chainId1, rewindTimestamp: rewindDate2, status: MultiChainRewindStatus.Incomplete}, + {chainId: chainId2, rewindTimestamp: rewindDate2, status: MultiChainRewindStatus.Incomplete}, + ]); + + // Rollback to rewindDate1 is not allowed because it has already been overwritten. + let remaining = 2; + let tx = await sequelize1.transaction(); + await expect(multiChainRewindService1.releaseChainRewindLock(tx, rewindDate1)).rejects.toThrow(); + await tx.rollback(); + expect(multiChainRewindService1.status).toBe(MultiChainRewindStatus.Incomplete); + + // Rollback to rewindDate2 is allowed. + tx = await sequelize1.transaction(); + remaining = await multiChainRewindService1.releaseChainRewindLock(tx, rewindDate2); + await tx.commit(); + expect(remaining).toBe(1); + expect(multiChainRewindService1.status).toBe(MultiChainRewindStatus.Complete); + + // Chain2 has started rolling back. + tx = await sequelize2.transaction(); + remaining = await multiChainRewindService2.releaseChainRewindLock(tx, rewindDate2); + await tx.commit(); + expect(multiChainRewindService2.status).toBe(MultiChainRewindStatus.Complete); + + await delay(notifyHandleDelay); + // The last chain rollback is complete, all chains have finished rolling back. + expect(remaining).toBe(0); + expect(multiChainRewindService1.status).toBe(MultiChainRewindStatus.Normal); + expect(multiChainRewindService2.status).toBe(MultiChainRewindStatus.Normal); + }); + + it('should handle binary search edge cases for timestamp matching', async () => { + // Mock the binary search to simulate a large gap between blocks + const mockGetHeaderByBinarySearch = jest.spyOn(multiChainRewindService1 as any, 'getHeaderByBinarySearch'); + mockGetHeaderByBinarySearch.mockResolvedValueOnce({ + blockHeight: 4, + timestamp: genBlockTimestamp(4).rewindDate, + blockHash: 'hash4', + parentHash: 'hash3', + }); + + const {rewindDate} = genBlockTimestamp(4.5); // Timestamp between blocks 4 and 5 + await multiChainRewindService1.acquireGlobalRewindLock(rewindDate); + await delay(notifyHandleDelay); + expect(multiChainRewindService1.status).toBe(MultiChainRewindStatus.Incomplete); + expect(mockGetHeaderByBinarySearch).toHaveBeenCalledWith(expect.any(Date)); + expect(multiChainRewindService1.waitRewindHeader).toEqual({ + blockHeight: 4, + timestamp: genBlockTimestamp(4.5).rewindDate, + blockHash: 'hash4', + parentHash: 'hash3', + }); + + const tx = await sequelize1.transaction(); + const remaining = await multiChainRewindService1.releaseChainRewindLock(tx, rewindDate); + await tx.commit(); + expect(multiChainRewindService1.status).toBe(MultiChainRewindStatus.Complete); + expect(remaining).toBe(1); + expect(multiChainRewindService1.waitRewindHeader).toBeUndefined(); + }); + }); + + describe('getHeaderByBinarySearch', () => { + beforeEach(async () => { + (multiChainRewindService1 as any).startHeight = 20; + await storeService1.modelProvider.metadata.set('lastProcessedHeight', 10000); + }); + + it('Within the already processed interval', async () => { + const {rewindDate} = genBlockTimestamp(23); + const header = await (multiChainRewindService1 as any).getHeaderByBinarySearch(rewindDate); + + expect(header).toEqual({ + blockHeight: 23, + timestamp: rewindDate, + blockHash: 'hash23', + parentHash: 'hash22', + }); + }); + + it('Not within the already processed interval', async () => { + const {rewindDate: rewindDate19} = genBlockTimestamp(19); + const {rewindDate: rewindDate20} = genBlockTimestamp(20); + const header = await (multiChainRewindService1 as any).getHeaderByBinarySearch(rewindDate19); + expect(header).toEqual({ + blockHeight: 20, + timestamp: rewindDate20, + blockHash: 'hash20', + parentHash: 'hash19', + }); + + const {rewindDate: rewindDate10001} = genBlockTimestamp(10001); + const {rewindDate: rewindDate10000} = genBlockTimestamp(10000); + const header10001 = await (multiChainRewindService1 as any).getHeaderByBinarySearch(rewindDate10001); + expect(header10001).toEqual({ + blockHeight: 10000, + timestamp: rewindDate10000, + blockHash: 'hash10000', + parentHash: 'hash9999', + }); + }); + }); + + describe('Project initialization', () => { + const reindex = jest.fn(); + let multiChainRewindService: MultiChainRewindService; + + beforeEach(async () => { + const nodeConfig = new NodeConfig({ + subquery: 'test', + dbSchema: testSchemaName, + proofOfIndex: true, + enableCache: false, + multiChain: true, + }); + const project = {network: {chainId: chainId1}, schema} as any; + const dbModel = new PlainStoreModelService(sequelize, nodeConfig); + const storeService = new StoreService(sequelize, nodeConfig, dbModel, project); + await storeService.initCoreTables(testSchemaName); + await storeService.init(testSchemaName); + + multiChainRewindService = new MultiChainRewindService( + nodeConfig, + sequelize, + storeService, + mockBlockchainService as any + ); + }); + afterEach(async () => { + await multiChainRewindService.onApplicationShutdown(); + jest.clearAllMocks(); + }); + + it('Normal startup, starting will not trigger a reindex.', async () => { + // Initialize the service + await multiChainRewindService.init(chainId1, reindex); + + expect(reindex).toHaveBeenCalledTimes(0); + }); + + it('After another chain undergoes a rewind, the current chain starts, which can trigger a reindex.', async () => { + const {rewindDate} = genBlockTimestamp(5); + await multiChainRewindService1.acquireGlobalRewindLock(rewindDate); + + // Initialize the service + await multiChainRewindService.init(chainId1, reindex); + + expect(reindex).toHaveBeenCalledTimes(1); + expect(multiChainRewindService.status).toBe(MultiChainRewindStatus.Incomplete); + expect(multiChainRewindService.waitRewindHeader).toEqual({ + blockHeight: 5, + timestamp: rewindDate, + blockHash: 'hash5', + parentHash: 'hash4', + }); + }); + + it('The current chain has already completed the rewind, and there are still other chains that need to rewind. In this case, starting will not trigger a reindex.', async () => { + const {rewindDate} = genBlockTimestamp(5); + await multiChainRewindService1.acquireGlobalRewindLock(rewindDate); + const tx = await sequelize1.transaction(); + await multiChainRewindService1.releaseChainRewindLock(tx, rewindDate); + await tx.commit(); + + // Initialize the service + await multiChainRewindService.init(chainId1, reindex); + + expect(multiChainRewindService.status).toBe(MultiChainRewindStatus.Complete); + expect(reindex).toHaveBeenCalledTimes(0); + }); + }); +}); diff --git a/packages/node-core/src/indexer/multiChainRewind.service.ts b/packages/node-core/src/indexer/multiChainRewind.service.ts new file mode 100644 index 0000000000..325500eafd --- /dev/null +++ b/packages/node-core/src/indexer/multiChainRewind.service.ts @@ -0,0 +1,252 @@ +// Copyright 2020-2025 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import assert from 'assert'; +import {Inject, Injectable, OnApplicationShutdown} from '@nestjs/common'; +import {hashName} from '@subql/utils'; +import {Transaction, Sequelize} from '@subql/x-sequelize'; +import {Connection} from '@subql/x-sequelize/types/dialects/abstract/connection-manager'; +import {uniqueId} from 'lodash'; +import {PoolClient} from 'pg'; +import {IBlockchainService} from '../blockchain.service'; +import {NodeConfig} from '../configure'; +import {createRewindTrigger, createRewindTriggerFunction, getTriggers} from '../db'; +import {MultiChainRewindEvent} from '../events'; +import {getLogger} from '../logger'; +import {MultiChainRewindStatus} from './entities'; +import {StoreService} from './store.service'; +import {PlainGlobalModel} from './storeModelProvider/global/global'; +import {Header} from './types'; + +const logger = getLogger('MultiChainRewindService'); + +export interface IMultiChainRewindService { + chainId: string; + status: MultiChainRewindStatus; + waitRewindHeader?: Header; + + acquireGlobalRewindLock(rewindTimestamp: Date): Promise; + /** + * Check if the height is consistent before unlocking. + * @param tx + * @param rewindTimestamp The timestamp to roll back to. + * @param allowRewindTimestamp Set a rewind-allowed height; only heights greater than or equal this can be released. + * @returns the number of remaining rewind chains + */ + releaseChainRewindLock(tx: Transaction, rewindTimestamp: Date, allowRewindTimestamp?: Date): Promise; +} + +/** + * Working principle: + * multiChainRewindService is primarily responsible for coordinating multi-chain projects. + * When global.rewindLock changes, a PG trigger sends a notification, and all subscribed chain projects will receive the rollback notification. + * This triggers a rollback process, where the fetch service handles the message by clearing the queue. + * During the next fillNextBlockBuffer loop, if it detects the rewinding state, it will execute the rollback. + */ +@Injectable() +export class MultiChainRewindService implements IMultiChainRewindService, OnApplicationShutdown { + private _status: MultiChainRewindStatus = MultiChainRewindStatus.Normal; + private _chainId?: string; + private dbSchema: string; + private rewindTriggerName: string; + private startHeight = 0; + private pgListener?: PoolClient; + private _globalModel?: PlainGlobalModel = undefined; + private processingPromise: Promise = Promise.resolve(); + waitRewindHeader?: Header; + constructor( + private nodeConfig: NodeConfig, + private sequelize: Sequelize, + private storeService: StoreService, + @Inject('IBlockchainService') private readonly blockchainService: IBlockchainService + ) { + this.dbSchema = this.nodeConfig.dbSchema; + this.rewindTriggerName = hashName(this.dbSchema, 'rewind_trigger', '_global'); + } + + get chainId(): string { + assert(this._chainId, 'chainId is not set'); + return this._chainId; + } + + private set status(status: MultiChainRewindStatus) { + this._status = status; + } + + get status() { + assert(this._status, 'status is not set'); + return this._status; + } + + get globalModel() { + if (!this._globalModel) { + this._globalModel = new PlainGlobalModel(this.dbSchema, this.chainId, this.storeService.globalDataRepo); + } + return this._globalModel; + } + + async onApplicationShutdown() { + await this.processingPromise; + if (this.pgListener) { + this.sequelize.connectionManager.releaseConnection(this.pgListener as Connection); + } + } + + async init(chainId: string, reindex?: (targetHeader: Header) => Promise) { + this._chainId = chainId; + + if (reindex === undefined) { + // When using the reindex command, this parameter is not required. + return; + } + if (!this.storeService.isMultichain) return; + + await this.sequelize.query(`${createRewindTriggerFunction(this.dbSchema)}`); + const rewindTriggers = await getTriggers(this.sequelize, this.rewindTriggerName); + if (rewindTriggers.length === 0) { + await this.sequelize.query(`${createRewindTrigger(this.dbSchema)}`); + } + + const startHeight = await this.storeService.modelProvider.metadata.find('startHeight'); + assert(startHeight !== undefined, 'startHeight is not set'); + this.startHeight = startHeight; + + // Register a listener and create a schema notification sending function. + await this.registerPgListener(); + + if (this.waitRewindHeader) { + const rewindHeader = {...this.waitRewindHeader}; + await reindex(rewindHeader); + return rewindHeader; + } + } + + private async registerPgListener() { + if (this.pgListener) return; + + // Creating a new pgClient is to avoid using the same database connection as the block scheduler, + // which may prevent real-time listening to rollback events. + this.pgListener = (await this.sequelize.connectionManager.getConnection({ + type: 'read', + })) as PoolClient; + + this.pgListener.on('notification', this.notifyHandle.bind(this)); + + await this.pgListener.query(`LISTEN "${this.rewindTriggerName}"`); + logger.info(`Register rewind listener success, chainId: ${this.chainId}`); + + // Check whether the current state is in rollback. + // If a global lock situation occurs, prioritize setting it to the WaitOtherChain state. If a rollback is still required, then set it to the rewinding state. + const chainRewindInfo = await this.globalModel.getChainRewindInfo(); + if (!chainRewindInfo) return; + + if (chainRewindInfo.status === MultiChainRewindStatus.Complete) { + this.status = MultiChainRewindStatus.Complete; + } + if (chainRewindInfo.status === MultiChainRewindStatus.Incomplete) { + this.status = MultiChainRewindStatus.Incomplete; + this.waitRewindHeader = await this.searchWaitRewindHeader(chainRewindInfo.rewindTimestamp); + } + } + + private notifyHandle(msg: any) { + this.processingPromise = this.processingPromise.then(async () => { + assert(msg.payload, 'Payload is empty'); + const {chainId, event: eventType} = JSON.parse(msg.payload) as {chainId: string; event: MultiChainRewindEvent}; + if (chainId !== this.chainId) return; + + const sessionUuid = uniqueId(); + logger.info(`[${sessionUuid}]Received rewind event: ${eventType}, chainId: ${this.chainId}`); + switch (eventType) { + case MultiChainRewindEvent.Rewind: + case MultiChainRewindEvent.RewindTimestampDecreased: { + const chainRewindInfo = await this.globalModel.getChainRewindInfo(); + assert(chainRewindInfo, `Not registered rewind timestamp in global data, chainId: ${this.chainId}`); + + await this.setStatus(MultiChainRewindStatus.Incomplete, chainRewindInfo.rewindTimestamp); + break; + } + case MultiChainRewindEvent.RewindComplete: + await this.setStatus(MultiChainRewindStatus.Complete); + break; + case MultiChainRewindEvent.FullyRewind: + await this.setStatus(MultiChainRewindStatus.Normal); + break; + default: + throw new Error(`Unknown rewind event: ${eventType}`); + } + logger.info(`[${sessionUuid}]Handle success rewind event: ${eventType}, chainId: ${this.chainId}`); + }); + } + + private async searchWaitRewindHeader(rewindTimestamp: Date): Promise
{ + const rewindBlockHeader = await this.getHeaderByBinarySearch(rewindTimestamp); + // The blockHeader.timestamp obtained from the query cannot be used directly, as it will cause an infinite loop. + // Different chains have timestamp discrepancies, which will result in infinite backward tracing. + return {...rewindBlockHeader, timestamp: rewindTimestamp}; + } + + /** + * If the set rewindTimestamp is greater than or equal to the current blockHeight, we do nothing because we will roll back to an earlier time. + * If the set rewindTimestamp is less than the current blockHeight, we should roll back to the earlier rewindTimestamp. + * @param rewindTimestamp rewindTimestamp in milliseconds + */ + async acquireGlobalRewindLock(rewindTimestamp: Date): Promise { + const {lockTimestamp} = await this.globalModel.acquireGlobalRewindLock(rewindTimestamp); + + const existEarlierLock = lockTimestamp < rewindTimestamp; + if (!existEarlierLock) { + logger.info(`setGlobalRewindLock success chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}`); + } + return !existEarlierLock; + } + + async releaseChainRewindLock(tx: Transaction, rewindTimestamp: Date, allowRewindTimestamp?: Date): Promise { + const chainsCount = await this.globalModel.releaseChainRewindLock(tx, rewindTimestamp, allowRewindTimestamp); + // The current chain has completed the rewind, and we still need to wait for other chains to finish. + // When fully synchronized, set the status back to normal by pgListener. + await this.setStatus(MultiChainRewindStatus.Complete); + logger.info(`Rewind success chainId: ${JSON.stringify({chainsCount, chainId: this.chainId, rewindTimestamp})}`); + return chainsCount; + } + + private async setStatus(status: MultiChainRewindStatus, rewindTimestamp?: Date) { + if (status === MultiChainRewindStatus.Incomplete) { + assert(rewindTimestamp, 'rewindTimestamp is not set'); + this.status = MultiChainRewindStatus.Incomplete; + this.waitRewindHeader = await this.searchWaitRewindHeader(rewindTimestamp); + } else { + this.status = status; + this.waitRewindHeader = undefined; + } + } + + /** + * Get the block header closest to the given timestamp + * @param timestamp To find the block closest to a given timestamp + * @returns + */ + private async getHeaderByBinarySearch(timestamp: Header['timestamp']): Promise
{ + let left = this.startHeight; + let {height: right} = await this.storeService.getLastProcessedBlock(); + let searchNum = 0; + while (left < right) { + searchNum++; + const mid = Math.floor((left + right) / 2); + const header = await this.blockchainService.getHeaderForHeight(mid); + + if (header.timestamp === timestamp) { + return header; + } else if (header.timestamp < timestamp) { + left = mid + 1; + } else { + right = mid; + } + } + + const targetHeader = await this.blockchainService.getHeaderForHeight(left); + logger.info(`Binary search times: ${searchNum}, target Header: ${JSON.stringify(targetHeader)}`); + + return targetHeader; + } +} diff --git a/packages/node-core/src/indexer/project.service.spec.ts b/packages/node-core/src/indexer/project.service.spec.ts index 17212b79d7..595839da2c 100644 --- a/packages/node-core/src/indexer/project.service.spec.ts +++ b/packages/node-core/src/indexer/project.service.spec.ts @@ -54,8 +54,8 @@ class TestBlockchainService implements IBlockchainService { // throw new Error('Method onProjectChange not implemented.'); } // eslint-disable-next-line @typescript-eslint/promise-function-async - getBlockTimestamp(height: number): Promise { - return Promise.resolve(undefined); + getBlockTimestamp(height: number): Promise { + return Promise.resolve(new Date()); } getBlockSize(block: IBlock): number { return 0; @@ -112,6 +112,15 @@ class TestBlockchainService implements IBlockchainService { timestamp: new Date(), }; } + // eslint-disable-next-line @typescript-eslint/require-await + async getRequiredHeaderForHeight(height: number): Promise
{ + return { + blockHeight: height, + blockHash: `b${height}`, + parentHash: `b${height - 1}`, + timestamp: new Date(), + }; + } } describe('BaseProjectService', () => { @@ -131,7 +140,8 @@ describe('BaseProjectService', () => { {getDynamicDatasources: jest.fn()} as unknown as DynamicDsService, null as unknown as any, null as unknown as any, - new TestBlockchainService() + new TestBlockchainService(), + null as unknown as any ); }); @@ -424,7 +434,8 @@ describe('BaseProjectService', () => { } as unknown as DynamicDsService, // dynamicDsService new EventEmitter2(), // eventEmitter new UnfinalizedBlocksService(nodeConfig, storeService.modelProvider, blockchainService), // unfinalizedBlocksService - blockchainService + blockchainService, + {init: jest.fn()} as any // MultiChainRewindService ); }; diff --git a/packages/node-core/src/indexer/project.service.ts b/packages/node-core/src/indexer/project.service.ts index c8736864ee..77637da47c 100644 --- a/packages/node-core/src/indexer/project.service.ts +++ b/packages/node-core/src/indexer/project.service.ts @@ -2,28 +2,29 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; -import { isMainThread } from 'worker_threads'; -import { Inject } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { BaseDataSource, IProjectNetworkConfig } from '@subql/types-core'; -import { Sequelize } from '@subql/x-sequelize'; -import { IApi } from '../api.service'; -import { ICoreBlockchainService } from '../blockchain.service'; -import { IProjectUpgradeService, NodeConfig } from '../configure'; -import { IndexerEvent } from '../events'; -import { getLogger } from '../logger'; -import { exitWithError, monitorWrite } from '../process'; -import { getExistingProjectSchema, getStartHeight, hasValue, initDbSchema, mainThreadOnly, reindex } from '../utils'; -import { BlockHeightMap } from '../utils/blockHeightMap'; -import { DsProcessorService } from './ds-processor.service'; -import { DynamicDsService } from './dynamic-ds.service'; -import { MetadataKeys } from './entities'; -import { PoiSyncService } from './poi'; -import { PoiService } from './poi/poi.service'; -import { StoreService } from './store.service'; -import { cacheProviderFlushData } from './storeModelProvider'; -import { ISubqueryProject, IProjectService, BypassBlocks, HistoricalMode, Header } from './types'; -import { IUnfinalizedBlocksService } from './unfinalizedBlocks.service'; +import {isMainThread} from 'worker_threads'; +import {Inject} from '@nestjs/common'; +import {EventEmitter2} from '@nestjs/event-emitter'; +import {BaseDataSource, IProjectNetworkConfig} from '@subql/types-core'; +import {Sequelize} from '@subql/x-sequelize'; +import {IApi} from '../api.service'; +import {ICoreBlockchainService} from '../blockchain.service'; +import {IProjectUpgradeService, NodeConfig} from '../configure'; +import {IndexerEvent} from '../events'; +import {getLogger} from '../logger'; +import {exitWithError, monitorWrite} from '../process'; +import {getExistingProjectSchema, getStartHeight, hasValue, initDbSchema, mainThreadOnly, reindex} from '../utils'; +import {BlockHeightMap} from '../utils/blockHeightMap'; +import {DsProcessorService} from './ds-processor.service'; +import {DynamicDsService} from './dynamic-ds.service'; +import {MetadataKeys} from './entities'; +import {MultiChainRewindService} from './multiChainRewind.service'; +import {PoiSyncService} from './poi'; +import {PoiService} from './poi/poi.service'; +import {StoreService} from './store.service'; +import {cacheProviderFlushData} from './storeModelProvider'; +import {ISubqueryProject, IProjectService, BypassBlocks, HistoricalMode, Header} from './types'; +import {IUnfinalizedBlocksService} from './unfinalizedBlocks.service'; const logger = getLogger('Project'); @@ -36,8 +37,9 @@ class NotInitError extends Error { export class ProjectService< DS extends BaseDataSource = BaseDataSource, API extends IApi = IApi, - UnfinalizedBlocksService extends IUnfinalizedBlocksService = IUnfinalizedBlocksService -> implements IProjectService { + UnfinalizedBlocksService extends IUnfinalizedBlocksService = IUnfinalizedBlocksService, +> implements IProjectService +{ private _schema?: string; private _startHeight?: number; private _blockOffset?: number; @@ -55,7 +57,8 @@ export class ProjectService< private readonly dynamicDsService: DynamicDsService, private eventEmitter: EventEmitter2, @Inject('IUnfinalizedBlocksService') private readonly unfinalizedBlockService: UnfinalizedBlocksService, - @Inject('IBlockchainService') private blockchainService: ICoreBlockchainService + @Inject('IBlockchainService') private blockchainService: ICoreBlockchainService, + private multiChainRewindService: MultiChainRewindService ) { if (this.nodeConfig.unfinalizedBlocks && this.nodeConfig.allowSchemaMigration) { throw new Error('Unfinalized Blocks and Schema Migration cannot be enabled at the same time'); @@ -106,6 +109,7 @@ export class ProjectService< // Init metadata before rest of schema so we can determine the correct project version to create the schema await this.storeService.initCoreTables(this._schema); + await this.ensureMetadata(); // DynamicDsService is dependent on metadata so we need to ensure it exists first await this.dynamicDsService.init(this.storeService.modelProvider.metadata); @@ -135,6 +139,9 @@ export class ProjectService< } const reindexedUpgrade = await this.initUpgradeService(this.startHeight); + + const reindexMultiChain = await this.initMultiChainRewindService(); + // Unfinalized is dependent on POI in some cases, it needs to be init after POI is init const reindexedUnfinalized = await this.initUnfinalizedInternal(); @@ -146,6 +153,10 @@ export class ProjectService< this._startHeight = reindexedUpgrade; } + if (reindexMultiChain !== undefined) { + this._startHeight = reindexMultiChain.blockHeight; + } + // Flush any pending operations to set up DB await cacheProviderFlushData(this.storeService.modelProvider, true); } else { @@ -216,16 +227,16 @@ export class ProjectService< const existing = await metadata.findMany(keys); - const { chain, genesisHash, specName } = this.apiService.networkMeta; + const {chain, genesisHash, specName} = this.apiService.networkMeta; if (this.project.runner) { - const { node, query } = this.project.runner; + const {node, query} = this.project.runner; await metadata.setBulk([ - { key: 'runnerNode', value: node.name }, - { key: 'runnerNodeVersion', value: node.version }, - { key: 'runnerQuery', value: query.name }, - { key: 'runnerQueryVersion', value: query.version }, + {key: 'runnerNode', value: node.name}, + {key: 'runnerNodeVersion', value: node.version}, + {key: 'runnerQuery', value: query.name}, + {key: 'runnerQueryVersion', value: query.version}, ]); } if (!existing.genesisHash) { @@ -337,7 +348,7 @@ export class ProjectService< const nextProject = projects[i + 1][1]; nextMinStartHeight = Math.max( nextProject.dataSources - .filter((ds): ds is DS & { startBlock: number } => !!ds.startBlock) + .filter((ds): ds is DS & {startBlock: number} => !!ds.startBlock) .sort((a, b) => a.startBlock - b.startBlock)[0].startBlock, projects[i + 1][0] ); @@ -352,12 +363,12 @@ export class ProjectService< }[] = []; [...project.dataSources, ...dynamicDs] - .filter((ds): ds is DS & { startBlock: number } => { + .filter((ds): ds is DS & {startBlock: number} => { return !!ds.startBlock && (!nextMinStartHeight || nextMinStartHeight > ds.startBlock); }) .forEach((ds) => { - events.push({ block: Math.max(height, ds.startBlock), start: true, ds }); - if (ds.endBlock) events.push({ block: ds.endBlock + 1, start: false, ds }); + events.push({block: Math.max(height, ds.startBlock), start: true, ds}); + if (ds.endBlock) events.push({block: ds.endBlock + 1, start: false, ds}); }); // sort events by block in ascending order, start events come before end events @@ -439,6 +450,9 @@ export class ProjectService< } return undefined; } + private async initMultiChainRewindService(): Promise
{ + return this.multiChainRewindService.init(this.apiService.networkMeta.chain, this.reindex.bind(this)); + } private async handleProjectChange(): Promise { if (isMainThread && !this.nodeConfig.allowSchemaMigration) { @@ -464,12 +478,13 @@ export class ProjectService< return reindex( this.getStartBlockFromDataSources(), targetBlockHeader, - { height, timestamp }, + {height, timestamp}, this.storeService, this.unfinalizedBlockService, this.dynamicDsService, this.sequelize, this.projectUpgradeService, + this.multiChainRewindService, this.nodeConfig.proofOfIndex ? this.poiService : undefined /* Not providing force clean service, it should never be needed */ ); diff --git a/packages/node-core/src/indexer/store.service.test.ts b/packages/node-core/src/indexer/store.service.test.ts index 38a20c70bc..e22666dc06 100644 --- a/packages/node-core/src/indexer/store.service.test.ts +++ b/packages/node-core/src/indexer/store.service.test.ts @@ -18,6 +18,10 @@ const option: DbOption = { }; jest.setTimeout(60000); +// Mock 1740100000 is the timestamp of the genesis block +const genBlockTimestamp = (height: number) => (1740100000 + height) * 1000; +const genBlockDate = (height: number) => new Date(genBlockTimestamp(height)); + const testSchemaName = 'test_model_store'; const schema = buildSchemaFromString(` type Account @entity { @@ -57,7 +61,12 @@ describe('Check whether the db store and cache store are consistent.', () => { }); it('Same block, Execute the set method multiple times.', async () => { - await storeService.setBlockHeader({blockHeight: 1, blockHash: '0x01', parentHash: '0x00'}); + await storeService.setBlockHeader({ + blockHeight: 1, + blockHash: '0x01', + parentHash: '0x00', + timestamp: genBlockDate(1), + }); const accountEntity = {id: 'block-001', balance: 100}; @@ -121,7 +130,12 @@ describe('Check whether the db store and cache store are consistent.', () => { }, 30000); it('_block_range update check', async () => { - await storeService.setBlockHeader({blockHeight: 1000, blockHash: '0x1000', parentHash: '0x0999'}); + await storeService.setBlockHeader({ + blockHeight: 1000, + blockHash: '0x1000', + parentHash: '0x0999', + timestamp: genBlockDate(1000), + }); // insert new account. const account1000Data = {id: 'block-1000', balance: 999}; @@ -207,7 +221,12 @@ describe('Cache Provider', () => { tx.afterCommit(() => { Account.clear(blockHeight); }); - await storeService.setBlockHeader({blockHeight, blockHash: `0x${blockHeight}`, parentHash: `0x${blockHeight - 1}`}); + await storeService.setBlockHeader({ + blockHeight, + blockHash: `0x${blockHeight}`, + parentHash: `0x${blockHeight - 1}`, + timestamp: genBlockDate(blockHeight), + }); await handle(blockHeight); await Account.runFlush(tx, blockHeight); await tx.commit(); diff --git a/packages/node-core/src/indexer/store.service.ts b/packages/node-core/src/indexer/store.service.ts index c7912f28be..bbe69d6484 100644 --- a/packages/node-core/src/indexer/store.service.ts +++ b/packages/node-core/src/indexer/store.service.ts @@ -31,11 +31,20 @@ import { getDbSizeAndUpdateMetadata, getTriggers, SchemaMigrationService, + tableExistsQuery, } from '../db'; import {getLogger} from '../logger'; import {exitWithError} from '../process'; -import {camelCaseObjectKey, customCamelCaseGraphqlKey, getHistoricalUnit} from '../utils'; -import {MetadataFactory, MetadataRepo, PoiFactory, PoiFactoryDeprecate, PoiRepo} from './entities'; +import {camelCaseObjectKey, customCamelCaseGraphqlKey, getHistoricalUnit, hasValue} from '../utils'; +import { + GlobalDataFactory, + GlobalDataRepo, + MetadataFactory, + MetadataRepo, + PoiFactory, + PoiFactoryDeprecate, + PoiRepo, +} from './entities'; import {Store} from './store'; import {IMetadata, IStoreModelProvider, PlainStoreModelService} from './storeModelProvider'; import {StoreOperations} from './StoreOperations'; @@ -63,10 +72,12 @@ export class StoreService { poiRepo?: PoiRepo; private _modelIndexedFields?: IndexField[]; private _modelsRelations?: GraphQLModelsRelationsEnums; + private _globalDataRepo?: GlobalDataRepo; private _metaDataRepo?: MetadataRepo; private _historical?: HistoricalMode; private _metadataModel?: IMetadata; private _schema?: string; + private _isMultichain?: boolean; // Should be updated each block private _blockHeader?: Header; private _operationStack?: StoreOperations; @@ -104,6 +115,11 @@ export class StoreService { return this._operationStack; } + get globalDataRepo(): GlobalDataRepo { + assert(this._globalDataRepo, new NoInitError()); + return this._globalDataRepo; + } + get blockHeader(): Header { assert(this._blockHeader, new Error('StoreService.setBlockHeader has not been called')); return this._blockHeader; @@ -118,6 +134,11 @@ export class StoreService { return this.#transaction; } + get isMultichain(): boolean { + assert(this._isMultichain !== undefined, new NoInitError()); + return this._isMultichain; + } + async syncDbSize(): Promise { if (!this._lastTimeDbSizeChecked || Date.now() - this._lastTimeDbSizeChecked > DB_SIZE_CACHE_TIMEOUT) { this._lastTimeDbSizeChecked = Date.now(); @@ -151,14 +172,20 @@ export class StoreService { this.poiRepo = usePoiFactory(this.sequelize, schema); } + this._schema = schema; + + await this.setMultiChainProject(); + this._metaDataRepo = await MetadataFactory( this.sequelize, schema, - this.config.multiChain, + this.isMultichain, this.subqueryProject.network.chainId ); - this._schema = schema; + if (this.isMultichain) { + this._globalDataRepo = GlobalDataFactory(this.sequelize, schema); + } await this.sequelize.sync(); @@ -293,10 +320,7 @@ export class StoreService { const {historical, multiChain} = this.config; try { - const tableRes = await this.sequelize.query>( - `SELECT table_name FROM information_schema.tables where table_schema='${schema}'`, - {type: QueryTypes.SELECT} - ); + const tableRes = await this.sequelize.query>(tableExistsQuery(schema), {type: QueryTypes.SELECT}); const metadataTableNames = flatten(tableRes).filter( (value: string) => METADATA_REGEX.test(value) || MULTI_METADATA_REGEX.test(value) @@ -482,6 +506,28 @@ group by // Cant throw here because even with historical disabled the current height is used by the store return getHistoricalUnit(this.historical, this.blockHeader); } + + async getLastProcessedBlock(): Promise<{height: number; timestamp?: number}> { + const {lastProcessedBlockTimestamp: timestamp, lastProcessedHeight: height} = await this.metadataModel.findMany([ + 'lastProcessedHeight', + 'lastProcessedBlockTimestamp', + ]); + + return {height: height || 0, timestamp}; + } + + private async setMultiChainProject() { + if (this.config.multiChain) { + this._isMultichain = true; + return; + } + + const tableRes = await this.sequelize.query>(tableExistsQuery(this.schema), { + type: QueryTypes.SELECT, + }); + + this._isMultichain = !!flatten(tableRes).find((value: string) => MULTI_METADATA_REGEX.test(value)); + } } // REMOVE 10,000 record per batch diff --git a/packages/node-core/src/indexer/storeModelProvider/global/global.ts b/packages/node-core/src/indexer/storeModelProvider/global/global.ts new file mode 100644 index 0000000000..6d97bd9476 --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/global/global.ts @@ -0,0 +1,200 @@ +// Copyright 2020-2025 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import assert from 'assert'; +import {getLogger} from '@subql/node-core/logger'; +import {MULTI_METADATA_REGEX} from '@subql/utils'; +import {Op, QueryTypes, Sequelize, Transaction} from '@subql/x-sequelize'; +import {flatten} from 'lodash'; +import {tableExistsQuery} from '../../../db'; +import {GlobalData, GlobalDataRepo, MultiChainRewindStatus} from '../../entities'; + +export interface IGlobalData { + getChainRewindInfo(): Promise; + + acquireGlobalRewindLock(rewindTimestamp: Date): Promise<{lockTimestamp: Date}>; + /** + * Check if the height is consistent before unlocking. + * @param tx + * @param rewindTimestamp The timestamp to roll back to, in milliseconds. + * @param allowRewindTimestamp Set a rewind-allowed height; only heights greater than or equal this can be released. + * @returns the number of remaining rewind chains + */ + releaseChainRewindLock(tx: Transaction, rewindTimestamp: Date, allowRewindTimestamp?: Date): Promise; +} + +const logger = getLogger('PlainGlobalModel'); + +export class PlainGlobalModel implements IGlobalData { + constructor( + private readonly dbSchema: string, + private readonly chainId: string, + private readonly model: GlobalDataRepo + ) {} + + private get sequelize(): Sequelize { + const sequelize = this.model.sequelize; + + if (!sequelize) { + throw new Error(`Sequelize is not available on ${this.model.name}`); + } + + return sequelize; + } + + async getChainRewindInfo(): Promise { + const rewindTimestampInfo = await this.model.findOne({ + where: {chainId: this.chainId}, + }); + + return rewindTimestampInfo; + } + + /** + * If the set rewindTimestamp is greater than or equal to the current blockHeight, we do nothing because we will roll back to an earlier time. + * If the set rewindTimestamp is less than the current blockHeight, we should roll back to the earlier rewindTimestamp. + * The reason for not using the external tx variable here is to complete the locking task as soon as possible and promptly send a notification to allow other chains to rewind. + * @param rewindTimestamp + * @param tx + * @returns + */ + async acquireGlobalRewindLock(rewindTimestamp: Date): Promise<{lockTimestamp: Date}> { + const tx = await this.sequelize.transaction(); + const {chainsCount, currentChain} = await this.getChainsInfo(tx); + try { + if (chainsCount) { + assert(currentChain, `Not found chainId: ${this.chainId} in global data`); + // Exist rewind task + let lockTimestamp = currentChain.rewindTimestamp; + const [affectedCount] = await this.model.update( + { + status: MultiChainRewindStatus.Incomplete, + rewindTimestamp, + initiator: false, + }, + { + where: { + rewindTimestamp: {[Op.gt]: rewindTimestamp}, + }, + transaction: tx, + } + ); + + assert( + affectedCount === 0 || affectedCount === chainsCount, + `Set global rewind lock failed, chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}` + ); + + if (affectedCount === chainsCount) { + lockTimestamp = rewindTimestamp; + await this.model.update({initiator: true}, {where: {chainId: this.chainId}, transaction: tx}); + } + + await tx.commit(); + return {lockTimestamp}; + } else { + const chainIds = await this.getChainIdsFromMetadata(tx); + // No rewind task, set the current chain as the initiator + await this.model.bulkCreate( + chainIds.map((chainId) => ({ + chainId, + status: MultiChainRewindStatus.Incomplete, + rewindTimestamp, + initiator: chainId === this.chainId, + })), + {transaction: tx} + ); + await tx.commit(); + return {lockTimestamp: rewindTimestamp}; + } + } catch (e: any) { + logger.error( + `setGlobalRewindLock failed chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}, errorMsg: ${e.message}` + ); + await tx.rollback(); + throw e; + } + } + + /** + * The following special cases may exist: + * 1. A table lock needs to be added before release to prevent multiple chains from releasing simultaneously, causing a deadlock. If one chain is setting setGlobalRewindLock, others need to queue (so it’s necessary to check if it’s the lock for the current timestamp; if not, the release fails, and the rollback fails). + * 2. There may be a case where lastProcessBlock < rewindTime, in which case the lock should be released directly (force = true). + * @param rewindTimestamp + * + * @returns + */ + async releaseChainRewindLock(tx: Transaction, rewindTimestamp: Date, allowRewindTimestamp?: Date): Promise { + // A table lock should be used here to prevent multiple chains from releasing simultaneously, causing a deadlock. + const {currentChain, waitChainCount} = await this.getChainsInfo(tx); + assert(currentChain, `Not registered rewind timestamp key in global data, chainId: ${this.chainId}`); + + if (currentChain.status === MultiChainRewindStatus.Complete) { + // Already completed, no need to release + return waitChainCount; + } + if (allowRewindTimestamp && allowRewindTimestamp > currentChain.rewindTimestamp) { + throw new Error( + 'Rewind lock timestamp is less than the allowed rewind timestamp; releasing the lock is not permitted.' + ); + } + + assert( + currentChain.rewindTimestamp.getTime() === rewindTimestamp.getTime(), + `Chain rewind lock mismatch: chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}, status: ${currentChain.status}` + ); + + const [affectedCount] = await this.model.update( + {status: MultiChainRewindStatus.Complete}, + { + where: { + chainId: this.chainId, + status: MultiChainRewindStatus.Incomplete, + rewindTimestamp, + }, + transaction: tx, + } + ); + + assert(affectedCount === 1, `Release chain rewind lock failed, chainId: ${this.chainId}`); + + if (waitChainCount - 1 === 0) { + // Everything is complete, release the lock. + await this.model.destroy({where: {status: MultiChainRewindStatus.Complete}, transaction: tx}); + } + + return waitChainCount - 1; + } + + private async getChainsInfo(tx: Transaction) { + const chainList = await this.model.findAll({transaction: tx, lock: tx.LOCK.UPDATE}); + const currentChain = chainList.find((chain) => chain.chainId === this.chainId); + const waitChainCount = chainList.filter((chain) => chain.status === MultiChainRewindStatus.Incomplete).length; + return {currentChain, chainsCount: chainList.length, waitChainCount}; + } + + async getChainIdsFromMetadata(tx: Transaction): Promise { + const tableRes = await this.sequelize.query>(tableExistsQuery(this.dbSchema), { + type: QueryTypes.SELECT, + }); + const multiMetadataTables: string[] = flatten(tableRes).filter((value: string) => MULTI_METADATA_REGEX.test(value)); + assert( + multiMetadataTables.length > 0, + `No multi metadata tables found in the database. Please check your schema or configuration.` + ); + + const metadataRes = await Promise.all( + multiMetadataTables.map((table) => + this.sequelize.query<{value: string}>( + `SELECT "value" FROM "${this.dbSchema}"."${table}" WHERE "key" = 'chain'`, + { + type: QueryTypes.SELECT, + transaction: tx, + } + ) + ) + ); + + return metadataRes.map((metadata) => metadata[0].value); + } +} diff --git a/packages/node-core/src/indexer/storeModelProvider/global/index.ts b/packages/node-core/src/indexer/storeModelProvider/global/index.ts new file mode 100644 index 0000000000..6e4c701873 --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/global/index.ts @@ -0,0 +1,4 @@ +// Copyright 2020-2025 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +export {IGlobalData} from './global'; diff --git a/packages/node-core/src/indexer/types.ts b/packages/node-core/src/indexer/types.ts index c3b3cd420b..4690b9c8d2 100644 --- a/packages/node-core/src/indexer/types.ts +++ b/packages/node-core/src/indexer/types.ts @@ -27,7 +27,7 @@ export interface ISubqueryProject< C = unknown, > extends Omit, 'schema' | 'version' | 'name' | 'specVersion' | 'description'> { readonly schema: GraphQLSchema; - applyCronTimestamps: (getBlockTimestamp: (height: number) => Promise) => Promise; + applyCronTimestamps: (getBlockTimestamp: (height: number) => Promise) => Promise; readonly id: string; chainTypes?: C; // The chainTypes after loaded readonly root: string; @@ -76,7 +76,7 @@ export type Header = { blockHeight: number; blockHash: string; parentHash: string | undefined; - timestamp?: Date; + timestamp: Date; }; export type BypassBlocks = (number | `${number}-${number}`)[]; diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts index 258c60912c..7ef91ef6fa 100644 --- a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts +++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts @@ -302,7 +302,7 @@ export class UnfinalizedBlocksService implements IUnfinalizedBlocksServ const result: (Header & {timestamp: string})[] = JSON.parse(val); return result.map(({timestamp, ...header}) => ({ ...header, - timestamp: timestamp ? new Date(timestamp) : undefined, + timestamp: new Date(timestamp), })); } return []; diff --git a/packages/node-core/src/subcommands/reindex.service.ts b/packages/node-core/src/subcommands/reindex.service.ts index 4899f3c2a8..a72cd06d9d 100644 --- a/packages/node-core/src/subcommands/reindex.service.ts +++ b/packages/node-core/src/subcommands/reindex.service.ts @@ -2,11 +2,11 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; -import { Inject, Injectable } from '@nestjs/common'; -import { BaseDataSource } from '@subql/types-core'; -import { Sequelize } from '@subql/x-sequelize'; -import { IBlockchainService } from '../blockchain.service'; -import { NodeConfig, ProjectUpgradeService } from '../configure'; +import {Inject, Injectable} from '@nestjs/common'; +import {BaseDataSource} from '@subql/types-core'; +import {Sequelize} from '@subql/x-sequelize'; +import {IBlockchainService} from '../blockchain.service'; +import {NodeConfig, ProjectUpgradeService} from '../configure'; import { IUnfinalizedBlocksService, StoreService, @@ -15,19 +15,20 @@ import { IMetadata, cacheProviderFlushData, Header, + MultiChainRewindService, } from '../indexer'; -import { DynamicDsService } from '../indexer/dynamic-ds.service'; -import { getLogger } from '../logger'; -import { exitWithError, monitorWrite } from '../process'; -import { getExistingProjectSchema, initDbSchema, reindex } from '../utils'; -import { ForceCleanService } from './forceClean.service'; +import {DynamicDsService} from '../indexer/dynamic-ds.service'; +import {getLogger} from '../logger'; +import {exitWithError, monitorWrite} from '../process'; +import {getExistingProjectSchema, initDbSchema, reindex} from '../utils'; +import {ForceCleanService} from './forceClean.service'; const logger = getLogger('Reindex'); @Injectable() export class ReindexService

{ private _metadataRepo?: IMetadata; - private _lastProcessedHeader?: { height: number; timestamp?: number }; + private _lastProcessedHeader?: {height: number; timestamp?: number}; constructor( private readonly sequelize: Sequelize, @@ -40,6 +41,7 @@ export class ReindexService

, @Inject('DynamicDsService') private readonly dynamicDsService: DynamicDsService, @Inject('IBlockchainService') private readonly blockchainService: IBlockchainService, + private readonly multiChainRewindService: MultiChainRewindService ) {} private get metadataRepo(): IMetadata { @@ -65,10 +67,7 @@ export class ReindexService

blockHeight <= inputHeight); + const bestBlocks = unfinalizedBlocks.filter(({blockHeight}) => blockHeight <= inputHeight); if (bestBlocks.length && inputHeight >= bestBlocks[0].blockHeight) { return bestBlocks[0]; } @@ -103,7 +107,7 @@ export class ReindexService

= (ds: DS) => ds is DS; // eslint-disable-next-line @typescript-eslint/require-await export async function insertBlockFiltersCronSchedules( dataSources: DS[], - getBlockTimestamp: (height: number) => Promise, + getBlockTimestamp: (height: number) => Promise, isRuntimeDs: IsRuntimeDs, blockHandlerKind: string ): Promise { @@ -248,13 +248,7 @@ export async function insertBlockFiltersCronSchedules, sequelize: Sequelize, projectUpgradeService: IProjectUpgradeService, + multichainRewindService: MultiChainRewindService, poiService?: PoiService, forceCleanService?: ForceCleanService ): Promise { @@ -59,11 +62,17 @@ export async function reindex( logger.warn( `Skipping reindexing to ${storeService.historical} ${targetUnit}: current indexing height ${lastUnit} is behind requested ${storeService.historical}` ); + if (storeService.isMultichain) { + const tx = await sequelize.transaction(); + await multichainRewindService.releaseChainRewindLock(tx, new Date(targetUnit), new Date(lastUnit || 0)); + await tx.commit(); + } return; } // if startHeight is greater than the targetHeight, just force clean - if (targetBlockHeader.blockHeight < startHeight) { + // We prevent the entire data from being cleared due to multiple chains because the startblock is uncertain in multi-chain projects. + if (targetBlockHeader.blockHeight < startHeight && !storeService.isMultichain) { logger.info( `targetHeight: ${targetBlockHeader.blockHeight} is less than startHeight: ${startHeight}. Hence executing force-clean` ); @@ -73,45 +82,54 @@ export async function reindex( // if DB need rollback? no, because forceCleanService will take care of it await cacheProviderResetData(storeService.modelProvider); await forceCleanService?.forceClean(); - } else { - logger.info(`Reindexing to ${storeService.historical}: ${targetUnit}`); - await cacheProviderFlushData(storeService.modelProvider, true); - await cacheProviderResetData(storeService.modelProvider); - if (storeService.modelProvider instanceof StoreCacheService) { - await storeService.modelProvider.flushData(true); - await storeService.modelProvider.resetData(); + return; + } + + logger.info(`Reindexing to ${storeService.historical}: ${targetUnit}`); + if (storeService.isMultichain) { + const needRewind = await multichainRewindService.acquireGlobalRewindLock(new Date(targetUnit)); + if (!needRewind) { + logger.warn(`Rewind to ${storeService.historical} ${targetUnit} is not needed`); + return; } - const transaction = await sequelize.transaction(); - try { - /* + } + + await cacheProviderFlushData(storeService.modelProvider, true); + await cacheProviderResetData(storeService.modelProvider); + if (storeService.modelProvider instanceof StoreCacheService) { + await storeService.modelProvider.flushData(true); + await storeService.modelProvider.resetData(); + } + const transaction = await sequelize.transaction(); + try { + /* Must initialize storeService, to ensure all models are loaded, as storeService.init has not been called at this point - 1. During runtime, model should be already been init - 2.1 On start, projectUpgrade rewind will sync the sequelize models - 2.2 On start, without projectUpgrade or upgradablePoint, sequelize will sync models through project.service + 1. During runtime, model should be already been init + 2.1 On start, projectUpgrade rewind will sync the sequelize models + 2.2 On start, without projectUpgrade or upgradablePoint, sequelize will sync models through project.service */ - await projectUpgradeService.rewind( - targetBlockHeader.blockHeight, - lastProcessed.height, - transaction, - storeService - ); + await projectUpgradeService.rewind(targetBlockHeader.blockHeight, lastProcessed.height, transaction, storeService); - await Promise.all([ - storeService.rewind(targetBlockHeader, transaction), - unfinalizedBlockService.resetUnfinalizedBlocks(), // TODO: may not needed for nonfinalized chains - unfinalizedBlockService.resetLastFinalizedVerifiedHeight(), // TODO: may not needed for nonfinalized chains - dynamicDsService.resetDynamicDatasource(targetBlockHeader.blockHeight, transaction), - poiService?.rewind(targetBlockHeader.blockHeight, transaction), - ]); - // Flush metadata changes from above Promise.all - await storeService.modelProvider.metadata.flush?.(transaction, targetUnit); + await Promise.all([ + storeService.rewind(targetBlockHeader, transaction), + unfinalizedBlockService.resetUnfinalizedBlocks(), // TODO: may not needed for nonfinalized chains + unfinalizedBlockService.resetLastFinalizedVerifiedHeight(), // TODO: may not needed for nonfinalized chains + dynamicDsService.resetDynamicDatasource(targetBlockHeader.blockHeight, transaction), + poiService?.rewind(targetBlockHeader.blockHeight, transaction), + ]); + // Flush metadata changes from above Promise.all + await storeService.modelProvider.metadata.flush?.(transaction, targetUnit); - await transaction.commit(); - logger.info('Reindex Success'); - } catch (err: any) { - logger.error(err, 'Reindexing failed'); - await transaction.rollback(); - throw err; + // release rewind lock + if (storeService.isMultichain) { + await multichainRewindService.releaseChainRewindLock(transaction, new Date(targetUnit)); } + + await transaction.commit(); + logger.info('Reindex Success'); + } catch (err: any) { + logger.error(err, 'Reindexing failed'); + await transaction.rollback(); + throw err; } } diff --git a/packages/node-core/src/yargs.ts b/packages/node-core/src/yargs.ts index 1f4b1ea6dd..ec36cc9051 100644 --- a/packages/node-core/src/yargs.ts +++ b/packages/node-core/src/yargs.ts @@ -135,11 +135,6 @@ export function yargsBuilder< describe: 'Specify the dictionary api for this network', type: 'string', }, - 'network-endpoint': { - demandOption: false, - type: 'string', - describe: 'Blockchain network endpoint to connect', - }, 'primary-network-endpoint': { demandOption: false, type: 'string', @@ -380,6 +375,11 @@ export function yargsBuilder< describe: 'Local path or IPFS cid of the subquery project', type: 'string', }, + 'network-endpoint': { + demandOption: false, + type: 'string', + describe: 'Blockchain network endpoint to connect', + }, }) ); } diff --git a/packages/node/src/blockchain.service.spec.ts b/packages/node/src/blockchain.service.spec.ts index 9e8652d900..1ebde105ef 100644 --- a/packages/node/src/blockchain.service.spec.ts +++ b/packages/node/src/blockchain.service.spec.ts @@ -55,4 +55,9 @@ describe('BlockchainService', () => { const interval = await blockchainService.getChainInterval(); expect(interval).toEqual(5000); }); + + it('can get the chain create time', async () => { + const requiredHeader = await blockchainService.getHeaderForHeight(24723095); + expect(requiredHeader.timestamp.getTime()).toEqual(1739501268001); + }, 10000); }); diff --git a/packages/node/src/blockchain.service.ts b/packages/node/src/blockchain.service.ts index 374bdac380..62dcf1d8bf 100644 --- a/packages/node/src/blockchain.service.ts +++ b/packages/node/src/blockchain.service.ts @@ -35,7 +35,7 @@ import { calcInterval, getBlockByHeight, getTimestamp, - substrateHeaderToHeader, + getHeaderForHash, } from './utils/substrate'; const BLOCK_TIME_VARIANCE = 5000; //ms @@ -115,9 +115,20 @@ export class BlockchainService this.apiService.updateBlockFetching(); } - async getBlockTimestamp(height: number): Promise { + async getBlockTimestamp(height: number): Promise { const block = await getBlockByHeight(this.apiService.api, height); - return getTimestamp(block); + + let timestamp = getTimestamp(block); + if (!timestamp) { + // Not all networks have a block timestamp, e.g. Shiden + const blockTimestamp = await ( + await this.apiService.unsafeApi.at(block.hash) + ).query.timestamp.now(); + + timestamp = new Date(blockTimestamp.toNumber()); + } + + return timestamp; } getBlockSize(block: IBlock): number { @@ -127,9 +138,8 @@ export class BlockchainService async getFinalizedHeader(): Promise

{ const finalizedHash = await this.apiService.unsafeApi.rpc.chain.getFinalizedHead(); - const finalizedHeader = - await this.apiService.unsafeApi.rpc.chain.getHeader(finalizedHash); - return substrateHeaderToHeader(finalizedHeader); + + return this.getHeaderForHash(finalizedHash.toHex()); } async getBestHeight(): Promise { @@ -148,9 +158,7 @@ export class BlockchainService // TODO can this decorator be in unfinalizedBlocks Service? @mainThreadOnly() async getHeaderForHash(hash: string): Promise
{ - return substrateHeaderToHeader( - await this.apiService.unsafeApi.rpc.chain.getHeader(hash), - ); + return getHeaderForHash(this.apiService.unsafeApi, hash); } // TODO can this decorator be in unfinalizedBlocks Service? diff --git a/packages/node/src/indexer/fetch.module.ts b/packages/node/src/indexer/fetch.module.ts index 879e58bcd8..bf39476557 100644 --- a/packages/node/src/indexer/fetch.module.ts +++ b/packages/node/src/indexer/fetch.module.ts @@ -19,6 +19,7 @@ import { DynamicDsService, FetchService, DictionaryService, + MultiChainRewindService, blockDispatcherFactory, } from '@subql/node-core'; import { SubstrateDatasource } from '@subql/types'; @@ -62,6 +63,7 @@ import { IIndexerWorker } from './worker/worker'; useClass: ProjectService, provide: 'IProjectService', }, + MultiChainRewindService, IndexerManager, { provide: 'IBlockDispatcher', @@ -89,6 +91,7 @@ import { IIndexerWorker } from './worker/worker'; ConnectionPoolStateManager, 'IBlockchainService', IndexerManager, + MultiChainRewindService, MonitorService, ], }, diff --git a/packages/node/src/indexer/project.service.spec.ts b/packages/node/src/indexer/project.service.spec.ts index 36c2e7f3dd..c650f1ad04 100644 --- a/packages/node/src/indexer/project.service.spec.ts +++ b/packages/node/src/indexer/project.service.spec.ts @@ -12,6 +12,7 @@ import { upgradableSubqueryProject, DsProcessorService, DynamicDsService, + MultiChainRewindService, } from '@subql/node-core'; import { SubstrateDatasourceKind, SubstrateHandlerKind } from '@subql/types'; import { GraphQLSchema } from 'graphql'; @@ -145,6 +146,7 @@ describe('ProjectService', () => { apiService: ApiService, project: SubqueryProject, blockchainService: BlockchainService, + multiChainRewindService: MultiChainRewindService, ) => new TestProjectService( { @@ -172,14 +174,25 @@ describe('ProjectService', () => { null as unknown as any, null as unknown as any, blockchainService, + multiChainRewindService, ), - inject: ['APIService', 'ISubqueryProject', 'IBlockchainService'], + inject: [ + 'APIService', + 'ISubqueryProject', + 'IBlockchainService', + MultiChainRewindService, + ], }, EventEmitter2, { provide: 'APIService', useFactory: ApiService.init, - inject: ['ISubqueryProject', ConnectionPoolService, EventEmitter2, NodeConfig] + inject: [ + 'ISubqueryProject', + ConnectionPoolService, + EventEmitter2, + NodeConfig, + ], }, { provide: ProjectUpgradeService, @@ -194,6 +207,11 @@ describe('ProjectService', () => { provide: 'IBlockchainService', useClass: BlockchainService, }, + { + provide: MultiChainRewindService, + // eslint-disable-next-line @typescript-eslint/no-empty-function + useValue: { init: () => {} }, + }, ], imports: [EventEmitterModule.forRoot()], }).compile(); diff --git a/packages/node/src/indexer/worker/worker-fetch.module.ts b/packages/node/src/indexer/worker/worker-fetch.module.ts index 8c4bc8ee29..8cbe249478 100644 --- a/packages/node/src/indexer/worker/worker-fetch.module.ts +++ b/packages/node/src/indexer/worker/worker-fetch.module.ts @@ -9,6 +9,7 @@ import { WorkerCoreModule, ProjectService, DsProcessorService, + MultiChainRewindService, } from '@subql/node-core'; import { BlockchainService } from '../../blockchain.service'; import { ApiService } from '../api.service'; @@ -48,6 +49,7 @@ import { WorkerService } from './worker.service'; provide: 'IBlockchainService', useClass: BlockchainService, }, + MultiChainRewindService, WorkerService, ], exports: [], diff --git a/packages/node/src/indexer/worker/worker.service.ts b/packages/node/src/indexer/worker/worker.service.ts index c97ee07287..838972e73f 100644 --- a/packages/node/src/indexer/worker/worker.service.ts +++ b/packages/node/src/indexer/worker/worker.service.ts @@ -17,11 +17,7 @@ import { ApiService } from '../api.service'; import { SpecVersion } from '../dictionary'; import { IndexerManager } from '../indexer.manager'; import { WorkerRuntimeService } from '../runtime/workerRuntimeService'; -import { - BlockContent, - getBlockSize, - LightBlockContent, -} from '../types'; +import { BlockContent, getBlockSize, LightBlockContent } from '../types'; export type FetchBlockResponse = Header & { specVersion?: number }; diff --git a/packages/node/src/subcommands/reindex.module.ts b/packages/node/src/subcommands/reindex.module.ts index faf76fe32a..917041daaa 100644 --- a/packages/node/src/subcommands/reindex.module.ts +++ b/packages/node/src/subcommands/reindex.module.ts @@ -17,6 +17,7 @@ import { DsProcessorService, UnfinalizedBlocksService, DynamicDsService, + MultiChainRewindService, } from '@subql/node-core'; import { Sequelize } from '@subql/x-sequelize'; import { BlockchainService } from '../blockchain.service'; @@ -65,6 +66,7 @@ import { RuntimeService } from '../indexer/runtime/runtimeService'; provide: 'IBlockchainService', useClass: BlockchainService, }, + MultiChainRewindService, SchedulerRegistry, ], controllers: [], diff --git a/packages/node/src/utils/substrate.test.ts b/packages/node/src/utils/substrate.test.ts index 257bd32879..21cd9aad12 100644 --- a/packages/node/src/utils/substrate.test.ts +++ b/packages/node/src/utils/substrate.test.ts @@ -9,6 +9,7 @@ import { fetchBlocksBatches, filterExtrinsic, getBlockByHeight, + getHeaderForHash, getTimestamp, } from './substrate'; @@ -102,4 +103,16 @@ describe('substrate utils', () => { expect(getTimestamp(block1)).toBeUndefined(); await api.disconnect(); }); + + it('return defined if no timestamp set extrinsic', async () => { + const provider = new WsProvider(ENDPOINT_SHIDEN); + const api = await ApiPromise.create({ provider }); + const block1 = await getBlockByHeight(api, 999999); + const { timestamp } = await getHeaderForHash( + api, + block1.block.header.hash.toString(), + ); + expect(timestamp).toBeDefined(); + await api.disconnect(); + }); }); diff --git a/packages/node/src/utils/substrate.ts b/packages/node/src/utils/substrate.ts index fc1899c255..0890ac6306 100644 --- a/packages/node/src/utils/substrate.ts +++ b/packages/node/src/utils/substrate.ts @@ -39,7 +39,9 @@ const INTERVAL_THRESHOLD = BN_THOUSAND.div(BN_TWO); const DEFAULT_TIME = new BN(6_000); const A_DAY = new BN(24 * 60 * 60 * 1000); -export function substrateHeaderToHeader(header: SubstrateHeader): Header { +type MissTsHeader = Omit; + +export function substrateHeaderToHeader(header: SubstrateHeader): MissTsHeader { return { blockHeight: header.number.toNumber(), blockHash: header.hash.toHex(), @@ -48,9 +50,15 @@ export function substrateHeaderToHeader(header: SubstrateHeader): Header { } export function substrateBlockToHeader(block: SignedBlock): Header { + const timestamp = getTimestamp(block); + assert( + timestamp, + 'Failed to retrieve a reliable timestamp. This issue is more likely to occur on networks like Shiden', + ); + return { ...substrateHeaderToHeader(block.block.header), - timestamp: getTimestamp(block), + timestamp, }; } @@ -90,6 +98,22 @@ export function getTimestamp({ return undefined; } +export async function getHeaderForHash( + api: ApiPromise, + blockHash: string, +): Promise
{ + const block = await api.rpc.chain.getBlock(blockHash).catch((e) => { + logger.error( + `failed to fetch Block hash="${blockHash}" ${getApiDecodeErrMsg( + e.message, + )}`, + ); + throw ApiPromiseConnection.handleError(e); + }); + + return substrateBlockToHeader(block); +} + export function wrapExtrinsics( wrappedBlock: SubstrateBlock, allEvents: EventRecord[], @@ -425,7 +449,7 @@ export async function fetchLightBlock( throw ApiPromiseConnection.handleError(e); }); - const [header, events] = await Promise.all([ + const [header, events, timestamp] = await Promise.all([ api.rpc.chain.getHeader(blockHash).catch((e) => { logger.error( `failed to fetch Block Header hash="${blockHash}" height="${height}"`, @@ -436,6 +460,8 @@ export async function fetchLightBlock( logger.error(`failed to fetch events at block ${blockHash}`); throw ApiPromiseConnection.handleError(e); }), + // TODO: Maybe api.query.timestamp.now.at(blockHash) is the only option. If we do use it we need sufficient tests and errors if a chain doesn't support getting the timestamp. + (await api.at(blockHash)).query.timestamp.now(), ]); const blockHeader: BlockHeader = { @@ -448,7 +474,10 @@ export async function fetchLightBlock( events: events.map((evt, idx) => merge(evt, { idx, block: blockHeader })), }, getHeader: () => { - return substrateHeaderToHeader(blockHeader.block.header); + return { + ...substrateHeaderToHeader(blockHeader.block.header), + timestamp: new Date(timestamp.toNumber()), + }; }, }; }