Skip to content

Commit

Permalink
Emit progress events in Node.js again (#656)
Browse files Browse the repository at this point in the history
* Emit progress events in Node.js again

* Add a test for progress events when sending Buffers
  • Loading branch information
Acconut committed Dec 7, 2023
1 parent 14e57fa commit 8cc91de
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 0 deletions.
58 changes: 58 additions & 0 deletions lib/node/httpStack.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,16 @@ class Request {
})

if (body instanceof Readable) {
// Readable stream are piped through a PassThrough instance, which
// counts the number of bytes passed through. This is used, for example,
// when an fs.ReadStream is provided to tus-js-client.
body.pipe(new ProgressEmitter(this._progressHandler)).pipe(req)
} else if (body instanceof Uint8Array) {
// For Buffers and Uint8Arrays (in Node.js all buffers are instances of Uint8Array),
// we write chunks of the buffer to the stream and use that to track the progress.
// This is used when either a Buffer or a normal readable stream is provided
// to tus-js-client.
writeBufferToStreamWithProgress(req, body, this._progressHandler)
} else {
req.end(body)
}
Expand Down Expand Up @@ -153,3 +162,52 @@ class ProgressEmitter extends Transform {
callback(null, chunk)
}
}

// writeBufferToStreamWithProgress writes chunks from `source` (either a
// Buffer or Uint8Array) to the readable stream `stream`.
// The size of the chunk depends on the stream's highWaterMark to fill the
// stream's internal buffer as best as possible.
// If the internal buffer is full, the callback `onprogress` will be invoked
// to notify about the write progress. Writing will be resumed once the internal
// buffer is empty, as indicated by the emitted `drain` event.
// See https://nodejs.org/docs/latest/api/stream.html#buffering for more details
// on the buffering behavior of streams.
const writeBufferToStreamWithProgress = (stream, source, onprogress) => {
onprogress = throttle(onprogress, 100, {
leading: true,
trailing: false,
})

let offset = 0

function writeNextChunk() {
// Take at most the amount of bytes from highWaterMark. This should fill the streams
// internal buffer already.
const chunkSize = Math.min(stream.writableHighWaterMark, source.length - offset)

// Note: We use subarray instead of slice because it works without copying data for
// Buffers and Uint8Arrays.
const chunk = source.subarray(offset, offset + chunkSize)
offset += chunk.length

// `write` returns true if the internal buffer is not full and we should write more.
// If the stream is destroyed because the request is aborted, it will return false
// and no 'drain' event is emitted, so won't continue writing data.
const canContinue = stream.write(chunk)

if (!canContinue) {
// If the buffer is full, wait for the 'drain' event to write more data.
stream.once('drain', writeNextChunk)
onprogress(offset)
} else if (offset < source.length) {
// If there's still data to write and the buffer is not full, write next chunk.
writeNextChunk()
} else {
// If all data has been written, close the stream if needed, and emit a 'finish' event.
stream.end()
}
}

// Start writing the first chunk.
writeNextChunk()
}
40 changes: 40 additions & 0 deletions test/spec/test-node-specific.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ const stream = require('stream')
const temp = require('temp')
const fs = require('fs')
const https = require('https')
const http = require('http')
const crypto = require('crypto')
const intoStream = require('into-stream')
const { once } = require('events')
const tus = require('../..')
const assertUrlStorage = require('./helpers/assertUrlStorage')
const { TestHttpStack, waitableFunction } = require('./helpers/utils')
Expand Down Expand Up @@ -400,6 +403,43 @@ describe('tus', () => {
expect(req.getUnderlyingObject().agent).toBe(customAgent)
expect(req.getUnderlyingObject().agent).not.toBe(https.globalAgent)
})

it('should emit progress events when sending a Buffer', async () => {
// Start a simple HTTP server on a random port that accepts POST requests.
const server = http.createServer((req, res) => {
if (req.method === 'POST') {
req.on('data', () => {})
req.on('end', () => {
res.writeHead(200)
res.end('Data received')
})
} else {
res.writeHead(404)
res.end('Not found')
}
})

server.listen(0)
await once(server, 'listening')
const { port } = server.address()

const progressEvents = []

// Send POST request with 20MB of random data
const randomData = crypto.randomBytes(20 * 1024 * 1024)
const stack = new tus.DefaultHttpStack({})
const req = stack.createRequest('POST', `http://localhost:${port}`)
req.setProgressHandler((bytesSent) => progressEvents.push(bytesSent))
await req.send(randomData)

server.close()

// We should have received progress events and at least one event should not be for 0% or 100%.
expect(progressEvents.length).toBeGreaterThan(0)
expect(
progressEvents.some((bytesSent) => bytesSent !== 0 && bytesSent !== randomData.length),
).toBeTrue()
})
})

describe('#StreamSource', () => {
Expand Down

0 comments on commit 8cc91de

Please sign in to comment.