Added Parquet format support for streaming
slvrtrn committed Oct 30, 2023
1 parent c836bb1 commit bc310bb
Showing 12 changed files with 156 additions and 6 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,12 @@
## 0.2.6 (Common, Node.js)

### New features

- Added [Parquet format](https://clickhouse.com/docs/en/integrations/data-formats/parquet) streaming support.
See the new examples:
[insert from a file](./examples/node/insert_file_stream_parquet.ts),
[select into a file](./examples/node/select_parquet_as_file.ts).

## 0.2.5 (Common, Node.js, Web)

### Bug fixes
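The Parquet streaming support announced above boils down to passing `format: 'Parquet'` together with a readable byte stream. Below is a condensed, hypothetical sketch of the insert-side usage demonstrated by the new example further down in this commit; the table name, file path, and default connection settings are illustrative assumptions, not part of the diff:

```ts
import { createClient } from '@clickhouse/client'
import Fs from 'fs'

void (async () => {
  // assumes a locally reachable ClickHouse instance with default credentials
  const client = createClient()

  // the raw Parquet bytes are streamed to the server as-is,
  // without any parsing on the client side
  await client.insert({
    table: 'my_table', // illustrative table name
    values: Fs.createReadStream('./data.parquet'), // illustrative path
    format: 'Parquet',
  })

  await client.close()
})()
```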
3 changes: 2 additions & 1 deletion examples/node/insert_file_stream_csv.ts
@@ -1,6 +1,7 @@
import { createClient } from '@clickhouse/client'
import type { Row } from '@clickhouse/client-common'
import Fs from 'fs'
import { cwd } from 'node:process'
import Path from 'path'

void (async () => {
@@ -19,7 +20,7 @@ void (async () => {
})

// contains data as 1,"foo","[1,2]"\n2,"bar","[3,4]"\n...
- const filename = Path.resolve(process.cwd(), './node/resources/data.csv')
+ const filename = Path.resolve(cwd(), './node/resources/data.csv')

await client.insert({
table: tableName,
3 changes: 2 additions & 1 deletion examples/node/insert_file_stream_ndjson.ts
@@ -1,6 +1,7 @@
import type { Row } from '@clickhouse/client'
import { createClient } from '@clickhouse/client'
import Fs from 'fs'
import { cwd } from 'node:process'
import Path from 'path'
import split from 'split2'

@@ -20,7 +21,7 @@ void (async () => {

// contains id as numbers in JSONCompactEachRow format ["0"]\n["1"]\n...
// see also: NDJSON format
- const filename = Path.resolve(process.cwd(), './node/resources/data.ndjson')
+ const filename = Path.resolve(cwd(), './node/resources/data.ndjson')

await client.insert({
table: tableName,
58 changes: 58 additions & 0 deletions examples/node/insert_file_stream_parquet.ts
@@ -0,0 +1,58 @@
import { createClient } from '@clickhouse/client'
import type { Row } from '@clickhouse/client-common'
import Fs from 'fs'
import { cwd } from 'node:process'
import Path from 'path'

void (async () => {
const client = createClient()
const tableName = 'insert_file_stream_parquet'
await client.command({
query: `DROP TABLE IF EXISTS ${tableName}`,
})
await client.command({
query: `
CREATE TABLE ${tableName}
(id UInt64, name String, sku Array(UInt8))
ENGINE MergeTree()
ORDER BY (id)
`,
})

const filename = Path.resolve(cwd(), './node/resources/data.parquet')

/*
(examples) $ pqrs cat node/resources/data.parquet
############################
File: node/resources/data.parquet
############################
{id: 0, name: [97], sku: [1, 2]}
{id: 1, name: [98], sku: [3, 4]}
{id: 2, name: [99], sku: [5, 6]}
*/

await client.insert({
table: tableName,
values: Fs.createReadStream(filename),
format: 'Parquet',
})

const rs = await client.query({
query: `SELECT * from ${tableName}`,
format: 'JSONEachRow',
})

for await (const rows of rs.stream()) {
// or just `rs.json()`
// to consume the entire response at once
rows.forEach((row: Row) => {
console.log(row.json())
})
}

await client.close()
})()
Binary file added examples/node/resources/data.parquet
42 changes: 42 additions & 0 deletions examples/node/select_parquet_as_file.ts
@@ -0,0 +1,42 @@
import { createClient } from '@clickhouse/client'
import Fs from 'fs'
import { cwd } from 'node:process'
import Path from 'path'

void (async () => {
const client = createClient()

const { stream } = await client.exec({
query: `SELECT * from system.numbers LIMIT 10 FORMAT Parquet`,
})

const filename = Path.resolve(cwd(), './node/out.parquet')
const writeStream = Fs.createWriteStream(filename)
stream.pipe(writeStream)
// wait until the destination file has been fully flushed
await new Promise((resolve, reject) => {
writeStream.on('finish', resolve)
writeStream.on('error', reject)
})

/*
(examples) $ pqrs cat node/out.parquet
#################
File: node/out.parquet
#################
{number: 0}
{number: 1}
{number: 2}
{number: 3}
{number: 4}
{number: 5}
{number: 6}
{number: 7}
{number: 8}
{number: 9}
*/

await client.close()
})()
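As an aside (not part of the commit), the manual `pipe` plus event wiring above could also be replaced with Node's promise-based `pipeline` helper, which waits for the destination to flush and propagates errors from both streams. A minimal sketch, assuming the same query and output path as the example:

```ts
import { createClient } from '@clickhouse/client'
import Fs from 'fs'
import { pipeline } from 'node:stream/promises'

void (async () => {
  const client = createClient()

  const { stream } = await client.exec({
    query: `SELECT * from system.numbers LIMIT 10 FORMAT Parquet`,
  })

  // resolves once the file is fully written; rejects if either stream errors
  await pipeline(stream, Fs.createWriteStream('./node/out.parquet'))

  await client.close()
})()
```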
Binary file added packages/client-common/__tests__/fixtures/streaming_e2e_data.parquet
1 change: 1 addition & 0 deletions packages/client-common/src/data_formatter/formatter.ts
@@ -31,6 +31,7 @@ const supportedRawFormats = [
'CustomSeparated',
'CustomSeparatedWithNames',
'CustomSeparatedWithNamesAndTypes',
'Parquet',
] as const

export type JSONDataFormat = (typeof supportedJSONFormats)[number]
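Adding `'Parquet'` to `supportedRawFormats` means the client treats it like the other raw formats: data is passed through as bytes rather than decoded into JSON rows. A hedged sketch of what that implies on the select side, modeled on the new e2e test below; the query and byte-counting are illustrative:

```ts
import { createClient } from '@clickhouse/client'
import { Buffer } from 'buffer'

void (async () => {
  const client = createClient()

  // raw formats are not parsed by the client, so the result is consumed
  // as a binary stream via exec() with an explicit FORMAT clause
  const { stream } = await client.exec({
    query: `SELECT number FROM system.numbers LIMIT 5 FORMAT Parquet`,
  })

  const chunks: Buffer[] = []
  for await (const chunk of stream) {
    chunks.push(chunk)
  }
  console.log(`received ${Buffer.concat(chunks).length} bytes of Parquet data`)

  await client.close()
})()
```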
2 changes: 1 addition & 1 deletion packages/client-common/src/version.ts
@@ -1 +1 @@
- export default '0.2.5'
+ export default '0.2.6'
@@ -4,6 +4,7 @@ import { fakerRU } from '@faker-js/faker'
import { createSimpleTable } from '@test/fixtures/simple_table'
import { createTableWithFields } from '@test/fixtures/table_with_fields'
import { createTestClient, guid } from '@test/utils'
import { Buffer } from 'buffer'
import Fs from 'fs'
import split from 'split2'
import Stream from 'stream'
@@ -28,7 +29,7 @@ describe('[Node.js] streaming e2e', () => {
['2', 'c', [5, 6]],
]

- it('should stream a file', async () => {
+ it('should stream an NDJSON file', async () => {
// contains id as numbers in JSONCompactEachRow format ["0"]\n["1"]\n...
const filename =
'packages/client-common/__tests__/fixtures/streaming_e2e_data.ndjson'
@@ -55,6 +56,43 @@ describe('[Node.js] streaming e2e', () => {
expect(actual).toEqual(expected)
})

it('should stream a Parquet file', async () => {
const filename =
'packages/client-common/__tests__/fixtures/streaming_e2e_data.parquet'
await client.insert({
table: tableName,
values: Fs.createReadStream(filename),
format: 'Parquet',
})

// check that the data was inserted correctly
const rs = await client.query({
query: `SELECT * from ${tableName}`,
format: 'JSONCompactEachRow',
})

const actual: unknown[] = []
for await (const rows of rs.stream()) {
rows.forEach((row: Row) => {
actual.push(row.json())
})
}
expect(actual).toEqual(expected)

// check if we can stream it back and get the output matching the input file
const stream = await client
.exec({
query: `SELECT * from ${tableName} FORMAT Parquet`,
})
.then((r) => r.stream)

const parquetChunks: Buffer[] = []
for await (const chunk of stream) {
parquetChunks.push(chunk)
}
expect(Buffer.concat(parquetChunks)).toEqual(Fs.readFileSync(filename))
})

it('should stream a stream created in-place', async () => {
await client.insert({
table: tableName,
2 changes: 1 addition & 1 deletion packages/client-node/src/version.ts
@@ -1 +1 @@
- export default '0.2.5'
+ export default '0.2.6'
2 changes: 1 addition & 1 deletion packages/client-web/src/version.ts
@@ -1 +1 @@
- export default '0.2.5'
+ export default '0.2.6'
