Skip to content

feat: indexing fees (DIPs) - WIP - rebased #1094

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/action-queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ The action execution worker will only grab items from the action queue to execut

## Allocation management modes:
- `auto`: The indexer-agent will act similarly to the legacy paradigm. When it identifies allocation actions it will add them to the queue with ActionStatus = `approved`; the execution worker process will pick up the approved actions within 30 seconds and execute them.
- `manual`: The indexer-agent will not add any items to the action queue in this mode. It will spin up an indexer-management server which can be interacted with manually or integrated with 3rd party tools to add actions to the action queue and execute them.
- `oversight`: The indexer-agent will add run its reconciliation loop to make allocation decisions and when actions are identified it will queue them. These actions will then require approval before they can be executed.
- `manual`: The indexer-agent will not add any items to the action queue in this mode. It will spin up an indexer-management server which can be interacted with manually or integrated with 3rd party tools to add actions to the action queue and execute them. An exception to this is indexing agreements (DIPs), for which actions will be queued and executed even in this mode.
- `oversight`: The indexer-agent will add run its reconciliation loop to make allocation decisions and when actions are identified it will queue them. These actions will then require approval before they can be executed. An exception to this is indexing agreements (DIPs), for which actions will be queued as approved and executed even in this mode.

## Actions CLI
The indexer-cli provides an `actions` module for manually working with the action queue. It uses the #Graphql API hosted by the indexer management server to interact with the actions queue.
Expand Down
1 change: 1 addition & 0 deletions packages/indexer-agent/src/__tests__/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ const setup = async () => {
const network = await Network.create(
logger,
networkSpecification,
models,
queryFeeModels,
graphNode,
metrics,
Expand Down
94 changes: 79 additions & 15 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,16 @@ export class Agent {
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
async () => {
if (network.specification.indexerOptions.enableDips) {
// There should be a DipsManager in the operator
if (!operator.dipsManager) {
throw new Error('DipsManager is not available')
}
logger.trace('Ensuring indexing rules for DIPS', {
protocolNetwork: network.specification.networkIdentifier,
})
await operator.dipsManager.ensureAgreementRules()
}
logger.trace('Fetching indexing rules', {
protocolNetwork: network.specification.networkIdentifier,
})
Expand Down Expand Up @@ -252,14 +262,15 @@ export class Agent {
},
)

// Skip fetching active deployments if the deployment management mode is manual and POI tracking is disabled
// Skip fetching active deployments if the deployment management mode is manual, DIPs is disabled, and POI tracking is disabled
const activeDeployments: Eventual<SubgraphDeploymentID[]> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
async () => {
if (
this.deploymentManagement === DeploymentManagementMode.AUTO ||
network.networkMonitor.poiDisputeMonitoringEnabled()
network.networkMonitor.poiDisputeMonitoringEnabled() ||
network.specification.indexerOptions.enableDips
) {
logger.trace('Fetching active deployments')
const assignments =
Expand Down Expand Up @@ -487,9 +498,40 @@ export class Agent {
}
break
case DeploymentManagementMode.MANUAL:
this.logger.debug(
`Skipping subgraph deployment reconciliation since DeploymentManagementMode = 'manual'`,
)
if (network.specification.indexerOptions.enableDips) {
// Reconcile DIPs deployments anyways
this.logger.warn(
`Deployment management is manual, but DIPs is enabled. Reconciling DIPs deployments anyways.`,
)
if (!operator.dipsManager) {
throw new Error('DipsManager is not available')
}
const dipsDeployments =
await operator.dipsManager.getActiveDipsDeployments()
const newTargetDeployments = new Set([
...activeDeployments,
...dipsDeployments,
])
try {
await this.reconcileDeployments(
activeDeployments,
Array.from(newTargetDeployments),
eligibleAllocations,
)
} catch (err) {
logger.warn(
`Exited early while reconciling deployments. Skipped reconciling actions.`,
{
err: indexerError(IndexerErrorCode.IE005, err),
},
)
return
}
} else {
this.logger.debug(
`Skipping subgraph deployment reconciliation since DeploymentManagementMode = 'manual'`,
)
}
break
default:
throw new Error(
Expand Down Expand Up @@ -810,6 +852,7 @@ export class Agent {
maxAllocationEpochs: number,
network: Network,
operator: Operator,
forceAction: boolean = false,
): Promise<void> {
const logger = this.logger.child({
deployment: deploymentAllocationDecision.deployment.ipfsHash,
Expand All @@ -831,6 +874,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
activeDeploymentAllocations,
forceAction,
)
case true: {
// If no active allocations and subgraph health passes safety check, create one
Expand Down Expand Up @@ -867,6 +911,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
mostRecentlyClosedAllocation,
forceAction,
)
}
} else if (activeDeploymentAllocations.length > 0) {
Expand All @@ -875,6 +920,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
activeDeploymentAllocations,
forceAction,
)
} else {
// Refresh any expiring allocations
Expand All @@ -891,6 +937,7 @@ export class Agent {
logger,
deploymentAllocationDecision,
expiringAllocations,
forceAction,
)
}
}
Expand All @@ -910,21 +957,37 @@ export class Agent {
// --------------------------------------------------------------------------------
const { network, operator } = this.networkAndOperator
let validatedAllocationDecisions = [...allocationDecisions]
let dipsDeployments: SubgraphDeploymentID[] = []
if (network.specification.indexerOptions.enableDips) {
if (!operator.dipsManager) {
throw new Error('DipsManager is not available')
}
dipsDeployments = await operator.dipsManager.getActiveDipsDeployments()
}

if (
network.specification.indexerOptions.allocationManagementMode ===
AllocationManagementMode.MANUAL
) {
this.logger.debug(
`Skipping allocation reconciliation since AllocationManagementMode = 'manual'`,
{
protocolNetwork: network.specification.networkIdentifier,
targetDeployments: allocationDecisions
.filter(decision => decision.toAllocate)
.map(decision => decision.deployment.ipfsHash),
},
)
validatedAllocationDecisions = [] as AllocationDecision[]
if (network.specification.indexerOptions.enableDips) {
this.logger.warn(
`Allocation management is manual, but DIPs is enabled. Reconciling DIPs allocations anyways.`,
)
validatedAllocationDecisions = validatedAllocationDecisions.filter(
decision => dipsDeployments.includes(decision.deployment),
)
} else {
this.logger.trace(
`Skipping allocation reconciliation since AllocationManagementMode = 'manual'`,
{
protocolNetwork: network.specification.networkIdentifier,
targetDeployments: allocationDecisions
.filter(decision => decision.toAllocate)
.map(decision => decision.deployment.ipfsHash),
},
)
validatedAllocationDecisions = [] as AllocationDecision[]
}
} else {
const networkSubgraphDeployment = network.networkSubgraph.deployment
if (
Expand Down Expand Up @@ -985,6 +1048,7 @@ export class Agent {
maxAllocationEpochs,
network,
operator,
dipsDeployments.includes(decision.deployment), // Force actions if this is a DIPs deployment
),
)
return
Expand Down
28 changes: 28 additions & 0 deletions packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,26 @@ export const start = {
default: 1,
group: 'Indexer Infrastructure',
})
.option('enable-dips', {
description: 'Whether to enable Indexing Fees (DIPs)',
type: 'boolean',
default: false,
group: 'Indexing Fees ("DIPs")',
})
.option('dipper-endpoint', {
description: 'Gateway endpoint for DIPs receipts',
type: 'string',
array: false,
required: false,
group: 'Indexing Fees ("DIPs")',
})
.option('dips-allocation-amount', {
description: 'Amount of GRT to allocate for DIPs',
type: 'number',
default: 1,
required: false,
group: 'Indexing Fees ("DIPs")',
})
.check(argv => {
if (
!argv['network-subgraph-endpoint'] &&
Expand Down Expand Up @@ -335,6 +355,9 @@ export const start = {
) {
return 'Invalid --rebate-claim-max-batch-size provided. Must be > 0 and an integer.'
}
if (argv['enable-dips'] && !argv['dipper-endpoint']) {
return 'Invalid --dipper-endpoint provided. Must be provided when --enable-dips is true.'
}
return true
})
},
Expand Down Expand Up @@ -370,6 +393,10 @@ export async function createNetworkSpecification(
allocateOnNetworkSubgraph: argv.allocateOnNetworkSubgraph,
register: argv.register,
finalityTime: argv.chainFinalizeTime,
enableDips: argv.enableDips,
dipperEndpoint: argv.dipperEndpoint,
dipsAllocationAmount: argv.dipsAllocationAmount,
dipsEpochsMargin: argv.dipsEpochsMargin,
}

const transactionMonitoring = {
Expand Down Expand Up @@ -587,6 +614,7 @@ export async function run(
const network = await Network.create(
logger,
networkSpecification,
managementModels,
queryFeeModels,
graphNode,
metrics,
Expand Down
1 change: 1 addition & 0 deletions packages/indexer-cli/src/__tests__/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export const setup = async () => {
const network = await Network.create(
logger,
testNetworkSpecification,
models,
queryFeeModels,
graphNode,
metrics,
Expand Down
3 changes: 3 additions & 0 deletions packages/indexer-common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
"clean": "rm -rf ./node_modules ./dist ./tsconfig.tsbuildinfo"
},
"dependencies": {
"@bufbuild/protobuf": "2.2.3",
"@graphprotocol/common-ts": "2.0.11",
"@graphprotocol/dips-proto": "0.2.2",
"@grpc/grpc-js": "^1.12.6",
"@pinax/graph-networks-registry": "0.6.7",
"@semiotic-labs/tap-contracts-bindings": "^1.2.1",
"@thi.ng/heaps": "1.2.38",
Expand Down
3 changes: 3 additions & 0 deletions packages/indexer-common/src/allocations/__tests__/tap.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
TapSubgraphResponse,
TapCollector,
Allocation,
defineIndexerManagementModels,
} from '@graphprotocol/indexer-common'
import {
Address,
Expand Down Expand Up @@ -43,6 +44,7 @@ const setup = async () => {
// Clearing the registry prevents duplicate metric registration in the default registry.
metrics.registry.clear()
sequelize = await connectDatabase(__DATABASE__)
const models = defineIndexerManagementModels(sequelize)
queryFeeModels = defineQueryFeeModels(sequelize)
sequelize = await sequelize.sync({ force: true })

Expand All @@ -56,6 +58,7 @@ const setup = async () => {
const network = await Network.create(
logger,
testNetworkSpecification,
models,
queryFeeModels,
graphNode,
metrics,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
defineIndexerManagementModels,
defineQueryFeeModels,
GraphNode,
Network,
Expand Down Expand Up @@ -36,6 +37,7 @@ const setup = async () => {
// Clearing the registry prevents duplicate metric registration in the default registry.
metrics.registry.clear()
sequelize = await connectDatabase(__DATABASE__)
const models = defineIndexerManagementModels(sequelize)
queryFeeModels = defineQueryFeeModels(sequelize)
sequelize = await sequelize.sync({ force: true })

Expand All @@ -49,6 +51,7 @@ const setup = async () => {
const network = await Network.create(
logger,
testNetworkSpecification,
models,
queryFeeModels,
graphNode,
metrics,
Expand Down
31 changes: 31 additions & 0 deletions packages/indexer-common/src/allocations/escrow-accounts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ export type EscrowAccountResponse = {
}[]
}

export type EscrowSenderResponse = {
signer: {
sender: {
id: string
}
}
}

export class EscrowAccounts {
constructor(private sendersBalances: Map<Address, U256>) {}

Expand Down Expand Up @@ -65,3 +73,26 @@ export const getEscrowAccounts = async (
}
return EscrowAccounts.fromResponse(result.data)
}

export const getEscrowSenderForSigner = async (
tapSubgraph: SubgraphClient,
signer: Address,
): Promise<Address> => {
const signerLower = signer.toLowerCase()
const result = await tapSubgraph.query<EscrowSenderResponse>(
gql`
query EscrowAccountQuery($signer: ID!) {
signer(id: $signer) {
sender {
id
}
}
}
`,
{ signer: signerLower },
)
if (!result.data) {
throw `There was an error while querying Tap Subgraph. Errors: ${result.error}`
}
return toAddress(result.data.signer.sender.id)
}
16 changes: 16 additions & 0 deletions packages/indexer-common/src/graph-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,22 @@ export class GraphNode {
}
}

public async entityCount(deployments: SubgraphDeploymentID[]): Promise<number[]> {
// Query the entity count for each deployment using the indexingStatuses query
const query = `
query entityCounts($deployments: [String!]!) {
indexingStatuses(subgraphs: $deployments) {
entityCount
}
}
`
const result = await this.status
.query(query, { deployments: deployments.map((id) => id.ipfsHash) })
.toPromise()

return result.data.indexingStatuses.map((status) => status.entityCount) as number[]
}

public async proofOfIndexing(
deployment: SubgraphDeploymentID,
block: BlockPointer,
Expand Down
2 changes: 2 additions & 0 deletions packages/indexer-common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export * from './allocations'
export * from './async-cache'
export * from './errors'
export * from './indexer-management'
export * from './indexing-fees'
export * from './graph-node'
export * from './operator'
export * from './network'
Expand All @@ -16,3 +17,4 @@ export * from './utils'
export * from './parsers'
export * as specification from './network-specification'
export * from './sequential-timer'
export * from './indexing-fees'
Loading
Loading