feat: egress tracker middleware (#120)
### Egress Tracker Middleware

**Summary:**
This PR introduces an egress tracking middleware for the Freeway
project so that egress for served content can be measured and recorded.
The middleware counts the bytes sent in each response body and records
them with the accounting service against the content ID (CID) of the
served content.

**Key Changes:**
- **Egress Middleware (`withEgressTracker`)**:
  - Wraps response handlers to count the bytes sent to the client.
  - Controlled by the `FF_EGRESS_TRACKER_ENABLED` feature flag, which
    enables or disables egress tracking as needed. It is disabled by default.

- **Accounting Service Integration**:
  - Logs egress data with the accounting service, using either an
    `ACCOUNTING_SERVICE` from the context or a new instance based on the
    `ACCOUNTING_SERVICE_URL` environment variable.
  - Egress data is linked to the CID of the served content, ensuring
    precise tracking. (The actual accounting service implementation,
    integrating `w3up-client` for the new `usage/record` capability, will
    follow in a separate PR.)

- **Efficient Byte Counting via `TransformStream`**:
  - Uses a `TransformStream` (`createByteCountStream`) to
    passively count bytes in the response body without altering content.
  - On stream completion, the `flush` method reports the total, and any
    non-zero egress is recorded with the accounting service using
    `ctx.waitUntil()` for non-blocking calls.
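A minimal sketch of the pass-through counting idea, assuming a runtime with the standard `TransformStream` and `Response` globals (Cloudflare Workers, or Node.js 18+). The names `countingStream` and `demo` are hypothetical and exist only for this illustration; the PR's own implementation is `createByteCountStream` in `src/middleware/withEgressTracker.js`.

```javascript
// Hypothetical helper for illustration; not the PR's implementation.
function countingStream (onDone) {
  let total = 0
  return new TransformStream({
    transform (chunk, controller) {
      controller.enqueue(chunk) // forward the chunk unchanged
      total += chunk.byteLength // count what was forwarded
    },
    flush () {
      onDone(total) // runs once, after the source closes without error
    }
  })
}

// Pipes a small response body through the counter and reads it back.
async function demo () {
  let recorded = null
  const original = new Response('hello egress')
  const tracked = new Response(
    original.body.pipeThrough(countingStream((n) => { recorded = n }))
  )
  const text = await tracked.text()
  return { text, recorded }
}
```

Calling `demo()` should resolve to the unchanged body text together with a byte count of 12, which shows that the stream only observes the data on its way to the client.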

**Error Handling**:
- Logs errors encountered during data streaming and halts byte counting
without interrupting the original response chain. This ensures
resilience even in cases of partial or interrupted streams.
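The code comments in this commit note that `flush` is not called when the stream errors, so an interrupted response records no egress. The sketch below illustrates that behaviour under the same assumptions as any standards-compliant `TransformStream`; `countingStream` and `demoInterrupted` are hypothetical names, and the failing source is simulated.

```javascript
// Hypothetical helper for illustration; not the PR's implementation.
function countingStream (onDone) {
  let total = 0
  return new TransformStream({
    transform (chunk, controller) {
      controller.enqueue(chunk)
      total += chunk.byteLength
    },
    flush () {
      onDone(total) // only reached when the source closes cleanly
    }
  })
}

// Simulates an upstream body that delivers one chunk and then fails.
async function demoInterrupted () {
  let recorded = null
  let failed = false
  let pulls = 0
  const source = new ReadableStream({
    pull (controller) {
      if (pulls++ === 0) {
        controller.enqueue(new Uint8Array(4))
      } else {
        controller.error(new Error('upstream failed'))
      }
    }
  })
  const reader = source
    .pipeThrough(countingStream((n) => { recorded = n }))
    .getReader()
  try {
    while (!(await reader.read()).done) { /* drain the stream */ }
  } catch (err) {
    failed = true // the consumer sees the upstream error
  }
  return { recorded, failed }
}
```

Here the consumer observes the failure and the `onDone` callback never fires, so `recorded` stays `null`.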

**Testing**:
- Added thorough tests to validate egress recording across scenarios,
including complete responses, interrupted streams, and error cases.

**Next Steps**:
- Integration tests for verifying egress tracking accuracy and
accounting service interactions in various streaming conditions (planned
for a future PR).
- `w3up-client` integration to execute the new `usage/record` capability
in subsequent development.
fforbeck authored Oct 30, 2024
1 parent 4796ed3 commit 847829b
Showing 10 changed files with 596 additions and 23 deletions.
12 changes: 8 additions & 4 deletions src/bindings.d.ts
@@ -2,19 +2,23 @@ import { CID } from '@web3-storage/gateway-lib/handlers'
 import { Environment as RateLimiterEnvironment } from './middleware/withRateLimit.types.ts'
 import { Environment as CarBlockEnvironment } from './middleware/withCarBlockHandler.types.ts'
 import { Environment as ContentClaimsDagulaEnvironment } from './middleware/withCarBlockHandler.types.ts'

+import { Environment as EgressTrackerEnvironment } from './middleware/withEgressTracker.types.ts'
+import { UnknownLink } from 'multiformats'
 export interface Environment
   extends CarBlockEnvironment,
     RateLimiterEnvironment,
-    ContentClaimsDagulaEnvironment {
+    ContentClaimsDagulaEnvironment,
+    EgressTrackerEnvironment {
VERSION: string
CONTENT_CLAIMS_SERVICE_URL?: string
ACCOUNTING_SERVICE_URL: string
}

 export interface AccountingService {
-  record: (cid: CID, options: GetCIDRequestConfig) => Promise<void>
+  record: (resource: UnknownLink, bytes: number, servedAt: string) => Promise<void>
   getTokenMetadata: (token: string) => Promise<TokenMetadata | null>
 }

 export interface Accounting {
-  create: ({ serviceURL }: { serviceURL?: string }) => AccountingService
+  create: ({ serviceURL }: { serviceURL: string }) => AccountingService
 }
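
Because the middleware in this commit prefers `ctx.ACCOUNTING_SERVICE` over creating a new instance, a test can supply its own object with the updated `record(resource, bytes, servedAt)` shape. The following is a sketch of such a stand-in; `createInMemoryAccounting` and its `records` array are hypothetical and not part of this commit, and a plain string stands in for a real CID link.

```javascript
// Hypothetical in-memory AccountingService for tests; not part of this commit.
function createInMemoryAccounting () {
  const records = []
  return {
    records,
    // Same parameter order as the updated AccountingService.record signature.
    async record (resource, bytes, servedAt) {
      records.push({ resource: String(resource), bytes, servedAt })
    },
    // No token metadata is available in this stand-in.
    async getTokenMetadata (_token) {
      return null
    }
  }
}

// Example: record 1024 bytes served for a placeholder resource.
async function recordExample () {
  const accounting = createInMemoryAccounting()
  await accounting.record('example-cid', 1024, new Date().toISOString())
  return accounting.records
}
```

A test could then place the object on the context as `ACCOUNTING_SERVICE` and inspect `records` after the response body has been fully read.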
6 changes: 5 additions & 1 deletion src/index.js
@@ -22,7 +22,8 @@ import {
   withCarBlockHandler,
   withRateLimit,
   withNotFound,
-  withLocator
+  withLocator,
+  withEgressTracker
 } from './middleware/index.js'

 /**
@@ -57,6 +58,9 @@ export default {
 // Rate-limit requests
 withRateLimit,

+// Track egress bytes
+withEgressTracker,
+
 // Fetch data
 withCarBlockHandler,
 withNotFound,
1 change: 1 addition & 0 deletions src/middleware/index.js
@@ -5,3 +5,4 @@ export { withRateLimit } from './withRateLimit.js'
 export { withVersionHeader } from './withVersionHeader.js'
 export { withNotFound } from './withNotFound.js'
 export { withLocator } from './withLocator.js'
+export { withEgressTracker } from './withEgressTracker.js'
92 changes: 92 additions & 0 deletions src/middleware/withEgressTracker.js
@@ -0,0 +1,92 @@
+import { Accounting } from '../services/accounting.js'
+
+/**
+ * @import { Context, IpfsUrlContext, Middleware } from '@web3-storage/gateway-lib'
+ * @import { Environment } from './withEgressTracker.types.js'
+ * @import { AccountingService } from '../bindings.js'
+ * @typedef {IpfsUrlContext & { ACCOUNTING_SERVICE?: AccountingService }} EgressTrackerContext
+ */
+
+/**
+ * The egress tracking handler must be enabled after the rate limiting handler,
+ * and before any handler that serves the response body. It uses the CID of the
+ * served content to record the egress in the accounting service, and it counts
+ * the bytes served with a TransformStream to determine the egress amount.
+ *
+ * @type {Middleware<EgressTrackerContext, EgressTrackerContext, Environment>}
+ */
+export function withEgressTracker (handler) {
+  return async (req, env, ctx) => {
+    if (env.FF_EGRESS_TRACKER_ENABLED !== 'true') {
+      return handler(req, env, ctx)
+    }
+
+    const response = await handler(req, env, ctx)
+    if (!response.ok || !response.body) {
+      return response
+    }
+
+    const { dataCid } = ctx
+    const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({
+      serviceURL: env.ACCOUNTING_SERVICE_URL
+    })
+
+    const responseBody = response.body.pipeThrough(
+      createByteCountStream((totalBytesServed) => {
+        // Non-blocking call to the accounting service to record egress
+        if (totalBytesServed > 0) {
+          ctx.waitUntil(
+            accounting.record(dataCid, totalBytesServed, new Date().toISOString())
+          )
+        }
+      })
+    )
+
+    return new Response(responseBody, {
+      status: response.status,
+      statusText: response.statusText,
+      headers: response.headers
+    })
+  }
+}
+
+/**
+ * Creates a TransformStream to count bytes served to the client.
+ * It records egress when the stream is finalized without an error.
+ *
+ * @param {(totalBytesServed: number) => void} onClose
+ * @template {Uint8Array} T
+ * @returns {TransformStream<T, T>} - The created TransformStream.
+ */
+function createByteCountStream (onClose) {
+  let totalBytesServed = 0
+
+  return new TransformStream({
+    /**
+     * The transform function is called for each chunk of the response body.
+     * It enqueues the chunk and updates the total bytes served.
+     * If an error occurs, it signals an error to the controller and logs it.
+     * The bytes are not counted in case of enqueuing an error.
+     */
+    async transform (chunk, controller) {
+      try {
+        controller.enqueue(chunk)
+        totalBytesServed += chunk.byteLength
+      } catch (error) {
+        console.error('Error while counting bytes:', error)
+        controller.error(error)
+      }
+    },
+
+    /**
+     * The flush function is called when the stream is being finalized,
+     * which is when the response is being sent to the client.
+     * So before the response is sent, we record the egress using the callback.
+     * If an error occurs, the egress is not recorded.
+     * NOTE: The flush function is NOT called in case of a stream error.
+     */
+    async flush () {
+      onClose(totalBytesServed)
+    }
+  })
+}
6 changes: 6 additions & 0 deletions src/middleware/withEgressTracker.types.ts
@@ -0,0 +1,6 @@
+import { Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'
+
+export interface Environment extends MiddlewareEnvironment {
+  ACCOUNTING_SERVICE_URL: string
+  FF_EGRESS_TRACKER_ENABLED: string
+}
19 changes: 6 additions & 13 deletions src/middleware/withRateLimit.js
@@ -12,6 +12,7 @@ import { Accounting } from '../services/accounting.js'
  * RateLimitService,
  * RateLimitExceeded
  * } from './withRateLimit.types.js'
+ * @typedef {Context & { ACCOUNTING_SERVICE?: import('../bindings.js').AccountingService }} RateLimiterContext
  */

 /**
@@ -20,7 +21,7 @@ import { Accounting } from '../services/accounting.js'
  * it can be enabled or disabled using the FF_RATE_LIMITER_ENABLED flag.
  * Every successful request is recorded in the accounting service.
  *
- * @type {Middleware<Context, Context, Environment>}
+ * @type {Middleware<RateLimiterContext, RateLimiterContext, Environment>}
  */
 export function withRateLimit (handler) {
   return async (req, env, ctx) => {
@@ -33,20 +34,14 @@ export function withRateLimit (handler) {
     const isRateLimitExceeded = await rateLimitService.check(dataCid, req)
     if (isRateLimitExceeded === RATE_LIMIT_EXCEEDED.YES) {
       throw new HttpError('Too Many Requests', { status: 429 })
-    } else {
-      const accounting = Accounting.create({
-        serviceURL: env.ACCOUNTING_SERVICE_URL
-      })
-      // NOTE: non-blocking call to the accounting service
-      ctx.waitUntil(accounting.record(dataCid, req))
-      return handler(req, env, ctx)
     }
+    return handler(req, env, ctx)
   }
 }

 /**
  * @param {Environment} env
- * @param {Context} ctx
+ * @param {RateLimiterContext} ctx
  * @returns {RateLimitService}
  */
 function create (env, ctx) {
@@ -105,7 +100,7 @@ async function isRateLimited (rateLimitAPI, cid) {
 /**
  * @param {Environment} env
  * @param {string} authToken
- * @param {Context} ctx
+ * @param {RateLimiterContext} ctx
  * @returns {Promise<TokenMetadata | null>}
  */
 async function getTokenMetadata (env, authToken, ctx) {
@@ -116,9 +111,7 @@ async function getTokenMetadata (env, authToken, ctx) {
     return decode(cachedValue)
   }

-  const accounting = Accounting.create({
-    serviceURL: env.ACCOUNTING_SERVICE_URL
-  })
+  const accounting = ctx.ACCOUNTING_SERVICE ?? Accounting.create({ serviceURL: env.ACCOUNTING_SERVICE_URL })
   const tokenMetadata = await accounting.getTokenMetadata(authToken)
   if (tokenMetadata) {
     // NOTE: non-blocking call to the auth token metadata cache
4 changes: 2 additions & 2 deletions src/services/accounting.js
@@ -3,8 +3,8 @@
  */
 export const Accounting = {
   create: ({ serviceURL }) => ({
-    record: async (cid, options) => {
-      console.log(`using ${serviceURL} to record a GET for ${cid} with options`, options)
+    record: async (cid, bytes, servedAt) => {
+      console.log(`using ${serviceURL} to record egress for ${cid} with total bytes: ${bytes} and servedAt: ${servedAt}`)
     },

     getTokenMetadata: async () => {
8 changes: 6 additions & 2 deletions test/fixtures/worker-fixture.js
@@ -12,6 +12,8 @@ const __dirname = path.dirname(__filename)
  */
 const wranglerEnv = process.env.WRANGLER_ENV || 'integration'

+const DEBUG = process.env.DEBUG === 'true'
+
 /**
  * Worker information object
  * @typedef {Object} WorkerInfo
@@ -41,7 +43,7 @@ export const mochaGlobalSetup = async () => {
     )
     console.log(`Output: ${await workerInfo.getOutput()}`)
     console.log('WorkerInfo:', workerInfo)
-    console.log('Test worker started!')
+    console.log(`Test worker started! ENV: ${wranglerEnv}, DEBUG: ${DEBUG}`)
   } catch (error) {
     console.error('Failed to start test worker:', error)
     throw error
@@ -59,7 +61,9 @@ export const mochaGlobalTeardown = async () => {
   try {
     const { stop } = workerInfo
     await stop?.()
-    // console.log('getOutput', getOutput()) // uncomment for debugging
+    if (DEBUG) {
+      console.log('getOutput', await workerInfo.getOutput())
+    }
     console.log('Test worker stopped!')
   } catch (error) {
     console.error('Failed to stop test worker:', error)