Skip to content

Commit

Permalink
default use large_utf8 in arrow to avoid 2G size limit (#136)
Browse files Browse the repository at this point in the history
  • Loading branch information
jingshi-ant authored Aug 11, 2023
1 parent 7696715 commit f19263e
Show file tree
Hide file tree
Showing 26 changed files with 75 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- Speed up GROUP BY with HEU in some scenarios.
- Optimized to support billion-level PSI scenarios.

### Fixed

Expand Down
4 changes: 2 additions & 2 deletions engine/core/string_tensor_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace scql::engine {
/// @brief Builder for UTF8 strings tensor
class StringTensorBuilder : public TensorBuilder {
public:
StringTensorBuilder() : TensorBuilder(arrow::utf8()) {}
StringTensorBuilder() : TensorBuilder(arrow::large_utf8()) {}
~StringTensorBuilder() = default;

void AppendNull() override;
Expand All @@ -34,7 +34,7 @@ class StringTensorBuilder : public TensorBuilder {
private:
arrow::ArrayBuilder* GetBaseBuilder() override { return &builder_; }

arrow::StringBuilder builder_;
arrow::LargeStringBuilder builder_;
};

} // namespace scql::engine
3 changes: 2 additions & 1 deletion engine/core/type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pb::PrimitiveDataType FromArrowDataType(
ty = pb::PrimitiveDataType::FLOAT64;
break;
case arrow::Type::STRING:
case arrow::Type::LARGE_STRING:
ty = pb::PrimitiveDataType::STRING;
break;
default:
Expand Down Expand Up @@ -78,7 +79,7 @@ std::shared_ptr<arrow::DataType> ToArrowDataType(pb::PrimitiveDataType dtype) {
dt = arrow::float64();
break;
case pb::PrimitiveDataType::STRING:
dt = arrow::utf8();
dt = arrow::large_utf8();
break;
default:
dt = nullptr;
Expand Down
15 changes: 8 additions & 7 deletions engine/datasource/csvdb_adaptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,19 @@ void ConvertDuckResultToTensors(std::unique_ptr<duckdb::QueryResult> result,
for (int i = 0; i < table->num_columns(); ++i) {
auto chunked_arr = table->column(i);
YACL_ENFORCE(chunked_arr, "get column(idx={}) from table failed", i);
if (FromArrowDataType(chunked_arr->type()) != expected_outputs[i].dtype) {
if (FromArrowDataType(chunked_arr->type()) != expected_outputs[i].dtype ||
// strings from duckdb use arrow::utf8() by default, which limits size
// < 2G, so should be converted to large_utf8() to support large scale
chunked_arr->type() == arrow::utf8()) {
auto to_type = ToArrowDataType(expected_outputs[i].dtype);
SPDLOG_WARN("arrow type mismatch, convert from {} to {}",
chunked_arr->type()->ToString(), to_type->ToString());

auto origin_arr = util::ConcatenateChunkedArray(chunked_arr);
auto result = arrow::compute::Cast(chunked_arr, to_type);
YACL_ENFORCE(result.ok(), "caught error while invoking arrow cast: {}",
result.status().ToString());

std::shared_ptr<arrow::Array> array;
ASSIGN_OR_THROW_ARROW_STATUS(array,
arrow::compute::Cast(*origin_arr, to_type));

chunked_arr = std::make_shared<arrow::ChunkedArray>(array);
chunked_arr = result.ValueOrDie().chunked_array();
}
auto tensor = std::make_shared<Tensor>(std::move(chunked_arr));
tensors->push_back(std::move(tensor));
Expand Down
4 changes: 2 additions & 2 deletions engine/datasource/csvdb_adaptor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ TEST_F(CsvdbAdaptorTest, NormalQuery) {
CheckTensorEqual(results[0], TensorFromJSON(arrow::int64(), "[1,2,3,4]"));
CheckTensorEqual(results[1], TensorFromJSON(arrow::int64(), "[21,42,19,32]"));
CheckTensorEqual(results[2],
TensorFromJSON(arrow::utf8(),
TensorFromJSON(arrow::large_utf8(),
R"json(["alice","bob","carol","dave"])json"));
CheckTensorEqual(results[3], TensorFromJSON(arrow::float64(),
"[2100.2,4500.8,1900.5,8900]"));
Expand All @@ -125,7 +125,7 @@ TEST_F(CsvdbAdaptorTest, QueryWithPredicate) {

CheckTensorEqual(results[0], TensorFromJSON(arrow::int64(), "[42,32]"));
CheckTensorEqual(results[1],
TensorFromJSON(arrow::utf8(), R"json(["bob", "dave"])json"));
TensorFromJSON(arrow::large_utf8(), R"json(["bob", "dave"])json"));
CheckTensorEqual(results[2],
TensorFromJSON(arrow::float64(), "[4500.8,8900]"));
}
Expand Down
11 changes: 5 additions & 6 deletions engine/datasource/odbc_adaptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,13 @@ std::vector<TensorPtr> OdbcAdaptor::ExecQueryImpl(
pb::PrimitiveDataType_Name(results[i]->Type()),
pb::PrimitiveDataType_Name(expected_outputs[i].dtype));

auto origin_arr =
util::ConcatenateChunkedArray(results[i]->ToArrowChunkedArray());
auto to_type = ToArrowDataType(expected_outputs[i].dtype);
std::shared_ptr<arrow::Array> array;
ASSIGN_OR_THROW_ARROW_STATUS(array,
arrow::compute::Cast(*origin_arr, to_type));
auto result =
arrow::compute::Cast(results[i]->ToArrowChunkedArray(), to_type);
YACL_ENFORCE(result.ok(), "caught error while invoking arrow cast: {}",
result.status().ToString());

auto chunked_arr = std::make_shared<arrow::ChunkedArray>(array);
auto chunked_arr = result.ValueOrDie().chunked_array();
results[i] = std::make_shared<Tensor>(std::move(chunked_arr));
}
}
Expand Down
2 changes: 1 addition & 1 deletion engine/framework/session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class StringToHashConverter {
array.type()->name()));
}

arrow::Status Visit(const arrow::StringArray& array) {
arrow::Status Visit(const arrow::LargeStringArray& array) {
for (int64_t i = 0; i < array.length(); i++) {
const std::string& cur_str = array.GetString(i);
size_t hash_value = CryptoHash(cur_str);
Expand Down
16 changes: 8 additions & 8 deletions engine/operator/compare_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ INSTANTIATE_TEST_SUITE_P(
BinaryTestCase{
.op_type = Equal::kOpType,
.left_inputs = {test::NamedTensor(
"x", TensorFromJSON(arrow::utf8(),
"x", TensorFromJSON(arrow::large_utf8(),
R"json(["A", "B", "C", "D"])json"))},
.left_input_status = pb::TENSORSTATUS_SECRET,
.right_inputs = {test::NamedTensor(
"y", TensorFromJSON(arrow::utf8(),
"y", TensorFromJSON(arrow::large_utf8(),
R"json(["A", "C", "B", "D"])json"))},
.right_input_status = pb::TENSORSTATUS_SECRET,
.outputs = {test::NamedTensor(
Expand Down Expand Up @@ -120,11 +120,11 @@ INSTANTIATE_TEST_SUITE_P(
BinaryTestCase{
.op_type = Equal::kOpType,
.left_inputs = {test::NamedTensor(
"x", TensorFromJSON(arrow::utf8(),
"x", TensorFromJSON(arrow::large_utf8(),
R"json(["A", "B", "C", "D"])json"))},
.left_input_status = pb::TENSORSTATUS_PRIVATE,
.right_inputs = {test::NamedTensor(
"y", TensorFromJSON(arrow::utf8(),
"y", TensorFromJSON(arrow::large_utf8(),
R"json(["A", "C", "B", "D"])json"))},
.right_input_status = pb::TENSORSTATUS_PUBLIC,
.outputs = {test::NamedTensor(
Expand Down Expand Up @@ -183,11 +183,11 @@ INSTANTIATE_TEST_SUITE_P(
BinaryTestCase{
.op_type = NotEqual::kOpType,
.left_inputs = {test::NamedTensor(
"x", TensorFromJSON(arrow::utf8(),
"x", TensorFromJSON(arrow::large_utf8(),
R"json(["A", "B", "C", "D"])json"))},
.left_input_status = pb::TENSORSTATUS_SECRET,
.right_inputs = {test::NamedTensor(
"y", TensorFromJSON(arrow::utf8(),
"y", TensorFromJSON(arrow::large_utf8(),
R"json(["A", "C", "B", "D"])json"))},
.right_input_status = pb::TENSORSTATUS_SECRET,
.outputs = {test::NamedTensor(
Expand Down Expand Up @@ -245,11 +245,11 @@ INSTANTIATE_TEST_SUITE_P(
BinaryTestCase{
.op_type = NotEqual::kOpType,
.left_inputs = {test::NamedTensor(
"x", TensorFromJSON(arrow::utf8(),
"x", TensorFromJSON(arrow::large_utf8(),
R"json(["A", "B", "C", "D"])json"))},
.left_input_status = pb::TENSORSTATUS_PRIVATE,
.right_inputs = {test::NamedTensor(
"y", TensorFromJSON(arrow::utf8(),
"y", TensorFromJSON(arrow::large_utf8(),
R"json(["A", "C", "B", "D"])json"))},
.right_input_status = pb::TENSORSTATUS_PUBLIC,
.outputs = {test::NamedTensor(
Expand Down
6 changes: 3 additions & 3 deletions engine/operator/concat_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ INSTANTIATE_TEST_SUITE_P(
ConcatTestCase{
.inputs =
{test::NamedTensor(
"a", TensorFromJSON(arrow::utf8(),
"a", TensorFromJSON(arrow::large_utf8(),
R"json(["A", "B", "C"])json")),
test::NamedTensor("b", TensorFromJSON(arrow::utf8(),
test::NamedTensor("b", TensorFromJSON(arrow::large_utf8(),
R"json([])json"))},
.expect_out = test::NamedTensor(
"out", TensorFromJSON(arrow::utf8(),
"out", TensorFromJSON(arrow::large_utf8(),
R"json(["A", "B", "C"])json"))})),
TestParamNameGenerator(ConcatTest));

Expand Down
2 changes: 1 addition & 1 deletion engine/operator/constant_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ INSTANTIATE_TEST_SUITE_P(
.output_status = pb::TENSORSTATUS_PRIVATE},
ConstantTestCase{
.scalar = {test::NamedTensor(
"y", TensorFromJSON(arrow::utf8(),
"y", TensorFromJSON(arrow::large_utf8(),
R"json(["2022-11-22"])json"))},
.output_status = pb::TENSORSTATUS_PRIVATE},
ConstantTestCase{
Expand Down
2 changes: 1 addition & 1 deletion engine/operator/copy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ INSTANTIATE_TEST_SUITE_P(
.output_names = {"x1_copy", "x2_copy"}},
CopyTestCase{
.datas = {test::NamedTensor(
"x1", TensorFromJSON(arrow::utf8(),
"x1", TensorFromJSON(arrow::large_utf8(),
R"json(["D","C",null,"B","A"])json"))},
.output_names = {"x1_copy"}},
CopyTestCase{
Expand Down
2 changes: 1 addition & 1 deletion engine/operator/dump_file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ INSTANTIATE_TEST_SUITE_P(
)csv"},
DumpFileTestCase{
.inputs = {test::NamedTensor(
"x1", TensorFromJSON(arrow::utf8(),
"x1", TensorFromJSON(arrow::large_utf8(),
R"json(["D","C","B","B","A"])json"))},
.output_names = {"x1_dump"},
.output_file_path = "./dumpfile_out.2",
Expand Down
12 changes: 6 additions & 6 deletions engine/operator/filter_by_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,34 @@ INSTANTIATE_TEST_SUITE_P(
.indice = test::NamedTensor("indice", TensorFromJSON(arrow::int64(),
"[3,2,1,0]")),
.datas = {test::NamedTensor(
"x1", TensorFromJSON(arrow::utf8(),
"x1", TensorFromJSON(arrow::large_utf8(),
R"json(["A","B","C","D"])json")),
test::NamedTensor("x2", TensorFromJSON(arrow::int64(),
"[10,11,12,13]"))},
.expect_outs =
{test::NamedTensor(
"y1", TensorFromJSON(arrow::utf8(),
"y1", TensorFromJSON(arrow::large_utf8(),
R"json(["D","C","B","A"])json")),
test::NamedTensor("y2", TensorFromJSON(arrow::int64(),
"[13,12,11,10]"))}},
FilterByIndexTestCase{
.indice = test::NamedTensor(
"indice", TensorFromJSON(arrow::int64(), "[3,2,2,0,null]")),
.datas = {test::NamedTensor(
"x1", TensorFromJSON(arrow::utf8(),
"x1", TensorFromJSON(arrow::large_utf8(),
R"json(["A","B","C",null,"E"])json"))},
.expect_outs = {test::NamedTensor(
"y1", TensorFromJSON(arrow::utf8(),
"y1", TensorFromJSON(arrow::large_utf8(),
R"json([null,"C","C","A",null])json"))}},
// testcase: empty indice
FilterByIndexTestCase{
.indice = test::NamedTensor("indice",
TensorFromJSON(arrow::int64(), "[]")),
.datas = {test::NamedTensor(
"x1", TensorFromJSON(arrow::utf8(),
"x1", TensorFromJSON(arrow::large_utf8(),
R"json(["A","B","C",null,"E"])json"))},
.expect_outs = {test::NamedTensor(
"y1", TensorFromJSON(arrow::utf8(), R"json([])json"))}}));
"y1", TensorFromJSON(arrow::large_utf8(), R"json([])json"))}}));

TEST_P(FilterByIndexTest, works) {
// Given
Expand Down
12 changes: 6 additions & 6 deletions engine/operator/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ INSTANTIATE_TEST_SUITE_P(
.filter_status = pb::TENSORSTATUS_PRIVATE,
.datas =
{test::NamedTensor(
"x1", TensorFromJSON(arrow::utf8(),
"x1", TensorFromJSON(arrow::large_utf8(),
R"json(["A","B","C","D"])json")),
test::NamedTensor("x2", TensorFromJSON(arrow::int64(),
"[10,11,12,13]"))},
.data_status = pb::TENSORSTATUS_PRIVATE,
.expect_outs =
{test::NamedTensor(
"y1",
TensorFromJSON(arrow::utf8(), R"json(["A","D"])json")),
TensorFromJSON(arrow::large_utf8(), R"json(["A","D"])json")),
test::NamedTensor("y2", TensorFromJSON(arrow::int64(),
"[10,13]"))}},
FilterTestCase{
Expand All @@ -66,24 +66,24 @@ INSTANTIATE_TEST_SUITE_P(
"[0,0,1,1,null]")),
.filter_status = pb::TENSORSTATUS_PUBLIC,
.datas = {test::NamedTensor(
"x1", TensorFromJSON(arrow::utf8(),
"x1", TensorFromJSON(arrow::large_utf8(),
R"json(["A","B","C",null,"E"])json"))},
.data_status = pb::TENSORSTATUS_PRIVATE,
.expect_outs = {test::NamedTensor(
"y1",
TensorFromJSON(arrow::utf8(), R"json(["C",null])json"))}},
TensorFromJSON(arrow::large_utf8(), R"json(["C",null])json"))}},
FilterTestCase{
.filter = test::NamedTensor("filter",
TensorFromJSON(arrow::boolean(),
"[0,0,1,1,null]")),
.filter_status = pb::TENSORSTATUS_PUBLIC,
.datas = {test::NamedTensor(
"x1", TensorFromJSON(arrow::utf8(),
"x1", TensorFromJSON(arrow::large_utf8(),
R"json(["A","B","C","D","E"])json"))},
.data_status = pb::TENSORSTATUS_SECRET,
.expect_outs = {test::NamedTensor(
"y1",
TensorFromJSON(arrow::utf8(), R"json(["C","D"])json"))}},
TensorFromJSON(arrow::large_utf8(), R"json(["C","D"])json"))}},
FilterTestCase{
.filter = test::NamedTensor(
"filter", TensorFromJSON(arrow::boolean(), "[1,0,null,1]")),
Expand Down
8 changes: 4 additions & 4 deletions engine/operator/group_agg_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ INSTANTIATE_TEST_SUITE_P(
"[0, 0, 1.1, 1.1, 2.2]")),
test::NamedTensor(
"in_c",
TensorFromJSON(arrow::utf8(),
TensorFromJSON(arrow::large_utf8(),
R"json(["A","A","B","B","CCC"])json"))},
.group_id = test::NamedTensor(
"group_id", TensorFromJSON(arrow::uint32(), "[0, 0, 1, 1, 2]")),
Expand All @@ -126,7 +126,7 @@ INSTANTIATE_TEST_SUITE_P(
test::NamedTensor("out_b", TensorFromJSON(arrow::float32(),
"[0, 1.1, 2.2]")),
test::NamedTensor(
"out_c", TensorFromJSON(arrow::utf8(),
"out_c", TensorFromJSON(arrow::large_utf8(),
R"json(["A","B","CCC"])json"))}},
GroupAggTestCase{
.op_type = GroupCountDistinct::kOpType,
Expand Down Expand Up @@ -244,7 +244,7 @@ INSTANTIATE_TEST_SUITE_P(
test::NamedTensor(
"in_b", TensorFromJSON(arrow::float32(), "[]")),
test::NamedTensor("in_c",
TensorFromJSON(arrow::utf8(),
TensorFromJSON(arrow::large_utf8(),
R"json([])json"))},
.group_id = test::NamedTensor(
"group_id", TensorFromJSON(arrow::uint32(), "[]")),
Expand All @@ -255,7 +255,7 @@ INSTANTIATE_TEST_SUITE_P(
test::NamedTensor(
"out_b", TensorFromJSON(arrow::float32(), "[]")),
test::NamedTensor("out_c",
TensorFromJSON(arrow::utf8(),
TensorFromJSON(arrow::large_utf8(),
R"json([])json"))}},
GroupAggTestCase{
.op_type = GroupCountDistinct::kOpType,
Expand Down
2 changes: 1 addition & 1 deletion engine/operator/group_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ INSTANTIATE_TEST_SUITE_P(
test::NamedTensor(
"in_c",
TensorFromJSON(
arrow::utf8(),
arrow::large_utf8(),
R"json(["A","B","B","CCC","CCC","CCC"])json"))},
.group_id = test::NamedTensor("out_id",
TensorFromJSON(arrow::uint32(),
Expand Down
Loading

0 comments on commit f19263e

Please sign in to comment.