Skip to content

Commit

Permalink
Add Iterator and ObjectMode Readable checks
Browse files Browse the repository at this point in the history
  • Loading branch information
BobdenOs committed Oct 15, 2024
1 parent 3a928f7 commit 20bba0e
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions db-service/lib/cqn2sql.js
Original file line number Diff line number Diff line change
Expand Up @@ -520,13 +520,14 @@ class CQN2SQLRenderer {
// Include this.values for placeholders
/** @type {unknown[][]} */
this.entries = []
if (INSERT.entries[0] instanceof Readable) {
if (INSERT.entries[0] instanceof Readable && !INSERT.entries[0].readableObjectMode) {
INSERT.entries[0].type = 'json'
this.entries = [[...this.values, INSERT.entries[0]]]
} else {
const stream = Readable.from(this.INSERT_entries_stream(INSERT.entries), { objectMode: false })
const entries = INSERT.entries[0] instanceof Iterator || INSERT.entries[0] instanceof Readable ? INSERT.entries[0] : INSERT.entries
const stream = Readable.from(this.INSERT_entries_stream(entries), { objectMode: false })
stream.type = 'json'
stream._raw = INSERT.entries
stream._raw = entries
this.entries = [[...this.values, stream]]
}

Expand All @@ -536,14 +537,11 @@ class CQN2SQLRenderer {

async *INSERT_entries_stream(entries, binaryEncoding = 'base64') {
const elements = this.cqn.target?.elements || {}
const transformBase64 = binaryEncoding === 'base64'
? a => a
: a => a != null ? Buffer.from(a, 'base64').toString(binaryEncoding) : a
const bufferLimit = 65536 // 1 << 16
let buffer = '['

let sep = ''
for (const row of entries) {
for await (const row of entries) {
buffer += `${sep}{`
if (!sep) sep = ','

Expand All @@ -570,9 +568,13 @@ class CQN2SQLRenderer {
buffer += '"'
} else {
if (elements[key]?.type in BINARY_TYPES) {
val = transformBase64(val)
}
buffer += `${keyJSON}${val != null
? `"${Buffer.from(val, 'base64').toString(binaryEncoding)}"`
: 'null'
}`
} else {
buffer += `${keyJSON}${JSON.stringify(val)}`
}
}
}
buffer += '}'
Expand All @@ -588,9 +590,6 @@ class CQN2SQLRenderer {

async *INSERT_rows_stream(entries, binaryEncoding = 'base64') {
const elements = this.cqn.target?.elements || {}
const transformBase64 = binaryEncoding === 'base64'
? a => a
: a => a != null ? Buffer.from(a, 'base64').toString(binaryEncoding) : a
const bufferLimit = 65536 // 1 << 16
let buffer = '['

Expand Down Expand Up @@ -618,9 +617,12 @@ class CQN2SQLRenderer {
buffer += '"'
} else {
if (elements[this.columns[key]]?.type in BINARY_TYPES) {
val = transformBase64(val)
buffer += val != null
? `"${Buffer.from(val, 'base64').toString(binaryEncoding)}"`
: 'null'
} else {
buffer += `${sepsub}${val == null ? 'null' : JSON.stringify(val)}`
}
buffer += `${sepsub}${val === undefined ? 'null' : JSON.stringify(val)}`
}

if (!sepsub) sepsub = ','
Expand Down

0 comments on commit 20bba0e

Please sign in to comment.