Skip to content

Commit

Permalink
fix(blockchain-link): solana account subscription overhaul
Browse files Browse the repository at this point in the history
  • Loading branch information
martykan committed Nov 28, 2024
1 parent 64a2471 commit 018648a
Showing 1 changed file with 85 additions and 57 deletions.
142 changes: 85 additions & 57 deletions packages/blockchain-link/src/workers/solana/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import {
SolanaRpcApiMainnet,
SolanaRpcSubscriptionsApi,
TransactionWithBlockhashLifetime,
AccountInfoBase,
SolanaRpcResponse,
} from '@solana/web3.js';

import type {
Expand Down Expand Up @@ -65,6 +67,7 @@ export type SolanaAPI = Readonly<{

type Context = ContextType<SolanaAPI> & {
getTokenMetadata: () => Promise<TokenDetailByMint>;
onNetworkDisconnect: () => void;
};
type Request<T> = T & Context;

Expand Down Expand Up @@ -505,10 +508,76 @@ function abortSubscription(id: number) {
abortController?.abort();
}

const subscribeAccounts = async (
{ connect, state, post, getTokenMetadata }: Context,
accounts: SubscriptionAccountInfo[],
const handleAccountNotification = async (
context: Context,
accountNotifications: AsyncIterable<SolanaRpcResponse<AccountInfoBase>>,
account: SubscriptionAccountInfo,
) => {
const { connect, state, post, getTokenMetadata } = context;
try {
for await (const _ of accountNotifications) {
const api = await connect();
// get the last transaction signature for the account, since that what triggered this callback
const [lastSignatureResponse] = await api.rpc
.getSignaturesForAddress(address(account.descriptor), {
limit: 1,
})
.send();
const lastSignature = lastSignatureResponse?.signature;
if (!lastSignature) return;

// get the last transaction
const lastTx = await api.rpc
.getTransaction(lastSignature, {
encoding: 'jsonParsed',
maxSupportedTransactionVersion: 0,
commitment: 'confirmed',
})
.send();

if (!lastTx || !isValidTransaction(lastTx)) {
return;
}

const tokenMetadata = await getTokenMetadata();
const tx = solanaUtils.transformTransaction(
lastTx,
account.descriptor,
[],
tokenMetadata,
);

// For token accounts we need to emit an event with the owner account's descriptor
// since we don't store token accounts in the user's accounts.
const descriptor =
findTokenAccountOwner(state.getAccounts(), account.descriptor)?.descriptor ||
account.descriptor;

post({
id: -1,
type: RESPONSES.NOTIFICATION,
payload: {
type: 'notification',
payload: {
descriptor,
tx,
},
},
});
}
} catch (error) {
console.error('Solana subscription error:', error);
if (isSolanaError(error, SOLANA_ERROR__RPC_SUBSCRIPTIONS__CHANNEL_CONNECTION_CLOSED)) {
// The WS was closed, we should unsubscribe
if (account.subscriptionId) abortSubscription(account.subscriptionId);
state.removeAccounts([account]);
context.onNetworkDisconnect();
}
}
};

const subscribeAccounts = async (context: Context, accounts: SubscriptionAccountInfo[]) => {
const { connect, state } = context;
const api = await connect();
const subscribedAccounts = state.getAccounts();
const tokenAccounts = extractTokenAccounts(accounts);
Expand All @@ -523,7 +592,7 @@ const subscribeAccounts = async (
newAccounts.map(async a => {
const abortController = new AbortController();
const accountNotifications = await api.rpcSubscriptions
.accountNotifications(address(a.descriptor))
.accountNotifications(address(a.descriptor), { commitment: 'confirmed' })
.subscribe({ abortSignal: abortController.signal });
const subscriptionId = NEXT_ACCOUNT_SUBSCRIPTION_ID++;
ACCOUNT_SUBSCRIPTION_ABORT_CONTROLLERS.set(subscriptionId, abortController);
Expand All @@ -532,59 +601,7 @@ const subscribeAccounts = async (
subscriptionId,
};
state.addAccounts([account]);
(async () => {
// TODO: Wrap this for/await loop in a try/catch and write code to recover in the event
// that the account subscription going down.
for await (const _ of accountNotifications) {
// get the last transaction signature for the account, since that wha triggered this callback
const [lastSignatureResponse] = await api.rpc
.getSignaturesForAddress(address(a.descriptor), {
limit: 1,
})
.send();
const lastSignature = lastSignatureResponse?.signature;
if (!lastSignature) return;

// get the last transaction
const lastTx = await api.rpc
.getTransaction(lastSignature, {
encoding: 'jsonParsed',
maxSupportedTransactionVersion: 0,
commitment: 'finalized',
})
.send();

if (!lastTx || !isValidTransaction(lastTx)) {
return;
}

const tokenMetadata = await getTokenMetadata();
const tx = solanaUtils.transformTransaction(
lastTx,
a.descriptor,
[],
tokenMetadata,
);

// For token accounts we need to emit an event with the owner account's descriptor
// since we don't store token accounts in the user's accounts.
const descriptor =
findTokenAccountOwner(state.getAccounts(), a.descriptor)?.descriptor ||
a.descriptor;

post({
id: -1,
type: RESPONSES.NOTIFICATION,
payload: {
type: 'notification',
payload: {
descriptor,
tx,
},
},
});
}
})();
handleAccountNotification(context, accountNotifications, account);
}),
);

Expand Down Expand Up @@ -714,6 +731,17 @@ class SolanaWorker extends BaseWorker<SolanaAPI> {
const request: Request<MessageTypes.Message> = {
...event.data,
connect: () => this.connect(),
onNetworkDisconnect: () => {
if (this.api) {
// Broadcast that we are being disconnected
this.post({
id: -1,
type: RESPONSES.DISCONNECTED,
payload: true,
});
}
this.disconnect();
},
post: (data: Response) => this.post(data),
state: this.state,
getTokenMetadata: this.lazyTokens.getOrInit,
Expand Down

0 comments on commit 018648a

Please sign in to comment.