Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/assets-controller/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `TokenDataSource` removes tokens flagged malicious by Blockaid (via `PhishingController:bulkScanTokens`) before merging metadata, instead of filtering non-native tokens by a minimum occurrence count ([#8329](https://github.com/MetaMask/core/pull/8329))

### Fixed

- `TokenDataSource` splits v3 asset metadata fetches into batches of at most 120 asset IDs per request, matching the Tokens API limit, and runs chunk requests in parallel with bounded concurrency via `p-limit` ([#8294](https://github.com/MetaMask/core/pull/8294))
- `PriceDataSource` splits v3 spot-price fetches into batches of at most 120 asset IDs per request, matching the same scale as token metadata requests, with bounded concurrency via `p-limit` ([#8294](https://github.com/MetaMask/core/pull/8294))

## [3.3.0]

### Changed
Expand Down
9 changes: 8 additions & 1 deletion packages/assets-controller/src/AssetsController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,14 @@ export class AssetsController extends BaseController<
chains: ChainId[],
previousChains: ChainId[],
): void => {
this.#handleActiveChainsUpdate(dataSourceName, chains, previousChains);
try {
this.#handleActiveChainsUpdate(dataSourceName, chains, previousChains);
} catch (error) {
log('Failed to handle active chains update', {
dataSourceName,
error,
});
}
};

this.#backendWebsocketDataSource = new BackendWebsocketDataSource({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import {
MockAnyNamespace,
} from '@metamask/messenger';
import { NetworkStatus } from '@metamask/network-controller';

import {
NetworkState,
RpcEndpoint,
RpcEndpointType,
} from '../../../network-controller/src/NetworkController';
} from '@metamask/network-controller/src/NetworkController';

import {
AssetsControllerMessenger,
getDefaultAssetsControllerState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,75 @@ describe('PriceDataSource', () => {
controller.destroy();
});

it('fetch batches spot price requests in chunks of 120 asset IDs', async () => {
const assetIds = Array.from({ length: 121 }, (_, i) => {
const hexString = i.toString(16).padStart(40, '0');
return `eip155:1/erc20:0x${hexString}` as Caip19AssetId;
});
const balanceState: Record<string, Record<string, unknown>> = {
'mock-account-id': Object.fromEntries(
assetIds.map((id) => [id, { amount: '1' }]),
),
};
const priceResponse = Object.fromEntries(
assetIds.map((id) => [id, createMockPriceData(1)]),
);

const { controller, apiClient, getAssetsState } = setupController({
balanceState,
priceResponse,
});

await controller.fetch(createDataRequest({ chainIds: [] }), getAssetsState);

expect(apiClient.prices.fetchV3SpotPrices).toHaveBeenCalledTimes(2);
const chunkSizes = apiClient.prices.fetchV3SpotPrices.mock.calls
.map((call) => (call[0] as string[]).length)
.sort((a, b) => b - a);
expect(chunkSizes).toStrictEqual([120, 1]);

controller.destroy();
});

it('fetch batches spot price requests per chunk for non-USD currency', async () => {
const assetIds = Array.from({ length: 121 }, (_, i) => {
const hexString = i.toString(16).padStart(40, '0');
return `eip155:1/erc20:0x${hexString}` as Caip19AssetId;
});
const balanceState: Record<string, Record<string, unknown>> = {
'mock-account-id': Object.fromEntries(
assetIds.map((id) => [id, { amount: '1' }]),
),
};
const priceResponse = Object.fromEntries(
assetIds.map((id) => [id, createMockPriceData(1)]),
);

const { controller, apiClient, getAssetsState } = setupController({
getSelectedCurrency: () => 'eur',
balanceState,
priceResponse,
});

await controller.fetch(createDataRequest({ chainIds: [] }), getAssetsState);

expect(apiClient.prices.fetchV3SpotPrices).toHaveBeenCalledTimes(4);
const eurChunks = apiClient.prices.fetchV3SpotPrices.mock.calls.filter(
(call) => (call[1] as { currency?: string }).currency === 'eur',
);
const usdChunks = apiClient.prices.fetchV3SpotPrices.mock.calls.filter(
(call) => (call[1] as { currency?: string }).currency === 'usd',
);
expect(eurChunks).toHaveLength(2);
expect(usdChunks).toHaveLength(2);
const eurSizes = eurChunks
.map((call) => (call[0] as string[]).length)
.sort((a, b) => b - a);
expect(eurSizes).toStrictEqual([120, 1]);

controller.destroy();
});

it('fetch handles getState error gracefully', async () => {
const { controller } = setupController();

Expand Down
63 changes: 45 additions & 18 deletions packages/assets-controller/src/data-sources/PriceDataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type {
} from '@metamask/core-backend';
import { ApiPlatformClient } from '@metamask/core-backend';
import { parseCaipAssetType } from '@metamask/utils';
import pLimit from 'p-limit';

import type { SubscriptionRequest } from './AbstractDataSource';
import { projectLogger, createModuleLogger } from '../logger';
Expand All @@ -24,6 +25,12 @@ import type {
const CONTROLLER_NAME = 'PriceDataSource';
const DEFAULT_POLL_INTERVAL = 60_000; // 1 minute for price updates

/** Price API v3 spot-prices accepts at most this many `assetIds` per request (same cap as tokens `/v3/assets`). */
const V3_SPOT_PRICES_MAX_IDS_PER_REQUEST = 120;

/** Max concurrent spot-price chunk requests (aligned with TokenDataSource metadata fetches). */
const V3_SPOT_PRICES_FETCH_CONCURRENCY = 3;

const log = createModuleLogger(projectLogger, CONTROLLER_NAME);

// ============================================================================
Expand Down Expand Up @@ -219,30 +226,50 @@ export class PriceDataSource {
async #fetchSpotPrices(
assetIds: string[],
): Promise<Record<Caip19AssetId, FungibleAssetPrice>> {
if (assetIds.length === 0) {
return {};
}

const selectedCurrency = this.#getSelectedCurrency();
const chunks: string[][] = [];
for (
let i = 0;
i < assetIds.length;
i += V3_SPOT_PRICES_MAX_IDS_PER_REQUEST
) {
chunks.push(assetIds.slice(i, i + V3_SPOT_PRICES_MAX_IDS_PER_REQUEST));
}

const limit = pLimit(V3_SPOT_PRICES_FETCH_CONCURRENCY);
const queryOpts = { includeMarketData: true as const };

const selectedChunkResults = await Promise.all(
chunks.map((chunk) =>
limit(() =>
this.#apiClient.prices.fetchV3SpotPrices(chunk, {
currency: selectedCurrency,
...queryOpts,
}),
),
),
);
const selectedCurrencyPrices = Object.assign({}, ...selectedChunkResults);

let selectedCurrencyPrices: V3SpotPricesResponse;
let usdPrices: V3SpotPricesResponse;
if (selectedCurrency === 'usd') {
selectedCurrencyPrices = await this.#apiClient.prices.fetchV3SpotPrices(
assetIds,
{
currency: selectedCurrency,
includeMarketData: true,
},
);
usdPrices = selectedCurrencyPrices;
} else {
[selectedCurrencyPrices, usdPrices] = await Promise.all([
this.#apiClient.prices.fetchV3SpotPrices(assetIds, {
currency: selectedCurrency,
includeMarketData: true,
}),
this.#apiClient.prices.fetchV3SpotPrices(assetIds, {
currency: 'usd',
includeMarketData: true,
}),
]);
const usdChunkResults = await Promise.all(
chunks.map((chunk) =>
limit(() =>
this.#apiClient.prices.fetchV3SpotPrices(chunk, {
currency: 'usd',
...queryOpts,
}),
),
),
);
usdPrices = Object.assign({}, ...usdChunkResults);
}

