Skip to content

Commit b99bf75

Browse files
committed
feat: count egress bytes
1 parent d097a91 commit b99bf75

File tree

10 files changed

+619
-27
lines changed

10 files changed

+619
-27
lines changed

src/bindings.d.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
1-
import { CID } from '@web3-storage/gateway-lib/handlers'
21
import { Environment as RateLimiterEnvironment } from './handlers/rate-limiter.types.ts'
32
import { Environment as CarBlockEnvironment } from './handlers/car-block.types.ts'
3+
import { Environment as EgressTrackerEnvironment } from './handlers/egress-tracker.types.ts'
4+
import { UnknownLink } from 'multiformats'
45

5-
export interface Environment extends CarBlockEnvironment, RateLimiterEnvironment {
6+
export interface Environment extends CarBlockEnvironment, RateLimiterEnvironment, EgressTrackerEnvironment {
67
VERSION: string
78
CONTENT_CLAIMS_SERVICE_URL?: string
89
}
910

1011
export interface AccountingService {
11-
record: (cid: CID, options: GetCIDRequestConfig) => Promise<void>
12+
record: (resource: UnknownLink, bytes: number, servedAt: string) => Promise<void>
1213
getTokenMetadata: (token: string) => Promise<TokenMetadata | null>
1314
}
1415

1516
export interface Accounting {
16-
create: ({ serviceURL }: { serviceURL?: string }) => AccountingService
17+
create: ({ serviceURL }: { serviceURL: string }) => AccountingService
1718
}
1819

src/handlers/egress-tracker.js

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import { Accounting } from '../services/accounting.js'
2+
3+
/**
4+
* @import { Context, IpfsUrlContext, Middleware } from '@web3-storage/gateway-lib'
5+
* @import { Environment } from './egress-tracker.types.js'
6+
* @import { AccountingService } from '../bindings.js'
7+
* @typedef {IpfsUrlContext & { ACCOUNTING_SERVICE?: AccountingService }} EgressTrackerContext
8+
*/
9+
10+
/**
11+
* The egress tracking handler must be enabled after the rate limiting handler,
12+
* and before any handler that serves the response body. It uses the CID of the
13+
* served content to record the egress in the accounting service, and it counts
14+
* the bytes served with a TransformStream to determine the egress amount.
15+
*
16+
* @type {Middleware<EgressTrackerContext, EgressTrackerContext, Environment>}
17+
*/
18+
export function withEgressHandler (handler) {
19+
return async (req, env, ctx) => {
20+
if (env.FF_EGRESS_TRACKER_ENABLED !== 'true') {
21+
return handler(req, env, ctx)
22+
}
23+
24+
let response
25+
try {
26+
response = await handler(req, env, ctx)
27+
} catch (error) {
28+
console.error('Error in egress tracker handler:', error)
29+
throw error
30+
}
31+
32+
if (!response.ok || !response.body) {
33+
return response
34+
}
35+
36+
const { dataCid } = ctx
37+
const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({
38+
serviceURL: env.ACCOUNTING_SERVICE_URL
39+
})
40+
41+
const { readable, writable } = createEgressPassThroughStream(ctx, accounting, dataCid)
42+
43+
try {
44+
ctx.waitUntil(response.body.pipeTo(writable))
45+
} catch (error) {
46+
console.error('Error in egress tracker handler:', error)
47+
// Original response in case of an error to avoid breaking the chain and serve the content
48+
return response
49+
}
50+
51+
return new Response(readable, {
52+
status: response.status,
53+
statusText: response.statusText,
54+
headers: response.headers
55+
})
56+
}
57+
}
58+
59+
/**
60+
* Creates a TransformStream to count bytes served to the client.
61+
* It records egress when the stream is finalized without an error.
62+
*
63+
* @param {import('@web3-storage/gateway-lib/middleware').Context} ctx - The context object.
64+
* @param {AccountingService} accounting - The accounting service instance to record egress.
65+
* @param {import('@web3-storage/gateway-lib/handlers').CID} dataCid - The CID of the served content.
66+
* @returns {TransformStream} - The created TransformStream.
67+
*/
68+
function createEgressPassThroughStream (ctx, accounting, dataCid) {
69+
let totalBytesServed = 0
70+
71+
return new TransformStream({
72+
/**
73+
* The start function is called when the stream is being initialized.
74+
* It resets the total bytes served to 0.
75+
*/
76+
start () {
77+
totalBytesServed = 0
78+
},
79+
/**
80+
* The transform function is called for each chunk of the response body.
81+
* It enqueues the chunk and updates the total bytes served.
82+
* If an error occurs, it signals an error to the controller and logs it.
83+
* The bytes are not counted in case of enqueuing an error.
84+
* @param {Uint8Array} chunk
85+
* @param {TransformStreamDefaultController} controller
86+
*/
87+
async transform (chunk, controller) {
88+
try {
89+
controller.enqueue(chunk)
90+
totalBytesServed += chunk.byteLength
91+
} catch (error) {
92+
console.error('Error while counting egress bytes:', error)
93+
controller.error(error)
94+
}
95+
},
96+
97+
/**
98+
* The flush function is called when the stream is being finalized,
99+
* which is when the response is being sent to the client.
100+
* So before the response is sent, we record the egress.
101+
* It is called only once and it triggers a non-blocking call to the accounting service.
102+
* If an error occurs, the egress is not recorded.
103+
* NOTE: The flush function is NOT called in case of an stream error.
104+
*/
105+
async flush (controller) {
106+
try {
107+
// Non-blocking call to the accounting service to record egress
108+
if (totalBytesServed > 0) {
109+
ctx.waitUntil(accounting.record(dataCid, totalBytesServed, new Date().toISOString()))
110+
}
111+
} catch (error) {
112+
console.error('Error while recording egress:', error)
113+
controller.error(error)
114+
}
115+
}
116+
})
117+
}

src/handlers/egress-tracker.types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import { Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'
2+
3+
/** Environment bindings read by the egress tracker middleware. */
export interface Environment extends MiddlewareEnvironment {
4+
/** Passed as `serviceURL` to `Accounting.create` when the context carries no `ACCOUNTING_SERVICE`. */
ACCOUNTING_SERVICE_URL: string
5+
/** Feature flag: egress tracking runs only when this is exactly the string 'true'. */
FF_EGRESS_TRACKER_ENABLED: string
6+
}

src/handlers/rate-limiter.js

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ import { Accounting } from '../services/accounting.js'
77
* @import { R2Bucket, KVNamespace, RateLimit } from '@cloudflare/workers-types'
88
* @import {
99
* Environment,
10-
* TokenMetadata,
10+
* RateLimitExceeded,
1111
* RateLimitService,
12-
* RateLimitExceeded
12+
* TokenMetadata,
1313
* } from './rate-limiter.types.js'
14+
* @typedef {IpfsUrlContext & { ACCOUNTING_SERVICE?: import('../bindings.js').AccountingService }} RateLimiterContext
1415
*/
1516

1617
/**
@@ -19,7 +20,7 @@ import { Accounting } from '../services/accounting.js'
1920
* it can be enabled or disabled using the FF_RATE_LIMITER_ENABLED flag.
2021
* Served egress is recorded separately by the egress tracker handler (`withEgressHandler`), not here.
2122
*
22-
* @type {Middleware<IpfsUrlContext, IpfsUrlContext, Environment>}
23+
* @type {Middleware<RateLimiterContext, RateLimiterContext, Environment>}
2324
*/
2425
export function withRateLimit (handler) {
2526
return async (req, env, ctx) => {
@@ -32,20 +33,14 @@ export function withRateLimit (handler) {
3233
const isRateLimitExceeded = await rateLimitService.check(dataCid, req)
3334
if (isRateLimitExceeded === RATE_LIMIT_EXCEEDED.YES) {
3435
throw new HttpError('Too Many Requests', { status: 429 })
35-
} else {
36-
const accounting = Accounting.create({
37-
serviceURL: env.ACCOUNTING_SERVICE_URL
38-
})
39-
// NOTE: non-blocking call to the accounting service
40-
ctx.waitUntil(accounting.record(dataCid, req))
41-
return handler(req, env, ctx)
4236
}
37+
return handler(req, env, ctx)
4338
}
4439
}
4540

4641
/**
4742
* @param {Environment} env
48-
* @param {IpfsUrlContext} ctx
43+
* @param {RateLimiterContext} ctx
4944
* @returns {RateLimitService}
5045
*/
5146
function create (env, ctx) {
@@ -116,7 +111,7 @@ async function isRateLimited (rateLimitAPI, cid) {
116111
/**
117112
* @param {Environment} env
118113
* @param {string} authToken
119-
* @param {Context} ctx
114+
* @param {RateLimiterContext} ctx
120115
* @returns {Promise<TokenMetadata | null>}
121116
*/
122117
async function getTokenMetadata (env, authToken, ctx) {
@@ -127,9 +122,7 @@ async function getTokenMetadata (env, authToken, ctx) {
127122
return decode(cachedValue)
128123
}
129124

130-
const accounting = Accounting.create({
131-
serviceURL: env.ACCOUNTING_SERVICE_URL
132-
})
125+
const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({ serviceURL: env.ACCOUNTING_SERVICE_URL })
133126
const tokenMetadata = await accounting.getTokenMetadata(authToken)
134127
if (tokenMetadata) {
135128
// NOTE: non-blocking call to the auth token metadata cache

src/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
withCarBlockHandler
2222
} from './middleware.js'
2323
import { withRateLimit } from './handlers/rate-limiter.js'
24+
import { withEgressHandler } from './handlers/egress-tracker.js'
2425

2526
/**
2627
* @typedef {import('./bindings.js').Environment} Environment
@@ -43,6 +44,7 @@ export default {
4344
withParsedIpfsUrl,
4445
withRateLimit,
4546
createWithHttpMethod('GET', 'HEAD'),
47+
withEgressHandler,
4648
withCarBlockHandler,
4749
withContentClaimsDagula,
4850
withFormatRawHandler,

src/services/accounting.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
*/
44
export const Accounting = {
55
create: ({ serviceURL }) => ({
6-
record: async (cid, options) => {
7-
console.log(`using ${serviceURL} to record a GET for ${cid} with options`, options)
6+
record: async (cid, bytes, servedAt) => {
7+
console.log(`using ${serviceURL} to record egress for ${cid} with total bytes: ${bytes} and servedAt: ${servedAt}`)
88
},
99

1010
getTokenMetadata: async () => {

test/fixtures/worker-fixture.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ const __dirname = path.dirname(__filename)
1212
*/
1313
const wranglerEnv = process.env.WRANGLER_ENV || 'integration'
1414

15+
const DEBUG = process.env.DEBUG === 'true' || false
16+
1517
/**
1618
* Worker information object
1719
* @typedef {Object} WorkerInfo
@@ -41,7 +43,7 @@ export const mochaGlobalSetup = async () => {
4143
)
4244
console.log(`Output: ${await workerInfo.getOutput()}`)
4345
console.log('WorkerInfo:', workerInfo)
44-
console.log('Test worker started!')
46+
console.log(`Test worker started! ENV: ${wranglerEnv}, DEBUG: ${DEBUG}`)
4547
} catch (error) {
4648
console.error('Failed to start test worker:', error)
4749
throw error
@@ -59,7 +61,9 @@ export const mochaGlobalTeardown = async () => {
5961
try {
6062
const { stop } = workerInfo
6163
await stop?.()
62-
// console.log('getOutput', getOutput()) // uncomment for debugging
64+
if (DEBUG) {
65+
console.log('getOutput', await workerInfo.getOutput())
66+
}
6367
console.log('Test worker stopped!')
6468
} catch (error) {
6569
console.error('Failed to stop test worker:', error)

0 commit comments

Comments
 (0)