diff --git a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
index e82bbae9198..12799f2f523 100644
--- a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
+++ b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
@@ -289,8 +289,6 @@ static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
     ColumnArray::Offsets & offsets_data = assert_cast<ColumnVector<UInt64> &>(*offsets_column).getData();
     offsets_data.reserve(arrow_column->length());
 
-    uint64_t start_offset = 0u;
-
     for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
     {
         arrow::ListArray & list_chunk = dynamic_cast<arrow::ListArray &>(*(arrow_column->chunk(chunk_i)));
@@ -298,21 +296,27 @@ static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
         auto & arrow_offsets = dynamic_cast<arrow::Int32Array &>(*arrow_offsets_array);
 
         /*
-         * It seems like arrow::ListArray::values() (nested column data) might or might not be shared across chunks.
-         * When it is shared, the offsets will be monotonically increasing. Otherwise, the offsets will be zero based.
-         * In order to account for both cases, the starting offset is updated whenever a zero-based offset is found.
-         * More info can be found in: https://lists.apache.org/thread/rrwfb9zo2dc58dhd9rblf20xd7wmy7jm and
-         * https://github.com/ClickHouse/ClickHouse/pull/43297
+         * CH stores cumulative list sizes as its "offsets", while Arrow stores the actual start offset of each list.
+         * That is why CH starts reading Arrow offsets at i = 1 and ignores i = 0.
+         * When a column is read through multiple record batches, the Arrow offsets may keep increasing
+         * monotonically across batches, which makes them inconsistent with the batch data length in `DB::ColumnArray`.
+         *
+         * If the offsets are monotonically increasing, `arrow_offsets.Value(0)` will be non-zero for the nth batch, where n > 0.
+         * If they are not monotonically increasing, it will always be 0.
+         * Therefore, we subtract the previous offset from the current offset to get the size of each list.
+         *
+         * The same can happen across multiple chunks; in that case the sizes must be accumulated onto the last offset
+         * of the previous chunk, hence `offsets_data.back()`. More info can be found in
+         * https://lists.apache.org/thread/rrwfb9zo2dc58dhd9rblf20xd7wmy7jm, https://github.com/ClickHouse/ClickHouse/pull/43297 and https://github.com/ClickHouse/ClickHouse/pull/54370.
         * */
-        if (list_chunk.offset() == 0)
-        {
-            start_offset = offsets_data.back();
-        }
+        uint64_t previous_offset = arrow_offsets.Value(0);
 
         for (int64_t i = 1; i < arrow_offsets.length(); ++i)
         {
             auto offset = arrow_offsets.Value(i);
-            offsets_data.emplace_back(start_offset + offset);
+            uint64_t elements = offset - previous_offset;
+            previous_offset = offset;
+            offsets_data.emplace_back(offsets_data.back() + elements);
         }
     }
     return offsets_column;
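The accumulation is easier to see on concrete numbers. Below is a minimal standalone sketch of the same conversion; plain std::vector stands in for the Arrow arrays and the ClickHouse offsets column, and the sample offsets are made up for illustration. Note that in the real column, ClickHouse's padded offset arrays make `offsets_data.back()` return 0 even when the array is empty; the sketch guards explicitly instead.

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    int main()
    {
        // Arrow-style offsets for two chunks that share one values() buffer:
        // the second chunk's offsets continue at 6 instead of restarting at 0.
        std::vector<std::vector<uint64_t>> arrow_chunk_offsets = {
            {0, 2, 6},   // two lists, of sizes 2 and 4
            {6, 9, 10},  // two lists, of sizes 3 and 1
        };

        std::vector<uint64_t> offsets_data; // ClickHouse-style cumulative sizes

        for (const auto & arrow_offsets : arrow_chunk_offsets)
        {
            uint64_t previous_offset = arrow_offsets[0];
            for (std::size_t i = 1; i < arrow_offsets.size(); ++i)
            {
                // Size of the current list, independent of whether the raw
                // offsets are zero-based or monotonically increasing.
                uint64_t elements = arrow_offsets[i] - previous_offset;
                previous_offset = arrow_offsets[i];
                uint64_t last = offsets_data.empty() ? 0 : offsets_data.back();
                offsets_data.push_back(last + elements);
            }
        }

        // Prints "2 6 9 10": consistent cumulative sizes even though the
        // second chunk's raw offsets did not restart at zero.
        for (uint64_t offset : offsets_data)
            std::cout << offset << ' ';
        std::cout << '\n';
    }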
diff --git a/tests/queries/0_stateless/02874_parquet_multiple_batches_array_inconsistent_offsets.reference b/tests/queries/0_stateless/02874_parquet_multiple_batches_array_inconsistent_offsets.reference
new file mode 100644
index 00000000000..ba63f2f7e9c
--- /dev/null
+++ b/tests/queries/0_stateless/02874_parquet_multiple_batches_array_inconsistent_offsets.reference
@@ -0,0 +1,3 @@
+Parquet
+e76a749f346078a6a43e0cbd25f0d18a -
+400
diff --git a/tests/queries/0_stateless/02874_parquet_multiple_batches_array_inconsistent_offsets.sh b/tests/queries/0_stateless/02874_parquet_multiple_batches_array_inconsistent_offsets.sh
new file mode 100755
index 00000000000..83196458a84
--- /dev/null
+++ b/tests/queries/0_stateless/02874_parquet_multiple_batches_array_inconsistent_offsets.sh
@@ -0,0 +1,129 @@
+#!/usr/bin/env bash
+# Tags: no-ubsan, no-fasttest
+
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+
+echo "Parquet"
+
+# More info on: https://github.com/ClickHouse/ClickHouse/pull/54370
+
+# File generated with the code below
+
+#std::string random_string(size_t length) {
+#    static const std::string characters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+#
+#    std::random_device random_device;
+#    std::mt19937 generator(random_device());
+#    std::uniform_int_distribution<> distribution(0, characters.size() - 1);
+#
+#    std::string random_string;
+#    random_string.reserve(length);
+#
+#    std::generate_n(std::back_inserter(random_string), length, [&]() {
+#        return characters[distribution(generator)];
+#    });
+#
+#    return random_string;
+#}
+#
+#static const std::string the_string = random_string(9247124u);
+#
+#std::shared_ptr<arrow::Array> CreateIntArray(std::size_t length) {
+#    arrow::MemoryPool* pool = arrow::default_memory_pool();
+#
+#    auto int_builder_ptr = std::make_shared<arrow::Int64Builder>(pool);
+#    auto & int_builder = *int_builder_ptr;
+#    arrow::ListBuilder list_builder(pool, int_builder_ptr);
+#
+#    for (auto i = 0u; i < length; i++)
+#    {
+#        if (i % 10 == 0)
+#        {
+#            ARROW_CHECK_OK(list_builder.Append());
+#        }
+#        else
+#        {
+#            ARROW_CHECK_OK(int_builder.Append(i));
+#        }
+#    }
+#
+#    std::shared_ptr<arrow::Array> int_list_array;
+#    ARROW_CHECK_OK(list_builder.Finish(&int_list_array));
+#    return int_list_array;
+#}
+#
+#std::shared_ptr<arrow::Array> CreateStringArray(std::size_t length) {
+#    arrow::MemoryPool* pool = arrow::default_memory_pool();
+#
+#    auto str_builder = std::make_shared<arrow::LargeStringBuilder>(arrow::large_utf8(), pool);
+#
+#    for (auto i = 0u; i < length; i++)
+#    {
+#        if (i % 10 == 0)
+#        {
+#            ARROW_CHECK_OK(str_builder->AppendNull());
+#        }
+#        else
+#        {
+#            ARROW_CHECK_OK(str_builder->Append(the_string));
+#        }
+#    }
+#
+#    std::shared_ptr<arrow::Array> str_array;
+#    ARROW_CHECK_OK(str_builder->Finish(&str_array));
+#    return str_array;
+#}
+#
+#void run()
+#{
+#    auto schema = arrow::schema({
+#        arrow::field("ints", arrow::list(arrow::int64())),
+#        arrow::field("strings", arrow::utf8())
+#    });
+#
+#    auto l1_length = 2000;
+#    auto l2_length = 2000;
+#
+#    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+#
+#    auto int_array1 = CreateIntArray(l1_length);
+#    auto int_array2 = CreateIntArray(l1_length);
+#
+#    auto str_array1 = CreateStringArray(l2_length);
+#    auto str_array2 = CreateStringArray(l2_length);
+#
+#    batches.push_back(arrow::RecordBatch::Make(schema, int_array1->length(), {int_array1, str_array1}));
+#    batches.push_back(arrow::RecordBatch::Make(schema, int_array2->length(), {int_array2, str_array2}));
+#
+#    std::shared_ptr<arrow::io::FileOutputStream> outfile;
+#    PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open("generated.parquet"));
+#
+#    parquet::WriterProperties::Builder builder;
+#    builder.compression(arrow::Compression::GZIP);
+#    builder.dictionary_pagesize_limit(10*1024*1024);
+#    builder.data_pagesize(20*1024*1024);
+#
+#    std::shared_ptr<parquet::WriterProperties> props = builder.build();
+#
+#    std::unique_ptr<parquet::arrow::FileWriter> file_writer;
+#    PARQUET_ASSIGN_OR_THROW(file_writer, parquet::arrow::FileWriter::Open(*schema, ::arrow::default_memory_pool(), outfile, props));
+#
+#    for (const auto& batch : batches) {
+#        PARQUET_THROW_NOT_OK(file_writer->WriteRecordBatch(*batch));
+#    }
+#
+#    PARQUET_THROW_NOT_OK(file_writer->Close());
+#}
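+
+# Sanity check on the numbers: each CreateIntArray(2000) call starts a new list on
+# every 10th element, i.e. 200 lists per batch, so the two batches should yield the
+# 400 rows asserted below. The ~9 MB repeated string presumably forces the writer
+# to split the column across multiple pages/batches, which is what reproduces the
+# monotonically increasing offsets.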
+
+DATA_FILE=$CUR_DIR/data_parquet/string_int_list_inconsistent_offset_multiple_batches.parquet
+${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS parquet_load"
+${CLICKHOUSE_CLIENT} --query="CREATE TABLE parquet_load (ints Array(Int64), strings Nullable(String)) ENGINE = Memory"
+cat "$DATA_FILE" | ${CLICKHOUSE_CLIENT} -q "INSERT INTO parquet_load FORMAT Parquet"
+${CLICKHOUSE_CLIENT} --query="SELECT * FROM parquet_load" | md5sum
+${CLICKHOUSE_CLIENT} --query="SELECT count() FROM parquet_load"
+${CLICKHOUSE_CLIENT} --query="DROP TABLE parquet_load"
\ No newline at end of file
diff --git a/tests/queries/0_stateless/data_parquet/string_int_list_inconsistent_offset_multiple_batches.parquet b/tests/queries/0_stateless/data_parquet/string_int_list_inconsistent_offset_multiple_batches.parquet
new file mode 100644
index 00000000000..ca0e2cf6762
Binary files /dev/null and b/tests/queries/0_stateless/data_parquet/string_int_list_inconsistent_offset_multiple_batches.parquet differ
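A quick way to eyeball the binary fixture outside the test harness, assuming a clickhouse-local binary is available and the repository root is the working directory, is to query the file directly; this should reproduce the row count from the reference file:

    clickhouse-local --query "SELECT count() FROM file('tests/queries/0_stateless/data_parquet/string_int_list_inconsistent_offset_multiple_batches.parquet', Parquet)"
    # expected output: 400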