const prices: Record<Caip19AssetId, FungibleAssetPrice> = {};
Expand Down
22 changes: 16 additions & 6 deletions packages/assets-controller/src/data-sources/RpcDataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,14 @@ export class RpcDataSource extends AbstractDataSource<
balanceFetcherMessenger,
{ pollingInterval: balanceInterval },
);
this.#balanceFetcher.setOnBalanceUpdate(
this.#handleBalanceUpdate.bind(this),
);
// Polling controller awaits this callback; rejections must not become unhandled.
this.#balanceFetcher.setOnBalanceUpdate(async (result) => {
try {
await this.#handleBalanceUpdate(result);
} catch (error) {
log('Balance update handler failed', { error });
}
});

// Initialize TokenDetector with polling interval
this.#tokenDetector = new TokenDetector(
Expand All @@ -295,9 +300,14 @@ export class RpcDataSource extends AbstractDataSource<
useExternalService: this.#useExternalService,
},
);
this.#tokenDetector.setOnDetectionUpdate(
this.#handleDetectionUpdate.bind(this),
);
// Sync throw in the detector would reject the poll tick if uncaught.
this.#tokenDetector.setOnDetectionUpdate((result) => {
try {
this.#handleDetectionUpdate(result);
} catch (error) {
log('Detection update handler failed', { error });
}
});

this.#subscribeToNetworkController();
this.#subscribeToTransactionEvents();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,10 @@ export class SnapDataSource extends AbstractDataSource<
// Transform the snap keyring payload to DataResponse format
let assetsBalance: NonNullable<DataResponse['assetsBalance']> | undefined;

