Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
910 changes: 197 additions & 713 deletions package-lock.json

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions packages/config-service/src/services/globalConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,11 @@ const _CONFIG = {
required: false,
defaultValue: true,
},
ENABLE_NONCE_ORDERING: {
type: 'boolean',
required: false,
defaultValue: false,
},
USE_MIRROR_NODE_MODULARIZED_SERVICES: {
type: 'boolean',
required: false,
Expand Down
4 changes: 3 additions & 1 deletion packages/relay/src/lib/clients/sdkClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import { Logger } from 'pino';

import { prepend0x, weibarHexToTinyBarInt } from '../../formatters';
import { Utils } from '../../utils';
import { CommonService } from '../services';
import { CommonService, LockService } from '../services';
import { HbarLimitService } from '../services/hbarLimitService';
import { ITransactionRecordMetric, RequestDetails, TypedEvents } from '../types';
import constants from './../constants';
Expand Down Expand Up @@ -135,6 +135,7 @@ export class SDKClient {
* @param {string} originalCallerAddress - The address of the original caller making the request.
* @param {number} networkGasPriceInWeiBars - The predefined gas price of the network in weibar.
* @param {number} currentNetworkExchangeRateInCents - The exchange rate in cents of the current network.
* @param {string | null} lockSessionKey - The session key for the acquired lock, null if no lock was acquired.
* @returns {Promise<{ txResponse: TransactionResponse; fileId: FileId | null }>}
* @throws {SDKClientError} Throws an error if no file ID is created or if the preemptive fee check fails.
*/
Expand Down Expand Up @@ -277,6 +278,7 @@ export class SDKClient {
shouldThrowHbarLimit: boolean,
originalCallerAddress: string,
estimatedTxFee?: number,
lockSessionKey?: string,
): Promise<TransactionResponse> {
const txConstructorName = transaction.constructor.name;
let transactionId: string = '';
Expand Down
3 changes: 3 additions & 0 deletions packages/relay/src/lib/eth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
IBlockService,
ICommonService,
IContractService,
LockService,
TransactionPoolService,
TransactionService,
} from './services';
Expand Down Expand Up @@ -126,6 +127,7 @@ export class EthImpl implements Eth {
chain: string,
public readonly cacheService: CacheService,
storage: PendingTransactionStorage,
lockService: LockService,
) {
this.chain = chain;
this.logger = logger;
Expand All @@ -146,6 +148,7 @@ export class EthImpl implements Eth {
logger,
mirrorNodeClient,
transactionPoolService,
lockService,
);
this.accountService = new AccountService(
cacheService,
Expand Down
6 changes: 4 additions & 2 deletions packages/relay/src/lib/relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { DebugImpl } from './debug';
import { RpcMethodDispatcher } from './dispatcher';
import { EthImpl } from './eth';
import { NetImpl } from './net';
import { LockService, LockStrategyFactory } from './services';
import { CacheService } from './services/cacheService/cacheService';
import HAPIService from './services/hapiService/hapiService';
import { HbarLimitService } from './services/hbarLimitService';
Expand Down Expand Up @@ -314,8 +315,8 @@ export class Relay {
duration,
);

// Create HAPI service
const hapiService = new HAPIService(this.logger, this.register, hbarLimitService);
const lockService = new LockService(LockStrategyFactory.create(this.redisClient, this.logger));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the comment above this line is misleading.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

const hapiService = new HAPIService(this.logger, this.register, hbarLimitService, lockService);
this.operatorAccountId = hapiService.getOperatorAccountId();

// Create simple service implementations
Expand Down Expand Up @@ -348,6 +349,7 @@ export class Relay {
chainId,
this.cacheService,
storage,
lockService,
);

// Set up event listeners
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { Precheck } from '../../../precheck';
import { ITransactionReceipt, RequestDetails, TypedEvents } from '../../../types';
import { CacheService } from '../../cacheService/cacheService';
import HAPIService from '../../hapiService/hapiService';
import { ICommonService, TransactionPoolService } from '../../index';
import { ICommonService, LockService, TransactionPoolService } from '../../index';
import { ITransactionService } from './ITransactionService';

export class TransactionService implements ITransactionService {
Expand All @@ -46,6 +46,13 @@ export class TransactionService implements ITransactionService {
*/
private readonly hapiService: HAPIService;

/**
* The lock service for managing transaction ordering.
* @private
* @readonly
*/
private readonly lockService: LockService;

/**
* Logger instance for logging messages.
* @private
Expand Down Expand Up @@ -86,6 +93,7 @@ export class TransactionService implements ITransactionService {
logger: Logger,
mirrorNodeClient: MirrorNodeClient,
transactionPoolService: TransactionPoolService,
lockService: LockService,
) {
this.cacheService = cacheService;
this.chain = chain;
Expand All @@ -96,6 +104,7 @@ export class TransactionService implements ITransactionService {
this.mirrorNodeClient = mirrorNodeClient;
this.precheck = new Precheck(mirrorNodeClient, chain, transactionPoolService);
this.transactionPoolService = transactionPoolService;
this.lockService = lockService;
}

/**
Expand Down Expand Up @@ -253,36 +262,60 @@ export class TransactionService implements ITransactionService {

const transactionBuffer = Buffer.from(this.prune0x(transaction), 'hex');
const parsedTx = Precheck.parseRawTransaction(transaction);
const networkGasPriceInWeiBars = Utils.addPercentageBufferToGasPrice(
await this.common.getGasPriceInWeibars(requestDetails),
);
let lockSessionKey: string | undefined;

// Acquire lock FIRST - before any side effects or async operations
// This ensures proper nonce ordering for transactions from the same sender
if (parsedTx.from && ConfigService.get('ENABLE_NONCE_ORDERING')) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just as we did in the transaction pool implementation, would it make sense to abstract this feature-specific flag into its own service class?

In this case, ConfigService.get('ENABLE_NONCE_ORDERING') could be encapsulated within LockService, similar to how TransactionPoolService handles its flag.

lockSessionKey = await this.lockService.acquireLock(parsedTx.from);
}

await this.validateRawTransaction(parsedTx, networkGasPriceInWeiBars, requestDetails);
try {
const networkGasPriceInWeiBars = Utils.addPercentageBufferToGasPrice(
await this.common.getGasPriceInWeibars(requestDetails),
);

// Save the transaction to the transaction pool before submitting it to the network
await this.transactionPoolService.saveTransaction(parsedTx.from!, parsedTx);
await this.validateRawTransaction(parsedTx, networkGasPriceInWeiBars, requestDetails);

/**
* Note: If the USE_ASYNC_TX_PROCESSING feature flag is enabled,
* the transaction hash is calculated and returned immediately after passing all prechecks.
* All transaction processing logic is then handled asynchronously in the background.
*/
const useAsyncTxProcessing = ConfigService.get('USE_ASYNC_TX_PROCESSING');
if (useAsyncTxProcessing) {
this.sendRawTransactionProcessor(transactionBuffer, parsedTx, networkGasPriceInWeiBars, requestDetails);
return Utils.computeTransactionHash(transactionBuffer);
}
// Save the transaction to the transaction pool before submitting it to the network
await this.transactionPoolService.saveTransaction(parsedTx.from!, parsedTx);

/**
* Note: If the USE_ASYNC_TX_PROCESSING feature flag is disabled,
* wait for all transaction processing logic to complete before returning the transaction hash.
*/
return await this.sendRawTransactionProcessor(
transactionBuffer,
parsedTx,
networkGasPriceInWeiBars,
requestDetails,
);
/**
* Note: If the USE_ASYNC_TX_PROCESSING feature flag is enabled,
* the transaction hash is calculated and returned immediately after passing all prechecks.
* All transaction processing logic is then handled asynchronously in the background.
*/
const useAsyncTxProcessing = ConfigService.get('USE_ASYNC_TX_PROCESSING');
if (useAsyncTxProcessing) {
// Fire and forget - lock will be released after consensus submission
this.sendRawTransactionProcessor(
transactionBuffer,
parsedTx,
networkGasPriceInWeiBars,
requestDetails,
lockSessionKey,
);
return Utils.computeTransactionHash(transactionBuffer);
}

/**
* Note: If the USE_ASYNC_TX_PROCESSING feature flag is disabled,
* wait for all transaction processing logic to complete before returning the transaction hash.
*/
return await this.sendRawTransactionProcessor(
transactionBuffer,
parsedTx,
networkGasPriceInWeiBars,
requestDetails,
lockSessionKey,
);
} catch (error) {
// Release lock on any error during validation or prechecks
if (lockSessionKey) {
await this.lockService.releaseLock(parsedTx.from!, lockSessionKey);
}
throw error;
}
}

/**
Expand Down Expand Up @@ -472,13 +505,15 @@ export class TransactionService implements ITransactionService {
* @param {EthersTransaction} parsedTx - The parsed Ethereum transaction object.
* @param {number} networkGasPriceInWeiBars - The current network gas price in wei bars.
* @param {RequestDetails} requestDetails - Details of the request for logging and tracking purposes.
* @param {string | null} lockSessionKey - The session key for the acquired lock, null if no lock was acquired.
* @returns {Promise<string | JsonRpcError>} A promise that resolves to the transaction hash if successful, or a JsonRpcError if an error occurs.
*/
async sendRawTransactionProcessor(
transactionBuffer: Buffer,
parsedTx: EthersTransaction,
networkGasPriceInWeiBars: number,
requestDetails: RequestDetails,
lockSessionKey?: string,
): Promise<string | JsonRpcError> {
let sendRawTransactionError: any;

Expand All @@ -493,8 +528,12 @@ export class TransactionService implements ITransactionService {
originalCallerAddress,
networkGasPriceInWeiBars,
requestDetails,
lockSessionKey,
);

if (lockSessionKey) {
await this.lockService.releaseLock(originalCallerAddress.toLowerCase(), lockSessionKey);
}
// Remove the transaction from the transaction pool after submission
await this.transactionPoolService.removeTransaction(originalCallerAddress, parsedTx.serialized);

Expand Down Expand Up @@ -649,6 +688,7 @@ export class TransactionService implements ITransactionService {
originalCallerAddress: string,
networkGasPriceInWeiBars: number,
requestDetails: RequestDetails,
lockSessionKey?: string,
): Promise<{
txSubmitted: boolean;
submittedTransactionId: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { Counter, Registry } from 'prom-client';
import { SDKClient } from '../../clients';
import { ITransactionRecordMetric, RequestDetails, TypedEvents } from '../../types';
import { HbarLimitService } from '../hbarLimitService';
import { LockService } from '../lockService/LockService';

export default class HAPIService {
/**
Expand Down Expand Up @@ -94,7 +95,7 @@ export default class HAPIService {
* @param register - The registry instance for metrics and other services.
* @param hbarLimitService - An HBAR Rate Limit service that tracks hbar expenses and limits.
*/
constructor(logger: Logger, register: Registry, hbarLimitService: HbarLimitService) {
constructor(logger: Logger, register: Registry, hbarLimitService: HbarLimitService, lockService: LockService) {
this.logger = logger;
this.hbarLimitService = hbarLimitService;
this.eventEmitter = new EventEmitter<TypedEvents>();
Expand Down
30 changes: 17 additions & 13 deletions packages/relay/src/lib/services/lockService/LocalLockStrategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@ export class LocalLockStrategy {
* @returns A session key identifying the current lock owner
*/
async acquireLock(address: string): Promise<string> {
const sessionKey = randomUUID();
if (this.logger.isLevelEnabled('debug')) {
this.logger.debug(`Acquiring lock for address ${address}.`);
this.logger.debug(`Acquiring lock for address ${address} and sessionkey ${sessionKey}.`);
}

const sessionKey = randomUUID();
const state = this.getOrCreateState(address);

// Acquire the mutex (this will block until available)
Expand All @@ -83,16 +82,20 @@ export class LocalLockStrategy {
* @param sessionKey - The session key of the lock holder
*/
async releaseLock(address: string, sessionKey: string): Promise<void> {
if (this.logger.isLevelEnabled('debug')) {
const holdTime = Date.now() - state.acquiredAt!;
this.logger.debug(`Releasing lock for address ${address} and session key ${sessionKey} held for ${holdTime}ms.`);
}

const state = this.localLockStates.get(address);

// Ensure only the lock owner can release
if (state?.sessionKey === sessionKey) {
await this.doRelease(state);
this.logger.info(`LocalLockStates ${this.localLockStates}`);
this.logger.info(`The address to release ${address}`);
const state = this.localLockStates.get(address.toLowerCase());
if (state) {
if (this.logger.isLevelEnabled('debug')) {
const holdTime = Date.now() - state.acquiredAt!;
this.logger.debug(
`Releasing lock for address ${address} and session key ${sessionKey} held for ${holdTime}ms.`,
);
}
// Ensure only the lock owner can release
if (state.sessionKey === sessionKey) {
await this.doRelease(state);
}
}
}

Expand All @@ -104,6 +107,7 @@ export class LocalLockStrategy {
*/
private getOrCreateState(address: string): LockState {
address = address.toLowerCase();
this.logger.info(`The address to save ${address}`);
if (!this.localLockStates.has(address)) {
this.localLockStates.set(address, {
mutex: new Mutex(),
Expand Down
Loading