diff --git a/db-service/lib/SQLService.js b/db-service/lib/SQLService.js
index a3679313e..9275cbc51 100644
--- a/db-service/lib/SQLService.js
+++ b/db-service/lib/SQLService.js
@@ -115,13 +115,13 @@ class SQLService extends DatabaseService {
    * Handler for SELECT
    * @type {Handler}
    */
-  async onSELECT({ query, data }) {
+  async onSELECT({ query, data, hasPostProcessing, iterator }) {
     // REVISIT: for custom joins, infer is called twice, which is bad
     // --> make cds.infer properly work with custom joins and remove this
     if (!query.target) {
       try { this.infer(query) } catch { /**/ }
     }
-    if (query.target && !query.target._unresolved) {
+    if (query.SELECT.expand !== false && query.target && !query.target._unresolved) {
       // Will return multiple rows with objects inside
       query.SELECT.expand = 'root'
     }
@@ -129,9 +129,10 @@ class SQLService extends DatabaseService {
     const { sql, values, cqn } = this.cqn2sql(query, data)
     const expand = query.SELECT.expand
     delete query.SELECT.expand
+    const isOne = cqn.SELECT.one || query.SELECT.from?.ref?.[0].cardinality?.max === 1
 
     let ps = await this.prepare(sql)
-    let rows = await ps.all(values)
+    let rows = (hasPostProcessing === false || iterator) ? await ps.stream(values, isOne, iterator) : await ps.all(values)
 
     if (rows.length)
       if (expand) rows = rows.map(r => (typeof r._json_ === 'string' ? JSON.parse(r._json_) : r._json_ || r))
@@ -157,7 +158,7 @@ class SQLService extends DatabaseService {
       return SQLService._arrayWithCount(rows, await this.count(query, rows))
     }
 
-    return cqn.SELECT.one || query.SELECT.from?.ref?.[0].cardinality?.max === 1 ? rows[0] : rows
+    return hasPostProcessing !== false && isOne ? rows[0] : rows
   }
 
   /**
@@ -287,7 +288,7 @@ class SQLService extends DatabaseService {
    * @returns {Promise}
    */
   async count(query, ret) {
-    if (ret) {
+    if (ret?.length) {
       const { one, limit: _ } = query.SELECT,
         n = ret.length
       const [max, offset = 0] = one ? [1] : _ ? [_.rows?.val, _.offset?.val] : []
diff --git a/hana/lib/HANAService.js b/hana/lib/HANAService.js
index fbf500e50..28b8f3e3b 100644
--- a/hana/lib/HANAService.js
+++ b/hana/lib/HANAService.js
@@ -118,7 +118,7 @@ class HANAService extends SQLService {
   }
 
   async onSELECT(req) {
-    const { query, data } = req
+    const { query, data, hasPostProcessing, iterator } = req
 
     if (!query.target) {
       try { this.infer(query) } catch { /**/ }
@@ -138,13 +138,15 @@ class HANAService extends SQLService {
     delete query.SELECT.expand
     const isSimple = temporary.length + blobs.length + withclause.length === 0
+    const isOne = cqn.SELECT.one || query.SELECT.from.ref?.[0].cardinality?.max === 1
+    const canStream = (hasPostProcessing === false || iterator) && !isLockQuery
 
     // REVISIT: add prepare options when param:true is used
     const sqlScript = isLockQuery || isSimple ? sql : this.wrapTemporary(temporary, withclause, blobs)
     let rows
-    if (values?.length || blobs.length > 0) {
+    if (values?.length || blobs.length > 0 || canStream) {
       const ps = await this.prepare(sqlScript, blobs.length)
-      rows = this.ensureDBC() && await ps.all(values || [])
+      rows = this.ensureDBC() && await ps[canStream ? 'stream' : 'all'](values || [], isOne, iterator)
     } else {
       rows = await this.exec(sqlScript)
     }
@@ -164,7 +166,7 @@ class HANAService extends SQLService {
       // REVISIT: the runtime always expects that the count is preserved with .map, required for renaming in mocks
       return HANAService._arrayWithCount(rows, await this.count(query, rows))
     }
-    return cqn.SELECT.one || query.SELECT.from.ref?.[0].cardinality?.max === 1 ? rows[0] : rows
+    return isOne && !canStream ? rows[0] : rows
   }
 
   async onINSERT({ query, data }) {
diff --git a/hana/lib/drivers/base.js b/hana/lib/drivers/base.js
index 1c198f90a..35b9c8394 100644
--- a/hana/lib/drivers/base.js
+++ b/hana/lib/drivers/base.js
@@ -221,7 +221,7 @@ const handleLevel = function (levels, path, expands) {
       const property = path.slice(level.path.length + 2, -7)
       if (property && property in level.expands) {
         const is2Many = level.expands[property]
-        delete level.expands[property]
+        // delete level.expands[property]
         if (level.hasProperties) {
           buffer += ','
         } else {
@@ -236,6 +236,7 @@ const handleLevel = function (levels, path, expands) {
           index: 1,
           suffix: is2Many ? ']' : '',
           path: path.slice(0, -6),
+          result: level.expands[property],
           expands,
         })
       } else {
@@ -247,6 +248,7 @@ const handleLevel = function (levels, path, expands) {
           index: 0,
           suffix: '}',
           path: path,
+          result: levels.at(-1).result,
           expands,
         })
         break
@@ -261,6 +263,11 @@ const handleLevel = function (levels, path, expands) {
       }
     }
     if (level.suffix) buffer += level.suffix
+    if (level.expands) {
+      for (const expand in level.expands) {
+        if (level.expands[expand]?.push) level.expands[expand]?.push(null)
+      }
+    }
   }
 }
 return buffer
diff --git a/hana/lib/drivers/hana-client.js b/hana/lib/drivers/hana-client.js
index c8125c44e..db162f3a1 100644
--- a/hana/lib/drivers/hana-client.js
+++ b/hana/lib/drivers/hana-client.js
@@ -1,8 +1,11 @@
 const { Readable, Stream } = require('stream')
+const { pipeline } = require('stream/promises')
 const cds = require('@sap/cds')
 const hdb = require('@sap/hana-client')
 const { driver, prom, handleLevel } = require('./base')
 
+const { resultSetStream } = require('./stream')
+
 const LOG = cds.log('@sap/hana-client')
 if (process.env.NODE_ENV === 'production' && !process.env.HDB_NODEJS_THREADPOOL_SIZE && !process.env.UV_THREADPOOL_SIZE)
   LOG.warn("When using @sap/hana-client, it's strongly recommended to adjust its thread pool size with environment variable `HDB_NODEJS_THREADPOOL_SIZE`, otherwise it might lead to performance issues.\nLearn more: https://help.sap.com/docs/SAP_HANA_CLIENT/f1b440ded6144a54ada97ff95dac7adf/31a8c93a574b4f8fb6a8366d2c758f21.html")
@@ -134,7 +137,7 @@ class HANAClientDriver extends driver {
       return this._getResultForProcedure(rows, outParameters, stmt)
     }
 
-    ret.stream = async (values, one) => {
+    ret.stream = async (values, one, objectMode) => {
       const stmt = await ret._prep
       values = Array.isArray(values) ? values : []
       // Uses the native exec method instead of executeQuery to initialize a full stream
@@ -159,7 +162,7 @@ class HANAClientDriver extends driver {
         if (rs.isNull(0)) return null
         return Readable.from(streamBlob(rs, undefined, 0), { objectMode: false })
       }
-      return Readable.from(rsIterator(rs, one), { objectMode: false })
+      return rsIterator(rs, one, objectMode)
     }
     return ret
   }
@@ -232,10 +235,13 @@ class HANAClientDriver extends driver {
 
 HANAClientDriver.pool = true
 
-async function* rsIterator(rs, one) {
-  const next = prom(rs, 'next') // () => rs.next()
-  const getValue = prom(rs, 'getValue') // nr => rs.getValue(nr)
-  const getData = prom(rs, 'getData') // (nr, pos, buf, zero, bufSize) => rs.getData(nr, pos, buf, zero, bufSize) //
+async function rsIterator(rs, one, objectMode) {
+  rs._rowPosition = -1
+  rs.nextAsync = prom(rs, 'next')
+  rs.getValueAsync = prom(rs, 'getValue')
+  rs.getValueAsync = prom(rs, 'getData')
+
+  const blobs = rs.getColumnInfo().slice(4).map(b => b.columnName)
   const levels = [
     {
       index: 0,
@@ -244,83 +250,67 @@ async function* rsIterator(rs, one) {
       expands: {},
     },
   ]
-
-  const binaryBuffer = new Buffer.alloc(1 << 16)
-
-  const blobColumns = {}
-  rs.getColumnInfo()
-    .slice(4)
-    .forEach((c, i) => {
-      blobColumns[c.columnName] = i + 4
-    })
-
-  if (!one) {
-    yield '['
-  }
-
-  let buffer = ''
-  // Load next row of the result set (starts before the first row)
-  while (await next()) {
-    const values = await Promise.all([getValue(0), getValue(1), getValue(2)])
-
-    const [path, _blobs, _expands] = values
-    const expands = JSON.parse(_expands)
-    const blobs = JSON.parse(_blobs)
-
-    yield handleLevel(levels, path, expands)
-
-    let hasProperties = false
-    let jsonPosition = 0
-    while (true) {
-      const read = await getData(3, jsonPosition, binaryBuffer, 0, binaryBuffer.byteLength)
-      if (read < binaryBuffer.byteLength) {
-        if (read > 2) hasProperties = true
-        // Pipe json stream.slice(0,-1) removing the } to keep the object open
-        yield binaryBuffer.slice(0, read - 1).toString('utf-8')
-        break
-      }
-      jsonPosition += read
-      yield binaryBuffer.toString('utf-8')
-    }
-
-    for (const key of Object.keys(blobs)) {
-      if (hasProperties) buffer += ','
-      hasProperties = true
-      buffer += `${JSON.stringify(key)}:`
-
-      const columnIndex = blobColumns[key]
-      if (rs.isNull(columnIndex)) {
-        buffer += 'null'
-        continue
+  const state = {
+    rs,
+    levels,
+    blobs,
+    columnIndex: 0,
+    binaryBuffer: new Buffer.alloc(1 << 16),
+    done() {
+      this.columnIndex = 0
+      this.rs._rowPosition++
+      return this.rs.nextCanBlock()
+        ? this.rs.nextAsync().then(a => !a)
+        : !this.rs.next()
+    },
+    inject(str) {
+      if (str == null) return
+      this.stream.push(str)
+    },
+    read() {
+      this.columnIndex++
+    },
+    readString() {
+      const index = this.columnIndex++
+      if (index === 3) {
+        const _inject = str => {
+          this.inject(str.slice(0, -1))
+          return str.length
+        }
+        return this.rs.getValuesCanBlock()
+          ? this.rs.getValueAsync(index).then(_inject)
+          : _inject(this.rs.getValue(index))
       }
-
-      buffer += '"'
-      yield buffer
-      buffer = ''
-
-      const stream = Readable.from(streamBlob(rs, undefined, columnIndex, binaryBuffer), { objectMode: false })
+      return this.rs.getValuesCanBlock()
+        ? this.rs.getValueAsync(index)
+        : this.rs.getValue(index)
+    },
+    readBlob() {
+      const index = this.columnIndex++
+      const stream = Readable.from(streamBlob(this.rs, undefined, index, this.binaryBuffer), { objectMode: false })
       stream.setEncoding('base64')
-      for await (const chunk of stream) {
-        yield chunk
-      }
-      buffer += '"'
+      this.stream.push('"')
+      return pipeline(stream, this.stream, { end: false }).then(() => { this.stream.push('"') })
     }
+  }
 
-    if (buffer) {
-      yield buffer
-      buffer = ''
+  if (objectMode) {
+    state.inject = function inject() { }
+    state.readString = function readString() {
+      const index = this.columnIndex++
+      return this.rs.getValuesCanBlock()
+        ? this.rs.getValueAsync(index)
+        : this.rs.getValue(index)
+    }
+    state.readBlob = function readBlob() {
+      const index = this.columnIndex++
+      const stream = Readable.from(streamBlob(this.rs, rs._rowPosition, index, this.binaryBuffer), { objectMode: false })
+      stream.setEncoding('base64')
+      return stream
     }
-
-    const level = levels[levels.length - 1]
-    level.hasProperties = hasProperties
   }
 
-  // Close all left over levels
-  buffer += levels
-    .reverse()
-    .map(l => l.suffix)
-    .join('')
-  yield buffer
+  return resultSetStream(state, one, objectMode)
 }
 
 async function* streamBlob(rs, rowIndex = -1, columnIndex, binaryBuffer = Buffer.allocUnsafe(1 << 16)) {
diff --git a/hana/lib/drivers/hdb.js b/hana/lib/drivers/hdb.js
index a57537875..04587b839 100644
--- a/hana/lib/drivers/hdb.js
+++ b/hana/lib/drivers/hdb.js
@@ -6,6 +6,7 @@ const hdb = require('hdb')
 const iconv = require('iconv-lite')
 const { driver, prom, handleLevel } = require('./base')
+const { resultSetStream } = require('./stream')
 
 const credentialMappings = [
   { old: 'certificate', new: 'ca' },
@@ -111,32 +112,18 @@ class HDBDriver extends driver {
       return this._getResultForProcedure(rows, outParameters)
     }
 
-    ret.stream = async (values, one) => {
+    ret.stream = async (values, one, objectMode) => {
       const stmt = await ret._prep
       const rs = await prom(stmt, 'execute')(values || [])
-      const cols = rs.metadata
-      // If the query only returns a single row with a single blob it is the final stream
-      if (cols.length === 1 && cols[0].length === -1) {
-        const rowStream = rs.createObjectStream()
-        const { done, value } = await rowStream[Symbol.asyncIterator]().next()
-        if (done || !value[cols[0].columnName]) return null
-        const blobStream = value[cols[0].columnName].createReadStream()
-        blobStream.on('close', () => {
-          rowStream.end()
-          rs.close()
-        })
-        return blobStream
-      }
-      // Create ResultSet stream from ResultSet iterator
-      return Readable.from(rsIterator(rs, one))
+      return rsIterator(rs, one, objectMode)
     }
     return ret
   }
 
   _getResultForProcedure(rows, outParameters) {
     // on hdb, rows already contains results for scalar params
-    const isArray = Array.isArray(rows)
-    const result = isArray ? {...rows[0]} : {...rows}
+    const isArray = Array.isArray(rows)
+    const result = isArray ? { ...rows[0] } : { ...rows }
     // merge table output params into scalar params
     const args = isArray ? rows.slice(1) : []
@@ -146,7 +133,7 @@ class HDBDriver extends driver {
         result[params[i].PARAMETER_NAME] = args[i]
       }
     }
-
+
     return result
   }
@@ -169,15 +156,11 @@ class HDBDriver extends driver {
   }
 }
 
-function* echoStream(ret) {
-  yield ret
-}
-
-async function* rsIterator(rs, one) {
+async function rsIterator(rs, one, objectMode) {
   // Raw binary data stream unparsed
   const raw = rs.createBinaryStream()[Symbol.asyncIterator]()
 
-  const nativeBlobs = rs.metadata.slice(4).map(b => b.columnName)
+  const blobs = rs.metadata.slice(4).map(b => b.columnName)
   const levels = [
     {
       index: 0,
@@ -190,25 +173,38 @@ async function* rsIterator(rs, one) {
   const state = {
     rs,
     levels,
+    blobs,
     reading: 0,
     writing: 0,
     buffer: Buffer.allocUnsafe(0),
-    yields: [],
+    columnIndex: 0,
+    // yields: [],
+    next() {
+      const ret = this._prefetch || raw.next()
+      this._prefetch = raw.next()
+      return ret
+    },
     done() {
       // Validate whether the current buffer is finished reading
       if (this.buffer.byteLength <= this.reading) {
-        return raw.next().then(next => {
+        return this.next().then(next => {
           if (next.done || next.value.byteLength === 0) {
+            // yield for raw mode
+            this.inject(handleLevel(this.levels, '$', {}))
+            if (this.writing) this.stream.push(this.buffer.subarray(0, this.writing))
             return true
           }
-          this.yields.push(this.buffer.slice(0, this.writing))
+          if (this.writing) this.stream.push(this.buffer.subarray(0, this.writing))
           // Update state
           this.buffer = next.value
+          this.columnIndex = 0
           this.reading = 0
           this.writing = 0
         })
           .catch(() => {
             // TODO: check whether the error is early close
+            this.inject(handleLevel(this.levels, '$', {}))
+            if (this.writing) this.stream.push(this.buffer.subarray(0, this.writing))
             return true
           })
@@ -218,14 +214,14 @@ async function* rsIterator(rs, one) {
       if (this.buffer.byteLength >= totalSize) {
         return
       }
-      return raw.next().then(next => {
+      return this.next().then(next => {
         if (next.done) {
           throw new Error('Trying to read more bytes than are available')
         }
         // Write processed buffer to stream
-        if (this.writing) this.yields.push(this.buffer.slice(0, this.writing))
+        if (this.writing) this.stream.push(this.buffer.subarray(0, this.writing))
         // Keep unread buffer and prepend to new buffer
-        const leftover = this.buffer.slice(this.reading)
+        const leftover = this.buffer.subarray(this.reading)
         // Update state
         this.buffer = Buffer.concat([leftover, next.value])
         this.reading = 0
@@ -240,24 +236,24 @@ async function* rsIterator(rs, one) {
       if (bytesLeft < length) {
         // Copy leftover bytes
         if (encoding) {
-          let slice = Buffer.from(iconv.decode(this.buffer.slice(this.reading), 'cesu8'), 'binary')
+          let slice = Buffer.from(iconv.decode(this.buffer.subarray(this.reading), 'cesu8'), 'binary')
           this.prefetchDecodedSize = slice.byteLength
           const encoded = Buffer.from(encoding.write(slice))
           if (this.writing + encoded.byteLength > this.buffer.byteLength) {
-            this.yields.push(this.buffer.slice(0, this.writing))
-            this.yields.push(encoded)
+            this.stream.push(this.buffer.subarray(0, this.writing))
+            this.stream.push(encoded)
           } else {
-            this.buffer.copy(encoded, this.writing)
+            this.buffer.copy(encoded, this.writing) // REVISIT: make sure this is the correct copy direction
             this.writing += encoded.byteLength
-            this.yields.push(this.buffer.slice(0, this.writing))
+            this.stream.push(this.buffer.subarray(0, this.writing))
           }
         } else {
           this.buffer.copyWithin(this.writing, this.reading)
           this.writing += bytesLeft
-          this.yields.push(this.buffer.slice(0, this.writing))
+          this.stream.push(this.buffer.subarray(0, this.writing))
         }
-        return raw.next().then(next => {
+        return this.next().then(next => {
           length = length - bytesLeft
           if (next.done) {
             throw new Error('Trying to read more byte then are available')
           }
@@ -270,19 +266,19 @@ async function* rsIterator(rs, one) {
         })
       }
       if (encoding) {
-        let slice = Buffer.from(iconv.decode(this.buffer.slice(this.reading, this.reading + length), 'cesu8'), 'binary')
+        let slice = Buffer.from(iconv.decode(this.buffer.subarray(this.reading, this.reading + length), 'cesu8'), 'binary')
         this.prefetchDecodedSize = slice.byteLength
         const encoded = Buffer.from(encoding.write(slice))
         const nextWriting = this.writing + encoded.byteLength
         const nextReading = this.reading + length
         if (nextWriting > this.buffer.byteLength || nextWriting > nextReading) {
-          this.yields.push(this.buffer.slice(0, this.writing))
-          this.yields.push(encoded)
-          this.buffer = this.buffer.slice(nextReading)
+          this.stream.push(this.buffer.subarray(0, this.writing))
+          this.stream.push(encoded)
+          this.buffer = this.buffer.subarray(nextReading)
           this.reading = 0
           this.writing = 0
         } else {
-          this.buffer.copy(encoded, this.writing)
+          this.buffer.copy(encoded, this.writing) // REVISIT: make sure this is the correct copy direction
           this.writing += encoded.byteLength
           this.reading += length
         }
@@ -293,11 +289,12 @@ async function* rsIterator(rs, one) {
       }
     },
     inject(str) {
+      if (str == null) return
      str = Buffer.from(str)
      if (this.writing + str.byteLength > this.reading) {
-        this.yields.push(this.buffer.slice(0, this.writing))
-        this.yields.push(str)
-        this.buffer = this.buffer.slice(this.reading)
+        this.stream.push(this.buffer.subarray(0, this.writing))
+        this.stream.push(str)
+        this.buffer = this.buffer.subarray(this.reading)
        this.writing = 0
        this.reading = 0
        return
@@ -308,83 +305,61 @@ async function* rsIterator(rs, one) {
     slice(length) {
       const ens = this.ensure(length)
       if (ens) return ens.then(() => this.slice(length))
-      const ret = this.buffer.slice(this.reading, this.reading + length)
+      const ret = this.buffer.subarray(this.reading, this.reading + length)
       this.reading += length
       return ret
     },
+    readString() {
+      this.columnIndex++
+      return readString(this, this.columnIndex === 4)
+    },
+    readBlob() {
+      return readBlob(state, new StringDecoder('base64'))
+    }
   }
 
-  if (!one) {
-    state.inject('[')
-  }
-
-  let isDone = state.done()
-  if (isDone) {
-    isDone = await isDone
-  }
-  while (!isDone) {
-    // Made all functions possible promises giving a 5x speed up
-    const _path = readString(state)
-    const path = (typeof _path === 'string' ? _path : await _path).toString('utf-8')
-    const _blobs = readString(state)
-    const blobs = JSON.parse(typeof _blobs === 'string' ? _blobs : await _blobs)
-    const _expands = readString(state)
-    const expands = JSON.parse(typeof _expands === 'string' ? _expands : await _expands)
-
-    state.inject(handleLevel(levels, path, expands))
-
-    // REVISIT: allow streaming with both NVARCHAR and NCLOB
-    // Read and write JSON blob data
-    const jsonLength = readString(state, true)
-    let hasProperties = (typeof jsonLength === 'number' ? jsonLength : await jsonLength) > 2
-
-    for (const blobColumn of nativeBlobs) {
-      // Skip all blobs that are not part of this row
-      if (!(blobColumn in blobs)) {
-        state.read(2)
-        continue
-      }
-
-      if (hasProperties) state.inject(',')
-      hasProperties = true
-      state.inject(`${JSON.stringify(blobColumn)}:`)
-
-      const blobLength = readBlob(state, new StringDecoder('base64'))
-      if (typeof blobLength !== 'number') await blobLength
     }
-
-    const level = levels[levels.length - 1]
-    level.hasProperties = hasProperties
-
-    for (const y of state.yields) {
-      if (y.byteLength) {
-        yield y
-      }
+  // Mostly ignore buffer manipulation for objectMode
+  if (objectMode) {
+    state.write = function write(length, encoding) {
+      let slice = this.buffer.slice(this.reading, this.reading + length)
+      this.prefetchDecodedSize = slice.byteLength
+      this.reading += length
+      return encoding.write(slice)
     }
-    state.yields = []
-
-    isDone = state.done()
-    if (isDone) {
-      isDone = await isDone
+    state.inject = function inject() { }
+    state.readString = function _readString() {
+      return readString(this)
     }
+    state.readBlob = function _readBlob() {
+      const binaryStream = new Readable({
+        read() {
+          if (binaryStream._prefetch) {
+            this.push(binaryStream._prefetch)
+            binaryStream._prefetch = null
+          }
+          this.resume()
+        }
+      })
+      readBlob(state, {
+        end() { binaryStream.push(null) },
+        write(chunk) {
+          if (!binaryStream.readableDidRead) {
+            binaryStream._prefetch = chunk
+            binaryStream.pause()
+            return new Promise((resolve, reject) => {
+              binaryStream.once('error', reject)
+              binaryStream.once('resume', resolve)
+            })
+          }
+          binaryStream.push(chunk)
+        }
+      })
+        ?.catch((err) => { if (binaryStream) binaryStream.emit('error', err) })
+      return binaryStream
+    }
   }
 
-  state.inject(
-    levels
-      .reverse()
-      .map(l => l.suffix)
-      .join(''),
-  )
-  if (state.writing) {
-    state.yields.push(state.buffer.slice(0, state.writing))
-  }
-
-  for (const y of state.yields) {
-    if (y.byteLength) {
-      yield y
-    }
-  }
-  rs.close()
+  return resultSetStream(state, one, objectMode)
 }
 
 const readString = function (state, isJson = false) {
diff --git a/hana/lib/drivers/stream.js b/hana/lib/drivers/stream.js
new file mode 100644
index 000000000..89027386f
--- /dev/null
+++ b/hana/lib/drivers/stream.js
@@ -0,0 +1,160 @@
+const { Readable } = require('stream')
+const { handleLevel } = require('./base')
+
+function rsNext(state, cb) {
+  let done
+  if (!cb) done = state.done()
+  if (done?.then) return done.then(done => {
+    if (done) return { done }
+    return rsNext(state, true)
+  })
+  if (done) return { done }
+
+  let _path = state.readString()
+  // if (_path.then) _path = await _path
+  const path = (_path)
+  let _blobs = state.readString()
+  // if (_blobs.then) _blobs = await _blobs
+  const blobs = _blobs.length === 2 ? {} : JSON.parse(_blobs)
+  let _expands = state.readString()
+  // if (_expands.then) _expands = await _expands
+  const expands = _expands.length === 2 ? {} : JSON.parse(_expands)
+
+  state.inject(handleLevel(state.levels, path, expands))
+
+  // REVISIT: allow streaming with both NVARCHAR and NCLOB
+  // Read JSON blob data
+  const value = state.readString()
+
+  return {
+    // Iterator pattern
+    done,
+    value,
+
+    // Additional row information
+    path,
+    blobs,
+    expands,
+  }
+}
+
+function rsNextObjectMode(state, next) {
+  if (!next) next = rsNext(state)
+  if (next.then) return next.then(next => rsNextObjectMode(state, next))
+  const { done, value, path, blobs, expands } = next
+  if (done) return next
+
+  const json = JSON.parse(value)
+
+  // Convert incoming blobs into their own native Readable streams
+  for (const blobColumn of state.blobs) {
+    // Skip all blobs that are not part of this row
+    if (!(blobColumn in blobs)) {
+      state.read(2) // 2 is the number of bytes to skip the column for hdb
+      continue
+    }
+    json[blobColumn] = state.readBlob() // Return delayed blob read stream or null
+  }
+
+  const level = state.levels.at(-1)
+
+  // Expose expanded columns as recursive Readable streams
+  for (const expandName in expands) {
+    level.expands[expandName] = json[expandName] = new Readable({
+      objectMode: true,
+      read() {
+        state.stream.resume()
+      }
+    })
+  }
+
+  // Push current
+  const resultStream = level.result
+  resultStream.push(json)
+  resultStream._reading--
+
+  return {
+    // Iterator pattern
+    done,
+    value: json,
+
+    // Additional row information
+    path,
+  }
+}
+
+function rsNextRaw(state, next) {
+  if (!next) next = rsNext(state)
+  if (next.then) return next.then(next => rsNextRaw(state, next))
+  const { done, value, path, blobs } = next
+  if (done) return next
+
+  // Read and write JSON blob data
+  let hasProperties = value > 2
+
+  const nextBlob = function (state, i = 0) {
+    if (state.blobs.length <= i) return
+    const blobColumn = state.blobs[i]
+
+    // Skip all blobs that are not part of this row
+    if (!(blobColumn in blobs)) {
+      state.read(2)
+      return nextBlob(state, i + 1)
+    }
+
+    if (hasProperties) state.inject(',')
+    hasProperties = true
+    state.inject(`${JSON.stringify(blobColumn)}:`)
+
+    const blobLength = state.readBlob()
+    if (blobLength.then) return blobLength.then(() => nextBlob(state, i + 1))
+  }
+
+  const _return = () => {
+    const level = state.levels.at(-1)
+    level.hasProperties = hasProperties
+
+    return {
+      // Iterator pattern
+      done,
+      value,
+
+      // Additional row information
+      path,
+    }
+  }
+
+  const writeBlobs = nextBlob(state, 0)
+  return writeBlobs ? writeBlobs.then(_return) : _return()
+}
+
+async function rsIterator(state, one, objectMode) {
+  const stream = state.stream = new Readable({
+    objectMode,
+    async read() {
+      if (this._running) {
+        this._reading++
+        return
+      }
+      this._running = true
+      this._reading = 1
+
+      const _next = objectMode ? rsNextObjectMode.bind(null, state) : rsNextRaw.bind(null, state)
+      while (this._reading > 0) {
+        let result = _next()
+        if (result.then) result = await result
+        if (result.done) return this.push(null)
+      }
+      this._running = false
+    }
+  })
+  state.levels[0].result = stream
+
+  if (!objectMode && !one) {
+    stream.push('[')
+  }
+
+  return stream
+}
+
+module.exports.resultSetStream = rsIterator
\ No newline at end of file
diff --git a/hana/test/perf.test.js b/hana/test/perf.test.js
new file mode 100644
index 000000000..2edb9dcd2
--- /dev/null
+++ b/hana/test/perf.test.js
@@ -0,0 +1,3 @@
+describe('hana', () => {
+  require('../../test/scenarios/bookshop/stream.perf-test')
+})
diff --git a/postgres/lib/PostgresService.js b/postgres/lib/PostgresService.js
index 4b910f48b..37f11555d 100644
--- a/postgres/lib/PostgresService.js
+++ b/postgres/lib/PostgresService.js
@@ -177,9 +177,9 @@ GROUP BY k
           throw enhanceError(e, sql)
         }
       },
-      stream: async (values, one) => {
+      stream: async (values, one, objectMode) => {
         try {
-          const streamQuery = new QueryStream({ ...query, values: this._getValues(values) }, one)
+          const streamQuery = new QueryStream({ ...query, values: this._getValues(values) }, one, objectMode)
           return await this.dbc.query(streamQuery)
         } catch (e) {
           throw enhanceError(e, sql)
@@ -293,7 +293,8 @@ GROUP BY k
     return super.onPlainSQL(req, next)
   }
 
-  async onSELECT({ query, data }) {
+  async onSELECT(req) {
+    const { query, data } = req
     // workaround for chunking odata streaming
     if (query.SELECT?.columns?.find(col => col.as === '$mediaContentType')) {
       const columns = query.SELECT.columns
@@ -311,7 +312,7 @@ GROUP BY k
       res[this.class.CQN2SQL.prototype.column_name(binary[0])] = stream
       return res
     }
-    return super.onSELECT({ query, data })
+    return super.onSELECT(req)
   }
 
   async onINSERT(req) {
@@ -619,7 +620,7 @@ GROUP BY k
 }
 
 class QueryStream extends Query {
-  constructor(config, one) {
+  constructor(config, one, objectMode) {
     // REVISIT: currently when setting the row chunk size
     // it results in an inconsistent connection state
     // if (!one) config.rows = 1000
@@ -628,6 +629,7 @@ class QueryStream extends Query {
     this._one = one || config.one
 
     this.stream = new Readable({
+      objectMode,
       read: this.rows
         ? () => {
           this.stream.pause()
@@ -690,8 +692,13 @@ class QueryStream extends Query {
         const val = msg.fields[0]
         if (!this._one && val !== null) this.push(this.constructor.open)
         this.emit('row', val)
-        this.push(val)
+        const objectMode = this.stream.readableObjectMode
+        this.push(objectMode ? JSON.parse(val) : val)
+
         delete this.handleDataRow
+        if (objectMode) {
+          this.handleDataRow = this.handleDataRowObjectMode
+        }
       }
     }
     return super.handleRowDescription(msg)
@@ -703,6 +710,11 @@ class QueryStream extends Query {
     this.push(msg.fields[0])
   }
 
+  // Called when a new row is received
+  handleDataRowObjectMode(msg) {
+    this.push(JSON.parse(msg.fields[0]))
+  }
+
   // Called when a new binary row is received
   handleBinaryRow(msg) {
     const val = msg.fields[0] === null ? null : this._result._parsers[0](msg.fields[0])
diff --git a/postgres/test/perf.test.js b/postgres/test/perf.test.js
new file mode 100644
index 000000000..629879eb3
--- /dev/null
+++ b/postgres/test/perf.test.js
@@ -0,0 +1,3 @@
+describe('postgres', () => {
+  require('../../test/scenarios/bookshop/stream.perf-test')
+})
diff --git a/sqlite/lib/SQLiteService.js b/sqlite/lib/SQLiteService.js
index 899019e3f..25f13b9f6 100644
--- a/sqlite/lib/SQLiteService.js
+++ b/sqlite/lib/SQLiteService.js
@@ -77,7 +77,7 @@ class SQLiteService extends SQLService {
         run: (..._) => this._run(stmt, ..._),
         get: (..._) => stmt.get(..._),
         all: (..._) => stmt.all(..._),
-        stream: (..._) => this._stream(stmt, ..._),
+        stream: (..._) => this._allStream(stmt, ..._),
       }
     } catch (e) {
       e.message += ' in:\n' + (e.query = sql)
@@ -98,7 +98,8 @@ class SQLiteService extends SQLService {
     return stmt.run(binding_params)
   }
 
-  async *_iterator(rs, one) {
+  async *_iteratorRaw(rs, one) {
+    const pageSize = (1 << 16)
     // Allow for both array and iterator result sets
     const first = Array.isArray(rs) ? { done: !rs[0], value: rs[0] } : rs.next()
     if (first.done) return
@@ -109,13 +110,30 @@ class SQLiteService extends SQLService {
       return
     }
 
-    yield '['
+    let buffer = '[' + first.value[0]
     // Print first value as stand alone to prevent comma check inside the loop
-    yield first.value[0]
     for (const row of rs) {
-      yield `,${row[0]}`
+      buffer += `,${row[0]}`
+      if (buffer.length > pageSize) {
+        yield buffer
+        buffer = ''
+      }
     }
-    yield ']'
+    buffer += ']'
+    yield buffer
+  }
+
+  async _allStream(stmt, binding_params, one, objectMode) {
+    stmt = stmt.constructor.name === 'Statement' ? stmt : stmt.__proto__
+    stmt.raw(true)
+    const get = stmt.get(binding_params)
+    if (!get) return []
+    const rs = stmt.iterate(binding_params)
+    const stream = Readable.from(objectMode ? rs : this._iteratorRaw(rs, one), { objectMode })
+    stream.on('close', () => {
+      rs.return() // finish result set when closed early
+    })
+    return stream
   }
 
   exec(sql) {
diff --git a/sqlite/test/perf.test.js b/sqlite/test/perf.test.js
new file mode 100644
index 000000000..674a64439
--- /dev/null
+++ b/sqlite/test/perf.test.js
@@ -0,0 +1,3 @@
+describe('sqlite', () => {
+  require('../../test/scenarios/bookshop/stream.perf-test')
+})
diff --git a/test/scenarios/bookshop/stream.perf-test.js b/test/scenarios/bookshop/stream.perf-test.js
new file mode 100644
index 000000000..bc1a70362
--- /dev/null
+++ b/test/scenarios/bookshop/stream.perf-test.js
@@ -0,0 +1,141 @@
+const fs = require('fs')
+const path = require('path')
+const { Readable, Writable } = require('stream')
+const { pipeline } = require('stream/promises')
+const streamConsumers = require('stream/consumers')
+
+const cds = require('../../cds.js')
+const bookshop = cds.utils.path.resolve(__dirname, '../../bookshop')
+
+describe.skip('Bookshop - Stream Performance', () => {
+  cds.test(bookshop)
+
+  const imageData = fs.readFileSync(path.join(__dirname, '../../../sqlite/test/general/samples/test.jpg'))
+
+  let numberOfBooks = 0
+  beforeAll(async () => {
+    const { Books } = cds.entities('sap.capire.bookshop')
+
+    let i = 1000
+    const gen = function* () {
+      yield `[{"ID":${i++},"title":"${i}","author_ID":101,"genre_ID":11}`
+      for (; i < 100000; i++) {
+        yield `,{"ID":${i},"title":"${i}","author_ID":101,"genre_ID":11}`
+      }
+      yield ']'
+    }
+    await INSERT(Readable.from(gen(), { objectMode: false })).into(Books)
+    process.stdout.write(`DEPLOYED\n`)
+    numberOfBooks = (i - 1000 + 4)
+  })
+
+  const measure = [
+    // { async: false },
+    { async: true },
+  ]
+
+  const modes = [
+    { objectMode: false },
+    { objectMode: true },
+    { objectMode: null },
+  ]
+
+  const scenarios = [
+    { withImage: false, withExpands: false },
+    // { withImage: false, withExpands: true },
+    // { withImage: true, withExpands: false },
+    // { withImage: true, withExpands: true },
+  ]
+
+  describe.each(measure)('$async', ({ async }) => {
+
+    beforeAll(() => { process.stdout.write(`- ${async ? 'async' : 'sync'}\n`) })
+
+    describe.each(modes)('$objectMode', ({ objectMode }) => {
+
+      beforeAll(() => { process.stdout.write(`  - ${objectMode ? 'objectMode' : objectMode == null ? 'array' : 'raw'}\n`) })
+
+      it.each(scenarios)('$withImage $withExpands', async ({ withImage, withExpands }) => {
+
+        if (scenarios.length > 1) process.stdout.write(`    - Books ${withImage ? '+image ' : ''}${withExpands ? '+expand' : ''}\n`)
+
+        const { Books } = cds.entities('sap.capire.bookshop')
+
+        await UPDATE(Books).with({ image: withImage ? imageData : null }).where({ val: true })
+
+        const query = SELECT([
+          '*',
+          ...(withImage ? [{ ref: ['image'] }] : []),
+          ...(withExpands ? [{ ref: ['author'], expand: ['*'] }, { ref: ['genre'], expand: ['*'] }] : []),
+        ]).from(Books)
+        const req = new cds.Request({ query, iterator: objectMode, hasPostProcessing: objectMode == null ? undefined : false })
+
+        const rows = numberOfBooks * (withExpands ? 3 : 1)
+
+        let peakMemory = 0
+        const proms = []
+        const s = performance.now()
+        for (let x = 0; x < 20; x++) {
+          const prom = cds.tx(async tx => {
+            const stream = await tx.dispatch(req)
+
+            if (objectMode !== false) {
+              await pipeline(
+                stream,
+                async function* (source) {
+                  for await (const row of source) {
+                    if (withImage) {
+                      /* const buffer = */ await streamConsumers.buffer(row.image)
+                      // if (imageData.compare(buffer)) throw new Error('Blob stream does not contain the original data')
+                    }
+
+                    if (withExpands) {
+                      await Promise.all([
+                        pipeline(row.genre, devNull()),
+                        pipeline(row.author, devNull()),
+                      ])
+                    }
+
+                    yield JSON.stringify(row)
+                  }
+                },
+                devNull()
+              )
+            } else {
+              await pipeline(stream, devNull())
+            }
+
+            const curMemory = process.memoryUsage().heapUsed
+            if (curMemory > peakMemory) peakMemory = curMemory
+          })
+
+          proms.push(prom)
+          if (!async) {
+            await prom
+          }
+        }
+
+        const allResults = await Promise.allSettled(proms)
+        const success = allResults.filter(r => r.status === 'fulfilled')
+
+        const dur = performance.now() - s
+        const totalRows = rows * success.length
+        process.stdout.write(
+          `${scenarios.length > 1 ? '  ' : ''}    - Duration: ${dur >>> 0} ms Rows: ${totalRows} (${(totalRows / dur) >>> 0} rows/ms) (${(peakMemory / 1024 / 1024) >>> 0} MiB mem)\n`
+        )
+
+      }, 120 * 1000)
+
+    })
+
+  })
+
+})
+
+const devNull = function () {
+  return new Writable({
+    objectMode: true, write(chunk, encoding, callback) {
+      callback()
+    }
+  })
+}
\ No newline at end of file
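
A minimal consumption sketch of the streaming SELECT path exercised by stream.perf-test above, assuming a deployed bookshop model and the primary database service; the `iterator` and `hasPostProcessing` request options come from this change set, the surrounding helper and entity names are illustrative:

const cds = require('@sap/cds')

async function streamBooks() {
  const { Books } = cds.entities('sap.capire.bookshop')
  // iterator: true asks the database service for an object-mode Readable,
  // hasPostProcessing: false bypasses row post-processing so rows are streamed as-is
  const req = new cds.Request({ query: cds.ql.SELECT.from(Books), iterator: true, hasPostProcessing: false })
  return cds.tx(async tx => {
    const rows = await tx.dispatch(req) // Readable in object mode
    for await (const row of rows) {
      // expanded associations and blob columns arrive as nested Readable streams
      console.log(row.ID, row.title)
    }
  })
}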