-
Notifications
You must be signed in to change notification settings - Fork 358
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
multi chain rewind service #2673
base: main
Are you sure you want to change the base?
Changes from all commits
aa851f4
4190451
56ea202
c4063a7
2694b55
c5e798a
8be26fa
3a1bd7c
be995ac
c05acd8
bce4223
baaed32
0ae0424
cb0c1b0
e1290f4
aea8d55
53440df
34dcc59
72caba8
0842faa
cbc4c6e
f14f23a
2b8c917
f1757d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,10 +4,19 @@ | |
import {INestApplication} from '@nestjs/common'; | ||
import {Test} from '@nestjs/testing'; | ||
import {delay} from '@subql/common'; | ||
import {hashName} from '@subql/utils'; | ||
import {Sequelize} from '@subql/x-sequelize'; | ||
import {NodeConfig} from '../configure/NodeConfig'; | ||
import {MultiChainRewindEvent} from '../events'; | ||
import {RewindLockKey} from '../indexer'; | ||
import {DbModule} from './db.module'; | ||
import {createSendNotificationTriggerFunction, createNotifyTrigger, getDbSizeAndUpdateMetadata} from './sync-helper'; | ||
import { | ||
createSendNotificationTriggerFunction, | ||
createNotifyTrigger, | ||
getDbSizeAndUpdateMetadata, | ||
createRewindTriggerFunction, | ||
createRewindTrigger, | ||
} from './sync-helper'; | ||
|
||
const nodeConfig = new NodeConfig({subquery: 'packages/node-core/test/v1.0.0', subqueryName: 'test'}); | ||
|
||
|
@@ -185,3 +194,84 @@ describe('sync helper test', () => { | |
}, 10_000); | ||
}); | ||
}); | ||
|
||
describe('Multi-chain notification', () => { | ||
let app: INestApplication; | ||
let sequelize: Sequelize; | ||
const schema = 'multi-chain-test'; | ||
|
||
let client: unknown; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please provide the type for this. |
||
|
||
const listenerHash = hashName(schema, 'rewind_trigger', '_global'); | ||
|
||
afterAll(async () => { | ||
await sequelize.dropSchema(schema, {}); | ||
await sequelize.close(); | ||
return app?.close(); | ||
}); | ||
|
||
afterEach(async () => { | ||
if (client) { | ||
await (client as any).query(`UNLISTEN "${listenerHash}"`); | ||
(client as any).removeAllListeners('notification'); | ||
sequelize.connectionManager.releaseConnection(client); | ||
} | ||
}); | ||
|
||
it('can handle multiple rows in one transaction', async () => { | ||
const module = await Test.createTestingModule({ | ||
imports: [DbModule.forRootWithConfig(nodeConfig)], | ||
}).compile(); | ||
app = module.createNestApplication(); | ||
await app.init(); | ||
sequelize = app.get(Sequelize); | ||
await sequelize.createSchema(schema, {}); | ||
// mock create global table | ||
await sequelize.query(` | ||
CREATE TABLE IF NOT EXISTS "${schema}"._global ( | ||
key VARCHAR(255) NOT NULL PRIMARY KEY, | ||
value JSONB, | ||
"createdAt" TIMESTAMP WITH TIME ZONE NOT NULL, | ||
"updatedAt" TIMESTAMP WITH TIME ZONE NOT NULL | ||
)`); | ||
|
||
await sequelize.query(createRewindTriggerFunction(schema)); | ||
await sequelize.query(createRewindTrigger(schema)); | ||
|
||
client = await sequelize.connectionManager.getConnection({ | ||
type: 'read', | ||
}); | ||
await (client as any).query(`LISTEN "${listenerHash}"`); | ||
|
||
const listener = jest.fn(); | ||
(client as any).on('notification', (msg: any) => { | ||
console.log('Payload:', msg.payload); | ||
listener(msg.payload); | ||
}); | ||
|
||
const rewindSqlFromTimestamp = ( | ||
timestamp: number | ||
) => `INSERT INTO "${schema}"."_global" ( "key", "value", "createdAt", "updatedAt" ) | ||
VALUES | ||
( 'rewindLock', '{"timestamp":${timestamp},"chainNum":1}', now(), now()) | ||
ON CONFLICT ( "key" ) | ||
DO UPDATE | ||
SET "key" = EXCLUDED."key", | ||
"value" = EXCLUDED."value", | ||
"updatedAt" = EXCLUDED."updatedAt" | ||
WHERE "_global"."key" = '${RewindLockKey}' AND ("_global"."value"->>'timestamp')::BIGINT > ${timestamp}`; | ||
const rewindTimestamp = 1597669506000; | ||
await sequelize.query(rewindSqlFromTimestamp(rewindTimestamp)); | ||
await delay(1); | ||
|
||
await sequelize.query(rewindSqlFromTimestamp(rewindTimestamp - 1)); | ||
await delay(1); | ||
|
||
await sequelize.query(`DELETE FROM "${schema}"."_global" WHERE "key" = '${RewindLockKey}'`); | ||
await delay(1); | ||
Comment on lines
+265
to
+271
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please try and remove the delays. Or reduce them to much smaller times |
||
expect(listener).toHaveBeenCalledTimes(3); | ||
expect(listener).toHaveBeenNthCalledWith(1, MultiChainRewindEvent.Rewind); | ||
expect(listener).toHaveBeenNthCalledWith(2, MultiChainRewindEvent.RewindTimestampDecreased); | ||
expect(listener).toHaveBeenNthCalledWith(3, MultiChainRewindEvent.RewindComplete); | ||
}, 20_000); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
import assert from 'assert'; | ||
|
||
import {EventEmitter2, OnEvent} from '@nestjs/event-emitter'; | ||
import {ICoreBlockchainService} from '@subql/node-core/blockchain.service'; | ||
import {hexToU8a, u8aEq} from '@subql/utils'; | ||
import {Transaction} from '@subql/x-sequelize'; | ||
import {NodeConfig, IProjectUpgradeService} from '../../configure'; | ||
|
@@ -33,6 +34,7 @@ export interface IBlockDispatcher<B> { | |
latestBufferedHeight: number; | ||
batchSize: number; | ||
|
||
setLatestProcessedHeight(height: number): void; | ||
// Remove all enqueued blocks, used when a dynamic ds is created | ||
flushQueue(height: number): void; | ||
} | ||
|
@@ -60,7 +62,8 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB | |
protected queue: Q, | ||
protected storeService: StoreService, | ||
private storeModelProvider: IStoreModelProvider, | ||
private poiSyncService: PoiSyncService | ||
private poiSyncService: PoiSyncService, | ||
private blockChainService: ICoreBlockchainService | ||
) {} | ||
|
||
abstract enqueueBlocks(heights: (IBlock<B> | number)[], latestBufferHeight?: number): void | Promise<void>; | ||
|
@@ -214,7 +217,7 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB | |
} | ||
|
||
@OnEvent(AdminEvent.rewindTarget) | ||
handleAdminRewind(blockPayload: TargetBlockPayload): void { | ||
async handleAdminRewind(blockPayload: TargetBlockPayload) { | ||
if (this.currentProcessingHeight < blockPayload.height) { | ||
// this will throw back to admin controller, will NOT lead current indexing exit | ||
throw new Error( | ||
|
@@ -223,8 +226,10 @@ export abstract class BaseBlockDispatcher<Q extends IQueue, DS, B> implements IB | |
} | ||
|
||
// TODO can this work without | ||
const timestamp = await this.blockChainService.getBlockTimestamp(blockPayload.height); | ||
this._pendingRewindHeader = { | ||
blockHeight: Number(blockPayload.height), | ||
timestamp, | ||
Comment on lines
+229
to
+232
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think here now its easiest to call |
||
} as Header; | ||
const message = `Received admin command to rewind to block ${blockPayload.height}`; | ||
monitorWrite(`***** [ADMIN] ${message}`); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be removed.