From 3ff22494f0a63e2398617a9bfeee7ee175e8e0ba Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Wed, 3 Jul 2024 10:26:29 -0700 Subject: [PATCH] model/x: Fix typo in legacy offset_options envelope template s/transform_offset_options/legacy_transform_offset_options/ Also adds unit tests to try prove out the theory of selectively writing out older versions of the serde::variant. Works as expected. Also worth noting that this works regardless of the serde::envelope CRTP param fixed by this diff...perhaps that's due to the narrow scope of the legacy struct - only ever being written out explicitly in transform_metadata::serde_write. Signed-off-by: Oren Leiman --- src/v/model/tests/transform_test.cc | 107 ++++++++++++++++++++++++++++ src/v/model/transform.cc | 2 +- 2 files changed, 108 insertions(+), 1 deletion(-) diff --git a/src/v/model/tests/transform_test.cc b/src/v/model/tests/transform_test.cc index 093ecb0ffcc0..90967f49b349 100644 --- a/src/v/model/tests/transform_test.cc +++ b/src/v/model/tests/transform_test.cc @@ -18,6 +18,7 @@ #include "model/timestamp.h" #include "model/transform.h" #include "random/generators.h" +#include "serde/serde.h" #include "test_utils/randoms.h" #include @@ -194,4 +195,110 @@ TEST(TransformedDataTest, MakeBatch) { } } +/** + * Verbatim offset options from v24.1.x + * + * We can and should remove this code after v24.3 + */ +struct legacy_transform_offset_options_2 + : serde::envelope< + legacy_transform_offset_options_2, + serde::version<0>, + serde::compat_version<0>> { + struct latest_offset + : serde:: + envelope, serde::compat_version<0>> { + bool operator==(const latest_offset&) const = default; + }; + serde::variant position; + bool operator==(const legacy_transform_offset_options_2&) const = default; + auto serde_fields() { return std::tie(position); } +}; + +/** + * Verbatim offset options from v24.1.x + * + * We can and should remove this code after v24.3 + */ +struct legacy_transform_metadata + : serde::envelope< + legacy_transform_metadata, + serde::version<1>, + serde::compat_version<0>> { + model::transform_name name; + model::topic_namespace input_topic; + std::vector output_topics; + absl::flat_hash_map environment; + uuid_t uuid; + model::offset source_ptr; + legacy_transform_offset_options_2 offset_options; + friend bool operator==( + const legacy_transform_metadata&, const legacy_transform_metadata&) + = default; + auto serde_fields() { + return std::tie( + name, + input_topic, + output_topics, + environment, + uuid, + source_ptr, + offset_options); + } +}; + +TEST(TransformMetadataTest, TestOffsetOptionsCompat) { + // Build latest-version transform metadata with one of the old version + // position alternatives + model::transform_metadata m{ + .name = model::transform_name{"foo"}, + .input_topic + = model::topic_namespace{model::ns{"bar"}, model::topic{"baz"}}, + .uuid = uuid_t::create(), + .offset_options = {.position = model::timestamp::now()}, + }; + + // ser/de + auto buf = serde::to_iobuf(m); + std::optional lm; + auto deser = [&lm, &buf] { + lm = serde::from_iobuf(std::move(buf)); + }; + EXPECT_NO_THROW(deser()); + ASSERT_TRUE(lm.has_value()); + + // Make sure the deserialized legacy version of the struct has the same + // data, incl offset options + EXPECT_EQ(lm->name, m.name); + EXPECT_EQ(lm->input_topic, m.input_topic); + EXPECT_EQ(lm->uuid, m.uuid); + ASSERT_TRUE( + std::holds_alternative(lm->offset_options.position)); + EXPECT_EQ( + std::get(lm->offset_options.position), + std::get(m.offset_options.position)); +} + +TEST(TransformMetadataTest, TestOffsetOptionsCompatFail) { + // Build latest-version transform metadata with one of the old version + // position alternatives + model::transform_metadata m{ + .name = model::transform_name{"foo"}, + .input_topic + = model::topic_namespace{model::ns{"bar"}, model::topic{"baz"}}, + .uuid = uuid_t::create(), + .offset_options + = {.position = model::transform_from_start{kafka::offset_delta{0}}}, + }; + + // ser/de should fail on the variant + auto buf = serde::to_iobuf(m); + std::optional lm; + auto deser = [&lm, &buf] { + lm = serde::from_iobuf(std::move(buf)); + }; + EXPECT_THROW(deser(), serde::serde_exception); + ASSERT_FALSE(lm.has_value()); +} + } // namespace model diff --git a/src/v/model/transform.cc b/src/v/model/transform.cc index 54e1bea35e15..4780d9665a88 100644 --- a/src/v/model/transform.cc +++ b/src/v/model/transform.cc @@ -118,7 +118,7 @@ operator<<(std::ostream& os, const transform_offset_options& opts) { */ struct legacy_transform_offset_options : serde::envelope< - transform_offset_options, + legacy_transform_offset_options, serde::version<0>, serde::compat_version<0>> { serde::variant