Skip to content

Commit

Permalink
refactor: accounting service
Browse files Browse the repository at this point in the history
  • Loading branch information
fforbeck committed Oct 30, 2024
1 parent 847829b commit 902d69c
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 79 deletions.
12 changes: 2 additions & 10 deletions src/bindings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,13 @@ import { Environment as CarBlockEnvironment } from './middleware/withCarBlockHan
import { Environment as ContentClaimsDagulaEnvironment } from './middleware/withCarBlockHandler.types.ts'
import { Environment as EgressTrackerEnvironment } from './middleware/withEgressTracker.types.ts'
import { UnknownLink } from 'multiformats'
import { DIDKey } from '@ucanto/principal/ed25519'

export interface Environment
extends CarBlockEnvironment,
RateLimiterEnvironment,
ContentClaimsDagulaEnvironment,
EgressTrackerEnvironment {
VERSION: string
CONTENT_CLAIMS_SERVICE_URL?: string
ACCOUNTING_SERVICE_URL: string
}

export interface AccountingService {
record: (resource: UnknownLink, bytes: number, servedAt: string) => Promise<void>
getTokenMetadata: (token: string) => Promise<TokenMetadata | null>
}

export interface Accounting {
create: ({ serviceURL }: { serviceURL: string }) => AccountingService
}
37 changes: 37 additions & 0 deletions src/middleware/withAccountingService.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* @import { Middleware } from '@web3-storage/gateway-lib'
* @typedef {import('./withAccountingService.types.ts').AccountingServiceContext} AccountingServiceContext
* @typedef {import('./withAccountingService.types.ts').Environment} Environment
*/

/**
* The accounting service handler exposes the method `record` to record the egress bytes for a given SpaceDID, Content CID, and servedAt timestamp.
*
* @type {Middleware<AccountingServiceContext, AccountingServiceContext, Environment>}
*/
export function withAccountingService (handler) {
return async (req, env, ctx) => {
const accountingService = create(env, ctx)

return handler(req, env, { ...ctx, accountingService })
}
}

/**
* @param {Environment} env
* @param {AccountingServiceContext} ctx
* @returns {import('./withAccountingService.types.ts').AccountingService}
*/
function create (env, ctx) {
return {
/**
* @param {import('@ucanto/principal/ed25519').DIDKey} space - The Space DID where the content was served
* @param {import('@ucanto/principal/ed25519').UnknownLink} resource - The link to the resource that was served
* @param {number} bytes - The number of bytes served
* @param {string} servedAt - The timestamp of when the content was served
*/
record: async (space, resource, bytes, servedAt) => {
console.log(`Record egress: ${space}, ${resource}, ${bytes}, ${servedAt}`)
}
}
}
19 changes: 19 additions & 0 deletions src/middleware/withAccountingService.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Environment as MiddlewareEnvironment, Context as MiddlewareContext } from '@web3-storage/gateway-lib'
import { DIDKey, UnknownLink } from '@ucanto/principal/ed25519'

export interface Environment extends MiddlewareEnvironment {
//TODO: ucanto signer principal key
}

export interface AccountingServiceContext extends MiddlewareContext {
accountingService?: AccountingService
}

export interface TokenMetadata {
locationClaim?: unknown // TODO: figure out the right type to use for this - we probably need it for the private data case to verify auth
invalid?: boolean
}

export interface AccountingService {
record: (space: DIDKey, resource: UnknownLink, bytes: number, servedAt: string) => Promise<void>
}
15 changes: 4 additions & 11 deletions src/middleware/withEgressTracker.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import { Accounting } from '../services/accounting.js'

/**
* @import { Context, IpfsUrlContext, Middleware } from '@web3-storage/gateway-lib'
* @import { Middleware } from '@web3-storage/gateway-lib'
* @import { Environment } from './withEgressTracker.types.js'
* @import { AccountingService } from '../bindings.js'
* @typedef {IpfsUrlContext & { ACCOUNTING_SERVICE?: AccountingService }} EgressTrackerContext
* @typedef {import('./withEgressTracker.types.js').Context} EgressTrackerContext
*/

