Skip to content

DIPs agent rebased (2) #1113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/action-queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ The action execution worker will only grab items from the action queue to execut

## Allocation management modes:
- `auto`: The indexer-agent will act similarly to the legacy paradigm. When it identifies allocation actions it will add them to the queue with ActionStatus = `approved`; the execution worker process will pick up the approved actions within 30 seconds and execute them.
- `manual`: The indexer-agent will not add any items to the action queue in this mode. It will spin up an indexer-management server which can be interacted with manually or integrated with 3rd party tools to add actions to the action queue and execute them.
- `oversight`: The indexer-agent will add run its reconciliation loop to make allocation decisions and when actions are identified it will queue them. These actions will then require approval before they can be executed.
- `manual`: The indexer-agent will not add any items to the action queue in this mode. It will spin up an indexer-management server which can be interacted with manually or integrated with 3rd party tools to add actions to the action queue and execute them. An exception to this is indexing agreements (DIPs), for which actions will be queued and executed even in this mode.
- `oversight`: The indexer-agent will add run its reconciliation loop to make allocation decisions and when actions are identified it will queue them. These actions will then require approval before they can be executed. An exception to this is indexing agreements (DIPs), for which actions will be queued as approved and executed even in this mode.

## Actions CLI
The indexer-cli provides an `actions` module for manually working with the action queue. It uses the #Graphql API hosted by the indexer management server to interact with the actions queue.
Expand Down
1 change: 1 addition & 0 deletions packages/indexer-agent/src/__tests__/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ const setup = async () => {
const network = await Network.create(
logger,
networkSpecification,
models,
queryFeeModels,
graphNode,
metrics,
Expand Down
73 changes: 67 additions & 6 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,20 @@ export class Agent {
{ logger, milliseconds: requestIntervalSmall },
async () => {
return this.multiNetworks.map(async ({ network, operator }) => {
if (network.specification.indexerOptions.enableDips) {
// There should be a DipsManager in the operator
if (!operator.dipsManager) {
throw new Error('DipsManager is not available')
}
logger.debug('Ensuring indexing rules for DIPs', {
protocolNetwork: network.specification.networkIdentifier,
})
await operator.dipsManager.ensureAgreementRules()
} else {
logger.debug(
'DIPs is disabled, skipping indexing rule enforcement',
)
}
logger.trace('Fetching indexing rules', {
protocolNetwork: network.specification.networkIdentifier,
})
Expand Down Expand Up @@ -324,12 +338,21 @@ export class Agent {
},
)

// Skip fetching active deployments if the deployment management mode is manual and POI tracking is disabled
// Skip fetching active deployments if the deployment management mode is manual, DIPs is disabled, and POI tracking is disabled
const activeDeployments: Eventual<SubgraphDeploymentID[]> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
async () => {
if (this.deploymentManagement === DeploymentManagementMode.AUTO) {
let dipsEnabled = false
await this.multiNetworks.map(async ({ network }) => {
if (network.specification.indexerOptions.enableDips) {
dipsEnabled = true
}
})
if (
this.deploymentManagement === DeploymentManagementMode.AUTO ||
dipsEnabled
) {
logger.debug('Fetching active deployments')
const assignments =
await this.graphNode.subgraphDeploymentsAssignments(
Expand All @@ -338,7 +361,7 @@ export class Agent {
return assignments.map(assignment => assignment.id)
} else {
logger.info(
"Skipping fetching active deployments fetch since DeploymentManagementMode = 'manual' and POI tracking is disabled",
"Skipping fetching active deployments fetch since DeploymentManagementMode = 'manual' and DIPs is disabled",
)
return []
}
Expand Down Expand Up @@ -729,9 +752,42 @@ export class Agent {
}
break
case DeploymentManagementMode.MANUAL:
this.logger.debug(
`Skipping subgraph deployment reconciliation since DeploymentManagementMode = 'manual'`,
)
await this.multiNetworks.map(async ({ network, operator }) => {
if (network.specification.indexerOptions.enableDips) {
// Reconcile DIPs deployments anyways
this.logger.warn(
`Deployment management is manual, but DIPs is enabled. Reconciling DIPs deployments anyways.`,
)
if (!operator.dipsManager) {
throw new Error('DipsManager is not available')
}
const dipsDeployments =
await operator.dipsManager.getActiveDipsDeployments()
const newTargetDeployments = new Set([
...activeDeployments,
...dipsDeployments,
])
try {
await this.reconcileDeployments(
activeDeployments,
Array.from(newTargetDeployments),
eligibleAllocations,
)
} catch (err) {
logger.warn(
`Exited early while reconciling deployments. Skipped reconciling actions.`,
{
err: indexerError(IndexerErrorCode.IE005, err),
},
)
return
}
} else {
this.logger.debug(
`Skipping subgraph deployment reconciliation since DeploymentManagementMode = 'manual'`,
)
}
})
break
default:
throw new Error(
Expand Down Expand Up @@ -1053,6 +1109,7 @@ export class Agent {
maxAllocationEpochs: number,
network: Network,
operator: Operator,
forceAction: boolean = false,
): Promise<void> {
const logger = this.logger.child({
deployment: deploymentAllocationDecision.deployment.ipfsHash,
Expand All @@ -1074,6 +1131,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
activeDeploymentAllocations,
forceAction,
)
case true: {
// If no active allocations and subgraph health passes safety check, create one
Expand Down Expand Up @@ -1110,6 +1168,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
mostRecentlyClosedAllocation,
forceAction,
)
}
} else if (activeDeploymentAllocations.length > 0) {
Expand All @@ -1118,6 +1177,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
activeDeploymentAllocations,
forceAction,
)
} else {
// Refresh any expiring allocations
Expand All @@ -1134,6 +1194,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
expiringAllocations,
forceAction,
)
}
}
Expand Down
36 changes: 35 additions & 1 deletion packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,26 @@ export const start = {
default: 1,
group: 'Indexer Infrastructure',
})
.option('enable-dips', {
description: 'Whether to enable Indexing Fees (DIPs)',
type: 'boolean',
default: false,
group: 'Indexing Fees ("DIPs")',
})
.option('dipper-endpoint', {
description: 'Gateway endpoint for DIPs receipts',
type: 'string',
array: false,
required: false,
group: 'Indexing Fees ("DIPs")',
})
.option('dips-allocation-amount', {
description: 'Amount of GRT to allocate for DIPs',
type: 'number',
default: 1,
required: false,
group: 'Indexing Fees ("DIPs")',
})
.check(argv => {
if (
!argv['network-subgraph-endpoint'] &&
Expand Down Expand Up @@ -330,6 +350,9 @@ export const start = {
) {
return 'Invalid --rebate-claim-max-batch-size provided. Must be > 0 and an integer.'
}
if (argv['enable-dips'] && !argv['dipper-endpoint']) {
return 'Invalid --dipper-endpoint provided. Must be provided when --enable-dips is true.'
}
return true
})
},
Expand Down Expand Up @@ -365,6 +388,10 @@ export async function createNetworkSpecification(
allocateOnNetworkSubgraph: argv.allocateOnNetworkSubgraph,
register: argv.register,
finalityTime: argv.chainFinalizeTime,
enableDips: argv.enableDips,
dipperEndpoint: argv.dipperEndpoint,
dipsAllocationAmount: argv.dipsAllocationAmount,
dipsEpochsMargin: argv.dipsEpochsMargin,
}

const transactionMonitoring = {
Expand Down Expand Up @@ -583,7 +610,14 @@ export async function run(
const networks: Network[] = await pMap(
networkSpecifications,
async (spec: NetworkSpecification) =>
Network.create(logger, spec, queryFeeModels, graphNode, metrics),
Network.create(
logger,
spec,
managementModels,
queryFeeModels,
graphNode,
metrics,
),
)

// --------------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions packages/indexer-cli/src/__tests__/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ export const setup = async (multiNetworksEnabled: boolean) => {
const network = await Network.create(
logger,
testNetworkSpecification,
models,
queryFeeModels,
graphNode,
metrics,
Expand Down
3 changes: 3 additions & 0 deletions packages/indexer-common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
},
"dependencies": {
"@pinax/graph-networks-registry": "0.6.7",
"@bufbuild/protobuf": "2.2.3",
"@graphprotocol/dips-proto": "0.2.2",
"@grpc/grpc-js": "^1.12.6",
"@graphprotocol/common-ts": "2.0.11",
"@semiotic-labs/tap-contracts-bindings": "^1.2.1",
"@thi.ng/heaps": "1.2.38",
Expand Down
3 changes: 3 additions & 0 deletions packages/indexer-common/src/allocations/__tests__/tap.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
TapSubgraphResponse,
TapCollector,
Allocation,
defineIndexerManagementModels,
} from '@graphprotocol/indexer-common'
import {
Address,
Expand Down Expand Up @@ -43,6 +44,7 @@ const setup = async () => {
// Clearing the registry prevents duplicate metric registration in the default registry.
metrics.registry.clear()
sequelize = await connectDatabase(__DATABASE__)
const models = defineIndexerManagementModels(sequelize)
queryFeeModels = defineQueryFeeModels(sequelize)
sequelize = await sequelize.sync({ force: true })

Expand All @@ -57,6 +59,7 @@ const setup = async () => {
const network = await Network.create(
logger,
testNetworkSpecification,
models,
queryFeeModels,
graphNode,
metrics,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
defineIndexerManagementModels,
defineQueryFeeModels,
GraphNode,
Network,
Expand Down Expand Up @@ -36,6 +37,7 @@ const setup = async () => {
// Clearing the registry prevents duplicate metric registration in the default registry.
metrics.registry.clear()
sequelize = await connectDatabase(__DATABASE__)
const models = defineIndexerManagementModels(sequelize)
queryFeeModels = defineQueryFeeModels(sequelize)
sequelize = await sequelize.sync({ force: true })

Expand All @@ -50,6 +52,7 @@ const setup = async () => {
const network = await Network.create(
logger,
testNetworkSpecification,
models,
queryFeeModels,
graphNode,
metrics,
Expand Down
31 changes: 31 additions & 0 deletions packages/indexer-common/src/allocations/escrow-accounts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ export type EscrowAccountResponse = {
}[]
}

export type EscrowSenderResponse = {
signer: {
sender: {
id: string
}
}
}

export class EscrowAccounts {
constructor(private sendersBalances: Map<Address, U256>) {}

Expand Down Expand Up @@ -65,3 +73,26 @@ export const getEscrowAccounts = async (
}
return EscrowAccounts.fromResponse(result.data)
}

export const getEscrowSenderForSigner = async (
tapSubgraph: SubgraphClient,
signer: Address,
): Promise<Address> => {
const signerLower = signer.toLowerCase()
const result = await tapSubgraph.query<EscrowSenderResponse>(
gql`
query EscrowAccountQuery($signer: ID!) {
signer(id: $signer) {
sender {
id
}
}
}
`,
{ signer: signerLower },
)
if (!result.data) {
throw `There was an error while querying Tap Subgraph. Errors: ${result.error}`
}
return toAddress(result.data.signer.sender.id)
}
16 changes: 16 additions & 0 deletions packages/indexer-common/src/graph-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,22 @@ export class GraphNode {
}
}

public async entityCount(deployments: SubgraphDeploymentID[]): Promise<number[]> {
// Query the entity count for each deployment using the indexingStatuses query
const query = `
query entityCounts($deployments: [String!]!) {
indexingStatuses(subgraphs: $deployments) {
entityCount
}
}
`
const result = await this.status
.query(query, { deployments: deployments.map((id) => id.ipfsHash) })
.toPromise()

return result.data.indexingStatuses.map((status) => status.entityCount) as number[]
}

public async proofOfIndexing(
deployment: SubgraphDeploymentID,
block: BlockPointer,
Expand Down
2 changes: 2 additions & 0 deletions packages/indexer-common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export * from './allocations'
export * from './async-cache'
export * from './errors'
export * from './indexer-management'
export * from './indexing-fees'
export * from './graph-node'
export * from './operator'
export * from './network'
Expand All @@ -17,3 +18,4 @@ export * from './parsers'
export * as specification from './network-specification'
export * from './multi-networks'
export * from './sequential-timer'
export * from './indexing-fees'
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const setup = async () => {
const network = await Network.create(
logger,
testNetworkSpecification,
managementModels,
queryFeeModels,
graphNode,
metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export const createTestManagementClient = async (
const network = await Network.create(
logger,
networkSpecification,
managementModels,
queryFeeModels,
graphNode,
metrics,
Expand Down
Loading
Loading