Skip to content

Commit

Permalink
feat: count egress bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
fforbeck committed Oct 29, 2024
1 parent d097a91 commit 232c5fc
Show file tree
Hide file tree
Showing 10 changed files with 621 additions and 27 deletions.
10 changes: 6 additions & 4 deletions src/bindings.d.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
import { CID } from '@web3-storage/gateway-lib/handlers'
import { Environment as RateLimiterEnvironment } from './handlers/rate-limiter.types.ts'
import { Environment as CarBlockEnvironment } from './handlers/car-block.types.ts'
import { Environment as EgressTrackerEnvironment } from './handlers/egress-tracker.types.ts'
import { UnknownLink } from 'multiformats'

export interface Environment extends CarBlockEnvironment, RateLimiterEnvironment {
export interface Environment extends CarBlockEnvironment, RateLimiterEnvironment, EgressTrackerEnvironment {
VERSION: string
CONTENT_CLAIMS_SERVICE_URL?: string
ACCOUNTING_SERVICE_URL: string
}

export interface AccountingService {
record: (cid: CID, options: GetCIDRequestConfig) => Promise<void>
record: (resource: UnknownLink, bytes: number, servedAt: string) => Promise<void>
getTokenMetadata: (token: string) => Promise<TokenMetadata | null>
}

export interface Accounting {
create: ({ serviceURL }: { serviceURL?: string }) => AccountingService
create: ({ serviceURL }: { serviceURL: string }) => AccountingService
}

117 changes: 117 additions & 0 deletions src/handlers/egress-tracker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import { Accounting } from '../services/accounting.js'

/**
* @import { Context, IpfsUrlContext, Middleware } from '@web3-storage/gateway-lib'
* @import { Environment } from './egress-tracker.types.js'
* @import { AccountingService } from '../bindings.js'
* @typedef {IpfsUrlContext & { ACCOUNTING_SERVICE?: AccountingService }} EgressTrackerContext
*/

/**
 * Middleware that measures how many bytes of a successful response are served
 * and reports them to the accounting service.
 *
 * The egress tracking handler must be enabled after the rate limiting handler,
 * and before any handler that serves the response body. It uses the CID of the
 * served content (`ctx.dataCid`) to record the egress, and it counts the bytes
 * with a pass-through TransformStream placed between the downstream response
 * body and the client.
 *
 * Behaviour:
 * - When `env.FF_EGRESS_TRACKER_ENABLED` is not the string 'true' the wrapped
 *   handler is called untouched.
 * - Errors thrown by the wrapped handler are logged and re-thrown.
 * - Responses that are not `ok`, or that have no body, are returned as-is.
 * - Otherwise the body is piped through the counting stream and a new Response
 *   with the same status, status text and headers is returned.
 *
 * @type {Middleware<EgressTrackerContext, EgressTrackerContext, Environment>}
 */
export function withEgressHandler (handler) {
  return async (req, env, ctx) => {
    if (env.FF_EGRESS_TRACKER_ENABLED !== 'true') {
      return handler(req, env, ctx)
    }

    let response
    try {
      response = await handler(req, env, ctx)
    } catch (error) {
      console.error('Error in egress tracker handler:', error)
      throw error
    }

    if (!response.ok || !response.body) {
      return response
    }

    const { dataCid } = ctx
    // Prefer an accounting service injected on the context (e.g. by tests).
    const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({
      serviceURL: env.ACCOUNTING_SERVICE_URL
    })

    const { readable, writable } = createEgressPassThroughStream(ctx, accounting, dataCid)

    // pipeTo() reports failures (upstream error, client abort) by rejecting the
    // returned promise, never by throwing synchronously, so the rejection has
    // to be handled on the promise itself. A failed pipe still errors
    // `readable`, so the client observes the failure; here it is only logged.
    const piping = response.body.pipeTo(writable).catch((error) => {
      console.error('Error in egress tracker handler:', error)
    })

    try {
      // Keep the worker alive until the body has been fully streamed.
      ctx.waitUntil(piping)
    } catch (error) {
      // The pipe is already running and has locked `response.body`, so the
      // original response can no longer be served. Log and keep going with
      // the wrapped response to avoid breaking the chain.
      console.error('Error in egress tracker handler:', error)
    }

    return new Response(readable, {
      status: response.status,
      statusText: response.statusText,
      headers: response.headers
    })
  }
}

/**
* Creates a TransformStream to count bytes served to the client.
* It records egress when the stream is finalized without an error.
*
* @param {import('@web3-storage/gateway-lib/middleware').Context} ctx - The context object.
* @param {AccountingService} accounting - The accounting service instance to record egress.
* @param {import('@web3-storage/gateway-lib/handlers').CID} dataCid - The CID of the served content.
* @returns {TransformStream} - The created TransformStream.
*/
function createEgressPassThroughStream (ctx, accounting, dataCid) {
let totalBytesServed = 0

return new TransformStream({
/**
* The start function is called when the stream is being initialized.
* It resets the total bytes served to 0.
*/
start () {
totalBytesServed = 0
},
/**
* The transform function is called for each chunk of the response body.
* It enqueues the chunk and updates the total bytes served.
* If an error occurs, it signals an error to the controller and logs it.
* The bytes are not counted in case of enqueuing an error.
* @param {Uint8Array} chunk
* @param {TransformStreamDefaultController} controller
*/
async transform (chunk, controller) {
try {
controller.enqueue(chunk)
totalBytesServed += chunk.byteLength
} catch (error) {
console.error('Error while counting egress bytes:', error)
controller.error(error)
}
},

/**
* The flush function is called when the stream is being finalized,
* which is when the response is being sent to the client.
* So before the response is sent, we record the egress.
* It is called only once and it triggers a non-blocking call to the accounting service.
* If an error occurs, the egress is not recorded.
* NOTE: The flush function is NOT called in case of an stream error.
*/
async flush (controller) {
try {
// Non-blocking call to the accounting service to record egress
if (totalBytesServed > 0) {
ctx.waitUntil(accounting.record(dataCid, totalBytesServed, new Date().toISOString()))
}
} catch (error) {
console.error('Error while recording egress:', error)
controller.error(error)
}
}
})
}
6 changes: 6 additions & 0 deletions src/handlers/egress-tracker.types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'

/**
 * Environment bindings read by the egress tracker middleware, on top of the
 * base gateway-lib middleware environment.
 */
export interface Environment extends MiddlewareEnvironment {
/** URL passed as `serviceURL` to `Accounting.create` when the context carries no `ACCOUNTING_SERVICE`. */
ACCOUNTING_SERVICE_URL: string
/** Feature flag: egress tracking only runs when this is exactly the string 'true'. */
FF_EGRESS_TRACKER_ENABLED: string
}
23 changes: 8 additions & 15 deletions src/handlers/rate-limiter.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import { Accounting } from '../services/accounting.js'
* @import { R2Bucket, KVNamespace, RateLimit } from '@cloudflare/workers-types'
* @import {
* Environment,
* TokenMetadata,
* RateLimitExceeded,
* RateLimitService,
* RateLimitExceeded
* TokenMetadata,
* } from './rate-limiter.types.js'
* @typedef {IpfsUrlContext & { ACCOUNTING_SERVICE?: import('../bindings.js').AccountingService }} RateLimiterContext
*/

/**
Expand All @@ -19,7 +20,7 @@ import { Accounting } from '../services/accounting.js'
* it can be enabled or disabled using the FF_RATE_LIMITER_ENABLED flag.
* Every successful request is recorded in the accounting service.
*
* @type {Middleware<IpfsUrlContext, IpfsUrlContext, Environment>}
* @type {Middleware<RateLimiterContext, RateLimiterContext, Environment>}
*/
export function withRateLimit (handler) {
return async (req, env, ctx) => {
Expand All @@ -32,20 +33,14 @@ export function withRateLimit (handler) {
const isRateLimitExceeded = await rateLimitService.check(dataCid, req)
if (isRateLimitExceeded === RATE_LIMIT_EXCEEDED.YES) {
throw new HttpError('Too Many Requests', { status: 429 })
} else {
const accounting = Accounting.create({
serviceURL: env.ACCOUNTING_SERVICE_URL
})
// NOTE: non-blocking call to the accounting service
ctx.waitUntil(accounting.record(dataCid, req))
return handler(req, env, ctx)
}
return handler(req, env, ctx)
}
}

/**
* @param {Environment} env
* @param {IpfsUrlContext} ctx
* @param {RateLimiterContext} ctx
* @returns {RateLimitService}
*/
function create (env, ctx) {
Expand Down Expand Up @@ -116,7 +111,7 @@ async function isRateLimited (rateLimitAPI, cid) {
/**
* @param {Environment} env
* @param {string} authToken
* @param {Context} ctx
* @param {RateLimiterContext} ctx
* @returns {Promise<TokenMetadata | null>}
*/
async function getTokenMetadata (env, authToken, ctx) {
Expand All @@ -127,9 +122,7 @@ async function getTokenMetadata (env, authToken, ctx) {
return decode(cachedValue)
}

const accounting = Accounting.create({
serviceURL: env.ACCOUNTING_SERVICE_URL
})
const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({ serviceURL: env.ACCOUNTING_SERVICE_URL })
const tokenMetadata = await accounting.getTokenMetadata(authToken)
if (tokenMetadata) {
// NOTE: non-blocking call to the auth token metadata cache
Expand Down
2 changes: 2 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
withCarBlockHandler
} from './middleware.js'
import { withRateLimit } from './handlers/rate-limiter.js'
import { withEgressHandler } from './handlers/egress-tracker.js'

/**
* @typedef {import('./bindings.js').Environment} Environment
Expand All @@ -43,6 +44,7 @@ export default {
withParsedIpfsUrl,
withRateLimit,
createWithHttpMethod('GET', 'HEAD'),
withEgressHandler,
withCarBlockHandler,
withContentClaimsDagula,
withFormatRawHandler,
Expand Down
4 changes: 2 additions & 2 deletions src/services/accounting.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
*/
export const Accounting = {
create: ({ serviceURL }) => ({
record: async (cid, options) => {
console.log(`using ${serviceURL} to record a GET for ${cid} with options`, options)
record: async (cid, bytes, servedAt) => {
console.log(`using ${serviceURL} to record egress for ${cid} with total bytes: ${bytes} and servedAt: ${servedAt}`)
},

getTokenMetadata: async () => {
Expand Down
8 changes: 6 additions & 2 deletions test/fixtures/worker-fixture.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const __dirname = path.dirname(__filename)
*/
const wranglerEnv = process.env.WRANGLER_ENV || 'integration'

const DEBUG = process.env.DEBUG === 'true' || false

/**
* Worker information object
* @typedef {Object} WorkerInfo
Expand Down Expand Up @@ -41,7 +43,7 @@ export const mochaGlobalSetup = async () => {
)
console.log(`Output: ${await workerInfo.getOutput()}`)
console.log('WorkerInfo:', workerInfo)
console.log('Test worker started!')
console.log(`Test worker started! ENV: ${wranglerEnv}, DEBUG: ${DEBUG}`)
} catch (error) {
console.error('Failed to start test worker:', error)
throw error
Expand All @@ -59,7 +61,9 @@ export const mochaGlobalTeardown = async () => {
try {
const { stop } = workerInfo
await stop?.()
// console.log('getOutput', getOutput()) // uncomment for debugging
if (DEBUG) {
console.log('getOutput', await workerInfo.getOutput())
}
console.log('Test worker stopped!')
} catch (error) {
console.error('Failed to stop test worker:', error)
Expand Down
Loading

0 comments on commit 232c5fc

Please sign in to comment.