Skip to content

Commit

Permalink
reviewer suggestions implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
fforbeck committed Oct 30, 2024
1 parent b6fffc0 commit 25982f4
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 71 deletions.
2 changes: 1 addition & 1 deletion src/middleware/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ export { withRateLimit } from './withRateLimit.js'
export { withVersionHeader } from './withVersionHeader.js'
export { withNotFound } from './withNotFound.js'
export { withLocator } from './withLocator.js'
export { withEgressTracker } from './withEgressTracker.js'
export { withEgressTracker } from './withEgressTracker.js'
67 changes: 21 additions & 46 deletions src/middleware/withEgressTracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,7 @@ export function withEgressTracker (handler) {
return handler(req, env, ctx)
}

let response
try {
response = await handler(req, env, ctx)
} catch (error) {
console.error('Error in egress tracker handler:', error)
throw error
}

const response = await handler(req, env, ctx)
if (!response.ok || !response.body) {
return response
}
Expand All @@ -38,17 +31,18 @@ export function withEgressTracker (handler) {
serviceURL: env.ACCOUNTING_SERVICE_URL
})

const { readable, writable } = createEgressPassThroughStream(ctx, accounting, dataCid)

try {
ctx.waitUntil(response.body.pipeTo(writable))
} catch (error) {
console.error('Error in egress tracker handler:', error)
// Original response in case of an error to avoid breaking the chain and serve the content
return response
}
const responseBody = response.body.pipeThrough(
createByteCountStream((totalBytesServed) => {
// Non-blocking call to the accounting service to record egress
if (totalBytesServed > 0) {
ctx.waitUntil(
accounting.record(dataCid, totalBytesServed, new Date().toISOString())
)
}
})
)

