Skip to content

Commit c14b146

Browse files
committed
Add batch APIs for configurable strings
- Creates a new `BatchReadOptions` type that separates batch-level options from per-symbol options. - Exposes it through both the V1 and V2 APIs for `read_batch` and `read_batch_and_join`, and adds tests for them.
1 parent 13e91a3 commit c14b146

File tree

10 files changed

+366
-48
lines changed

10 files changed

+366
-48
lines changed

cpp/arcticdb/pipeline/read_options.hpp

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#include <arcticdb/entity/output_format.hpp>
1212
#include <arcticdb/util/optional_defaults.hpp>
13+
#include <arcticdb/util/variant.hpp>
1314
#include <arcticdb/arrow/arrow_output_options.hpp>
1415

1516
namespace arcticdb {
@@ -22,7 +23,6 @@ struct ReadOptionsData {
2223
std::optional<bool> allow_sparse_;
2324
std::optional<bool> set_tz_;
2425
std::optional<bool> optimise_string_memory_;
25-
std::optional<bool> batch_throw_on_error_;
2626
OutputFormat output_format_ = OutputFormat::PANDAS;
2727
ArrowOutputConfig arrow_output_config_ = ArrowOutputConfig{};
2828
};
@@ -60,10 +60,6 @@ struct ReadOptions {
6060

6161
[[nodiscard]] const std::optional<bool>& incompletes() const { return data_->incompletes_; }
6262

63-
[[nodiscard]] const std::optional<bool>& batch_throw_on_error() const { return data_->batch_throw_on_error_; }
64-
65-
void set_batch_throw_on_error(bool batch_throw_on_error) { data_->batch_throw_on_error_ = batch_throw_on_error; }
66-
6763
void set_output_format(OutputFormat output_format) { data_->output_format_ = output_format; }
6864

6965
[[nodiscard]] OutputFormat output_format() const { return data_->output_format_; }
@@ -99,4 +95,39 @@ struct ReadOptions {
9995

10096
[[nodiscard]] ReadOptions clone() const { return ReadOptions(std::make_shared<ReadOptionsData>(*data_)); }
10197
};
98+
99+
using ReadOptionsPerSymbol = std::variant<ReadOptions, std::vector<ReadOptions>>;
100+
101+
struct BatchReadOptionsData {
102+
ReadOptionsPerSymbol read_options_per_symbol_;
103+
std::optional<bool> batch_throw_on_error_;
104+
OutputFormat output_format_ = OutputFormat::PANDAS;
105+
};
106+
107+
struct BatchReadOptions {
108+
std::shared_ptr<BatchReadOptionsData> data_ = std::make_shared<BatchReadOptionsData>();
109+
110+
void set_read_options(const ReadOptions& read_options) { data_->read_options_per_symbol_ = read_options; }
111+
112+
void set_read_options_per_symbol(const std::vector<ReadOptions>& read_options_per_symbol) {
113+
data_->read_options_per_symbol_ = read_options_per_symbol;
114+
}
115+
116+
[[nodiscard]] ReadOptions at(size_t idx) const {
117+
return util::variant_match(
118+
data_->read_options_per_symbol_,
119+
[&](std::vector<ReadOptions> read_options) { return read_options.at(idx); },
120+
[&](ReadOptions read_options) { return read_options; }
121+
);
122+
}
123+
124+
void set_batch_throw_on_error(bool batch_throw_on_error) { data_->batch_throw_on_error_ = batch_throw_on_error; }
125+
126+
[[nodiscard]] const std::optional<bool>& batch_throw_on_error() const { return data_->batch_throw_on_error_; }
127+
128+
void set_output_format(OutputFormat output_format) { data_->output_format_ = output_format; }
129+
130+
[[nodiscard]] OutputFormat output_format() const { return data_->output_format_; }
131+
};
132+
102133
} // namespace arcticdb

cpp/arcticdb/version/local_versioned_engine.cpp

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -491,11 +491,11 @@ DescriptorItem LocalVersionedEngine::read_descriptor_internal(
491491

492492
std::vector<std::variant<DescriptorItem, DataError>> LocalVersionedEngine::batch_read_descriptor_internal(
493493
const std::vector<StreamId>& stream_ids, const std::vector<VersionQuery>& version_queries,
494-
const ReadOptions& read_options
494+
const BatchReadOptions& batch_read_options
495495
) {
496496

497497
internal::check<ErrorCode::E_ASSERTION_FAILURE>(
498-
read_options.batch_throw_on_error().has_value(),
498+
batch_read_options.batch_throw_on_error().has_value(),
499499
"ReadOptions::batch_throw_on_error_ should always be set here"
500500
);
501501

@@ -508,7 +508,7 @@ std::vector<std::variant<DescriptorItem, DataError>> LocalVersionedEngine::batch
508508
}
509509
auto descriptors = folly::collectAll(descriptor_futures).get();
510510
TransformBatchResultsFlags flags;
511-
flags.throw_on_error_ = *read_options.batch_throw_on_error();
511+
flags.throw_on_error_ = *batch_read_options.batch_throw_on_error();
512512
return transform_batch_items_or_throw(std::move(descriptors), stream_ids, flags, version_queries);
513513
}
514514

@@ -1242,12 +1242,16 @@ std::vector<ReadVersionOutput> LocalVersionedEngine::batch_read_keys(
12421242

12431243
std::vector<std::variant<ReadVersionOutput, DataError>> LocalVersionedEngine::batch_read_internal(
12441244
const std::vector<StreamId>& stream_ids, const std::vector<VersionQuery>& version_queries,
1245-
std::vector<std::shared_ptr<ReadQuery>>& read_queries, const ReadOptions& read_options, std::any& handler_data
1245+
std::vector<std::shared_ptr<ReadQuery>>& read_queries, const BatchReadOptions& batch_read_options,
1246+
std::any& handler_data
12461247
) {
12471248
py::gil_scoped_release release_gil;
1249+
if (stream_ids.empty()) {
1250+
return {};
1251+
}
12481252
// This read option should always be set when calling batch_read
12491253
internal::check<ErrorCode::E_ASSERTION_FAILURE>(
1250-
read_options.batch_throw_on_error().has_value(),
1254+
batch_read_options.batch_throw_on_error().has_value(),
12511255
"ReadOptions::batch_throw_on_error_ should always be set here"
12521256
);
12531257
auto opt_index_key_futs = batch_get_versions_async(store(), version_map(), stream_ids, version_queries);
@@ -1267,7 +1271,7 @@ std::vector<std::variant<ReadVersionOutput, DataError>> LocalVersionedEngine::ba
12671271
&version_queries,
12681272
read_query =
12691273
read_queries.empty() ? std::make_shared<ReadQuery>() : read_queries[idx],
1270-
&read_options,
1274+
read_options = batch_read_options.at(idx),
12711275
&handler_data](auto&& opt_index_key) {
12721276
auto version_info = get_version_identifier(
12731277
stream_ids[idx],
@@ -1303,7 +1307,7 @@ std::vector<std::variant<ReadVersionOutput, DataError>> LocalVersionedEngine::ba
13031307

13041308
TransformBatchResultsFlags flags;
13051309
flags.convert_no_data_found_to_key_not_found_ = true;
1306-
flags.throw_on_error_ = *read_options.batch_throw_on_error();
1310+
flags.throw_on_error_ = *batch_read_options.batch_throw_on_error();
13071311
return transform_batch_items_or_throw(std::move(all_results), stream_ids, flags, version_queries);
13081312
}
13091313

@@ -2033,11 +2037,11 @@ folly::Future<std::pair<VariantKey, std::optional<google::protobuf::Any>>> Local
20332037
std::vector<std::variant<std::pair<VariantKey, std::optional<google::protobuf::Any>>, DataError>> LocalVersionedEngine::
20342038
batch_read_metadata_internal(
20352039
const std::vector<StreamId>& stream_ids, const std::vector<VersionQuery>& version_queries,
2036-
const ReadOptions& read_options
2040+
const BatchReadOptions& batch_read_options
20372041
) {
20382042
// This read option should always be set when calling batch_read_metadata
20392043
internal::check<ErrorCode::E_ASSERTION_FAILURE>(
2040-
read_options.batch_throw_on_error().has_value(),
2044+
batch_read_options.batch_throw_on_error().has_value(),
20412045
"ReadOptions::batch_throw_on_error_ should always be set here"
20422046
);
20432047
auto opt_index_key_futs = batch_get_versions_async(store(), version_map(), stream_ids, version_queries);
@@ -2052,7 +2056,7 @@ std::vector<std::variant<std::pair<VariantKey, std::optional<google::protobuf::A
20522056
// For legacy reason read_metadata_batch is not throwing if the symbol is missing
20532057
TransformBatchResultsFlags flags;
20542058
flags.throw_on_missing_symbol_ = false;
2055-
flags.throw_on_error_ = *read_options.batch_throw_on_error();
2059+
flags.throw_on_error_ = *batch_read_options.batch_throw_on_error();
20562060
return transform_batch_items_or_throw(std::move(metadatas), stream_ids, flags, version_queries);
20572061
}
20582062

cpp/arcticdb/version/local_versioned_engine.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ class LocalVersionedEngine : public VersionedEngine {
234234

235235
std::vector<std::variant<ReadVersionOutput, DataError>> batch_read_internal(
236236
const std::vector<StreamId>& stream_ids, const std::vector<VersionQuery>& version_queries,
237-
std::vector<std::shared_ptr<ReadQuery>>& read_queries, const ReadOptions& read_options,
237+
std::vector<std::shared_ptr<ReadQuery>>& read_queries, const BatchReadOptions& batch_read_options,
238238
std::any& handler_data
239239
);
240240

@@ -247,7 +247,7 @@ class LocalVersionedEngine : public VersionedEngine {
247247

248248
std::vector<std::variant<DescriptorItem, DataError>> batch_read_descriptor_internal(
249249
const std::vector<StreamId>& stream_ids, const std::vector<VersionQuery>& version_queries,
250-
const ReadOptions& read_options
250+
const BatchReadOptions& batch_read_options
251251
);
252252

253253
std::vector<std::pair<VersionedItem, TimeseriesDescriptor>> batch_restore_version_internal(
@@ -263,7 +263,7 @@ class LocalVersionedEngine : public VersionedEngine {
263263
std::vector<std::variant<std::pair<VariantKey, std::optional<google::protobuf::Any>>, DataError>>
264264
batch_read_metadata_internal(
265265
const std::vector<StreamId>& stream_ids, const std::vector<VersionQuery>& version_queries,
266-
const ReadOptions& read_options
266+
const BatchReadOptions& batch_read_options
267267
);
268268

269269
std::pair<std::optional<VariantKey>, std::optional<google::protobuf::Any>> read_metadata_internal(

cpp/arcticdb/version/python_bindings.cpp

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -254,13 +254,20 @@ void register_bindings(py::module& version, py::exception<arcticdb::ArcticExcept
254254
.def("set_incompletes", &ReadOptions::set_incompletes)
255255
.def("set_set_tz", &ReadOptions::set_set_tz)
256256
.def("set_optimise_string_memory", &ReadOptions::set_optimise_string_memory)
257-
.def("set_batch_throw_on_error", &ReadOptions::set_batch_throw_on_error)
258257
.def("set_output_format", &ReadOptions::set_output_format)
259258
.def("set_arrow_output_default_string_format", &ReadOptions::set_arrow_output_default_string_format)
260259
.def("set_arrow_output_per_column_string_format", &ReadOptions::set_arrow_output_per_column_string_format)
261260
.def_property_readonly("incompletes", &ReadOptions::get_incompletes)
262261
.def_property_readonly("output_format", &ReadOptions::output_format);
263262

263+
py::class_<BatchReadOptions>(version, "PythonVersionStoreBatchReadOptions")
264+
.def(py::init())
265+
.def("set_read_options", &BatchReadOptions::set_read_options)
266+
.def("set_read_options_per_symbol", &BatchReadOptions::set_read_options_per_symbol)
267+
.def("set_output_format", &BatchReadOptions::set_output_format)
268+
.def("set_batch_throw_on_error", &BatchReadOptions::set_batch_throw_on_error)
269+
.def("at", &BatchReadOptions::at);
270+
264271
version.def("write_dataframe_to_file", &write_dataframe_to_file);
265272
version.def(
266273
"read_dataframe_from_file",
@@ -961,11 +968,13 @@ void register_bindings(py::module& version, py::exception<arcticdb::ArcticExcept
961968
const std::vector<StreamId>& stream_ids,
962969
const std::vector<VersionQuery>& version_queries,
963970
std::vector<std::shared_ptr<ReadQuery>>& read_queries,
964-
const ReadOptions& read_options) {
971+
const BatchReadOptions& batch_read_options) {
965972
auto handler_data =
966-
TypeHandlerRegistry::instance()->get_handler_data(read_options.output_format());
973+
TypeHandlerRegistry::instance()->get_handler_data(batch_read_options.output_format());
967974
return python_util::adapt_read_dfs(
968-
v.batch_read(stream_ids, version_queries, read_queries, read_options, handler_data),
975+
v.batch_read(
976+
stream_ids, version_queries, read_queries, batch_read_options, handler_data
977+
),
969978
&handler_data
970979
);
971980
},

cpp/arcticdb/version/version_store_api.cpp

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -899,19 +899,20 @@ std::unordered_map<VersionId, bool> PythonVersionStore::get_all_tombstoned_versi
899899

900900
std::vector<std::variant<ReadResult, DataError>> PythonVersionStore::batch_read(
901901
const std::vector<StreamId>& stream_ids, const std::vector<VersionQuery>& version_queries,
902-
std::vector<std::shared_ptr<ReadQuery>>& read_queries, const ReadOptions& read_options, std::any& handler_data
902+
std::vector<std::shared_ptr<ReadQuery>>& read_queries, const BatchReadOptions& batch_read_options,
903+
std::any& handler_data
903904
) {
904905

905906
auto read_versions_or_errors =
906-
batch_read_internal(stream_ids, version_queries, read_queries, read_options, handler_data);
907+
batch_read_internal(stream_ids, version_queries, read_queries, batch_read_options, handler_data);
907908
std::vector<std::variant<ReadResult, DataError>> res;
908909
for (auto&& [idx, read_version_or_error] : folly::enumerate(read_versions_or_errors)) {
909910
util::variant_match(
910911
read_version_or_error,
911-
[&res, &read_options](ReadVersionOutput& read_version) {
912+
[&res, &batch_read_options](ReadVersionOutput& read_version) {
912913
res.emplace_back(create_python_read_result(
913914
read_version.versioned_item_,
914-
read_options.output_format(),
915+
batch_read_options.output_format(),
915916
std::move(read_version.frame_and_descriptor_)
916917
));
917918
},
@@ -1282,10 +1283,10 @@ std::vector<std::pair<VersionedItem, TimeseriesDescriptor>> PythonVersionStore::
12821283

12831284
std::vector<std::variant<std::pair<VersionedItem, py::object>, DataError>> PythonVersionStore::batch_read_metadata(
12841285
const std::vector<StreamId>& stream_ids, const std::vector<VersionQuery>& version_queries,
1285-
const ReadOptions& read_options
1286+
const BatchReadOptions& batch_read_options
12861287
) {
12871288
ARCTICDB_SAMPLE(BatchReadMetadata, 0)
1288-
auto metadatas_or_errors = batch_read_metadata_internal(stream_ids, version_queries, read_options);
1289+
auto metadatas_or_errors = batch_read_metadata_internal(stream_ids, version_queries, batch_read_options);
12891290

12901291
std::vector<std::variant<std::pair<VersionedItem, py::object>, DataError>> results;
12911292
for (auto& metadata_or_error : metadatas_or_errors) {
@@ -1311,10 +1312,10 @@ DescriptorItem PythonVersionStore::read_descriptor(const StreamId& stream_id, co
13111312

13121313
std::vector<std::variant<DescriptorItem, DataError>> PythonVersionStore::batch_read_descriptor(
13131314
const std::vector<StreamId>& stream_ids, const std::vector<VersionQuery>& version_queries,
1314-
const ReadOptions& read_options
1315+
const BatchReadOptions& batch_read_options
13151316
) {
13161317

1317-
return batch_read_descriptor_internal(stream_ids, version_queries, read_options);
1318+
return batch_read_descriptor_internal(stream_ids, version_queries, batch_read_options);
13181319
}
13191320

13201321
ReadResult PythonVersionStore::read_index(

cpp/arcticdb/version/version_store_api.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ class PythonVersionStore : public LocalVersionedEngine {
134134

135135
std::vector<std::variant<DescriptorItem, DataError>> batch_read_descriptor(
136136
const std::vector<StreamId>& stream_ids, const std::vector<VersionQuery>& version_queries,
137-
const ReadOptions& read_options
137+
const BatchReadOptions& batch_read_options
138138
);
139139

140140
DescriptorItem read_descriptor(const StreamId& stream_id, const VersionQuery& version_query);
@@ -224,7 +224,7 @@ class PythonVersionStore : public LocalVersionedEngine {
224224

225225
std::vector<std::variant<ReadResult, DataError>> batch_read(
226226
const std::vector<StreamId>& stream_ids, const std::vector<VersionQuery>& version_queries,
227-
std::vector<std::shared_ptr<ReadQuery>>& read_queries, const ReadOptions& read_options,
227+
std::vector<std::shared_ptr<ReadQuery>>& read_queries, const BatchReadOptions& batch_read_options,
228228
std::any& handler_data
229229
);
230230

@@ -243,7 +243,7 @@ class PythonVersionStore : public LocalVersionedEngine {
243243

244244
std::vector<std::variant<std::pair<VersionedItem, py::object>, DataError>> batch_read_metadata(
245245
const std::vector<StreamId>& stream_ids, const std::vector<VersionQuery>& version_queries,
246-
const ReadOptions& read_options
246+
const BatchReadOptions& batch_read_options
247247
);
248248

249249
std::set<StreamId> list_streams(

0 commit comments

Comments
 (0)