diff --git a/idea/test-balance/.env.example b/idea/test-balance/.env.example index cf33b7dfe6..cb14f07c46 100644 --- a/idea/test-balance/.env.example +++ b/idea/test-balance/.env.example @@ -7,6 +7,4 @@ DB_HOST=localhost #gear WS_PROVIDER=wss://rpc-node.gear-tech.io:443 TEST_ACCOUNT_SEED=0x8999321253e3a76e31d91767d0e2a915223210e008089a0d34e1919c0d84da5 -ROOT_ACCOUNT_SEED=//Alice -TEST_ACCOUNT_BALANCE=10000000000 TEST_BALANCE_VALUE=1000000 diff --git a/idea/test-balance/src/config/configuration.ts b/idea/test-balance/src/config.ts similarity index 88% rename from idea/test-balance/src/config/configuration.ts rename to idea/test-balance/src/config.ts index 8c0fc9dd75..5051057ebd 100644 --- a/idea/test-balance/src/config/configuration.ts +++ b/idea/test-balance/src/config.ts @@ -19,8 +19,6 @@ export default { gear: { providerAddresses: checkEnv('WS_PROVIDER').split(','), accountSeed: checkEnv('TEST_ACCOUNT_SEED'), - rootAccountSeed: checkEnv('ROOT_ACCOUNT_SEED'), - accountBalance: checkEnv('TEST_ACCOUNT_BALANCE'), balanceToTransfer: checkEnv('TEST_BALANCE_VALUE'), }, rabbitmq: { diff --git a/idea/test-balance/src/database/app-data-source.ts b/idea/test-balance/src/database/app-data-source.ts index 3dc6a6ea08..df1a643b49 100644 --- a/idea/test-balance/src/database/app-data-source.ts +++ b/idea/test-balance/src/database/app-data-source.ts @@ -1,6 +1,6 @@ import { DataSource } from 'typeorm'; -import config from '../config/configuration'; +import config from '../config'; import { TransferBalance } from './entities/transfer.entity'; export const AppDataSource = new DataSource({ diff --git a/idea/test-balance/src/gear/connection.ts b/idea/test-balance/src/gear/connection.ts deleted file mode 100644 index e058da8687..0000000000 --- a/idea/test-balance/src/gear/connection.ts +++ /dev/null @@ -1,60 +0,0 @@ -import { GearApi, HexString } from '@gear-js/api'; -import { logger } from '@gear-js/common'; - -import { changeStatus } from '../healthcheck.router'; -import config from '../config/configuration'; -import { producer } from '../rabbitmq/producer'; - -export let api: GearApi; -let genesisHash: HexString; - -const addresses = config.gear.providerAddresses; -// max number of reconnections for each node address -const MAX_RECONNECTIONS = 10; -let reconnectionsCounter = 0; - -let providerAddress = addresses[0]; - -export async function connect() { - if (!providerAddress) { - throw new Error('There are no node addresses to connect to'); - } - - api = new GearApi({ providerAddress }); - - try { - await api.isReadyOrError; - } catch (error) { - logger.error(`Failed to connect to ${providerAddress}`, { error }); - await reconnect(); - } - await api.isReady; - api.on('disconnected', () => { - producer.sendDeleteGenesis(genesisHash); - reconnect(); - }); - genesisHash = api.genesisHash.toHex(); - logger.info(`Connected to ${await api.chain()} with genesis ${genesisHash}`); - changeStatus('ws'); -} - -async function reconnect(): Promise { - if (api) { - await api.disconnect(); - api = null; - } - - reconnectionsCounter++; - if (reconnectionsCounter > MAX_RECONNECTIONS) { - providerAddress = addresses.filter((address) => address !== providerAddress)[0]; - reconnectionsCounter = 0; - } - - logger.info('Attempting to reconnect'); - changeStatus('ws'); - return connect(); -} - -export function getGenesisHash() { - return genesisHash; -} diff --git a/idea/test-balance/src/gear/index.ts b/idea/test-balance/src/gear/index.ts deleted file mode 100644 index 44dc69af7b..0000000000 --- a/idea/test-balance/src/gear/index.ts +++ /dev/null @@ -1,81 +0,0 @@ -import { TransferData } from '@gear-js/api'; -import { KeyringPair } from '@polkadot/keyring/types'; -import { BN } from '@polkadot/util'; -import { logger } from '@gear-js/common'; - -import config from '../config/configuration'; -import { transferService } from '../transfer.service'; -import { createAccount } from './utils'; -import { connect, api, getGenesisHash } from './connection'; - -interface ResponseTransferBalance { - status?: string; - transferredBalance?: string; - error?: string; -} - -let tbAccount: KeyringPair; -let prefundedAcc: KeyringPair; -let tbAccBalance: BN; -let balanceToTransfer: BN; - -async function init() { - tbAccount = await createAccount(config.gear.accountSeed); - prefundedAcc = await createAccount(config.gear.rootAccountSeed); - tbAccBalance = new BN(config.gear.accountBalance); - balanceToTransfer = new BN(config.gear.balanceToTransfer); - - await connect(); - - if (await isSmallAccountBalance()) { - await transferBalance(tbAccount.address, prefundedAcc, tbAccBalance); - } -} - -async function transferBalance( - to: string, - from: KeyringPair = tbAccount, - balance: BN = balanceToTransfer, -): Promise { - logger.info(`Transfer value`, { from: from.address, to, amount: balance.toString() }); - try { - await transfer(to, from, balance); - } catch (error) { - logger.error('Transfer balance error', { error }); - return { error: `Transfer balance from ${from} to ${to} failed` }; - } - if (to !== tbAccount.address) { - await transferService.setTransferDate(to, getGenesisHash()); - } - return { status: 'ok', transferredBalance: balance.toString() }; -} - -async function transfer( - to: string, - from: KeyringPair = tbAccount, - balance: BN = balanceToTransfer, -): Promise { - const tx = api.balance.transfer(to, balance); - return new Promise((resolve, reject) => { - tx.signAndSend(from, ({ events }) => { - events.forEach(({ event }) => { - const { method, data } = event; - if (method === 'Transfer') { - resolve(data as TransferData); - } else if (method === 'ExtrinsicFailed') { - reject(api.getExtrinsicFailedError(event).docs.filter(Boolean).join('. ')); - } - }); - }); - }); -} - -async function isSmallAccountBalance(): Promise { - const balance = await api.balance.findOut(tbAccount.address); - if (balance.lt(tbAccBalance)) { - return true; - } - return false; -} - -export const gearService = { init, getGenesisHash, transferBalance }; diff --git a/idea/test-balance/src/main.ts b/idea/test-balance/src/main.ts index d0bbb44df6..21a854de15 100644 --- a/idea/test-balance/src/main.ts +++ b/idea/test-balance/src/main.ts @@ -1,13 +1,13 @@ import express from 'express'; -import config from './config/configuration'; +import config from './config'; import { changeStatus, healthcheckRouter } from './healthcheck.router'; import { connectToDB } from './database/app-data-source'; -import { gearService } from './gear'; -import { initAMQ } from './rabbitmq/rmq'; -import { transferProcess } from './common/transfer-balance-process'; +import { transferProcess } from './transfer-balance-process'; import { logger } from '@gear-js/common'; +import { transferService } from './transfer.service'; +import { rmqService } from './rmq'; const app = express(); @@ -23,8 +23,8 @@ const startApp = async () => { await connectToDB(); changeStatus('database'); - await gearService.init(); - await initAMQ(); + await transferService.init(); + await rmqService.init(); changeStatus('rabbitMQ'); transferProcess(); }; diff --git a/idea/test-balance/src/rabbitmq/consumer.ts b/idea/test-balance/src/rabbitmq/consumer.ts deleted file mode 100644 index 41570fa249..0000000000 --- a/idea/test-balance/src/rabbitmq/consumer.ts +++ /dev/null @@ -1,44 +0,0 @@ -import { Channel, Replies } from 'amqplib'; -import { TEST_BALANCE_METHODS, logger } from '@gear-js/common'; - -import { producer } from './producer'; -import { gearService } from '../gear'; -import { requests } from '../common/transfer-balance-process'; - -export async function directMessageConsumer(channel: Channel, queue: string): Promise { - try { - await channel.consume( - queue, - async (message) => { - const payload = JSON.parse(message.content.toString()); - const method = message.properties.headers.method; - const correlationId = message.properties.correlationId; - - if (method === TEST_BALANCE_METHODS.TEST_BALANCE_GET && payload.genesis === gearService.getGenesisHash()) { - requests.push({ payload, correlationId }); - } - }, - { noAck: true }, - ); - } catch (error) { - logger.error(`Direct exchange consumer error`, { error }); - } -} - -export async function topicMessageConsumer(channel: Channel, repliesAssertQueue: Replies.AssertQueue): Promise { - try { - await channel.consume( - repliesAssertQueue.queue, - async (message) => { - if (!message) { - return; - } - - producer.sendGenesis(gearService.getGenesisHash()); - }, - { noAck: true }, - ); - } catch (error) { - logger.error(`Topic exchange consumer error`, { error }); - } -} diff --git a/idea/test-balance/src/rabbitmq/producer.ts b/idea/test-balance/src/rabbitmq/producer.ts deleted file mode 100644 index 88caccedec..0000000000 --- a/idea/test-balance/src/rabbitmq/producer.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { RMQExchange, RMQQueue, RMQServiceAction, RMQServices } from '@gear-js/common'; - -import { mainChannelAMQP } from './rmq'; - -function sendGenesis(genesis: string): void { - const messageBuff = JSON.stringify({ service: RMQServices.TEST_BALANCE, action: RMQServiceAction.ADD, genesis }); - mainChannelAMQP.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(messageBuff)); -} - -function sendDeleteGenesis(genesis: string): void { - const messageBuff = JSON.stringify({ service: RMQServices.TEST_BALANCE, action: RMQServiceAction.DELETE, genesis }); - mainChannelAMQP.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(messageBuff)); -} - -function sendMessage(exchange: RMQExchange, queue: RMQQueue, correlationId: string, params: any): void { - const messageBuff = JSON.stringify(params); - mainChannelAMQP.publish(exchange, queue, Buffer.from(messageBuff), { - correlationId, - }); -} - -export const producer = { sendGenesis, sendMessage, sendDeleteGenesis }; diff --git a/idea/test-balance/src/rabbitmq/rmq.ts b/idea/test-balance/src/rabbitmq/rmq.ts deleted file mode 100644 index 74d47f5dfc..0000000000 --- a/idea/test-balance/src/rabbitmq/rmq.ts +++ /dev/null @@ -1,55 +0,0 @@ -import { Channel, connect, Connection } from 'amqplib'; -import { logger, RMQExchange, RMQQueue, RMQServiceAction, RMQServices } from '@gear-js/common'; - -import config from '../config/configuration'; -import { gearService } from '../gear'; -import { directMessageConsumer, topicMessageConsumer } from './consumer'; - -export let connectionAMQP: Connection; -export let mainChannelAMQP: Channel; -export let topicChannelAMQP: Channel; - -export async function initAMQ(): Promise { - try { - connectionAMQP = await connect(config.rabbitmq.url); - } catch (error) { - logger.error('RabbitMQ connection error', { error }); - } - - mainChannelAMQP = await connectionAMQP.createChannel(); - topicChannelAMQP = await connectionAMQP.createChannel(); - const directExchange = RMQExchange.DIRECT_EX; - const topicExchange = RMQExchange.TOPIC_EX; - const genesis = gearService.getGenesisHash(); - const directExchangeType = 'direct'; - const topicExchangeType = 'topic'; - const routingKey = `${RMQServices.TEST_BALANCE}.${genesis}`; - - //send genesis - const messageBuff = JSON.stringify({ service: RMQServices.TEST_BALANCE, action: RMQServiceAction.ADD, genesis }); - mainChannelAMQP.publish(directExchange, RMQQueue.GENESISES, Buffer.from(messageBuff)); - - await mainChannelAMQP.assertExchange(directExchange, directExchangeType); - await mainChannelAMQP.assertExchange(topicExchange, topicExchangeType); - - const assertTopicQueue = await topicChannelAMQP.assertQueue(`${RMQServices.TEST_BALANCE}t.${genesis}`, { - durable: false, - autoDelete: true, - exclusive: false, - }); - await mainChannelAMQP.assertQueue(routingKey, { - durable: false, - autoDelete: false, - exclusive: false, - }); - await mainChannelAMQP.bindQueue(routingKey, directExchange, routingKey); - await mainChannelAMQP.bindQueue(assertTopicQueue.queue, topicExchange, `${RMQServices.TEST_BALANCE}.genesises`); - - await directMessageConsumer(mainChannelAMQP, routingKey); - await topicMessageConsumer(topicChannelAMQP, assertTopicQueue); - - connectionAMQP.on('close', (error) => { - logger.error('RabbitMQ connection closed', { error }); - process.exit(1); - }); -} diff --git a/idea/test-balance/src/rmq.ts b/idea/test-balance/src/rmq.ts new file mode 100644 index 0000000000..47a2a8c602 --- /dev/null +++ b/idea/test-balance/src/rmq.ts @@ -0,0 +1,114 @@ +import { Channel, connect, Connection, Replies } from 'amqplib'; +import { logger, RMQExchange, RMQQueue, RMQServiceAction, RMQServices, TEST_BALANCE_METHODS } from '@gear-js/common'; + +import config from './config'; +import { transferService } from './transfer.service'; +import { requests } from './transfer-balance-process'; + +export class RMQService { + private connection: Connection; + private mainChannel: Channel; + private topicChannel: Channel; + + async init() { + try { + this.connection = await connect(config.rabbitmq.url); + } catch (error) { + logger.error('RabbitMQ connection error', { error }); + } + + this.connection.on('close', (error) => { + logger.error('RabbitMQ connection closed', { error }); + process.exit(1); + }); + + this.mainChannel = await this.connection.createChannel(); + this.topicChannel = await this.connection.createChannel(); + const directExchange = RMQExchange.DIRECT_EX; + const topicExchange = RMQExchange.TOPIC_EX; + const genesis = transferService.genesisHash; + const directExchangeType = 'direct'; + const topicExchangeType = 'topic'; + const routingKey = `${RMQServices.TEST_BALANCE}.${genesis}`; + + //send genesis + const messageBuff = JSON.stringify({ service: RMQServices.TEST_BALANCE, action: RMQServiceAction.ADD, genesis }); + this.mainChannel.publish(directExchange, RMQQueue.GENESISES, Buffer.from(messageBuff)); + + await this.mainChannel.assertExchange(directExchange, directExchangeType); + await this.mainChannel.assertExchange(topicExchange, topicExchangeType); + + const assertTopicQueue = await this.topicChannel.assertQueue(`${RMQServices.TEST_BALANCE}t.${genesis}`, { + durable: false, + autoDelete: true, + exclusive: false, + }); + await this.mainChannel.assertQueue(routingKey, { + durable: false, + autoDelete: false, + exclusive: false, + }); + await this.mainChannel.bindQueue(routingKey, directExchange, routingKey); + await this.mainChannel.bindQueue(assertTopicQueue.queue, topicExchange, `${RMQServices.TEST_BALANCE}.genesises`); + + await this.directMessageConsumer(routingKey); + await this.topicMessageConsumer(assertTopicQueue); + } + + async directMessageConsumer(queue: string): Promise { + try { + await this.mainChannel.consume( + queue, + async (message) => { + const payload = JSON.parse(message.content.toString()); + const method = message.properties.headers.method; + const correlationId = message.properties.correlationId; + + if (method === TEST_BALANCE_METHODS.TEST_BALANCE_GET && payload.genesis === transferService.genesisHash) { + requests.push({ payload, correlationId }); + } + }, + { noAck: true }, + ); + } catch (error) { + logger.error(`Direct exchange consumer error`, { error }); + } + } + + async topicMessageConsumer(repliesAssertQueue: Replies.AssertQueue): Promise { + try { + await this.topicChannel.consume( + repliesAssertQueue.queue, + async (message) => { + if (!message) { + return; + } + + this.sendGenesis(transferService.genesisHash); + }, + { noAck: true }, + ); + } catch (error) { + logger.error(`Topic exchange consumer error`, { error }); + } + } + + sendGenesis(genesis: string): void { + const messageBuff = JSON.stringify({ service: RMQServices.TEST_BALANCE, action: RMQServiceAction.ADD, genesis }); + this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(messageBuff)); + } + + sendDeleteGenesis(genesis: string): void { + const messageBuff = JSON.stringify({ service: RMQServices.TEST_BALANCE, action: RMQServiceAction.DELETE, genesis }); + this.mainChannel.publish(RMQExchange.DIRECT_EX, RMQQueue.GENESISES, Buffer.from(messageBuff)); + } + + sendMessage(exchange: RMQExchange, queue: RMQQueue, correlationId: string, params: any): void { + const messageBuff = JSON.stringify(params); + this.mainChannel.publish(exchange, queue, Buffer.from(messageBuff), { + correlationId, + }); + } +} + +export const rmqService = new RMQService(); diff --git a/idea/test-balance/src/common/transfer-balance-process.ts b/idea/test-balance/src/transfer-balance-process.ts similarity index 81% rename from idea/test-balance/src/common/transfer-balance-process.ts rename to idea/test-balance/src/transfer-balance-process.ts index b96e7fd9ab..f28ba19834 100644 --- a/idea/test-balance/src/common/transfer-balance-process.ts +++ b/idea/test-balance/src/transfer-balance-process.ts @@ -1,9 +1,8 @@ import { logger, JSONRPC_ERRORS, RMQExchange, RMQQueue } from '@gear-js/common'; import { EventEmitter } from 'node:events'; -import { transferService } from '../transfer.service'; -import { gearService } from '../gear'; -import { producer } from '../rabbitmq/producer'; +import { transferService } from './transfer.service'; +import { rmqService } from './rmq'; interface TBRequestParams { payload: { address: string; genesis: string }; @@ -45,7 +44,7 @@ export async function transferProcess(): Promise { try { const isPossibleToTransfer = await transferService.isPossibleToTransfer(address, genesis); if (isPossibleToTransfer) { - const transferBalance = await gearService.transferBalance(address); + const transferBalance = await transferService.transferBalance(address); result = { result: transferBalance }; } else { result = { error: JSONRPC_ERRORS.TransferLimitReached.name }; @@ -54,6 +53,6 @@ export async function transferProcess(): Promise { logger.error(error.message, { stack: error.stack }); result = { error: JSONRPC_ERRORS.InternalError.name }; } - producer.sendMessage(RMQExchange.DIRECT_EX, RMQQueue.REPLIES, correlationId, result); + rmqService.sendMessage(RMQExchange.DIRECT_EX, RMQQueue.REPLIES, correlationId, result); } } diff --git a/idea/test-balance/src/transfer.service.ts b/idea/test-balance/src/transfer.service.ts index 655fdf78e7..2ebb1e7c35 100644 --- a/idea/test-balance/src/transfer.service.ts +++ b/idea/test-balance/src/transfer.service.ts @@ -1,9 +1,82 @@ +import { KeyringPair } from '@polkadot/keyring/types'; +import { BN } from '@polkadot/util'; +import { logger } from '@gear-js/common'; import { plainToClass } from 'class-transformer'; import { TransferBalance } from './database/entities/transfer.entity'; import { transferRepository } from './database/repositories/transfer.repository'; +import { createAccount } from './utils'; +import config from './config'; +import { GearApi, TransferData } from '@gear-js/api'; +import { changeStatus } from './healthcheck.router'; +import { rmqService } from './rmq'; + +interface ResponseTransferBalance { + status?: string; + transferredBalance?: string; + error?: string; +} + +let providerAddress = config.gear.providerAddresses[0]; +const MAX_RECONNECTIONS = 10; +let reconnectionsCounter = 0; + +export class TransferService { + private account: KeyringPair; + private balanceToTransfer: BN; + private api: GearApi; + private genesis: string; + + async init() { + this.account = await createAccount(config.gear.accountSeed); + this.balanceToTransfer = new BN(config.gear.balanceToTransfer); + await this.connect(); + } + + get genesisHash() { + return this.genesis; + } + + async connect() { + if (!providerAddress) { + throw new Error('There are no node addresses to connect to'); + } + + this.api = new GearApi({ providerAddress }); + + try { + await this.api.isReadyOrError; + } catch (error) { + logger.error(`Failed to connect to ${providerAddress}`, { error }); + await this.reconnect(); + } + await this.api.isReady; + this.api.on('disconnected', () => { + rmqService.sendDeleteGenesis(this.genesis); + this.reconnect(); + }); + this.genesis = this.api.genesisHash.toHex(); + logger.info(`Connected to ${await this.api.chain()} with genesis ${this.genesis}`); + changeStatus('ws'); + } + + async reconnect(): Promise { + if (this.api) { + await this.api.disconnect(); + this.api = null; + } + + reconnectionsCounter++; + if (reconnectionsCounter > MAX_RECONNECTIONS) { + providerAddress = config.gear.providerAddresses.filter((address) => address !== providerAddress)[0]; + reconnectionsCounter = 0; + } + + logger.info('Attempting to reconnect'); + changeStatus('ws'); + return this.connect(); + } -const transferService = { async setTransferDate(account: string, genesis: string): Promise { const transferBalanceTypeDB = plainToClass(TransferBalance, { account: `${account}.${genesis}`, @@ -11,7 +84,7 @@ const transferService = { }); return transferRepository.save(transferBalanceTypeDB); - }, + } async isPossibleToTransfer(account: string, genesis: string): Promise { const transfer = await transferRepository.getByAccountAndGenesis(account, genesis); @@ -20,21 +93,42 @@ const transferService = { return true; } - if (isLastTransferEarlierThanToday(transfer)) { - return true; + return isLastTransferEarlierThanToday(transfer); + } + + async transferBalance(to: string): Promise { + logger.info(`Transfer balance`, { from: this.account.address, to, amount: this.balanceToTransfer.toString() }); + try { + await this.transfer(to); + } catch (error) { + logger.error('Transfer balance error', { error: error.message, stack: error.stack }); + return { error: `Transfer balance to ${to} failed` }; } - }, -}; + await this.setTransferDate(to, this.genesis); + return { status: 'ok', transferredBalance: this.balanceToTransfer.toString() }; + } -function isLastTransferEarlierThanToday(transfer: TransferBalance): boolean { - const HOURS = 0; - const MIN = 0; - const SEC = 0; - const MS = 0; + async transfer(to: string): Promise { + const tx = this.api.balance.transfer(to, this.balanceToTransfer); + return new Promise((resolve, reject) => { + tx.signAndSend(this.account, ({ events }) => { + events.forEach(({ event }) => { + const { method, data } = event; + if (method === 'Transfer') { + resolve(data as TransferData); + } else if (method === 'ExtrinsicFailed') { + reject(this.api.getExtrinsicFailedError(event).docs.filter(Boolean).join('. ')); + } + }); + }); + }); + } +} - const now = new Date().setHours(HOURS, MIN, SEC, MS); +function isLastTransferEarlierThanToday(transfer: TransferBalance): boolean { + const now = new Date().setHours(0, 0, 0, 0); - return transfer.lastTransfer.setHours(HOURS, MIN, SEC, MS) < now; + return transfer.lastTransfer.setHours(0, 0, 0, 0) < now; } -export { transferService }; +export const transferService = new TransferService(); diff --git a/idea/test-balance/src/gear/utils.ts b/idea/test-balance/src/utils.ts similarity index 100% rename from idea/test-balance/src/gear/utils.ts rename to idea/test-balance/src/utils.ts diff --git a/idea/test-balance/tsconfig.json b/idea/test-balance/tsconfig.json index 7e058e86f3..759fb69bef 100644 --- a/idea/test-balance/tsconfig.json +++ b/idea/test-balance/tsconfig.json @@ -7,4 +7,4 @@ "allowSyntheticDefaultImports": true, "strict": false } -} \ No newline at end of file +}