return new Response(readable, {
return new Response(responseBody, {
status: response.status,
statusText: response.statusText,
headers: response.headers
Expand All @@ -60,58 +54,39 @@ export function withEgressTracker (handler) {
* Creates a TransformStream to count bytes served to the client.
* It records egress when the stream is finalized without an error.
*
* @param {import('@web3-storage/gateway-lib/middleware').Context} ctx - The context object.
* @param {AccountingService} accounting - The accounting service instance to record egress.
* @param {import('@web3-storage/gateway-lib/handlers').CID} dataCid - The CID of the served content.
* @returns {TransformStream} - The created TransformStream.
* @param {(totalBytesServed: number) => void} onClose
* @template {Uint8Array} T
* @returns {TransformStream<T, T>} - The created TransformStream.
*/
function createEgressPassThroughStream (ctx, accounting, dataCid) {
function createByteCountStream (onClose) {
let totalBytesServed = 0

return new TransformStream({
/**
* The start function is called when the stream is being initialized.
* It resets the total bytes served to 0.
*/
start () {
totalBytesServed = 0
},
/**
* The transform function is called for each chunk of the response body.
* It enqueues the chunk and updates the total bytes served.
* If an error occurs, it signals an error to the controller and logs it.
* The bytes are not counted in case of enqueuing an error.
* @param {Uint8Array} chunk
* @param {TransformStreamDefaultController} controller
*/
async transform (chunk, controller) {
try {
controller.enqueue(chunk)
totalBytesServed += chunk.byteLength
} catch (error) {
console.error('Error while counting egress bytes:', error)
console.error('Error while counting bytes:', error)
controller.error(error)
}
},

/**
* The flush function is called when the stream is being finalized,
* which is when the response is being sent to the client.
* So before the response is sent, we record the egress.
* It is called only once and it triggers a non-blocking call to the accounting service.
* So before the response is sent, we record the egress using the callback.
* If an error occurs, the egress is not recorded.
* NOTE: The flush function is NOT called in case of an stream error.
* NOTE: The flush function is NOT called in case of a stream error.
*/
async flush (controller) {
try {
// Non-blocking call to the accounting service to record egress
if (totalBytesServed > 0) {
ctx.waitUntil(accounting.record(dataCid, totalBytesServed, new Date().toISOString()))
}
} catch (error) {
console.error('Error while recording egress:', error)
controller.error(error)
}
async flush () {
onClose(totalBytesServed)
}
})
}
2 changes: 1 addition & 1 deletion test/fixtures/worker-fixture.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const __dirname = path.dirname(__filename)
*/
const wranglerEnv = process.env.WRANGLER_ENV || 'integration'

const DEBUG = process.env.DEBUG === 'true' || false
const DEBUG = process.env.DEBUG === 'true'

/**
* Worker information object
Expand Down
46 changes: 23 additions & 23 deletions test/unit/middleware/withEgressTracker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { describe, it, afterEach, before } from 'mocha'
import { assert, expect } from 'chai'
import sinon from 'sinon'
import { CID } from 'multiformats'
import { withEgressHandler } from '../../../src/handlers/egress-tracker.js'
import { withEgressTracker } from '../../../src/middleware/withEgressTracker.js'
import { Builder, toBlobKey } from '../../helpers/builder.js'
import { CARReaderStream } from 'carstream'

Expand All @@ -27,19 +27,19 @@ const createRequest = async ({ authorization } = {}) =>
})

const env =
/** @satisfies {import('../../../src/handlers/egress-tracker.types.js').Environment} */
/** @satisfies {import('../../../src/middleware/withEgressTracker.types.js').Environment} */
({
DEBUG: 'true',
ACCOUNTING_SERVICE_URL: 'http://example.com',
FF_EGRESS_TRACKER_ENABLED: 'true'
})

const accountingRecordMethodStub = sinon.stub()
// @ts-expect-error
.returns(async (cid, bytes, servedAt) => {
console.log(`[mock] record called with cid: ${cid}, bytes: ${bytes}, servedAt: ${servedAt}`)
return Promise.resolve()
})
.returns(
/** @type {import('../../../src/bindings.js').AccountingService['record']} */
async (cid, bytes, servedAt) => {
console.log(`[mock] record called with cid: ${cid}, bytes: ${bytes}, servedAt: ${servedAt}`)
})

/**
* Mock implementation of the AccountingService.
Expand All @@ -58,7 +58,7 @@ const AccountingService = ({ serviceURL }) => {
}

const ctx =
/** @satisfies {import('../../../src/handlers/egress-tracker.js').EgressTrackerContext} */
/** @satisfies {import('../../../src/middleware/withEgressTracker.js').EgressTrackerContext} */
({
dataCid: CID.parse('bafybeibv7vzycdcnydl5n5lbws6lul2omkm6a6b5wmqt77sicrwnhesy7y'),
waitUntil: sinon.stub().returns(undefined),
Expand Down Expand Up @@ -110,7 +110,7 @@ describe('withEgressTracker', async () => {

const innerHandler = sinon.stub().resolves(mockResponse)

const handler = withEgressHandler(innerHandler)
const handler = withEgressTracker(innerHandler)
const request = await createRequest()
const response = await handler(request, env, ctx)
// Ensure the response body is fully consumed
Expand All @@ -134,7 +134,7 @@ describe('withEgressTracker', async () => {
}), { status: 200 })

const innerHandler = sinon.stub().resolves(mockResponse)
const handler = withEgressHandler(innerHandler)
const handler = withEgressTracker(innerHandler)
const request = await createRequest()

const response = await handler(request, env, ctx)
Expand All @@ -160,7 +160,7 @@ describe('withEgressTracker', async () => {
}), { status: 200 })

const innerHandler = sinon.stub().resolves(mockResponse)
const handler = withEgressHandler(innerHandler)
const handler = withEgressTracker(innerHandler)
const request = await createRequest()

const response = await handler(request, env, ctx)
Expand Down Expand Up @@ -197,7 +197,7 @@ describe('withEgressTracker', async () => {
})

const innerHandler = sinon.stub().resolves(mockResponse)
const handler = withEgressHandler(innerHandler)
const handler = withEgressTracker(innerHandler)
const request = await createRequest()

const response = await handler(request, env, ctx)
Expand Down Expand Up @@ -233,7 +233,7 @@ describe('withEgressTracker', async () => {
}), { status: 200 })

const innerHandler = sinon.stub().resolves(mockResponse)
const handler = withEgressHandler(innerHandler)
const handler = withEgressTracker(innerHandler)
const request = await createRequest()

const response = await handler(request, env, ctx)
Expand All @@ -249,7 +249,7 @@ describe('withEgressTracker', async () => {
describe('withEgressTracker -> Feature Flag', () => {
it('should not track egress if the feature flag is disabled', async () => {
const innerHandler = sinon.stub().resolves(new Response(null, { status: 200 }))
const handler = withEgressHandler(innerHandler)
const handler = withEgressTracker(innerHandler)
const request = await createRequest()
const envDisabled = { ...env, FF_EGRESS_TRACKER_ENABLED: 'false' }

Expand All @@ -264,7 +264,7 @@ describe('withEgressTracker', async () => {
it('should not track egress for non-OK responses', async () => {
const mockResponse = new Response(null, { status: 404 })
const innerHandler = sinon.stub().resolves(mockResponse)
const handler = withEgressHandler(innerHandler)
const handler = withEgressTracker(innerHandler)
const request = await createRequest()

const response = await handler(request, env, ctx)
Expand All @@ -276,7 +276,7 @@ describe('withEgressTracker', async () => {
it('should not track egress if the response has no body', async () => {
const mockResponse = new Response(null, { status: 200 })
const innerHandler = sinon.stub().resolves(mockResponse)
const handler = withEgressHandler(innerHandler)
const handler = withEgressTracker(innerHandler)
const request = await createRequest()

const response = await handler(request, env, ctx)
Expand Down Expand Up @@ -310,8 +310,8 @@ describe('withEgressTracker', async () => {
const innerHandler1 = sinon.stub().resolves(mockResponse1)
const innerHandler2 = sinon.stub().resolves(mockResponse2)

const handler1 = withEgressHandler(innerHandler1)
const handler2 = withEgressHandler(innerHandler2)
const handler1 = withEgressTracker(innerHandler1)
const handler2 = withEgressTracker(innerHandler2)

const request1 = await createRequest()
const request2 = await createRequest()
Expand Down Expand Up @@ -347,7 +347,7 @@ describe('withEgressTracker', async () => {
}), { status: 200, headers: { 'Content-Type': 'application/json' } })

const innerHandler = sinon.stub().resolves(mockResponse)
const handler = withEgressHandler(innerHandler)
const handler = withEgressTracker(innerHandler)
const request = await createRequest()

const response = await handler(request, env, ctx)
Expand All @@ -370,7 +370,7 @@ describe('withEgressTracker', async () => {
}), { status: 200 })

const innerHandler = sinon.stub().resolves(mockResponse)
const handler = withEgressHandler(innerHandler)
const handler = withEgressTracker(innerHandler)
const request = await createRequest()

const response = await handler(request, env, ctx)
Expand All @@ -391,7 +391,7 @@ describe('withEgressTracker', async () => {
}), { status: 200 })

const innerHandler = sinon.stub().resolves(mockResponse)
const handler = withEgressHandler(innerHandler)
const handler = withEgressTracker(innerHandler)
const request = await createRequest()

const response = await handler(request, env, ctx)
Expand All @@ -418,7 +418,7 @@ describe('withEgressTracker', async () => {
}), { status: 200 })

const innerHandler = sinon.stub().resolves(mockResponse)
const handler = withEgressHandler(innerHandler)
const handler = withEgressTracker(innerHandler)
const request = await createRequest()
const response = await handler(request, env, ctx)

Expand All @@ -445,7 +445,7 @@ describe('withEgressTracker', async () => {
}), { status: 200 })

const innerHandler = sinon.stub().resolves(mockResponse)
const handler = withEgressHandler(innerHandler)
const handler = withEgressTracker(innerHandler)
const request = await createRequest()

// Simulate an error in the accounting service record method
Expand Down

0 comments on commit 25982f4

Please sign in to comment.