-
Notifications
You must be signed in to change notification settings - Fork 557
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CORE-2841] Transforms: Start consuming from an arbitrary offset (numeric from start/end or timestamp) #19975
base: dev
Are you sure you want to change the base?
[CORE-2841] Transforms: Start consuming from an arbitrary offset (numeric from start/end or timestamp) #19975
Conversation
b177085
to
78e09c0
Compare
78e09c0
to
612cd16
Compare
/ci-repeat 1 |
612cd16
to
2d34443
Compare
serde::variant< | ||
latest_offset, | ||
model::timestamp, | ||
model::transform_from_start, | ||
model::transform_from_end> | ||
position; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need a feature gate to prevent using the new position options on a partially rolled out cluster.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, right, because it's a new envelope.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is done. feels odd to have a whole feature flag for such a small change, but maybe we just don't do things like this very often.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe, or we just overlook it and customers are well behaved, but in this case if someone does this old brokers won't be able to read transform metadata - which would be real bad.
Thin proxy for `kafka::partition_proxy::start_offset` Signed-off-by: Oren Leiman <[email protected]>
model::offset_delta resolves arithmetic operator overloads that perform automatic conversions between kafka and model offsets. This commit introduces an offset_delta that is used specifically for applying a numeric delta to an existing kafka::offset without adjusting its type. Useful for transform start offset calculations. Signed-off-by: Oren Leiman <[email protected]>
d0dd6cd
to
4177c74
Compare
force push rebase dev to avoid conflicts on feature flag |
4177c74
to
28b7d63
Compare
force push feature flag for the new position types, minor CR changes |
28b7d63
to
3b34b67
Compare
empty diff for commit signoff |
src/v/transform/api.cc
Outdated
|| !_feature_table->local().is_active( | ||
features::feature::transforms_specify_offset)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This blocks all deploys during the upgrade - we should only block usage of new position offsets because old versions will crash trying to read them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, there might be a disconnect in my understanding of how the variant serialization works. Won't we invariably write out a transform_offset_options which is versioned on the new variant layout?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
like if we allow a deploy (any deploy) to go through on the new version before the upgrade is complete, then we need to downgrade, won't metadata deser trigger this code?
redpanda/src/v/serde/rw/variant.h
Lines 146 to 156 in fd1434d
if (size != std::variant_size_v<UnderlyingType>) [[unlikely]] { | |
throw serde_exception(fmt_with_ctx( | |
ssx::sformat, | |
"reading type {} of size {}: {} bytes left - unexpected variant " | |
"size: {}, current variant size: {}, likely backwards compat issues.", | |
type_str<Type>(), | |
sizeof(Type), | |
in.bytes_left(), | |
size, | |
std::variant_size_v<UnderlyingType>)); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, i think. needs a test of some kind, but posting now for visibility
serde::variant< | ||
latest_offset, | ||
model::timestamp, | ||
model::transform_from_start, | ||
model::transform_from_end> | ||
position; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe, or we just overlook it and customers are well behaved, but in this case if someone does this old brokers won't be able to read transform metadata - which would be real bad.
Thin wrappers around a kafka::offset_delta meant for use in a sum type to indicate how the delta should be applied - i.e. added to the start offset of a topic or subtracted from the latest offset of a topic. Enclosed delta should always be greater than or equal to zero. Signed-off-by: Oren Leiman <[email protected]>
3b34b67
to
f743216
Compare
force push contents:
|
As a consequence of this, to maintain compatibility during partial upgrades, introduce a legacy_transform_offset_options. This gives us the ability to feature gate use of the new offset alternatives and avoid writing non-forwards-compatible transform metadata in case a partial upgrade needs to be rolled back. This commit also adds transform_metadata::serde_write, which conditionally writes the legacy version of the offset options struct iff the position variant holds one of the legacy alternatives. This code can be removed after v24.3 ships. Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
{ "format": enum[timestamp, from_start, from_end], "value": int64 } Signed-off-by: Oren Leiman <[email protected]>
Specifically, we want to prevent the use of the new offset options alternatives before an upgrade has been finalized. Signed-off-by: Oren Leiman <[email protected]>
--from-offset to start from this offset * @t: start from UNIX timestamp (ms from epoch) * +oo: start offset + oo * -oo: latest ofset - oo Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
- Consume records that were produced before the deploy - Specify offsets that run off the end of the input topic - Ill-formed offsets Signed-off-by: Oren Leiman <[email protected]>
f743216
to
f6f1e38
Compare
force push make |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One small edge case to redeem otherwise LGTM
@@ -102,6 +170,26 @@ std::ostream& operator<<(std::ostream& os, const transform_metadata& meta) { | |||
return os; | |||
} | |||
|
|||
void transform_metadata::serde_write(iobuf& out) const { | |||
serde::write(out, name); | |||
write(out, input_topic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No serde namespace?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
surprisingly (to me anyway) apparently not. we're shooting for this overload:
redpanda/src/v/model/metadata.h
Lines 363 to 367 in fd1434d
friend void write(iobuf& out, topic_namespace t) { | |
using serde::write; | |
write(out, std::move(t.ns)); | |
write(out, std::move(t.tp)); | |
} |
perhaps making that a free function in namespace serde
would be a good cleanup though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ADL before we had the tag invoke pattern
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just leave it it's fine for now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh yeah wasn't going to change it today
}, | ||
[this, &latest](model::transform_from_end off) { | ||
vlog(_logger.debug, "starting at offset: {}", off); | ||
auto actual_offset = latest - off.delta; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
underflow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sigh. goood catch. thought i made a note somewhere to address that 😕
}, | ||
[this](model::transform_from_start off) { | ||
vlog(_logger.debug, "starting at offset: {}", off); | ||
auto actual_offset = _source->start_offset() + off.delta; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overflow
This PR wires up the ability to configure the start offset of a transform at deploy time. This can be either a unix timestamp (ms since epoch) or an offset delta (+oo from start offset of -oo from end).
Includes rpk experience.
Backports Required
Release Notes
Improvements