/**
Expand All @@ -26,17 +23,13 @@ export function withEgressTracker (handler) {
return response
}

const { dataCid } = ctx
const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({
serviceURL: env.ACCOUNTING_SERVICE_URL
})

const responseBody = response.body.pipeThrough(
createByteCountStream((totalBytesServed) => {
// Non-blocking call to the accounting service to record egress
if (totalBytesServed > 0) {
const { space, dataCid: resource } = ctx
ctx.waitUntil(
accounting.record(dataCid, totalBytesServed, new Date().toISOString())
ctx.accountingService.record(space, resource, totalBytesServed, new Date().toISOString())
)
}
})
Expand Down
10 changes: 8 additions & 2 deletions src/middleware/withEgressTracker.types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'
import { IpfsUrlContext, Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'
import { AccountingService } from './withAccountingService.types.js'
import { DIDKey, UnknownLink } from '@ucanto/client'

export interface Environment extends MiddlewareEnvironment {
ACCOUNTING_SERVICE_URL: string
FF_EGRESS_TRACKER_ENABLED: string
}

export interface Context extends IpfsUrlContext {
space: DIDKey
accountingService: AccountingService
}
23 changes: 15 additions & 8 deletions src/middleware/withRateLimit.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import { HttpError } from '@web3-storage/gateway-lib/util'
import { RATE_LIMIT_EXCEEDED } from '../constants.js'
import { Accounting } from '../services/accounting.js'

/**
* @import { Middleware } from '@web3-storage/gateway-lib'
* @import { R2Bucket, KVNamespace, RateLimit } from '@cloudflare/workers-types'
* @import {
* Environment,
* Context,
* TokenMetadata,
* RateLimitService,
* RateLimitExceeded
* } from './withRateLimit.types.js'
* @typedef {Context & { ACCOUNTING_SERVICE?: import('../bindings.js').AccountingService }} RateLimiterContext
* @typedef {Context} RateLimiterContext
*/

