Skip to content

Commit

Permalink
spillover_manifest: remove get_manifest_path()
Browse files Browse the repository at this point in the history
  • Loading branch information
andrwng committed Jun 20, 2024
1 parent bd4cb71 commit ed82b8a
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 91 deletions.
22 changes: 11 additions & 11 deletions src/v/archival/tests/ntp_archiver_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,18 +96,18 @@ static void log_segment_set(storage::log_manager& lm) {
}

static remote_manifest_path generate_spill_manifest_path(
model::ntp ntp,
model::initial_revision_id rev_id,
const cloud_storage::segment_meta& meta) {
const cloud_storage::partition_manifest& stm_manifest,
const cloud_storage::segment_meta& spillover_manifest) {
cloud_storage::spillover_manifest_path_components comp{
.base = meta.base_offset,
.last = meta.committed_offset,
.base_kafka = meta.base_kafka_offset(),
.next_kafka = meta.next_kafka_offset(),
.base_ts = meta.base_timestamp,
.last_ts = meta.max_timestamp,
.base = spillover_manifest.base_offset,
.last = spillover_manifest.committed_offset,
.base_kafka = spillover_manifest.base_kafka_offset(),
.next_kafka = spillover_manifest.next_kafka_offset(),
.base_ts = spillover_manifest.base_timestamp,
.last_ts = spillover_manifest.max_timestamp,
};
return cloud_storage::generate_spillover_manifest_path(ntp, rev_id, comp);
return remote_manifest_path{
path_provider.spillover_manifest_path(stm_manifest, comp)};
}

