Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] multi chain rewind #2627

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB
protected async rewind(lastCorrectHeader: Header): Promise<void> {
if (lastCorrectHeader.blockHeight <= this.currentProcessingHeight) {
logger.info(`Found last verified block at height ${lastCorrectHeader.blockHeight}, rewinding...`);
await this.projectService.reindex(lastCorrectHeader);
await this.projectService.reindex(lastCorrectHeader, true);
this.setLatestProcessedHeight(lastCorrectHeader.blockHeight);
logger.info(`Successful rewind to block ${lastCorrectHeader.blockHeight}!`);
}
Expand Down
44 changes: 44 additions & 0 deletions packages/node-core/src/indexer/entities/GlobalData.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {blake2AsHex} from '@subql/utils';
import {BuildOptions, DataTypes, Model, Sequelize} from '@subql/x-sequelize';

type RewindTimestampKey = `rewindTimestamp_${string}`;
export interface GlobalDataKeys {
rewindLock: number;
[key: RewindTimestampKey]: number;
}

export interface GlobalData {
key: keyof GlobalDataKeys;
value: GlobalDataKeys[keyof GlobalDataKeys];
}

interface GlobalDataEntity extends Model<GlobalData>, GlobalData {}

export type GlobalDataRepo = typeof Model & {
new (values?: unknown, options?: BuildOptions): GlobalDataEntity;
};

export function GlobalDataFactory(sequelize: Sequelize, schema: string): GlobalDataRepo {
const tableName = '_global';

return <GlobalDataRepo>sequelize.define(
tableName,
{
key: {
type: DataTypes.STRING,
primaryKey: true,
},
value: {
type: DataTypes.JSONB,
},
},
{freezeTableName: true, schema: schema}
);
}

export function generateRewindTimestampKey(chainId: string): RewindTimestampKey {
return `rewindTimestamp_${blake2AsHex(chainId)})`.substring(0, 63) as RewindTimestampKey;
}
1 change: 1 addition & 0 deletions packages/node-core/src/indexer/entities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@

export * from './Poi.entity';
export * from './Metadata.entity';
export * from './GlobalData.entity';
15 changes: 15 additions & 0 deletions packages/node-core/src/indexer/fetch.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,21 @@ export abstract class BaseFetchService<DS extends BaseDataSource, B extends IBlo
// Update the target height, this happens here to stay in sync with the rest of indexing
void this.storeModelProvider.metadata.set('targetHeight', latestHeight);

// If we're rewinding, we should wait until it's done
const {needRewind, needWaitRewind} = await this.projectService.getRewindStatus();
if (needRewind || needWaitRewind) {
logger.info(`Fetch service is waiting for rewind to finish`);
if (needRewind) {
// TODO Retrieve the block headers that require rollback.
const rewindBlockHeader: Header = {} as any;

this.blockDispatcher.flushQueue(rewindBlockHeader.blockHeight);
await this.projectService.reindex(rewindBlockHeader, false);
}
await delay(3);
continue;
}

// This could be latestBestHeight, dictionary should never include finalized blocks
// TODO add buffer so dictionary not used when project synced
if (startBlockHeight < this.latestBestHeight - scaledBatchSize) {
Expand Down
88 changes: 51 additions & 37 deletions packages/node-core/src/indexer/project.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,26 @@
// SPDX-License-Identifier: GPL-3.0

import assert from 'assert';
import { isMainThread } from 'worker_threads';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { BaseDataSource, IProjectNetworkConfig } from '@subql/types-core';
import { Sequelize } from '@subql/x-sequelize';
import { IApi } from '../api.service';
import { IProjectUpgradeService, NodeConfig } from '../configure';
import { IndexerEvent } from '../events';
import { getLogger } from '../logger';
import { exitWithError, monitorWrite } from '../process';
import { getExistingProjectSchema, getStartHeight, hasValue, initDbSchema, mainThreadOnly, reindex } from '../utils';
import { BlockHeightMap } from '../utils/blockHeightMap';
import { BaseDsProcessorService } from './ds-processor.service';
import { DynamicDsService } from './dynamic-ds.service';
import { MetadataKeys } from './entities';
import { PoiSyncService } from './poi';
import { PoiService } from './poi/poi.service';
import { StoreService } from './store.service';
import { cacheProviderFlushData } from './storeModelProvider';
import { ISubqueryProject, IProjectService, BypassBlocks, HistoricalMode, Header } from './types';
import { IUnfinalizedBlocksService } from './unfinalizedBlocks.service';
import {isMainThread} from 'worker_threads';
import {EventEmitter2} from '@nestjs/event-emitter';
import {BaseDataSource, IProjectNetworkConfig} from '@subql/types-core';
import {Sequelize} from '@subql/x-sequelize';
import {IApi} from '../api.service';
import {IProjectUpgradeService, NodeConfig} from '../configure';
import {IndexerEvent} from '../events';
import {getLogger} from '../logger';
import {exitWithError, monitorWrite} from '../process';
import {getExistingProjectSchema, getStartHeight, hasValue, initDbSchema, mainThreadOnly, reindex} from '../utils';
import {BlockHeightMap} from '../utils/blockHeightMap';
import {BaseDsProcessorService} from './ds-processor.service';
import {DynamicDsService} from './dynamic-ds.service';
import {MetadataKeys} from './entities';
import {PoiSyncService} from './poi';
import {PoiService} from './poi/poi.service';
import {StoreService} from './store.service';
import {cacheProviderFlushData} from './storeModelProvider';
import {ISubqueryProject, IProjectService, BypassBlocks, HistoricalMode, Header} from './types';
import {IUnfinalizedBlocksService} from './unfinalizedBlocks.service';

const logger = getLogger('Project');

Expand All @@ -35,7 +35,8 @@ export abstract class BaseProjectService<
API extends IApi,
DS extends BaseDataSource,
UnfinalizedBlocksService extends IUnfinalizedBlocksService<any> = IUnfinalizedBlocksService<any>,
> implements IProjectService<DS> {
> implements IProjectService<DS>
{
private _schema?: string;
private _startHeight?: number;
private _blockOffset?: number;
Expand Down Expand Up @@ -218,16 +219,16 @@ export abstract class BaseProjectService<

const existing = await metadata.findMany(keys);

const { chain, genesisHash, specName } = this.apiService.networkMeta;
const {chain, genesisHash, specName} = this.apiService.networkMeta;

if (this.project.runner) {
const { node, query } = this.project.runner;
const {node, query} = this.project.runner;

await metadata.setBulk([
{ key: 'runnerNode', value: node.name },
{ key: 'runnerNodeVersion', value: node.version },
{ key: 'runnerQuery', value: query.name },
{ key: 'runnerQueryVersion', value: query.version },
{key: 'runnerNode', value: node.name},
{key: 'runnerNodeVersion', value: node.version},
{key: 'runnerQuery', value: query.name},
{key: 'runnerQueryVersion', value: query.version},
]);
}
if (!existing.genesisHash) {
Expand Down Expand Up @@ -340,7 +341,7 @@ export abstract class BaseProjectService<
const nextProject = projects[i + 1][1];
nextMinStartHeight = Math.max(
nextProject.dataSources
.filter((ds): ds is DS & { startBlock: number } => !!ds.startBlock)
.filter((ds): ds is DS & {startBlock: number} => !!ds.startBlock)
.sort((a, b) => a.startBlock - b.startBlock)[0].startBlock,
projects[i + 1][0]
);
Expand All @@ -355,12 +356,12 @@ export abstract class BaseProjectService<
}[] = [];

[...project.dataSources, ...dynamicDs]
.filter((ds): ds is DS & { startBlock: number } => {
.filter((ds): ds is DS & {startBlock: number} => {
return !!ds.startBlock && (!nextMinStartHeight || nextMinStartHeight > ds.startBlock);
})
.forEach((ds) => {
events.push({ block: Math.max(height, ds.startBlock), start: true, ds });
if (ds.endBlock) events.push({ block: ds.endBlock + 1, start: false, ds });
events.push({block: Math.max(height, ds.startBlock), start: true, ds});
if (ds.endBlock) events.push({block: ds.endBlock + 1, start: false, ds});
});

// sort events by block in ascending order, start events come before end events
Expand Down Expand Up @@ -432,10 +433,13 @@ export abstract class BaseProjectService<

const timestamp = await this.getBlockTimestamp(upgradePoint);
// Only timestamp and blockHeight are used with reindexing so its safe to convert to a header
await this.reindex({
blockHeight: upgradePoint,
timestamp,
} as Header);
await this.reindex(
{
blockHeight: upgradePoint,
timestamp,
} as Header,
true
);
return upgradePoint + 1;
}
}
Expand All @@ -454,7 +458,7 @@ export abstract class BaseProjectService<
await this.onProjectChange(this.project);
}

async reindex(targetBlockHeader: Header): Promise<void> {
async reindex(targetBlockHeader: Header, flushGlobalLock: boolean): Promise<void> {
const [height, timestamp] = await Promise.all([
this.getLastProcessedHeight(),
this.storeService.modelProvider.metadata.find('lastProcessedBlockTimestamp'),
Expand All @@ -464,10 +468,13 @@ export abstract class BaseProjectService<
throw new Error('Cannot reindex with missing lastProcessedHeight');
}

if (flushGlobalLock && targetBlockHeader.timestamp) {
await this.storeService.setGlobalRewindLock(targetBlockHeader.timestamp.getTime());
}
return reindex(
this.getStartBlockFromDataSources(),
targetBlockHeader,
{ height, timestamp },
{height, timestamp},
this.storeService,
this.unfinalizedBlockService,
this.dynamicDsService,
Expand All @@ -477,4 +484,11 @@ export abstract class BaseProjectService<
/* Not providing force clean service, it should never be needed */
);
}

// `needRewind` indicates that the current chain requires a rewind,
// while `needWaitRewind` suggests that there are chains (including the current chain) in the project that have pending rewinds.
async getRewindStatus(): Promise<{needRewind: boolean; needWaitRewind: boolean}> {
const globalRewindStatus = await this.storeService.getGlobalRewindStatus();
return {needRewind: !!globalRewindStatus.rewindTimestamp, needWaitRewind: !!globalRewindStatus.rewindLock};
}
}
Loading
Loading