Skip to content

Commit

Permalink
Merge pull request #20828 from oleiman/xform/noticket/fix-legacy-opti…
Browse files Browse the repository at this point in the history
…ons-envelope

model/x: Fix typo in legacy offset_options envelope template
  • Loading branch information
aanthony-rp committed Jul 5, 2024
2 parents 7c9bbf8 + 3ff2249 commit fa0a2e1
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 1 deletion.
107 changes: 107 additions & 0 deletions src/v/model/tests/transform_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "model/timestamp.h"
#include "model/transform.h"
#include "random/generators.h"
#include "serde/serde.h"
#include "test_utils/randoms.h"

#include <seastar/core/chunked_fifo.hh>
Expand Down Expand Up @@ -194,4 +195,110 @@ TEST(TransformedDataTest, MakeBatch) {
}
}

/**
* Verbatim offset options from v24.1.x
*
* We can and should remove this code after v24.3
*/
struct legacy_transform_offset_options_2
: serde::envelope<
legacy_transform_offset_options_2,
serde::version<0>,
serde::compat_version<0>> {
struct latest_offset
: serde::
envelope<latest_offset, serde::version<0>, serde::compat_version<0>> {
bool operator==(const latest_offset&) const = default;
};
serde::variant<latest_offset, model::timestamp> position;
bool operator==(const legacy_transform_offset_options_2&) const = default;
auto serde_fields() { return std::tie(position); }
};

/**
* Verbatim offset options from v24.1.x
*
* We can and should remove this code after v24.3
*/
struct legacy_transform_metadata
: serde::envelope<
legacy_transform_metadata,
serde::version<1>,
serde::compat_version<0>> {
model::transform_name name;
model::topic_namespace input_topic;
std::vector<model::topic_namespace> output_topics;
absl::flat_hash_map<ss::sstring, ss::sstring> environment;
uuid_t uuid;
model::offset source_ptr;
legacy_transform_offset_options_2 offset_options;
friend bool operator==(
const legacy_transform_metadata&, const legacy_transform_metadata&)
= default;
auto serde_fields() {
return std::tie(
name,
input_topic,
output_topics,
environment,
uuid,
source_ptr,
offset_options);
}
};

TEST(TransformMetadataTest, TestOffsetOptionsCompat) {
// Build latest-version transform metadata with one of the old version
// position alternatives
model::transform_metadata m{
.name = model::transform_name{"foo"},
.input_topic
= model::topic_namespace{model::ns{"bar"}, model::topic{"baz"}},
.uuid = uuid_t::create(),
.offset_options = {.position = model::timestamp::now()},
};

// ser/de
auto buf = serde::to_iobuf(m);
std::optional<legacy_transform_metadata> lm;
auto deser = [&lm, &buf] {
lm = serde::from_iobuf<legacy_transform_metadata>(std::move(buf));
};
EXPECT_NO_THROW(deser());
ASSERT_TRUE(lm.has_value());

// Make sure the deserialized legacy version of the struct has the same
// data, incl offset options
EXPECT_EQ(lm->name, m.name);
EXPECT_EQ(lm->input_topic, m.input_topic);
EXPECT_EQ(lm->uuid, m.uuid);
ASSERT_TRUE(
std::holds_alternative<model::timestamp>(lm->offset_options.position));
EXPECT_EQ(
std::get<model::timestamp>(lm->offset_options.position),
std::get<model::timestamp>(m.offset_options.position));
}

TEST(TransformMetadataTest, TestOffsetOptionsCompatFail) {
// Build latest-version transform metadata with one of the old version
// position alternatives
model::transform_metadata m{
.name = model::transform_name{"foo"},
.input_topic
= model::topic_namespace{model::ns{"bar"}, model::topic{"baz"}},
.uuid = uuid_t::create(),
.offset_options
= {.position = model::transform_from_start{kafka::offset_delta{0}}},
};

// ser/de should fail on the variant
auto buf = serde::to_iobuf(m);
std::optional<legacy_transform_metadata> lm;
auto deser = [&lm, &buf] {
lm = serde::from_iobuf<legacy_transform_metadata>(std::move(buf));
};
EXPECT_THROW(deser(), serde::serde_exception);
ASSERT_FALSE(lm.has_value());
}

} // namespace model
2 changes: 1 addition & 1 deletion src/v/model/transform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ operator<<(std::ostream& os, const transform_offset_options& opts) {
*/
struct legacy_transform_offset_options
: serde::envelope<
transform_offset_options,
legacy_transform_offset_options,
serde::version<0>,
serde::compat_version<0>> {
serde::variant<transform_offset_options::latest_offset, model::timestamp>
Expand Down

0 comments on commit fa0a2e1

Please sign in to comment.