Skip to content

Commit a7c4926

Browse files
committed
Address PR comments
1 parent 1c94f45 commit a7c4926

File tree

9 files changed

+269
-224
lines changed

9 files changed

+269
-224
lines changed

cpp/arcticdb/entity/read_result.hpp

Lines changed: 46 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
#include <arcticdb/util/memory_tracing.hpp>
1818
#include <arcticdb/arrow/arrow_output_frame.hpp>
1919
#include <arcticdb/arrow/arrow_utils.hpp>
20-
#include <arcticdb/version/version_core.hpp>
2120

2221
#include <vector>
2322

@@ -27,13 +26,13 @@ using OutputFrame = std::variant<pipelines::PandasOutputFrame, ArrowOutputFrame>
2726

2827
struct ARCTICDB_VISIBILITY_HIDDEN NodeReadResult {
2928
NodeReadResult(
30-
const std::string& symbol, OutputFrame&& frame_data,
31-
const arcticdb::proto::descriptors::NormalizationMetadata& norm_meta
29+
const StreamId& symbol, OutputFrame&& frame_data,
30+
arcticdb::proto::descriptors::NormalizationMetadata&& norm_meta
3231
) :
3332
symbol_(symbol),
3433
frame_data_(std::move(frame_data)),
35-
norm_meta_(norm_meta) {};
36-
std::string symbol_;
34+
norm_meta_(std::move(norm_meta)) {};
35+
StreamId symbol_;
3736
OutputFrame frame_data_;
3837
arcticdb::proto::descriptors::NormalizationMetadata norm_meta_;
3938

@@ -71,76 +70,48 @@ struct ARCTICDB_VISIBILITY_HIDDEN ReadResult {
7170
ARCTICDB_MOVE_ONLY_DEFAULT(ReadResult)
7271
};
7372

74-
inline ReadResult create_python_read_result(
75-
const std::variant<VersionedItem, std::vector<VersionedItem>>& version, OutputFormat output_format,
76-
FrameAndDescriptor&& fd,
77-
std::optional<std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>&& user_meta = std::nullopt,
78-
std::vector<version_store::ReadVersionOutput>&& node_outputs = {}
79-
) {
80-
auto result = std::move(fd);
81-
82-
// If version is a vector then this was a multi-symbol join, so the user_meta vector should have a value
83-
// Otherwise, there is a single piece of metadata on the frame descriptor
84-
util::check(
85-
std::holds_alternative<VersionedItem>(version) ^ user_meta.has_value(),
86-
"Unexpected argument combination to create_python_read_result"
87-
);
88-
89-
// Very old (pre Nov-2020) PandasIndex protobuf messages had no "start" or "step" fields. If is_physically_stored
90-
// (renamed from is_not_range_index) was false, the index was always RangeIndex(num_rows, 1)
91-
// This used to be handled in the Python layer by passing None to the DataFrame index parameter, which would then
92-
// default to RangeIndex(num_rows, 1). However, the empty index also has is_physically_stored as false, and because
93-
// integer protobuf fields default to zero if they are not present on the wire, it is impossible to tell from
94-
// the normalization metadata alone if the data was written with an empty index, or with a very old range index.
95-
// We therefore patch the normalization metadata here in this case
96-
auto norm_meta = result.desc_.mutable_proto().mutable_normalization();
97-
if (norm_meta->has_df() || norm_meta->has_series()) {
98-
auto common = norm_meta->has_df() ? norm_meta->mutable_df()->mutable_common()
99-
: norm_meta->mutable_series()->mutable_common();
100-
if (common->has_index()) {
101-
auto index = common->mutable_index();
102-
if (result.desc_.index().type() == IndexDescriptor::Type::ROWCOUNT && !index->is_physically_stored() &&
103-
index->start() == 0 && index->step() == 0) {
104-
index->set_step(1);
105-
}
106-
}
107-
}
108-
109-
auto get_python_frame = [output_format](auto& result) -> OutputFrame {
110-
if (output_format == OutputFormat::ARROW) {
111-
return ArrowOutputFrame{segment_to_arrow_data(result.frame_)};
112-
} else {
113-
return pipelines::PandasOutputFrame{result.frame_};
114-
}
115-
};
116-
auto python_frame = get_python_frame(result);
117-
util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__);
118-
119-
const auto& desc_proto = result.desc_.proto();
120-
std::variant<
121-
arcticdb::proto::descriptors::UserDefinedMetadata,
122-
std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>
123-
metadata;
124-
if (user_meta.has_value()) {
125-
metadata = std::move(*user_meta);
126-
} else {
127-
metadata = std::move(desc_proto.user_meta());
128-
}
73+
namespace version_store {
12974

130-
std::vector<NodeReadResult> node_results;
131-
for (auto& node_output : node_outputs) {
132-
auto& node_fd = node_output.frame_and_descriptor_;
133-
auto node_python_frame = get_python_frame(node_fd);
134-
auto node_metadata = node_fd.desc_.proto().normalization();
135-
node_results.emplace_back(node_output.versioned_item_.symbol(), std::move(node_python_frame), node_metadata);
136-
}
137-
return {version,
138-
std::move(python_frame),
139-
output_format,
140-
desc_proto.normalization(),
141-
metadata,
142-
desc_proto.multi_key_meta(),
143-
std::move(node_results)};
144-
}
75+
struct SymbolProcessingResult {
76+
VersionedItem versioned_item_;
77+
proto::descriptors::UserDefinedMetadata metadata_;
78+
OutputSchema output_schema_;
79+
std::vector<EntityId> entity_ids_;
80+
};
81+
82+
struct ReadVersionOutput {
83+
ReadVersionOutput() = delete;
84+
ReadVersionOutput(VersionedItem&& versioned_item, FrameAndDescriptor&& frame_and_descriptor) :
85+
versioned_item_(std::move(versioned_item)),
86+
frame_and_descriptor_(std::move(frame_and_descriptor)) {}
87+
88+
ARCTICDB_MOVE_ONLY_DEFAULT(ReadVersionOutput)
89+
90+
VersionedItem versioned_item_;
91+
FrameAndDescriptor frame_and_descriptor_;
92+
};
93+
94+
struct ReadVersionWithNodesOutput {
95+
ReadVersionOutput root_;
96+
std::vector<ReadVersionOutput> nodes_;
97+
};
98+
99+
struct MultiSymbolReadOutput {
100+
MultiSymbolReadOutput() = delete;
101+
MultiSymbolReadOutput(
102+
std::vector<VersionedItem>&& versioned_items,
103+
std::vector<proto::descriptors::UserDefinedMetadata>&& metadatas, FrameAndDescriptor&& frame_and_descriptor
104+
) :
105+
versioned_items_(std::move(versioned_items)),
106+
metadatas_(std::move(metadatas)),
107+
frame_and_descriptor_(std::move(frame_and_descriptor)) {}
108+
109+
ARCTICDB_MOVE_ONLY_DEFAULT(MultiSymbolReadOutput)
110+
111+
std::vector<VersionedItem> versioned_items_;
112+
std::vector<proto::descriptors::UserDefinedMetadata> metadatas_;
113+
FrameAndDescriptor frame_and_descriptor_;
114+
};
115+
} // namespace version_store
145116