void log_upload_candidate(const archival::upload_candidate& up) {
Expand Down Expand Up @@ -633,7 +633,7 @@ FIXTURE_TEST(test_archive_retention, archiver_fixture) {
BOOST_REQUIRE_EQUAL(spills.begin()->base_offset, model::offset{0});
BOOST_REQUIRE_EQUAL(spills.begin()->committed_offset, model::offset{1999});
auto spill_path = generate_spill_manifest_path(
manifest_ntp, manifest_revision, *(spills.begin()));
part->archival_meta_stm()->manifest(), *(spills.begin()));

config::shard_local_cfg().log_retention_ms.set_value(
std::chrono::milliseconds{5min});
Expand Down
34 changes: 0 additions & 34 deletions src/v/cloud_storage/partition_manifest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,40 +250,6 @@ partition_manifest::partition_manifest(
, _segments()
, _last_offset(0) {}

// NOTE: the methods that generate remote paths use the xxhash function
// to randomize the prefix. S3 groups the objects into chunks based on
// these prefixes. It also applies rate limit to chunks so if all segments
// and manifests will have the same prefix we will be able to do around
// 3000-5000 req/sec. AWS doc mentions that having only two prefix
// characters should be enough for most workloads
// (https://aws.amazon.com/blogs/aws/amazon-s3-performance-tips-tricks-seattle-hiring-event/)
// We're using eight because it's free and because AWS S3 is not the only
// backend and other S3 API implementations might benefit from that.

remote_manifest_path generate_partition_manifest_path(
const model::ntp& ntp,
model::initial_revision_id rev,
manifest_format format) {
// NOTE: the idea here is to split all possible hash values into
// 16 bins. Every bin should have lowest 28-bits set to 0.
// As result, for segment names all prefixes are possible, but
// for manifests, only 0x00000000, 0x10000000, ... 0xf0000000
// are used. This will allow us to quickly find all manifests
// that S3 bucket contains.
constexpr uint32_t bitmask = 0xF0000000;
auto path = ssx::sformat("{}_{}", ntp.path(), rev());
uint32_t hash = bitmask & xxhash_32(path.data(), path.size());
return remote_manifest_path(fmt::format(
"{:08x}/meta/{}_{}/manifest.{}", hash, ntp.path(), rev(), [&] {
switch (format) {
case manifest_format::json:
return "json";
case manifest_format::serde:
return "bin";
}
}()));
}

remote_manifest_path partition_manifest::get_manifest_path(
const remote_path_provider& path_provider) const {
return remote_manifest_path{path_provider.partition_manifest_path(*this)};
Expand Down
3 changes: 0 additions & 3 deletions src/v/cloud_storage/partition_manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ remote_segment_path generate_remote_segment_path(
/// Generate correct S3 segment name based on term and base offset
segment_name generate_local_segment_name(model::offset o, model::term_id t);

remote_manifest_path generate_partition_manifest_path(
const model::ntp&, model::initial_revision_id, manifest_format);

// This structure can be impelenented
// to allow access to private fields of the manifest.
struct partition_manifest_accessor;
Expand Down
38 changes: 1 addition & 37 deletions src/v/cloud_storage/spillover_manifest.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,6 @@

namespace cloud_storage {

namespace {

remote_manifest_path generate_spillover_manifest_path(
const model::ntp& ntp,
model::initial_revision_id rev,
const spillover_manifest_path_components& c) {
auto path = generate_partition_manifest_path(
ntp, rev, manifest_format::serde);
// Given the topic name size limit the name should fit into
// the AWS S3 size limit.
return remote_manifest_path(fmt::format(
"{}.{}.{}.{}.{}.{}.{}",
path().string(),
c.base(),
c.last(),
c.base_kafka(),
c.next_kafka(),
c.base_ts.value(),
c.last_ts.value()));
}
} // namespace

/// The section of the partition manifest
///
/// The only purpose of this class is to provide different implementation of the
Expand Down Expand Up @@ -73,21 +51,7 @@ class spillover_manifest final : public partition_manifest {
c.base_ts.value(),
c.last_ts.value());
}
remote_manifest_path get_manifest_path() const {
const auto ls = last_segment();
vassert(ls.has_value(), "Spillover manifest can't be empty");
const auto fs = *begin();
spillover_manifest_path_components smc{
.base = fs.base_offset,
.last = ls->committed_offset,
.base_kafka = fs.base_kafka_offset(),
.next_kafka = ls->next_kafka_offset(),
.base_ts = fs.base_timestamp,
.last_ts = ls->max_timestamp,
};
return generate_spillover_manifest_path(
get_ntp(), get_revision_id(), smc);
}

remote_manifest_path
get_manifest_path(const remote_path_provider& path_provider) const {
return remote_manifest_path{
Expand Down
11 changes: 5 additions & 6 deletions src/v/cloud_storage/tests/anomalies_detector_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,8 @@ class bucket_view_fixture : http_imposter_fixture {
.last_ts = iter->max_timestamp,
};

auto spill_path = cloud_storage::generate_spillover_manifest_path(
_stm_manifest.get_ntp(), _stm_manifest.get_revision_id(), comp);
auto spill_path = cloud_storage::remote_manifest_path{
path_provider.spillover_manifest_path(_stm_manifest, comp)};
BOOST_REQUIRE_EQUAL(
spill_path, spill.get_manifest_path(path_provider));
}
Expand Down Expand Up @@ -608,10 +608,9 @@ FIXTURE_TEST(test_missing_spillover_manifest, bucket_view_fixture) {

const auto& missing_spills = result.detected.missing_spillover_manifests;
BOOST_REQUIRE_EQUAL(missing_spills.size(), 1);
const auto expected_path = cloud_storage::generate_spillover_manifest_path(
first_spill.get_ntp(),
first_spill.get_revision_id(),
*missing_spills.begin());
auto expected_path = cloud_storage::remote_manifest_path{
path_provider.spillover_manifest_path(
first_spill, *missing_spills.begin())};
BOOST_REQUIRE_EQUAL(
first_spill.get_manifest_path(path_provider), expected_path);

Expand Down

0 comments on commit ed82b8a

Please sign in to comment.