From f2e425eca50d55106b7ef1ae9476f9ff13d2c521 Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 5 May 2026 12:12:22 +0200 Subject: [PATCH 1/4] chore: add ohlcv websocket --- .../ohlcv/OHLCVService-method-action-types.ts | 33 + .../src/api/ohlcv/OHLCVService.test.ts | 714 ++++++++++++++++++ .../src/api/ohlcv/OHLCVService.ts | 483 ++++++++++++ packages/core-backend/src/api/ohlcv/index.ts | 18 + packages/core-backend/src/api/ohlcv/types.ts | 33 + packages/core-backend/src/index.ts | 25 + 6 files changed, 1306 insertions(+) create mode 100644 packages/core-backend/src/api/ohlcv/OHLCVService-method-action-types.ts create mode 100644 packages/core-backend/src/api/ohlcv/OHLCVService.test.ts create mode 100644 packages/core-backend/src/api/ohlcv/OHLCVService.ts create mode 100644 packages/core-backend/src/api/ohlcv/index.ts create mode 100644 packages/core-backend/src/api/ohlcv/types.ts diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService-method-action-types.ts b/packages/core-backend/src/api/ohlcv/OHLCVService-method-action-types.ts new file mode 100644 index 0000000000..c53628c087 --- /dev/null +++ b/packages/core-backend/src/api/ohlcv/OHLCVService-method-action-types.ts @@ -0,0 +1,33 @@ +/** + * This file is auto generated. + * Do not edit manually. + */ + +import type { OHLCVService } from './OHLCVService'; + +/** + * Subscribe to an OHLCV channel for real-time candlestick data. + * + * @param options - The subscription parameters (assetId, interval, currency). + */ +export type OHLCVServiceSubscribeAction = { + type: `OHLCVService:subscribe`; + handler: OHLCVService['subscribe']; +}; + +/** + * Unsubscribe from an OHLCV channel. + * + * @param options - The subscription parameters to unsubscribe from. + */ +export type OHLCVServiceUnsubscribeAction = { + type: `OHLCVService:unsubscribe`; + handler: OHLCVService['unsubscribe']; +}; + +/** + * Union of all OHLCVService action types. + */ +export type OHLCVServiceMethodActions = + | OHLCVServiceSubscribeAction + | OHLCVServiceUnsubscribeAction; diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts new file mode 100644 index 0000000000..44d9060c0c --- /dev/null +++ b/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts @@ -0,0 +1,714 @@ +import { Messenger, MOCK_ANY_NAMESPACE } from '@metamask/messenger'; +import type { + MessengerActions, + MessengerEvents, + MockAnyNamespace, +} from '@metamask/messenger'; + +import { flushPromises } from '../../../../../tests/helpers'; +import type { ServerNotificationMessage } from '../../BackendWebSocketService'; +import { WebSocketState } from '../../BackendWebSocketService'; +import { OHLCVService } from './OHLCVService'; +import type { OHLCVServiceMessenger } from './OHLCVService'; +import type { OHLCVSubscriptionOptions } from './types'; + +// ============================================================================= +// Test Helpers +// ============================================================================= + +type AllOHLCVServiceActions = MessengerActions; +type AllOHLCVServiceEvents = MessengerEvents; + +type RootMessenger = Messenger< + MockAnyNamespace, + AllOHLCVServiceActions, + AllOHLCVServiceEvents +>; + +const completeAsyncOperations = async (timeoutMs = 0): Promise => { + await flushPromises(); + if (timeoutMs > 0) { + await new Promise((resolve) => setTimeout(resolve, timeoutMs)); + } + await flushPromises(); +}; + +function getRootMessenger(): RootMessenger { + return new Messenger({ namespace: MOCK_ANY_NAMESPACE }); +} + +const getMessenger = (): { + rootMessenger: RootMessenger; + messenger: OHLCVServiceMessenger; + mocks: { + connect: jest.Mock; + subscribe: jest.Mock; + channelHasSubscription: jest.Mock; + getSubscriptionsByChannel: jest.Mock; + findSubscriptionsByChannelPrefix: jest.Mock; + forceReconnection: jest.Mock; + addChannelCallback: jest.Mock; + removeChannelCallback: jest.Mock; + getConnectionInfo: jest.Mock; + }; +} => { + const rootMessenger = getRootMessenger(); + const messenger: OHLCVServiceMessenger = new Messenger< + 'OHLCVService', + AllOHLCVServiceActions, + AllOHLCVServiceEvents, + RootMessenger + >({ + namespace: 'OHLCVService', + parent: rootMessenger, + }); + + rootMessenger.delegate({ + actions: [ + 'BackendWebSocketService:connect', + 'BackendWebSocketService:forceReconnection', + 'BackendWebSocketService:subscribe', + 'BackendWebSocketService:getConnectionInfo', + 'BackendWebSocketService:channelHasSubscription', + 'BackendWebSocketService:getSubscriptionsByChannel', + 'BackendWebSocketService:findSubscriptionsByChannelPrefix', + 'BackendWebSocketService:addChannelCallback', + 'BackendWebSocketService:removeChannelCallback', + ], + events: ['BackendWebSocketService:connectionStateChanged'], + messenger, + }); + + const mockConnect = jest.fn(); + const mockForceReconnection = jest.fn(); + const mockSubscribe = jest.fn(); + const mockChannelHasSubscription = jest.fn().mockReturnValue(false); + const mockGetSubscriptionsByChannel = jest.fn().mockReturnValue([]); + const mockFindSubscriptionsByChannelPrefix = jest + .fn() + .mockReturnValue([]); + const mockAddChannelCallback = jest.fn(); + const mockRemoveChannelCallback = jest.fn(); + const mockGetConnectionInfo = jest.fn(); + + rootMessenger.registerActionHandler( + 'BackendWebSocketService:connect', + mockConnect, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:forceReconnection', + mockForceReconnection, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:subscribe', + mockSubscribe, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:channelHasSubscription', + mockChannelHasSubscription, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:getSubscriptionsByChannel', + mockGetSubscriptionsByChannel, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:findSubscriptionsByChannelPrefix', + mockFindSubscriptionsByChannelPrefix, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:addChannelCallback', + mockAddChannelCallback, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:removeChannelCallback', + mockRemoveChannelCallback, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:getConnectionInfo', + mockGetConnectionInfo, + ); + + return { + rootMessenger, + messenger, + mocks: { + connect: mockConnect, + subscribe: mockSubscribe, + channelHasSubscription: mockChannelHasSubscription, + getSubscriptionsByChannel: mockGetSubscriptionsByChannel, + findSubscriptionsByChannelPrefix: mockFindSubscriptionsByChannelPrefix, + forceReconnection: mockForceReconnection, + addChannelCallback: mockAddChannelCallback, + removeChannelCallback: mockRemoveChannelCallback, + getConnectionInfo: mockGetConnectionInfo, + }, + }; +}; + +type WithServiceCallback = (payload: { + service: OHLCVService; + messenger: OHLCVServiceMessenger; + rootMessenger: RootMessenger; + mocks: ReturnType['mocks']; + destroy: () => void; +}) => Promise | R; + +async function withService(fn: WithServiceCallback): Promise { + const setup = getMessenger(); + const service = new OHLCVService({ messenger: setup.messenger }); + service.init(); + + try { + return await fn({ + service, + messenger: setup.messenger, + rootMessenger: setup.rootMessenger, + mocks: setup.mocks, + destroy: () => service.destroy(), + }); + } finally { + service.destroy(); + } +} + +const getSystemNotificationCallback = (mocks: { + addChannelCallback: jest.Mock; +}): ((notification: ServerNotificationMessage) => void) => { + const call = mocks.addChannelCallback.mock.calls.find( + (c: unknown[]) => + c[0] && + typeof c[0] === 'object' && + 'channelName' in c[0] && + (c[0] as { channelName: string }).channelName === + 'system-notifications.v1.market-data.v1', + ); + + if (!call) { + throw new Error('system notification callback not registered'); + } + + return (call[0] as { callback: (n: ServerNotificationMessage) => void }) + .callback; +}; + +// ============================================================================= +// Shared Constants +// ============================================================================= + +const SUB_OPTS: OHLCVSubscriptionOptions = { + assetId: 'eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913', + interval: '1m', + currency: 'usd', +}; + +const EXPECTED_CHANNEL = + 'market-data.v1.eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913.1m.usd'; + +// ============================================================================= +// Tests +// ============================================================================= + +describe('OHLCVService', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + // =========================================================================== + // Constructor + // =========================================================================== + + describe('constructor', () => { + it('should register method action handlers and system-notifications callback', async () => { + await withService(async ({ service, mocks }) => { + expect(service).toBeInstanceOf(OHLCVService); + expect(service.name).toBe('OHLCVService'); + + expect(mocks.addChannelCallback).toHaveBeenCalledWith({ + channelName: 'system-notifications.v1.market-data.v1', + callback: expect.any(Function), + }); + }); + }); + }); + + // =========================================================================== + // Subscribe + // =========================================================================== + + describe('subscribe', () => { + it('should connect and create a WebSocket subscription for a new channel', async () => { + await withService(async ({ service, mocks }) => { + await service.subscribe(SUB_OPTS); + + expect(mocks.connect).toHaveBeenCalledTimes(1); + expect(mocks.channelHasSubscription).toHaveBeenCalledWith( + EXPECTED_CHANNEL, + ); + expect(mocks.subscribe).toHaveBeenCalledWith({ + channels: [EXPECTED_CHANNEL], + channelType: 'market-data.v1', + callback: expect.any(Function), + }); + }); + }); + + it('should skip WS subscribe if the channel already has a subscription', async () => { + await withService(async ({ service, mocks }) => { + mocks.channelHasSubscription.mockReturnValue(true); + + await service.subscribe(SUB_OPTS); + + expect(mocks.subscribe).not.toHaveBeenCalled(); + }); + }); + + it('should increment refCount on duplicate subscribe without WS traffic', async () => { + await withService(async ({ service, mocks }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.connect.mockClear(); + + await service.subscribe(SUB_OPTS); + + expect(mocks.connect).not.toHaveBeenCalled(); + expect(mocks.subscribe).not.toHaveBeenCalled(); + }); + }); + + it('should force reconnection when subscribe fails', async () => { + await withService(async ({ service, mocks, messenger }) => { + mocks.connect.mockRejectedValueOnce(new Error('connection failed')); + + const errorListener = jest.fn(); + messenger.subscribe('OHLCVService:subscriptionError', errorListener); + + await service.subscribe(SUB_OPTS); + + expect(errorListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + error: expect.stringContaining('connection failed'), + operation: 'subscribe', + }); + expect(mocks.forceReconnection).toHaveBeenCalledTimes(1); + }); + }); + + it('should publish barUpdated events when WebSocket delivers data', async () => { + await withService(async ({ service, mocks, messenger }) => { + let capturedCallback: (n: ServerNotificationMessage) => void = jest.fn(); + + mocks.subscribe.mockImplementation((opts) => { + capturedCallback = opts.callback; + return Promise.resolve(); + }); + + await service.subscribe(SUB_OPTS); + + const barListener = jest.fn(); + messenger.subscribe('OHLCVService:barUpdated', barListener); + + capturedCallback({ + event: 'data', + subscriptionId: 'sub-1', + timestamp: 1776364071003, + channel: EXPECTED_CHANNEL, + data: { + timestamp: 1776364020, + open: 74.099, + high: 74.1, + low: 74.083, + close: 74.099, + volume: 5806.43, + }, + } as ServerNotificationMessage); + + expect(barListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + bar: { + timestamp: 1776364020, + open: 74.099, + high: 74.1, + low: 74.083, + close: 74.099, + volume: 5806.43, + }, + }); + }); + }); + }); + + // =========================================================================== + // Unsubscribe + // =========================================================================== + + describe('unsubscribe', () => { + it('should be a no-op if channel was never subscribed', async () => { + await withService(async ({ service, mocks }) => { + await service.unsubscribe(SUB_OPTS); + + expect(mocks.getSubscriptionsByChannel).not.toHaveBeenCalled(); + }); + }); + + it('should decrement refCount without unsubscribing when other consumers remain', async () => { + await withService(async ({ service, mocks }) => { + await service.subscribe(SUB_OPTS); + await service.subscribe(SUB_OPTS); + + await service.unsubscribe(SUB_OPTS); + + // No timer should have been started, no WS unsubscribe + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + expect(mocks.getSubscriptionsByChannel).not.toHaveBeenCalled(); + }); + }); + + it('should start a grace-period timer and unsubscribe after expiry', async () => { + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + // Before grace period expires — still subscribed + expect(mockUnsub).not.toHaveBeenCalled(); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + expect(mocks.getSubscriptionsByChannel).toHaveBeenCalledWith( + EXPECTED_CHANNEL, + ); + expect(mockUnsub).toHaveBeenCalledTimes(1); + }); + }); + }); + + // =========================================================================== + // Grace Period — Re-subscribe During Grace + // =========================================================================== + + describe('grace period', () => { + it('should cancel grace-period timer if re-subscribed before expiry', async () => { + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + // Re-subscribe during grace period + jest.advanceTimersByTime(1000); + mocks.subscribe.mockClear(); + mocks.connect.mockClear(); + await service.subscribe(SUB_OPTS); + + // Should NOT have called connect/subscribe again — subscription is still alive + expect(mocks.connect).not.toHaveBeenCalled(); + expect(mocks.subscribe).not.toHaveBeenCalled(); + + // Advance past original grace period — should NOT unsubscribe + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + expect(mockUnsub).not.toHaveBeenCalled(); + }); + }); + }); + + // =========================================================================== + // Reference Counting + // =========================================================================== + + describe('reference counting', () => { + it('should share a single WS subscription across multiple consumers', async () => { + await withService(async ({ service, mocks }) => { + await service.subscribe(SUB_OPTS); + await service.subscribe(SUB_OPTS); + await service.subscribe(SUB_OPTS); + + // Only one WS subscribe call + expect(mocks.subscribe).toHaveBeenCalledTimes(1); + + // Unsubscribe twice — refCount goes from 3 → 1 + await service.unsubscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + + // Still has one consumer — no WS unsubscribe + expect(mocks.getSubscriptionsByChannel).not.toHaveBeenCalled(); + }); + }); + + it('should unsubscribe from WS when all consumers leave and grace expires', async () => { + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.subscribe(SUB_OPTS); + + await service.unsubscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + expect(mockUnsub).toHaveBeenCalledTimes(1); + }); + }); + }); + + // =========================================================================== + // Reconnect Resilience + // =========================================================================== + + describe('reconnect', () => { + it('should resubscribe active channels on WebSocket CONNECTED', async () => { + await withService(async ({ service, mocks, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(false); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + expect(mocks.subscribe).toHaveBeenCalledWith({ + channels: [EXPECTED_CHANNEL], + channelType: 'market-data.v1', + callback: expect.any(Function), + }); + }); + }); + + it('should skip resubscribe if channel already has a subscription after reconnect', async () => { + await withService(async ({ service, mocks, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(true); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + expect(mocks.subscribe).not.toHaveBeenCalled(); + }); + }); + + it('should publish chainStatusChanged down on DISCONNECTED', async () => { + await withService(async ({ mocks, messenger, rootMessenger }) => { + const statusListener = jest.fn(); + messenger.subscribe( + 'OHLCVService:chainStatusChanged', + statusListener, + ); + + // Simulate a system notification marking a chain as up + const systemCallback = getSystemNotificationCallback(mocks); + systemCallback({ + event: 'system-notification', + channel: 'system-notifications.v1.market-data.v1', + data: { chainIds: ['eip155:8453'], status: 'up' }, + timestamp: Date.now(), + } as ServerNotificationMessage); + + statusListener.mockClear(); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + state: WebSocketState.DISCONNECTED, + connectedAt: null, + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + expect(statusListener).toHaveBeenCalledWith( + expect.objectContaining({ + chainIds: ['eip155:8453'], + status: 'down', + }), + ); + }); + }); + }); + + // =========================================================================== + // System Notifications + // =========================================================================== + + describe('system notifications', () => { + it('should forward chain-down notifications via chainStatusChanged event', async () => { + await withService(async ({ mocks, messenger }) => { + const statusListener = jest.fn(); + messenger.subscribe( + 'OHLCVService:chainStatusChanged', + statusListener, + ); + + const systemCallback = getSystemNotificationCallback(mocks); + systemCallback({ + event: 'system-notification', + channel: 'system-notifications.v1.market-data.v1', + data: { chainIds: ['eip155:8453'], status: 'down' }, + timestamp: 1776364071003, + } as ServerNotificationMessage); + + expect(statusListener).toHaveBeenCalledWith({ + chainIds: ['eip155:8453'], + status: 'down', + timestamp: 1776364071003, + }); + }); + }); + + it('should forward chain-up notifications', async () => { + await withService(async ({ mocks, messenger }) => { + const statusListener = jest.fn(); + messenger.subscribe( + 'OHLCVService:chainStatusChanged', + statusListener, + ); + + const systemCallback = getSystemNotificationCallback(mocks); + systemCallback({ + event: 'system-notification', + channel: 'system-notifications.v1.market-data.v1', + data: { chainIds: ['eip155:1', 'eip155:137'], status: 'up' }, + timestamp: 1776364071003, + } as ServerNotificationMessage); + + expect(statusListener).toHaveBeenCalledWith({ + chainIds: ['eip155:1', 'eip155:137'], + status: 'up', + timestamp: 1776364071003, + }); + }); + }); + + it('should throw on invalid system notification data', async () => { + await withService(async ({ mocks }) => { + const systemCallback = getSystemNotificationCallback(mocks); + + expect(() => + systemCallback({ + event: 'system-notification', + channel: 'system-notifications.v1.market-data.v1', + data: { invalid: true }, + timestamp: Date.now(), + } as unknown as ServerNotificationMessage), + ).toThrow('Invalid system notification data'); + }); + }); + }); + + // =========================================================================== + // Error Paths + // =========================================================================== + + describe('error paths', () => { + it('should publish subscriptionError and force reconnection when unsubscribe fails', async () => { + await withService(async ({ service, mocks, messenger }) => { + mocks.getSubscriptionsByChannel.mockImplementation(() => { + throw new Error('ws gone'); + }); + + const errorListener = jest.fn(); + messenger.subscribe('OHLCVService:subscriptionError', errorListener); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + expect(errorListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + error: expect.stringContaining('ws gone'), + operation: 'unsubscribe', + }); + expect(mocks.forceReconnection).toHaveBeenCalled(); + }); + }); + + it('should log and continue when resubscription fails for a channel', async () => { + await withService(async ({ service, mocks, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(false); + mocks.subscribe.mockRejectedValueOnce(new Error('resubscribe fail')); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 1, + }, + ); + await completeAsyncOperations(); + + // Should have attempted but failed silently + expect(mocks.subscribe).toHaveBeenCalledTimes(1); + }); + }); + }); + + // =========================================================================== + // Destroy + // =========================================================================== + + describe('destroy', () => { + it('should clear grace-period timers and remove channel callback', async () => { + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + // Grace timer is running — destroy should clear it + service.destroy(); + + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + + // Timer was cleared so the actual unsubscribe should NOT have fired + expect(mockUnsub).not.toHaveBeenCalled(); + + expect(mocks.removeChannelCallback).toHaveBeenCalledWith( + 'system-notifications.v1.market-data.v1', + ); + }); + }); + }); +}); diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService.ts b/packages/core-backend/src/api/ohlcv/OHLCVService.ts new file mode 100644 index 0000000000..1dcf9753e4 --- /dev/null +++ b/packages/core-backend/src/api/ohlcv/OHLCVService.ts @@ -0,0 +1,483 @@ +/** + * OHLCV Service for real-time candlestick data streaming via WebSocket. + * + * Wraps {@link BackendWebSocketService} through the messenger pattern to + * provide subscribe/unsubscribe semantics for OHLCV market-data channels. + * Includes reference counting, grace-period unsubscribe, idempotency checks, + * chain-status forwarding, and automatic resubscription on reconnect. + */ + +import type { TraceCallback } from '@metamask/controller-utils'; +import type { Messenger } from '@metamask/messenger'; + +import type { + WebSocketConnectionInfo, + BackendWebSocketServiceConnectionStateChangedEvent, + ServerNotificationMessage, +} from '../../BackendWebSocketService'; +import { WebSocketState } from '../../BackendWebSocketService'; +import type { BackendWebSocketServiceMethodActions } from '../../BackendWebSocketService-method-action-types'; +import { projectLogger, createModuleLogger } from '../../logger'; +import type { OHLCVServiceMethodActions } from './OHLCVService-method-action-types'; +import type { OHLCVBar, OHLCVSubscriptionOptions } from './types'; + +// ============================================================================= +// Constants +// ============================================================================= + +const SERVICE_NAME = 'OHLCVService'; + +const log = createModuleLogger(projectLogger, SERVICE_NAME); + +const MESSENGER_EXPOSED_METHODS = ['subscribe', 'unsubscribe'] as const; + +const SUBSCRIPTION_NAMESPACE = 'market-data.v1'; + +const SYSTEM_NOTIFICATIONS_CHANNEL = `system-notifications.v1.${SUBSCRIPTION_NAMESPACE}`; + +/** Delay before actually unsubscribing from a channel after refCount reaches 0. */ +const GRACE_PERIOD_MS = 3_000; + +// ============================================================================= +// Types — Channel Tracking +// ============================================================================= + +type ChannelEntry = { + refCount: number; + gracePeriodTimer?: ReturnType; +}; + +// ============================================================================= +// Types — System Notifications +// ============================================================================= + +/** + * System notification data for chain status updates on market-data channels. + */ +export type OHLCVSystemNotificationData = { + chainIds: string[]; + status: 'down' | 'up'; + timestamp?: number; +}; + +// ============================================================================= +// Types — Service Options +// ============================================================================= + +/** + * Configuration options for the OHLCV service. + */ +export type OHLCVServiceOptions = { + /** Optional callback to trace performance of OHLCV operations (default: no-op) */ + traceFn?: TraceCallback; +}; + +// ============================================================================= +// Action and Event Types +// ============================================================================= + +export type OHLCVServiceActions = OHLCVServiceMethodActions; + +export const OHLCV_SERVICE_ALLOWED_ACTIONS = [ + 'BackendWebSocketService:connect', + 'BackendWebSocketService:forceReconnection', + 'BackendWebSocketService:subscribe', + 'BackendWebSocketService:getConnectionInfo', + 'BackendWebSocketService:channelHasSubscription', + 'BackendWebSocketService:getSubscriptionsByChannel', + 'BackendWebSocketService:findSubscriptionsByChannelPrefix', + 'BackendWebSocketService:addChannelCallback', + 'BackendWebSocketService:removeChannelCallback', +] as const; + +export const OHLCV_SERVICE_ALLOWED_EVENTS = [ + 'BackendWebSocketService:connectionStateChanged', +] as const; + +export type AllowedActions = BackendWebSocketServiceMethodActions; + +// Events published by OHLCVService + +export type OHLCVServiceBarUpdatedEvent = { + type: `OHLCVService:barUpdated`; + payload: [{ channel: string; bar: OHLCVBar }]; +}; + +export type OHLCVServiceChainStatusChangedEvent = { + type: `OHLCVService:chainStatusChanged`; + payload: [{ chainIds: string[]; status: 'up' | 'down'; timestamp?: number }]; +}; + +export type OHLCVServiceSubscriptionErrorEvent = { + type: `OHLCVService:subscriptionError`; + payload: [{ channel: string; error: string; operation: string }]; +}; + +export type OHLCVServiceEvents = + | OHLCVServiceBarUpdatedEvent + | OHLCVServiceChainStatusChangedEvent + | OHLCVServiceSubscriptionErrorEvent; + +export type AllowedEvents = + BackendWebSocketServiceConnectionStateChangedEvent; + +export type OHLCVServiceMessenger = Messenger< + typeof SERVICE_NAME, + OHLCVServiceActions | AllowedActions, + OHLCVServiceEvents | AllowedEvents +>; + +// ============================================================================= +// Main Service Class +// ============================================================================= + +/** + * Service for real-time OHLCV candlestick streaming via the backend WebSocket + * gateway. Communicates with {@link BackendWebSocketService} exclusively + * through the messenger — no direct import of the class. + * + * Features: + * - Reference counting: multiple UI consumers share one WebSocket subscription + * - Grace-period unsubscribe: avoids rapid unsub/resub during navigation + * - Idempotency: duplicate subscribe calls for the same channel are no-ops + * - Reconnect resilience: resubscribes all active channels on reconnect + * - Chain-status forwarding: listens to system-notifications for chain up/down + * + * @example + * ```typescript + * const service = new OHLCVService({ messenger }); + * + * // Subscribe from a UI hook + * await messenger.call('OHLCVService:subscribe', { + * assetId: 'eip155:8453/erc20:0x833...', + * interval: '1m', + * currency: 'usd', + * }); + * + * // Listen for bar updates + * messenger.subscribe('OHLCVService:barUpdated', ({ channel, bar }) => { + * chart.appendBar(bar); + * }); + * + * // Unsubscribe when the view unmounts + * await messenger.call('OHLCVService:unsubscribe', { + * assetId: 'eip155:8453/erc20:0x833...', + * interval: '1m', + * currency: 'usd', + * }); + * ``` + */ +export class OHLCVService { + readonly name = SERVICE_NAME; + + readonly #messenger: OHLCVServiceMessenger; + + readonly #trace: TraceCallback; + + readonly #channels = new Map(); + + readonly #chainsUp = new Set(); + + // ============================================================================= + // Constructor + // ============================================================================= + + constructor( + options: OHLCVServiceOptions & { messenger: OHLCVServiceMessenger }, + ) { + this.#messenger = options.messenger; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + this.#trace = options.traceFn ?? (((_req: any, fn?: any) => fn?.()) as TraceCallback); + + this.#messenger.registerMethodActionHandlers(this, MESSENGER_EXPOSED_METHODS); + + this.#messenger.subscribe( + 'BackendWebSocketService:connectionStateChanged', + // eslint-disable-next-line @typescript-eslint/no-misused-promises + (connectionInfo: WebSocketConnectionInfo) => + this.#handleWebSocketStateChange(connectionInfo), + ); + } + + /** + * Register the system-notifications channel callback. Must be called after + * construction so that clients are not forced to instantiate services in a + * specific order. + */ + init(): void { + this.#messenger.call('BackendWebSocketService:addChannelCallback', { + channelName: SYSTEM_NOTIFICATIONS_CHANNEL, + callback: (notification: ServerNotificationMessage) => + this.#handleSystemNotification(notification), + }); + } + + // ============================================================================= + // Public — Subscribe / Unsubscribe + // ============================================================================= + + /** + * Subscribe to an OHLCV channel. If this is the first subscriber for the + * given asset/interval/currency combination a WebSocket subscription is + * created. Additional calls for the same combination only bump the reference + * count. + * + * @param options - The subscription parameters. + */ + async subscribe(options: OHLCVSubscriptionOptions): Promise { + const channel = this.#buildChannel(options); + const entry = this.#channels.get(channel); + + if (entry?.gracePeriodTimer) { + clearTimeout(entry.gracePeriodTimer); + entry.gracePeriodTimer = undefined; + entry.refCount += 1; + log('Cancelled grace-period unsubscribe, bumped refCount', { + channel, + refCount: entry.refCount, + }); + return; + } + + if (entry && entry.refCount > 0) { + entry.refCount += 1; + return; + } + + try { + await this.#messenger.call('BackendWebSocketService:connect'); + + if ( + this.#messenger.call( + 'BackendWebSocketService:channelHasSubscription', + channel, + ) + ) { + this.#channels.set(channel, { refCount: 1 }); + return; + } + + await this.#messenger.call('BackendWebSocketService:subscribe', { + channels: [channel], + channelType: SUBSCRIPTION_NAMESPACE, + callback: (notification: ServerNotificationMessage) => { + this.#handleBarUpdate(channel, notification); + }, + }); + + this.#channels.set(channel, { refCount: 1 }); + } catch (error) { + log('Subscription failed, forcing reconnection', { channel, error }); + this.#messenger.publish('OHLCVService:subscriptionError', { + channel, + error: String(error), + operation: 'subscribe', + }); + await this.#forceReconnection(); + } + } + + /** + * Unsubscribe from an OHLCV channel. Decrements the reference count and, + * when it reaches zero, starts a grace-period timer before actually + * unsubscribing from the WebSocket to absorb rapid navigation patterns. + * + * @param options - The subscription parameters to unsubscribe from. + */ + async unsubscribe(options: OHLCVSubscriptionOptions): Promise { + const channel = this.#buildChannel(options); + const entry = this.#channels.get(channel); + + if (!entry || entry.refCount <= 0) { + return; + } + + entry.refCount -= 1; + + if (entry.refCount > 0) { + return; + } + + entry.gracePeriodTimer = setTimeout(() => { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.#performUnsubscribe(channel); + }, GRACE_PERIOD_MS); + } + + // ============================================================================= + // Private — WebSocket Subscription Helpers + // ============================================================================= + + async #performUnsubscribe(channel: string): Promise { + this.#channels.delete(channel); + + try { + const subscriptions = this.#messenger.call( + 'BackendWebSocketService:getSubscriptionsByChannel', + channel, + ); + + for (const sub of subscriptions) { + await sub.unsubscribe(); + } + } catch (error) { + log('Unsubscription failed, forcing reconnection', { channel, error }); + this.#messenger.publish('OHLCVService:subscriptionError', { + channel, + error: String(error), + operation: 'unsubscribe', + }); + await this.#forceReconnection(); + } + } + + /** + * Resubscribe all channels that were active before a disconnect. + * Called when WebSocket transitions to CONNECTED. + */ + async #resubscribeActiveChannels(): Promise { + for (const [channel, entry] of this.#channels.entries()) { + if (entry.refCount <= 0 && !entry.gracePeriodTimer) { + continue; + } + + try { + if ( + this.#messenger.call( + 'BackendWebSocketService:channelHasSubscription', + channel, + ) + ) { + continue; + } + + await this.#messenger.call('BackendWebSocketService:subscribe', { + channels: [channel], + channelType: SUBSCRIPTION_NAMESPACE, + callback: (notification: ServerNotificationMessage) => { + this.#handleBarUpdate(channel, notification); + }, + }); + } catch (error) { + log('Resubscription failed for channel', { channel, error }); + } + } + } + + // ============================================================================= + // Private — Message Handlers + // ============================================================================= + + #handleBarUpdate( + channel: string, + notification: ServerNotificationMessage, + ): void { + const bar = notification.data as OHLCVBar; + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.#trace( + { + name: `${SERVICE_NAME} Bar Update`, + data: { channel, timestamp: bar.timestamp }, + tags: { service: SERVICE_NAME }, + }, + () => { + this.#messenger.publish('OHLCVService:barUpdated', { channel, bar }); + }, + ); + } + + #handleSystemNotification(notification: ServerNotificationMessage): void { + const data = notification.data as OHLCVSystemNotificationData; + const { timestamp } = notification; + + if (!data.chainIds || !Array.isArray(data.chainIds) || !data.status) { + throw new Error( + 'Invalid system notification data: missing chainIds or status', + ); + } + + if (data.status === 'up') { + for (const chainId of data.chainIds) { + this.#chainsUp.add(chainId); + } + } else { + for (const chainId of data.chainIds) { + this.#chainsUp.delete(chainId); + } + } + + this.#messenger.publish('OHLCVService:chainStatusChanged', { + chainIds: data.chainIds, + status: data.status, + timestamp, + }); + + log(`Chain status change: ${data.status}`, { + chains: data.chainIds, + status: data.status, + }); + } + + async #handleWebSocketStateChange( + connectionInfo: WebSocketConnectionInfo, + ): Promise { + const { state } = connectionInfo; + + if (state === WebSocketState.CONNECTED) { + await this.#resubscribeActiveChannels(); + } else if (state === WebSocketState.DISCONNECTED) { + const chainsToMarkDown = Array.from(this.#chainsUp); + + if (chainsToMarkDown.length > 0) { + this.#messenger.publish('OHLCVService:chainStatusChanged', { + chainIds: chainsToMarkDown, + status: 'down', + timestamp: Date.now(), + }); + + log('WebSocket disconnection - marked tracked chains as down', { + count: chainsToMarkDown.length, + chains: chainsToMarkDown, + }); + + this.#chainsUp.clear(); + } + } + } + + // ============================================================================= + // Private — Utility + // ============================================================================= + + #buildChannel(options: OHLCVSubscriptionOptions): string { + return `${SUBSCRIPTION_NAMESPACE}.${options.assetId}.${options.interval}.${options.currency}`; + } + + async #forceReconnection(): Promise { + log('Forcing WebSocket reconnection'); + await this.#messenger.call('BackendWebSocketService:forceReconnection'); + } + + // ============================================================================= + // Public — Cleanup + // ============================================================================= + + /** + * Destroy the service and clean up all resources. + */ + destroy(): void { + for (const entry of this.#channels.values()) { + if (entry.gracePeriodTimer) { + clearTimeout(entry.gracePeriodTimer); + } + } + this.#channels.clear(); + + this.#messenger.call( + 'BackendWebSocketService:removeChannelCallback', + SYSTEM_NOTIFICATIONS_CHANNEL, + ); + } +} diff --git a/packages/core-backend/src/api/ohlcv/index.ts b/packages/core-backend/src/api/ohlcv/index.ts new file mode 100644 index 0000000000..e40d24d8c3 --- /dev/null +++ b/packages/core-backend/src/api/ohlcv/index.ts @@ -0,0 +1,18 @@ +export { OHLCVService } from './OHLCVService'; +export { + OHLCV_SERVICE_ALLOWED_ACTIONS, + OHLCV_SERVICE_ALLOWED_EVENTS, +} from './OHLCVService'; +export type { + OHLCVSystemNotificationData, + OHLCVServiceOptions, + OHLCVServiceActions, + AllowedActions as OHLCVServiceAllowedActions, + OHLCVServiceBarUpdatedEvent, + OHLCVServiceChainStatusChangedEvent, + OHLCVServiceSubscriptionErrorEvent, + OHLCVServiceEvents, + AllowedEvents as OHLCVServiceAllowedEvents, + OHLCVServiceMessenger, +} from './OHLCVService'; +export type { OHLCVBar, OHLCVSubscriptionOptions } from './types'; diff --git a/packages/core-backend/src/api/ohlcv/types.ts b/packages/core-backend/src/api/ohlcv/types.ts new file mode 100644 index 0000000000..b8e64caf33 --- /dev/null +++ b/packages/core-backend/src/api/ohlcv/types.ts @@ -0,0 +1,33 @@ +/** + * OHLCV WebSocket streaming types for real-time candlestick data. + */ + +/** + * A single OHLCV candlestick bar received from the market-data WebSocket stream. + */ +export type OHLCVBar = { + /** Unix timestamp (seconds) of the candle open */ + timestamp: number; + /** Opening price */ + open: number; + /** Highest price during the candle period */ + high: number; + /** Lowest price during the candle period */ + low: number; + /** Closing price (latest) */ + close: number; + /** Trading volume during the candle period */ + volume: number; +}; + +/** + * Options for subscribing to an OHLCV channel. + */ +export type OHLCVSubscriptionOptions = { + /** CAIP-19 asset identifier, e.g. "eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" */ + assetId: string; + /** Candle interval, e.g. "1m", "5m", "15m", "1h", "4h", "1d" */ + interval: string; + /** Fiat currency code, e.g. "usd", "eur" */ + currency: string; +}; diff --git a/packages/core-backend/src/index.ts b/packages/core-backend/src/index.ts index 8245157118..a25a3c3cd0 100644 --- a/packages/core-backend/src/index.ts +++ b/packages/core-backend/src/index.ts @@ -80,6 +80,31 @@ export type { ApiPlatformClientServiceMessenger, } from './ApiPlatformClientService'; +// ============================================================================ +// OHLCV SERVICE +// ============================================================================ + +export { + OHLCVService, + OHLCV_SERVICE_ALLOWED_ACTIONS, + OHLCV_SERVICE_ALLOWED_EVENTS, +} from './api/ohlcv'; + +export type { + OHLCVBar, + OHLCVSubscriptionOptions, + OHLCVSystemNotificationData, + OHLCVServiceOptions, + OHLCVServiceActions, + OHLCVServiceAllowedActions, + OHLCVServiceBarUpdatedEvent, + OHLCVServiceChainStatusChangedEvent, + OHLCVServiceSubscriptionErrorEvent, + OHLCVServiceEvents, + OHLCVServiceAllowedEvents, + OHLCVServiceMessenger, +} from './api/ohlcv'; + // ============================================================================ // API PLATFORM CLIENT // ============================================================================ From 42bfec7f6a1b5786f42e195c59ac945aabac63ea Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 5 May 2026 14:46:55 +0200 Subject: [PATCH 2/4] chore: lint --- .../src/api/ohlcv/OHLCVService.ts | 44 ++++++------------- 1 file changed, 14 insertions(+), 30 deletions(-) diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService.ts b/packages/core-backend/src/api/ohlcv/OHLCVService.ts index 1dcf9753e4..6def835ecf 100644 --- a/packages/core-backend/src/api/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/api/ohlcv/OHLCVService.ts @@ -7,7 +7,11 @@ * chain-status forwarding, and automatic resubscription on reconnect. */ -import type { TraceCallback } from '@metamask/controller-utils'; +import type { + TraceCallback, + TraceContext, + TraceRequest, +} from '@metamask/controller-utils'; import type { Messenger } from '@metamask/messenger'; import type { @@ -143,29 +147,6 @@ export type OHLCVServiceMessenger = Messenger< * - Reconnect resilience: resubscribes all active channels on reconnect * - Chain-status forwarding: listens to system-notifications for chain up/down * - * @example - * ```typescript - * const service = new OHLCVService({ messenger }); - * - * // Subscribe from a UI hook - * await messenger.call('OHLCVService:subscribe', { - * assetId: 'eip155:8453/erc20:0x833...', - * interval: '1m', - * currency: 'usd', - * }); - * - * // Listen for bar updates - * messenger.subscribe('OHLCVService:barUpdated', ({ channel, bar }) => { - * chart.appendBar(bar); - * }); - * - * // Unsubscribe when the view unmounts - * await messenger.call('OHLCVService:unsubscribe', { - * assetId: 'eip155:8453/erc20:0x833...', - * interval: '1m', - * currency: 'usd', - * }); - * ``` */ export class OHLCVService { readonly name = SERVICE_NAME; @@ -187,10 +168,15 @@ export class OHLCVService { ) { this.#messenger = options.messenger; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - this.#trace = options.traceFn ?? (((_req: any, fn?: any) => fn?.()) as TraceCallback); + this.#trace = + options.traceFn ?? + (((_request: TraceRequest, fn?: (context?: TraceContext) => Result) => + fn?.()) as TraceCallback); - this.#messenger.registerMethodActionHandlers(this, MESSENGER_EXPOSED_METHODS); + this.#messenger.registerMethodActionHandlers( + this, + MESSENGER_EXPOSED_METHODS, + ); this.#messenger.subscribe( 'BackendWebSocketService:connectionStateChanged', @@ -201,9 +187,7 @@ export class OHLCVService { } /** - * Register the system-notifications channel callback. Must be called after - * construction so that clients are not forced to instantiate services in a - * specific order. + * Register the system-notifications channel callback. */ init(): void { this.#messenger.call('BackendWebSocketService:addChannelCallback', { From ab85cc57f420367a4268a25b64218fe13f88ad21 Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 5 May 2026 14:53:53 +0200 Subject: [PATCH 3/4] chore: lint --- .../src/api/ohlcv/OHLCVService.test.ts | 22 +++++-------------- .../src/api/ohlcv/OHLCVService.ts | 9 ++++---- 2 files changed, 11 insertions(+), 20 deletions(-) diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts index 44d9060c0c..4533adb74b 100644 --- a/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts +++ b/packages/core-backend/src/api/ohlcv/OHLCVService.test.ts @@ -84,9 +84,7 @@ const getMessenger = (): { const mockSubscribe = jest.fn(); const mockChannelHasSubscription = jest.fn().mockReturnValue(false); const mockGetSubscriptionsByChannel = jest.fn().mockReturnValue([]); - const mockFindSubscriptionsByChannelPrefix = jest - .fn() - .mockReturnValue([]); + const mockFindSubscriptionsByChannelPrefix = jest.fn().mockReturnValue([]); const mockAddChannelCallback = jest.fn(); const mockRemoveChannelCallback = jest.fn(); const mockGetConnectionInfo = jest.fn(); @@ -299,7 +297,8 @@ describe('OHLCVService', () => { it('should publish barUpdated events when WebSocket delivers data', async () => { await withService(async ({ service, mocks, messenger }) => { - let capturedCallback: (n: ServerNotificationMessage) => void = jest.fn(); + let capturedCallback: (n: ServerNotificationMessage) => void = + jest.fn(); mocks.subscribe.mockImplementation((opts) => { capturedCallback = opts.callback; @@ -524,10 +523,7 @@ describe('OHLCVService', () => { it('should publish chainStatusChanged down on DISCONNECTED', async () => { await withService(async ({ mocks, messenger, rootMessenger }) => { const statusListener = jest.fn(); - messenger.subscribe( - 'OHLCVService:chainStatusChanged', - statusListener, - ); + messenger.subscribe('OHLCVService:chainStatusChanged', statusListener); // Simulate a system notification marking a chain as up const systemCallback = getSystemNotificationCallback(mocks); @@ -568,10 +564,7 @@ describe('OHLCVService', () => { it('should forward chain-down notifications via chainStatusChanged event', async () => { await withService(async ({ mocks, messenger }) => { const statusListener = jest.fn(); - messenger.subscribe( - 'OHLCVService:chainStatusChanged', - statusListener, - ); + messenger.subscribe('OHLCVService:chainStatusChanged', statusListener); const systemCallback = getSystemNotificationCallback(mocks); systemCallback({ @@ -592,10 +585,7 @@ describe('OHLCVService', () => { it('should forward chain-up notifications', async () => { await withService(async ({ mocks, messenger }) => { const statusListener = jest.fn(); - messenger.subscribe( - 'OHLCVService:chainStatusChanged', - statusListener, - ); + messenger.subscribe('OHLCVService:chainStatusChanged', statusListener); const systemCallback = getSystemNotificationCallback(mocks); systemCallback({ diff --git a/packages/core-backend/src/api/ohlcv/OHLCVService.ts b/packages/core-backend/src/api/ohlcv/OHLCVService.ts index 6def835ecf..b3cf3c351c 100644 --- a/packages/core-backend/src/api/ohlcv/OHLCVService.ts +++ b/packages/core-backend/src/api/ohlcv/OHLCVService.ts @@ -122,8 +122,7 @@ export type OHLCVServiceEvents = | OHLCVServiceChainStatusChangedEvent | OHLCVServiceSubscriptionErrorEvent; -export type AllowedEvents = - BackendWebSocketServiceConnectionStateChangedEvent; +export type AllowedEvents = BackendWebSocketServiceConnectionStateChangedEvent; export type OHLCVServiceMessenger = Messenger< typeof SERVICE_NAME, @@ -170,8 +169,10 @@ export class OHLCVService { this.#trace = options.traceFn ?? - (((_request: TraceRequest, fn?: (context?: TraceContext) => Result) => - fn?.()) as TraceCallback); + ((( + _request: TraceRequest, + fn?: (context?: TraceContext) => Result, + ) => fn?.()) as TraceCallback); this.#messenger.registerMethodActionHandlers( this, From bd0d4d2e92c9705865486073942c5d45a2c8b980 Mon Sep 17 00:00:00 2001 From: sahar-fehri Date: Tue, 5 May 2026 15:03:59 +0200 Subject: [PATCH 4/4] chore: changelog --- packages/core-backend/CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/core-backend/CHANGELOG.md b/packages/core-backend/CHANGELOG.md index 9e2241fcbc..4c7d2e4a3c 100644 --- a/packages/core-backend/CHANGELOG.md +++ b/packages/core-backend/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add `OHLCVService` for real-time OHLCV (candlestick) data streaming via WebSocket ([#8695](https://github.com/MetaMask/core/pull/8695)) + - Wraps `BackendWebSocketService` through the messenger pattern to provide subscribe/unsubscribe semantics for market-data OHLCV channels + - Includes reference counting, grace-period unsubscribe, idempotency checks, chain-status forwarding, and automatic resubscription on reconnect +- Export new types `OHLCVBar`, `OHLCVSubscriptionOptions`, `OHLCVSystemNotificationData`, `OHLCVServiceOptions`, `OHLCVServiceActions`, `OHLCVServiceAllowedActions`, `OHLCVServiceBarUpdatedEvent`, `OHLCVServiceChainStatusChangedEvent`, `OHLCVServiceSubscriptionErrorEvent`, `OHLCVServiceEvents`, `OHLCVServiceAllowedEvents`, and `OHLCVServiceMessenger` ([#8695](https://github.com/MetaMask/core/pull/8695)) +- Export new constants `OHLCV_SERVICE_ALLOWED_ACTIONS` and `OHLCV_SERVICE_ALLOWED_EVENTS` for configuring the messenger ([#8695](https://github.com/MetaMask/core/pull/8695)) + ### Changed - Bump `@metamask/accounts-controller` from `^37.1.0` to `^38.0.0` ([#8325](https://github.com/MetaMask/core/pull/8325), [#8363](https://github.com/MetaMask/core/pull/8363), [#8665](https://github.com/MetaMask/core/pull/8665))