Write data streaming to a parquet file #542

andresgutgon opened this issue May 24, 2024 · 3 comments

@andresgutgon

andresgutgon commented May 24, 2024

What?

Hi, we're currently using @dsnp/parquetjs to write Parquet files in Node. But it's a fork of an old package and doesn't look super maintained.

So I came across this repo, which looks super active, but it's not clear to me whether we can do with parquet-wasm what we're doing now. Maybe you can help me understand.

What do we want to do?

We want to iterate over a huge PostgreSQL table with a cursor, so we get batches of rows that we then write into a Parquet file.

So I was wondering if that's possible with parquet-wasm: handle the data as a stream and, at the end, save the file to disk.

This is how we do it with @dsnp/parquetjs:

import { ParquetWriter } from '@dsnp/parquetjs'

const BATCH_SIZE = 4096
const SQL_QUERY = 'SELECT * FROM users'

async function writeParquet(): Promise<string> {
  return new Promise<string>((resolve) => {
    let url: string
    let writer: ParquetWriter | undefined

    // The details here don't matter: `batchQuery` runs a pg cursor iteration
    // and hands us N rows per batch in the `onBatch` callback.
    OUR_POSTGREST_DB.batchQuery(SQL_QUERY, {
      batchSize: BATCH_SIZE,
      onBatch: async (batch) => {
        if (!writer) {
          // `buildParquetSchema` is our own helper that maps pg fields to a ParquetSchema
          const schema = this.buildParquetSchema(batch.fields)
          writer = await ParquetWriter.openFile(schema, '/path/to/file.parquet', {
            // `size` and `ROW_GROUP_SIZE` come from our surrounding code
            rowGroupSize: size > ROW_GROUP_SIZE ? size : ROW_GROUP_SIZE,
          })
        }

        for (const row of batch.rows) {
          // I think this doesn't write to the file right away; rows are buffered
          // until `rowGroupSize` is reached and then flushed as a row group
          await writer.appendRow(row)
        }

        if (batch.lastBatch) {
          await writer.close()
          resolve(url)
        }
      },
    })
  })
}

Thanks for the help!

@kylebarron
Owner

Right now we support streaming reads but not yet streaming writes. That's pending #305.
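
In the meantime, if the full result fits in memory, one workaround is to skip streaming entirely: buffer all the rows, build a single Arrow table in JS, and write it in one call. Below is a minimal, non-streaming sketch assuming parquet-wasm's Table.fromIPCStream and writeParquet plus the apache-arrow JS package (import paths can differ between the node and web bundles, and the row-pivoting shape is purely illustrative):

import { writeFile } from 'node:fs/promises'
import { tableFromArrays, tableToIPC } from 'apache-arrow'
import { Table, writeParquet } from 'parquet-wasm'

// Buffer every row first, then write the whole Parquet file in one shot.
async function writeAllAtOnce(rows: Record<string, any>[], path: string): Promise<void> {
  // Pivot row objects into column arrays so apache-arrow can build a table.
  const columns: Record<string, any[]> = {}
  for (const row of rows) {
    for (const [name, value] of Object.entries(row)) {
      (columns[name] ??= []).push(value)
    }
  }

  // Build an Arrow table in JS, hand it across the wasm boundary as an
  // Arrow IPC stream, and encode it to Parquet bytes in a single call.
  const arrowTable = tableFromArrays(columns)
  const wasmTable = Table.fromIPCStream(tableToIPC(arrowTable, 'stream'))
  const parquetBytes = writeParquet(wasmTable)

  await writeFile(path, parquetBytes)
}

It's not what you asked for (everything is held in RAM before the write), but it may cover the case where individual exports are bounded in size until streaming writes land.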

@andresgutgon
Author

Thanks! Looking forward to it. Do you know how much work is left on that PR?

@kylebarron
Owner

I haven't looked at that PR in a while. It looks like it needs a little work to be brought up to date with the latest main branch, but aside from that it might work with few changes. You can ask @H-Plus-Time if he's interested in working on that PR more.
