Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

model/x: Fix typo in legacy offset_options envelope template #20828

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions src/v/model/tests/transform_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "model/timestamp.h"
#include "model/transform.h"
#include "random/generators.h"
#include "serde/serde.h"
#include "test_utils/randoms.h"

#include <seastar/core/chunked_fifo.hh>
Expand Down Expand Up @@ -194,4 +195,110 @@ TEST(TransformedDataTest, MakeBatch) {
}
}

/**
* Verbatim offset options from v24.1.x
*
* We can and should remove this code after v24.3
*/
struct legacy_transform_offset_options_2
: serde::envelope<
legacy_transform_offset_options_2,
serde::version<0>,
serde::compat_version<0>> {
struct latest_offset
: serde::
envelope<latest_offset, serde::version<0>, serde::compat_version<0>> {
bool operator==(const latest_offset&) const = default;
};
serde::variant<latest_offset, model::timestamp> position;
bool operator==(const legacy_transform_offset_options_2&) const = default;
auto serde_fields() { return std::tie(position); }
};

/**
* Verbatim offset options from v24.1.x
*
* We can and should remove this code after v24.3
*/
struct legacy_transform_metadata
: serde::envelope<
legacy_transform_metadata,
serde::version<1>,
serde::compat_version<0>> {
model::transform_name name;
model::topic_namespace input_topic;
std::vector<model::topic_namespace> output_topics;
absl::flat_hash_map<ss::sstring, ss::sstring> environment;
uuid_t uuid;
model::offset source_ptr;
legacy_transform_offset_options_2 offset_options;
friend bool operator==(
const legacy_transform_metadata&, const legacy_transform_metadata&)
= default;
auto serde_fields() {
return std::tie(
name,
input_topic,
output_topics,
environment,
uuid,
source_ptr,
offset_options);
}
};

TEST(TransformMetadataTest, TestOffsetOptionsCompat) {
// Build latest-version transform metadata with one of the old version
// position alternatives
model::transform_metadata m{
.name = model::transform_name{"foo"},
.input_topic
= model::topic_namespace{model::ns{"bar"}, model::topic{"baz"}},
.uuid = uuid_t::create(),
.offset_options = {.position = model::timestamp::now()},
};

// ser/de
auto buf = serde::to_iobuf(m);
std::optional<legacy_transform_metadata> lm;
auto deser = [&lm, &buf] {
lm = serde::from_iobuf<legacy_transform_metadata>(std::move(buf));
};
EXPECT_NO_THROW(deser());
ASSERT_TRUE(lm.has_value());

// Make sure the deserialized legacy version of the struct has the same
// data, incl offset options
EXPECT_EQ(lm->name, m.name);
EXPECT_EQ(lm->input_topic, m.input_topic);
EXPECT_EQ(lm->uuid, m.uuid);
ASSERT_TRUE(
std::holds_alternative<model::timestamp>(lm->offset_options.position));
EXPECT_EQ(
std::get<model::timestamp>(lm->offset_options.position),
std::get<model::timestamp>(m.offset_options.position));
}

TEST(TransformMetadataTest, TestOffsetOptionsCompatFail) {
// Build latest-version transform metadata with one of the old version
// position alternatives
model::transform_metadata m{
.name = model::transform_name{"foo"},
.input_topic
= model::topic_namespace{model::ns{"bar"}, model::topic{"baz"}},
.uuid = uuid_t::create(),
.offset_options
= {.position = model::transform_from_start{kafka::offset_delta{0}}},
};

// ser/de should fail on the variant
auto buf = serde::to_iobuf(m);
std::optional<legacy_transform_metadata> lm;
auto deser = [&lm, &buf] {
lm = serde::from_iobuf<legacy_transform_metadata>(std::move(buf));
};
EXPECT_THROW(deser(), serde::serde_exception);
ASSERT_FALSE(lm.has_value());
}

} // namespace model
2 changes: 1 addition & 1 deletion src/v/model/transform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ operator<<(std::ostream& os, const transform_offset_options& opts) {
*/
struct legacy_transform_offset_options
: serde::envelope<
transform_offset_options,
legacy_transform_offset_options,
serde::version<0>,
serde::compat_version<0>> {
serde::variant<transform_offset_options::latest_offset, model::timestamp>
Expand Down
Loading