Diagnostics for finalize staged data #2083

Draft · wants to merge 4 commits into base: master
2 changes: 1 addition & 1 deletion cpp/arcticdb/async/async_store.hpp
@@ -93,7 +93,7 @@ folly::Future<entity::VariantKey> write(
"Descriptor id mismatch in atom key {} != {}",
stream_id,
segment.descriptor().id());

ARCTICDB_DEBUG(log::version(), "Creating write future key_type {} stream_id {} start_index {} end_index {}", key_type, stream_id, start_index, end_index);
return async::submit_cpu_task(EncodeAtomTask{
key_type, version_id, stream_id, start_index, end_index, creation_ts,
std::move(segment), codec_, encoding_version_
1 change: 1 addition & 0 deletions cpp/arcticdb/async/tasks.hpp
@@ -183,6 +183,7 @@ struct WriteSegmentTask : BaseTask {
ARCTICDB_SAMPLE(WriteSegmentTask, 0)
auto k = key_seg.variant_key();
lib_->write(Composite<storage::KeySegmentPair>(std::move(key_seg)));
ARCTICDB_DEBUG(log::version(), "Finished write task key {}", variant_key_view(k));
return k;
}
};
2 changes: 1 addition & 1 deletion cpp/arcticdb/stream/segment_aggregator.hpp
@@ -65,7 +65,7 @@ template<class Index, class Schema, class SegmentingPolicy = RowCountSegmentPoli
if(convert_int_to_float)
convert_column_types(segment);

ARCTICDB_DEBUG(log::version(), "Adding segment with descriptor {}", segment.descriptor());
ARCTICDB_DEBUG(log::version(), "Adding segment with descriptor {} uncompressed_bytes {}", segment.descriptor(), segment.descriptor().uncompressed_bytes());
segments_.push_back(segment);
slices_.push_back(slice);
if (AggregatorType::segmenting_policy()(AggregatorType::stats())) {
21 changes: 20 additions & 1 deletion cpp/arcticdb/version/local_versioned_engine.cpp
@@ -1009,17 +1009,25 @@ VersionedItem LocalVersionedEngine::compact_incomplete_dynamic(
const StreamId& stream_id,
const std::optional<arcticdb::proto::descriptors::UserDefinedMetadata>& user_meta,
const CompactIncompleteOptions& options) {
log::version().debug("Compacting incomplete symbol {}", stream_id);
log::version().debug("Compacting incomplete symbol {} with options {}", stream_id, options);

auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
if (update_info.previous_index_key_) {
ARCTICDB_DEBUG(log::version(), "Found previous version for symbol {}", stream_id);
} else {
ARCTICDB_DEBUG(log::version(), "No previous version found for symbol {}", stream_id);
}
auto pipeline_context = std::make_shared<PipelineContext>();
pipeline_context->stream_id_ = stream_id;
pipeline_context->version_id_ = update_info.next_version_id_;
auto delete_keys_on_failure = get_delete_keys_on_failure(pipeline_context, store(), options);

auto versioned_item = compact_incomplete_impl(store_, stream_id, user_meta, update_info, options, get_write_options(), pipeline_context);
ARCTICDB_DEBUG(log::version(), "Finished compact_incomplete_impl for symbol {}", stream_id);

write_version_and_prune_previous(options.prune_previous_versions_, versioned_item.key_, update_info.previous_index_key_);
ARCTICDB_DEBUG(log::version(), "Finished write_version_and_prune_previous for symbol {}", stream_id);

add_to_symbol_list_on_compaction(stream_id, options, update_info);
if (delete_keys_on_failure)
delete_keys_on_failure->release();
@@ -1585,15 +1593,26 @@ VersionedItem LocalVersionedEngine::sort_merge_internal(
const StreamId& stream_id,
const std::optional<arcticdb::proto::descriptors::UserDefinedMetadata>& user_meta,
const CompactIncompleteOptions& options) {
log::version().debug("Sort merge for symbol {} with options {}", stream_id, options);

auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
if (update_info.previous_index_key_) {
ARCTICDB_DEBUG(log::version(), "Found previous version for symbol {}", stream_id);
} else {
ARCTICDB_DEBUG(log::version(), "No previous version found for symbol {}", stream_id);
}

auto pipeline_context = std::make_shared<PipelineContext>();
pipeline_context->stream_id_ = stream_id;
pipeline_context->version_id_ = update_info.next_version_id_;
auto delete_keys_on_failure = get_delete_keys_on_failure(pipeline_context, store(), options);

auto versioned_item = sort_merge_impl(store_, stream_id, user_meta, update_info, options, get_write_options(), pipeline_context);
ARCTICDB_DEBUG(log::version(), "Finished sort_merge_impl for symbol {}", stream_id);

write_version_and_prune_previous(options.prune_previous_versions_, versioned_item.key_, update_info.previous_index_key_);
ARCTICDB_DEBUG(log::version(), "Finished write_version_and_prune_previous for symbol {}", stream_id);

add_to_symbol_list_on_compaction(stream_id, options, update_info);
if (delete_keys_on_failure)
delete_keys_on_failure->release();
29 changes: 26 additions & 3 deletions cpp/arcticdb/version/version_core.cpp
@@ -880,6 +880,7 @@
pipeline_context->norm_meta_ = std::make_unique<arcticdb::proto::descriptors::NormalizationMetadata>(std::move(*index_segment_reader.mutable_tsd().mutable_proto().mutable_normalization()));
pipeline_context->user_meta_ = std::make_unique<arcticdb::proto::descriptors::UserDefinedMetadata>(std::move(*index_segment_reader.mutable_tsd().mutable_proto().mutable_user_meta()));
pipeline_context->bucketize_dynamic_ = bucketize_dynamic;
ARCTICDB_DEBUG(log::version(), "read_indexed_keys_to_pipeline: Symbol {} Found {} keys with {} total rows", version_info.symbol(), pipeline_context->slice_and_keys_.size(), pipeline_context->total_rows_);
}

// Returns true if there are staged segments
@@ -901,18 +902,24 @@
via_iteration,
false);

if(incomplete_segments.empty())
ARCTICDB_DEBUG(log::version(), "Symbol {}: Found {} incomplete segments", pipeline_context->stream_id_, incomplete_segments.size());
if(incomplete_segments.empty()) {
return false;
}

// In order to have the right normalization metadata and descriptor we need to find the first non-empty segment.
// Picking an empty segment when there are non-empty ones will impact the index type and column names.
// If all segments are empty we will proceed as if we were appending/writing an empty dataframe.
debug::check<ErrorCode::E_ASSERTION_FAILURE>(!incomplete_segments.empty(), "Incomplete segments must be non-empty");
const auto first_non_empty_seg = std::find_if(incomplete_segments.begin(), incomplete_segments.end(), [&](auto& slice){
return slice.segment(store).row_count() > 0;
auto res = slice.segment(store).row_count() > 0;
ARCTICDB_DEBUG(log::version(), "Testing for non-empty seg {} res={}", slice.key(), res);
return res;
});
const auto& seg =
first_non_empty_seg != incomplete_segments.end() ? first_non_empty_seg->segment(store) : incomplete_segments.begin()->segment(store);
ARCTICDB_DEBUG(log::version(), "Symbol {}: First segment has rows {} columns {} uncompressed bytes {} descriptor {}",
pipeline_context->stream_id_, seg.row_count(), seg.columns().size(), seg.descriptor().uncompressed_bytes(), seg.index_descriptor());
// Mark the start point of the incompletes, so we know that there is no column slicing after this point
pipeline_context->incompletes_after_ = pipeline_context->slice_and_keys_.size();

Expand All @@ -929,6 +936,7 @@
}

if (dynamic_schema) {
ARCTICDB_DEBUG(log::version(), "read_incompletes_to_pipeline: Dynamic schema");
pipeline_context->staged_descriptor_ =
merge_descriptors(seg.descriptor(), incomplete_segments, read_query.columns);
if (pipeline_context->desc_) {
@@ -939,7 +947,15 @@
pipeline_context->desc_ = pipeline_context->staged_descriptor_;
}
} else {
const StreamDescriptor &staged_desc = incomplete_segments[0].segment(store).descriptor();
ARCTICDB_DEBUG(log::version(), "read_incompletes_to_pipeline: Static schema");
auto& first_incomplete_seg = incomplete_segments[0].segment(store);
const StreamDescriptor &staged_desc = first_incomplete_seg.descriptor();
ARCTICDB_DEBUG(log::version(), "Symbol {}: First incomplete segment has rows {} columns {} uncompressed bytes {} descriptor {}",
pipeline_context->stream_id_,
first_incomplete_seg.row_count(),
first_incomplete_seg.columns().size(),
first_incomplete_seg.descriptor().uncompressed_bytes(),
first_incomplete_seg.index_descriptor());
if (pipeline_context->desc_) {
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
columns_match(staged_desc, *pipeline_context->desc_),
@@ -1422,6 +1438,8 @@
);
}
}

ARCTICDB_DEBUG(log::version(), "delete_incomplete_keys Symbol {}: Deleting {} keys", pipeline_context.stream_id_, keys_to_delete.size());
store.remove_keys(keys_to_delete).get();
}

@@ -1548,8 +1566,13 @@
fut_vec.emplace_back(store->write(pk, std::move(segment)));
}};

[[maybe_unused]] size_t count = 0;

Check failure on line 1569 in cpp/arcticdb/version/version_core.cpp (GitHub Actions):
Windows C++ Tests / compile: 'count': local variable is initialized but not referenced
Linux C++ Tests / compile: unused variable 'count' [-Werror=unused-variable]
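These failures indicate that ARCTICDB_DEBUG compiles to a no-op in the flagged configurations, so the only use of count disappears and -Werror builds break. Marking the counter [[maybe_unused]] keeps those builds green. A self-contained sketch of the pattern; MY_DEBUG below is a hypothetical stand-in for the real macro, not ArcticDB's definition:

#include <cstddef>
#include <cstdio>

// Hypothetical stand-in for ARCTICDB_DEBUG: logs in debug builds and compiles
// away under NDEBUG, which is exactly what leaves 'count' unreferenced.
#ifdef NDEBUG
#define MY_DEBUG(...) ((void)0)
#else
#define MY_DEBUG(...) std::printf(__VA_ARGS__)
#endif

int main() {
    // [[maybe_unused]] silences MSVC's "initialized but not referenced" and
    // GCC's -Werror=unused-variable when the only reference is inside MY_DEBUG.
    [[maybe_unused]] std::size_t count = 0;
    for (int i = 0; i < 3; ++i)
        MY_DEBUG("processing segment %zu\n", count++);
    return 0;
}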
for(auto& sk : segments) {
SegmentInMemory segment = sk.release_segment(store);

ARCTICDB_DEBUG(log::version(), "sort_merge_impl Symbol {} Segment {}: Segment has rows {} columns {} uncompressed bytes {}",
pipeline_context->stream_id_, count++, segment.row_count(), segment.columns().size(),
segment.descriptor().uncompressed_bytes());
// Empty columns can appear only if one staged segment is empty and adds a column which
// does not appear in any other segment. There can also be empty columns if all segments
// are empty; in that case this loop won't be reached, as segments.size() will be 0.
24 changes: 24 additions & 0 deletions cpp/arcticdb/version/version_core.hpp
@@ -294,6 +294,7 @@
segment_size.has_value() ? SegmentationPolicy{*segment_size} : SegmentationPolicy{}
};

[[maybe_unused]] size_t count = 0;

Check failure on line 297 in cpp/arcticdb/version/version_core.hpp (GitHub Actions):
Windows C++ Tests / compile: 'count': local variable is initialized but not referenced
Linux C++ Tests / compile: unused variable 'count' [-Werror=unused-variable] (repeated across nine Linux compile jobs)
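This is the same unused-variable failure as in version_core.cpp above; marking this count [[maybe_unused]] (as sketched there) resolves it as well.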
for (auto it = to_compact_start; it != to_compact_end; ++it) {
auto sk = [&it]() {
if constexpr (std::is_same_v<IteratorType, pipelines::PipelineContext::iterator>)
@@ -306,6 +307,8 @@
}

const SegmentInMemory& segment = sk.segment(store);
ARCTICDB_DEBUG(log::version(), "do_compact Symbol {} Segment {}: Segment has rows {} columns {} uncompressed bytes {}",
pipeline_context->stream_id_, count++, segment.row_count(), segment.columns().size(), segment.descriptor().uncompressed_bytes());

if(validate_index && is_segment_unsorted(segment)) {
auto written_keys = folly::collect(write_futures).get();
@@ -338,5 +341,26 @@

}

namespace fmt {
template<>
struct formatter<arcticdb::version_store::CompactIncompleteOptions> {
template<typename ParseContext>
constexpr auto parse(ParseContext &ctx) { return ctx.begin(); }

template<typename FormatContext>
auto format(const arcticdb::version_store::CompactIncompleteOptions &opts, FormatContext &ctx) const {
return fmt::format_to(ctx.out(), "CompactIncompleteOptions append={}, convert_int_to_float={}, delete_staged_data_on_failure={}, "
"prune_previous_versions={}, sparsify={}, validate_index={}, via_iteration={}",
opts.append_,
opts.convert_int_to_float_,
opts.delete_staged_data_on_failure_,
opts.prune_previous_versions_,
opts.sparsify_,
opts.validate_index_,
opts.via_iteration_);
}
};
}
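With this specialization in scope, a CompactIncompleteOptions value can be passed directly to the fmt-based log calls used above, e.g. log::version().debug("Compacting incomplete symbol {} with options {}", stream_id, options). A self-contained sketch of the same formatter pattern, using a hypothetical Options struct rather than the real type:

#include <fmt/format.h>

// Hypothetical stand-in for CompactIncompleteOptions, for illustration only.
struct Options {
    bool append_ = false;
    bool sparsify_ = true;
};

// Same shape as the specialization above: parse() accepts the empty format
// spec, format() writes the fields to the output iterator.
template<>
struct fmt::formatter<Options> {
    constexpr auto parse(fmt::format_parse_context& ctx) { return ctx.begin(); }

    auto format(const Options& opts, fmt::format_context& ctx) const {
        return fmt::format_to(ctx.out(), "Options append={} sparsify={}",
                              opts.append_, opts.sparsify_);
    }
};

int main() {
    fmt::print("{}\n", Options{});  // prints: Options append=false sparsify=true
}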

#define ARCTICDB_VERSION_CORE_H_
#include <arcticdb/version/version_core-inl.hpp>