/**
Expand Down Expand Up @@ -101,7 +99,7 @@ async function isRateLimited (rateLimitAPI, cid) {
* @param {Environment} env
* @param {string} authToken
* @param {RateLimiterContext} ctx
* @returns {Promise<TokenMetadata | null>}
* @returns {Promise<import('./withAccountingService.types.js').TokenMetadata | null>}
*/
async function getTokenMetadata (env, authToken, ctx) {
const cachedValue = await env.AUTH_TOKEN_METADATA.get(authToken)
Expand All @@ -111,8 +109,7 @@ async function getTokenMetadata (env, authToken, ctx) {
return decode(cachedValue)
}

const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({ serviceURL: env.ACCOUNTING_SERVICE_URL })
const tokenMetadata = await accounting.getTokenMetadata(authToken)
const tokenMetadata = findTokenMetadata(authToken)
if (tokenMetadata) {
// NOTE: non-blocking call to the auth token metadata cache
ctx.waitUntil(env.AUTH_TOKEN_METADATA.put(authToken, encode(tokenMetadata)))
Expand All @@ -122,17 +119,27 @@ async function getTokenMetadata (env, authToken, ctx) {
return null
}

/**
* @param {string} authToken
* @returns {import('./withAccountingService.types.js').TokenMetadata | null}
*/
function findTokenMetadata (authToken) {
// TODO I think this needs to check the content claims service (?) for any claims relevant to this token
// TODO do we have a plan for this? need to ask Hannah if the indexing service covers this?
return null
}

/**
* @param {string} s
* @returns {TokenMetadata}
* @returns {import('./withAccountingService.types.js').TokenMetadata}
*/
function decode (s) {
// TODO should this be dag-json?
return JSON.parse(s)
}

/**
* @param {TokenMetadata} m
* @param {import('./withAccountingService.types.js').TokenMetadata} m
* @returns {string}
*/
function encode (m) {
Expand Down
6 changes: 0 additions & 6 deletions src/middleware/withRateLimit.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { KVNamespace, RateLimit } from '@cloudflare/workers-types'
import { RATE_LIMIT_EXCEEDED } from '../constants.js'

export interface Environment extends MiddlewareEnvironment {
ACCOUNTING_SERVICE_URL: string
RATE_LIMITER: RateLimit
AUTH_TOKEN_METADATA: KVNamespace
FF_RATE_LIMITER_ENABLED: string
Expand All @@ -14,11 +13,6 @@ export interface Context extends IpfsUrlContext {
authToken: string | null
}

export interface TokenMetadata {
locationClaim?: unknown // TODO: figure out the right type to use for this - we probably need it for the private data case to verify auth
invalid?: boolean
}

export type RateLimitExceeded = typeof RATE_LIMIT_EXCEEDED[keyof typeof RATE_LIMIT_EXCEEDED]

export interface RateLimitService {
Expand Down
16 changes: 0 additions & 16 deletions src/services/accounting.js

This file was deleted.

60 changes: 36 additions & 24 deletions test/unit/middleware/withEgressTracker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,41 +30,38 @@ const env =
/** @satisfies {import('../../../src/middleware/withEgressTracker.types.js').Environment} */
({
DEBUG: 'true',
ACCOUNTING_SERVICE_URL: 'http://example.com',
FF_EGRESS_TRACKER_ENABLED: 'true'
})

const accountingRecordMethodStub = sinon.stub()
.returns(
/** @type {import('../../../src/bindings.js').AccountingService['record']} */
async (cid, bytes, servedAt) => {
console.log(`[mock] record called with cid: ${cid}, bytes: ${bytes}, servedAt: ${servedAt}`)
/** @type {import('../../../src/middleware/withAccountingService.types.js').AccountingService['record']} */
async (space, resource, bytes, servedAt) => {
console.log(`[mock] record called with space: ${space}, resource: ${resource}, bytes: ${bytes}, servedAt: ${servedAt}`)
})

/**
* Mock implementation of the AccountingService.
*
* @param {Object} options
* @param {string} options.serviceURL - The URL of the accounting service.
* @returns {import('../../../src/bindings.js').AccountingService}
* @returns {import('../../../src/middleware/withAccountingService.types.js').AccountingService}
*/
const AccountingService = ({ serviceURL }) => {
console.log(`[mock] Accounting.create called with serviceURL: ${serviceURL}`)
const AccountingService = () => {
console.log('[mock] Accounting.create called')

return {
record: accountingRecordMethodStub,
getTokenMetadata: sinon.stub().resolves(undefined)
record: accountingRecordMethodStub
}
}

const ctx =
/** @satisfies {import('../../../src/middleware/withEgressTracker.js').EgressTrackerContext} */
/** @satisfies {import('../../../src/middleware/withEgressTracker.types.js').Context} */
({
space: 'did:key:z6MkknBAHEGCWvBzAi4amdH5FXEXrdKoWF1UJuvc8Psm2Mda',
dataCid: CID.parse('bafybeibv7vzycdcnydl5n5lbws6lul2omkm6a6b5wmqt77sicrwnhesy7y'),
waitUntil: sinon.stub().returns(undefined),
path: '',
searchParams: new URLSearchParams(),
ACCOUNTING_SERVICE: AccountingService({ serviceURL: env.ACCOUNTING_SERVICE_URL })
accountingService: AccountingService()
})

describe('withEgressTracker', async () => {
Expand Down Expand Up @@ -119,8 +116,9 @@ describe('withEgressTracker', async () => {
expect(response.status).to.equal(200)
expect(responseBody).to.equal('Hello, world!')
expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true
expect(accountingRecordMethodStub.args[0][0], 'first argument should be the cid').to.equal(ctx.dataCid)
expect(accountingRecordMethodStub.args[0][1], 'second argument should be the total bytes').to.equal(totalBytes)
expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space)
expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid)
expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes)
}).timeout(10_000)

it('should record egress for a large file', async () => {
Expand All @@ -142,8 +140,9 @@ describe('withEgressTracker', async () => {

expect(response.status).to.equal(200)
expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true
expect(accountingRecordMethodStub.args[0][0], 'first argument should be the cid').to.equal(ctx.dataCid)
expect(accountingRecordMethodStub.args[0][1], 'second argument should be the total bytes').to.equal(totalBytes)
expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space)
expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid)
expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes)
})

it('should correctly track egress for responses with chunked transfer encoding', async () => {
Expand All @@ -169,7 +168,9 @@ describe('withEgressTracker', async () => {
expect(response.status).to.equal(200)
expect(responseBody).to.equal('Hello, world!')
expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true
expect(accountingRecordMethodStub.args[0][1], 'second argument should be the total bytes').to.equal(totalBytes)
expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space)
expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid)
expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes)
})

it('should record egress bytes for a CAR file request', async () => {
Expand Down Expand Up @@ -216,7 +217,9 @@ describe('withEgressTracker', async () => {

// expect(blocks[0].bytes).to.deep.equal(carBytes) - FIXME (fforbeck): how to get the correct byte count?
expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true
expect(accountingRecordMethodStub.args[0][1], 'second argument should be the total bytes').to.equal(expectedTotalBytes)
expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space)
expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid)
expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(expectedTotalBytes)
})

it('should correctly track egress for delayed responses', async () => {
Expand All @@ -242,7 +245,9 @@ describe('withEgressTracker', async () => {
expect(response.status).to.equal(200)
expect(responseBody).to.equal('Delayed response content')
expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true
expect(accountingRecordMethodStub.args[0][1], 'second argument should be the total bytes').to.equal(totalBytes)
expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space)
expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid)
expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes)
}).timeout(5000)
})

Expand Down Expand Up @@ -330,8 +335,13 @@ describe('withEgressTracker', async () => {
expect(responseBody2).to.equal('Goodbye, world!')

expect(accountingRecordMethodStub.calledTwice, 'record should be called twice').to.be.true
expect(accountingRecordMethodStub.args[0][1], 'second argument should be the total bytes for first request').to.equal(totalBytes1)
expect(accountingRecordMethodStub.args[1][1], 'second argument should be the total bytes for second request').to.equal(totalBytes2)
expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space)
expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid for first request').to.equal(ctx.dataCid)
expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes for first request').to.equal(totalBytes1)

expect(accountingRecordMethodStub.args[1][0], 'first argument should be the space').to.equal(ctx.space)
expect(accountingRecordMethodStub.args[1][1], 'second argument should be the cid for second request').to.equal(ctx.dataCid)
expect(accountingRecordMethodStub.args[1][2], 'third argument should be the total bytes for second request').to.equal(totalBytes2)
}).timeout(10_000)
})