146117
} // namespace arcticdb

cpp/arcticdb/pipeline/pipeline_utils.hpp

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,80 @@ inline void apply_type_handlers(SegmentInMemory seg, std::any& handler_data, Out
4747
}
4848
}
4949

50+
inline ReadResult create_python_read_result(
51+
const std::variant<VersionedItem, std::vector<VersionedItem>>& version, OutputFormat output_format,
52+
FrameAndDescriptor&& fd,
53+
std::optional<std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>&& user_meta = std::nullopt,
54+
std::vector<version_store::ReadVersionOutput>&& node_outputs = {}
55+
) {
56+
auto result = std::move(fd);
57+
58+
// If version is a vector then this was a multi-symbol join, so the user_meta vector should have a value
59+
// Otherwise, there is a single piece of metadata on the frame descriptor
60+
util::check(
61+
std::holds_alternative<VersionedItem>(version) ^ user_meta.has_value(),
62+
"Unexpected argument combination to create_python_read_result"
63+
);
64+
65+
// Very old (pre Nov-2020) PandasIndex protobuf messages had no "start" or "step" fields. If is_physically_stored
66+
// (renamed from is_not_range_index) was false, the index was always RangeIndex(num_rows, 1)
67+
// This used to be handled in the Python layer by passing None to the DataFrame index parameter, which would then
68+
// default to RangeIndex(num_rows, 1). However, the empty index also has is_physically_stored as false, and because
69+
// integer protobuf fields default to zero if they are not present on the wire, it is impossible to tell from
70+
// the normalization metadata alone if the data was written with an empty index, or with a very old range index.
71+
// We therefore patch the normalization metadata here in this case
72+
auto norm_meta = result.desc_.mutable_proto().mutable_normalization();
73+
if (norm_meta->has_df() || norm_meta->has_series()) {
74+
auto common = norm_meta->has_df() ? norm_meta->mutable_df()->mutable_common()
75+
: norm_meta->mutable_series()->mutable_common();
76+
if (common->has_index()) {
77+
auto index = common->mutable_index();
78+
if (result.desc_.index().type() == IndexDescriptor::Type::ROWCOUNT && !index->is_physically_stored() &&
79+
index->start() == 0 && index->step() == 0) {
80+
index->set_step(1);
81+
}
82+
}
83+
}
84+
85+
auto get_python_frame = [output_format](auto& result) -> OutputFrame {
86+
if (output_format == OutputFormat::ARROW) {
87+
return ArrowOutputFrame{segment_to_arrow_data(result.frame_)};
88+
} else {
89+
return pipelines::PandasOutputFrame{result.frame_};
90+
}
91+
};
92+
auto python_frame = get_python_frame(result);
93+
util::print_total_mem_usage(__FILE__, __LINE__, __FUNCTION__);
94+
95+
const auto& desc_proto = result.desc_.proto();
96+
std::variant<
97+
arcticdb::proto::descriptors::UserDefinedMetadata,
98+
std::vector<arcticdb::proto::descriptors::UserDefinedMetadata>>
99+
metadata;
100+
if (user_meta.has_value()) {
101+
metadata = std::move(*user_meta);
102+
} else {
103+
metadata = std::move(desc_proto.user_meta());
104+
}
105+
106+
std::vector<NodeReadResult> node_results;
107+
for (auto& node_output : node_outputs) {
108+
auto& node_fd = node_output.frame_and_descriptor_;
109+
auto node_python_frame = get_python_frame(node_fd);
110+
auto node_metadata = node_fd.desc_.proto().normalization();
111+
node_results.emplace_back(
112+
node_output.versioned_item_.symbol(), std::move(node_python_frame), std::move(node_metadata)
113+
);
114+
}
115+
return {version,
116+
std::move(python_frame),
117+
output_format,
118+
desc_proto.normalization(),
119+
metadata,
120+
desc_proto.multi_key_meta(),
121+
std::move(node_results)};
122+
}
123+
50124
inline ReadResult read_result_from_single_frame(
51125
FrameAndDescriptor& frame_and_desc, const AtomKey& key, std::any& handler_data, OutputFormat output_format
52126
) {

cpp/arcticdb/version/local_versioned_engine.cpp

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -424,12 +424,18 @@ ReadVersionWithNodesOutput LocalVersionedEngine::read_dataframe_version_internal
424424
std::vector<folly::Future<ReadVersionOutput>> node_futures;
425425
node_futures.reserve(keys.size());
426426
for (const auto& key : keys) {
427-
node_futures.emplace_back(
428-
read_frame_for_version_async_get_context(store(), key, read_query, read_options, handler_data)
429-
);
427+
node_futures.emplace_back(read_frame_for_version(store(), key, read_query, read_options, handler_data));
430428
}
431-
432-
return {std::move(root_result), folly::collect(node_futures).get()};
429+
auto node_trys = folly::collectAll(node_futures).get();
430+
std::vector<ReadVersionOutput> node_results;
431+
node_results.reserve(node_trys.size());
432+
std::transform(
433+
std::make_move_iterator(node_trys.begin()),
434+
std::make_move_iterator(node_trys.end()),
435+
std::back_inserter(node_results),
436+
[](auto&& try_result) { return std::move(try_result).value(); }
437+
);
438+
return {std::move(root_result), std::move(node_results)};
433439
}
434440
}
435441

