Skip to content

Commit

Permalink
updated some
Browse files Browse the repository at this point in the history
  • Loading branch information
akorchyn committed Mar 18, 2024
1 parent bc600f7 commit 1ad4ecd
Showing 1 changed file with 75 additions and 32 deletions.
107 changes: 75 additions & 32 deletions snapshotter/stake.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import Big from 'big.js'
import fs from 'fs';
import { program } from 'commander';

const EMPTY_HASH = '11111111111111111111111111111111'

program
.description('Load and process staking pools data from NEAR blockchain.')
.option('--block <type>', 'Block ID to fetch data from', '108194270')
Expand Down Expand Up @@ -75,6 +77,11 @@ _near.viewCall = (contractId, methodName, args, blockHeightOrFinality) => {
return viewCall(_near.nearArchivalConnection.provider, blockId ?? undefined, contractId, methodName, args, finality);
};

_near.viewAccount = (accountId, blockHeightOrFinality) => {
const { blockId, finality } = transformBlockId(blockHeightOrFinality);
return _near.nearArchivalConnection.provider.query({ request_type: "view_account", account_id: accountId, block_id: blockId, finality });
};

const processLockup = async (lockup) => {
return _near.viewCall(lockup, "get_owner_account_id", {}, blockId)
.then(owner => {
Expand Down Expand Up @@ -107,10 +114,17 @@ const processLockups = async (delegators) => {
return delegators;
}

async function checkRecordExists(client, account_id) {
const query = `SELECT EXISTS(SELECT 1 FROM ${tableName} WHERE ${columnName} = $1)`;
const res = await client.query(query, [account_id]);
return res.rows[0].exists;
async function checkRecordsExist(client, accountIds) {
const inClause = accountIds.map((value) => `'${value}'`).join(', ');

const query = `
SELECT ${columnName} from ${tableName} where ${columnName} IN (${inClause})
`;

const res = await client.query(query);
const exists = res.rows.map(row => row[columnName]);
return accountIds.map(accountId => ({ account_id: accountId, exists: exists.includes(accountId) }));

}

async function loadDelegatorsFromValidators(validators) {
Expand All @@ -119,10 +133,11 @@ async function loadDelegatorsFromValidators(validators) {
.withConcurrency(8)
.for(validators)
.process(async (accountId) =>
pRetry(() => _near.viewCall(accountId, "get_number_of_accounts", {}, blockId), { retries: 100 })
pRetry(() => _near.viewCall(accountId, "get_number_of_accounts", {}, blockId), { shouldRetry: (err) => !err.message.includes("MethodResolveError(MethodNotFound)"), retries: 100 })
.then(number_of_accounts => ({ account_id: accountId, number_of_accounts }))
);


let validatorRequests = [];

allValidatorsDetails.map(validatorsDetails => {
Expand All @@ -146,7 +161,7 @@ async function loadDelegatorsFromValidators(validators) {
}, blockId).then((accounts) => {
console.log(`Loading ${validatorRequest.account_id} delegators: batch #${1 + validatorRequest.from_index / 100}, added ${accounts.length} accounts`)
return accounts;
}), { retries: 100 });
}), { retries: 100, shouldRetry: (err) => !err.message.includes("MethodResolveError(MethodNotFound)") });
return data;
});
if (delegatorsError.length > 0) {
Expand All @@ -171,28 +186,67 @@ async function loadDelegatorsFromValidators(validators) {
return { results, errors: poolsError };
}

async function addToDatabase(client, account_id) {
const query = `INSERT INTO ${tableName} (${columnName}) VALUES ($1)`;
const res = await client.query(query, [account_id]);
async function addToDatabase(client, accounts) {
if (accounts === undefined || accounts.length === 0) {
return {};
}
const accountString = accounts.map((value) => `'${value}'`).join(', ');
const query = `INSERT INTO ${tableName} (${columnName}) VALUES (${accountString})`;
const res = await client.query(query);
return res;

}

async function processGaps(accounts, client) {
// We need to check the gap if it has staking interface:
// 1. If it has staking interface, we need to get the delegators
// 2. If it doesn't, we add the account to the database (it may be a gap / or unsupported staking mechanism)
let { results: newDelegators, errors: newDelegatorsErrors } = await loadDelegatorsFromValidators(accounts.map(account => account.account_id));
async function processGaps(accountsOrPools, client) {
if (accountsOrPools === undefined || accountsOrPools.length === 0) {
return {};
}

// Check if user has a contract
console.log(`Checking ${accountsOrPools.length} accounts...`)
let { results: accounts, errors } = await PromisePool
.withConcurrency(8)
.for(accountsOrPools)
.process(async (account) =>
pRetry(() =>
_near.viewAccount(account.account_id, blockId)
.then((data) => { return { ...account, data } }),
{
retries: 20,
// Account deleted :(
shouldRetry: (err) => !err.message.includes("does not exist while viewing")
}
)
);
if (errors.length > 0) {
console.log("Errors", errors);
}

let users = accounts.filter(account => account.data.code_hash === EMPTY_HASH);
console.log(`Found ${users.length} accounts without a contract.`);
let pools = accounts.filter(account => account.data.code_hash !== EMPTY_HASH);
console.log(`Found ${pools.length} accounts with a contract.`);

let { results: newDelegators, errors: newDelegatorsErrors } = await loadDelegatorsFromValidators(pools.map(account => account.account_id));
let failedAccoutns = newDelegatorsErrors.map(error => error.item);
if (newDelegators === undefined) {
newDelegators = {};
}
let added = [];

for (let i in failedAccoutns) {
let account = accounts[i];
console.log('Unsupported staking mechanism/missed account: Adding to the database:', account.account_id);
newDelegators[account.account_id] = account.stake;
await addToDatabase(client, account.account_id);
let contract = pools[i];
console.log('Unsupported staking mechanism: Adding to the database:', contract.account_id);
newDelegators[contract.account_id] = (newDelegators[contract.account_id] ?? 0) + contract.stake;
added.push(contract.account_id);
}
for (let account of users) {
console.log('Missed account: Adding to the database:', account.account_id);
newDelegators[account.account_id] = (newDelegators[account.account_id] ?? 0) + account.stake;
added.push(account.account_id);
}

await addToDatabase(client, added);
return newDelegators;
}

Expand All @@ -206,26 +260,15 @@ async function checkAndFixGaps(delegators, client) {
let lockuplessDelegators = await processLockups(delegators);

// Check the database for the records
const { results: existsResults, errors: existsErrors } = await PromisePool
.withConcurrency(8)
.for(Object.keys(lockuplessDelegators))
.process(async (account_id) => {
const exists = await checkRecordExists(client, account_id);
if (!exists) {
console.log(`Record for '${account_id}' does NOT exist in the database.`);
}
return { account_id, exists, stake: lockuplessDelegators[account_id] };
});
if (existsErrors.length > 0) {
console.log("Exists Errors", existsErrors);
}

const existsResults = await checkRecordsExist(client, Object.keys(lockuplessDelegators))
.then(records => records.map((elem) => { return { ...elem, stake: lockuplessDelegators[elem.account_id] } }));
const existedDelegators = existsResults.filter(result => result.exists);
const potentialDelegators = existsResults.filter(result => !result.exists);

console.log(`Found ${existedDelegators.length} delegators with records in the database.`);
console.log(`Found ${potentialDelegators.length} delegators without records in the database.`);


const processedDelegator = await processGaps(potentialDelegators, client);
const newDelegators = await checkAndFixGaps(processedDelegator, client);

Expand Down

0 comments on commit 1ad4ecd

Please sign in to comment.