Skip to content

Commit 4036086

Browse files
committed
feat: count egress bytes
1 parent d097a91 commit 4036086

File tree

11 files changed

+542
-17
lines changed

11 files changed

+542
-17
lines changed

src/bindings.d.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
import { CID } from '@web3-storage/gateway-lib/handlers'
21
import { Environment as RateLimiterEnvironment } from './handlers/rate-limiter.types.ts'
32
import { Environment as CarBlockEnvironment } from './handlers/car-block.types.ts'
3+
import { Environment as EgressTrackerEnvironment } from './handlers/egress-tracker.types.ts'
4+
import { UnknownLink } from 'multiformats'
45

5-
export interface Environment extends CarBlockEnvironment, RateLimiterEnvironment {
6+
export interface Environment extends CarBlockEnvironment, RateLimiterEnvironment, EgressTrackerEnvironment {
67
VERSION: string
78
CONTENT_CLAIMS_SERVICE_URL?: string
89
}
910

1011
export interface AccountingService {
11-
record: (cid: CID, options: GetCIDRequestConfig) => Promise<void>
12+
record: (resource: UnknownLink, bytes: number, servedAt: string) => Promise<void>
1213
getTokenMetadata: (token: string) => Promise<TokenMetadata | null>
1314
}
1415

src/handlers/egress-tracker.js

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import { CID } from 'multiformats'
2+
import { Accounting } from '../services/accounting.js'
3+
/**
4+
* @import { Context, IpfsUrlContext, Middleware } from '@web3-storage/gateway-lib'
5+
* @import { Environment } from './egress-tracker.types.js'
6+
*/
7+
8+
/**
9+
* The egress tracking handler must be enabled after the rate limiting handler,
10+
* and before any handler that serves the response body. It uses the CID of the
11+
* served content to record the egress in the accounting service, and it counts
12+
* the bytes served with a TransformStream to determine the egress amount.
13+
*
14+
* @type {Middleware<IpfsUrlContext, IpfsUrlContext, Environment>}
15+
*/
16+
export function withEgressHandler (handler) {
17+
return async (req, env, ctx) => {
18+
const egressTrackerEnabled = env.FF_EGRESS_TRACKER_ENABLED === 'true'
19+
if (!egressTrackerEnabled) {
20+
return handler(req, env, ctx)
21+
}
22+
23+
let response
24+
try {
25+
response = await handler(req, env, ctx)
26+
} catch (error) {
27+
console.error('Error in egress tracker handler:', error)
28+
throw error
29+
}
30+
31+
if (!response.ok || !response.body) {
32+
return response
33+
}
34+
35+
const { dataCid } = ctx
36+
const accounting = Accounting.create({
37+
serviceURL: env.ACCOUNTING_SERVICE_URL
38+
})
39+
40+
const { readable, writable } = createEgressPassThroughStream(ctx, accounting, dataCid)
41+
42+
try {
43+
ctx.waitUntil(response.body.pipeTo(writable))
44+
} catch (error) {
45+
console.error('Error in egress tracker handler:', error)
46+
// Return the original response in case of an error, to avoid breaking the chain and still serve the content
47+
return response
48+
}
49+
50+
return new Response(readable, {
51+
status: response.status,
52+
statusText: response.statusText,
53+
headers: response.headers
54+
})
55+
}
56+
}
57+
58+
/**
59+
* Creates a TransformStream to count bytes served to the client.
60+
* It records egress when the stream is finalized without an error.
61+
*
62+
* @param {Context} ctx - The context object.
63+
* @param {import('../bindings.js').AccountingService} accounting - The accounting service instance to record egress.
64+
* @param {CID} dataCid - The CID of the served content.
65+
* @returns {TransformStream} - The created TransformStream.
66+
*/
67+
function createEgressPassThroughStream (ctx, accounting, dataCid) {
68+
let totalBytesServed = 0
69+
70+
return new TransformStream({
71+
/**
72+
* The start function is called when the stream is being initialized.
73+
* It resets the total bytes served to 0.
74+
*/
75+
start () {
76+
totalBytesServed = 0
77+
},
78+
/**
79+
* The transform function is called for each chunk of the response body.
80+
* It enqueues the chunk and updates the total bytes served.
81+
* If an error occurs, it signals the error to the controller.
82+
* The bytes of a chunk are not counted if enqueuing that chunk fails.
83+
* @param {Uint8Array} chunk
84+
* @param {TransformStreamDefaultController} controller
85+
*/
86+
async transform (chunk, controller) {
87+
try {
88+
controller.enqueue(chunk)
89+
totalBytesServed += chunk.byteLength
90+
} catch (error) {
91+
controller.error(error)
92+
}
93+
},
94+
95+
/**
96+
* The flush function is called when the stream is being finalized,
97+
* which is after every chunk of the response body has been transformed.
98+
* At that point the byte count is complete, so we record the egress.
99+
* It is called only once and it triggers a non-blocking call to the accounting service.
100+
* If an error occurs, the egress is not recorded.
101+
* NOTE: The flush function is NOT called in case of a stream error.
102+
*/
103+
async flush (controller) {
104+
try {
105+
// Non-blocking call to the accounting service to record egress
106+
ctx.waitUntil(accounting.record(dataCid, totalBytesServed, new Date().toISOString()))
107+
} catch (error) {
108+
controller.error(error)
109+
}
110+
}
111+
})
112+
}

src/handlers/egress-tracker.types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
import { Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'
2+
3+
export interface Environment extends MiddlewareEnvironment {
4+
ACCOUNTING_SERVICE_URL: string
5+
FF_EGRESS_TRACKER_ENABLED: string
6+
}

src/handlers/rate-limiter.js

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import { Accounting } from '../services/accounting.js'
77
* @import { R2Bucket, KVNamespace, RateLimit } from '@cloudflare/workers-types'
88
* @import {
99
* Environment,
10-
* TokenMetadata,
10+
* RateLimitExceeded,
1111
* RateLimitService,
12-
* RateLimitExceeded
12+
* TokenMetadata,
1313
* } from './rate-limiter.types.js'
1414
*/
1515

@@ -32,14 +32,8 @@ export function withRateLimit (handler) {
3232
const isRateLimitExceeded = await rateLimitService.check(dataCid, req)
3333
if (isRateLimitExceeded === RATE_LIMIT_EXCEEDED.YES) {
3434
throw new HttpError('Too Many Requests', { status: 429 })
35-
} else {
36-
const accounting = Accounting.create({
37-
serviceURL: env.ACCOUNTING_SERVICE_URL
38-
})
39-
// NOTE: non-blocking call to the accounting service
40-
ctx.waitUntil(accounting.record(dataCid, req))
41-
return handler(req, env, ctx)
4235
}
36+
return handler(req, env, ctx)
4337
}
4438
}
4539

src/handlers/rate-limiter.types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export interface Environment extends MiddlewareEnvironment {
88
RATE_LIMITER: RateLimit
99
AUTH_TOKEN_METADATA: KVNamespace
1010
FF_RATE_LIMITER_ENABLED: string
11+
FF_EGRESS_TRACKER_ENABLED: string
1112
}
1213

1314
export interface TokenMetadata {

src/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
withCarBlockHandler
2222
} from './middleware.js'
2323
import { withRateLimit } from './handlers/rate-limiter.js'
24+
import { withEgressHandler } from './handlers/egress-tracker.js'
2425

2526
/**
2627
* @typedef {import('./bindings.js').Environment} Environment
@@ -43,6 +44,7 @@ export default {
4344
withParsedIpfsUrl,
4445
withRateLimit,
4546
createWithHttpMethod('GET', 'HEAD'),
47+
withEgressHandler,
4648
withCarBlockHandler,
4749
withContentClaimsDagula,
4850
withFormatRawHandler,

src/services/accounting.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
*/
44
export const Accounting = {
55
create: ({ serviceURL }) => ({
6-
record: async (cid, options) => {
7-
console.log(`using ${serviceURL} to record a GET for ${cid} with options`, options)
6+
record: async (cid, bytes, servedAt) => {
7+
console.log(`using ${serviceURL} to record egress for ${cid} with total bytes: ${bytes} and servedAt: ${servedAt}`)
88
},
99

1010
getTokenMetadata: async () => {

test/fixtures/worker-fixture.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ const __dirname = path.dirname(__filename)
1212
*/
1313
const wranglerEnv = process.env.WRANGLER_ENV || 'integration'
1414

15+
const DEBUG = process.env.DEBUG === 'true' || false
16+
1517
/**
1618
* Worker information object
1719
* @typedef {Object} WorkerInfo
@@ -41,7 +43,7 @@ export const mochaGlobalSetup = async () => {
4143
)
4244
console.log(`Output: ${await workerInfo.getOutput()}`)
4345
console.log('WorkerInfo:', workerInfo)
44-
console.log('Test worker started!')
46+
console.log(`Test worker started! ENV: ${wranglerEnv}, DEBUG: ${DEBUG}`)
4547
} catch (error) {
4648
console.error('Failed to start test worker:', error)
4749
throw error
@@ -59,7 +61,9 @@ export const mochaGlobalTeardown = async () => {
5961
try {
6062
const { stop } = workerInfo
6163
await stop?.()
62-
// console.log('getOutput', getOutput()) // uncomment for debugging
64+
if (DEBUG) {
65+
console.log('getOutput', await workerInfo.getOutput())
66+
}
6367
console.log('Test worker stopped!')
6468
} catch (error) {
6569
console.error('Failed to stop test worker:', error)
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import { expect } from 'chai'
2+
import { describe, it } from 'mocha'
3+
import fetch from 'node-fetch'
4+
import { getWorkerInfo } from '../fixtures/worker-fixture.js'
5+
import Sinon from 'sinon'
6+
import { Accounting } from '../../src/services/accounting.js'
7+
8+
describe('Egress Handler', () => {
9+
/**
10+
* See https://bafybeibv7vzycdcnydl5n5lbws6lul2omkm6a6b5wmqt77sicrwnhesy7y.ipfs.w3s.link
11+
*/
12+
const cid = 'bafybeibv7vzycdcnydl5n5lbws6lul2omkm6a6b5wmqt77sicrwnhesy7y'
13+
14+
it('should track egress bytes for a successful request', async () => {
15+
const { ip, port } = getWorkerInfo()
16+
const expectedTotalBytes = 5806
17+
18+
const recordSpy = Sinon.spy((cid, bytes, servedAt) => {
19+
console.log(`[mock] record called with cid: ${cid}, bytes: ${bytes}, servedAt: ${servedAt}`)
20+
return Promise.resolve()
21+
})
22+
const createStub = Sinon.stub(Accounting, 'create')
23+
.callsFake(({ serviceURL }) => {
24+
console.log(`[mock] create called with serviceURL: ${serviceURL}`)
25+
return {
26+
record: recordSpy,
27+
getTokenMetadata: Sinon.stub().returns(null)
28+
}
29+
})
30+
31+
const response = await fetch(`http://${ip}:${port}/ipfs/${cid}`)
32+
expect(response.status).to.equal(200)
33+
34+
const body = await response.text()
35+
36+
expect(Buffer.byteLength(body)).to.be.equal(expectedTotalBytes, 'total bytes should be ')
37+
expect(recordSpy.calledOnce, 'record should be called once').to.be.true
38+
expect(recordSpy.args[0][0], 'first argument should be the cid').to.equal(cid)
39+
expect(recordSpy.args[0][1], 'second argument should be the total bytes').to.equal(expectedTotalBytes)
40+
createStub.restore()
41+
}).timeout(10_000)
42+
43+
// it('should not record egress for a failed request', async () => {
44+
// const { ip, port } = getWorkerInfo()
45+
// const initialEgress = await Accounting.getEgress(cid)
46+
47+
// // Simulate a failed request by querying a non-existing CID
48+
// const response = await fetch(`http://${ip}:${port}/ipfs/nonexistent-cid`)
49+
// expect(response.status).to.not.equal(200)
50+
51+
// const finalEgress = await Accounting.getEgress(cid)
52+
53+
// // Egress should remain unchanged since the request failed
54+
// expect(finalEgress).to.equal(initialEgress)
55+
// }).timeout(10_000)
56+
57+
// it('should record partial egress when the connection is interrupted', async () => {
58+
// const { ip, port } = getWorkerInfo()
59+
// const initialEgress = await Accounting.getEgress(cid)
60+
61+
// const controller = new AbortController()
62+
// const timeout = setTimeout(() => controller.abort(), 100) // Abort after 100ms to simulate an interrupted connection
63+
64+
// try {
65+
// await fetch(`http://${ip}:${port}/ipfs/${cid}`, { signal: controller.signal })
66+
// } catch (error) {
67+
// expect(error.name).to.equal('AbortError')
68+
// } finally {
69+
// clearTimeout(timeout)
70+
// }
71+
72+
// const finalEgress = await Accounting.getEgress(cid)
73+
// expect(finalEgress).to.be.greaterThan(initialEgress) // Some bytes should have been counted before abort
74+
// }).timeout(10_000)
75+
})

0 commit comments

Comments
 (0)