Skip to content

Commit d9a6112

Browse files
authored
Merge pull request #94 from windingtree/develop
LevelDB storage fix and new event handlers on sdk-contracts
2 parents 236955e + 9341b87 commit d9a6112

File tree

5 files changed

+2747
-7286
lines changed

5 files changed

+2747
-7286
lines changed

packages/contracts-manger/src/index.ts

Lines changed: 126 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import {
22
Address,
33
Hash,
4-
Abi,
54
Account,
65
InferFunctionName,
76
GetFunctionArgs,
@@ -17,12 +16,15 @@ import {
1716
zeroAddress,
1817
GetFilterLogsReturnType,
1918
InferEventName,
19+
Abi,
20+
CreateContractEventFilterParameters,
2021
} from 'viem';
2122
import { stringify } from 'superjson';
2223
import {
2324
marketABI,
2425
erc20_18ABI,
2526
entitiesRegistryABI,
27+
configABI,
2628
kinds,
2729
} from '@windingtree/contracts';
2830
import {
@@ -52,6 +54,11 @@ export interface ProtocolContractsOptions {
5254
walletClient?: WalletClient;
5355
}
5456

57+
/**
58+
* Generic filter options type.
59+
*/
60+
type FilterOptions = CreateContractEventFilterParameters<Abi, string>;
61+
5562
/**
5663
* Common API of the protocol smart contracts set
5764
*
@@ -688,69 +695,154 @@ export class ProtocolContracts<
688695
}
689696

690697
/**
691-
* Subscribes to specific events emitted by the market smart contract.
698+
* Subscribes to events from a specified smart contract.
692699
*
693-
* @param eventName - The name of the event to listen for.
694-
* @param onLogs - Callback function to handle the event logs.
695-
* @param fromBlock - (Optional) The starting block number for listening to events.
696-
* @param pollInterval - (Optional) Interval in milliseconds for polling new events.
697-
* @returns A function to unsubscribe from the event.
698-
* @template TEventName - Generic type parameter for event name.
700+
* @template TAbi The ABI type of the contract.
701+
* @template TEventName The name of the event to subscribe to.
702+
* @param {TAbi} abi The ABI of the contract to subscribe to.
703+
* @param {Address} address The address of the contract.
704+
* @param {InferEventName<TAbi, TEventName>} eventName The name of the event.
705+
* @param {(logs: GetFilterLogsReturnType<TAbi>) => void} onLogs Callback to execute when logs are received.
706+
* @param {bigint} [fromBlock] The block number from which to start listening for events.
707+
* @param {number} [pollInterval=1000] The interval in milliseconds at which to poll for new events.
708+
* @returns {Promise<() => void>} A promise that resolves to an unsubscribe function.
709+
* @private
699710
*/
700-
async subscribeMarket<TEventName extends string | undefined = undefined>(
701-
eventName: InferEventName<typeof marketABI, TEventName>,
702-
onLogs: (logs: GetFilterLogsReturnType<typeof marketABI>) => void,
711+
private async subscribeToEvents<
712+
// eslint-disable-next-line @typescript-eslint/no-redundant-type-constituents
713+
const TAbi extends Abi | readonly unknown[] = Abi,
714+
TEventName extends string | undefined = undefined,
715+
>(
716+
abi: TAbi,
717+
address: Address,
718+
eventName: InferEventName<TAbi, TEventName>,
719+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
720+
onLogs: (logs: any) => void,
703721
fromBlock?: bigint,
704-
pollInterval = 1000,
722+
pollInterval: number = 1000,
705723
): Promise<() => void> {
706724
let blockNumber = await this.publicClient.getBlockNumber();
707725
let isUnsubscribed = false;
708726
let timeoutId: NodeJS.Timeout;
709727

710-
// Use the specified fromBlock or the current block number
728+
// Adjust starting block number if fromBlock is provided and valid
711729
if (fromBlock && fromBlock < blockNumber) {
712730
blockNumber = fromBlock;
713731
}
714732

715733
// Function to fetch and process logs
716734
const getLogs = async () => {
717-
if (isUnsubscribed) return; // Stop if unsubscribed
735+
if (isUnsubscribed) return;
718736

719-
// Create an event filter
720737
const filter = await this.publicClient.createContractEventFilter({
721-
abi: marketABI,
722-
address: this.contracts['market'].address,
723-
eventName,
738+
abi,
739+
address,
724740
fromBlock: blockNumber,
725741
strict: true,
726-
});
742+
eventName: eventName,
743+
} as unknown as FilterOptions);
727744

728-
// Retrieve logs based on the filter
729745
const logs = await this.publicClient.getFilterLogs({ filter });
730746

731-
// Process logs and update the block number
732747
if (logs.length > 0) {
733-
const maxBlockNumber = logs.reduce(
734-
(max, log) => (log.blockNumber > max ? log.blockNumber : max),
735-
BigInt(0),
736-
);
737-
blockNumber = maxBlockNumber + BigInt(1);
738748
onLogs(logs);
749+
// Update the block number to the next after the last log's block
750+
const bn = logs[logs.length - 1].blockNumber;
751+
blockNumber = (bn !== null ? bn : BigInt(0)) + BigInt(1);
739752
}
740753

741-
// Schedule the next call
742-
timeoutId = setTimeout(() => {
743-
getLogs().catch(logger.error);
744-
}, pollInterval);
754+
if (!isUnsubscribed) {
755+
timeoutId = setTimeout(() => {
756+
getLogs().catch(logger.error);
757+
}, pollInterval);
758+
}
745759
};
746760

747-
// Initial call to start the polling process
761+
// Initial call to start polling
748762
getLogs().catch(logger.error);
749763

750-
// Return the unsubscribe function
764+
// Return unsubscribe function
751765
return () => {
752-
isUnsubscribed = true; // Set the flag to stop further polling
753-
clearTimeout(timeoutId); // Clear the timeout to stop scheduled calls
766+
isUnsubscribed = true;
767+
clearTimeout(timeoutId);
754768
};
755769
}
770+
771+
/**
772+
* Subscribes to market contract events.
773+
*
774+
* @template TEventName Type of event name.
775+
* @param {InferEventName<typeof marketABI, TEventName>} eventName The event name to subscribe to.
776+
* @param {(logs: GetFilterLogsReturnType<typeof marketABI>) => void} onLogs Callback for when logs are received.
777+
* @param {bigint} [fromBlock] Starting block number for listening for events.
778+
* @param {number} [pollInterval=1000] Polling interval in milliseconds.
779+
* @returns {Promise<() => void>} Unsubscribe function.
780+
*/
781+
async subscribeMarket<TEventName extends string | undefined = undefined>(
782+
eventName: InferEventName<typeof marketABI, TEventName>,
783+
onLogs: (logs: GetFilterLogsReturnType<typeof marketABI>) => void,
784+
fromBlock?: bigint,
785+
pollInterval: number = 1000,
786+
): Promise<() => void> {
787+
return this.subscribeToEvents(
788+
marketABI,
789+
this.contracts['market'].address,
790+
eventName,
791+
onLogs,
792+
fromBlock,
793+
pollInterval,
794+
);
795+
}
796+
797+
/**
798+
* Subscribes to entities contract events.
799+
*
800+
* @template TEventName Type of event name.
801+
* @param {InferEventName<typeof entitiesRegistryABI, TEventName>} eventName The event name to subscribe to.
802+
* @param {(logs: GetFilterLogsReturnType<typeof entitiesRegistryABI>) => void} onLogs Callback for when logs are received.
803+
* @param {bigint} [fromBlock] Starting block number for listening for events.
804+
* @param {number} [pollInterval=1000] Polling interval in milliseconds.
805+
* @returns {Promise<() => void>} Unsubscribe function.
806+
*/
807+
async subscribeEntities<TEventName extends string | undefined = undefined>(
808+
eventName: InferEventName<typeof entitiesRegistryABI, TEventName>,
809+
onLogs: (logs: GetFilterLogsReturnType<typeof entitiesRegistryABI>) => void,
810+
fromBlock?: bigint,
811+
pollInterval: number = 1000,
812+
): Promise<() => void> {
813+
return this.subscribeToEvents(
814+
entitiesRegistryABI,
815+
this.contracts['entities'].address,
816+
eventName,
817+
onLogs,
818+
fromBlock,
819+
pollInterval,
820+
);
821+
}
822+
823+
/**
824+
* Subscribes to config contract events.
825+
*
826+
* @template TEventName Type of event name.
827+
* @param {InferEventName<typeof configABI, TEventName>} eventName The event name to subscribe to.
828+
* @param {(logs: GetFilterLogsReturnType<typeof configABI>) => void} onLogs Callback for when logs are received.
829+
* @param {bigint} [fromBlock] Starting block number for listening for events.
830+
* @param {number} [pollInterval=1000] Polling interval in milliseconds.
831+
* @returns {Promise<() => void>} Unsubscribe function.
832+
*/
833+
async subscribeConfig<TEventName extends string | undefined = undefined>(
834+
eventName: InferEventName<typeof configABI, TEventName>,
835+
onLogs: (logs: GetFilterLogsReturnType<typeof configABI>) => void,
836+
fromBlock?: bigint,
837+
pollInterval: number = 1000,
838+
): Promise<() => void> {
839+
return this.subscribeToEvents(
840+
configABI,
841+
this.contracts['config'].address,
842+
eventName,
843+
onLogs,
844+
fromBlock,
845+
pollInterval,
846+
);
847+
}
756848
}

