feat(cdc): in-memory backfill for single mysql cdc table #11707
src/connector/src/source/external.rs
};
(pk.clone(), val)
} else {
(pk.clone(), Value::NULL)
This will generate filter conditions like `a > NULL` or `a = NULL`, which are always evaluated to false. Is this expected?
Since a primary key cannot be NULL in MySQL, I fixed this by returning an error to the caller.
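A minimal sketch of the fix described in this reply, using hypothetical stand-in types (`Value`, `BackfillError`, `pk_filter_value`) rather than the PR's actual definitions. A missing or NULL primary key value is reported to the caller instead of being turned into a `pk > NULL` predicate:

```rust
use std::fmt;

/// Hypothetical stand-in for a SQL value; not the type used in the PR.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Int(i64),
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum BackfillError {
    NullPrimaryKey(String),
}

impl fmt::Display for BackfillError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BackfillError::NullPrimaryKey(col) => {
                write!(f, "primary key column `{col}` must not be NULL")
            }
        }
    }
}

/// Pair a primary key column with the value to filter on, refusing NULL
/// (or absent) values because `pk > NULL` never matches any row.
fn pk_filter_value(pk: &str, val: Option<Value>) -> Result<(String, Value), BackfillError> {
    match val {
        Some(Value::Null) | None => Err(BackfillError::NullPrimaryKey(pk.to_string())),
        Some(v) => Ok((pk.to_string(), v)),
    }
}
```

For example, `pk_filter_value("id", None)` yields `Err(BackfillError::NullPrimaryKey("id".to_string()))`, so the caller can stop the snapshot read rather than issue a query that silently returns no rows.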
}

/// A wrapper of upstream table for snapshot read
/// because we need to customize the snapshot read for managed upstream table (e.g. mv, index)
`UpstreamTableReader` is only used for cdc backfill. Based on the doc here, are we planning to use the same trait for cdc backfill and table/mv backfill?
Ideally, I want to unify the backfill implementations for mv and for external tables. In practice, it is a little hard to tune the implementation details into a single unified backfill executor. I may revisit this part in the future.
@@ -506,10 +506,11 @@ where
)
.await?;
nit: we can use `map` to call `KeyedRow::into_owned_row` and avoid introducing the `owned_row_iter` method.
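A rough sketch of this suggestion. `KeyedRow` and `OwnedRow` here are simplified, hypothetical stand-ins, and a plain iterator replaces the async stream used in the real code; the point is only that the conversion can happen at the call site:

```rust
/// Hypothetical stand-ins for this sketch; the real `KeyedRow` and `OwnedRow`
/// live in RisingWave and have different definitions.
#[derive(Debug, Clone, PartialEq)]
struct OwnedRow(Vec<i64>);

struct KeyedRow {
    key: Vec<u8>,
    row: OwnedRow,
}

impl KeyedRow {
    /// Drop the key and keep only the row.
    fn into_owned_row(self) -> OwnedRow {
        self.row
    }
}

/// Convert at the call site with `map`, so no dedicated
/// `owned_row_iter`-style method is needed on the table.
fn owned_rows(iter: impl Iterator<Item = KeyedRow>) -> impl Iterator<Item = OwnedRow> {
    iter.map(KeyedRow::into_owned_row)
}
```

Because the real iterator is a fallible async stream, the combinator used there would differ from the plain `Iterator::map` shown here.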
@@ -540,7 +540,7 @@ where
.await?;
nit: we can use `map` to call `KeyedRow::into_owned_row` and avoid introducing the `owned_row_iter` method.
let order_key = primary_keys.iter().join(",");
let sql = if start_pk.is_none() {
format!(
"SELECT {} FROM {} ORDER BY {}",
Could there be potential injection attacks here?
> Could there be potential injection attacks here?
Are spaces allowed in the RW field name?
It seems possible.
dev=> create table t ("1; DROP DATABASE foo; SELECT 1" int);
CREATE_TABLE
dev=> select * from t;
1; DROP DATABASE foo; SELECT 1
--------------------------------
(0 rows)
Similar issues: #4217
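One way to close the hole shown in this thread is to quote every identifier before splicing it into the query. The sketch below relies on MySQL's rule that an identifier may be wrapped in backticks, with any embedded backtick doubled. The helper names `quote_mysql_ident` and `snapshot_sql` are hypothetical and are not taken from the PR:

```rust
/// Quote a MySQL identifier: wrap it in backticks and double any backtick
/// inside it, so the name cannot terminate the identifier early.
fn quote_mysql_ident(ident: &str) -> String {
    format!("`{}`", ident.replace('`', "``"))
}

/// Build the initial snapshot query with every identifier quoted.
/// (Hypothetical helper; the PR's real query builder differs.)
fn snapshot_sql(table: &str, columns: &[&str], primary_keys: &[&str]) -> String {
    let cols = columns
        .iter()
        .map(|c| quote_mysql_ident(c))
        .collect::<Vec<_>>()
        .join(", ");
    let order = primary_keys
        .iter()
        .map(|c| quote_mysql_ident(c))
        .collect::<Vec<_>>()
        .join(", ");
    format!("SELECT {} FROM {} ORDER BY {}", cols, quote_mysql_ident(table), order)
}
```

With the column name from the example above, the query becomes ``SELECT `1; DROP DATABASE foo; SELECT 1` FROM `t` ORDER BY `id` ``, so the whole string is treated as a single column name. Quoting only covers identifiers; values such as the starting primary key are better passed as bound parameters.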
Thanks for the reminder🥵
I will unify to binary protocol in a separate PR.
Seems like it is set to true by default https://github.com/risingwavelabs/risingwave/pull/11707/files#diff-4e19861816f5cf913890db3abd8088d01fa135d85f43080d5e6ee5c6c8ac40aaR337 Is that expected?
Yes.
Co-authored-by: StrikeW <[email protected]>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
This PR is the first part of risingwavelabs/rfcs#63. It implements a `CdcBackfillExecutor` to take care of the backfill logic for a CDC table. Right now, we will instantiate a `CdcBackfillExecutor` to wrap a source executor so that we can use the changelog events to backfill the table.

Main changes are as follows:
- `cdc_backfill` to gate this new feature
- `StreamChunkMeta` field to the `StreamChunk` for storing offsets of change log events, because the backfill algorithm needs to filter events based on offset (snapshot chunk doesn't have offsets)
- `ExternalTableReader` component for reading snapshot and binlog from upstream table; this PR implements a table reader for MySQL
- `StreamSourceInfo` in the process of `CREATE TABLE`
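To illustrate why chunks need to carry offsets, here is a small sketch under stated assumptions. `ChunkMeta` and `should_forward` are hypothetical names, offsets are modeled as a plain `u64` (a real binlog offset is richer than that), and the rule "drop changelog events at or before the offset the snapshot covers" is an assumed reading of the backfill algorithm, not a copy of the PR's logic:

```rust
/// Hypothetical per-chunk metadata: changelog chunks carry an offset,
/// snapshot chunks do not.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ChunkMeta {
    offset: Option<u64>,
}

/// Decide whether a chunk should be forwarded, given the offset the
/// last snapshot read is assumed to cover.
fn should_forward(meta: ChunkMeta, snapshot_offset: u64) -> bool {
    match meta.offset {
        // Changelog event: keep it only if it is newer than the snapshot.
        Some(offset) => offset > snapshot_offset,
        // Snapshot chunk: no offset to compare, always forward.
        None => true,
    }
}
```

Under these assumptions, an event at offset 100 is dropped when the snapshot already covers offset 100, while an event at offset 101 is forwarded.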
TODO:
- Rewrite primary key comparison conditions for MySQL
- Support more MySQL data types

tracked: #11079
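The TODO does not say what the rewritten comparison would look like. One common form for resuming a scan after a composite primary key, shown here purely as an assumption, expands `(a, b) > (x, y)` into `a > x OR (a = x AND b > y)`. The function name `pk_greater_than` is hypothetical:

```rust
/// Expand `(c1, c2, ..) > (?, ?, ..)` into an OR of AND-chains.
/// Column names are assumed to be already quoted; values are bound via `?`.
fn pk_greater_than(cols: &[&str]) -> String {
    let mut branches = Vec::with_capacity(cols.len());
    for i in 0..cols.len() {
        // Columns before position `i` must be equal, column `i` must be greater.
        let mut terms: Vec<String> = cols[..i].iter().map(|c| format!("{c} = ?")).collect();
        terms.push(format!("{} > ?", cols[i]));
        branches.push(format!("({})", terms.join(" AND ")));
    }
    branches.join(" OR ")
}
```

For columns `a` and `b` this produces `(a > ?) OR (a = ? AND b > ?)`. The placeholders must be bound in the order they appear, so the value for `a` is supplied twice.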
Checklist
- `./risedev check` (or alias, `./risedev c`)
Documentation
Release note
Enhance the `mysql-cdc` connector to allow lock-free ingestion; the connector won't need to lock upstream tables. Use `set cdc_backfill="true"` to enable this feature.