@@ -1286,22 +1292,26 @@ std::vector<std::variant<ReadVersionWithNodesOutput, DataError>> LocalVersionedE
12861292
&handler_data](ReadVersionOutput&& result) {
12871293
auto& keys = result.frame_and_descriptor_.keys_;
12881294
if (keys.empty()) {
1289-
return folly::makeFuture<ReadVersionWithNodesOutput>(
1290-
ReadVersionWithNodesOutput{std::move(result), {}}
1291-
);
1295+
return folly::makeFuture(ReadVersionWithNodesOutput{std::move(result), {}});
12921296
} else {
12931297
std::vector<folly::Future<ReadVersionOutput>> node_futures;
12941298
node_futures.reserve(keys.size());
12951299
for (const auto& key : keys) {
1296-
node_futures.emplace_back(read_frame_for_version_async_get_context(
1297-
store(), key, read_query, read_options, handler_data
1298-
));
1300+
node_futures.emplace_back(
1301+
read_frame_for_version(store(), key, read_query, read_options, handler_data)
1302+
);
12991303
}
1300-
return folly::collect(std::move(node_futures))
1304+
return folly::collectAll(std::move(node_futures))
13011305
.via(&async::cpu_executor())
1302-
.thenValue([result = std::move(result)](std::vector<ReadVersionOutput>&& nodes
1303-
) mutable {
1304-
return ReadVersionWithNodesOutput{std::move(result), std::move(nodes)};
1306+
.thenValue([result = std::move(result)](auto&& node_tries) mutable {
1307+
std::vector<ReadVersionOutput> node_results;
1308+
node_results.reserve(node_tries.size());
1309+
for (auto& try_result : node_tries) {
1310+
node_results.push_back(std::move(try_result).value());
1311+
}
1312+
return ReadVersionWithNodesOutput{
1313+
std::move(result), std::move(node_results)
1314+
};
13051315
});
13061316
}
13071317
})

0 commit comments

Comments
 (0)