Multiple threads processing #342
base: develop
Conversation
It would be better if it didn't depend on the hard-coded <server>.<database>.<table> template
It runs ClickhouseBatchRunnable sequentially instead of in parallel. We should create several ClickhouseBatchRunnable tasks via the scheduleAtFixedRate method to make them run in parallel (see the sketch below). This single-task behavior is described in the ScheduledThreadPoolExecutor docs for scheduleAtFixedRate.
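To illustrate the scheduleAtFixedRate point, here is a minimal, self-contained sketch (not the connector's actual code): a single task scheduled this way never overlaps itself, so real parallelism requires submitting several independent tasks. In the sink connector these would be ClickhouseBatchRunnable instances; the pool size and period below are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ParallelBatchScheduling {
    public static void main(String[] args) {
        int numTasks = 4; // hypothetical number of parallel batch writers
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(numTasks);

        // Per the ScheduledThreadPoolExecutor javadoc, executions of ONE task
        // scheduled via scheduleAtFixedRate never overlap, so a single
        // ClickhouseBatchRunnable can only ever run sequentially. Scheduling
        // several independent tasks is what makes them run in parallel.
        for (int i = 0; i < numTasks; i++) {
            final int taskId = i;
            scheduler.scheduleAtFixedRate(
                    () -> System.out.println("batch task " + taskId
                            + " running on " + Thread.currentThread().getName()),
                    0, 100, TimeUnit.MILLISECONDS);
        }
    }
}
```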
This reverts commit e8cef17.
It is likely that this code will cause race conditions (e.g. records inserted/updated/removed in an order that differs from the binlog). You will have to implement per-topic locks to avoid that (and to avoid your issue here #350). I've actually tried to implement a similar lock (though it seems it wasn't needed), in
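For reference, a minimal sketch of what per-topic locking could look like, assuming a lock map keyed by topic name. The class and method names (PerTopicLocks, flushTopic) are hypothetical, not from this repository:

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

public class PerTopicLocks {
    // One lock per topic; computeIfAbsent creates each lock lazily and safely.
    private final ConcurrentMap<String, ReentrantLock> topicLocks =
            new ConcurrentHashMap<>();

    // Hypothetical batch handler: only one thread at a time may flush a given
    // topic, so that topic's records keep their binlog order, while batches
    // for different topics can still flush in parallel.
    public void flushTopic(String topic, List<String> records) {
        ReentrantLock lock = topicLocks.computeIfAbsent(topic, t -> new ReentrantLock());
        lock.lock();
        try {
            // insert `records` for `topic` into ClickHouse here
        } finally {
            lock.unlock();
        }
    }
}
```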
@AdamKatzDev I think the order of insertions is not important. There is a version column that helps ClickHouse determine the order in which rows are changed. Unfortunately, locks won't make the queue processing faster.
@IlyaTsoi I couldn't figure out the moment when the version column is generated; version correctness itself might depend on the order in which records are processed. There is also an issue caused by records coming before or after a DDL (failures injecting records with a different schema, lost or corrupted data after truncates). @aadant @subkanthi could you please tell at what moment the row version is generated, and how?
@AdamKatzDev If I'm not mistaken, this value is taken from the Debezium message field ts_ms. From the Debezium docs for the MySQL connector:
You are right about DDL handling. I haven't explored the current logic yet) |
@IlyaTsoi if you are correct, then this versioning scheme won't work for very hot data. https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree
The chance of the same row being updated within the same millisecond can be quite high, depending on the load.
@IlyaTsoi @AdamKatzDev for the MySQL case the version is increasing, as it contains a snowflake ID plus a counter coming from the GTID. Regarding DDL, all table writers are flushed before the DDL is applied with the sink-connector-lightweight; @subkanthi can confirm. If you have a test case that is failing or losing data, please report it as a separate issue, ideally with a test case. Thanks!
So it looks like it is safe to insert data in an arbitrary order. If there are same-millisecond collisions, the GTID should take care of this (see the sketch below).
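As an illustration of why such a composite version survives same-millisecond collisions, here is a hedged sketch that packs a millisecond timestamp into the high bits and a GTID-derived counter into the low bits. The bit split and the helper name are assumptions, not the connector's actual encoding:

```java
public class VersionSketch {
    // Illustrative only: timestamp in the high bits, GTID-derived counter in
    // the low bits, so two changes in the same millisecond still get
    // distinct, strictly increasing versions.
    static long version(long tsMs, long gtidSequence) {
        final int counterBits = 22;               // hypothetical split
        long counterMask = (1L << counterBits) - 1;
        return (tsMs << counterBits) | (gtidSequence & counterMask);
    }

    public static void main(String[] args) {
        long ts = System.currentTimeMillis();
        // Same millisecond, different GTID counters -> strictly ordered versions.
        System.out.println(version(ts, 100) < version(ts, 101)); // true
    }
}
```

A change arriving in the same millisecond but with a larger GTID counter still gets a strictly larger version, which is exactly what ReplacingMergeTree needs to keep the latest row.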
I remember that code. Looks safe. @IlyaTsoi looks like DDL operations are not a problem after all. MySQL replication is not an issue either, since it is safe to insert rows in any order as you assumed. The only issue is other databases, as @aadant mentioned: their version value is just a millisecond timestamp. (Lines 640 to 649 in 5883bf4)
It looks like there is an equivalent of the GTID in both PostgreSQL and SQL Server, called LSN and commit LSN respectively, that could be used instead.
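If LSNs do pan out as version sources, one hedged sketch would be to collapse the LSN string into a single comparable number. This assumes Debezium's SQL Server commit_lsn arrives as colon-separated hex groups (e.g. "00000025:00000448:0003"); lsnToVersion is a hypothetical helper:

```java
import java.math.BigInteger;

public class LsnVersionSketch {
    // Hypothetical: fold a SQL Server commit_lsn string into one comparable
    // number usable as a ReplacingMergeTree version column.
    static BigInteger lsnToVersion(String commitLsn) {
        return new BigInteger(commitLsn.replace(":", ""), 16);
    }

    public static void main(String[] args) {
        BigInteger v1 = lsnToVersion("00000025:00000448:0003");
        BigInteger v2 = lsnToVersion("00000025:00000448:0004");
        System.out.println(v1.compareTo(v2) < 0); // true: later LSN, larger version
    }
}
```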
Hm, it should be checked. Would be nice if it worked)
I think that the current code doesn't work properly; I have checked it via logs. It runs ClickhouseBatchRunnable sequentially instead of in parallel. We should create several ClickhouseBatchRunnable tasks via the scheduleAtFixedRate method to make them run in parallel. This single-task behavior is described in the ScheduledThreadPoolExecutor docs for scheduleAtFixedRate.