Expand All @@ -356,7 +366,9 @@ describe('withEgressTracker', async () => {
expect(response.status).to.equal(200)
expect(responseBody).to.deep.equal({ message: 'Hello, JSON!' })
expect(accountingRecordMethodStub.calledOnce, 'record should be called once').to.be.true
expect(accountingRecordMethodStub.args[0][1], 'second argument should be the total bytes').to.equal(totalBytes)
expect(accountingRecordMethodStub.args[0][0], 'first argument should be the space').to.equal(ctx.space)
expect(accountingRecordMethodStub.args[0][1], 'second argument should be the cid').to.equal(ctx.dataCid)
expect(accountingRecordMethodStub.args[0][2], 'third argument should be the total bytes').to.equal(totalBytes)
}).timeout(10_000)
})

Expand Down Expand Up @@ -449,7 +461,7 @@ describe('withEgressTracker', async () => {
const request = await createRequest()

// Simulate an error in the accounting service record method
ctx.ACCOUNTING_SERVICE.record = sinon.stub().rejects(new Error('Accounting service error'))
ctx.accountingService.record = sinon.stub().rejects(new Error('Accounting service error'))

const response = await handler(request, env, ctx)
const responseBody = await response.text()
Expand Down
1 change: 0 additions & 1 deletion test/unit/middleware/withRateLimit.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const env =
/** @satisfies {Environment} */
({
DEBUG: 'false',
ACCOUNTING_SERVICE_URL: 'http://example.com/accounting-service',
RATE_LIMITER: {
limit: strictStub(sandbox, 'limit')
},
Expand Down
1 change: 0 additions & 1 deletion wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ DEBUG = "true"
FF_RATE_LIMITER_ENABLED = "true"
FF_EGRESS_TRACKER_ENABLED = "true"
CONTENT_CLAIMS_SERVICE_URL = "https://staging.claims.web3.storage"
ACCOUNTING_SERVICE_URL = "https://example.com/service"

[[env.integration.unsafe.bindings]]
name = "RATE_LIMITER"
Expand Down

0 comments on commit 902d69c

Please sign in to comment.