diff --git a/packages/cli/src/controller/build-controller.ts b/packages/cli/src/controller/build-controller.ts index cb30f1cebd..f4003bfd5e 100644 --- a/packages/cli/src/controller/build-controller.ts +++ b/packages/cli/src/controller/build-controller.ts @@ -2,12 +2,12 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; -import { readFileSync } from 'fs'; +import {readFileSync} from 'fs'; import path from 'path'; -import { globSync } from 'glob'; +import {globSync} from 'glob'; import TerserPlugin from 'terser-webpack-plugin'; -import webpack, { Configuration } from 'webpack'; -import { merge } from 'webpack-merge'; +import webpack, {Configuration} from 'webpack'; +import {merge} from 'webpack-merge'; const getBaseConfig = ( buildEntries: Configuration['entry'], @@ -72,7 +72,7 @@ export async function runWebpack( ): Promise { const config = merge( getBaseConfig(buildEntries, projectDir, outputDir, isDev), - { output: { clean } } + {output: {clean}} // Can allow projects to override webpack config here ); @@ -121,7 +121,7 @@ export function getBuildEntries(directory: string): Record { acc[key] = path.resolve(directory, value); return acc; }, - { ...buildEntries } + {...buildEntries} ); } diff --git a/packages/node-core/src/blockchain.service.ts b/packages/node-core/src/blockchain.service.ts index 924cb49371..1b92fdd544 100644 --- a/packages/node-core/src/blockchain.service.ts +++ b/packages/node-core/src/blockchain.service.ts @@ -13,8 +13,8 @@ export interface ICoreBlockchainService< // Project service onProjectChange(project: SubQueryProject): Promise | void; - /* Not all networks have a block timestamp, e.g. Shiden */ - getBlockTimestamp(height: number): Promise; + /* Not all networks have a block timestamp, e.g. Shiden need to request one more get */ + getBlockTimestamp(height: number): Promise; } export interface IBlockchainService< diff --git a/packages/node-core/src/configure/SubqueryProject.ts b/packages/node-core/src/configure/SubqueryProject.ts index e328a6d66d..e23b53c05d 100644 --- a/packages/node-core/src/configure/SubqueryProject.ts +++ b/packages/node-core/src/configure/SubqueryProject.ts @@ -124,7 +124,7 @@ export class BaseSubqueryProject< return this.#dataSources; } - async applyCronTimestamps(getTimestamp: (height: number) => Promise): Promise { + async applyCronTimestamps(getTimestamp: (height: number) => Promise): Promise { this.#dataSources = await insertBlockFiltersCronSchedules( this.dataSources, getTimestamp, diff --git a/packages/node-core/src/db/db.module.ts b/packages/node-core/src/db/db.module.ts index 4c91a071fa..3fa0e3a997 100644 --- a/packages/node-core/src/db/db.module.ts +++ b/packages/node-core/src/db/db.module.ts @@ -3,6 +3,7 @@ import {DynamicModule, Global} from '@nestjs/common'; import {Sequelize, Options as SequelizeOption} from '@subql/x-sequelize'; +import {PoolConfig} from 'pg'; import {NodeConfig} from '../configure/NodeConfig'; import {getLogger} from '../logger'; import {exitWithError} from '../process'; diff --git a/packages/node-core/src/db/sync-helper.test.ts b/packages/node-core/src/db/sync-helper.test.ts index 1751cd7ca5..8ae257f30d 100644 --- a/packages/node-core/src/db/sync-helper.test.ts +++ b/packages/node-core/src/db/sync-helper.test.ts @@ -4,10 +4,19 @@ import {INestApplication} from '@nestjs/common'; import {Test} from '@nestjs/testing'; import {delay} from '@subql/common'; +import {hashName} from '@subql/utils'; import {Sequelize} from '@subql/x-sequelize'; import {NodeConfig} from '../configure/NodeConfig'; +import {MultiChainRewindEvent} from '../events'; +import {RewindLockKey} from '../indexer'; import {DbModule} from './db.module'; -import {createSendNotificationTriggerFunction, createNotifyTrigger, getDbSizeAndUpdateMetadata} from './sync-helper'; +import { + createSendNotificationTriggerFunction, + createNotifyTrigger, + getDbSizeAndUpdateMetadata, + createRewindTriggerFunction, + createRewindTrigger, +} from './sync-helper'; const nodeConfig = new NodeConfig({subquery: 'packages/node-core/test/v1.0.0', subqueryName: 'test'}); @@ -185,3 +194,84 @@ describe('sync helper test', () => { }, 10_000); }); }); + +describe('Multi-chain notification', () => { + let app: INestApplication; + let sequelize: Sequelize; + const schema = 'multi-chain-test'; + + let client: unknown; + + const listenerHash = hashName(schema, 'rewind_trigger', '_global'); + + afterAll(async () => { + await sequelize.dropSchema(schema, {}); + await sequelize.close(); + return app?.close(); + }); + + afterEach(async () => { + if (client) { + await (client as any).query(`UNLISTEN "${listenerHash}"`); + (client as any).removeAllListeners('notification'); + sequelize.connectionManager.releaseConnection(client); + } + }); + + it('can handle multiple rows in one transaction', async () => { + const module = await Test.createTestingModule({ + imports: [DbModule.forRootWithConfig(nodeConfig)], + }).compile(); + app = module.createNestApplication(); + await app.init(); + sequelize = app.get(Sequelize); + await sequelize.createSchema(schema, {}); + // mock create global table + await sequelize.query(` + CREATE TABLE IF NOT EXISTS "${schema}"._global ( + key VARCHAR(255) NOT NULL PRIMARY KEY, + value JSONB, + "createdAt" TIMESTAMP WITH TIME ZONE NOT NULL, + "updatedAt" TIMESTAMP WITH TIME ZONE NOT NULL + )`); + + await sequelize.query(createRewindTriggerFunction(schema)); + await sequelize.query(createRewindTrigger(schema)); + + client = await sequelize.connectionManager.getConnection({ + type: 'read', + }); + await (client as any).query(`LISTEN "${listenerHash}"`); + + const listener = jest.fn(); + (client as any).on('notification', (msg: any) => { + console.log('Payload:', msg.payload); + listener(msg.payload); + }); + + const rewindSqlFromTimestamp = ( + timestamp: number + ) => `INSERT INTO "${schema}"."_global" ( "key", "value", "createdAt", "updatedAt" ) + VALUES + ( 'rewindLock', '{"timestamp":${timestamp},"chainNum":1}', now(), now()) + ON CONFLICT ( "key" ) + DO UPDATE + SET "key" = EXCLUDED."key", + "value" = EXCLUDED."value", + "updatedAt" = EXCLUDED."updatedAt" + WHERE "_global"."key" = '${RewindLockKey}' AND ("_global"."value"->>'timestamp')::BIGINT > ${timestamp}`; + const rewindTimestamp = 1597669506000; + await sequelize.query(rewindSqlFromTimestamp(rewindTimestamp)); + await delay(1); + + await sequelize.query(rewindSqlFromTimestamp(rewindTimestamp - 1)); + await delay(1); + + await sequelize.query(`DELETE FROM "${schema}"."_global" WHERE "key" = '${RewindLockKey}'`); + await delay(1); + expect(listener).toHaveBeenCalledTimes(3); + expect(listener).toHaveBeenNthCalledWith(1, MultiChainRewindEvent.Rewind); + expect(listener).toHaveBeenNthCalledWith(2, MultiChainRewindEvent.RewindTimestampDecreased); + expect(listener).toHaveBeenNthCalledWith(3, MultiChainRewindEvent.RewindComplete); + }, 20_000); +}); diff --git a/packages/node-core/src/db/sync-helper.ts b/packages/node-core/src/db/sync-helper.ts index bc3b6ac744..d8e5fb8764 100644 --- a/packages/node-core/src/db/sync-helper.ts +++ b/packages/node-core/src/db/sync-helper.ts @@ -16,6 +16,8 @@ import { Utils, } from '@subql/x-sequelize'; import {ModelAttributeColumnReferencesOptions, ModelIndexesOptions} from '@subql/x-sequelize/types/model'; +import {MultiChainRewindEvent} from '../events'; +import {RewindLockKey} from '../indexer'; import {EnumType} from '../utils'; import {formatAttributes, generateIndexName, modelToTableName} from './sequelizeUtil'; // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -297,6 +299,57 @@ export function createSchemaTriggerFunction(schema: string): string { $$ LANGUAGE plpgsql;`; } +export function createRewindTrigger(schema: string): string { + const triggerName = hashName(schema, 'rewind_trigger', '_global'); + + return ` + CREATE TRIGGER "${triggerName}" + AFTER INSERT OR UPDATE OR DELETE + ON "${schema}"."_global" + FOR EACH ROW + EXECUTE FUNCTION "${schema}".rewind_notification(); + `; +} + +export function createRewindTriggerFunction(schema: string): string { + const triggerName = hashName(schema, 'rewind_trigger', '_global'); + + return ` + CREATE OR REPLACE FUNCTION "${schema}".rewind_notification() + RETURNS trigger AS $$ + DECLARE + key_value TEXT; + BEGIN + IF TG_OP = 'DELETE' THEN + key_value := OLD.value ->> 'key'; + ELSE + key_value := NEW.value ->> 'key'; + END IF; + + -- Make sure it’s RewindLockKey + IF key_value <> '${RewindLockKey}' THEN + RETURN NULL; + END IF; + + IF TG_OP = 'INSERT' THEN + PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.Rewind}'); + END IF; + + -- During a rollback, there is a chain that needs to be rolled back to an earlier height. + IF TG_OP = 'UPDATE' AND (NEW.value ->> 'timestamp')::BIGINT < (OLD.value ->> 'timestamp')::BIGINT THEN + PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.RewindTimestampDecreased}'); + END IF; + + IF TG_OP = 'DELETE' THEN + PERFORM pg_notify('${triggerName}', '${MultiChainRewindEvent.RewindComplete}'); + END IF; + + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + `; +} + export function getExistedIndexesQuery(schema: string): string { return `SELECT indexname FROM pg_indexes WHERE schemaname = '${schema}'`; } diff --git a/packages/node-core/src/events.ts b/packages/node-core/src/events.ts index 63900a0de4..ce02e4a87a 100644 --- a/packages/node-core/src/events.ts +++ b/packages/node-core/src/events.ts @@ -29,6 +29,12 @@ export enum PoiEvent { PoiTarget = 'poi_target', } +export enum MultiChainRewindEvent { + Rewind = 'rewind', + RewindComplete = 'rewind_complete', + RewindTimestampDecreased = 'timestamp_decreased', +} + export interface RewindPayload { success: boolean; height: number; @@ -61,3 +67,7 @@ export interface NetworkMetadataPayload { specName: string; genesisHash: string; } + +export interface MultiChainRewindPayload { + height: number; +} diff --git a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts index 2caab9353d..b1f4a1c480 100644 --- a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts @@ -4,6 +4,7 @@ import assert from 'assert'; import {EventEmitter2, OnEvent} from '@nestjs/event-emitter'; +import {ICoreBlockchainService} from '@subql/node-core/blockchain.service'; import {hexToU8a, u8aEq} from '@subql/utils'; import {Transaction} from '@subql/x-sequelize'; import {NodeConfig, IProjectUpgradeService} from '../../configure'; @@ -33,6 +34,7 @@ export interface IBlockDispatcher { latestBufferedHeight: number; batchSize: number; + setLatestProcessedHeight(height: number): void; // Remove all enqueued blocks, used when a dynamic ds is created flushQueue(height: number): void; } @@ -60,7 +62,8 @@ export abstract class BaseBlockDispatcher implements IB protected queue: Q, protected storeService: StoreService, private storeModelProvider: IStoreModelProvider, - private poiSyncService: PoiSyncService + private poiSyncService: PoiSyncService, + private blockChainService: ICoreBlockchainService ) {} abstract enqueueBlocks(heights: (IBlock | number)[], latestBufferHeight?: number): void | Promise; @@ -214,7 +217,7 @@ export abstract class BaseBlockDispatcher implements IB } @OnEvent(AdminEvent.rewindTarget) - handleAdminRewind(blockPayload: TargetBlockPayload): void { + async handleAdminRewind(blockPayload: TargetBlockPayload) { if (this.currentProcessingHeight < blockPayload.height) { // this will throw back to admin controller, will NOT lead current indexing exit throw new Error( @@ -223,8 +226,10 @@ export abstract class BaseBlockDispatcher implements IB } // TODO can this work without + const timestamp = await this.blockChainService.getBlockTimestamp(blockPayload.height); this._pendingRewindHeader = { blockHeight: Number(blockPayload.height), + timestamp, } as Header; const message = `Received admin command to rewind to block ${blockPayload.height}`; monitorWrite(`***** [ADMIN] ${message}`); diff --git a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts index 3f2eafbfc8..ee874bff01 100644 --- a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts @@ -1,22 +1,22 @@ // Copyright 2020-2025 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -import { OnApplicationShutdown } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { Interval } from '@nestjs/schedule'; -import { BaseDataSource } from '@subql/types-core'; -import { IBlockchainService } from '../../blockchain.service'; -import { NodeConfig } from '../../configure'; -import { IProjectUpgradeService } from '../../configure/ProjectUpgrade.service'; -import { IndexerEvent } from '../../events'; -import { getBlockHeight, IBlock, PoiSyncService, StoreService } from '../../indexer'; -import { getLogger } from '../../logger'; -import { exitWithError, monitorWrite } from '../../process'; -import { profilerWrap } from '../../profiler'; -import { Queue, AutoQueue, RampQueue, delay, isTaskFlushedError } from '../../utils'; -import { IStoreModelProvider } from '../storeModelProvider'; -import { IIndexerManager, IProjectService, ISubqueryProject } from '../types'; -import { BaseBlockDispatcher } from './base-block-dispatcher'; +import {OnApplicationShutdown} from '@nestjs/common'; +import {EventEmitter2} from '@nestjs/event-emitter'; +import {Interval} from '@nestjs/schedule'; +import {BaseDataSource} from '@subql/types-core'; +import {IBlockchainService} from '../../blockchain.service'; +import {NodeConfig} from '../../configure'; +import {IProjectUpgradeService} from '../../configure/ProjectUpgrade.service'; +import {IndexerEvent} from '../../events'; +import {getBlockHeight, IBlock, PoiSyncService, StoreService} from '../../indexer'; +import {getLogger} from '../../logger'; +import {exitWithError, monitorWrite} from '../../process'; +import {profilerWrap} from '../../profiler'; +import {Queue, AutoQueue, RampQueue, delay, isTaskFlushedError} from '../../utils'; +import {IStoreModelProvider} from '../storeModelProvider'; +import {IIndexerManager, IProjectService, ISubqueryProject} from '../types'; +import {BaseBlockDispatcher} from './base-block-dispatcher'; const logger = getLogger('BlockDispatcherService'); @@ -27,7 +27,8 @@ type BatchBlockFetcher = (heights: number[]) => Promise[]>; */ export class BlockDispatcher extends BaseBlockDispatcher | number>, DS, B> - implements OnApplicationShutdown { + implements OnApplicationShutdown +{ private fetchQueue: AutoQueue>; private processQueue: AutoQueue; @@ -57,7 +58,8 @@ export class BlockDispatcher new Queue(nodeConfig.batchSize * 3), storeService, storeModelProvider, - poiSyncService + poiSyncService, + blockchainService ); this.processQueue = new AutoQueue(nodeConfig.batchSize * 3, 1, nodeConfig.timeout, 'Process'); this.fetchQueue = new RampQueue( @@ -177,7 +179,8 @@ export class BlockDispatcher } logger.error( e, - `Failed to index block at height ${header.blockHeight} ${e.handler ? `${e.handler}(${e.stack ?? ''})` : '' + `Failed to index block at height ${header.blockHeight} ${ + e.handler ? `${e.handler}(${e.stack ?? ''})` : '' }` ); throw e; @@ -198,7 +201,7 @@ export class BlockDispatcher // Do nothing, fetching the block was flushed, this could be caused by forked blocks or dynamic datasources return; } - exitWithError(new Error(`Failed to enqueue fetched block to process`, { cause: e }), logger); + exitWithError(new Error(`Failed to enqueue fetched block to process`, {cause: e}), logger); }); this.eventEmitter.emit(IndexerEvent.BlockQueueSize, { @@ -207,7 +210,7 @@ export class BlockDispatcher } } catch (e: any) { if (!this.isShutdown) { - exitWithError(new Error(`Failed to process blocks from queue`, { cause: e }), logger); + exitWithError(new Error(`Failed to process blocks from queue`, {cause: e}), logger); } } finally { this.fetching = false; diff --git a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts index 478467a629..0ab24c2786 100644 --- a/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/worker-block-dispatcher.ts @@ -89,7 +89,8 @@ export class WorkerBlockDispatcher< initAutoQueue(nodeConfig.workers, nodeConfig.batchSize, nodeConfig.timeout, 'Worker'), storeService, storeModelProvider, - poiSyncService + poiSyncService, + blockchainService ); this.createWorker = () => diff --git a/packages/node-core/src/indexer/entities/GlobalData.entity.ts b/packages/node-core/src/indexer/entities/GlobalData.entity.ts new file mode 100644 index 0000000000..6f8e6cd820 --- /dev/null +++ b/packages/node-core/src/indexer/entities/GlobalData.entity.ts @@ -0,0 +1,57 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import {blake2AsHex} from '@subql/utils'; +import {BuildOptions, DataTypes, Model, Sequelize} from '@subql/x-sequelize'; + +export const RewindTimestampKeyPrefix = 'rewindTimestamp'; +export const RewindLockKey = 'rewindLock'; + +/** + * @string chainId + */ +export type RewindTimestampKey = `${typeof RewindTimestampKeyPrefix}_${string}`; + +export type RewindLockInfo = { + /** Timestamp to rewind to. */ + timestamp: number; + /** Remaining number of chains to be rolled back */ + chainNum: number; +}; +export interface GlobalDataKeys { + rewindLock: RewindLockInfo; + [key: RewindTimestampKey]: number; +} + +export interface GlobalData { + key: k; + value: GlobalDataKeys[k]; +} + +interface GlobalDataEntity extends Model, GlobalData {} + +export type GlobalDataRepo = typeof Model & { + new (values?: unknown, options?: BuildOptions): GlobalDataEntity; +}; + +export function GlobalDataFactory(sequelize: Sequelize, schema: string): GlobalDataRepo { + const tableName = '_global'; + + return sequelize.define( + tableName, + { + key: { + type: DataTypes.STRING, + primaryKey: true, + }, + value: { + type: DataTypes.JSONB, + }, + }, + {freezeTableName: true, schema: schema} + ); +} + +export function generateRewindTimestampKey(chainId: string): RewindTimestampKey { + return `${RewindTimestampKeyPrefix}_${blake2AsHex(chainId)}` as RewindTimestampKey; +} diff --git a/packages/node-core/src/indexer/entities/index.ts b/packages/node-core/src/indexer/entities/index.ts index cf880d9f41..ef1988522a 100644 --- a/packages/node-core/src/indexer/entities/index.ts +++ b/packages/node-core/src/indexer/entities/index.ts @@ -3,3 +3,4 @@ export * from './Poi.entity'; export * from './Metadata.entity'; +export * from './GlobalData.entity'; diff --git a/packages/node-core/src/indexer/fetch.service.spec.ts b/packages/node-core/src/indexer/fetch.service.spec.ts index 247333d6fd..c7b8fe17b4 100644 --- a/packages/node-core/src/indexer/fetch.service.spec.ts +++ b/packages/node-core/src/indexer/fetch.service.spec.ts @@ -1,9 +1,9 @@ // Copyright 2020-2025 SubQuery Pte Ltd authors & contributors // SPDX-License-Identifier: GPL-3.0 -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { SchedulerRegistry } from '@nestjs/schedule'; -import { BaseCustomDataSource, BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry } from '@subql/types-core'; +import {EventEmitter2} from '@nestjs/event-emitter'; +import {SchedulerRegistry} from '@nestjs/schedule'; +import {BaseCustomDataSource, BaseDataSource, BaseHandler, BaseMapping, DictionaryQueryEntry} from '@subql/types-core'; import { UnfinalizedBlocksService, BlockDispatcher, @@ -19,9 +19,9 @@ import { IBaseIndexerWorker, BypassBlocks, } from '../'; -import { BlockHeightMap } from '../utils/blockHeightMap'; -import { DictionaryService } from './dictionary/dictionary.service'; -import { FetchService } from './fetch.service'; +import {BlockHeightMap} from '../utils/blockHeightMap'; +import {DictionaryService} from './dictionary/dictionary.service'; +import {FetchService} from './fetch.service'; const CHAIN_INTERVAL = 100; // 100ms @@ -73,7 +73,7 @@ class TestBlockchainService implements IBlockchainService { fetchBlockWorker( worker: IBaseIndexerWorker, blockNum: number, - context: { workers: IBaseIndexerWorker[] } + context: {workers: IBaseIndexerWorker[]} ): Promise
{ throw new Error('Method not implemented.'); } @@ -82,7 +82,7 @@ class TestBlockchainService implements IBlockchainService { blockHeight: this.finalizedHeight, blockHash: '0xxx', parentHash: '0xxx', - timestamp: new Date() + timestamp: new Date(), }); } async getBestHeight(): Promise { @@ -124,9 +124,13 @@ class TestBlockchainService implements IBlockchainService { throw new Error('Method not implemented.'); } // eslint-disable-next-line @typescript-eslint/promise-function-async - getBlockTimestamp(height: number): Promise { + getBlockTimestamp(height: number): Promise { throw new Error('Method not implemented.'); } + + async getRequiredHeaderForHeight(height: number): Promise
{ + return (await this.getHeaderForHeight(height)) as any; + } } const nodeConfig = new NodeConfig({ @@ -161,7 +165,7 @@ function mockModuloDs(startBlock: number, endBlock: number, modulo: number): Bas { kind: 'mock/Handler', handler: 'mockFunction', - filter: { modulo: modulo }, + filter: {modulo: modulo}, }, ], }, @@ -268,7 +272,8 @@ describe('Fetch Service', () => { set: jest.fn(), }, } as any, - blockchainService + blockchainService, + {} as any ); spyOnEnqueueSequential = jest.spyOn(fetchService as any, 'enqueueSequential') as any; @@ -302,20 +307,20 @@ describe('Fetch Service', () => { const moduloBlockHeightMap = new BlockHeightMap( new Map([ - [1, [{ ...mockModuloDs(1, 100, 20), startBlock: 1, endBlock: 100 }]], + [1, [{...mockModuloDs(1, 100, 20), startBlock: 1, endBlock: 100}]], [ 101, // empty gap for discontinuous block [], ], - [201, [{ ...mockModuloDs(201, 500, 30), startBlock: 201, endBlock: 500 }]], + [201, [{...mockModuloDs(201, 500, 30), startBlock: 201, endBlock: 500}]], // to infinite - [500, [{ ...mockModuloDs(500, Number.MAX_SAFE_INTEGER, 99), startBlock: 500 }]], + [500, [{...mockModuloDs(500, Number.MAX_SAFE_INTEGER, 99), startBlock: 500}]], // multiple ds [ 600, [ - { ...mockModuloDs(500, 800, 99), startBlock: 600, endBlock: 800 }, - { ...mockModuloDs(700, Number.MAX_SAFE_INTEGER, 101), startBlock: 700 }, + {...mockModuloDs(500, 800, 99), startBlock: 600, endBlock: 800}, + {...mockModuloDs(700, Number.MAX_SAFE_INTEGER, 101), startBlock: 700}, ], ], ]) @@ -333,43 +338,43 @@ describe('Fetch Service', () => { [ 1, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 1, endBlock: 100 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 1, endBlock: 100}, ], ], [ 10, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 1, endBlock: 100 }, - { ...mockDs, startBlock: 10, endBlock: 20 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 1, endBlock: 100}, + {...mockDs, startBlock: 10, endBlock: 20}, ], ], [ 21, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 1, endBlock: 100 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 1, endBlock: 100}, ], ], [ 50, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 1, endBlock: 100 }, - { ...mockDs, startBlock: 50, endBlock: 200 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 1, endBlock: 100}, + {...mockDs, startBlock: 50, endBlock: 200}, ], ], [ 101, [ - { ...mockDs, startBlock: 1, endBlock: 300 }, - { ...mockDs, startBlock: 50, endBlock: 200 }, + {...mockDs, startBlock: 1, endBlock: 300}, + {...mockDs, startBlock: 50, endBlock: 200}, ], ], - [201, [{ ...mockDs, startBlock: 1, endBlock: 300 }]], + [201, [{...mockDs, startBlock: 1, endBlock: 300}]], [301, []], - [500, [{ ...mockDs, startBlock: 500 }]], + [500, [{...mockDs, startBlock: 500}]], ]) ) ); @@ -505,7 +510,7 @@ describe('Fetch Service', () => { { kind: 'mock/BlockHandler', handler: 'mockFunction', - filter: { modulo: 3 }, + filter: {modulo: 3}, }, { kind: 'mock/CallHandler', @@ -638,7 +643,7 @@ describe('Fetch Service', () => { it('enqueues modulo blocks with furture dataSources', async () => { fetchService.mockGetModulos([3]); - dataSources.push({ ...mockDs, startBlock: 20 }); + dataSources.push({...mockDs, startBlock: 20}); await fetchService.init(1); @@ -651,7 +656,7 @@ describe('Fetch Service', () => { it('at the end of modulo block filter, enqueue END should be min of data source range end height and api last height', async () => { // So this will skip next data source fetchService.mockGetModulos([10]); - dataSources.push({ ...mockDs, startBlock: 200 }); + dataSources.push({...mockDs, startBlock: 200}); await fetchService.init(191); expect((fetchService as any).useDictionary).toBeFalsy(); diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index 62a25c3da3..66910f7e94 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -3,28 +3,29 @@ import assert from 'assert'; import {Inject, Injectable, OnApplicationShutdown} from '@nestjs/common'; -import {EventEmitter2} from '@nestjs/event-emitter'; +import {EventEmitter2, OnEvent} from '@nestjs/event-emitter'; import {SchedulerRegistry} from '@nestjs/schedule'; import {BaseDataSource} from '@subql/types-core'; import {range} from 'lodash'; import {IBlockchainService} from '../blockchain.service'; import {NodeConfig} from '../configure'; -import {IndexerEvent} from '../events'; +import {EventPayload, IndexerEvent, MultiChainRewindEvent, MultiChainRewindPayload} from '../events'; import {getLogger} from '../logger'; import {delay, filterBypassBlocks, getModulos} from '../utils'; import {IBlockDispatcher} from './blockDispatcher'; import {mergeNumAndBlocksToNums} from './dictionary'; import {DictionaryService} from './dictionary/dictionary.service'; import {mergeNumAndBlocks} from './dictionary/utils'; +import {IMultiChainHandler, MultiChainRewindService, RewindStatus} from './multiChainRewind.service'; import {IStoreModelProvider} from './storeModelProvider'; import {BypassBlocks, IBlock, IProjectService} from './types'; import {IUnfinalizedBlocksServiceUtil} from './unfinalizedBlocks.service'; const logger = getLogger('FetchService'); - +const multiChainRewindDelay = 3; @Injectable() export class FetchService, FB> - implements OnApplicationShutdown + implements OnApplicationShutdown, IMultiChainHandler { private _latestBestHeight?: number; private _latestFinalizedHeight?: number; @@ -39,7 +40,8 @@ export class FetchService + @Inject('IBlockchainService') private blockchainSevice: IBlockchainService, + private multiChainRewindService: MultiChainRewindService ) {} private get latestBestHeight(): number { @@ -197,6 +199,23 @@ export class FetchService (1740100000 + height) * 1000; +const genBlockDate = (height: number) => new Date(genBlockTimestamp(height)); + +const testSchemaName = 'test_multi_chain_rewind'; +const schema = buildSchemaFromString(` + type Account @entity { + id: ID! # Account address + balance: Int + } +`); + +describe('Check whether the db store and cache store are consistent.', () => { + let sequelize: Sequelize; + let storeService: StoreService; + let multiChainRewindService: MultiChainRewindService; + + // Mock IBlockchainService + const mockBlockchainService = { + getHeaderForHeight: jest.fn((height: number) => ({ + blockHeight: height, + timestamp: genBlockDate(height), + blockHash: `hash${height}`, + parentHash: height > 0 ? `hash${height - 1}` : '', + })), + }; + + beforeAll(async () => { + sequelize = new Sequelize( + `postgresql://${option.username}:${option.password}@${option.host}:${option.port}/${option.database}`, + option + ); + await sequelize.authenticate(); + + await sequelize.query(`CREATE SCHEMA ${testSchemaName};`); + const nodeConfig = new NodeConfig({ + subquery: 'test', + dbSchema: testSchemaName, + proofOfIndex: true, + enableCache: false, + multiChain: true, + }); + const project = {network: {chainId: '1'}, schema} as any; + const dbModel = new PlainStoreModelService(sequelize, nodeConfig); + storeService = new StoreService(sequelize, nodeConfig, dbModel, project); + await storeService.initCoreTables(testSchemaName); + await storeService.init(testSchemaName); + await storeService.modelProvider.metadata.set('startHeight', 1); + await storeService.modelProvider.metadata.set('lastProcessedHeight', 10000); + await storeService.modelProvider.metadata.set('lastProcessedBlockTimestamp', genBlockTimestamp(10000)); + + const chainId = '1'; + const eventEmitter = new EventEmitter2(); + multiChainRewindService = new MultiChainRewindService( + nodeConfig, + eventEmitter, + sequelize, + storeService, + mockBlockchainService as any + ); + + const reindex = jest.fn(); + + // Initialize the service + await multiChainRewindService.init(chainId, reindex); + }); + + afterEach(async () => { + await sequelize.query(`DROP SCHEMA ${testSchemaName} CASCADE;`); + multiChainRewindService.onApplicationShutdown(); + await sequelize.close(); + }); + + it('should handle rewind correctly', async () => { + // Act: Set global rewind lock to rewind to block 5 + const rewindTimestamp = genBlockTimestamp(5); + await multiChainRewindService.setGlobalRewindLock(rewindTimestamp); + + // Wait briefly for the PostgreSQL listener to process the notification + await delay(1); + + // Assert: Check that the service is in Rewinding state and has the correct waitRewindHeader + expect(multiChainRewindService.status).toBe(RewindStatus.Rewinding); + expect(multiChainRewindService.waitRewindHeader).toEqual({ + blockHeight: 5, + timestamp: new Date(rewindTimestamp), + blockHash: 'hash5', + parentHash: 'hash4', + }); + + // Release the chain rewind lock + const tx = await sequelize.transaction(); + const remaining = await multiChainRewindService.releaseChainRewindLock(tx, rewindTimestamp); + await tx.commit(); + + // Wait for the RewindComplete notification to be processed + await delay(1); + + // Assert: Check that the rewind is complete and status is back to Normal + expect(remaining).toBe(0); // No remaining chains to rewind + expect(multiChainRewindService.status).toBe(RewindStatus.Normal); + expect(multiChainRewindService.waitRewindHeader).toBeUndefined(); + }); +}); diff --git a/packages/node-core/src/indexer/multiChainRewind.service.ts b/packages/node-core/src/indexer/multiChainRewind.service.ts new file mode 100644 index 0000000000..0a9d9e020a --- /dev/null +++ b/packages/node-core/src/indexer/multiChainRewind.service.ts @@ -0,0 +1,251 @@ +// Copyright 2020-2025 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import assert from 'assert'; +import {Inject, Injectable, OnApplicationShutdown} from '@nestjs/common'; +import {EventEmitter2} from '@nestjs/event-emitter'; +import {hashName} from '@subql/utils'; +import {Transaction, Sequelize} from '@subql/x-sequelize'; +import {Connection} from '@subql/x-sequelize/types/dialects/abstract/connection-manager'; +import dayjs from 'dayjs'; +import {uniqueId} from 'lodash'; +import {PoolClient} from 'pg'; +import {IBlockchainService} from '../blockchain.service'; +import {NodeConfig} from '../configure'; +import {createRewindTrigger, createRewindTriggerFunction, getTriggers} from '../db'; +import {MultiChainRewindEvent, MultiChainRewindPayload} from '../events'; +import {getLogger} from '../logger'; +import {StoreService} from './store.service'; +import {PlainGlobalModel} from './storeModelProvider/global/global'; +import {Header} from './types'; + +const logger = getLogger('MultiChainRewindService'); + +export enum RewindStatus { + /** The current chain is in normal state. */ + Normal = 'normal', + /** The current chain is waiting for other chains to rewind. */ + WaitOtherChain = 'waitOtherChain', + /** The current chain is executing rewind. */ + Rewinding = 'rewinding', +} +export interface IMultiChainRewindService { + chainId: string; + status: RewindStatus; + waitRewindHeader?: Header; + setGlobalRewindLock(rewindTimestamp: number): Promise; + /** + * Check if the height is consistent before unlocking. + * @param tx + * @param rewindTimestamp The timestamp to roll back to, in milliseconds. + * @returns the number of remaining rewind chains + */ + releaseChainRewindLock(tx: Transaction, rewindTimestamp: number): Promise; +} + +export interface IMultiChainHandler { + processMultiChainRewind(rewindBlockPayload: MultiChainRewindPayload): void; +} + +/** + * Working principle: + * multiChainRewindService is primarily responsible for coordinating multi-chain projects. + * When global.rewindLock changes, a PG trigger sends a notification, and all subscribed chain projects will receive the rollback notification. + * This triggers a rollback process, where the fetch service handles the message by clearing the queue. + * During the next fillNextBlockBuffer loop, if it detects the rewinding state, it will execute the rollback. + */ +@Injectable() +export class MultiChainRewindService implements IMultiChainRewindService, OnApplicationShutdown { + private _status: RewindStatus = RewindStatus.Normal; + private _chainId?: string; + private dbSchema: string; + private rewindTriggerName: string; + private pgListener?: PoolClient; + private _globalModel?: PlainGlobalModel = undefined; + waitRewindHeader?: Header; + constructor( + private nodeConfig: NodeConfig, + private eventEmitter: EventEmitter2, + private sequelize: Sequelize, + private storeService: StoreService, + @Inject('IBlockchainService') private readonly blockchainService: IBlockchainService + ) { + this.dbSchema = this.nodeConfig.dbSchema; + this.rewindTriggerName = hashName(this.dbSchema, 'rewind_trigger', '_global'); + } + + private set chainId(chainId: string) { + this._chainId = chainId; + } + + get chainId(): string { + assert(this._chainId, 'chainId is not set'); + return this._chainId; + } + + private set status(status: RewindStatus) { + this._status = status; + } + + get status() { + assert(this._status, 'status is not set'); + return this._status; + } + + get globalModel() { + if (!this._globalModel) { + this._globalModel = new PlainGlobalModel(this.dbSchema, this.chainId, this.storeService.globalDataRepo); + } + return this._globalModel; + } + + onApplicationShutdown() { + this.sequelize.connectionManager.releaseConnection(this.pgListener as Connection); + } + + async init(chainId: string, reindex: (targetHeader: Header) => Promise) { + this.chainId = chainId; + + if (this.nodeConfig.multiChain) { + await this.sequelize.query(`${createRewindTriggerFunction(this.dbSchema)}`); + + const rewindTriggers = await getTriggers(this.sequelize, this.rewindTriggerName); + if (rewindTriggers.length === 0) { + await this.sequelize.query(`${createRewindTrigger(this.dbSchema)}`); + } + + // Register a listener and create a schema notification sending function. + await this.registerPgListener(); + + if (this.waitRewindHeader) { + const rewindHeader = {...this.waitRewindHeader}; + await reindex(rewindHeader); + return rewindHeader; + } + } + } + + private async registerPgListener() { + if (this.pgListener) return; + + // Creating a new pgClient is to avoid using the same database connection as the block scheduler, + // which may prevent real-time listening to rollback events. + this.pgListener = (await this.sequelize.connectionManager.getConnection({ + type: 'read', + })) as PoolClient; + + let processingPromise = Promise.resolve(); + this.pgListener.on('notification', (msg) => { + processingPromise = processingPromise.then(async () => { + const eventType = msg.payload; + const sessionUuid = uniqueId(); + logger.info(`[${sessionUuid}]Received rewind event: ${eventType}, chainId: ${this.chainId}`); + switch (eventType) { + case MultiChainRewindEvent.Rewind: + case MultiChainRewindEvent.RewindTimestampDecreased: { + const {rewindTimestamp} = await this.globalModel.getGlobalRewindStatus(); + this.waitRewindHeader = await this.searchWaitRewindHeader(rewindTimestamp); + this.status = RewindStatus.Rewinding; + + // Trigger the rewind event, and let the fetchService listen to the message and handle the queueFlush. + this.eventEmitter.emit(eventType, { + height: this.waitRewindHeader.blockHeight, + } satisfies MultiChainRewindPayload); + break; + } + case MultiChainRewindEvent.RewindComplete: + // recover indexing status + this.waitRewindHeader = undefined; + this.status = RewindStatus.Normal; + break; + default: + throw new Error(`Unknown rewind event: ${eventType}`); + } + logger.info(`[${sessionUuid}]Handle success rewind event: ${eventType}, chainId: ${this.chainId}`); + }); + }); + + await this.pgListener.query(`LISTEN "${this.rewindTriggerName}"`); + logger.info(`Register rewind listener success, chainId: ${this.chainId}`); + + // Check whether the current state is in rollback. + // If a global lock situation occurs, prioritize setting it to the WaitOtherChain state. If a rollback is still required, then set it to the rewinding state. + const {rewindLock, rewindTimestamp} = await this.globalModel.getGlobalRewindStatus(); + if (rewindLock) { + this.status = RewindStatus.WaitOtherChain; + } + if (rewindTimestamp) { + this.status = RewindStatus.Rewinding; + this.waitRewindHeader = await this.searchWaitRewindHeader(rewindTimestamp); + } + } + + private async searchWaitRewindHeader(rewindTimestamp: number): Promise
{ + const rewindDate = dayjs(rewindTimestamp).toDate(); + const rewindBlockHeader = await this.getHeaderByBinarySearch(rewindDate); + // The blockHeader.timestamp obtained from the query cannot be used directly, as it will cause an infinite loop. + // Different chains have timestamp discrepancies, which will result in infinite backward tracing. + return {...rewindBlockHeader, timestamp: rewindDate}; + } + + /** + * If the set rewindTimestamp is greater than or equal to the current blockHeight, we do nothing because we will roll back to an earlier time. + * If the set rewindTimestamp is less than the current blockHeight, we should roll back to the earlier rewindTimestamp. + * @param rewindTimestamp rewindTimestamp in milliseconds + */ + async setGlobalRewindLock(rewindTimestamp: number) { + const {needRewind} = await this.globalModel.setGlobalRewindLock(rewindTimestamp); + if (needRewind) { + logger.info(`setGlobalRewindLock success chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}`); + this.status = RewindStatus.Rewinding; + } + } + + async releaseChainRewindLock(tx: Transaction, rewindTimestamp: number): Promise { + const chainNum = await this.globalModel.releaseChainRewindLock(tx, rewindTimestamp); + // The current chain has completed the rewind, and we still need to wait for other chains to finish. + // When fully synchronized, set the status back to normal by pgListener. + this.status = RewindStatus.WaitOtherChain; + logger.info(`Rewind success chainId: ${JSON.stringify({chainNum, chainId: this.chainId, rewindTimestamp})}`); + return chainNum; + } + + /** + * Get the block header closest to the given timestamp + * @param timestamp To find the block closest to a given timestamp + * @returns + */ + private async getHeaderByBinarySearch(timestamp: Header['timestamp']): Promise> { + const startHeight = await this.storeService.modelProvider.metadata.find('startHeight'); + assert(startHeight !== undefined, 'startHeight is not set'); + + let left = startHeight; + let {height: right} = await this.storeService.getLastProcessedBlock(); + let searchNum = 0; + while (left < right) { + searchNum++; + const mid = Math.floor((left + right) / 2); + const header = await this.blockchainService.getHeaderForHeight(mid); + + if (header.timestamp === timestamp) { + return header; + } else if (header.timestamp < timestamp) { + left = mid + 1; + } else { + right = mid; + } + } + + const targetHeader = left + ? await this.blockchainService.getHeaderForHeight(left) + : { + blockHash: '', + blockHeight: 0, + parentHash: '', + timestamp, + }; + logger.info(`Binary search times: ${searchNum}, target Header: ${JSON.stringify(targetHeader)}`); + + return targetHeader; + } +} diff --git a/packages/node-core/src/indexer/project.service.spec.ts b/packages/node-core/src/indexer/project.service.spec.ts index 17212b79d7..595839da2c 100644 --- a/packages/node-core/src/indexer/project.service.spec.ts +++ b/packages/node-core/src/indexer/project.service.spec.ts @@ -54,8 +54,8 @@ class TestBlockchainService implements IBlockchainService { // throw new Error('Method onProjectChange not implemented.'); } // eslint-disable-next-line @typescript-eslint/promise-function-async - getBlockTimestamp(height: number): Promise { - return Promise.resolve(undefined); + getBlockTimestamp(height: number): Promise { + return Promise.resolve(new Date()); } getBlockSize(block: IBlock): number { return 0; @@ -112,6 +112,15 @@ class TestBlockchainService implements IBlockchainService { timestamp: new Date(), }; } + // eslint-disable-next-line @typescript-eslint/require-await + async getRequiredHeaderForHeight(height: number): Promise
{ + return { + blockHeight: height, + blockHash: `b${height}`, + parentHash: `b${height - 1}`, + timestamp: new Date(), + }; + } } describe('BaseProjectService', () => { @@ -131,7 +140,8 @@ describe('BaseProjectService', () => { {getDynamicDatasources: jest.fn()} as unknown as DynamicDsService, null as unknown as any, null as unknown as any, - new TestBlockchainService() + new TestBlockchainService(), + null as unknown as any ); }); @@ -424,7 +434,8 @@ describe('BaseProjectService', () => { } as unknown as DynamicDsService, // dynamicDsService new EventEmitter2(), // eventEmitter new UnfinalizedBlocksService(nodeConfig, storeService.modelProvider, blockchainService), // unfinalizedBlocksService - blockchainService + blockchainService, + {init: jest.fn()} as any // MultiChainRewindService ); }; diff --git a/packages/node-core/src/indexer/project.service.ts b/packages/node-core/src/indexer/project.service.ts index c8736864ee..3d654d31bc 100644 --- a/packages/node-core/src/indexer/project.service.ts +++ b/packages/node-core/src/indexer/project.service.ts @@ -2,28 +2,29 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; -import { isMainThread } from 'worker_threads'; -import { Inject } from '@nestjs/common'; -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { BaseDataSource, IProjectNetworkConfig } from '@subql/types-core'; -import { Sequelize } from '@subql/x-sequelize'; -import { IApi } from '../api.service'; -import { ICoreBlockchainService } from '../blockchain.service'; -import { IProjectUpgradeService, NodeConfig } from '../configure'; -import { IndexerEvent } from '../events'; -import { getLogger } from '../logger'; -import { exitWithError, monitorWrite } from '../process'; -import { getExistingProjectSchema, getStartHeight, hasValue, initDbSchema, mainThreadOnly, reindex } from '../utils'; -import { BlockHeightMap } from '../utils/blockHeightMap'; -import { DsProcessorService } from './ds-processor.service'; -import { DynamicDsService } from './dynamic-ds.service'; -import { MetadataKeys } from './entities'; -import { PoiSyncService } from './poi'; -import { PoiService } from './poi/poi.service'; -import { StoreService } from './store.service'; -import { cacheProviderFlushData } from './storeModelProvider'; -import { ISubqueryProject, IProjectService, BypassBlocks, HistoricalMode, Header } from './types'; -import { IUnfinalizedBlocksService } from './unfinalizedBlocks.service'; +import {isMainThread} from 'worker_threads'; +import {Inject} from '@nestjs/common'; +import {EventEmitter2} from '@nestjs/event-emitter'; +import {BaseDataSource, IProjectNetworkConfig} from '@subql/types-core'; +import {Sequelize} from '@subql/x-sequelize'; +import {IApi} from '../api.service'; +import {ICoreBlockchainService} from '../blockchain.service'; +import {IProjectUpgradeService, NodeConfig} from '../configure'; +import {IndexerEvent} from '../events'; +import {getLogger} from '../logger'; +import {exitWithError, monitorWrite} from '../process'; +import {getExistingProjectSchema, getStartHeight, hasValue, initDbSchema, mainThreadOnly, reindex} from '../utils'; +import {BlockHeightMap} from '../utils/blockHeightMap'; +import {DsProcessorService} from './ds-processor.service'; +import {DynamicDsService} from './dynamic-ds.service'; +import {MetadataKeys} from './entities'; +import {MultiChainRewindService} from './multiChainRewind.service'; +import {PoiSyncService} from './poi'; +import {PoiService} from './poi/poi.service'; +import {StoreService} from './store.service'; +import {cacheProviderFlushData} from './storeModelProvider'; +import {ISubqueryProject, IProjectService, BypassBlocks, HistoricalMode, Header} from './types'; +import {IUnfinalizedBlocksService} from './unfinalizedBlocks.service'; const logger = getLogger('Project'); @@ -36,8 +37,9 @@ class NotInitError extends Error { export class ProjectService< DS extends BaseDataSource = BaseDataSource, API extends IApi = IApi, - UnfinalizedBlocksService extends IUnfinalizedBlocksService = IUnfinalizedBlocksService -> implements IProjectService { + UnfinalizedBlocksService extends IUnfinalizedBlocksService = IUnfinalizedBlocksService, +> implements IProjectService +{ private _schema?: string; private _startHeight?: number; private _blockOffset?: number; @@ -55,7 +57,8 @@ export class ProjectService< private readonly dynamicDsService: DynamicDsService, private eventEmitter: EventEmitter2, @Inject('IUnfinalizedBlocksService') private readonly unfinalizedBlockService: UnfinalizedBlocksService, - @Inject('IBlockchainService') private blockchainService: ICoreBlockchainService + @Inject('IBlockchainService') private blockchainService: ICoreBlockchainService, + private multiChainRewindService: MultiChainRewindService ) { if (this.nodeConfig.unfinalizedBlocks && this.nodeConfig.allowSchemaMigration) { throw new Error('Unfinalized Blocks and Schema Migration cannot be enabled at the same time'); @@ -106,6 +109,7 @@ export class ProjectService< // Init metadata before rest of schema so we can determine the correct project version to create the schema await this.storeService.initCoreTables(this._schema); + await this.ensureMetadata(); // DynamicDsService is dependent on metadata so we need to ensure it exists first await this.dynamicDsService.init(this.storeService.modelProvider.metadata); @@ -138,6 +142,8 @@ export class ProjectService< // Unfinalized is dependent on POI in some cases, it needs to be init after POI is init const reindexedUnfinalized = await this.initUnfinalizedInternal(); + const reindexMultiChain = await this.initMultiChainRewindService(); + if (reindexedUnfinalized !== undefined) { this._startHeight = reindexedUnfinalized.blockHeight; } @@ -146,6 +152,10 @@ export class ProjectService< this._startHeight = reindexedUpgrade; } + if (reindexMultiChain !== undefined) { + this._startHeight = reindexMultiChain.blockHeight; + } + // Flush any pending operations to set up DB await cacheProviderFlushData(this.storeService.modelProvider, true); } else { @@ -216,16 +226,16 @@ export class ProjectService< const existing = await metadata.findMany(keys); - const { chain, genesisHash, specName } = this.apiService.networkMeta; + const {chain, genesisHash, specName} = this.apiService.networkMeta; if (this.project.runner) { - const { node, query } = this.project.runner; + const {node, query} = this.project.runner; await metadata.setBulk([ - { key: 'runnerNode', value: node.name }, - { key: 'runnerNodeVersion', value: node.version }, - { key: 'runnerQuery', value: query.name }, - { key: 'runnerQueryVersion', value: query.version }, + {key: 'runnerNode', value: node.name}, + {key: 'runnerNodeVersion', value: node.version}, + {key: 'runnerQuery', value: query.name}, + {key: 'runnerQueryVersion', value: query.version}, ]); } if (!existing.genesisHash) { @@ -337,7 +347,7 @@ export class ProjectService< const nextProject = projects[i + 1][1]; nextMinStartHeight = Math.max( nextProject.dataSources - .filter((ds): ds is DS & { startBlock: number } => !!ds.startBlock) + .filter((ds): ds is DS & {startBlock: number} => !!ds.startBlock) .sort((a, b) => a.startBlock - b.startBlock)[0].startBlock, projects[i + 1][0] ); @@ -352,12 +362,12 @@ export class ProjectService< }[] = []; [...project.dataSources, ...dynamicDs] - .filter((ds): ds is DS & { startBlock: number } => { + .filter((ds): ds is DS & {startBlock: number} => { return !!ds.startBlock && (!nextMinStartHeight || nextMinStartHeight > ds.startBlock); }) .forEach((ds) => { - events.push({ block: Math.max(height, ds.startBlock), start: true, ds }); - if (ds.endBlock) events.push({ block: ds.endBlock + 1, start: false, ds }); + events.push({block: Math.max(height, ds.startBlock), start: true, ds}); + if (ds.endBlock) events.push({block: ds.endBlock + 1, start: false, ds}); }); // sort events by block in ascending order, start events come before end events @@ -439,6 +449,9 @@ export class ProjectService< } return undefined; } + private async initMultiChainRewindService(): Promise
{ + return this.multiChainRewindService.init(this.project.network.chainId, this.reindex.bind(this)); + } private async handleProjectChange(): Promise { if (isMainThread && !this.nodeConfig.allowSchemaMigration) { @@ -464,12 +477,14 @@ export class ProjectService< return reindex( this.getStartBlockFromDataSources(), targetBlockHeader, - { height, timestamp }, + {height, timestamp}, + this.nodeConfig, this.storeService, this.unfinalizedBlockService, this.dynamicDsService, this.sequelize, this.projectUpgradeService, + this.multiChainRewindService, this.nodeConfig.proofOfIndex ? this.poiService : undefined /* Not providing force clean service, it should never be needed */ ); diff --git a/packages/node-core/src/indexer/store.service.test.ts b/packages/node-core/src/indexer/store.service.test.ts index 3305176726..f713bfe92f 100644 --- a/packages/node-core/src/indexer/store.service.test.ts +++ b/packages/node-core/src/indexer/store.service.test.ts @@ -18,6 +18,10 @@ const option: DbOption = { }; jest.setTimeout(60000); +// Mock 1740100000 is the timestamp of the genesis block +const genBlockTimestamp = (height: number) => (1740100000 + height) * 1000; +const genBlockDate = (height: number) => new Date(genBlockTimestamp(height)); + const testSchemaName = 'test_model_store'; const schema = buildSchemaFromString(` type Account @entity { @@ -57,7 +61,12 @@ describe('Check whether the db store and cache store are consistent.', () => { }); it('Same block, Execute the set method multiple times.', async () => { - await storeService.setBlockHeader({blockHeight: 1, blockHash: '0x01', parentHash: '0x00'}); + await storeService.setBlockHeader({ + blockHeight: 1, + blockHash: '0x01', + parentHash: '0x00', + timestamp: genBlockDate(1), + }); const accountEntity = {id: 'block-001', balance: 100}; @@ -121,7 +130,12 @@ describe('Check whether the db store and cache store are consistent.', () => { }, 30000); it('_block_range update check', async () => { - await storeService.setBlockHeader({blockHeight: 1000, blockHash: '0x1000', parentHash: '0x0999'}); + await storeService.setBlockHeader({ + blockHeight: 1000, + blockHash: '0x1000', + parentHash: '0x0999', + timestamp: genBlockDate(1000), + }); // insert new account. const account1000Data = {id: 'block-1000', balance: 999}; @@ -204,7 +218,12 @@ describe('Cache Provider', () => { tx.afterCommit(() => { Account.clear(blockHeight); }); - await storeService.setBlockHeader({blockHeight, blockHash: `0x${blockHeight}`, parentHash: `0x${blockHeight - 1}`}); + await storeService.setBlockHeader({ + blockHeight, + blockHash: `0x${blockHeight}`, + parentHash: `0x${blockHeight - 1}`, + timestamp: genBlockDate(blockHeight), + }); await handle(blockHeight); await Account.runFlush(tx, blockHeight); await tx.commit(); diff --git a/packages/node-core/src/indexer/store.service.ts b/packages/node-core/src/indexer/store.service.ts index a2307b0a82..b4be1580a8 100644 --- a/packages/node-core/src/indexer/store.service.ts +++ b/packages/node-core/src/indexer/store.service.ts @@ -34,8 +34,17 @@ import { } from '../db'; import {getLogger} from '../logger'; import {exitWithError} from '../process'; -import {camelCaseObjectKey, customCamelCaseGraphqlKey, getHistoricalUnit} from '../utils'; -import {MetadataFactory, MetadataRepo, PoiFactory, PoiFactoryDeprecate, PoiRepo} from './entities'; +import {camelCaseObjectKey, customCamelCaseGraphqlKey, getHistoricalUnit, hasValue} from '../utils'; +import { + generateRewindTimestampKey, + GlobalDataFactory, + GlobalDataRepo, + MetadataFactory, + MetadataRepo, + PoiFactory, + PoiFactoryDeprecate, + PoiRepo, +} from './entities'; import {Store} from './store'; import {IMetadata, IStoreModelProvider, PlainStoreModelService} from './storeModelProvider'; import {StoreOperations} from './StoreOperations'; @@ -63,6 +72,7 @@ export class StoreService { poiRepo?: PoiRepo; private _modelIndexedFields?: IndexField[]; private _modelsRelations?: GraphQLModelsRelationsEnums; + private _globalDataRepo?: GlobalDataRepo; private _metaDataRepo?: MetadataRepo; private _historical?: HistoricalMode; private _metadataModel?: IMetadata; @@ -104,6 +114,11 @@ export class StoreService { return this._operationStack; } + get globalDataRepo(): GlobalDataRepo { + assert(this._globalDataRepo, new NoInitError()); + return this._globalDataRepo; + } + get blockHeader(): Header { assert(this._blockHeader, new Error('StoreService.setBlockHeader has not been called')); return this._blockHeader; @@ -158,6 +173,10 @@ export class StoreService { this.subqueryProject.network.chainId ); + if (this.config.multiChain) { + this._globalDataRepo = GlobalDataFactory(this.sequelize, schema); + } + this._schema = schema; await this.sequelize.sync(); @@ -172,6 +191,8 @@ export class StoreService { await this.initHotSchemaReloadQueries(schema); await this.metadataModel.set('historicalStateEnabled', this.historical); + + await this.initChainRewindTimestamp(); } async init(schema: string): Promise { @@ -477,6 +498,30 @@ group by // Cant throw here because even with historical disabled the current height is used by the store return getHistoricalUnit(this.historical, this.blockHeader); } + + private async getRewindTimestamp(): Promise { + const rewindTimestampKey = generateRewindTimestampKey(this.subqueryProject.network.chainId); + const record = await this.globalDataRepo.findByPk(rewindTimestampKey); + if (hasValue(record)) { + return record.toJSON().value as number; + } + } + + private async initChainRewindTimestamp() { + if (!this.config.multiChain) return; + if ((await this.getRewindTimestamp()) !== undefined) return; + const rewindTimestampKey = generateRewindTimestampKey(this.subqueryProject.network.chainId); + await this.globalDataRepo.create({key: rewindTimestampKey, value: 0}); + } + + async getLastProcessedBlock(): Promise<{height: number; timestamp?: number}> { + const {lastProcessedBlockTimestamp: timestamp, lastProcessedHeight: height} = await this.metadataModel.findMany([ + 'lastProcessedHeight', + 'lastProcessedBlockTimestamp', + ]); + + return {height: height || 0, timestamp}; + } } // REMOVE 10,000 record per batch diff --git a/packages/node-core/src/indexer/storeModelProvider/global/global.ts b/packages/node-core/src/indexer/storeModelProvider/global/global.ts new file mode 100644 index 0000000000..fa54e3476e --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/global/global.ts @@ -0,0 +1,185 @@ +// Copyright 2020-2025 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import assert from 'assert'; +import {getLogger} from '@subql/node-core/logger'; +import {Op, QueryTypes, Sequelize, Transaction} from '@subql/x-sequelize'; +import { + generateRewindTimestampKey, + GlobalData, + GlobalDataKeys, + GlobalDataRepo, + RewindLockInfo, + RewindLockKey, + RewindTimestampKey, + RewindTimestampKeyPrefix, +} from '../../entities'; + +export interface IGlobalData { + getGlobalRewindStatus(): Promise<{ + rewindTimestamp: GlobalDataKeys[RewindTimestampKey]; + rewindLock?: GlobalDataKeys[typeof RewindLockKey]; + }>; + + setGlobalRewindLock(rewindTimestamp: number): Promise<{needRewind: boolean}>; + /** + * Check if the height is consistent before unlocking. + * @param tx + * @param rewindTimestamp The timestamp to roll back to, in milliseconds. + * @returns the number of remaining rewind chains + */ + releaseChainRewindLock(tx: Transaction, rewindTimestamp: number): Promise; +} + +const logger = getLogger('PlainGlobalModel'); + +export class PlainGlobalModel implements IGlobalData { + constructor( + private readonly dbSchema: string, + private readonly chainId: string, + private readonly model: GlobalDataRepo + ) {} + + private get sequelize(): Sequelize { + const sequelize = this.model.sequelize; + + if (!sequelize) { + throw new Error(`Sequelize is not available on ${this.model.name}`); + } + + return sequelize; + } + + /** + * Serialize the rewind lock + * @param rewindTimestamp ms + * @param chainTotal The total number of registered chains. + * @returns + */ + private serializeRewindLock(rewindTimestamp: number, chainTotal: number): string { + return JSON.stringify({timestamp: rewindTimestamp, chainNum: chainTotal}); + } + + async getGlobalRewindStatus() { + const rewindTimestampKey = generateRewindTimestampKey(this.chainId); + + const records = await this.model.findAll({ + where: {key: {[Op.in]: [rewindTimestampKey, RewindLockKey]}}, + }); + const rewindLockInfo = records.find((r) => r.key === RewindLockKey)?.toJSON>(); + const rewindTimestampInfo = records + .find((r) => r.key === rewindTimestampKey) + ?.toJSON>(); + + assert( + rewindTimestampInfo !== undefined, + `Not registered rewind timestamp key in global data, chainId: ${this.chainId}` + ); + return {rewindTimestamp: rewindTimestampInfo.value, rewindLock: rewindLockInfo?.value}; + } + + /** + * If the set rewindTimestamp is greater than or equal to the current blockHeight, we do nothing because we will roll back to an earlier time. + * If the set rewindTimestamp is less than the current blockHeight, we should roll back to the earlier rewindTimestamp. + * @param rewindTimestamp rewindTimestamp in milliseconds + */ + async setGlobalRewindLock(rewindTimestamp: number): Promise<{needRewind: boolean}> { + const globalTable = this.model.tableName; + const chainTotal = await this.model.count({ + where: { + key: {[Op.like]: `${RewindTimestampKeyPrefix}_%`}, + }, + }); + + let needRewind = false; + const tx = await this.sequelize.transaction(); + try { + const [_, updateRows] = await this.sequelize.query( + `INSERT INTO "${this.dbSchema}"."${globalTable}" ( "key", "value", "createdAt", "updatedAt" ) + VALUES + ( '${RewindLockKey}', '${this.serializeRewindLock(rewindTimestamp, chainTotal)}', now(), now()) + ON CONFLICT ( "key" ) + DO UPDATE + SET "key" = EXCLUDED."key", + "value" = EXCLUDED."value", + "updatedAt" = EXCLUDED."updatedAt" + WHERE "${globalTable}"."key" = '${RewindLockKey}' AND ("${globalTable}"."value"->>'timestamp')::BIGINT > ${rewindTimestamp}`, + { + type: QueryTypes.INSERT, + transaction: tx, + } + ); + + // If there is a rewind lock that is greater than the current rewind timestamp, we should not update the rewind timestamp + if (updateRows === 1) { + await this.model.update( + {value: rewindTimestamp}, + { + where: {key: {[Op.like]: 'rewindTimestamp_%'}}, + transaction: tx, + } + ); + + // The current chain is in REWINDING state + needRewind = true; + } + await tx.commit(); + return {needRewind}; + } catch (e: any) { + logger.error( + `setGlobalRewindLock failed chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}, errorMsg: ${e.message}` + ); + await tx.rollback(); + throw e; + } + } + + async releaseChainRewindLock(tx: Transaction, rewindTimestamp: number): Promise { + const globalTable = this.model.tableName; + + // Ensure the first write occurs and prevent deadlock, only update the rewindNum - 1 + const results = await this.sequelize.query<{value: RewindLockInfo}>( + `UPDATE "${this.dbSchema}"."${globalTable}" + SET value = jsonb_set( + value, + '{chainNum}', + to_jsonb(COALESCE(("${globalTable}"."value" ->> 'chainNum')::BIGINT, 0) - 1), + false + ) + WHERE "${globalTable}"."key" = '${RewindLockKey}' AND ("${globalTable}"."value"->>'timestamp')::BIGINT = ${rewindTimestamp} + RETURNING value`, + { + type: QueryTypes.SELECT, + transaction: tx, + } + ); + + // not exist rewind lock in current timestamp + if (results.length === 0) { + return 0; + } + const chainNum = results[0].value.chainNum; + + const rewindTimestampKey = generateRewindTimestampKey(this.chainId); + const [affectedCount] = await this.model.update( + {value: 0}, + { + where: { + key: rewindTimestampKey, + value: rewindTimestamp, + }, + transaction: tx, + } + ); + assert( + affectedCount === 1, + `not found rewind timestamp key in global data, chainId: ${this.chainId}, rewindTimestamp: ${rewindTimestamp}` + ); + + if (chainNum === 0) { + await this.model.destroy({where: {key: RewindLockKey}, transaction: tx}); + } + + return chainNum; + } +} diff --git a/packages/node-core/src/indexer/storeModelProvider/global/index.ts b/packages/node-core/src/indexer/storeModelProvider/global/index.ts new file mode 100644 index 0000000000..6e4c701873 --- /dev/null +++ b/packages/node-core/src/indexer/storeModelProvider/global/index.ts @@ -0,0 +1,4 @@ +// Copyright 2020-2025 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +export {IGlobalData} from './global'; diff --git a/packages/node-core/src/indexer/types.ts b/packages/node-core/src/indexer/types.ts index c3b3cd420b..4690b9c8d2 100644 --- a/packages/node-core/src/indexer/types.ts +++ b/packages/node-core/src/indexer/types.ts @@ -27,7 +27,7 @@ export interface ISubqueryProject< C = unknown, > extends Omit, 'schema' | 'version' | 'name' | 'specVersion' | 'description'> { readonly schema: GraphQLSchema; - applyCronTimestamps: (getBlockTimestamp: (height: number) => Promise) => Promise; + applyCronTimestamps: (getBlockTimestamp: (height: number) => Promise) => Promise; readonly id: string; chainTypes?: C; // The chainTypes after loaded readonly root: string; @@ -76,7 +76,7 @@ export type Header = { blockHeight: number; blockHash: string; parentHash: string | undefined; - timestamp?: Date; + timestamp: Date; }; export type BypassBlocks = (number | `${number}-${number}`)[]; diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts index 258c60912c..7ef91ef6fa 100644 --- a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts +++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts @@ -302,7 +302,7 @@ export class UnfinalizedBlocksService implements IUnfinalizedBlocksServ const result: (Header & {timestamp: string})[] = JSON.parse(val); return result.map(({timestamp, ...header}) => ({ ...header, - timestamp: timestamp ? new Date(timestamp) : undefined, + timestamp: new Date(timestamp), })); } return []; diff --git a/packages/node-core/src/indexer/worker/worker.service.ts b/packages/node-core/src/indexer/worker/worker.service.ts index ac4b477d4d..7e5acf1bc7 100644 --- a/packages/node-core/src/indexer/worker/worker.service.ts +++ b/packages/node-core/src/indexer/worker/worker.service.ts @@ -33,7 +33,7 @@ export abstract class BaseWorkerService< private queue: AutoQueue>; protected abstract fetchChainBlock(heights: number, extra: E): Promise>; - protected abstract toBlockResponse(block: B): R; + protected abstract toBlockResponse(block: B): Promise; protected abstract processFetchedBlock(block: IBlock, dataSources: DS[]): Promise; protected abstract getBlockSize(block: IBlock): number; diff --git a/packages/node-core/src/subcommands/reindex.service.ts b/packages/node-core/src/subcommands/reindex.service.ts index 4899f3c2a8..07967557f3 100644 --- a/packages/node-core/src/subcommands/reindex.service.ts +++ b/packages/node-core/src/subcommands/reindex.service.ts @@ -2,11 +2,11 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; -import { Inject, Injectable } from '@nestjs/common'; -import { BaseDataSource } from '@subql/types-core'; -import { Sequelize } from '@subql/x-sequelize'; -import { IBlockchainService } from '../blockchain.service'; -import { NodeConfig, ProjectUpgradeService } from '../configure'; +import {Inject, Injectable} from '@nestjs/common'; +import {BaseDataSource} from '@subql/types-core'; +import {Sequelize} from '@subql/x-sequelize'; +import {IBlockchainService} from '../blockchain.service'; +import {NodeConfig, ProjectUpgradeService} from '../configure'; import { IUnfinalizedBlocksService, StoreService, @@ -15,19 +15,20 @@ import { IMetadata, cacheProviderFlushData, Header, + MultiChainRewindService, } from '../indexer'; -import { DynamicDsService } from '../indexer/dynamic-ds.service'; -import { getLogger } from '../logger'; -import { exitWithError, monitorWrite } from '../process'; -import { getExistingProjectSchema, initDbSchema, reindex } from '../utils'; -import { ForceCleanService } from './forceClean.service'; +import {DynamicDsService} from '../indexer/dynamic-ds.service'; +import {getLogger} from '../logger'; +import {exitWithError, monitorWrite} from '../process'; +import {getExistingProjectSchema, initDbSchema, reindex} from '../utils'; +import {ForceCleanService} from './forceClean.service'; const logger = getLogger('Reindex'); @Injectable() export class ReindexService

{ private _metadataRepo?: IMetadata; - private _lastProcessedHeader?: { height: number; timestamp?: number }; + private _lastProcessedHeader?: {height: number; timestamp?: number}; constructor( private readonly sequelize: Sequelize, @@ -40,6 +41,7 @@ export class ReindexService

, @Inject('DynamicDsService') private readonly dynamicDsService: DynamicDsService, @Inject('IBlockchainService') private readonly blockchainService: IBlockchainService, + private readonly multiChainRewindService: MultiChainRewindService ) {} private get metadataRepo(): IMetadata { @@ -65,10 +67,7 @@ export class ReindexService

blockHeight <= inputHeight); + const bestBlocks = unfinalizedBlocks.filter(({blockHeight}) => blockHeight <= inputHeight); if (bestBlocks.length && inputHeight >= bestBlocks[0].blockHeight) { return bestBlocks[0]; } @@ -103,7 +102,7 @@ export class ReindexService

= (ds: DS) => ds is DS; // eslint-disable-next-line @typescript-eslint/require-await export async function insertBlockFiltersCronSchedules( dataSources: DS[], - getBlockTimestamp: (height: number) => Promise, + getBlockTimestamp: (height: number) => Promise, isRuntimeDs: IsRuntimeDs, blockHandlerKind: string ): Promise { @@ -248,13 +248,7 @@ export async function insertBlockFiltersCronSchedules, dynamicDsService: DynamicDsService, sequelize: Sequelize, projectUpgradeService: IProjectUpgradeService, + multichainRewindService: MultiChainRewindService, poiService?: PoiService, forceCleanService?: ForceCleanService ): Promise { @@ -59,11 +63,17 @@ export async function reindex( logger.warn( `Skipping reindexing to ${storeService.historical} ${targetUnit}: current indexing height ${lastUnit} is behind requested ${storeService.historical}` ); + if (nodeConfig.multiChain) { + const tx = await sequelize.transaction(); + await multichainRewindService.releaseChainRewindLock(tx, targetUnit); + await tx.commit(); + } return; } // if startHeight is greater than the targetHeight, just force clean - if (targetBlockHeader.blockHeight < startHeight) { + // We prevent the entire data from being cleared due to multiple chains because the startblock is uncertain in multi-chain projects. + if (targetBlockHeader.blockHeight < startHeight && !nodeConfig.multiChain) { logger.info( `targetHeight: ${targetBlockHeader.blockHeight} is less than startHeight: ${startHeight}. Hence executing force-clean` ); @@ -73,45 +83,50 @@ export async function reindex( // if DB need rollback? no, because forceCleanService will take care of it await cacheProviderResetData(storeService.modelProvider); await forceCleanService?.forceClean(); - } else { - logger.info(`Reindexing to ${storeService.historical}: ${targetUnit}`); - await cacheProviderFlushData(storeService.modelProvider, true); - await cacheProviderResetData(storeService.modelProvider); - if (storeService.modelProvider instanceof StoreCacheService) { - await storeService.modelProvider.flushData(true); - await storeService.modelProvider.resetData(); - } - const transaction = await sequelize.transaction(); - try { - /* + return; + } + + logger.info(`Reindexing to ${storeService.historical}: ${targetUnit}`); + if (nodeConfig.multiChain) { + await multichainRewindService.setGlobalRewindLock(targetUnit); + } + + await cacheProviderFlushData(storeService.modelProvider, true); + await cacheProviderResetData(storeService.modelProvider); + if (storeService.modelProvider instanceof StoreCacheService) { + await storeService.modelProvider.flushData(true); + await storeService.modelProvider.resetData(); + } + const transaction = await sequelize.transaction(); + try { + /* Must initialize storeService, to ensure all models are loaded, as storeService.init has not been called at this point - 1. During runtime, model should be already been init - 2.1 On start, projectUpgrade rewind will sync the sequelize models - 2.2 On start, without projectUpgrade or upgradablePoint, sequelize will sync models through project.service + 1. During runtime, model should be already been init + 2.1 On start, projectUpgrade rewind will sync the sequelize models + 2.2 On start, without projectUpgrade or upgradablePoint, sequelize will sync models through project.service */ - await projectUpgradeService.rewind( - targetBlockHeader.blockHeight, - lastProcessed.height, - transaction, - storeService - ); + await projectUpgradeService.rewind(targetBlockHeader.blockHeight, lastProcessed.height, transaction, storeService); - await Promise.all([ - storeService.rewind(targetBlockHeader, transaction), - unfinalizedBlockService.resetUnfinalizedBlocks(), // TODO: may not needed for nonfinalized chains - unfinalizedBlockService.resetLastFinalizedVerifiedHeight(), // TODO: may not needed for nonfinalized chains - dynamicDsService.resetDynamicDatasource(targetBlockHeader.blockHeight, transaction), - poiService?.rewind(targetBlockHeader.blockHeight, transaction), - ]); - // Flush metadata changes from above Promise.all - await storeService.modelProvider.metadata.flush?.(transaction, targetUnit); + await Promise.all([ + storeService.rewind(targetBlockHeader, transaction), + unfinalizedBlockService.resetUnfinalizedBlocks(), // TODO: may not needed for nonfinalized chains + unfinalizedBlockService.resetLastFinalizedVerifiedHeight(), // TODO: may not needed for nonfinalized chains + dynamicDsService.resetDynamicDatasource(targetBlockHeader.blockHeight, transaction), + poiService?.rewind(targetBlockHeader.blockHeight, transaction), + ]); + // Flush metadata changes from above Promise.all + await storeService.modelProvider.metadata.flush?.(transaction, targetUnit); - await transaction.commit(); - logger.info('Reindex Success'); - } catch (err: any) { - logger.error(err, 'Reindexing failed'); - await transaction.rollback(); - throw err; + // release rewind lock + if (nodeConfig.multiChain) { + await multichainRewindService.releaseChainRewindLock(transaction, targetUnit); } + + await transaction.commit(); + logger.info('Reindex Success'); + } catch (err: any) { + logger.error(err, 'Reindexing failed'); + await transaction.rollback(); + throw err; } } diff --git a/packages/node/src/blockchain.service.spec.ts b/packages/node/src/blockchain.service.spec.ts index 9e8652d900..af6a778dca 100644 --- a/packages/node/src/blockchain.service.spec.ts +++ b/packages/node/src/blockchain.service.spec.ts @@ -55,4 +55,9 @@ describe('BlockchainService', () => { const interval = await blockchainService.getChainInterval(); expect(interval).toEqual(5000); }); + + it('can get the chain create time', async () => { + const requiredHeader = await blockchainService.getHeaderForHeight(24723095); + expect(requiredHeader.timestamp.getTime()).toEqual(1739501268001); + }); }); diff --git a/packages/node/src/blockchain.service.ts b/packages/node/src/blockchain.service.ts index 374bdac380..b73f2594a9 100644 --- a/packages/node/src/blockchain.service.ts +++ b/packages/node/src/blockchain.service.ts @@ -34,7 +34,8 @@ import { IIndexerWorker } from './indexer/worker/worker'; import { calcInterval, getBlockByHeight, - getTimestamp, + getTimestampFromSignedBlock, + getTimestampFromBlockHash, substrateHeaderToHeader, } from './utils/substrate'; @@ -115,9 +116,20 @@ export class BlockchainService this.apiService.updateBlockFetching(); } - async getBlockTimestamp(height: number): Promise { + async getBlockTimestamp(height: number): Promise { const block = await getBlockByHeight(this.apiService.api, height); - return getTimestamp(block); + + let timestamp = getTimestampFromSignedBlock(block); + if (!timestamp) { + // Not all networks have a block timestamp, e.g. Shiden + const blockTimestamp = await ( + await this.apiService.unsafeApi.at(block.hash) + ).query.timestamp.now(); + + timestamp = new Date(blockTimestamp.toNumber()); + } + + return timestamp; } getBlockSize(block: IBlock): number { @@ -127,9 +139,8 @@ export class BlockchainService async getFinalizedHeader(): Promise

{ const finalizedHash = await this.apiService.unsafeApi.rpc.chain.getFinalizedHead(); - const finalizedHeader = - await this.apiService.unsafeApi.rpc.chain.getHeader(finalizedHash); - return substrateHeaderToHeader(finalizedHeader); + + return this.getHeaderForHash(finalizedHash.toHex()); } async getBestHeight(): Promise { @@ -148,9 +159,15 @@ export class BlockchainService // TODO can this decorator be in unfinalizedBlocks Service? @mainThreadOnly() async getHeaderForHash(hash: string): Promise
{ - return substrateHeaderToHeader( + const blockHeader = substrateHeaderToHeader( await this.apiService.unsafeApi.rpc.chain.getHeader(hash), ); + const timestamp = await getTimestampFromBlockHash( + this.apiService.unsafeApi, + blockHeader.blockHash, + ); + + return { ...blockHeader, timestamp }; } // TODO can this decorator be in unfinalizedBlocks Service? diff --git a/packages/node/src/indexer/fetch.module.ts b/packages/node/src/indexer/fetch.module.ts index 879e58bcd8..868572e5dd 100644 --- a/packages/node/src/indexer/fetch.module.ts +++ b/packages/node/src/indexer/fetch.module.ts @@ -19,6 +19,7 @@ import { DynamicDsService, FetchService, DictionaryService, + MultiChainRewindService, blockDispatcherFactory, } from '@subql/node-core'; import { SubstrateDatasource } from '@subql/types'; @@ -62,6 +63,7 @@ import { IIndexerWorker } from './worker/worker'; useClass: ProjectService, provide: 'IProjectService', }, + MultiChainRewindService, IndexerManager, { provide: 'IBlockDispatcher', diff --git a/packages/node/src/indexer/project.service.spec.ts b/packages/node/src/indexer/project.service.spec.ts index 36c2e7f3dd..c650f1ad04 100644 --- a/packages/node/src/indexer/project.service.spec.ts +++ b/packages/node/src/indexer/project.service.spec.ts @@ -12,6 +12,7 @@ import { upgradableSubqueryProject, DsProcessorService, DynamicDsService, + MultiChainRewindService, } from '@subql/node-core'; import { SubstrateDatasourceKind, SubstrateHandlerKind } from '@subql/types'; import { GraphQLSchema } from 'graphql'; @@ -145,6 +146,7 @@ describe('ProjectService', () => { apiService: ApiService, project: SubqueryProject, blockchainService: BlockchainService, + multiChainRewindService: MultiChainRewindService, ) => new TestProjectService( { @@ -172,14 +174,25 @@ describe('ProjectService', () => { null as unknown as any, null as unknown as any, blockchainService, + multiChainRewindService, ), - inject: ['APIService', 'ISubqueryProject', 'IBlockchainService'], + inject: [ + 'APIService', + 'ISubqueryProject', + 'IBlockchainService', + MultiChainRewindService, + ], }, EventEmitter2, { provide: 'APIService', useFactory: ApiService.init, - inject: ['ISubqueryProject', ConnectionPoolService, EventEmitter2, NodeConfig] + inject: [ + 'ISubqueryProject', + ConnectionPoolService, + EventEmitter2, + NodeConfig, + ], }, { provide: ProjectUpgradeService, @@ -194,6 +207,11 @@ describe('ProjectService', () => { provide: 'IBlockchainService', useClass: BlockchainService, }, + { + provide: MultiChainRewindService, + // eslint-disable-next-line @typescript-eslint/no-empty-function + useValue: { init: () => {} }, + }, ], imports: [EventEmitterModule.forRoot()], }).compile(); diff --git a/packages/node/src/indexer/worker/worker-fetch.module.ts b/packages/node/src/indexer/worker/worker-fetch.module.ts index 8c4bc8ee29..8cbe249478 100644 --- a/packages/node/src/indexer/worker/worker-fetch.module.ts +++ b/packages/node/src/indexer/worker/worker-fetch.module.ts @@ -9,6 +9,7 @@ import { WorkerCoreModule, ProjectService, DsProcessorService, + MultiChainRewindService, } from '@subql/node-core'; import { BlockchainService } from '../../blockchain.service'; import { ApiService } from '../api.service'; @@ -48,6 +49,7 @@ import { WorkerService } from './worker.service'; provide: 'IBlockchainService', useClass: BlockchainService, }, + MultiChainRewindService, WorkerService, ], exports: [], diff --git a/packages/node/src/indexer/worker/worker.service.ts b/packages/node/src/indexer/worker/worker.service.ts index c97ee07287..798c9ed4e5 100644 --- a/packages/node/src/indexer/worker/worker.service.ts +++ b/packages/node/src/indexer/worker/worker.service.ts @@ -12,16 +12,12 @@ import { Header, } from '@subql/node-core'; import { SubstrateDatasource } from '@subql/types'; -import { substrateBlockToHeader } from '../../utils/substrate'; +import { fillTsInHeader, substrateBlockToHeader } from '../../utils/substrate'; import { ApiService } from '../api.service'; import { SpecVersion } from '../dictionary'; import { IndexerManager } from '../indexer.manager'; import { WorkerRuntimeService } from '../runtime/workerRuntimeService'; -import { - BlockContent, - getBlockSize, - LightBlockContent, -} from '../types'; +import { BlockContent, getBlockSize, LightBlockContent } from '../types'; export type FetchBlockResponse = Header & { specVersion?: number }; @@ -64,11 +60,13 @@ export class WorkerService extends BaseWorkerService< } // TODO test this with LightBlockContent - protected toBlockResponse( + protected async toBlockResponse( block: BlockContent /* | LightBlockContent*/, - ): FetchBlockResponse { + ): Promise { + const header = substrateBlockToHeader(block.block); + return { - ...substrateBlockToHeader(block.block), + ...(await fillTsInHeader(this.apiService.unsafeApi, header)), specVersion: block.block.specVersion, }; } diff --git a/packages/node/src/subcommands/reindex.module.ts b/packages/node/src/subcommands/reindex.module.ts index faf76fe32a..917041daaa 100644 --- a/packages/node/src/subcommands/reindex.module.ts +++ b/packages/node/src/subcommands/reindex.module.ts @@ -17,6 +17,7 @@ import { DsProcessorService, UnfinalizedBlocksService, DynamicDsService, + MultiChainRewindService, } from '@subql/node-core'; import { Sequelize } from '@subql/x-sequelize'; import { BlockchainService } from '../blockchain.service'; @@ -65,6 +66,7 @@ import { RuntimeService } from '../indexer/runtime/runtimeService'; provide: 'IBlockchainService', useClass: BlockchainService, }, + MultiChainRewindService, SchedulerRegistry, ], controllers: [], diff --git a/packages/node/src/utils/substrate.test.ts b/packages/node/src/utils/substrate.test.ts index 257bd32879..ec2979d81e 100644 --- a/packages/node/src/utils/substrate.test.ts +++ b/packages/node/src/utils/substrate.test.ts @@ -9,7 +9,8 @@ import { fetchBlocksBatches, filterExtrinsic, getBlockByHeight, - getTimestamp, + getTimestampFromBlockHash, + getTimestampFromSignedBlock, } from './substrate'; const ENDPOINT_POLKADOT = 'wss://rpc.polkadot.io'; @@ -99,7 +100,19 @@ describe('substrate utils', () => { const provider = new WsProvider(ENDPOINT_SHIDEN); const api = await ApiPromise.create({ provider }); const block1 = await getBlockByHeight(api, 1); - expect(getTimestamp(block1)).toBeUndefined(); + expect(getTimestampFromSignedBlock(block1)).toBeUndefined(); + await api.disconnect(); + }); + + it('return defined if no timestamp set extrinsic', async () => { + const provider = new WsProvider(ENDPOINT_SHIDEN); + const api = await ApiPromise.create({ provider }); + const block1 = await getBlockByHeight(api, 99999); + const timestamp = await getTimestampFromBlockHash( + api, + block1.block.header.hash.toString(), + ); + expect(timestamp).toBeDefined(); await api.disconnect(); }); }); diff --git a/packages/node/src/utils/substrate.ts b/packages/node/src/utils/substrate.ts index fc1899c255..e1e5caa212 100644 --- a/packages/node/src/utils/substrate.ts +++ b/packages/node/src/utils/substrate.ts @@ -39,7 +39,10 @@ const INTERVAL_THRESHOLD = BN_THOUSAND.div(BN_TWO); const DEFAULT_TIME = new BN(6_000); const A_DAY = new BN(24 * 60 * 60 * 1000); -export function substrateHeaderToHeader(header: SubstrateHeader): Header { +type MissTsHeader = Omit; +type OptionalTsHeader = MissTsHeader & { timestamp?: Date }; + +export function substrateHeaderToHeader(header: SubstrateHeader): MissTsHeader { return { blockHeight: header.number.toNumber(), blockHash: header.hash.toHex(), @@ -47,10 +50,12 @@ export function substrateHeaderToHeader(header: SubstrateHeader): Header { }; } -export function substrateBlockToHeader(block: SignedBlock): Header { +export function substrateBlockToHeader(block: SignedBlock): OptionalTsHeader { + const timestamp = getTimestampFromSignedBlock(block); + return { ...substrateHeaderToHeader(block.block.header), - timestamp: getTimestamp(block), + timestamp, }; } @@ -60,13 +65,13 @@ export function wrapBlock( specVersion: number, ): SubstrateBlock { return merge(signedBlock, { - timestamp: getTimestamp(signedBlock), + timestamp: getTimestampFromSignedBlock(signedBlock), specVersion: specVersion, events, }); } -export function getTimestamp({ +export function getTimestampFromSignedBlock({ block: { extrinsics }, }: SignedBlock): Date | undefined { // extrinsics can be undefined when fetching light blocks @@ -90,6 +95,22 @@ export function getTimestamp({ return undefined; } +export async function getTimestampFromBlockHash( + api: ApiPromise, + blockHash: string, +): Promise { + try { + const blockTimestamp = await ( + await api.at(blockHash) + ).query.timestamp.now(); + return new Date(blockTimestamp.toNumber()); + } catch (e) { + logger.error(`failed to fetch timestamp for block ${blockHash}`); + // SHIDEN network query.timestamp is undefined + return undefined as any; + } +} + export function wrapExtrinsics( wrappedBlock: SubstrateBlock, allEvents: EventRecord[], @@ -394,6 +415,13 @@ export async function fetchBlocksBatches( : fetchRuntimeVersionRange(api, parentBlockHashs), ]); + const blockHeaderMap: Header[] = await Promise.all( + blocks.map(async (block, idx) => { + const header = substrateBlockToHeader(block); + return fillTsInHeader(api, header); + }), + ); + return blocks.map((block, idx) => { const events = blockEvents[idx]; const parentSpecVersion = @@ -405,7 +433,7 @@ export async function fetchBlocksBatches( const wrappedEvents = wrapEvents(wrappedExtrinsics, events, wrappedBlock); return { - getHeader: () => substrateBlockToHeader(wrappedBlock), + getHeader: () => blockHeaderMap[idx], block: { block: wrappedBlock, extrinsics: wrappedExtrinsics, @@ -415,6 +443,21 @@ export async function fetchBlocksBatches( }); } +function isFullHeader(header: OptionalTsHeader): header is Header { + return header.timestamp !== undefined; +} + +export async function fillTsInHeader( + api: ApiPromise, + header: OptionalTsHeader, +): Promise
{ + if (!isFullHeader(header)) { + const timestamp = await getTimestampFromBlockHash(api, header.blockHash); + return { ...header, timestamp }; + } + return header; +} + // TODO why is fetchBlocksBatches a breadth first funciton rather than depth? export async function fetchLightBlock( api: ApiPromise, @@ -425,7 +468,7 @@ export async function fetchLightBlock( throw ApiPromiseConnection.handleError(e); }); - const [header, events] = await Promise.all([ + const [header, events, timestamp] = await Promise.all([ api.rpc.chain.getHeader(blockHash).catch((e) => { logger.error( `failed to fetch Block Header hash="${blockHash}" height="${height}"`, @@ -436,6 +479,7 @@ export async function fetchLightBlock( logger.error(`failed to fetch events at block ${blockHash}`); throw ApiPromiseConnection.handleError(e); }), + (await api.at(blockHash)).query.timestamp.now(), ]); const blockHeader: BlockHeader = { @@ -448,7 +492,10 @@ export async function fetchLightBlock( events: events.map((evt, idx) => merge(evt, { idx, block: blockHeader })), }, getHeader: () => { - return substrateHeaderToHeader(blockHeader.block.header); + return { + ...substrateHeaderToHeader(blockHeader.block.header), + timestamp: new Date(timestamp.toNumber()), + }; }, }; }