for (const [accountId, assets] of Object.entries(payload.balances)) {
for (const [accountId, assets] of Object.entries(payload?.balances ?? {})) {
let accountAssets: Record<Caip19AssetId, AssetBalance> | undefined;

for (const [assetId, balance] of Object.entries(assets)) {
for (const [assetId, balance] of Object.entries(assets ?? {})) {
let chainId: ChainId;
try {
chainId = extractChainFromAssetId(assetId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,14 @@ export class StakedBalanceDataSource extends AbstractDataSource<
});

// Wire the callback so polling results flow back to subscriptions
this.#stakedBalanceFetcher.setOnStakedBalanceUpdate(
this.#handleStakedBalanceUpdate.bind(this),
);
// Polling controller invokes this synchronously; keep failures inside the poll tick.
this.#stakedBalanceFetcher.setOnStakedBalanceUpdate((result) => {
try {
this.#handleStakedBalanceUpdate(result);
} catch (error) {
log('Staked balance update handler failed', { error });
}
});

this.#messenger.subscribe(
'TransactionController:transactionConfirmed',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,39 @@ describe('TokenDataSource', () => {
expect(next).toHaveBeenCalledWith(context);
});

it('middleware chunks fetchV3Assets when more than 120 asset IDs are requested', async () => {
const assetIds = Array.from({ length: 121 }, (_, i) => {
const hexString = (i + 1).toString(16).padStart(40, '0');
return `eip155:1/erc20:0x${hexString}` as Caip19AssetId;
});

const { controller, apiClient } = setupController({
supportedNetworks: ['eip155:1'],
});

apiClient.tokens.fetchV3Assets.mockImplementation((ids: string[]) =>
Promise.resolve(ids.map((id) => createMockAssetResponse(id))),
);

const next = jest.fn().mockResolvedValue(undefined);
const context = createMiddlewareContext({
response: {
detectedAssets: {
'mock-account-id': assetIds,
},
},
});

await controller.assetsMiddleware(context, next);

expect(apiClient.tokens.fetchV3Assets).toHaveBeenCalledTimes(2);
expect(apiClient.tokens.fetchV3Assets.mock.calls[0][0]).toHaveLength(120);
expect(apiClient.tokens.fetchV3Assets.mock.calls[1][0]).toHaveLength(1);
expect(context.response.assetsInfo?.[assetIds[0]]?.symbol).toBe('TEST');
expect(context.response.assetsInfo?.[assetIds[120]]?.symbol).toBe('TEST');
expect(next).toHaveBeenCalledWith(context);
});

it('middleware transforms native asset type correctly', async () => {
const { controller } = setupController({
messenger: createTestMessenger(),
Expand Down
53 changes: 40 additions & 13 deletions packages/assets-controller/src/data-sources/TokenDataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
parseCaipAssetType,
} from '@metamask/utils';
import type { CaipAssetType } from '@metamask/utils';
import pLimit from 'p-limit';

import { isStakingContractAssetId } from './evm-rpc-services';
import type { AssetsControllerMessenger } from '../AssetsController';
Expand All @@ -29,6 +30,14 @@

const CONTROLLER_NAME = 'TokenDataSource';

/** Tokens API `/v3/assets` accepts at most this many `assetIds` per request. */
const V3_ASSETS_MAX_IDS_PER_REQUEST = 120;

/** Max concurrent `/v3/assets` chunk requests (same default scale as balance middleware). */
const V3_ASSETS_FETCH_CONCURRENCY = 3;

const MIN_TOKEN_OCCURRENCES = 3;

Check failure on line 39 in packages/assets-controller/src/data-sources/TokenDataSource.ts

View workflow job for this annotation

GitHub Actions / Lint, build, and test / Lint (lint:eslint)

'MIN_TOKEN_OCCURRENCES' is assigned a value but never used
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused MIN_TOKEN_OCCURRENCES constant is dead code

Low Severity

The MIN_TOKEN_OCCURRENCES constant is defined but never referenced anywhere in the codebase. A grep across the entire repository finds only this single definition. The changelog notes that the filtering approach changed from minimum occurrence count to Blockaid scanning, so this constant appears to be a leftover from the old approach that was inadvertently introduced in this diff.

Fix in Cursor Fix in Web


const log = createModuleLogger(projectLogger, CONTROLLER_NAME);

/** Max tokens per PhishingController:bulkScanTokens request (see PhishingController). */
Expand Down Expand Up @@ -369,20 +378,38 @@
}

try {
// Use ApiPlatformClient for fetching asset metadata
// API returns an array with assetId as a property on each item
const metadataResponse = await this.#apiClient.tokens.fetchV3Assets(
supportedAssetIds,
{
includeIconUrl: true,
includeMarketData: true,
includeMetadata: true,
includeLabels: true,
includeRwaData: true,
includeAggregators: true,
includeOccurrences: true,
},
const metadataQueryOptions = {
includeIconUrl: true,
includeMarketData: true,
includeMetadata: true,
includeLabels: true,
includeRwaData: true,
includeAggregators: true,
includeOccurrences: true,
};

// API returns an array with assetId as a property on each item.
// Request in chunks to stay within the per-request asset ID limit.
const chunks: string[][] = [];
for (
let i = 0;
i < supportedAssetIds.length;
i += V3_ASSETS_MAX_IDS_PER_REQUEST
) {
chunks.push(
supportedAssetIds.slice(i, i + V3_ASSETS_MAX_IDS_PER_REQUEST),
);
}

const limit = pLimit(V3_ASSETS_FETCH_CONCURRENCY);
const chunkResponses = await Promise.all(
chunks.map((chunk) =>
limit(() =>
this.#apiClient.tokens.fetchV3Assets(chunk, metadataQueryOptions),
),
),
);
const metadataResponse = chunkResponses.flat();

const assetIdsFromApi = metadataResponse.map((a) => a.assetId);
const allowedAssetIds = new Set(
Expand Down
Loading