From 143e36b28de39a035efb461507c9779b5c72a2f7 Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Fri, 7 Jun 2024 10:33:03 +0200 Subject: [PATCH 01/16] Add hasPostProcessing check --- db-service/lib/SQLService.js | 7 ++++--- hana/lib/HANAService.js | 10 ++++++---- hana/lib/drivers/hdb.js | 22 ++++------------------ sqlite/lib/SQLiteService.js | 24 +++++++++++++++++++----- 4 files changed, 33 insertions(+), 30 deletions(-) diff --git a/db-service/lib/SQLService.js b/db-service/lib/SQLService.js index 9fc9f9f38..3c11961ac 100644 --- a/db-service/lib/SQLService.js +++ b/db-service/lib/SQLService.js @@ -113,7 +113,7 @@ class SQLService extends DatabaseService { * Handler for SELECT * @type {Handler} */ - async onSELECT({ query, data }) { + async onSELECT({ query, data, hasPostProcessing }) { if (!query.target) { try { this.infer(query) } catch (e) { /**/ } } @@ -125,9 +125,10 @@ class SQLService extends DatabaseService { const { sql, values, cqn } = this.cqn2sql(query, data) const expand = query.SELECT.expand delete query.SELECT.expand + const isOne = cqn.SELECT.one || query.SELECT.from?.ref?.[0].cardinality?.max === 1 let ps = await this.prepare(sql) - let rows = await ps.all(values) + let rows = await hasPostProcessing === false ? ps.stream(values, isOne) : ps.all(values) if (rows.length) if (expand) rows = rows.map(r => (typeof r._json_ === 'string' ? JSON.parse(r._json_) : r._json_ || r)) @@ -153,7 +154,7 @@ class SQLService extends DatabaseService { return SQLService._arrayWithCount(rows, await this.count(query, rows)) } - return cqn.SELECT.one || query.SELECT.from?.ref?.[0].cardinality?.max === 1 ? rows[0] : rows + return hasPostProcessing !== false && isOne ? rows[0] : rows } /** diff --git a/hana/lib/HANAService.js b/hana/lib/HANAService.js index 5fca68015..5c4456f78 100644 --- a/hana/lib/HANAService.js +++ b/hana/lib/HANAService.js @@ -114,7 +114,7 @@ class HANAService extends SQLService { } async onSELECT(req) { - const { query, data } = req + const { query, data, hasPostProcessing } = req if (!query.target) { try { this.infer(query) } catch (e) { /**/ } @@ -134,11 +134,13 @@ class HANAService extends SQLService { delete query.SELECT.expand const isSimple = temporary.length + blobs.length + withclause.length === 0 + const isOne = cqn.SELECT.one || query.SELECT.from.ref?.[0].cardinality?.max === 1 + const canStream = hasPostProcessing === false && !isLockQuery // REVISIT: add prepare options when param:true is used const sqlScript = isLockQuery || isSimple ? sql : this.wrapTemporary(temporary, withclause, blobs) - let rows = (values?.length || blobs.length > 0) - ? await (await this.prepare(sqlScript, blobs.length)).all(values || []) + let rows = (values?.length || blobs.length > 0 || canStream) + ? await (await this.prepare(sqlScript, blobs.length))[canStream ? 'stream' : 'all'](values || [], isOne) : await this.exec(sqlScript) if (isLockQuery) { @@ -156,7 +158,7 @@ class HANAService extends SQLService { // REVISIT: the runtime always expects that the count is preserved with .map, required for renaming in mocks return HANAService._arrayWithCount(rows, await this.count(query, rows)) } - return cqn.SELECT.one || query.SELECT.from.ref?.[0].cardinality?.max === 1 ? rows[0] : rows + return isOne && !canStream ? 
rows[0] : rows } async onINSERT({ query, data }) { diff --git a/hana/lib/drivers/hdb.js b/hana/lib/drivers/hdb.js index 587ab33f4..7a46755ce 100644 --- a/hana/lib/drivers/hdb.js +++ b/hana/lib/drivers/hdb.js @@ -114,29 +114,15 @@ class HDBDriver extends driver { ret.stream = async (values, one) => { const stmt = await ret._prep const rs = await prom(stmt, 'execute')(values || []) - const cols = rs.metadata - // If the query only returns a single row with a single blob it is the final stream - if (cols.length === 1 && cols[0].length === -1) { - const rowStream = rs.createObjectStream() - const { done, value } = await rowStream[Symbol.asyncIterator]().next() - if (done || !value[cols[0].columnName]) return null - const blobStream = value[cols[0].columnName].createReadStream() - blobStream.on('close', () => { - rowStream.end() - rs.close() - }) - return blobStream - } - // Create ResultSet stream from ResultSet iterator - return Readable.from(rsIterator(rs, one)) + return Readable.from(rsIterator(rs, one), { objectMode: false }) } return ret } _getResultForProcedure(rows, outParameters) { // on hdb, rows already contains results for scalar params - const isArray = Array.isArray(rows) - const result = isArray ? {...rows[0]} : {...rows} + const isArray = Array.isArray(rows) + const result = isArray ? { ...rows[0] } : { ...rows } // merge table output params into scalar params const args = isArray ? rows.slice(1) : [] @@ -146,7 +132,7 @@ class HDBDriver extends driver { result[params[i].PARAMETER_NAME] = args[i] } } - + return result } diff --git a/sqlite/lib/SQLiteService.js b/sqlite/lib/SQLiteService.js index 0615c72a4..1c05087d9 100644 --- a/sqlite/lib/SQLiteService.js +++ b/sqlite/lib/SQLiteService.js @@ -77,7 +77,7 @@ class SQLiteService extends SQLService { run: (..._) => this._run(stmt, ..._), get: (..._) => stmt.get(..._), all: (..._) => stmt.all(..._), - stream: (..._) => this._stream(stmt, ..._), + stream: (..._) => this._allStream(stmt, ..._), } } catch (e) { e.message += ' in:\n' + (e.query = sql) @@ -99,6 +99,7 @@ class SQLiteService extends SQLService { } async *_iterator(rs, one) { + const pageSize = (1 << 16) // Allow for both array and iterator result sets const first = Array.isArray(rs) ? 
{ done: !rs[0], value: rs[0] } : rs.next() if (first.done) return @@ -109,13 +110,26 @@ class SQLiteService extends SQLService { return } - yield '[' + let buffer = '[' + first.value[0] // Print first value as stand alone to prevent comma check inside the loop - yield first.value[0] for (const row of rs) { - yield `,${row[0]}` + buffer += `,${row[0]}` + if (buffer.length > pageSize) { + yield buffer + buffer = '' + } } - yield ']' + buffer += ']' + yield buffer + } + + async _allStream(stmt, binding_params, one) { + stmt = stmt.__proto__ || stmt + stmt.raw(true) + const get = stmt.get(binding_params) + // if (!get) return [] + const rs = stmt.iterate(binding_params) + return Readable.from(this._iterator(rs, one), { objectMode: false }) } exec(sql) { From 86073b107057a63a2e8878be01b17bb800458adb Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Fri, 7 Jun 2024 11:46:55 +0200 Subject: [PATCH 02/16] Close resultset in case of early close of stream --- sqlite/lib/SQLiteService.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sqlite/lib/SQLiteService.js b/sqlite/lib/SQLiteService.js index 1c05087d9..8d19ba9ae 100644 --- a/sqlite/lib/SQLiteService.js +++ b/sqlite/lib/SQLiteService.js @@ -127,9 +127,13 @@ class SQLiteService extends SQLService { stmt = stmt.__proto__ || stmt stmt.raw(true) const get = stmt.get(binding_params) - // if (!get) return [] + if (!get) return [] const rs = stmt.iterate(binding_params) - return Readable.from(this._iterator(rs, one), { objectMode: false }) + const stream = Readable.from(this._iterator(rs, one), { objectMode: false }) + stream.on('close', () => { + rs.return() // finish result set when closed early + }) + return stream } exec(sql) { From bac0f37e299b1933091a8dec219ed98da332b59a Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Wed, 19 Jun 2024 14:35:37 +0200 Subject: [PATCH 03/16] Always fetch count when streaming results --- db-service/lib/SQLService.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db-service/lib/SQLService.js b/db-service/lib/SQLService.js index 3c11961ac..19d05c7f0 100644 --- a/db-service/lib/SQLService.js +++ b/db-service/lib/SQLService.js @@ -284,7 +284,7 @@ class SQLService extends DatabaseService { * @returns {Promise} */ async count(query, ret) { - if (ret) { + if (ret?.length) { const { one, limit: _ } = query.SELECT, n = ret.length const [max, offset = 0] = one ? [1] : _ ? 
[_.rows?.val, _.offset?.val] : [] From 8e100104c30e55fd2f27cb914089f05ed2a29f8d Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Thu, 20 Jun 2024 09:59:24 +0200 Subject: [PATCH 04/16] Start adding object mode streaming --- db-service/lib/SQLService.js | 6 +++--- hana/lib/HANAService.js | 6 +++--- hana/lib/drivers/hdb.js | 4 ++-- sqlite/lib/SQLiteService.js | 6 +++--- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/db-service/lib/SQLService.js b/db-service/lib/SQLService.js index 19d05c7f0..224294257 100644 --- a/db-service/lib/SQLService.js +++ b/db-service/lib/SQLService.js @@ -113,11 +113,11 @@ class SQLService extends DatabaseService { * Handler for SELECT * @type {Handler} */ - async onSELECT({ query, data, hasPostProcessing }) { + async onSELECT({ query, data, hasPostProcessing, iterator }) { if (!query.target) { try { this.infer(query) } catch (e) { /**/ } } - if (query.target && !query.target._unresolved) { + if (query.SELECT.expand !== false && query.target && !query.target._unresolved) { // Will return multiple rows with objects inside query.SELECT.expand = 'root' } @@ -128,7 +128,7 @@ class SQLService extends DatabaseService { const isOne = cqn.SELECT.one || query.SELECT.from?.ref?.[0].cardinality?.max === 1 let ps = await this.prepare(sql) - let rows = await hasPostProcessing === false ? ps.stream(values, isOne) : ps.all(values) + let rows = await (hasPostProcessing === false || iterator) ? ps.stream(values, isOne, iterator) : ps.all(values) if (rows.length) if (expand) rows = rows.map(r => (typeof r._json_ === 'string' ? JSON.parse(r._json_) : r._json_ || r)) diff --git a/hana/lib/HANAService.js b/hana/lib/HANAService.js index 5c4456f78..20d701ace 100644 --- a/hana/lib/HANAService.js +++ b/hana/lib/HANAService.js @@ -114,7 +114,7 @@ class HANAService extends SQLService { } async onSELECT(req) { - const { query, data, hasPostProcessing } = req + const { query, data, hasPostProcessing, iterator } = req if (!query.target) { try { this.infer(query) } catch (e) { /**/ } @@ -135,12 +135,12 @@ class HANAService extends SQLService { const isSimple = temporary.length + blobs.length + withclause.length === 0 const isOne = cqn.SELECT.one || query.SELECT.from.ref?.[0].cardinality?.max === 1 - const canStream = hasPostProcessing === false && !isLockQuery + const canStream = (hasPostProcessing === false || iterator) && !isLockQuery // REVISIT: add prepare options when param:true is used const sqlScript = isLockQuery || isSimple ? sql : this.wrapTemporary(temporary, withclause, blobs) let rows = (values?.length || blobs.length > 0 || canStream) - ? await (await this.prepare(sqlScript, blobs.length))[canStream ? 'stream' : 'all'](values || [], isOne) + ? await (await this.prepare(sqlScript, blobs.length))[canStream ? 
'stream' : 'all'](values || [], isOne, iterator) : await this.exec(sqlScript) if (isLockQuery) { diff --git a/hana/lib/drivers/hdb.js b/hana/lib/drivers/hdb.js index 7a46755ce..4d25cf4fd 100644 --- a/hana/lib/drivers/hdb.js +++ b/hana/lib/drivers/hdb.js @@ -111,10 +111,10 @@ class HDBDriver extends driver { return this._getResultForProcedure(rows, outParameters) } - ret.stream = async (values, one) => { + ret.stream = async (values, one, objectMode) => { const stmt = await ret._prep const rs = await prom(stmt, 'execute')(values || []) - return Readable.from(rsIterator(rs, one), { objectMode: false }) + return Readable.from(rsIterator(rs, one), { objectMode }) } return ret } diff --git a/sqlite/lib/SQLiteService.js b/sqlite/lib/SQLiteService.js index 8d19ba9ae..0b5e78216 100644 --- a/sqlite/lib/SQLiteService.js +++ b/sqlite/lib/SQLiteService.js @@ -98,7 +98,7 @@ class SQLiteService extends SQLService { return stmt.run(binding_params) } - async *_iterator(rs, one) { + async *_iteratorRaw(rs, one) { const pageSize = (1 << 16) // Allow for both array and iterator result sets const first = Array.isArray(rs) ? { done: !rs[0], value: rs[0] } : rs.next() @@ -123,13 +123,13 @@ class SQLiteService extends SQLService { yield buffer } - async _allStream(stmt, binding_params, one) { + async _allStream(stmt, binding_params, one, objectMode) { stmt = stmt.__proto__ || stmt stmt.raw(true) const get = stmt.get(binding_params) if (!get) return [] const rs = stmt.iterate(binding_params) - const stream = Readable.from(this._iterator(rs, one), { objectMode: false }) + const stream = Readable.from(objectMode ? rs : this._iteratorRaw(rs, one), { objectMode }) stream.on('close', () => { rs.return() // finish result set when closed early }) From cedde73be0d9e9fb114d71571d21ea4d83794da5 Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Wed, 26 Jun 2024 11:21:41 +0200 Subject: [PATCH 05/16] Intermediate nested object stream --- hana/lib/drivers/base.js | 9 +- hana/lib/drivers/hdb.js | 368 ++++++++++++++++++++++++++- test/scenarios/bookshop/read.test.js | 30 +++ 3 files changed, 402 insertions(+), 5 deletions(-) diff --git a/hana/lib/drivers/base.js b/hana/lib/drivers/base.js index 1c198f90a..35b9c8394 100644 --- a/hana/lib/drivers/base.js +++ b/hana/lib/drivers/base.js @@ -221,7 +221,7 @@ const handleLevel = function (levels, path, expands) { const property = path.slice(level.path.length + 2, -7) if (property && property in level.expands) { const is2Many = level.expands[property] - delete level.expands[property] + // delete level.expands[property] if (level.hasProperties) { buffer += ',' } else { @@ -236,6 +236,7 @@ const handleLevel = function (levels, path, expands) { index: 1, suffix: is2Many ? 
']' : '', path: path.slice(0, -6), + result: level.expands[property], expands, }) } else { @@ -247,6 +248,7 @@ const handleLevel = function (levels, path, expands) { index: 0, suffix: '}', path: path, + result: levels.at(-1).result, expands, }) break @@ -261,6 +263,11 @@ const handleLevel = function (levels, path, expands) { } } if (level.suffix) buffer += level.suffix + if (level.expands) { + for (const expand in level.expands) { + if (level.expands[expand]?.push) level.expands[expand]?.push(null) + } + } } } return buffer diff --git a/hana/lib/drivers/hdb.js b/hana/lib/drivers/hdb.js index 4d25cf4fd..5a18e9f7e 100644 --- a/hana/lib/drivers/hdb.js +++ b/hana/lib/drivers/hdb.js @@ -114,7 +114,7 @@ class HDBDriver extends driver { ret.stream = async (values, one, objectMode) => { const stmt = await ret._prep const rs = await prom(stmt, 'execute')(values || []) - return Readable.from(rsIterator(rs, one), { objectMode }) + return rsIterator(rs, one, objectMode) } return ret } @@ -159,7 +159,366 @@ function* echoStream(ret) { yield ret } -async function* rsIterator(rs, one) { +async function rsNext(state, objectMode) { + let done = state.done() + if (done) { + done = await done + if (done) return { done } + } + + const _path = readString(state) + const path = (typeof _path === 'string' ? _path : await _path).toString('utf-8') + const _blobs = readString(state) + const blobs = JSON.parse(typeof _blobs === 'string' ? _blobs : await _blobs) + const _expands = readString(state) + const expands = JSON.parse(typeof _expands === 'string' ? _expands : await _expands) + + handleLevel(state.levels, path, expands) + + // REVISIT: allow streaming with both NVARCHAR and NCLOB + // Read JSON blob data + const value = readString(state, !objectMode) + + done = state.done() + if (done) { + done = await done + } + return { + // Iterator pattern + done, + value, + + // Additional row information + path, + blobs, + expands, + } +} + +async function rsNextObjectMode(state) { + let { done, value, path, blobs, expands } = await rsNext(state, true) + if (done) return { done } + + const json = JSON.parse(value) + + // Convert incoming blobs into their own native Readable streams + for (const blobColumn of state.blobs) { + // Skip all blobs that are not part of this row + if (!(blobColumn in blobs)) { + state.read(2) + continue + } + + let binaryStream = new Readable({ + read() { + if (binaryStream._prefetch) { + this.push(binaryStream._prefetch) + binaryStream._prefetch = null + } + this.resume() + } + }) + readBlob(state, { + end() { binaryStream.push(null) }, + write(chunk) { + if (!binaryStream.readableDidRead) { + binaryStream._prefetch = chunk + binaryStream.pause() + return new Promise((resolve, reject) => { + binaryStream.once('error', reject) + binaryStream.once('resume', resolve) + }) + } + binaryStream.push(chunk) + } + }) + ?.catch((err) => { if (binaryStream) binaryStream.emit('error', err) }) + json[blobColumn] = binaryStream // Return delayed blob read stream or null + } + + const level = state.levels[state.levels.length - 1] + + // Expose expanded columns as recursive Readable streams + for (const expandName in expands) { + const stream = level.expands[expandName] = json[expandName] = new Readable({ + objectMode: true, + read() { + state.stream.resume() + } + }) + state.streams.push(stream) + stream.once('end', () => { + state.streams = state.streams.filter(a => a !== stream) + }) + } + + // Push current + const resultStream = level.result + resultStream.push(json) + resultStream._hasResult = true + + 
return { + // Iterator pattern + done, + value: json, + + // Additional row information + path, + } +} + +async function rsNextRaw(state) { + const { done, value, path, blobs, expands } = await rsNext(state, true) + if (done) return { done } + + const json = JSON.parse(readString(state)) + + // Convert incoming blobs into their own native Readable streams + for (const blobColumn of state.blobs) { + // Skip all blobs that are not part of this row + if (!(blobColumn in blobs)) { + state.read(2) + continue + } + + let binaryStream = new Readable({ + read() { + if (binaryStream._prefetch) { + this.push(binaryStream._prefetch) + binaryStream._prefetch = null + } + this.resume() + } + }) + readBlob(state, { + end() { binaryStream.push(null) }, + write(chunk) { + if (!binaryStream.readableDidRead) { + binaryStream._prefetch = chunk + binaryStream.pause() + return new Promise((resolve, reject) => { + binaryStream.once('error', reject) + binaryStream.once('resume', resolve) + }) + } + binaryStream.push(chunk) + } + }) + ?.catch((err) => { if (binaryStream) binaryStream.emit('error', err) }) + json[blobColumn] = binaryStream // Return delayed blob read stream or null + } + + const level = state.levels[state.levels.length - 1] + + // Expose expanded columns as recursive Readable streams + for (const expandName in expands) { + level.expands[expandName] = json[expandName] = new Readable({ + objectMode: true, + read() { } + }) + } + + return { + // Iterator pattern + done, + value, + + // Additional row information + path, + } +} + +async function rsIterator(rs, one, objectMode) { + // Raw binary data stream unparsed + const raw = rs.createBinaryStream()[Symbol.asyncIterator]() + + const blobs = rs.metadata.slice(4).map(b => b.columnName) + const levels = [ + { + index: 0, + suffix: ']', + path: '$[', + expands: {}, + }, + ] + + const state = { + rs, + levels, + blobs, + reading: 0, + writing: 0, + buffer: Buffer.allocUnsafe(0), + yields: [], + done() { + // Validate whether the current buffer is finished reading + if (this.buffer.byteLength <= this.reading) { + return raw.next().then(next => { + if (next.done || next.value.byteLength === 0) { + this.stream.push(null) + return true + } + if (this.writing) this.stream.push(this.buffer.slice(0, this.writing)) + // Update state + this.buffer = next.value + this.reading = 0 + this.writing = 0 + }) + .catch(() => { + // TODO: check whether the error is early close + return true + }) + } + }, + ensure(size) { + const totalSize = this.reading + size + if (this.buffer.byteLength >= totalSize) { + return + } + return raw.next().then(next => { + if (next.done) { + throw new Error('Trying to read more bytes than are available') + } + // Write processed buffer to stream + if (this.writing) this.stream.push(this.buffer.slice(0, this.writing)) + // Keep unread buffer and prepend to new buffer + const leftover = this.buffer.slice(this.reading) + // Update state + this.buffer = Buffer.concat([leftover, next.value]) + this.reading = 0 + this.writing = 0 + }) + }, + read(nr) { + this.reading += nr + }, + write(length, encoding) { + const bytesLeft = this.buffer.byteLength - this.reading + if (bytesLeft < length) { + // Copy leftover bytes + if (encoding) { + let slice = Buffer.from(iconv.decode(this.buffer.slice(this.reading), 'cesu8'), 'binary') + this.prefetchDecodedSize = slice.byteLength + const encoded = Buffer.from(encoding.write(slice)) + if (this.writing + encoded.byteLength > this.buffer.byteLength) { + this.stream.push(this.buffer.slice(0, this.writing)) + 
this.stream.push(encoded) + } else { + this.buffer.copy(encoded, this.writing) // REVISIT: make sure this is the correct copy direction + this.writing += encoded.byteLength + this.stream.push(this.buffer.slice(0, this.writing)) + } + } else { + this.buffer.copyWithin(this.writing, this.reading) + this.writing += bytesLeft + this.stream.push(this.buffer.slice(0, this.writing)) + } + + return raw.next().then(next => { + length = length - bytesLeft + if (next.done) { + throw new Error('Trying to read more byte then are available') + } + // Update state + this.buffer = next.value + this.reading = 0 + this.writing = 0 + return this.write(length, encoding) + }) + } + if (encoding) { + let slice = Buffer.from(iconv.decode(this.buffer.slice(this.reading, this.reading + length), 'cesu8'), 'binary') + this.prefetchDecodedSize = slice.byteLength + const encoded = Buffer.from(encoding.write(slice)) + const nextWriting = this.writing + encoded.byteLength + const nextReading = this.reading + length + if (nextWriting > this.buffer.byteLength || nextWriting > nextReading) { + this.stream.push(this.buffer.slice(0, this.writing)) + this.stream.push(encoded) + this.buffer = this.buffer.slice(nextReading) + this.reading = 0 + this.writing = 0 + } else { + this.buffer.copy(encoded, this.writing) // REVISIT: make sure this is the correct copy direction + this.writing += encoded.byteLength + this.reading += length + } + } else { + this.buffer.copyWithin(this.writing, this.reading, this.reading + length) + this.writing += length + this.reading += length + } + }, + inject(str) { + if (str == null) return + str = Buffer.from(str) + if (this.writing + str.byteLength > this.reading) { + this.stream.push(this.buffer.slice(0, this.writing)) + this.stream.push(str) + this.buffer = this.buffer.slice(this.reading) + this.writing = 0 + this.reading = 0 + return + } + str.copy(this.buffer, this.writing) + this.writing += str.byteLength + }, + slice(length) { + const ens = this.ensure(length) + if (ens) return ens.then(() => this.slice(length)) + const ret = this.buffer.slice(this.reading, this.reading + length) + this.reading += length + return ret + }, + } + + // Mostly ignore buffer manipulation for objectMode + if (objectMode) { + state.write = function write(length, encoding) { + let slice = this.buffer.slice(this.reading, this.reading + length) + this.prefetchDecodedSize = slice.byteLength + this.reading += length + return encoding.write(slice) + } + state.inject = function inject() { } + } + + const stream = new Readable({ + objectMode, + async read() { + while (true) { + let result = await (objectMode ? 
rsNextObjectMode(state) : rsNextRaw(state)) + if (result.done) { return this.push(null) } + } + }, + // Clean up current state + end() { + state.inject( + levels + .reverse() + .map(l => l.suffix) + .join(''), + ) + + if (state.writing) { + state.yields.push(state.buffer.slice(0, state.writing)) + } + + rs.close() + } + }) + levels[0].result = state.stream = stream + state.streams = [state.stream] + + let isDone = state.done() + if (isDone) { + isDone = await isDone + } + + return stream +} + +async function* rsIteratorRaw(rs, one) { // Raw binary data stream unparsed const raw = rs.createBinaryStream()[Symbol.asyncIterator]() @@ -233,7 +592,7 @@ async function* rsIterator(rs, one) { this.yields.push(this.buffer.slice(0, this.writing)) this.yields.push(encoded) } else { - this.buffer.copy(encoded, this.writing) + this.buffer.copy(encoded, this.writing) // REVISIT: make sure this is the correct copy direction this.writing += encoded.byteLength this.yields.push(this.buffer.slice(0, this.writing)) } @@ -268,7 +627,7 @@ async function* rsIterator(rs, one) { this.reading = 0 this.writing = 0 } else { - this.buffer.copy(encoded, this.writing) + this.buffer.copy(encoded, this.writing) // REVISIT: make sure this is the correct copy direction this.writing += encoded.byteLength this.reading += length } @@ -279,6 +638,7 @@ async function* rsIterator(rs, one) { } }, inject(str) { + if (str == null) return str = Buffer.from(str) if (this.writing + str.byteLength > this.reading) { this.yields.push(this.buffer.slice(0, this.writing)) diff --git a/test/scenarios/bookshop/read.test.js b/test/scenarios/bookshop/read.test.js index 9b60fb492..b73a29855 100644 --- a/test/scenarios/bookshop/read.test.js +++ b/test/scenarios/bookshop/read.test.js @@ -295,4 +295,34 @@ describe('Bookshop - Read', () => { return expect(cds.db.run(query)).to.be.rejectedWith(/joins must specify the selected columns/) }) + it.only('Object stream', async () => { + try { + const fs = require('fs') + const path = require('path') + const streamConsumers = require('stream/consumers') + let data = fs.createReadStream(path.join(__dirname, '../../../sqlite/test/general/samples/test.jpg')) + const originalImageData = fs.readFileSync(path.join(__dirname, '../../../sqlite/test/general/samples/test.jpg')) + + const { Books } = cds.entities('sap.capire.bookshop') + + await UPDATE(Books).with({ image: data }).where({ val: true }) + + const query = SELECT(['*', { ref: ['image'] }, { ref: ['author'], expand: ['*'] }]).from(Books) + const req = new cds.Request({ query, iterator: true }) + await cds.tx(async tx => { + const stream = await tx.dispatch(req) + for await (const row of stream) { + const buffer = await streamConsumers.buffer(row.image) + if (originalImageData.compare(buffer)) throw new Error('Blob stream does not contain the original data') + for await (const author of row.author) { + debugger + } + debugger + } + }) + } catch (err) { + debugger + } + }) + }) From 56a60bfe66489ec8bf8c53f7703dd63e517c46e5 Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Thu, 27 Jun 2024 11:47:00 +0200 Subject: [PATCH 06/16] Change test to use pipeline --- hana/lib/drivers/hdb.js | 21 +++++--- test/scenarios/bookshop/read.test.js | 71 ++++++++++++++++++++++++---- 2 files changed, 77 insertions(+), 15 deletions(-) diff --git a/hana/lib/drivers/hdb.js b/hana/lib/drivers/hdb.js index 5a18e9f7e..9516a9929 100644 --- a/hana/lib/drivers/hdb.js +++ b/hana/lib/drivers/hdb.js @@ -247,15 +247,15 @@ async function rsNextObjectMode(state) { } }) state.streams.push(stream) 
- stream.once('end', () => { - state.streams = state.streams.filter(a => a !== stream) + stream.once('end', function () { + state.streams.filter(a => a !== stream) }) } // Push current const resultStream = level.result resultStream.push(json) - resultStream._hasResult = true + resultStream._reading-- return { // Iterator pattern @@ -355,7 +355,8 @@ async function rsIterator(rs, one, objectMode) { if (this.buffer.byteLength <= this.reading) { return raw.next().then(next => { if (next.done || next.value.byteLength === 0) { - this.stream.push(null) + // yield for raw mode + handleLevel(this.levels, this.levels[0].path, {}) return true } if (this.writing) this.stream.push(this.buffer.slice(0, this.writing)) @@ -365,6 +366,7 @@ async function rsIterator(rs, one, objectMode) { this.writing = 0 }) .catch(() => { + handleLevel(this.levels, this.levels[0].path, {}) // TODO: check whether the error is early close return true }) @@ -486,10 +488,17 @@ async function rsIterator(rs, one, objectMode) { const stream = new Readable({ objectMode, async read() { - while (true) { + if (this._running) { + this._reading++ + return + } + this._running = true + this._reading = 1 + while (this._reading > 0) { let result = await (objectMode ? rsNextObjectMode(state) : rsNextRaw(state)) - if (result.done) { return this.push(null) } + if (result.done) return this.push(null) } + this._running = false }, // Clean up current state end() { diff --git a/test/scenarios/bookshop/read.test.js b/test/scenarios/bookshop/read.test.js index b73a29855..85be666c9 100644 --- a/test/scenarios/bookshop/read.test.js +++ b/test/scenarios/bookshop/read.test.js @@ -1,3 +1,5 @@ +const { Readable, Writable } = require('stream') +const { pipeline } = require('stream/promises') const cds = require('../../cds.js') const bookshop = cds.utils.path.resolve(__dirname, '../../bookshop') @@ -305,23 +307,74 @@ describe('Bookshop - Read', () => { const { Books } = cds.entities('sap.capire.bookshop') - await UPDATE(Books).with({ image: data }).where({ val: true }) + let i = 1000 + const gen = function* () { + yield `[{"ID":${i++},"title":"${i}","author_ID":101,"genre_ID":11}` + for (; i < 100000; i++) { + yield `,{"ID":${i++},"title":"${i}","author_ID":101,"genre_ID":11}` + } + yield ']' + } + + const withImage = false + const withExpands = true + + await INSERT(Readable.from(gen(), { objectMode: false })).into(Books) + if (withImage) await UPDATE(Books).with({ image: data }).where({ val: true }) + + process.stdout.write(`DEPLOYED\n`) - const query = SELECT(['*', { ref: ['image'] }, { ref: ['author'], expand: ['*'] }]).from(Books) + const query = SELECT([ + '*', + ...(withImage ? [{ ref: ['image'] }] : []), + ...(withExpands ? 
[{ ref: ['author'], expand: ['*'] }, { ref: ['genre'], expand: ['*'] }] : []), + ]).from(Books) const req = new cds.Request({ query, iterator: true }) + await cds.tx(async tx => { + let rows = 0 + const s = performance.now() + const stream = await tx.dispatch(req) - for await (const row of stream) { - const buffer = await streamConsumers.buffer(row.image) - if (originalImageData.compare(buffer)) throw new Error('Blob stream does not contain the original data') - for await (const author of row.author) { - debugger - } - debugger + + const devNull = function () { + return new Writable({ + objectMode: true, write(chunk, encoding, callback) { + rows++ + callback() + } + }) } + + await pipeline( + stream, + async function* (source) { + for await (const row of source) { + + if (withImage) { + const buffer = await streamConsumers.buffer(row.image) + if (originalImageData.compare(buffer)) throw new Error('Blob stream does not contain the original data') + } + + if (withExpands) { + Promise.all([ + pipeline(row.genre, devNull()), + pipeline(row.author, devNull()), + ]) + } + + yield row + } + }, + devNull() + ) + + const dur = performance.now() - s + process.stdout.write(`Duration: ${dur}ms Rows: ${rows} (${rows / dur} rows/ms)\n`) }) } catch (err) { debugger + throw err } }) From 899fbc631cca5cd4c0f0f97c119141e694591697 Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Thu, 27 Jun 2024 15:26:48 +0200 Subject: [PATCH 07/16] Add raw streaming support --- hana/lib/drivers/hdb.js | 59 ++++++------------ test/scenarios/bookshop/read.test.js | 92 +++++++++++++++------------- 2 files changed, 67 insertions(+), 84 deletions(-) diff --git a/hana/lib/drivers/hdb.js b/hana/lib/drivers/hdb.js index 9516a9929..7260b830f 100644 --- a/hana/lib/drivers/hdb.js +++ b/hana/lib/drivers/hdb.js @@ -268,12 +268,12 @@ async function rsNextObjectMode(state) { } async function rsNextRaw(state) { - const { done, value, path, blobs, expands } = await rsNext(state, true) + const { done, value, path, blobs } = await rsNext(state) if (done) return { done } - const json = JSON.parse(readString(state)) + // Read and write JSON blob data + let hasProperties = value > 2 - // Convert incoming blobs into their own native Readable streams for (const blobColumn of state.blobs) { // Skip all blobs that are not part of this row if (!(blobColumn in blobs)) { @@ -281,43 +281,17 @@ async function rsNextRaw(state) { continue } - let binaryStream = new Readable({ - read() { - if (binaryStream._prefetch) { - this.push(binaryStream._prefetch) - binaryStream._prefetch = null - } - this.resume() - } - }) - readBlob(state, { - end() { binaryStream.push(null) }, - write(chunk) { - if (!binaryStream.readableDidRead) { - binaryStream._prefetch = chunk - binaryStream.pause() - return new Promise((resolve, reject) => { - binaryStream.once('error', reject) - binaryStream.once('resume', resolve) - }) - } - binaryStream.push(chunk) - } - }) - ?.catch((err) => { if (binaryStream) binaryStream.emit('error', err) }) - json[blobColumn] = binaryStream // Return delayed blob read stream or null - } + if (hasProperties) state.inject(',') + hasProperties = true + state.inject(`${JSON.stringify(blobColumn)}:`) - const level = state.levels[state.levels.length - 1] - - // Expose expanded columns as recursive Readable streams - for (const expandName in expands) { - level.expands[expandName] = json[expandName] = new Readable({ - objectMode: true, - read() { } - }) + const blobLength = readBlob(state, new StringDecoder('base64')) + if (typeof blobLength !== 'number') 
await blobLength } + const level = state.levels.at(-1) + level.hasProperties = hasProperties + return { // Iterator pattern done, @@ -350,10 +324,15 @@ async function rsIterator(rs, one, objectMode) { writing: 0, buffer: Buffer.allocUnsafe(0), yields: [], + next() { + const ret = this._prefetch || raw.next() + this._prefetch = raw.next() + return ret + }, done() { // Validate whether the current buffer is finished reading if (this.buffer.byteLength <= this.reading) { - return raw.next().then(next => { + return this.next().then(next => { if (next.done || next.value.byteLength === 0) { // yield for raw mode handleLevel(this.levels, this.levels[0].path, {}) @@ -377,7 +356,7 @@ async function rsIterator(rs, one, objectMode) { if (this.buffer.byteLength >= totalSize) { return } - return raw.next().then(next => { + return this.next().then(next => { if (next.done) { throw new Error('Trying to read more bytes than are available') } @@ -416,7 +395,7 @@ async function rsIterator(rs, one, objectMode) { this.stream.push(this.buffer.slice(0, this.writing)) } - return raw.next().then(next => { + return this.next().then(next => { length = length - bytesLeft if (next.done) { throw new Error('Trying to read more byte then are available') diff --git a/test/scenarios/bookshop/read.test.js b/test/scenarios/bookshop/read.test.js index 85be666c9..56312e2c3 100644 --- a/test/scenarios/bookshop/read.test.js +++ b/test/scenarios/bookshop/read.test.js @@ -310,14 +310,14 @@ describe('Bookshop - Read', () => { let i = 1000 const gen = function* () { yield `[{"ID":${i++},"title":"${i}","author_ID":101,"genre_ID":11}` - for (; i < 100000; i++) { - yield `,{"ID":${i++},"title":"${i}","author_ID":101,"genre_ID":11}` + for (; i < 0; i++) { + yield `,{"ID":${i},"title":"${i}","author_ID":101,"genre_ID":11}` } yield ']' } const withImage = false - const withExpands = true + const withExpands = false await INSERT(Readable.from(gen(), { objectMode: false })).into(Books) if (withImage) await UPDATE(Books).with({ image: data }).where({ val: true }) @@ -329,49 +329,53 @@ describe('Bookshop - Read', () => { ...(withImage ? [{ ref: ['image'] }] : []), ...(withExpands ? 
[{ ref: ['author'], expand: ['*'] }, { ref: ['genre'], expand: ['*'] }] : []), ]).from(Books) - const req = new cds.Request({ query, iterator: true }) - - await cds.tx(async tx => { - let rows = 0 - const s = performance.now() - - const stream = await tx.dispatch(req) - - const devNull = function () { - return new Writable({ - objectMode: true, write(chunk, encoding, callback) { - rows++ - callback() - } - }) - } - - await pipeline( - stream, - async function* (source) { - for await (const row of source) { - - if (withImage) { - const buffer = await streamConsumers.buffer(row.image) - if (originalImageData.compare(buffer)) throw new Error('Blob stream does not contain the original data') + const req = new cds.Request({ query, iterator: false, hasPostProcessing: false }) + + const proms = [] + for (let x = 0; x < 20; x++) { + proms.push(cds.tx(async tx => { + let rows = i - 1000 + 4 + const s = performance.now() + + const stream = await tx.dispatch(req) + + let rawResult = [] + const devNull = function () { + return new Writable({ + objectMode: true, write(chunk, encoding, callback) { + rawResult.push(chunk) + callback() } - - if (withExpands) { - Promise.all([ - pipeline(row.genre, devNull()), - pipeline(row.author, devNull()), - ]) + }) + } + + await pipeline( + stream, + async function* (source) { + for await (const row of source) { + /* + if (withImage) { + const buffer = await streamConsumers.buffer(row.image) + if (originalImageData.compare(buffer)) throw new Error('Blob stream does not contain the original data') + } + + if (withExpands) { + await Promise.all([ + pipeline(row.genre, devNull()), + pipeline(row.author, devNull()), + ]) + } + */ + yield Buffer.isBuffer(row) ? row : JSON.stringify(row) } - - yield row - } - }, - devNull() - ) - - const dur = performance.now() - s - process.stdout.write(`Duration: ${dur}ms Rows: ${rows} (${rows / dur} rows/ms)\n`) - }) + }, + devNull() + ) + const dur = performance.now() - s + process.stdout.write(`Duration: ${dur}ms Rows: ${rows} (${rows / dur} rows/ms) (memory: ${process.memoryUsage().heapUsed / 1024 / 1024}MiB)\n`) + })) + } + await Promise.allSettled(proms) } catch (err) { debugger throw err From e66e94037ab15825a5c44f1da2edf380e4fa99b7 Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Thu, 4 Jul 2024 10:10:19 +0200 Subject: [PATCH 08/16] Optimize hdb resultset streaming --- hana/lib/drivers/hdb.js | 329 +++++++--------------------------------- 1 file changed, 59 insertions(+), 270 deletions(-) diff --git a/hana/lib/drivers/hdb.js b/hana/lib/drivers/hdb.js index 007be61e3..358838c03 100644 --- a/hana/lib/drivers/hdb.js +++ b/hana/lib/drivers/hdb.js @@ -155,34 +155,30 @@ class HDBDriver extends driver { } } -function* echoStream(ret) { - yield ret -} - -async function rsNext(state, objectMode) { +function rsNext(state, objectMode) { let done = state.done() - if (done) { - done = await done + if (done?.then) return done.then(done => { if (done) return { done } - } + return rsNext(state, objectMode) + }) + if (done) return { done } - const _path = readString(state) - const path = (typeof _path === 'string' ? _path : await _path).toString('utf-8') - const _blobs = readString(state) - const blobs = JSON.parse(typeof _blobs === 'string' ? _blobs : await _blobs) - const _expands = readString(state) - const expands = JSON.parse(typeof _expands === 'string' ? 
_expands : await _expands) + let _path = readString(state) + // if (_path.then) _path = await _path + const path = (_path) + let _blobs = readString(state) + // if (_blobs.then) _blobs = await _blobs + const blobs = _blobs.length === 2 ? {} : JSON.parse(_blobs) + let _expands = readString(state) + // if (_expands.then) _expands = await _expands + const expands = _expands.length === 2 ? {} : JSON.parse(_expands) - handleLevel(state.levels, path, expands) + state.inject(handleLevel(state.levels, path, expands)) // REVISIT: allow streaming with both NVARCHAR and NCLOB // Read JSON blob data const value = readString(state, !objectMode) - done = state.done() - if (done) { - done = await done - } return { // Iterator pattern done, @@ -195,9 +191,11 @@ async function rsNext(state, objectMode) { } } -async function rsNextObjectMode(state) { - let { done, value, path, blobs, expands } = await rsNext(state, true) - if (done) return { done } +function rsNextObjectMode(state, next) { + if (!next) next = rsNext(state, true) + if (next.then) return next.then(next => rsNextObjectMode(state, next)) + const { done, value, path, blobs, expands } = next + if (done) return next const json = JSON.parse(value) @@ -236,20 +234,16 @@ async function rsNextObjectMode(state) { json[blobColumn] = binaryStream // Return delayed blob read stream or null } - const level = state.levels[state.levels.length - 1] + const level = state.levels.at(-1) // Expose expanded columns as recursive Readable streams for (const expandName in expands) { - const stream = level.expands[expandName] = json[expandName] = new Readable({ + level.expands[expandName] = json[expandName] = new Readable({ objectMode: true, read() { state.stream.resume() } }) - state.streams.push(stream) - stream.once('end', function () { - state.streams.filter(a => a !== stream) - }) } // Push current @@ -267,18 +261,23 @@ async function rsNextObjectMode(state) { } } -async function rsNextRaw(state) { - const { done, value, path, blobs } = await rsNext(state) - if (done) return { done } +function rsNextRaw(state, next) { + if (!next) next = rsNext(state) + if (next.then) return next.then(next => rsNextRaw(state, next)) + const { done, value, path, blobs } = next + if (done) return next // Read and write JSON blob data let hasProperties = value > 2 - for (const blobColumn of state.blobs) { + const nextBlob = function (state, i = 0) { + if (state.blobs.length <= i) return + const blobColumn = state.blobs[i] + // Skip all blobs that are not part of this row if (!(blobColumn in blobs)) { state.read(2) - continue + return nextBlob(state, i + 1) } if (hasProperties) state.inject(',') @@ -286,9 +285,11 @@ async function rsNextRaw(state) { state.inject(`${JSON.stringify(blobColumn)}:`) const blobLength = readBlob(state, new StringDecoder('base64')) - if (typeof blobLength !== 'number') await blobLength + if (blobLength.then) return blobLength.then(() => nextBlob(state, i + 1)) } + nextBlob(state, 0) + const level = state.levels.at(-1) level.hasProperties = hasProperties @@ -310,7 +311,7 @@ async function rsIterator(rs, one, objectMode) { const levels = [ { index: 0, - suffix: ']', + suffix: one ? 
'' : ']', path: '$[', expands: {}, }, @@ -335,18 +336,20 @@ async function rsIterator(rs, one, objectMode) { return this.next().then(next => { if (next.done || next.value.byteLength === 0) { // yield for raw mode - handleLevel(this.levels, this.levels[0].path, {}) + this.inject(handleLevel(this.levels, '$', {})) + if (this.writing) this.stream.push(this.buffer.subarray(0, this.writing)) return true } - if (this.writing) this.stream.push(this.buffer.slice(0, this.writing)) + if (this.writing) this.stream.push(this.buffer.subarray(0, this.writing)) // Update state this.buffer = next.value this.reading = 0 this.writing = 0 }) .catch(() => { - handleLevel(this.levels, this.levels[0].path, {}) // TODO: check whether the error is early close + this.inject(handleLevel(this.levels, '$', {})) + if (this.writing) this.stream.push(this.buffer.subarray(0, this.writing)) return true }) } @@ -361,9 +364,9 @@ async function rsIterator(rs, one, objectMode) { throw new Error('Trying to read more bytes than are available') } // Write processed buffer to stream - if (this.writing) this.stream.push(this.buffer.slice(0, this.writing)) + if (this.writing) this.stream.push(this.buffer.subarray(0, this.writing)) // Keep unread buffer and prepend to new buffer - const leftover = this.buffer.slice(this.reading) + const leftover = this.buffer.subarray(this.reading) // Update state this.buffer = Buffer.concat([leftover, next.value]) this.reading = 0 @@ -378,21 +381,21 @@ async function rsIterator(rs, one, objectMode) { if (bytesLeft < length) { // Copy leftover bytes if (encoding) { - let slice = Buffer.from(iconv.decode(this.buffer.slice(this.reading), 'cesu8'), 'binary') + let slice = Buffer.from(iconv.decode(this.buffer.subarray(this.reading), 'cesu8'), 'binary') this.prefetchDecodedSize = slice.byteLength const encoded = Buffer.from(encoding.write(slice)) if (this.writing + encoded.byteLength > this.buffer.byteLength) { - this.stream.push(this.buffer.slice(0, this.writing)) + this.stream.push(this.buffer.subarray(0, this.writing)) this.stream.push(encoded) } else { this.buffer.copy(encoded, this.writing) // REVISIT: make sure this is the correct copy direction this.writing += encoded.byteLength - this.stream.push(this.buffer.slice(0, this.writing)) + this.stream.push(this.buffer.subarray(0, this.writing)) } } else { this.buffer.copyWithin(this.writing, this.reading) this.writing += bytesLeft - this.stream.push(this.buffer.slice(0, this.writing)) + this.stream.push(this.buffer.subarray(0, this.writing)) } return this.next().then(next => { @@ -408,15 +411,15 @@ async function rsIterator(rs, one, objectMode) { }) } if (encoding) { - let slice = Buffer.from(iconv.decode(this.buffer.slice(this.reading, this.reading + length), 'cesu8'), 'binary') + let slice = Buffer.from(iconv.decode(this.buffer.subarray(this.reading, this.reading + length), 'cesu8'), 'binary') this.prefetchDecodedSize = slice.byteLength const encoded = Buffer.from(encoding.write(slice)) const nextWriting = this.writing + encoded.byteLength const nextReading = this.reading + length if (nextWriting > this.buffer.byteLength || nextWriting > nextReading) { - this.stream.push(this.buffer.slice(0, this.writing)) + this.stream.push(this.buffer.subarray(0, this.writing)) this.stream.push(encoded) - this.buffer = this.buffer.slice(nextReading) + this.buffer = this.buffer.subarray(nextReading) this.reading = 0 this.writing = 0 } else { @@ -434,9 +437,9 @@ async function rsIterator(rs, one, objectMode) { if (str == null) return str = Buffer.from(str) if 
(this.writing + str.byteLength > this.reading) { - this.stream.push(this.buffer.slice(0, this.writing)) + this.stream.push(this.buffer.subarray(0, this.writing)) this.stream.push(str) - this.buffer = this.buffer.slice(this.reading) + this.buffer = this.buffer.subarray(this.reading) this.writing = 0 this.reading = 0 return @@ -447,7 +450,7 @@ async function rsIterator(rs, one, objectMode) { slice(length) { const ens = this.ensure(length) if (ens) return ens.then(() => this.slice(length)) - const ret = this.buffer.slice(this.reading, this.reading + length) + const ret = this.buffer.subarray(this.reading, this.reading + length) this.reading += length return ret }, @@ -473,8 +476,11 @@ async function rsIterator(rs, one, objectMode) { } this._running = true this._reading = 1 + + const _next = objectMode ? rsNextObjectMode.bind(null, state) : rsNextRaw.bind(null, state) while (this._reading > 0) { - let result = await (objectMode ? rsNextObjectMode(state) : rsNextRaw(state)) + let result = _next() + if (result.then) result = await result if (result.done) return this.push(null) } this._running = false @@ -496,229 +502,12 @@ async function rsIterator(rs, one, objectMode) { } }) levels[0].result = state.stream = stream - state.streams = [state.stream] - - let isDone = state.done() - if (isDone) { - isDone = await isDone - } - - return stream -} - -async function* rsIteratorRaw(rs, one) { - // Raw binary data stream unparsed - const raw = rs.createBinaryStream()[Symbol.asyncIterator]() - - const nativeBlobs = rs.metadata.slice(4).map(b => b.columnName) - const levels = [ - { - index: 0, - suffix: one ? '' : ']', - path: '$[', - expands: {}, - }, - ] - - const state = { - rs, - levels, - reading: 0, - writing: 0, - buffer: Buffer.allocUnsafe(0), - yields: [], - done() { - // Validate whether the current buffer is finished reading - if (this.buffer.byteLength <= this.reading) { - return raw.next().then(next => { - if (next.done || next.value.byteLength === 0) { - return true - } - this.yields.push(this.buffer.slice(0, this.writing)) - // Update state - this.buffer = next.value - this.reading = 0 - this.writing = 0 - }) - .catch(() => { - // TODO: check whether the error is early close - return true - }) - } - }, - ensure(size) { - const totalSize = this.reading + size - if (this.buffer.byteLength >= totalSize) { - return - } - return raw.next().then(next => { - if (next.done) { - throw new Error('Trying to read more bytes than are available') - } - // Write processed buffer to stream - if (this.writing) this.yields.push(this.buffer.slice(0, this.writing)) - // Keep unread buffer and prepend to new buffer - const leftover = this.buffer.slice(this.reading) - // Update state - this.buffer = Buffer.concat([leftover, next.value]) - this.reading = 0 - this.writing = 0 - }) - }, - read(nr) { - this.reading += nr - }, - write(length, encoding) { - const bytesLeft = this.buffer.byteLength - this.reading - if (bytesLeft < length) { - // Copy leftover bytes - if (encoding) { - let slice = Buffer.from(iconv.decode(this.buffer.slice(this.reading), 'cesu8'), 'binary') - this.prefetchDecodedSize = slice.byteLength - const encoded = Buffer.from(encoding.write(slice)) - if (this.writing + encoded.byteLength > this.buffer.byteLength) { - this.yields.push(this.buffer.slice(0, this.writing)) - this.yields.push(encoded) - } else { - this.buffer.copy(encoded, this.writing) // REVISIT: make sure this is the correct copy direction - this.writing += encoded.byteLength - this.yields.push(this.buffer.slice(0, this.writing)) - } 
- } else { - this.buffer.copyWithin(this.writing, this.reading) - this.writing += bytesLeft - this.yields.push(this.buffer.slice(0, this.writing)) - } - - return raw.next().then(next => { - length = length - bytesLeft - if (next.done) { - throw new Error('Trying to read more byte then are available') - } - // Update state - this.buffer = next.value - this.reading = 0 - this.writing = 0 - return this.write(length, encoding) - }) - } - if (encoding) { - let slice = Buffer.from(iconv.decode(this.buffer.slice(this.reading, this.reading + length), 'cesu8'), 'binary') - this.prefetchDecodedSize = slice.byteLength - const encoded = Buffer.from(encoding.write(slice)) - const nextWriting = this.writing + encoded.byteLength - const nextReading = this.reading + length - if (nextWriting > this.buffer.byteLength || nextWriting > nextReading) { - this.yields.push(this.buffer.slice(0, this.writing)) - this.yields.push(encoded) - this.buffer = this.buffer.slice(nextReading) - this.reading = 0 - this.writing = 0 - } else { - this.buffer.copy(encoded, this.writing) // REVISIT: make sure this is the correct copy direction - this.writing += encoded.byteLength - this.reading += length - } - } else { - this.buffer.copyWithin(this.writing, this.reading, this.reading + length) - this.writing += length - this.reading += length - } - }, - inject(str) { - if (str == null) return - str = Buffer.from(str) - if (this.writing + str.byteLength > this.reading) { - this.yields.push(this.buffer.slice(0, this.writing)) - this.yields.push(str) - this.buffer = this.buffer.slice(this.reading) - this.writing = 0 - this.reading = 0 - return - } - str.copy(this.buffer, this.writing) - this.writing += str.byteLength - }, - slice(length) { - const ens = this.ensure(length) - if (ens) return ens.then(() => this.slice(length)) - const ret = this.buffer.slice(this.reading, this.reading + length) - this.reading += length - return ret - }, - } - if (!one) { + if (!objectMode && !one) { state.inject('[') } - let isDone = state.done() - if (isDone) { - isDone = await isDone - } - while (!isDone) { - // Made all functions possible promises giving a 5x speed up - const _path = readString(state) - const path = (typeof _path === 'string' ? _path : await _path).toString('utf-8') - const _blobs = readString(state) - const blobs = JSON.parse(typeof _blobs === 'string' ? _blobs : await _blobs) - const _expands = readString(state) - const expands = JSON.parse(typeof _expands === 'string' ? _expands : await _expands) - - state.inject(handleLevel(levels, path, expands)) - - // REVISIT: allow streaming with both NVARCHAR and NCLOB - // Read and write JSON blob data - const jsonLength = readString(state, true) - let hasProperties = (typeof jsonLength === 'number' ? 
jsonLength : await jsonLength) > 2 - - for (const blobColumn of nativeBlobs) { - // Skip all blobs that are not part of this row - if (!(blobColumn in blobs)) { - state.read(2) - continue - } - - if (hasProperties) state.inject(',') - hasProperties = true - state.inject(`${JSON.stringify(blobColumn)}:`) - - const blobLength = readBlob(state, new StringDecoder('base64')) - if (typeof blobLength !== 'number') await blobLength - } - - const level = levels[levels.length - 1] - level.hasProperties = hasProperties - - for (const y of state.yields) { - if (y.byteLength) { - yield y - } - } - state.yields = [] - - isDone = state.done() - if (isDone) { - isDone = await isDone - } - } - - state.inject( - levels - .reverse() - .map(l => l.suffix) - .join(''), - ) - if (state.writing) { - state.yields.push(state.buffer.slice(0, state.writing)) - } - - for (const y of state.yields) { - if (y.byteLength) { - yield y - } - } - rs.close() + return stream } const readString = function (state, isJson = false) { From 6906320de7f9a57544e0bb3f8fa4d4d38d4e285e Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Fri, 5 Jul 2024 15:17:07 +0200 Subject: [PATCH 09/16] Unify resultset streaming between hana-client and hdb --- hana/lib/drivers/hana-client.js | 141 +++++++++----------- hana/lib/drivers/hdb.js | 230 ++++++-------------------------- hana/lib/drivers/stream.js | 160 ++++++++++++++++++++++ 3 files changed, 266 insertions(+), 265 deletions(-) create mode 100644 hana/lib/drivers/stream.js diff --git a/hana/lib/drivers/hana-client.js b/hana/lib/drivers/hana-client.js index 5f0f72e91..166128aac 100644 --- a/hana/lib/drivers/hana-client.js +++ b/hana/lib/drivers/hana-client.js @@ -1,7 +1,9 @@ const { Readable, Stream } = require('stream') +const { pipeline } = require('stream/promises') const hdb = require('@sap/hana-client') const { driver, prom, handleLevel } = require('./base') +const { resultSetStream } = require('./stream') const streamUnsafe = false @@ -131,7 +133,7 @@ class HANAClientDriver extends driver { return this._getResultForProcedure(rows, outParameters, stmt) } - ret.stream = async (values, one) => { + ret.stream = async (values, one, objectMode) => { const stmt = await ret._prep values = Array.isArray(values) ? 
values : [] // Uses the native exec method instead of executeQuery to initialize a full stream @@ -156,7 +158,7 @@ class HANAClientDriver extends driver { if (rs.isNull(0)) return null return Readable.from(streamBlob(rs, undefined, 0), { objectMode: false }) } - return Readable.from(rsIterator(rs, one), { objectMode: false }) + return rsIterator(rs, one, objectMode) } return ret } @@ -229,10 +231,13 @@ class HANAClientDriver extends driver { HANAClientDriver.pool = true -async function* rsIterator(rs, one) { - const next = prom(rs, 'next') // () => rs.next() - const getValue = prom(rs, 'getValue') // nr => rs.getValue(nr) - const getData = prom(rs, 'getData') // (nr, pos, buf, zero, bufSize) => rs.getData(nr, pos, buf, zero, bufSize) // +async function rsIterator(rs, one, objectMode) { + rs._rowPosition = -1 + rs.nextAsync = prom(rs, 'next') + rs.getValueAsync = prom(rs, 'getValue') + rs.getValueAsync = prom(rs, 'getData') + + const blobs = rs.getColumnInfo().slice(4).map(b => b.columnName) const levels = [ { index: 0, @@ -241,83 +246,67 @@ async function* rsIterator(rs, one) { expands: {}, }, ] - - const binaryBuffer = new Buffer.alloc(1 << 16) - - const blobColumns = {} - rs.getColumnInfo() - .slice(4) - .forEach((c, i) => { - blobColumns[c.columnName] = i + 4 - }) - - if (!one) { - yield '[' - } - - let buffer = '' - // Load next row of the result set (starts before the first row) - while (await next()) { - const values = await Promise.all([getValue(0), getValue(1), getValue(2)]) - - const [path, _blobs, _expands] = values - const expands = JSON.parse(_expands) - const blobs = JSON.parse(_blobs) - - yield handleLevel(levels, path, expands) - - let hasProperties = false - let jsonPosition = 0 - while (true) { - const read = await getData(3, jsonPosition, binaryBuffer, 0, binaryBuffer.byteLength) - if (read < binaryBuffer.byteLength) { - if (read > 2) hasProperties = true - // Pipe json stream.slice(0,-1) removing the } to keep the object open - yield binaryBuffer.slice(0, read - 1).toString('utf-8') - break - } - jsonPosition += read - yield binaryBuffer.toString('utf-8') - } - - for (const key of Object.keys(blobs)) { - if (hasProperties) buffer += ',' - hasProperties = true - buffer += `${JSON.stringify(key)}:` - - const columnIndex = blobColumns[key] - if (rs.isNull(columnIndex)) { - buffer += 'null' - continue + const state = { + rs, + levels, + blobs, + columnIndex: 0, + binaryBuffer: new Buffer.alloc(1 << 16), + done() { + this.columnIndex = 0 + this.rs._rowPosition++ + return this.rs.nextCanBlock() + ? this.rs.nextAsync().then(a => !a) + : !this.rs.next() + }, + inject(str) { + if (str == null) return + this.stream.push(str) + }, + read() { + this.columnIndex++ + }, + readString() { + const index = this.columnIndex++ + if (index === 3) { + const _inject = str => { + this.inject(str.slice(0, -1)) + return str.length + } + return this.rs.getValuesCanBlock() + ? this.rs.getValueAsync(index).then(_inject) + : _inject(this.rs.getValue(index)) } - - buffer += '"' - yield buffer - buffer = '' - - const stream = Readable.from(streamBlob(rs, undefined, columnIndex, binaryBuffer), { objectMode: false }) + return this.rs.getValuesCanBlock() + ? 
this.rs.getValueAsync(index) + : this.rs.getValue(index) + }, + readBlob() { + const index = this.columnIndex++ + const stream = Readable.from(streamBlob(this.rs, undefined, index, this.binaryBuffer), { objectMode: false }) stream.setEncoding('base64') - for await (const chunk of stream) { - yield chunk - } - buffer += '"' + this.stream.push('"') + return pipeline(stream, this.stream, { end: false }).then(() => { this.stream.push('"') }) } + } - if (buffer) { - yield buffer - buffer = '' + if (objectMode) { + state.inject = function inject() { } + state.readString = function readString() { + const index = this.columnIndex++ + return this.rs.getValuesCanBlock() + ? this.rs.getValueAsync(index) + : this.rs.getValue(index) + } + state.readBlob = function readBlob() { + const index = this.columnIndex++ + const stream = Readable.from(streamBlob(this.rs, rs._rowPosition, index, this.binaryBuffer), { objectMode: false }) + stream.setEncoding('base64') + return stream } - - const level = levels[levels.length - 1] - level.hasProperties = hasProperties } - // Close all left over levels - buffer += levels - .reverse() - .map(l => l.suffix) - .join('') - yield buffer + return resultSetStream(state, one, objectMode) } async function* streamBlob(rs, rowIndex = -1, columnIndex, binaryBuffer = Buffer.allocUnsafe(1 << 16)) { diff --git a/hana/lib/drivers/hdb.js b/hana/lib/drivers/hdb.js index 358838c03..04587b839 100644 --- a/hana/lib/drivers/hdb.js +++ b/hana/lib/drivers/hdb.js @@ -6,6 +6,7 @@ const hdb = require('hdb') const iconv = require('iconv-lite') const { driver, prom, handleLevel } = require('./base') +const { resultSetStream } = require('./stream') const credentialMappings = [ { old: 'certificate', new: 'ca' }, @@ -155,154 +156,6 @@ class HDBDriver extends driver { } } -function rsNext(state, objectMode) { - let done = state.done() - if (done?.then) return done.then(done => { - if (done) return { done } - return rsNext(state, objectMode) - }) - if (done) return { done } - - let _path = readString(state) - // if (_path.then) _path = await _path - const path = (_path) - let _blobs = readString(state) - // if (_blobs.then) _blobs = await _blobs - const blobs = _blobs.length === 2 ? {} : JSON.parse(_blobs) - let _expands = readString(state) - // if (_expands.then) _expands = await _expands - const expands = _expands.length === 2 ? 
{} : JSON.parse(_expands) - - state.inject(handleLevel(state.levels, path, expands)) - - // REVISIT: allow streaming with both NVARCHAR and NCLOB - // Read JSON blob data - const value = readString(state, !objectMode) - - return { - // Iterator pattern - done, - value, - - // Additional row information - path, - blobs, - expands, - } -} - -function rsNextObjectMode(state, next) { - if (!next) next = rsNext(state, true) - if (next.then) return next.then(next => rsNextObjectMode(state, next)) - const { done, value, path, blobs, expands } = next - if (done) return next - - const json = JSON.parse(value) - - // Convert incoming blobs into their own native Readable streams - for (const blobColumn of state.blobs) { - // Skip all blobs that are not part of this row - if (!(blobColumn in blobs)) { - state.read(2) - continue - } - - let binaryStream = new Readable({ - read() { - if (binaryStream._prefetch) { - this.push(binaryStream._prefetch) - binaryStream._prefetch = null - } - this.resume() - } - }) - readBlob(state, { - end() { binaryStream.push(null) }, - write(chunk) { - if (!binaryStream.readableDidRead) { - binaryStream._prefetch = chunk - binaryStream.pause() - return new Promise((resolve, reject) => { - binaryStream.once('error', reject) - binaryStream.once('resume', resolve) - }) - } - binaryStream.push(chunk) - } - }) - ?.catch((err) => { if (binaryStream) binaryStream.emit('error', err) }) - json[blobColumn] = binaryStream // Return delayed blob read stream or null - } - - const level = state.levels.at(-1) - - // Expose expanded columns as recursive Readable streams - for (const expandName in expands) { - level.expands[expandName] = json[expandName] = new Readable({ - objectMode: true, - read() { - state.stream.resume() - } - }) - } - - // Push current - const resultStream = level.result - resultStream.push(json) - resultStream._reading-- - - return { - // Iterator pattern - done, - value: json, - - // Additional row information - path, - } -} - -function rsNextRaw(state, next) { - if (!next) next = rsNext(state) - if (next.then) return next.then(next => rsNextRaw(state, next)) - const { done, value, path, blobs } = next - if (done) return next - - // Read and write JSON blob data - let hasProperties = value > 2 - - const nextBlob = function (state, i = 0) { - if (state.blobs.length <= i) return - const blobColumn = state.blobs[i] - - // Skip all blobs that are not part of this row - if (!(blobColumn in blobs)) { - state.read(2) - return nextBlob(state, i + 1) - } - - if (hasProperties) state.inject(',') - hasProperties = true - state.inject(`${JSON.stringify(blobColumn)}:`) - - const blobLength = readBlob(state, new StringDecoder('base64')) - if (blobLength.then) return blobLength.then(() => nextBlob(state, i + 1)) - } - - nextBlob(state, 0) - - const level = state.levels.at(-1) - level.hasProperties = hasProperties - - return { - // Iterator pattern - done, - value, - - // Additional row information - path, - } -} - async function rsIterator(rs, one, objectMode) { // Raw binary data stream unparsed const raw = rs.createBinaryStream()[Symbol.asyncIterator]() @@ -324,7 +177,8 @@ async function rsIterator(rs, one, objectMode) { reading: 0, writing: 0, buffer: Buffer.allocUnsafe(0), - yields: [], + columnIndex: 0, + // yields: [], next() { const ret = this._prefetch || raw.next() this._prefetch = raw.next() @@ -343,6 +197,7 @@ async function rsIterator(rs, one, objectMode) { if (this.writing) this.stream.push(this.buffer.subarray(0, this.writing)) // Update state this.buffer = 
next.value + this.columnIndex = 0 this.reading = 0 this.writing = 0 }) @@ -454,6 +309,13 @@ async function rsIterator(rs, one, objectMode) { this.reading += length return ret }, + readString() { + this.columnIndex++ + return readString(this, this.columnIndex === 4) + }, + readBlob() { + return readBlob(state, new StringDecoder('base64')) + } } // Mostly ignore buffer manipulation for objectMode @@ -465,49 +327,39 @@ async function rsIterator(rs, one, objectMode) { return encoding.write(slice) } state.inject = function inject() { } - } - - const stream = new Readable({ - objectMode, - async read() { - if (this._running) { - this._reading++ - return - } - this._running = true - this._reading = 1 - - const _next = objectMode ? rsNextObjectMode.bind(null, state) : rsNextRaw.bind(null, state) - while (this._reading > 0) { - let result = _next() - if (result.then) result = await result - if (result.done) return this.push(null) - } - this._running = false - }, - // Clean up current state - end() { - state.inject( - levels - .reverse() - .map(l => l.suffix) - .join(''), - ) - - if (state.writing) { - state.yields.push(state.buffer.slice(0, state.writing)) - } - - rs.close() + state.readString = function _readString() { + return readString(this) + } + state.readBlob = function _readBlob() { + const binaryStream = new Readable({ + read() { + if (binaryStream._prefetch) { + this.push(binaryStream._prefetch) + binaryStream._prefetch = null + } + this.resume() + } + }) + readBlob(state, { + end() { binaryStream.push(null) }, + write(chunk) { + if (!binaryStream.readableDidRead) { + binaryStream._prefetch = chunk + binaryStream.pause() + return new Promise((resolve, reject) => { + binaryStream.once('error', reject) + binaryStream.once('resume', resolve) + }) + } + binaryStream.push(chunk) + } + }) + ?.catch((err) => { if (binaryStream) binaryStream.emit('error', err) }) + return binaryStream } - }) - levels[0].result = state.stream = stream - - if (!objectMode && !one) { - state.inject('[') } - return stream + return resultSetStream(state, one, objectMode) } const readString = function (state, isJson = false) { diff --git a/hana/lib/drivers/stream.js b/hana/lib/drivers/stream.js new file mode 100644 index 000000000..89027386f --- /dev/null +++ b/hana/lib/drivers/stream.js @@ -0,0 +1,160 @@ +const { Readable } = require('stream') +const { handleLevel } = require('./base') + +function rsNext(state, cb) { + let done + if (!cb) done = state.done() + if (done?.then) return done.then(done => { + if (done) return { done } + return rsNext(state, true) + }) + if (done) return { done } + + let _path = state.readString() + // if (_path.then) _path = await _path + const path = (_path) + let _blobs = state.readString() + // if (_blobs.then) _blobs = await _blobs + const blobs = _blobs.length === 2 ? {} : JSON.parse(_blobs) + let _expands = state.readString() + // if (_expands.then) _expands = await _expands + const expands = _expands.length === 2 ? 
{} : JSON.parse(_expands) + + state.inject(handleLevel(state.levels, path, expands)) + + // REVISIT: allow streaming with both NVARCHAR and NCLOB + // Read JSON blob data + const value = state.readString() + + return { + // Iterator pattern + done, + value, + + // Additional row information + path, + blobs, + expands, + } +} + +function rsNextObjectMode(state, next) { + if (!next) next = rsNext(state) + if (next.then) return next.then(next => rsNextObjectMode(state, next)) + const { done, value, path, blobs, expands } = next + if (done) return next + + const json = JSON.parse(value) + + // Convert incoming blobs into their own native Readable streams + for (const blobColumn of state.blobs) { + // Skip all blobs that are not part of this row + if (!(blobColumn in blobs)) { + state.read(2) // 2 is the number of bytes to skip the column for hdb + continue + } + json[blobColumn] = state.readBlob() // Return delayed blob read stream or null + } + + const level = state.levels.at(-1) + + // Expose expanded columns as recursive Readable streams + for (const expandName in expands) { + level.expands[expandName] = json[expandName] = new Readable({ + objectMode: true, + read() { + state.stream.resume() + } + }) + } + + // Push current + const resultStream = level.result + resultStream.push(json) + resultStream._reading-- + + return { + // Iterator pattern + done, + value: json, + + // Additional row information + path, + } +} + +function rsNextRaw(state, next) { + if (!next) next = rsNext(state) + if (next.then) return next.then(next => rsNextRaw(state, next)) + const { done, value, path, blobs } = next + if (done) return next + + // Read and write JSON blob data + let hasProperties = value > 2 + + const nextBlob = function (state, i = 0) { + if (state.blobs.length <= i) return + const blobColumn = state.blobs[i] + + // Skip all blobs that are not part of this row + if (!(blobColumn in blobs)) { + state.read(2) + return nextBlob(state, i + 1) + } + + if (hasProperties) state.inject(',') + hasProperties = true + state.inject(`${JSON.stringify(blobColumn)}:`) + + const blobLength = state.readBlob() + if (blobLength.then) return blobLength.then(() => nextBlob(state, i + 1)) + } + + const _return = () => { + const level = state.levels.at(-1) + level.hasProperties = hasProperties + + return { + // Iterator pattern + done, + value, + + // Additional row information + path, + } + } + + const writeBlobs = nextBlob(state, 0) + return writeBlobs ? writeBlobs.then(_return) : _return() +} + +async function rsIterator(state, one, objectMode) { + const stream = state.stream = new Readable({ + objectMode, + async read() { + if (this._running) { + this._reading++ + return + } + this._running = true + this._reading = 1 + + const _next = objectMode ? 
rsNextObjectMode.bind(null, state) : rsNextRaw.bind(null, state) + while (this._reading > 0) { + let result = _next() + if (result.then) result = await result + if (result.done) return this.push(null) + } + this._running = false + } + }) + state.levels[0].result = stream + + if (!objectMode && !one) { + stream.push('[') + } + + return stream +} + +module.exports.resultSetStream = rsIterator \ No newline at end of file From 4ed567196652c7c7fcf029e8b107d5327ec4dce5 Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Fri, 5 Jul 2024 15:19:02 +0200 Subject: [PATCH 10/16] Move streaming performance test --- hana/test/perf.test.js | 3 + test/scenarios/bookshop/read.test.js | 85 ---------------------------- 2 files changed, 3 insertions(+), 85 deletions(-) create mode 100644 hana/test/perf.test.js diff --git a/hana/test/perf.test.js b/hana/test/perf.test.js new file mode 100644 index 000000000..2edb9dcd2 --- /dev/null +++ b/hana/test/perf.test.js @@ -0,0 +1,3 @@ +describe('hana', () => { + require('../../test/scenarios/bookshop/stream.perf-test') +}) diff --git a/test/scenarios/bookshop/read.test.js b/test/scenarios/bookshop/read.test.js index be0ac5b2a..f82b69fed 100644 --- a/test/scenarios/bookshop/read.test.js +++ b/test/scenarios/bookshop/read.test.js @@ -302,89 +302,4 @@ describe('Bookshop - Read', () => { return expect(cds.db.run(query)).to.be.rejectedWith(/joins must specify the selected columns/) }) - it.only('Object stream', async () => { - try { - const fs = require('fs') - const path = require('path') - const streamConsumers = require('stream/consumers') - let data = fs.createReadStream(path.join(__dirname, '../../../sqlite/test/general/samples/test.jpg')) - const originalImageData = fs.readFileSync(path.join(__dirname, '../../../sqlite/test/general/samples/test.jpg')) - - const { Books } = cds.entities('sap.capire.bookshop') - - let i = 1000 - const gen = function* () { - yield `[{"ID":${i++},"title":"${i}","author_ID":101,"genre_ID":11}` - for (; i < 0; i++) { - yield `,{"ID":${i},"title":"${i}","author_ID":101,"genre_ID":11}` - } - yield ']' - } - - const withImage = false - const withExpands = false - - await INSERT(Readable.from(gen(), { objectMode: false })).into(Books) - if (withImage) await UPDATE(Books).with({ image: data }).where({ val: true }) - - process.stdout.write(`DEPLOYED\n`) - - const query = SELECT([ - '*', - ...(withImage ? [{ ref: ['image'] }] : []), - ...(withExpands ? [{ ref: ['author'], expand: ['*'] }, { ref: ['genre'], expand: ['*'] }] : []), - ]).from(Books) - const req = new cds.Request({ query, iterator: false, hasPostProcessing: false }) - - const proms = [] - for (let x = 0; x < 20; x++) { - proms.push(cds.tx(async tx => { - let rows = i - 1000 + 4 - const s = performance.now() - - const stream = await tx.dispatch(req) - - let rawResult = [] - const devNull = function () { - return new Writable({ - objectMode: true, write(chunk, encoding, callback) { - rawResult.push(chunk) - callback() - } - }) - } - - await pipeline( - stream, - async function* (source) { - for await (const row of source) { - /* - if (withImage) { - const buffer = await streamConsumers.buffer(row.image) - if (originalImageData.compare(buffer)) throw new Error('Blob stream does not contain the original data') - } - - if (withExpands) { - await Promise.all([ - pipeline(row.genre, devNull()), - pipeline(row.author, devNull()), - ]) - } - */ - yield Buffer.isBuffer(row) ? 
row : JSON.stringify(row) - } - }, - devNull() - ) - const dur = performance.now() - s - process.stdout.write(`Duration: ${dur}ms Rows: ${rows} (${rows / dur} rows/ms) (memory: ${process.memoryUsage().heapUsed / 1024 / 1024}MiB)\n`) - })) - } - await Promise.allSettled(proms) - } catch (err) { - debugger - throw err - } - }) - }) From 137d49e614ab04997019de014d142f06e217daf6 Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Tue, 9 Jul 2024 12:15:34 +0200 Subject: [PATCH 11/16] Add object streaming support to postgres --- postgres/lib/PostgresService.js | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/postgres/lib/PostgresService.js b/postgres/lib/PostgresService.js index 3383b49ad..fb005611d 100644 --- a/postgres/lib/PostgresService.js +++ b/postgres/lib/PostgresService.js @@ -177,9 +177,9 @@ GROUP BY k throw enhanceError(e, sql) } }, - stream: async (values, one) => { + stream: async (values, one, objectMode) => { try { - const streamQuery = new QueryStream({ ...query, values: this._getValues(values) }, one) + const streamQuery = new QueryStream({ ...query, values: this._getValues(values) }, one, objectMode) return await this.dbc.query(streamQuery) } catch (e) { throw enhanceError(e, sql) @@ -293,7 +293,8 @@ GROUP BY k return super.onPlainSQL(req, next) } - async onSELECT({ query, data }) { + async onSELECT(req) { + const { query, data } = req // workaround for chunking odata streaming if (query.SELECT?.columns?.find(col => col.as === '$mediaContentType')) { const columns = query.SELECT.columns @@ -311,7 +312,7 @@ GROUP BY k res[this.class.CQN2SQL.prototype.column_name(binary[0])] = stream return res } - return super.onSELECT({ query, data }) + return super.onSELECT(req) } async onINSERT(req) { @@ -619,7 +620,7 @@ GROUP BY k } class QueryStream extends Query { - constructor(config, one) { + constructor(config, one, objectMode) { // REVISIT: currently when setting the row chunk size // it results in an inconsistent connection state // if (!one) config.rows = 1000 @@ -628,6 +629,7 @@ class QueryStream extends Query { this._one = one || config.one this.stream = new Readable({ + objectMode, read: this.rows ? () => { this.stream.pause() @@ -690,8 +692,13 @@ class QueryStream extends Query { const val = msg.fields[0] if (!this._one && val !== null) this.push(this.constructor.open) this.emit('row', val) - this.push(val) + const objectMode = this.stream.readableObjectMode + this.push(objectMode ? JSON.parse(val) : val) + delete this.handleDataRow + if (objectMode) { + this.handleDataRow = this.handleDataRowObjectMode + } } } return super.handleRowDescription(msg) @@ -703,6 +710,11 @@ class QueryStream extends Query { this.push(msg.fields[0]) } + // Called when a new row is received + handleDataRowObjectMode(msg) { + this.push(JSON.parse(msg.fields[0])) + } + // Called when a new binary row is received handleBinaryRow(msg) { const val = msg.fields[0] === null ? 
null : this._result._parsers[0](msg.fields[0]) From 79b8af3a4932ba1931c737c0866eab59e5c8be41 Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Tue, 9 Jul 2024 13:48:50 +0200 Subject: [PATCH 12/16] Fix invalid invocation errors --- sqlite/lib/SQLiteService.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlite/lib/SQLiteService.js b/sqlite/lib/SQLiteService.js index 50453d58a..f2fcde087 100644 --- a/sqlite/lib/SQLiteService.js +++ b/sqlite/lib/SQLiteService.js @@ -124,7 +124,7 @@ class SQLiteService extends SQLService { } async _allStream(stmt, binding_params, one, objectMode) { - stmt = stmt.__proto__ || stmt + stmt = stmt.constructor.name === 'Statement' ? stmt : stmt.__proto__ stmt.raw(true) const get = stmt.get(binding_params) if (!get) return [] From 0402d3fc23f8049db4a5666f38289cc867774e1e Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Tue, 9 Jul 2024 13:49:20 +0200 Subject: [PATCH 13/16] Add perf test for sqlite and postgres --- postgres/test/perf.test.js | 3 +++ sqlite/test/perf.test.js | 3 +++ 2 files changed, 6 insertions(+) create mode 100644 postgres/test/perf.test.js create mode 100644 sqlite/test/perf.test.js diff --git a/postgres/test/perf.test.js b/postgres/test/perf.test.js new file mode 100644 index 000000000..629879eb3 --- /dev/null +++ b/postgres/test/perf.test.js @@ -0,0 +1,3 @@ +describe('postgres', () => { + require('../../test/scenarios/bookshop/stream.perf-test') +}) diff --git a/sqlite/test/perf.test.js b/sqlite/test/perf.test.js new file mode 100644 index 000000000..674a64439 --- /dev/null +++ b/sqlite/test/perf.test.js @@ -0,0 +1,3 @@ +describe('sqlite', () => { + require('../../test/scenarios/bookshop/stream.perf-test') +}) From 5d1083c97f9159271519c255a7604c855bd1185a Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Tue, 9 Jul 2024 14:34:07 +0200 Subject: [PATCH 14/16] Add stream performance test --- test/scenarios/bookshop/read.test.js | 2 - test/scenarios/bookshop/stream.perf-test.js | 141 ++++++++++++++++++++ 2 files changed, 141 insertions(+), 2 deletions(-) create mode 100644 test/scenarios/bookshop/stream.perf-test.js diff --git a/test/scenarios/bookshop/read.test.js b/test/scenarios/bookshop/read.test.js index bc781a1a0..c3b78c6d2 100644 --- a/test/scenarios/bookshop/read.test.js +++ b/test/scenarios/bookshop/read.test.js @@ -1,5 +1,3 @@ -const { Readable, Writable } = require('stream') -const { pipeline } = require('stream/promises') const cds = require('../../cds.js') const bookshop = cds.utils.path.resolve(__dirname, '../../bookshop') diff --git a/test/scenarios/bookshop/stream.perf-test.js b/test/scenarios/bookshop/stream.perf-test.js new file mode 100644 index 000000000..109a83a1e --- /dev/null +++ b/test/scenarios/bookshop/stream.perf-test.js @@ -0,0 +1,141 @@ +const fs = require('fs') +const path = require('path') +const { Readable, Writable } = require('stream') +const { pipeline } = require('stream/promises') +const streamConsumers = require('stream/consumers') + +const cds = require('../../cds.js') +const bookshop = cds.utils.path.resolve(__dirname, '../../bookshop') + +describe('Bookshop - Stream Performance', () => { + cds.test(bookshop) + + const imageData = fs.readFileSync(path.join(__dirname, '../../../sqlite/test/general/samples/test.jpg')) + + let numberOfBooks = 0 + beforeAll(async () => { + const { Books } = cds.entities('sap.capire.bookshop') + + let i = 1000 + const gen = function* () { + yield `[{"ID":${i++},"title":"${i}","author_ID":101,"genre_ID":11}` + for (; i < 100000; i++) { + yield 
`,{"ID":${i},"title":"${i}","author_ID":101,"genre_ID":11}` + } + yield ']' + } + await INSERT(Readable.from(gen(), { objectMode: false })).into(Books) + process.stdout.write(`DEPLOYED\n`) + numberOfBooks = (i - 1000 + 4) + }) + + const measure = [ + // { async: false }, + { async: true }, + ] + + const modes = [ + // { objectMode: false }, + // { objectMode: true }, + { objectMode: null }, + ] + + const scenarios = [ + { withImage: false, withExpands: false }, + // { withImage: false, withExpands: true }, + // { withImage: true, withExpands: false }, + // { withImage: true, withExpands: true }, + ] + + describe.each(measure)('$async', ({ async }) => { + + beforeAll(() => { process.stdout.write(`- ${async ? 'async' : 'sync'}\n`) }) + + describe.each(modes)('$objectMode', ({ objectMode }) => { + + beforeAll(() => { process.stdout.write(` - ${objectMode ? 'objectMode' : objectMode == null ? 'array' : 'raw'}\n`) }) + + it.each(scenarios)('$withImage $withExpands', async ({ withImage, withExpands }) => { + + if (scenarios.length > 1) process.stdout.write(` - Books ${withImage ? '+image ' : ''}${withExpands ? '+expand' : ''}\n`) + + const { Books } = cds.entities('sap.capire.bookshop') + + await UPDATE(Books).with({ image: withImage ? imageData : null }).where({ val: true }) + + const query = SELECT([ + '*', + ...(withImage ? [{ ref: ['image'] }] : []), + ...(withExpands ? [{ ref: ['author'], expand: ['*'] }, { ref: ['genre'], expand: ['*'] }] : []), + ]).from(Books) + const req = new cds.Request({ query, iterator: objectMode, hasPostProcessing: objectMode == null ? undefined : false }) + + const rows = numberOfBooks * (withExpands ? 3 : 1) + + let peakMemory = 0 + const proms = [] + const s = performance.now() + for (let x = 0; x < 20; x++) { + const prom = cds.tx(async tx => { + const stream = await tx.dispatch(req) + + if (objectMode !== false) { + await pipeline( + stream, + async function* (source) { + for await (const row of source) { + if (withImage) { + /* const buffer = */ await streamConsumers.buffer(row.image) + // if (imageData.compare(buffer)) throw new Error('Blob stream does not contain the original data') + } + + if (withExpands) { + await Promise.all([ + pipeline(row.genre, devNull()), + pipeline(row.author, devNull()), + ]) + } + + yield JSON.stringify(row) + } + }, + devNull() + ) + } else { + await pipeline(stream, devNull()) + } + + const curMemory = process.memoryUsage().heapUsed + if (curMemory > peakMemory) peakMemory = curMemory + }) + + proms.push(prom) + if (!async) { + await prom + } + } + + const allResults = await Promise.allSettled(proms) + const success = allResults.filter(r => r.status === 'fulfilled') + + const dur = performance.now() - s + const totalRows = rows * success.length + process.stdout.write( + `${scenarios.length > 1 ? 
' ' : ''} - Duration: ${dur >>> 0} ms Rows: ${totalRows} (${(totalRows / dur) >>> 0} rows/ms) (${(peakMemory / 1024 / 1024) >>> 0} MiB mem)\n` + ) + + }, 120 * 1000) + + }) + + }) + +}) + +const devNull = function () { + return new Writable({ + objectMode: true, write(chunk, encoding, callback) { + callback() + } + }) +} \ No newline at end of file From e803cbd68170c94c2e3b17e0656539ea509e90e4 Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Wed, 10 Jul 2024 11:42:43 +0200 Subject: [PATCH 15/16] Disable streaming performance test in CI --- test/scenarios/bookshop/stream.perf-test.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/scenarios/bookshop/stream.perf-test.js b/test/scenarios/bookshop/stream.perf-test.js index 109a83a1e..bc1a70362 100644 --- a/test/scenarios/bookshop/stream.perf-test.js +++ b/test/scenarios/bookshop/stream.perf-test.js @@ -7,7 +7,7 @@ const streamConsumers = require('stream/consumers') const cds = require('../../cds.js') const bookshop = cds.utils.path.resolve(__dirname, '../../bookshop') -describe('Bookshop - Stream Performance', () => { +describe.skip('Bookshop - Stream Performance', () => { cds.test(bookshop) const imageData = fs.readFileSync(path.join(__dirname, '../../../sqlite/test/general/samples/test.jpg')) @@ -35,8 +35,8 @@ describe('Bookshop - Stream Performance', () => { ] const modes = [ - // { objectMode: false }, - // { objectMode: true }, + { objectMode: false }, + { objectMode: true }, { objectMode: null }, ] From 0d2df8df3332e21e57ac6f303500ede98238a13e Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Thu, 11 Jul 2024 11:00:22 +0200 Subject: [PATCH 16/16] Fix await position --- db-service/lib/SQLService.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db-service/lib/SQLService.js b/db-service/lib/SQLService.js index beb3bbc9a..9275cbc51 100644 --- a/db-service/lib/SQLService.js +++ b/db-service/lib/SQLService.js @@ -132,7 +132,7 @@ class SQLService extends DatabaseService { const isOne = cqn.SELECT.one || query.SELECT.from?.ref?.[0].cardinality?.max === 1 let ps = await this.prepare(sql) - let rows = await (hasPostProcessing === false || iterator) ? ps.stream(values, isOne, iterator) : ps.all(values) + let rows = (hasPostProcessing === false || iterator) ? await ps.stream(values, isOne, iterator) : await ps.all(values) if (rows.length) if (expand) rows = rows.map(r => (typeof r._json_ === 'string' ? JSON.parse(r._json_) : r._json_ || r))