Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CORE-2841] Transforms: Start consuming from an arbitrary offset (numeric from start/end or timestamp) #19975

Open
wants to merge 10 commits into
base: dev
Choose a base branch
from

Conversation

oleiman
Copy link
Member

@oleiman oleiman commented Jun 24, 2024

This PR wires up the ability to configure the start offset of a transform at deploy time. This can be either a unix timestamp (ms since epoch) or an offset delta (+oo from start offset of -oo from end).

Includes rpk experience.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.1.x
  • v23.3.x
  • v23.2.x

Release Notes

Improvements

  • Adds the ability to start transform processing from an arbitrary offset on the input topic.

@oleiman oleiman self-assigned this Jun 24, 2024
oleiman

This comment was marked as resolved.

@oleiman oleiman changed the title Transforms: Start consuming from an arbitrary offset (numeric from start/end or timestamp) [CORE-2841] Transforms: Start consuming from an arbitrary offset (numeric from start/end or timestamp) Jun 24, 2024
@oleiman oleiman force-pushed the xform/core-2841/specify-start-offset branch 5 times, most recently from b177085 to 78e09c0 Compare June 25, 2024 17:26
@oleiman oleiman marked this pull request as ready for review June 25, 2024 17:28
@oleiman oleiman requested review from a team and michael-redpanda and removed request for a team June 25, 2024 17:29
@oleiman oleiman force-pushed the xform/core-2841/specify-start-offset branch from 78e09c0 to 612cd16 Compare June 25, 2024 18:01
@oleiman
Copy link
Member Author

oleiman commented Jun 27, 2024

/ci-repeat 1

@oleiman oleiman force-pushed the xform/core-2841/specify-start-offset branch from 612cd16 to 2d34443 Compare June 27, 2024 19:13
@oleiman oleiman requested review from rockwood-openai and pgellert and removed request for rockwood-openai June 27, 2024 19:15
@oleiman oleiman requested a review from rockwotj July 1, 2024 14:26
src/v/model/transform.cc Outdated Show resolved Hide resolved
Comment on lines +168 to +173
serde::variant<
latest_offset,
model::timestamp,
model::transform_from_start,
model::transform_from_end>
position;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a feature gate to prevent using the new position options on a partially rolled out cluster.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right, because it's a new envelope.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is done. feels odd to have a whole feature flag for such a small change, but maybe we just don't do things like this very often.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, or we just overlook it and customers are well behaved, but in this case if someone does this old brokers won't be able to read transform metadata - which would be real bad.

src/v/model/transform.cc Show resolved Hide resolved
kbatuigas
kbatuigas previously approved these changes Jul 1, 2024
Thin proxy for `kafka::partition_proxy::start_offset`

Signed-off-by: Oren Leiman <[email protected]>
model::offset_delta resolves arithmetic operator overloads that perform
automatic conversions between kafka and model offsets.

This commit introduces an offset_delta that is used specifically for
applying a numeric delta to an existing kafka::offset without adjusting
its type.

Useful for transform start offset calculations.

Signed-off-by: Oren Leiman <[email protected]>
@oleiman oleiman force-pushed the xform/core-2841/specify-start-offset branch from d0dd6cd to 4177c74 Compare July 1, 2024 18:58
@oleiman
Copy link
Member Author

oleiman commented Jul 1, 2024

force push rebase dev to avoid conflicts on feature flag

@oleiman oleiman dismissed stale reviews from kbatuigas and pgellert via 28b7d63 July 1, 2024 19:53
@oleiman oleiman force-pushed the xform/core-2841/specify-start-offset branch from 4177c74 to 28b7d63 Compare July 1, 2024 19:53
@oleiman
Copy link
Member Author

oleiman commented Jul 1, 2024

force push feature flag for the new position types, minor CR changes

@oleiman oleiman force-pushed the xform/core-2841/specify-start-offset branch from 28b7d63 to 3b34b67 Compare July 1, 2024 19:57
@oleiman
Copy link
Member Author

oleiman commented Jul 1, 2024

empty diff for commit signoff

@oleiman oleiman requested review from rockwotj and pgellert July 1, 2024 20:12
Comment on lines 673 to 674
|| !_feature_table->local().is_active(
features::feature::transforms_specify_offset)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This blocks all deploys during the upgrade - we should only block usage of new position offsets because old versions will crash trying to read them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, there might be a disconnect in my understanding of how the variant serialization works. Won't we invariably write out a transform_offset_options which is versioned on the new variant layout?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like if we allow a deploy (any deploy) to go through on the new version before the upgrade is complete, then we need to downgrade, won't metadata deser trigger this code?

