Skip to content

Commit

Permalink
Read arrow data
Browse files Browse the repository at this point in the history
  • Loading branch information
willdealtry committed Jan 30, 2025
1 parent d261361 commit ca2ff8d
Show file tree
Hide file tree
Showing 73 changed files with 1,435 additions and 711 deletions.
2 changes: 1 addition & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ if(${ARCTICDB_COUNT_ALLOCATIONS})
FetchContent_Declare(
cpptrace
GIT_REPOSITORY https://github.com/jeremy-rifkin/cpptrace.git
GIT_TAG v0.7.3 # <HASH or TAG>
GIT_TAG v0.7.3
)
FetchContent_MakeAvailable(cpptrace)
endif()
Expand Down
17 changes: 12 additions & 5 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ set(arcticdb_srcs
entity/data_error.hpp
entity/descriptors.hpp
entity/native_tensor.hpp
entity/output_type.hpp
entity/output_format.hpp
entity/performance_tracing.hpp
entity/protobuf_mappings.hpp
entity/read_result.hpp
Expand Down Expand Up @@ -243,6 +243,7 @@ set(arcticdb_srcs
pipeline/read_options.hpp
pipeline/read_pipeline.hpp
pipeline/read_query.hpp
pipeline/slicing.hpp
pipeline/string_pool_utils.hpp
pipeline/value.hpp
Expand Down Expand Up @@ -440,6 +441,9 @@ set(arcticdb_srcs
pipeline/pipeline_context.cpp
pipeline/python_output_frame.cpp
pipeline/query.cpp
pipeline/read_query.cpp
pipeline/read_pipeline.cpp
pipeline/read_frame.cpp
pipeline/slicing.cpp
pipeline/string_pool_utils.cpp
Expand Down Expand Up @@ -500,7 +504,6 @@ set(arcticdb_srcs
stream/protobuf_mappings.cpp
toolbox/library_tool.cpp
util/allocator.cpp
util/buffer_holder.cpp
util/allocation_tracing.cpp
util/buffer_pool.cpp
util/configs_map.cpp
Expand All @@ -526,7 +529,9 @@ set(arcticdb_srcs
version/version_core.cpp
version/version_store_api.cpp
version/version_utils.cpp
version/version_map_batch_methods.cpp)
version/symbol_list.cpp
version/version_map_batch_methods.cpp
storage/s3/ec2_utils.cpp)

add_library(arcticdb_core_object OBJECT ${arcticdb_srcs})

Expand Down Expand Up @@ -759,7 +764,6 @@ if (SSL_LINK)
endif ()
target_link_libraries(arcticdb_core_object PUBLIC ${arcticdb_core_libraries})


target_include_directories(arcticdb_core_object
PRIVATE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
Expand Down Expand Up @@ -924,6 +928,7 @@ if(${TEST})
pipeline/test/test_container.hpp
pipeline/test/test_pipeline.cpp
pipeline/test/test_query.cpp
pipeline/test/test_frame_allocation.cpp
util/test/test_regex.cpp
processing/test/test_arithmetic_type_promotion.cpp
processing/test/test_clause.cpp
Expand All @@ -941,6 +946,7 @@ if(${TEST})
storage/test/test_memory_storage.cpp
storage/test/test_s3_storage.cpp
storage/test/test_storage_factory.cpp
storage/test/common.hpp
storage/test/test_storage_exceptions.cpp
storage/test/test_azure_storage.cpp
Expand Down Expand Up @@ -983,7 +989,8 @@ if(${TEST})
version/test/test_version_map_batch.cpp
version/test/test_version_store.cpp
version/test/version_map_model.hpp
python/python_handlers.cpp)
python/python_handlers.cpp
)

set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755

Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/async/task_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,12 @@ class TaskScheduler {
}

SchedulerWrapper<CPUSchedulerType>& cpu_exec() {
ARCTICDB_DEBUG(log::schedule(), "Getting CPU executor: {}", cpu_exec_.getTaskQueueSize());
ARCTICDB_TRACE(log::schedule(), "Getting CPU executor: {}", cpu_exec_.getTaskQueueSize());
return cpu_exec_;
}

SchedulerWrapper<IOSchedulerType>& io_exec() {
ARCTICDB_DEBUG(log::schedule(), "Getting IO executor: {}", io_exec_.getPendingTaskCount());
ARCTICDB_TRACE(log::schedule(), "Getting IO executor: {}", io_exec_.getPendingTaskCount());
return io_exec_;
}

Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/codec/codec-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ std::size_t decode_ndarray(

const auto data_size = encoding_sizes::data_uncompressed_size(field);
const bool is_empty_array = (data_size == 0) && type_desc_tag.dimension() > Dimension::Dim0;
ARCTICDB_TRACE(log::version(), "Decoding ndarray of size {}", data_size);
// Empty array types will not contain actual data, however, its sparse map should be loaded
// so that we can distinguish None from []
if(data_size == 0 && !is_empty_array) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/codec/codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ void decode_v2(const Segment& segment,
data += encoding_sizes::field_compressed_size(*encoded_field) + sizeof(ColumnMagic);
}
++encoded_field;
ARCTICDB_TRACE(log::codec(), "Decoded column {} to position {}", i, data-begin);
ARCTICDB_TRACE(log::codec(), "V2 Decoded column {} to position {}", i, data-begin);
}

util::check_magic<StringPoolMagic>(data);
Expand Down
9 changes: 9 additions & 0 deletions cpp/arcticdb/codec/encoding_sizes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ switch (field.encoding_case()) {
}
}

inline std::size_t field_uncompressed_size(const EncodedFieldImpl &field) {
    // Uncompressed byte count of an NDARRAY-encoded field. Mirrors
    // field_compressed_size above: any other encoding case is unsupported
    // and raises a runtime error carrying the field's debug dump.
    if (field.encoding_case() == EncodedFieldType::NDARRAY)
        return uncompressed_size(field.ndarray());

    util::raise_rte("Unsupported encoding {}", field.DebugString());
}

template <typename FieldCollectionType>
std::size_t segment_compressed_size(const FieldCollectionType &fields) {
std::size_t total = 0;
Expand Down
6 changes: 5 additions & 1 deletion cpp/arcticdb/column_store/block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ struct MemBlock {
}

~MemBlock() {
magic_.check();
if(owns_external_data_) {
util::check(is_external(), "Cannot free inline allocated block");
if(external_data_ != nullptr) {
Expand Down Expand Up @@ -103,7 +104,7 @@ struct MemBlock {
void copy_from(const uint8_t *src, size_t bytes, size_t pos) {
arcticdb::util::check_arg(pos + bytes <= capacity_, "Copying more bytes: {} is greater than capacity {}", bytes,
capacity_);
memcpy(data_ + pos, src, bytes);
memcpy(data() + pos, src, bytes);
}

uint8_t &operator[](size_t pos) {
Expand All @@ -114,10 +115,13 @@ struct MemBlock {

[[nodiscard]] const uint8_t *data() const { return is_external() ? external_data_ : data_; }

[[nodiscard]] uint8_t *data() { return is_external() ? external_data_ : data_; }

[[nodiscard]] const uint8_t* release() {
util::check(is_external() && owns_external_data_, "Cannot release inlined or external data pointer");
auto* tmp = external_data_;
external_data_ = nullptr;
owns_external_data_ = false;
return tmp;
}

Expand Down
Loading

0 comments on commit ca2ff8d

Please sign in to comment.