From 53d8065a72e4c2aa985ad211e60a1d6e5291aa38 Mon Sep 17 00:00:00 2001 From: Tate Date: Thu, 12 Dec 2024 02:39:08 +0000 Subject: [PATCH] draft: multi chain rewind --- .../blockDispatcher/base-block-dispatcher.ts | 2 +- .../src/indexer/entities/GlobalData.entity.ts | 44 +++++ .../node-core/src/indexer/entities/index.ts | 1 + .../node-core/src/indexer/fetch.service.ts | 15 ++ .../node-core/src/indexer/project.service.ts | 88 ++++++---- .../node-core/src/indexer/store.service.ts | 166 +++++++++++++----- packages/node-core/src/indexer/types.ts | 4 +- .../src/indexer/unfinalizedBlocks.service.ts | 32 ++-- .../src/subcommands/reindex.service.ts | 28 +-- packages/node-core/src/utils/reindex.ts | 36 ++-- 10 files changed, 291 insertions(+), 125 deletions(-) create mode 100644 packages/node-core/src/indexer/entities/GlobalData.entity.ts diff --git a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts index f2b4eb862d..b4bd3ebe4b 100644 --- a/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/base-block-dispatcher.ts @@ -130,7 +130,7 @@ export abstract class BaseBlockDispatcher implements IB protected async rewind(lastCorrectHeader: Header): Promise { if (lastCorrectHeader.blockHeight <= this.currentProcessingHeight) { logger.info(`Found last verified block at height ${lastCorrectHeader.blockHeight}, rewinding...`); - await this.projectService.reindex(lastCorrectHeader); + await this.projectService.reindex(lastCorrectHeader, true); this.setLatestProcessedHeight(lastCorrectHeader.blockHeight); logger.info(`Successful rewind to block ${lastCorrectHeader.blockHeight}!`); } diff --git a/packages/node-core/src/indexer/entities/GlobalData.entity.ts b/packages/node-core/src/indexer/entities/GlobalData.entity.ts new file mode 100644 index 0000000000..982e1dfaef --- /dev/null +++ b/packages/node-core/src/indexer/entities/GlobalData.entity.ts @@ -0,0 +1,44 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import {blake2AsHex} from '@subql/utils'; +import {BuildOptions, DataTypes, Model, Sequelize} from '@subql/x-sequelize'; + +type RewindTimestampKey = `rewindTimestamp_${string}`; +export interface GlobalDataKeys { + rewindLock: number; + [key: RewindTimestampKey]: number; +} + +export interface GlobalData { + key: keyof GlobalDataKeys; + value: GlobalDataKeys[keyof GlobalDataKeys]; +} + +interface GlobalDataEntity extends Model, GlobalData {} + +export type GlobalDataRepo = typeof Model & { + new (values?: unknown, options?: BuildOptions): GlobalDataEntity; +}; + +export function GlobalDataFactory(sequelize: Sequelize, schema: string): GlobalDataRepo { + const tableName = '_global'; + + return sequelize.define( + tableName, + { + key: { + type: DataTypes.STRING, + primaryKey: true, + }, + value: { + type: DataTypes.JSONB, + }, + }, + {freezeTableName: true, schema: schema} + ); +} + +export function generateRewindTimestampKey(chainId: string): RewindTimestampKey { + return `rewindTimestamp_${blake2AsHex(chainId)})`.substring(0, 63) as RewindTimestampKey; +} diff --git a/packages/node-core/src/indexer/entities/index.ts b/packages/node-core/src/indexer/entities/index.ts index 7f479d87f7..8780079d4e 100644 --- a/packages/node-core/src/indexer/entities/index.ts +++ b/packages/node-core/src/indexer/entities/index.ts @@ -3,3 +3,4 @@ export * from './Poi.entity'; export * from './Metadata.entity'; +export * from './GlobalData.entity'; diff --git a/packages/node-core/src/indexer/fetch.service.ts b/packages/node-core/src/indexer/fetch.service.ts index 42bb2d52ba..b43cf34fd7 100644 --- a/packages/node-core/src/indexer/fetch.service.ts +++ b/packages/node-core/src/indexer/fetch.service.ts @@ -225,6 +225,21 @@ export abstract class BaseFetchService = IUnfinalizedBlocksService, -> implements IProjectService { +> implements IProjectService +{ private _schema?: string; private _startHeight?: number; private _blockOffset?: number; @@ -218,16 +219,16 @@ export abstract class BaseProjectService< const existing = await metadata.findMany(keys); - const { chain, genesisHash, specName } = this.apiService.networkMeta; + const {chain, genesisHash, specName} = this.apiService.networkMeta; if (this.project.runner) { - const { node, query } = this.project.runner; + const {node, query} = this.project.runner; await metadata.setBulk([ - { key: 'runnerNode', value: node.name }, - { key: 'runnerNodeVersion', value: node.version }, - { key: 'runnerQuery', value: query.name }, - { key: 'runnerQueryVersion', value: query.version }, + {key: 'runnerNode', value: node.name}, + {key: 'runnerNodeVersion', value: node.version}, + {key: 'runnerQuery', value: query.name}, + {key: 'runnerQueryVersion', value: query.version}, ]); } if (!existing.genesisHash) { @@ -340,7 +341,7 @@ export abstract class BaseProjectService< const nextProject = projects[i + 1][1]; nextMinStartHeight = Math.max( nextProject.dataSources - .filter((ds): ds is DS & { startBlock: number } => !!ds.startBlock) + .filter((ds): ds is DS & {startBlock: number} => !!ds.startBlock) .sort((a, b) => a.startBlock - b.startBlock)[0].startBlock, projects[i + 1][0] ); @@ -355,12 +356,12 @@ export abstract class BaseProjectService< }[] = []; [...project.dataSources, ...dynamicDs] - .filter((ds): ds is DS & { startBlock: number } => { + .filter((ds): ds is DS & {startBlock: number} => { return !!ds.startBlock && (!nextMinStartHeight || nextMinStartHeight > ds.startBlock); }) .forEach((ds) => { - events.push({ block: Math.max(height, ds.startBlock), start: true, ds }); - if (ds.endBlock) events.push({ block: ds.endBlock + 1, start: false, ds }); + events.push({block: Math.max(height, ds.startBlock), start: true, ds}); + if (ds.endBlock) events.push({block: ds.endBlock + 1, start: false, ds}); }); // sort events by block in ascending order, start events come before end events @@ -432,10 +433,13 @@ export abstract class BaseProjectService< const timestamp = await this.getBlockTimestamp(upgradePoint); // Only timestamp and blockHeight are used with reindexing so its safe to convert to a header - await this.reindex({ - blockHeight: upgradePoint, - timestamp, - } as Header); + await this.reindex( + { + blockHeight: upgradePoint, + timestamp, + } as Header, + true + ); return upgradePoint + 1; } } @@ -454,7 +458,7 @@ export abstract class BaseProjectService< await this.onProjectChange(this.project); } - async reindex(targetBlockHeader: Header): Promise { + async reindex(targetBlockHeader: Header, flushGlobalLock: boolean): Promise { const [height, timestamp] = await Promise.all([ this.getLastProcessedHeight(), this.storeService.modelProvider.metadata.find('lastProcessedBlockTimestamp'), @@ -464,10 +468,13 @@ export abstract class BaseProjectService< throw new Error('Cannot reindex with missing lastProcessedHeight'); } + if (flushGlobalLock && targetBlockHeader.timestamp) { + await this.storeService.setGlobalRewindLock(targetBlockHeader.timestamp.getTime()); + } return reindex( this.getStartBlockFromDataSources(), targetBlockHeader, - { height, timestamp }, + {height, timestamp}, this.storeService, this.unfinalizedBlockService, this.dynamicDsService, @@ -477,4 +484,11 @@ export abstract class BaseProjectService< /* Not providing force clean service, it should never be needed */ ); } + + // `needRewind` indicates that the current chain requires a rewind, + // while `needWaitRewind` suggests that there are chains (including the current chain) in the project that have pending rewinds. + async getRewindStatus(): Promise<{needRewind: boolean; needWaitRewind: boolean}> { + const globalRewindStatus = await this.storeService.getGlobalRewindStatus(); + return {needRewind: !!globalRewindStatus.rewindTimestamp, needWaitRewind: !!globalRewindStatus.rewindLock}; + } } diff --git a/packages/node-core/src/indexer/store.service.ts b/packages/node-core/src/indexer/store.service.ts index 306ea896e9..538e4b315a 100644 --- a/packages/node-core/src/indexer/store.service.ts +++ b/packages/node-core/src/indexer/store.service.ts @@ -2,8 +2,8 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; -import { Inject, Injectable } from '@nestjs/common'; -import { IProjectNetworkConfig } from '@subql/types-core'; +import {Inject, Injectable} from '@nestjs/common'; +import {IProjectNetworkConfig} from '@subql/types-core'; import { GraphQLModelsRelationsEnums, hashName, @@ -11,6 +11,7 @@ import { MULTI_METADATA_REGEX, hexToU8a, GraphQLModelsType, + blake2AsHex, } from '@subql/utils'; import { IndexesOptions, @@ -22,8 +23,8 @@ import { Transaction, Deferrable, } from '@subql/x-sequelize'; -import { camelCase, flatten, last, upperFirst } from 'lodash'; -import { NodeConfig } from '../configure'; +import {camelCase, flatten, last, upperFirst} from 'lodash'; +import {NodeConfig} from '../configure'; import { BTREE_GIST_EXTENSION_EXIST_QUERY, createSchemaTrigger, @@ -32,14 +33,23 @@ import { getTriggers, SchemaMigrationService, } from '../db'; -import { getLogger } from '../logger'; -import { exitWithError } from '../process'; -import { camelCaseObjectKey, customCamelCaseGraphqlKey, getHistoricalUnit } from '../utils'; -import { MetadataFactory, MetadataRepo, PoiFactory, PoiFactoryDeprecate, PoiRepo } from './entities'; -import { Store } from './store'; -import { IMetadata, IStoreModelProvider, PlainStoreModelService } from './storeModelProvider'; -import { StoreOperations } from './StoreOperations'; -import { Header, HistoricalMode, ISubqueryProject } from './types'; +import {getLogger} from '../logger'; +import {exitWithError} from '../process'; +import {camelCaseObjectKey, customCamelCaseGraphqlKey, getHistoricalUnit, hasValue} from '../utils'; +import { + generateRewindTimestampKey, + GlobalDataFactory, + GlobalDataRepo, + MetadataFactory, + MetadataRepo, + PoiFactory, + PoiFactoryDeprecate, + PoiRepo, +} from './entities'; +import {Store} from './store'; +import {IMetadata, IStoreModelProvider, PlainStoreModelService} from './storeModelProvider'; +import {StoreOperations} from './StoreOperations'; +import {Header, HistoricalMode, ISubqueryProject} from './types'; const logger = getLogger('StoreService'); const NULL_MERKEL_ROOT = hexToU8a('0x00'); @@ -63,6 +73,7 @@ export class StoreService { poiRepo?: PoiRepo; private _modelIndexedFields?: IndexField[]; private _modelsRelations?: GraphQLModelsRelationsEnums; + private _globalDataRepo?: GlobalDataRepo; private _metaDataRepo?: MetadataRepo; private _historical?: HistoricalMode; private _metadataModel?: IMetadata; @@ -91,6 +102,11 @@ export class StoreService { return this._modelsRelations; } + private get globalDataRepo(): GlobalDataRepo { + assert(this._globalDataRepo, new NoInitError()); + return this._globalDataRepo; + } + private get metaDataRepo(): MetadataRepo { assert(this._metaDataRepo, new NoInitError()); return this._metaDataRepo; @@ -158,6 +174,8 @@ export class StoreService { this.subqueryProject.network.chainId ); + this._globalDataRepo = GlobalDataFactory(this.sequelize, schema); + this._schema = schema; await this.sequelize.sync(); @@ -172,6 +190,8 @@ export class StoreService { await this.initHotSchemaReloadQueries(schema); await this.metadataModel.set('historicalStateEnabled', this.historical); + + await this.initChainRewindTimestamp(); } async init(schema: string): Promise { @@ -204,7 +224,7 @@ export class StoreService { await this.metadataModel.setIncrement('schemaMigrationCount'); } } catch (e: any) { - exitWithError(new Error(`Having a problem when syncing schema`, { cause: e }), logger); + exitWithError(new Error(`Having a problem when syncing schema`, {cause: e}), logger); } } @@ -233,7 +253,7 @@ export class StoreService { try { this._modelIndexedFields = await this.getAllIndexFields(schema); } catch (e: any) { - exitWithError(new Error(`Having a problem when getting indexed fields`, { cause: e }), logger); + exitWithError(new Error(`Having a problem when getting indexed fields`, {cause: e}), logger); } } @@ -285,17 +305,17 @@ export class StoreService { private async useDeprecatePoi(schema: string): Promise { const sql = `SELECT * FROM information_schema.columns WHERE table_schema = ? AND table_name = '_poi' AND column_name = 'projectId'`; - const [result] = await this.sequelize.query(sql, { replacements: [schema] }); + const [result] = await this.sequelize.query(sql, {replacements: [schema]}); return !!result.length; } async getHistoricalStateEnabled(schema: string): Promise { - const { historical, multiChain } = this.config; + const {historical, multiChain} = this.config; try { const tableRes = await this.sequelize.query>( `SELECT table_name FROM information_schema.tables where table_schema='${schema}'`, - { type: QueryTypes.SELECT } + {type: QueryTypes.SELECT} ); const metadataTableNames = flatten(tableRes).filter( @@ -306,9 +326,9 @@ export class StoreService { throw new Error('Metadata table does not exist'); } - const res = await this.sequelize.query<{ key: string; value: boolean | string }>( + const res = await this.sequelize.query<{key: string; value: boolean | string}>( `SELECT key, value FROM "${schema}"."${metadataTableNames[0]}" WHERE (key = 'historicalStateEnabled')`, - { type: QueryTypes.SELECT } + {type: QueryTypes.SELECT} ); if (res[0]?.key !== 'historicalStateEnabled') { @@ -477,6 +497,68 @@ group by // Cant throw here because even with historical disabled the current height is used by the store return getHistoricalUnit(this.historical, this.blockHeader); } + + private async getRewindTimestamp(): Promise { + const rewindTimestampKey = generateRewindTimestampKey(this.subqueryProject.network.chainId); + const record = await this.globalDataRepo.findByPk(rewindTimestampKey); + if (hasValue(record)) { + return record.toJSON().value as number; + } + } + + private async initChainRewindTimestamp() { + if (await this.getRewindTimestamp()) return; + + const rewindTimestampKey = generateRewindTimestampKey(this.subqueryProject.network.chainId); + await this.globalDataRepo.create({key: rewindTimestampKey, value: 0}); + } + + async getGlobalRewindStatus(): Promise<{rewindTimestamp: number; rewindLock?: number}> { + const chainId = this.subqueryProject.network.chainId; + const rewindTimestampKey = generateRewindTimestampKey(chainId); + + const records = await this.globalDataRepo.findAll({where: {key: {[Op.in]: [rewindTimestampKey, 'rewindLock']}}}); + const rewindLock = records.find((r) => r.key === 'rewindLock')?.value; + const rewindTimestamp = records.find((r) => r.key === rewindTimestampKey)?.value; + if (rewindTimestamp === undefined) { + throw new Error(`Not registered rewind timestamp key in global data, chainId: ${chainId}`); + } + + return {rewindTimestamp, rewindLock}; + } + + // If the set rewindTimestamp is greater than or equal to the current blockHeight, we do nothing because we will roll back to an earlier time. + // If the set rewindTimestamp is less than the current blockHeight, we should roll back to the earlier rewindTimestamp. + async setGlobalRewindLock(rewindTimestamp: number): Promise { + const updateRecord = await this.globalDataRepo.bulkCreate([{key: 'rewindLock', value: rewindTimestamp}], { + updateOnDuplicate: ['key', 'value'], + conflictWhere: {key: {[Op.gt]: rewindTimestamp}}, + }); + + // If there is a rewind lock that is greater than the current rewind timestamp, we should not update the rewind timestamp + if (updateRecord.length) { + await this.globalDataRepo.update({value: rewindTimestamp}, {where: {key: {[Op.like]: 'rewindTimestamp_%'}}}); + } + } + + async releaseRewindLock(tx: Transaction): Promise { + const chainId = this.subqueryProject.network.chainId; + const rewindTimestampKey = generateRewindTimestampKey(chainId); + await this.globalDataRepo.update({value: 0}, {where: {key: rewindTimestampKey}, transaction: tx}); + const record = await this.globalDataRepo.findAll({ + where: { + key: {[Op.like]: 'rewindLock_%'}, + value: {[Op.gt]: 0}, + }, + transaction: tx, + }); + if (record.length) { + logger.info(`Rewind success chainId: ${chainId}, but there are still rewind locks for other chains`); + return; + } + await this.globalDataRepo.destroy({where: {key: 'rewindLock'}, transaction: tx}); + logger.info(`Rewind success chainId: ${chainId}, all rewind locks are released`); + } } // REMOVE 10,000 record per batch @@ -495,33 +577,33 @@ async function batchDeleteAndThenUpdate( destroyCompleted ? 0 : model.destroy({ - transaction, - hooks: false, - limit: batchSize, - where: sequelize.where(sequelize.fn('lower', sequelize.col('_block_range')), Op.gt, targetBlockUnit), - }), + transaction, + hooks: false, + limit: batchSize, + where: sequelize.where(sequelize.fn('lower', sequelize.col('_block_range')), Op.gt, targetBlockUnit), + }), updateCompleted ? [0] : model.update( - { - __block_range: sequelize.fn('int8range', sequelize.fn('lower', sequelize.col('_block_range')), null), - }, - { - transaction, - limit: batchSize, - hooks: false, - where: { - [Op.and]: [ - { - __block_range: { - [Op.contains]: targetBlockUnit, - }, - }, - sequelize.where(sequelize.fn('upper', sequelize.col('_block_range')), Op.not, null), - ], + { + __block_range: sequelize.fn('int8range', sequelize.fn('lower', sequelize.col('_block_range')), null), }, - } - ), + { + transaction, + limit: batchSize, + hooks: false, + where: { + [Op.and]: [ + { + __block_range: { + [Op.contains]: targetBlockUnit, + }, + }, + sequelize.where(sequelize.fn('upper', sequelize.col('_block_range')), Op.not, null), + ], + }, + } + ), ]); logger.debug(`${model.name} deleted ${numDestroyRows} records, updated ${numUpdatedRows} records`); if (numDestroyRows === 0) { diff --git a/packages/node-core/src/indexer/types.ts b/packages/node-core/src/indexer/types.ts index b5ef954888..62e99ac204 100644 --- a/packages/node-core/src/indexer/types.ts +++ b/packages/node-core/src/indexer/types.ts @@ -52,7 +52,7 @@ export interface IProjectService { blockOffset: number | undefined; startHeight: number; bypassBlocks: BypassBlocks; - reindex(lastCorrectHeader: Header): Promise; + reindex(lastCorrectHeader: Header, flushGlobalLock: boolean): Promise; /** * This is used everywhere but within indexing blocks, see comment on getDataSources for more info * */ @@ -65,6 +65,8 @@ export interface IProjectService { getStartBlockFromDataSources(): number; getDataSourcesMap(): BlockHeightMap; hasDataSourcesAfterHeight(height: number): boolean; + + getRewindStatus(): Promise<{needRewind: boolean; needWaitRewind: boolean}>; } export interface IBlock { diff --git a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts index b1724b2653..4c05931ed3 100644 --- a/packages/node-core/src/indexer/unfinalizedBlocks.service.ts +++ b/packages/node-core/src/indexer/unfinalizedBlocks.service.ts @@ -2,16 +2,16 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; -import { Transaction } from '@subql/x-sequelize'; -import { isEqual, last } from 'lodash'; -import { NodeConfig } from '../configure'; -import { Header, IBlock } from '../indexer/types'; -import { getLogger } from '../logger'; -import { exitWithError } from '../process'; -import { mainThreadOnly } from '../utils'; -import { ProofOfIndex } from './entities'; -import { PoiBlock } from './poi'; -import { IStoreModelProvider } from './storeModelProvider'; +import {Transaction} from '@subql/x-sequelize'; +import {isEqual, last} from 'lodash'; +import {NodeConfig} from '../configure'; +import {Header, IBlock} from '../indexer/types'; +import {getLogger} from '../logger'; +import {exitWithError} from '../process'; +import {mainThreadOnly} from '../utils'; +import {ProofOfIndex} from './entities'; +import {PoiBlock} from './poi'; +import {IStoreModelProvider} from './storeModelProvider'; const logger = getLogger('UnfinalizedBlocks'); @@ -25,7 +25,7 @@ const UNFINALIZED_THRESHOLD = 200; type UnfinalizedBlocks = Header[]; export interface IUnfinalizedBlocksService extends IUnfinalizedBlocksServiceUtil { - init(reindex: (targetHeader: Header) => Promise): Promise
; + init(reindex: (targetHeader: Header, flushGlobalLock: boolean) => Promise): Promise
; processUnfinalizedBlocks(block: IBlock | undefined): Promise
; processUnfinalizedBlockHeader(header: Header | undefined): Promise
; resetUnfinalizedBlocks(tx?: Transaction): void; @@ -78,7 +78,7 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo protected readonly storeModelProvider: IStoreModelProvider ) {} - async init(reindex: (tagetHeader: Header) => Promise): Promise
{ + async init(reindex: (tagetHeader: Header, flushGlobalLock: boolean) => Promise): Promise
{ logger.info(`Unfinalized blocks is ${this.nodeConfig.unfinalizedBlocks ? 'enabled' : 'disabled'}`); this.unfinalizedBlocks = await this.getMetadataUnfinalizedBlocks(); @@ -94,7 +94,7 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo logger.info( `Found un-finalized blocks from previous indexing but unverified, rolling back to last finalized block ${rewindHeight}` ); - await reindex(rewindHeight); + await reindex(rewindHeight, true); logger.info(`Successful rewind to block ${rewindHeight}!`); return rewindHeight; } else { @@ -164,7 +164,7 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo // remove any records less and equal than input finalized blockHeight private removeFinalized(blockHeight: number): void { - this.unfinalizedBlocks = this.unfinalizedBlocks.filter(({ blockHeight: height }) => height > blockHeight); + this.unfinalizedBlocks = this.unfinalizedBlocks.filter(({blockHeight: height}) => height > blockHeight); } // find closest record from block heights @@ -172,7 +172,7 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo // Have the block in the best block, can be verified return [...this.unfinalizedBlocks] // Copy so we can reverse .reverse() // Reverse the list to find the largest block - .find(({ blockHeight: height }) => height <= blockHeight); + .find(({blockHeight: height}) => height <= blockHeight); } // check unfinalized blocks for a fork, returns the header where a fork happened @@ -225,7 +225,7 @@ export abstract class BaseUnfinalizedBlocksService implements IUnfinalizedBlo protected async getLastCorrectFinalizedBlock(forkedHeader: Header): Promise
{ const bestVerifiableBlocks = this.unfinalizedBlocks.filter( - ({ blockHeight }) => blockHeight <= this.finalizedBlockNumber + ({blockHeight}) => blockHeight <= this.finalizedBlockNumber ); let checkingHeader = forkedHeader; diff --git a/packages/node-core/src/subcommands/reindex.service.ts b/packages/node-core/src/subcommands/reindex.service.ts index f117172155..279abfae19 100644 --- a/packages/node-core/src/subcommands/reindex.service.ts +++ b/packages/node-core/src/subcommands/reindex.service.ts @@ -2,10 +2,10 @@ // SPDX-License-Identifier: GPL-3.0 import assert from 'assert'; -import { Inject, Injectable } from '@nestjs/common'; -import { BaseDataSource } from '@subql/types-core'; -import { Sequelize } from '@subql/x-sequelize'; -import { NodeConfig, ProjectUpgradeService } from '../configure'; +import {Inject, Injectable} from '@nestjs/common'; +import {BaseDataSource} from '@subql/types-core'; +import {Sequelize} from '@subql/x-sequelize'; +import {NodeConfig, ProjectUpgradeService} from '../configure'; import { IUnfinalizedBlocksService, StoreService, @@ -15,18 +15,18 @@ import { cacheProviderFlushData, Header, } from '../indexer'; -import { DynamicDsService } from '../indexer/dynamic-ds.service'; -import { getLogger } from '../logger'; -import { exitWithError, monitorWrite } from '../process'; -import { getExistingProjectSchema, initDbSchema, reindex } from '../utils'; -import { ForceCleanService } from './forceClean.service'; +import {DynamicDsService} from '../indexer/dynamic-ds.service'; +import {getLogger} from '../logger'; +import {exitWithError, monitorWrite} from '../process'; +import {getExistingProjectSchema, initDbSchema, reindex} from '../utils'; +import {ForceCleanService} from './forceClean.service'; const logger = getLogger('Reindex'); @Injectable() export class ReindexService

{ private _metadataRepo?: IMetadata; - private _lastProcessedHeader?: { height: number; timestamp?: number }; + private _lastProcessedHeader?: {height: number; timestamp?: number}; constructor( private readonly sequelize: Sequelize, @@ -90,7 +90,7 @@ export class ReindexService

blockHeight <= inputHeight); + const bestBlocks = unfinalizedBlocks.filter(({blockHeight}) => blockHeight <= inputHeight); if (bestBlocks.length && inputHeight >= bestBlocks[0].blockHeight) { return bestBlocks[0]; } @@ -101,7 +101,7 @@ export class ReindexService

, dynamicDsService: DynamicDsService, @@ -64,6 +63,8 @@ export async function reindex( // if startHeight is greater than the targetHeight, just force clean if (targetBlockHeader.blockHeight < startHeight) { + // TODO: multi-chain forceClean should not be allowed, 应该只回滚到 startHeight 而不是 forceClean + logger.info( `targetHeight: ${targetBlockHeader.blockHeight} is less than startHeight: ${startHeight}. Hence executing force-clean` ); @@ -75,20 +76,22 @@ export async function reindex( await forceCleanService?.forceClean(); } else { logger.info(`Reindexing to ${storeService.historical}: ${targetUnit}`); + // need to flush cache before rewind await cacheProviderFlushData(storeService.modelProvider, true); await cacheProviderResetData(storeService.modelProvider); - if (storeService.modelProvider instanceof StoreCacheService) { - await storeService.modelProvider.flushData(true); - await storeService.modelProvider.resetData(); + // if Plain model, applyPendingChanges will be called in rewind + if (storeService.transaction) { + await storeService.modelProvider.applyPendingChanges(lastProcessed.height, false, storeService.transaction); } + const transaction = await sequelize.transaction(); try { /* - Must initialize storeService, to ensure all models are loaded, as storeService.init has not been called at this point - 1. During runtime, model should be already been init - 2.1 On start, projectUpgrade rewind will sync the sequelize models - 2.2 On start, without projectUpgrade or upgradablePoint, sequelize will sync models through project.service - */ + Must initialize storeService, to ensure all models are loaded, as storeService.init has not been called at this point + 1. During runtime, model should be already been init + 2.1 On start, projectUpgrade rewind will sync the sequelize models + 2.2 On start, without projectUpgrade or upgradablePoint, sequelize will sync models through project.service + */ await projectUpgradeService.rewind( targetBlockHeader.blockHeight, lastProcessed.height, @@ -106,6 +109,7 @@ export async function reindex( // Flush metadata changes from above Promise.all await storeService.modelProvider.metadata.flush?.(transaction, targetUnit); + await storeService.releaseRewindLock(transaction); await transaction.commit(); logger.info('Reindex Success'); } catch (err: any) {