Skip to content

Commit 979b13b

Browse files
author
Alan Shaw
committed
feat: add memory budgeting
1 parent c2be13d commit 979b13b

File tree

8 files changed

+139
-24
lines changed

8 files changed

+139
-24
lines changed

package-lock.json

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
"@ipld/dag-json": "^8.0.11",
2929
"@ipld/dag-pb": "^2.1.18",
3030
"@web3-storage/fast-unixfs-exporter": "^0.2.0",
31-
"@web3-storage/gateway-lib": "^1.3.0",
31+
"@web3-storage/gateway-lib": "^2.0.0",
3232
"cardex": "^1.0.0",
3333
"chardet": "^1.5.0",
3434
"dagula": "^4.1.0",

src/bindings.d.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { CID } from 'multiformats/cid'
22
import type { Context } from '@web3-storage/gateway-lib'
3+
import type { MemoryBudget } from './lib/mem-budget'
34

45
export {}
56

@@ -14,6 +15,10 @@ export interface CarCidsContext extends Context {
1415
carCids: CID[]
1516
}
1617

18+
/**
 * Middleware context that extends the base gateway `Context` with a
 * `memoryBudget` property holding a `MemoryBudget` instance.
 */
export interface MemoryBudgetContext extends Context {
19+
memoryBudget: MemoryBudget
20+
}
21+
1722
export interface R2GetOptions {
1823
range?: {
1924
offset: number

src/index.js

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,21 @@ import {
66
withHttpGet,
77
withCdnCache,
88
withParsedIpfsUrl,
9+
withFixedLengthStream,
910
composeMiddleware
1011
} from '@web3-storage/gateway-lib/middleware'
1112
import {
1213
handleUnixfs,
1314
handleBlock,
1415
handleCar
1516
} from '@web3-storage/gateway-lib/handlers'
16-
import { withDagula, withCarCids, withUnsupportedFeaturesHandler } from './middleware.js'
17+
import {
18+
withDagula,
19+
withCarCids,
20+
withUnsupportedFeaturesHandler,
21+
withMemoryBudget,
22+
withResponseMemoryRelease
23+
} from './middleware.js'
1724

1825
/**
1926
* @typedef {import('./bindings').Environment} Environment
@@ -35,7 +42,10 @@ export default {
3542
withHttpGet,
3643
withParsedIpfsUrl,
3744
withCarCids,
38-
withDagula
45+
withMemoryBudget,
46+
withDagula,
47+
withResponseMemoryRelease,
48+
withFixedLengthStream
3949
)
4050
return middleware(handler)(request, env, ctx)
4151
}

src/lib/blockstore.js

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@ export class R2Blockstore {
2525
* @param {R2Bucket} dataBucket
2626
* @param {R2Bucket} indexBucket
2727
* @param {CID[]} carCids
28+
* @param {import('./mem-budget.js').MemoryBudget} memoryBudget
2829
*/
29-
constructor (dataBucket, indexBucket, carCids) {
30+
constructor (dataBucket, indexBucket, carCids, memoryBudget) {
3031
this._dataBucket = dataBucket
32+
this._memoryBudget = memoryBudget
3133
this._idx = new MultiCarIndex()
3234
for (const carCid of carCids) {
3335
this._idx.addIndex(carCid, new StreamingCarIndex((async function * () {
@@ -54,14 +56,10 @@ export class R2Blockstore {
5456

5557
const reader = res.body.getReader()
5658
const bytesReader = asyncIterableReader((async function * () {
57-
try {
58-
while (true) {
59-
const { done, value } = await reader.read()
60-
if (done) return
61-
yield value
62-
}
63-
} finally {
64-
reader.releaseLock()
59+
while (true) {
60+
const { done, value } = await reader.read()
61+
if (done) return
62+
yield value
6563
}
6664
})())
6765

@@ -110,6 +108,7 @@ export class BatchingR2Blockstore extends R2Blockstore {
110108
}
111109

112110
async #processBatch () {
111+
console.log('processing batch')
113112
const batches = this.#batches
114113
const batchBlocks = this.#batchBlocks
115114
this.#batches = new Map()
@@ -121,7 +120,9 @@ export class BatchingR2Blockstore extends R2Blockstore {
121120
if (!batch) break
122121
const carPath = `${carCid}/${carCid}.car`
123122
const range = { offset: batch[0], length: batch[batch.length - 1] - batch[0] + MAX_BLOCK_LENGTH }
124-
console.log(`requesting ${batch.length} blocks from ${carCid} (${range.length} bytes @ ${range.offset})`)
123+
await this._memoryBudget.request(range.length)
124+
125+
console.log(`fetching ${batch.length} blocks from ${carCid} (${range.length} bytes @ ${range.offset})`)
125126
const res = await this._dataBucket.get(carPath, { range })
126127
if (!res) {
127128
for (const blocks of batchBlocks.values()) {
@@ -139,6 +140,7 @@ export class BatchingR2Blockstore extends R2Blockstore {
139140
}
140141
})())
141142

143+
let bytesResolved = 0
142144
while (true) {
143145
try {
144146
const blockHeader = await readBlockHead(bytesReader)
@@ -151,15 +153,19 @@ export class BatchingR2Blockstore extends R2Blockstore {
151153
// console.log(`got wanted block for ${blockHeader.cid}`)
152154
blocks.forEach(b => b.resolve({ cid: blockHeader.cid, bytes }))
153155
batchBlocks.delete(key)
156+
bytesResolved += bytes.length
154157
}
155158
} catch {
156159
break
157160
}
158161
}
159162

163+
// release the bytes we didn't send on (they are released later)
164+
this._memoryBudget.release(range.length - bytesResolved)
160165
reader.cancel()
161166
}
162167
}
168+
console.log('finished processing batch')
163169
}
164170

165171
/** @param {CID} cid */

src/lib/mem-budget.js

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import defer from 'p-defer'
2+
3+
/**
 * Tracks a byte budget and makes callers wait until the bytes they ask for
 * fit. Requests are granted strictly in arrival (FIFO) order: a queued
 * request blocks every request behind it until it has been granted.
 */
export class MemoryBudget {
  /**
   * Requests waiting for budget, oldest first.
   * @type {Array<{ size: number, deferred: import('p-defer').DeferredPromise<void> }>}
   */
  #requests = []

  /**
   * Allocated bytes.
   * @type {number}
   */
  #allocated = 0

  /**
   * Maximum number of bytes that can be allocated.
   * @type {number}
   */
  #max

  /**
   * @param {number} max Maximum number of bytes that can be allocated.
   */
  constructor (max) {
    this.#max = max
  }

  /**
   * Whether `size` bytes may be allocated right now.
   *
   * A request is always allowed when nothing is currently allocated, even if
   * it is larger than the budget. Otherwise a request of `max` bytes or more
   * could never be satisfied and, because the queue is FIFO, it would stall
   * every request queued behind it forever.
   *
   * @param {number} size
   * @returns {boolean}
   */
  #canAllocate (size) {
    return this.#allocated === 0 || this.#allocated + size < this.#max
  }

  /**
   * Record `size` bytes as allocated and log the new utilisation.
   * @param {number} size
   */
  #allocate (size) {
    this.#allocated += size
    console.log(`allocating ${size} bytes, ${Math.floor(this.#allocated / this.#max * 100)}% allocated`)
  }

  /**
   * Ask for `size` bytes of budget. The returned promise resolves once the
   * bytes have been allocated to the caller: immediately when no one is
   * queued and the bytes fit, otherwise after enough bytes were released.
   *
   * @param {number} size
   * @returns {Promise<void>}
   */
  async request (size) {
    // Only skip the queue when it is empty, so earlier requests are never
    // overtaken by later (smaller) ones.
    if (!this.#requests.length && this.#canAllocate(size)) {
      this.#allocate(size)
      return
    }
    const deferred = defer()
    this.#requests.push({ size, deferred })
    return deferred.promise
  }

  /**
   * Give `size` bytes back to the budget and grant as many queued requests,
   * in order, as now fit. The allocated total never drops below zero.
   *
   * @param {number} size
   */
  release (size) {
    this.#allocated = Math.max(0, this.#allocated - size)
    console.log(`released ${size} bytes, ${Math.floor(this.#allocated / this.#max * 100)}% allocated`)
    while (this.#requests.length) {
      const req = this.#requests[0]
      // Stop at the first request that does not fit to preserve FIFO order.
      if (!this.#canAllocate(req.size)) break
      this.#allocate(req.size)
      req.deferred.resolve()
      this.#requests.shift()
    }
  }
}

src/middleware.js

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,18 @@ import { Dagula } from 'dagula'
33
import { CarReader } from '@ipld/car'
44
import { parseCid, HttpError, toIterable } from '@web3-storage/gateway-lib/util'
55
import { BatchingR2Blockstore } from './lib/blockstore.js'
6+
import { MemoryBudget } from './lib/mem-budget.js'
67

78
const MAX_CAR_BYTES_IN_MEMORY = 1024 * 1024 * 5
89
const CAR_CODE = 0x0202
10+
const MAX_MEMORY_BUDGET = 1024 * 1024 * 50
911

1012
/**
1113
* @typedef {import('./bindings').Environment} Environment
1214
* @typedef {import('@web3-storage/gateway-lib').IpfsUrlContext} IpfsUrlContext
1315
* @typedef {import('./bindings').CarCidsContext} CarCidsContext
1416
* @typedef {import('@web3-storage/gateway-lib').DagulaContext} DagulaContext
17+
* @typedef {import('./bindings').MemoryBudgetContext} MemoryBudgetContext
1518
*/
1619

1720
/**
@@ -75,15 +78,50 @@ export function withCarCids (handler) {
7578
}
7679
}
7780

81+
/**
 * Middleware that creates a fresh `MemoryBudget`, capped at
 * `MAX_MEMORY_BUDGET` bytes, for every invocation and exposes it to the
 * wrapped handler as `ctx.memoryBudget`. The incoming context object is
 * copied, not mutated.
 *
 * @type {import('@web3-storage/gateway-lib').Middleware<MemoryBudgetContext>}
 */
export function withMemoryBudget (handler) {
  return async function (request, env, ctx) {
    const budget = new MemoryBudget(MAX_MEMORY_BUDGET)
    const budgetedCtx = { ...ctx, memoryBudget: budget }
    return handler(request, env, budgetedCtx)
  }
}
90+
91+
/**
 * Middleware that releases memory budget as the response body is consumed.
 *
 * The wrapped handler's response body is piped through a pass-through
 * transform which, for every chunk, calls `ctx.memoryBudget.release()` with
 * the chunk's length and then forwards the chunk unchanged. Responses
 * without a body are returned untouched.
 *
 * @type {import('@web3-storage/gateway-lib').Middleware<MemoryBudgetContext, MemoryBudgetContext>}
 */
export function withResponseMemoryRelease (handler) {
  return async (request, env, ctx) => {
    const response = await handler(request, env, ctx)
    if (!response.body) return response

    console.log('adding response memory release transform...')

    const releaseOnPassThrough = new TransformStream({
      transform (chunk, controller) {
        // Give the bytes back to the budget before handing them downstream.
        ctx.memoryBudget.release(chunk.length)
        controller.enqueue(chunk)
      }
    })

    // The original response is passed as init for the new response.
    return new Response(response.body.pipeThrough(releaseOnPassThrough), response)
  }
}
114+
78115
/**
79116
* Creates a dagula instance backed by the R2 blockstore.
80-
* @type {import('@web3-storage/gateway-lib').Middleware<DagulaContext & CarCidsContext & IpfsUrlContext, CarCidsContext & IpfsUrlContext, Environment>}
117+
* @type {import('@web3-storage/gateway-lib').Middleware<DagulaContext & MemoryBudgetContext & CarCidsContext & IpfsUrlContext, MemoryBudgetContext & CarCidsContext & IpfsUrlContext, Environment>}
81118
*/
82119
export function withDagula (handler) {
83120
return async (request, env, ctx) => {
84-
const { carCids, searchParams } = ctx
121+
const { carCids, searchParams, memoryBudget } = ctx
85122
if (!carCids) throw new Error('missing CAR CIDs in context')
86123
if (!searchParams) throw new Error('missing URL search params in context')
124+
if (!memoryBudget) throw new Error('missing memory budget instance')
87125

88126
/** @type {import('dagula').Blockstore?} */
89127
let blockstore = null
@@ -99,7 +137,7 @@ export function withDagula (handler) {
99137
}
100138

101139
if (!blockstore) {
102-
blockstore = new BatchingR2Blockstore(env.CARPARK, env.SATNAV, carCids)
140+
blockstore = new BatchingR2Blockstore(env.CARPARK, env.SATNAV, carCids, memoryBudget)
103141
}
104142

105143
const dagula = new Dagula(blockstore)

wrangler.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ name = "freeway"
22
main = "./dist/worker.mjs"
33
compatibility_date = "2022-10-03"
44
compatibility_flags = [
5-
"streams_enable_constructors"
5+
"streams_enable_constructors",
6+
"transformstream_enable_standard_constructor"
67
]
78
r2_buckets = [
89
{ binding = "CARPARK", bucket_name = "carpark-dev-0" },

0 commit comments

Comments
 (0)