Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions packages/synapse-core/src/sp/upload-streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export async function uploadPieceStreaming(
? new Blob([options.data as Uint8Array<ArrayBuffer>]).stream()
: (options.data as ReadableStream) // ReadableStream types dont match between browsers and Node.js

let size = isUint8Array(options.data) ? options.data.length : options.size
const size = isUint8Array(options.data) ? options.data.length : options.size

// Add size tracking and progress reporting
let bytesUploaded = 0
Expand Down Expand Up @@ -140,10 +140,15 @@ export async function uploadPieceStreaming(
// both execute regardless of path.
let fetchBody: ReadableStream | Blob
let fetchOptions: Record<string, string> = {}
// PUT /pdp/piece/uploads/{uuid}
const headers: Record<string, string> = {
'Content-Type': 'application/octet-stream',
}

if (supportsStreamingFetchBody()) {
fetchBody = bodyStream
fetchOptions = { duplex: 'half' }
// Length on a ReadableStream body is conveyed via chunked encoding.
} else {
const chunks: Uint8Array[] = []
let totalSize = 0
Expand All @@ -155,17 +160,7 @@ export async function uploadPieceStreaming(
totalSize += value.length
}
fetchBody = new Blob(chunks as BlobPart[])
// Override Content-Length with the actual accumulated size since we now
// know it precisely, even for ReadableStream inputs without a pre-set size
if (size == null) {
size = totalSize
}
}

// PUT /pdp/piece/uploads/{uuid}
const headers: Record<string, string> = {
'Content-Type': 'application/octet-stream',
...(size == null ? {} : { 'Content-Length': size.toString() }),
headers['Content-Length'] = (size ?? totalSize).toString()
}

const uploadResponse = await request.put(new URL(`pdp/piece/uploads/${uploadUuid}`, options.serviceURL), {
Expand Down
3 changes: 2 additions & 1 deletion packages/synapse-core/src/utils/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ export function supportsStreamingFetchBody(): boolean {
if (_supportsStreamBody !== undefined) return _supportsStreamBody
try {
let duplexAccessed = false
const hasContentType = new Request('', {
// Absolute URL required: Node throws on `''` (no base URL).
const hasContentType = new Request('http://x', {
body: new ReadableStream(),
method: 'POST',
get duplex() {
Expand Down
27 changes: 27 additions & 0 deletions packages/synapse-core/test/sp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,33 @@ InvalidSignature(address expected, address actual)
assert.strictEqual(progressCalls[progressCalls.length - 1], testData.length)
})

it('should not set Content-Length on a streaming ReadableStream body', async () => {
// Length on a streaming body is conveyed via chunked encoding.
const pieceCid = Piece.parse(mockPieceCidStr)
const testData = new Uint8Array(SIZE_CONSTANTS.MIN_UPLOAD_SIZE).fill(0x42)
let capturedHeaders: Headers | undefined

server.use(
postPieceUploadsHandler(mockUuid),
http.put(`https://pdp.example.com/pdp/piece/uploads/${mockUuid}`, async ({ request }) => {
capturedHeaders = request.headers
// Drain the body so the upstream pipeline runs to completion.
await request.arrayBuffer()
return HttpResponse.text('No Content', { status: 204 })
}),
finalizePieceUploadHandler(mockUuid, mockPieceCidStr)
)

await uploadPieceStreaming({
serviceURL: 'https://pdp.example.com',
data: testData,
pieceCid,
})

assert.isDefined(capturedHeaders)
assert.strictEqual(capturedHeaders?.get('content-length'), null)
})

it('should fail when session creation returns error', async () => {
const pieceCid = Piece.parse(mockPieceCidStr)
const testData = new Uint8Array(SIZE_CONSTANTS.MIN_UPLOAD_SIZE).fill(0x42)
Expand Down
Loading