if (size != std::variant_size_v<UnderlyingType>) [[unlikely]] {
throw serde_exception(fmt_with_ctx(
ssx::sformat,
"reading type {} of size {}: {} bytes left - unexpected variant "
"size: {}, current variant size: {}, likely backwards compat issues.",
type_str<Type>(),
sizeof(Type),
in.bytes_left(),
size,
std::variant_size_v<UnderlyingType>));
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, i think. needs a test of some kind, but posting now for visibility

Comment on lines +168 to +173
serde::variant<
latest_offset,
model::timestamp,
model::transform_from_start,
model::transform_from_end>
position;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, or we just overlook it and customers are well behaved, but in this case if someone does this old brokers won't be able to read transform metadata - which would be real bad.

Thin wrappers around a kafka::offset_delta meant for use in a sum
type to indicate how the delta should be applied - i.e. added to
the start offset of a topic or subtracted from the latest offset
of a topic.

Enclosed delta should always be greater than or equal to zero.

Signed-off-by: Oren Leiman <[email protected]>
@oleiman oleiman force-pushed the xform/core-2841/specify-start-offset branch from 3b34b67 to f743216 Compare July 2, 2024 02:10
@oleiman oleiman requested a review from rockwotj July 2, 2024 02:12
@oleiman
Copy link
Member Author

oleiman commented Jul 2, 2024

force push contents:

  • serde shenanigans
  • adjust feature gate to guard new offset alternatives only

As a consequence of this, to maintain compatibility during partial
upgrades, introduce a legacy_transform_offset_options. This gives us
the ability to feature gate use of the new offset alternatives and avoid
writing non-forwards-compatible transform metadata in case a partial
upgrade needs to be rolled back.

This commit also adds transform_metadata::serde_write, which conditionally
writes the legacy version of the offset options struct iff the position
variant holds one of the legacy alternatives. This code can be removed after
v24.3 ships.

Signed-off-by: Oren Leiman <[email protected]>
{
  "format": enum[timestamp, from_start, from_end],
  "value": int64
}

Signed-off-by: Oren Leiman <[email protected]>
Specifically, we want to prevent the use of the new offset options
alternatives before an upgrade has been finalized.

Signed-off-by: Oren Leiman <[email protected]>
--from-offset to start from this offset
  * @t: start from UNIX timestamp (ms from epoch)
  * +oo: start offset + oo
  * -oo: latest ofset - oo

Signed-off-by: Oren Leiman <[email protected]>
- Consume records that were produced before the deploy
- Specify offsets that run off the end of the input topic
- Ill-formed offsets

Signed-off-by: Oren Leiman <[email protected]>
@oleiman oleiman force-pushed the xform/core-2841/specify-start-offset branch from f743216 to f6f1e38 Compare July 2, 2024 02:27
@oleiman
Copy link
Member Author

oleiman commented Jul 2, 2024

force push make legacy_transform_offset_options struct write-only.

Copy link
Contributor

@rockwotj rockwotj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One small edge case to redeem otherwise LGTM

@@ -102,6 +170,26 @@ std::ostream& operator<<(std::ostream& os, const transform_metadata& meta) {
return os;
}

void transform_metadata::serde_write(iobuf& out) const {
serde::write(out, name);
write(out, input_topic);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No serde namespace?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

surprisingly (to me anyway) apparently not. we're shooting for this overload:

friend void write(iobuf& out, topic_namespace t) {
using serde::write;
write(out, std::move(t.ns));
write(out, std::move(t.tp));
}

perhaps making that a free function in namespace serde would be a good cleanup though

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ADL before we had the tag invoke pattern

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just leave it it's fine for now

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yeah wasn't going to change it today

},
[this, &latest](model::transform_from_end off) {
vlog(_logger.debug, "starting at offset: {}", off);
auto actual_offset = latest - off.delta;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

underflow

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sigh. goood catch. thought i made a note somewhere to address that 😕

},
[this](model::transform_from_start off) {
vlog(_logger.debug, "starting at offset: {}", off);
auto actual_offset = _source->start_offset() + off.delta;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overflow

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants