From d71862b3c5733aa9b7ad4ba9c766007445ce1d7e Mon Sep 17 00:00:00 2001 From: icezohu Date: Mon, 29 Apr 2024 18:08:28 +0800 Subject: [PATCH] [release] @subql/network-support@1.2.0 @subql/apollo-links@1.4.0 * implement state manager * get signed state * support both old and new state * minor fixes * update logs * fix state cache * apollo-links support channel performance * revert main file config * deal with base64 encoded authorization error message enhance debug log * pass state store for apollo-links and eth-provider * [release] @subql/network-support@1.2.0 @subql/apollo-links@1.4.0 --- .eslintrc | 30 ++- packages/apollo-links/CHANGELOG.md | 5 +- packages/apollo-links/package.json | 2 +- packages/apollo-links/src/authHttpLink.ts | 4 +- .../apollo-links/src/core/clusterAuthLink.ts | 4 +- .../apollo-links/src/core/responseLink.ts | 45 ++--- packages/eth-provider/src/provider.ts | 22 ++- packages/network-support/CHANGELOG.md | 5 +- packages/network-support/package.json | 5 +- packages/network-support/src/fetch.ts | 10 +- packages/network-support/src/index.ts | 1 + packages/network-support/src/orderManager.ts | 95 +++++----- packages/network-support/src/scoreManager.ts | 4 +- packages/network-support/src/stateManager.ts | 174 ++++++++++++++++++ packages/network-support/src/types.ts | 10 + packages/network-support/src/utils/hash.ts | 10 + packages/network-support/src/utils/query.ts | 16 +- packages/network-support/src/utils/version.ts | 30 +++ yarn.lock | 3 +- 19 files changed, 370 insertions(+), 105 deletions(-) create mode 100644 packages/network-support/src/stateManager.ts create mode 100644 packages/network-support/src/utils/hash.ts create mode 100644 packages/network-support/src/utils/version.ts diff --git a/.eslintrc b/.eslintrc index 2a595bf1..0fb5e5b2 100644 --- a/.eslintrc +++ b/.eslintrc @@ -3,7 +3,10 @@ "parserOptions": { "sourceType": "module" }, - "plugins": ["@typescript-eslint/eslint-plugin", "header"], + "plugins": [ + "@typescript-eslint/eslint-plugin", + "header" + ], "extends": [ "eslint:recommended", "plugin:@typescript-eslint/recommended" @@ -14,17 +17,26 @@ "jest": true, "browser": true }, - "ignorePatterns": ["**/packages/**/dist/**", "/test/jest-setup.ts"], + "ignorePatterns": [ + "**/packages/**/dist/**", + "/test/jest-setup.ts" + ], "rules": { "@typescript-eslint/interface-name-prefix": "off", "@typescript-eslint/explicit-function-return-type": "off", "@typescript-eslint/explicit-module-boundary-types": "off", "@typescript-eslint/no-explicit-any": "off", - "header/header": [2, "line", [ - { - "pattern": " Copyright \\d{4}(-\\d{4})? SubQuery Pte Ltd authors & contributors", - "template": " Copyright 2020-2023 SubQuery Pte Ltd authors & contributors" }, - " SPDX-License-Identifier: Apache-2.0" - ], 2] + "header/header": [ + 2, + "line", + [ + { + "pattern": " Copyright \\d{4}(-\\d{4})? SubQuery Pte Ltd authors & contributors", + "template": " Copyright 2020-2024 SubQuery Pte Ltd authors & contributors" + }, + " SPDX-License-Identifier: Apache-2.0" + ], + 2 + ] } -} +} \ No newline at end of file diff --git a/packages/apollo-links/CHANGELOG.md b/packages/apollo-links/CHANGELOG.md index 7f60a337..c1faa665 100644 --- a/packages/apollo-links/CHANGELOG.md +++ b/packages/apollo-links/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [1.4.0] - 2024-04-29 + ## [1.3.2] - 2024-01-26 ## [1.3.0] - 2024-01-22 @@ -96,7 +98,8 @@ Breaking change for `dictHttpLink` and `deploymentHttpLink`, use `const { link } - Add Authlink for Apollo client -[unreleased]: https://github.com/subquery/network-clients/compare/v1.3.2...HEAD +[unreleased]: https://github.com/subquery/network-clients/compare/v1.4.0...HEAD +[1.4.0]: https://github.com/subquery/network-clients/compare/v1.3.2...v1.4.0 [1.3.2]: https://github.com/subquery/network-clients/compare/v1.3.0...v1.3.2 [1.3.0]: https://github.com/subquery/network-clients/compare/v1.2.6...v1.3.0 [1.2.6]: https://github.com/subquery/network-clients/compare/v1.2.4...v1.2.6 diff --git a/packages/apollo-links/package.json b/packages/apollo-links/package.json index d6be1e5f..72f1b82e 100644 --- a/packages/apollo-links/package.json +++ b/packages/apollo-links/package.json @@ -1,6 +1,6 @@ { "name": "@subql/apollo-links", - "version": "1.3.2", + "version": "1.4.0", "description": "SubQuery Network - graphql links", "main": "dist/index.js", "author": "SubQuery Pte Limited", diff --git a/packages/apollo-links/src/authHttpLink.ts b/packages/apollo-links/src/authHttpLink.ts index 9807810a..cecc5c0d 100644 --- a/packages/apollo-links/src/authHttpLink.ts +++ b/packages/apollo-links/src/authHttpLink.ts @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 import { ApolloLink, from } from '@apollo/client/core'; - import { IStore, OrderManager, @@ -28,6 +27,7 @@ interface BaseAuthOptions { logger?: Logger; // logger for `AuthLink` fallbackServiceUrl?: string; // fall back service url for `AuthLink` scoreStore?: IStore; // pass store in, so it doesn't get lost between page refresh + stateStore?: IStore; selector?: RunnerSelector; maxRetries?: number; useImmediateFallbackOnError?: boolean; @@ -68,6 +68,7 @@ function authHttpLink(options: AuthOptions): AuthHttpLink { authUrl, projectType, scoreStore, + stateStore, maxRetries, useImmediateFallbackOnError = false, logger: _logger, @@ -84,6 +85,7 @@ function authHttpLink(options: AuthOptions): AuthHttpLink { projectType, logger, scoreStore, + stateStore, responseFormat: ResponseFormat.Inline, selector, timeout, diff --git a/packages/apollo-links/src/core/clusterAuthLink.ts b/packages/apollo-links/src/core/clusterAuthLink.ts index 2432a93d..ac944525 100644 --- a/packages/apollo-links/src/core/clusterAuthLink.ts +++ b/packages/apollo-links/src/core/clusterAuthLink.ts @@ -36,8 +36,8 @@ export class ClusterAuthLink extends ApolloLink { .getRequestParams(this.getRequestId(operation)) .then((params) => { if (params) { - const { headers, url, type, runner } = params; - operation.setContext({ url, headers, type, indexer: runner }); + const { headers, url, type, runner, channelId } = params; + operation.setContext({ url, headers, type, indexer: runner, channelId }); sub = forward(operation).subscribe(observer); } else { this.logger?.debug('no available orders'); diff --git a/packages/apollo-links/src/core/responseLink.ts b/packages/apollo-links/src/core/responseLink.ts index 6ece42d0..1b55a01d 100644 --- a/packages/apollo-links/src/core/responseLink.ts +++ b/packages/apollo-links/src/core/responseLink.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 import { ApolloLink, FetchResult, NextLink, Observable, Operation } from '@apollo/client/core'; -import { ChannelState, OrderManager, OrderType, POST, ScoreType } from '@subql/network-support'; +import { ChannelState, OrderManager, OrderType, ScoreType, State } from '@subql/network-support'; import { Base64 } from 'js-base64'; import { Logger } from '../utils/logger'; @@ -13,40 +13,22 @@ export type ResponseLinkOptions = { }; export class ResponseLink extends ApolloLink { - private options: ResponseLinkOptions; - - constructor(private option: ResponseLinkOptions) { + constructor(private options: ResponseLinkOptions) { super(); - this.options = option; } get logger(): Logger | undefined { return this.options.logger; } - async syncChannelState(state: ChannelState): Promise { - try { - const stateUrl = new URL('/channel/state', this.options.authUrl); - const res = await POST<{ consumerSign: string }>(stateUrl.toString(), state); - - if (res.consumerSign) { - this.logger?.debug(`syncChannelState succeed`); - } else { - this.logger?.debug(`syncChannelState failed: ${JSON.stringify(res)}`); - } - } catch (e) { - this.logger?.debug(`syncChannelState failed: ${String(e)}`); - } - } - override request(operation: Operation, forward: NextLink): Observable | null { if (!forward) return null; - const { type, indexer } = operation.getContext(); + const { type, indexer, channelId } = operation.getContext(); return new Observable((observer) => { const subscription = forward(operation).subscribe({ - next: (response: FetchResult> & { state: ChannelState }) => { + next: (response: FetchResult> & { state: State | ChannelState }) => { if (!response.errors || response.errors?.length === 0) { this.options.orderManager.updateScore(indexer, ScoreType.SUCCESS); } @@ -54,16 +36,25 @@ export class ResponseLink extends ApolloLink { if (type === OrderType.flexPlan) { const responseHeaders = operation.getContext().response.headers; if (responseHeaders) { - const channelState = responseHeaders.get('X-Channel-State') - ? (JSON.parse( + let channelState: State | ChannelState; + if (responseHeaders.get('X-Channel-State')) { + try { + channelState = JSON.parse( Base64.decode(responseHeaders.get('X-Channel-State')).toString() - ) as ChannelState) - : response.state; + ) as ChannelState; + } catch { + channelState = { + authorization: responseHeaders.get('X-Channel-State'), + }; + } + } else { + channelState = response.state; + } if (!channelState) { this.logger?.debug("Can't find the channel state information"); } - void this.syncChannelState(channelState); + void this.options.orderManager.syncChannelState(channelId, channelState); } } diff --git a/packages/eth-provider/src/provider.ts b/packages/eth-provider/src/provider.ts index c1cef520..e06b4fae 100644 --- a/packages/eth-provider/src/provider.ts +++ b/packages/eth-provider/src/provider.ts @@ -14,6 +14,7 @@ import { ProjectType, ResponseFormat, ScoreType, + State, generateUniqueId, silentLogger, } from '@subql/network-support'; @@ -39,6 +40,7 @@ interface Options { logger?: Logger; // logger for `AuthLink` fallbackUrl?: string | ConnectionInfo; // fall back service url for `AuthLink` scoreStore?: IStore; // pass store in, so it doesn't get lost between page refresh + stateStore?: IStore; maxRetries?: number; network?: Networkish; } @@ -62,6 +64,8 @@ export class SubqueryAuthedRpcProvider extends JsonRpcProvider { projectType: ProjectType.deployment, logger: this.logger, responseFormat: ResponseFormat.Wrapped, + scoreStore: opt.scoreStore, + stateStore: opt.stateStore, }); } @@ -91,8 +95,7 @@ export class SubqueryAuthedRpcProvider extends JsonRpcProvider { const requestResult: () => Promise = async () => { const requestParams = await this.orderManager.getRequestParams(requestId); if (requestParams) { - // eslint-disable-next-line @typescript-eslint/unbound-method - const { url, headers, type, runner } = requestParams; + const { url, headers, type, runner, channelId } = requestParams; try { const result = await this._send( { @@ -102,6 +105,7 @@ export class SubqueryAuthedRpcProvider extends JsonRpcProvider { request, { type, + channelId, } ); @@ -142,13 +146,15 @@ export class SubqueryAuthedRpcProvider extends JsonRpcProvider { async _send( url: string | ConnectionInfo, request: unknown, - options: { + options?: { type?: OrderType; - } = {} + channelId?: string; + } ): Promise { - const { type } = options; + const type = options?.type; + const channelId = options?.channelId; let result; - let state: ChannelState | undefined; + let state: State | ChannelState | undefined; try { result = await fetchJson(url, JSON.stringify(request), (payload, resp) => { let res = payload; @@ -186,8 +192,8 @@ export class SubqueryAuthedRpcProvider extends JsonRpcProvider { }); throw error; } - if (state && type === OrderType.flexPlan) { - void this.orderManager.syncChannelState(state); + if (type === OrderType.flexPlan && channelId && state) { + void this.orderManager.syncChannelState(channelId, state); } return result; } diff --git a/packages/network-support/CHANGELOG.md b/packages/network-support/CHANGELOG.md index 403117cc..171d3b0f 100644 --- a/packages/network-support/CHANGELOG.md +++ b/packages/network-support/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [1.2.0] 2024-04-29 + ## [1.1.2] 2024-04-08 ## [1.1.1] 2024-03-22 @@ -25,7 +27,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - It's a internal library. -[unreleased]: https://github.com/subquery/network-support/compare/v1.1.2...HEAD +[unreleased]: https://github.com/subquery/network-support/compare/v1.2.0...HEAD +[1.2.0]: https://github.com/subquery/network-support/releases/tag/v1.2.0 [1.1.2]: https://github.com/subquery/network-support/releases/tag/v1.1.2 [1.1.1]: https://github.com/subquery/network-support/releases/tag/v1.1.1 [1.1.0]: https://github.com/subquery/network-support/releases/tag/v1.1.0 diff --git a/packages/network-support/package.json b/packages/network-support/package.json index 14d79bee..8e7609b4 100644 --- a/packages/network-support/package.json +++ b/packages/network-support/package.json @@ -1,6 +1,6 @@ { "name": "@subql/network-support", - "version": "1.1.2", + "version": "1.2.0", "main": "dist/index.js", "author": "SubQuery Pte Limited", "license": "Apache-2.0", @@ -13,7 +13,8 @@ "cross-fetch": "^4.0.0", "js-base64": "^3.7.5", "jwt-decode": "^3.1.2", - "lru-cache": "^10.0.1" + "lru-cache": "^10.0.1", + "semver": "^7.6.0" }, "devDependencies": { "@types/node": "18", diff --git a/packages/network-support/src/fetch.ts b/packages/network-support/src/fetch.ts index ffbc0b2d..aef05b65 100644 --- a/packages/network-support/src/fetch.ts +++ b/packages/network-support/src/fetch.ts @@ -6,6 +6,7 @@ import { customFetch, generateUniqueId, Logger } from './utils'; import { ChannelState, OrderType } from './types'; import { ScoreType } from './scoreManager'; import { Base64 } from 'js-base64'; +import { State } from './stateManager'; interface SystemError extends Error { code?: string | undefined; @@ -55,13 +56,14 @@ export function createFetch( headers: {}, type: OrderType.fallback, runner: 'fallback', + channelId: 'fallback', }; logger?.warn(`fallback to ${orderManager.fallbackServiceUrl}`); } else { throw new FetchError(`no available order`, 'sqn'); } } - const { url, headers, type, runner } = requestParams; + const { url, headers, type, runner, channelId } = requestParams; try { const _res = await customFetch(url, { headers: { @@ -71,7 +73,7 @@ export function createFetch( method: 'post', body: init.body, }); - let state: ChannelState | undefined, res: object; + let state: State | ChannelState | undefined, res: object; if (type === OrderType.flexPlan) { [res, state] = orderManager.extractChannelState( await _res.json(), @@ -91,8 +93,8 @@ export function createFetch( } orderManager.updateScore(runner, ScoreType.SUCCESS); - if (state && type === OrderType.flexPlan) { - void orderManager.syncChannelState(state); + if (type === OrderType.flexPlan && channelId && state) { + void orderManager.syncChannelState(channelId, state); } return { status: _res.status, diff --git a/packages/network-support/src/index.ts b/packages/network-support/src/index.ts index 733a28fd..b9a1a31a 100644 --- a/packages/network-support/src/index.ts +++ b/packages/network-support/src/index.ts @@ -4,5 +4,6 @@ export * from './types'; export * from './orderManager'; export * from './scoreManager'; +export * from './stateManager'; export * from './utils'; export * from './fetch'; diff --git a/packages/network-support/src/orderManager.ts b/packages/network-support/src/orderManager.ts index 2af90621..966036ee 100644 --- a/packages/network-support/src/orderManager.ts +++ b/packages/network-support/src/orderManager.ts @@ -5,7 +5,6 @@ import assert from 'assert'; import { Base64 } from 'js-base64'; import { ScoreManager, ScoreType } from './scoreManager'; import { - ChannelAuth, ChannelState, FlexPlanOrder, Order, @@ -19,6 +18,8 @@ import { WrappedResponse, } from './types'; import { createStore, fetchOrders, isTokenExpired, IStore, Logger, POST } from './utils'; +import { BlockType, State, StateManager } from './stateManager'; +import { Version } from './utils/version'; export enum ResponseFormat { Inline = 'inline', @@ -34,6 +35,7 @@ type Options = { projectType: ProjectType; responseFormat?: ResponseFormat; scoreStore?: IStore; + stateStore?: IStore; selector?: RunnerSelector; timeout?: number; }; @@ -53,6 +55,7 @@ export class OrderManager { private selectedRunnersStore: IStore; private scoreManager: ScoreManager; + private stateManager: StateManager; private authUrl: string; private apikey?: string; @@ -73,6 +76,7 @@ export class OrderManager { logger, projectType, scoreStore, + stateStore, selector, responseFormat, timeout = 60000, @@ -91,10 +95,17 @@ export class OrderManager { fallbackServiceUrl, scoreStore, }); + this.stateManager = new StateManager({ + logger, + authUrl, + projectId, + apikey, + stateStore, + }); - this._init = this.refreshAgreements(); + this._init = this.refreshOrders(); // eslint-disable-next-line @typescript-eslint/no-misused-promises - this.timer = setInterval(() => this.refreshAgreements(), this.interval); + this.timer = setInterval(() => this.refreshOrders(), this.interval); this.selector = selector; this.timeout = timeout; } @@ -136,7 +147,7 @@ export class OrderManager { return this.options.fallbackServiceUrl; } - private async refreshAgreements() { + private async refreshOrders() { try { const orders = await fetchOrders(this.authUrl, this.projectId, this.projectType, this.apikey); if (orders.agreements) { @@ -164,8 +175,9 @@ export class OrderManager { const order = await this.getNextOrder(requestId); const headers: RequestParam['headers'] = {}; if (order) { - const { type, indexer: runner, url, id } = order; + const { type, indexer: runner, url, id, metadata } = order; if (type === OrderType.agreement) { + const channelId = id; headers['X-Indexer-Response-Format'] = this.responseFormat ?? 'inline'; let { token } = order as ServiceAgreementOrder; if (isTokenExpired(token)) { @@ -177,33 +189,41 @@ export class OrderManager { } } headers.authorization = tokenToAuthHeader(token); - return { url, runner, headers, type }; + return { url, runner, channelId, headers, type } as RequestParam; } else if (type === OrderType.flexPlan) { const channelId = id; headers['X-Indexer-Response-Format'] = this.responseFormat ?? 'inline'; try { - this.logger?.debug(`request new signature for runner ${runner}`); - - const tokenUrl = new URL('/channel/sign', this.authUrl); - const signedState = await POST(tokenUrl.toString(), { - deployment: this.projectId, + const higherVersion = Version.gte(metadata.proxyVersion, 'v2.1.0'); + const signedState = await this.stateManager.getSignedState( channelId, - apikey: this.apikey, - }); - - this.logger?.debug(`request new state signature for runner ${runner} success`); + higherVersion ? BlockType.Multiple : BlockType.Single + ); const { authorization } = signedState; - // TODO: debug to confirm headers.authorization = authorization; - + headers['X-Channel-Block'] = higherVersion ? 'multiple' : 'single'; + const convertResult = this.stateManager.tryConvertJson(authorization); + if (convertResult.error) { + throw new Error(convertResult.error); + } + if (higherVersion && this.stateManager.tryConvertJson(authorization).success) { + headers['X-Channel-Block'] = 'single'; + } + if (headers['X-Channel-Block'] === 'single') { + this.logger?.debug( + `requested state signature of [${headers['X-Channel-Block']}] for runner ${runner}` + ); + } return { type, url, runner, + channelId, headers, - }; + } as RequestParam; } catch (error) { - this.logger?.debug(`request new state signature for runner ${runner} failed`); + this.logger?.debug(`request state signature for runner ${runner} failed`); + this.logger?.debug(error); throw new RequestParamError((error as any).message, runner); } } @@ -224,7 +244,10 @@ export class OrderManager { return raceAwait; } - extractChannelState(payload: string | object, headers: Headers): [object, ChannelState, string] { + extractChannelState( + payload: string | object, + headers: Headers + ): [object, State | ChannelState, string] { switch (headers.get('X-Indexer-Response-Format')) { case ResponseFormat.Wrapped: { const body = ( @@ -241,11 +264,15 @@ export class OrderManager { assert(_state, 'invalid response, missing channel state'); const _signature = headers.get('X-Indexer-Sig'); assert(_signature, 'invalid response, missing channel signature'); - return [ - typeof payload === 'string' ? JSON.parse(payload) : payload, - JSON.parse(Base64.decode(_state)), - _signature, - ]; + let state: State | ChannelState; + try { + state = JSON.parse(Base64.decode(_state)) as ChannelState; + } catch (e) { + state = { + authorization: _state, + } as State; + } + return [typeof payload === 'string' ? JSON.parse(payload) : payload, state, _signature]; } case undefined: { const body = typeof payload === 'string' ? JSON.parse(payload) : payload; @@ -261,22 +288,8 @@ export class OrderManager { } } - async syncChannelState(state: ChannelState): Promise { - try { - const stateUrl = new URL('/channel/state', this.authUrl); - const res = await POST<{ consumerSign: string }>(stateUrl.toString(), { - ...state, - apikey: this.apikey, - }); - - if (res.consumerSign) { - this.logger?.debug(`syncChannelState succeed`); - } else { - this.logger?.debug(`syncChannelState failed: ${JSON.stringify(res)}`); - } - } catch (e) { - this.logger?.debug(`syncChannelState failed: ${e}`); - } + async syncChannelState(channelId: string, state: State | ChannelState): Promise { + await this.stateManager.syncState(channelId, state); } private async getNextOrder(requestId: string): Promise { diff --git a/packages/network-support/src/scoreManager.ts b/packages/network-support/src/scoreManager.ts index 943df246..69919f16 100644 --- a/packages/network-support/src/scoreManager.ts +++ b/packages/network-support/src/scoreManager.ts @@ -85,7 +85,9 @@ export class ScoreManager { }; } - this.logger?.debug(`updateScore type: ${runner} ${errorType}`); + if (errorType !== ScoreType.SUCCESS) { + this.logger?.debug(`updateScore type: ${runner} ${errorType}`); + } this.logger?.debug(`updateScore before: ${runner} ${JSON.stringify(score)}`); const delta = scoresDelta[errorType]; diff --git a/packages/network-support/src/stateManager.ts b/packages/network-support/src/stateManager.ts new file mode 100644 index 00000000..08494955 --- /dev/null +++ b/packages/network-support/src/stateManager.ts @@ -0,0 +1,174 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import { ChannelAuth, ChannelState } from './types'; +import { Logger, POST } from './utils'; +import { computeMD5 } from './utils/hash'; +import { IStore, createStore } from './utils/store'; + +type Options = { + logger: Logger; + authUrl: string; + projectId: string; + apikey?: string; + stateStore?: IStore; +}; + +export enum BlockType { + Single = 'single', + Multiple = 'multiple', +} + +export enum ActiveType { + Active, + Inactive1, + Inactive2, +} + +export type State = { + authorization: string; +}; + +export class StateManager { + private logger: Logger; + private authUrl: string; + private projectId: string; + private apikey?: string; + private stateStore: IStore; + + constructor(options: Options) { + this.logger = options.logger; + this.authUrl = options.authUrl; + this.projectId = options.projectId; + this.apikey = options.apikey; + this.stateStore = options.stateStore ?? createStore({ ttl: 86_400_000 }); + } + + async getSignedState(channelId: string, block: BlockType): Promise { + const cachedState = block === BlockType.Multiple ? await this.getState(channelId) : undefined; + if (cachedState) { + return cachedState; + } + const signedState = await this.requestState(channelId, block); + const convertResult = this.tryConvertJson(signedState.authorization); + if (block === BlockType.Multiple && signedState.authorization && !convertResult.success) { + await this.setState(channelId, { + authorization: signedState.authorization, + }); + } + return signedState; + } + + private async requestState(channelId: string, block: BlockType): Promise { + const tokenUrl = new URL('/channel/sign', this.authUrl); + this.logger?.debug( + `requesting new state signature [${block}] for deployment ${this.projectId} and channel ${channelId}` + ); + const signedState = await POST(tokenUrl.toString(), { + deployment: this.projectId, + channelId, + apikey: this.apikey, + block, + }); + this.logger?.debug( + `requested new state signature [${block}] for deployment ${this.projectId} and channel ${channelId}` + ); + const state: State = { + authorization: signedState.authorization, + }; + return state; + } + + async syncState(channelId: string, state: State | ChannelState): Promise { + if ('consumerSign' in state) { + // ChannelState + const stateUrl = new URL('/channel/state', this.authUrl); + try { + const res = await POST<{ spent: string }>(stateUrl.toString(), { + ...state, + apikey: this.apikey, + }); + if (res.spent) { + this.logger?.debug(`syncChannelState [single] succeed`); + } else { + this.logger?.debug(`syncChannelState [single] failed: ${JSON.stringify(res)}`); + } + } catch (e) { + this.logger?.debug(`syncChannelState [single] failed: ${e}`); + } + } else { + // State + if (this.getActiveType(state) === ActiveType.Active) { + // await this.setState(channelId, state); + return; + } + try { + await this.setState(channelId, state); + const stateUrl = new URL('/channel/state', this.authUrl); + const res = await POST<{ authorization: string }>( + stateUrl.toString(), + { + apikey: this.apikey, + auth: state.authorization, + block: BlockType.Multiple, + }, + { + auth: state.authorization, + } + ); + if (res.authorization) { + await this.setState(channelId, { + authorization: res.authorization, + }); + this.logger?.debug(`syncChannelState [multiple] succeed`); + } else { + this.logger?.debug(`syncChannelState [multiple] failed: ${JSON.stringify(res)}`); + } + } catch (e) { + this.logger?.debug(`syncChannelState [multiple] failed: ${e}`); + } + } + } + + tryConvertJson(bs64Data: string): { success: boolean; data: any; error: any } { + const data = Buffer.from(bs64Data, 'base64'); + try { + const json = JSON.parse(data.toString('utf-8')); + return { + success: true, + data: json, + error: json.error, + }; + } catch { + return { + success: false, + data: data, + error: undefined, + }; + } + } + + private getActiveType(state: State): ActiveType { + const data = Buffer.from(state.authorization, 'base64'); + return data[0] as ActiveType; + } + + private async getState(channelId: string): Promise { + const key = this.getCacheKey(channelId); + return await this.stateStore.get(key); + } + + private async setState(channelId: string, state: State): Promise { + const key = this.getCacheKey(channelId); + await this.stateStore.set(key, state); + } + + private async removeState(channelId: string): Promise { + const key = this.getCacheKey(channelId); + await this.stateStore.remove(key); + } + + private getCacheKey(channelId: string) { + return `state:${this.projectId}:${channelId}:${computeMD5(this.apikey ?? '')}`; + } +} diff --git a/packages/network-support/src/types.ts b/packages/network-support/src/types.ts index 1e62ca07..7a930f96 100644 --- a/packages/network-support/src/types.ts +++ b/packages/network-support/src/types.ts @@ -14,10 +14,19 @@ export enum OrderType { export type OrderWithType = (Order | ServiceAgreementOrder) & { type: OrderType }; +export interface IndexingMetadata { + subqueryHealthy: boolean; + coordinatorVersion: string; + proxyVersion: string; + lastHeight: number; + targetHeight: number; +} + export interface Order { id: string; indexer: string; url: string; + metadata: IndexingMetadata; } export interface ServiceAgreementOrder extends Order { @@ -46,6 +55,7 @@ export type RequestParam = { headers: { [key: string]: string }; type: OrderType; runner: string; + channelId?: string; }; export class RequestParamError extends Error { diff --git a/packages/network-support/src/utils/hash.ts b/packages/network-support/src/utils/hash.ts new file mode 100644 index 00000000..aedac3bf --- /dev/null +++ b/packages/network-support/src/utils/hash.ts @@ -0,0 +1,10 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import crypto from 'crypto'; + +export function computeMD5(input: string) { + const hash = crypto.createHash('md5'); + hash.update(input); + return hash.digest('hex'); +} diff --git a/packages/network-support/src/utils/query.ts b/packages/network-support/src/utils/query.ts index 61d03430..23a2513a 100644 --- a/packages/network-support/src/utils/query.ts +++ b/packages/network-support/src/utils/query.ts @@ -30,9 +30,13 @@ export const customFetch = ( export async function POST( url: string, - body: Record + body: Record, + headers?: Record ): Promise { - const headers = { 'Content-Type': 'application/json' }; + if (!headers) { + headers = {}; + } + headers['Content-Type'] = 'application/json'; const res = await customFetch(url, { body: JSON.stringify(body), method: 'post', @@ -56,7 +60,7 @@ export async function GET(url: string): Promise { return res.json(); } -interface AgreementsResponse { +interface OrdersResponse { agreements: ServiceAgreementOrder[]; plans: FlexPlanOrder[]; } @@ -68,11 +72,11 @@ export async function fetchOrders( apikey?: string ) { try { - const agreementsURL = new URL(`/orders/${projectType}/${projectId}`, authUrl); + const ordersURL = new URL(`/orders/${projectType}/${projectId}`, authUrl); if (apikey) { - agreementsURL.searchParams.append('apikey', apikey); + ordersURL.searchParams.append('apikey', apikey); } - return await GET(agreementsURL.toString()); + return await GET(ordersURL.toString()); } catch { return { agreements: [], plans: [] }; } diff --git a/packages/network-support/src/utils/version.ts b/packages/network-support/src/utils/version.ts new file mode 100644 index 00000000..d4424f6f --- /dev/null +++ b/packages/network-support/src/utils/version.ts @@ -0,0 +1,30 @@ +// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: Apache-2.0 + +import { SemVer } from 'semver'; + +export class Version { + static parse(version: string): SemVer { + return new SemVer(version); + } + + static gte(version1: string, version2: string): boolean { + return new SemVer(version1).compare(version2) >= 0; + } + + static lte(version1: string, version2: string): boolean { + return new SemVer(version1).compare(version2) <= -1; + } + + static lt(version1: string, version2: string): boolean { + return new SemVer(version1).compare(version2) === -1; + } + + static gt(version1: string, version2: string): boolean { + return new SemVer(version1).compare(version2) === 0; + } + + static eq(version1: string, version2: string): boolean { + return new SemVer(version1).compare(version2) === 0; + } +} diff --git a/yarn.lock b/yarn.lock index 60881880..38047df5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3283,6 +3283,7 @@ __metadata: js-base64: ^3.7.5 jwt-decode: ^3.1.2 lru-cache: ^10.0.1 + semver: ^7.6.0 typescript: ^4.6.4 languageName: unknown linkType: soft @@ -12478,7 +12479,7 @@ __metadata: languageName: node linkType: hard -"semver@npm:^7.5.4": +"semver@npm:^7.5.4, semver@npm:^7.6.0": version: 7.6.0 resolution: "semver@npm:7.6.0" dependencies: