
Batch loading sometimes missing records #188

Open
wants to merge 2 commits into master

Conversation

jeonguihyeong

  1. If sink tasks write GCS files in parallel, GCS filenames are currently duplicated.
    The first offset of the batch is now appended to the GCS filename to make it unique (see the sketch after this list).

  2. If sink tasks write GCS files into the same folder, one process can delete another table's GCS files.
    Files are now written into a separate folder per table.

  3. The task now waits for the BigQuery load job status.
    A log line is added with the job's succeeded row count.
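
A minimal naming sketch for points 1 and 2 above. The class, field names, and sample values are illustrative, not the connector's actual code:

```java
import java.time.Instant;

// Sketch only: shows how embedding the batch's first Kafka offset and a per-table
// folder in the GCS blob name avoids collisions when several sink tasks flush in parallel.
public class GcsBlobNaming {

  static String blobName(String topic, String taskUuid, long batchSize, long firstOffset) {
    // Folder per table/topic, so one task's cleanup cannot delete another table's files.
    String folder = topic;
    // A timestamp alone is not unique across parallel tasks; the first offset is,
    // because Kafka offsets are unique within a topic partition.
    return folder + "/" + topic + "_" + taskUuid + "_" + Instant.now().toEpochMilli()
        + "_" + batchSize + "_" + firstOffset;
  }

  public static void main(String[] args) {
    System.out.println(blobName("orders", "task-1", 500, 120_000L));
    System.out.println(blobName("orders", "task-2", 500, 120_500L)); // different offset -> different name
  }
}
```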

Update GCSToBQLoadRunnable.java
If sink tasks write GCS files in parallel, GCS filenames are currently duplicated.
The first offset of the batch is appended to the GCS filename to make it unique.

If sink tasks write GCS files into the same folder, one process can delete another table's GCS files.
Files are now written into a separate folder per table.

Wait for the BigQuery load job status.

Add a log line with the job's succeeded row count (a sketch follows below).
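
A minimal sketch of waiting for a GCS-to-BigQuery load job and logging its succeeded row count with the google-cloud-bigquery client; the dataset, table, and URI below are placeholders, and this is not the connector's exact code:

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;

// Sketch only: submit a load job from GCS, block until it finishes,
// then log how many rows succeeded.
public class WaitForLoadJobSketch {
  public static void main(String[] args) throws InterruptedException {
    BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
    TableId table = TableId.of("my_dataset", "my_table");             // placeholder
    String sourceUri = "gs://my-bucket/my_table/blob.json";           // placeholder

    LoadJobConfiguration config = LoadJobConfiguration.newBuilder(table, sourceUri)
        .setFormatOptions(FormatOptions.json())
        .build();

    Job job = bigQuery.create(JobInfo.of(config));
    Job completed = job.waitFor();                                    // wait for job status

    if (completed == null || completed.getStatus().getError() != null) {
      throw new RuntimeException("Load job failed: "
          + (completed == null ? "job no longer exists" : completed.getStatus().getError()));
    }
    LoadStatistics stats = completed.getStatistics();
    System.out.println("Load job succeeded, output rows: " + stats.getOutputRows());
  }
}
```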

Update BigQuerySinkTask.java
If sink tasks write GCS files in parallel, GCS filenames are currently duplicated.
The first offset of the batch is appended to the GCS filename to make it unique.

If sink tasks write GCS files into the same folder, one process can delete another table's GCS files.
Files are now written into a separate folder per table.

@jeonguihyeong jeonguihyeong requested a review from a team as a code owner April 6, 2022 05:40
@raphaelauv

Hey @jeonguihyeong, could you reformulate your PR text? It's very complicated to understand your message. Thanks

@@ -247,7 +249,8 @@ public void put(Collection<SinkRecord> records) {
       TableWriterBuilder tableWriterBuilder;
       if (config.getList(BigQuerySinkConfig.ENABLE_BATCH_CONFIG).contains(record.topic())) {
         String topic = record.topic();
-        String gcsBlobName = topic + "_" + uuid + "_" + Instant.now().toEpochMilli();
+        long offset = record.kafkaOffset();
+        String gcsBlobName = topic + "_" + uuid + "_" + Instant.now().toEpochMilli()+"_"+records.size()+"_"+offset;

Would having a test case for validating that parallel puts create different files with the right offset help?

Author

@jeonguihyeong jeonguihyeong Apr 13, 2022


GCP side

  • You can enable GCS object versioning and check how many times each GCS object name is written.

BigQuery sink

  • The concept comes from the S3 sink connector; I converted it to the BigQuery sink.
    It uses the first offset of the records, and filenames are not duplicated because Kafka message offsets are unique.

If you need more details, contact me.
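
In the spirit of the reviewer's question above, a hypothetical JUnit-style sketch of such a test; the naming helper is illustrative, not the connector's actual method:

```java
import static org.junit.jupiter.api.Assertions.assertNotEquals;

import org.junit.jupiter.api.Test;

// Hypothetical test sketch: even if two flushes of the same topic share a timestamp,
// their first offsets differ, so the resulting blob names differ.
public class GcsBlobNameUniquenessTest {

  private String blobName(String topic, String uuid, long timestampMillis, int batchSize, long firstOffset) {
    return topic + "/" + topic + "_" + uuid + "_" + timestampMillis + "_" + batchSize + "_" + firstOffset;
  }

  @Test
  public void parallelBatchesWithDifferentFirstOffsetsGetDifferentNames() {
    long sameTimestamp = 1_649_221_200_000L;   // identical timestamp on purpose
    String first = blobName("orders", "uuid-a", sameTimestamp, 500, 120_000L);
    String second = blobName("orders", "uuid-a", sameTimestamp, 500, 120_500L);
    assertNotEquals(first, second);            // offsets are unique within a partition
  }
}
```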

@b-goyal
Member

b-goyal commented Dec 7, 2022

We have not been able to reproduce the missing-records issue on our end.
@jeonguihyeong, since you mention you observed GCS filename duplication, could you help us with steps to reproduce the 'GCS filename duplication', please?

CC: @kapilchhajer @binoy-fernandez