packages/pubsub/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
"dependencies": {
3131
"@chainsafe/libp2p-gossipsub": "^10.1.0",
3232
"@libp2p/interface": "^0.1.3",
33+
"@libp2p/peer-id": "^4.0.6",
3334
"@multiformats/multiaddr": "^12.1.3",
3435
"@windingtree/sdk-constants": "workspace:*",
3536
"@windingtree/sdk-logger": "workspace:*",

packages/pubsub/src/index.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { decodeText } from '@windingtree/sdk-utils';
1919
import { CashedMessageEntry, MessagesCache } from './cache.js';
2020
import { createLogger } from '@windingtree/sdk-logger';
2121
import { parse } from 'superjson';
22+
import { peerIdFromBytes } from '@libp2p/peer-id';
2223

2324
const logger = createLogger('CenterSub');
2425

@@ -179,13 +180,20 @@ export class CenterSub extends GossipSub {
179180
logger.trace('messageTransformer not defined');
180181
return;
181182
}
183+
184+
rpcMsg.from = new Uint8Array(rpcMsg.from as ArrayBufferLike);
185+
rpcMsg.signature = new Uint8Array(rpcMsg.signature as ArrayBufferLike);
186+
rpcMsg.key = new Uint8Array(rpcMsg.key as ArrayBufferLike);
187+
rpcMsg.data = new Uint8Array(rpcMsg.data as ArrayBufferLike);
188+
rpcMsg.seqno = new Uint8Array(rpcMsg.seqno as ArrayBufferLike);
189+
182190
const msgId = await sha256.encode(rpcMsg.data);
183191
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
184192
const msgIdStr = this['msgIdToStrFn'](msgId) as string;
185193
const transformed = this.messageTransformer(rpcMsg.data);
186194
await this.messages.set(
187195
msgIdStr,
188-
rpcMsg.from.toString(),
196+
peerIdFromBytes(rpcMsg.from).toString(),
189197
rpcMsg,
190198
Number(transformed.expire),
191199
Number(transformed.nonce),

0 commit comments

Comments
 (0)