Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Iterators] Implement reading from Apache Arrow RecordBatches. #658

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions experimental/iterators/include/iterators-c/Runtime/Arrow.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//===-- Arrow.h - Runtime for handling Apache Arrow data --------*- C++ -*-===//
//
// Licensed under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file implements a shallow runtime around the C interfaces of Apache
// Arrow, namely the Arrow C data interface and the Arrow C stream interface
// (see https://arrow.apache.org/docs/format/CDataInterface.html and
// https://arrow.apache.org/docs/format/CStreamInterface.html). While these
// interfaces are already very simple and low-level, the goal of this runtime is
// to simplify their usage even further by doing all error handling and
// verification of current limitations.
//
//===----------------------------------------------------------------------===//

#ifndef ITERATORS_C_RUNTIME_ARROW_H
#define ITERATORS_C_RUNTIME_ARROW_H

#include "mlir-c/Support.h"

// Forward declarations of the Arrow C interface structs. The functions declared
// in this header only take pointers to these structs, so the full definitions
// are not required here.
struct ArrowArray;
struct ArrowArrayStream;
struct ArrowSchema;

#ifdef __cplusplus
extern "C" {
#endif

//===----------------------------------------------------------------------===//
// Arrow Array (aka Arrow RecordBatch).
//===----------------------------------------------------------------------===//

/// Returns the number of rows in the given Arrow array (aka record batch).
///
/// The parameter type is spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED
int64_t mlirIteratorsArrowArrayGetSize(struct ArrowArray *array);

/// Returns a read-only pointer to the raw data buffer of the i-th column of the
/// given Arrow array, ensuring that that column is an int8 column. `schema` is
/// the schema that accompanies `array`.
///
/// The parameter types are spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED const int8_t *
mlirIteratorsArrowArrayGetInt8Column(struct ArrowArray *array,
                                     struct ArrowSchema *schema, int64_t i);

/// Returns a read-only pointer to the raw data buffer of the i-th column of the
/// given Arrow array, ensuring that that column is a uint8 column. `schema` is
/// the schema that accompanies `array`.
///
/// The parameter types are spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED const uint8_t *
mlirIteratorsArrowArrayGetUInt8Column(struct ArrowArray *array,
                                      struct ArrowSchema *schema, int64_t i);

/// Returns a read-only pointer to the raw data buffer of the i-th column of the
/// given Arrow array, ensuring that that column is an int16 column. `schema` is
/// the schema that accompanies `array`.
///
/// The parameter types are spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED const int16_t *
mlirIteratorsArrowArrayGetInt16Column(struct ArrowArray *array,
                                      struct ArrowSchema *schema, int64_t i);

/// Returns a read-only pointer to the raw data buffer of the i-th column of the
/// given Arrow array, ensuring that that column is a uint16 column. `schema` is
/// the schema that accompanies `array`.
///
/// The parameter types are spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED const uint16_t *
mlirIteratorsArrowArrayGetUInt16Column(struct ArrowArray *array,
                                       struct ArrowSchema *schema, int64_t i);

/// Returns a read-only pointer to the raw data buffer of the i-th column of the
/// given Arrow array, ensuring that that column is an int32 column. `schema` is
/// the schema that accompanies `array`.
///
/// The parameter types are spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED const int32_t *
mlirIteratorsArrowArrayGetInt32Column(struct ArrowArray *array,
                                      struct ArrowSchema *schema, int64_t i);

/// Returns a read-only pointer to the raw data buffer of the i-th column of the
/// given Arrow array, ensuring that that column is a uint32 column. `schema` is
/// the schema that accompanies `array`.
///
/// The parameter types are spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED const uint32_t *
mlirIteratorsArrowArrayGetUInt32Column(struct ArrowArray *array,
                                       struct ArrowSchema *schema, int64_t i);

/// Returns a read-only pointer to the raw data buffer of the i-th column of the
/// given Arrow array, ensuring that that column is an int64 column. `schema` is
/// the schema that accompanies `array`.
///
/// The parameter types are spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED const int64_t *
mlirIteratorsArrowArrayGetInt64Column(struct ArrowArray *array,
                                      struct ArrowSchema *schema, int64_t i);

/// Returns a read-only pointer to the raw data buffer of the i-th column of the
/// given Arrow array, ensuring that that column is a uint64 column. `schema` is
/// the schema that accompanies `array`.
///
/// The parameter types are spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED const uint64_t *
mlirIteratorsArrowArrayGetUInt64Column(struct ArrowArray *array,
                                       struct ArrowSchema *schema, int64_t i);

/// Returns a read-only pointer to the raw data buffer of the i-th column of the
/// given Arrow array, ensuring that that column is a float16 column. `schema`
/// is the schema that accompanies `array`. Note that the elements are exposed
/// as `uint16_t`, i.e., as 16-bit storage units rather than as a floating-point
/// type.
///
/// The parameter types are spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED const uint16_t *
mlirIteratorsArrowArrayGetFloat16Column(struct ArrowArray *array,
                                        struct ArrowSchema *schema, int64_t i);

/// Returns a read-only pointer to the raw data buffer of the i-th column of the
/// given Arrow array, ensuring that that column is a float32 column. `schema`
/// is the schema that accompanies `array`.
///
/// The parameter types are spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED const float *
mlirIteratorsArrowArrayGetFloat32Column(struct ArrowArray *array,
                                        struct ArrowSchema *schema, int64_t i);

/// Returns a read-only pointer to the raw data buffer of the i-th column of the
/// given Arrow array, ensuring that that column is a float64 column. `schema`
/// is the schema that accompanies `array`.
///
/// The parameter types are spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED const double *
mlirIteratorsArrowArrayGetFloat64Column(struct ArrowArray *array,
                                        struct ArrowSchema *schema, int64_t i);

/// Releases the memory owned by the given Arrow array by calling its release
/// function. In contrast to the lower-level release function of the Arrow C
/// interface, this function may also be called on an already released struct;
/// in that case, the release function is not called.
///
/// The parameter type is spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED
void mlirIteratorsArrowArrayRelease(struct ArrowArray *array);

//===----------------------------------------------------------------------===//
// ArrowSchema.
//===----------------------------------------------------------------------===//

/// Releases the memory owned by the given schema by calling its release
/// function. In contrast to the lower-level release function of the Arrow C
/// interface, this function may also be called on an already released struct;
/// in that case, the release function is not called.
///
/// The parameter type is spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED
void mlirIteratorsArrowSchemaRelease(struct ArrowSchema *schema);

//===----------------------------------------------------------------------===//
// ArrowArrayStream (aka RecordBatchReader).
//===----------------------------------------------------------------------===//

/// Attempts to extract the next record batch from the given stream. The batch
/// returned by the stream is stored in the struct that `result` points to. The
/// return value is true iff the stream did return a batch. If an error occurs,
/// prints a message and exits.
///
/// The parameter types are spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED
bool mlirIteratorsArrowArrayStreamGetNext(struct ArrowArrayStream *stream,
                                          struct ArrowArray *result);

/// Gets the schema of the given stream and stores it in the struct that
/// `result` points to. If an error occurs, prints a message and exits.
///
/// The parameter types are spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED
void mlirIteratorsArrowArrayStreamGetSchema(struct ArrowArrayStream *stream,
                                            struct ArrowSchema *result);

/// Releases the memory owned by the given stream by calling its release
/// function. In contrast to the lower-level release function of the Arrow C
/// interface, this function may also be called on an already released struct;
/// in that case, the release function is not called.
///
/// The parameter type is spelled with the `struct` keyword so that this
/// declaration is valid both in C and in C++.
MLIR_CAPI_EXPORTED
void mlirIteratorsArrowArrayStreamRelease(struct ArrowArrayStream *stream);

#ifdef __cplusplus
}
#endif

#endif // ITERATORS_C_RUNTIME_ARROW_H
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//===- ArrowCInterfaces.h - Arrow C data and stream interfaces ----*- C -*-===//
//
// This file is licensed under the Apache License v2.0.
// See https://www.apache.org/licenses/LICENSE-2.0 for license information.
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
//
// This file contains the struct definitions of the Apache Arrow C ABI from
// https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions
// and
// https://arrow.apache.org/docs/format/CStreamInterface.html#structure-definition.
//
//===----------------------------------------------------------------------===//

#ifndef ARROW_C_DATA_INTERFACE
#define ARROW_C_DATA_INTERFACE

#ifdef __cplusplus
#include <cstdint>
#else
#include <stdint.h>
#endif

// Flag constants of the Arrow C data interface. Each value occupies a distinct
// bit, so several flags can be combined in one integer.
// NOTE(review): presumably these are the values stored in `ArrowSchema::flags`
// -- confirm against the Arrow C data interface specification.
#define ARROW_FLAG_DICTIONARY_ORDERED 1
#define ARROW_FLAG_NULLABLE 2
#define ARROW_FLAG_MAP_KEYS_SORTED 4

// Schema struct of the Arrow C data interface. This definition is part of a C
// ABI shared with other Arrow producers and consumers, so the order and the
// types of the members must not be changed.
struct ArrowSchema {
// Array type description
const char *format;
const char *name;
const char *metadata;
// NOTE(review): presumably a bitwise OR of the ARROW_FLAG_* constants --
// confirm against the Arrow C data interface specification.
int64_t flags;
// NOTE(review): presumably the number of entries in `children` -- confirm
// against the specification.
int64_t n_children;
struct ArrowSchema **children;
struct ArrowSchema *dictionary;

// Release callback; it receives a pointer to the struct to be released.
void (*release)(struct ArrowSchema *);
// Opaque producer-specific data
void *private_data;
};

// Array struct of the Arrow C data interface. This definition is part of a C
// ABI shared with other Arrow producers and consumers, so the order and the
// types of the members must not be changed.
struct ArrowArray {
// Array data description
int64_t length;
int64_t null_count;
int64_t offset;
// NOTE(review): presumably the number of entries in `buffers` and `children`,
// respectively -- confirm against the Arrow C data interface specification.
int64_t n_buffers;
int64_t n_children;
// Array of untyped pointers to read-only data.
const void **buffers;
struct ArrowArray **children;
struct ArrowArray *dictionary;

// Release callback; it receives a pointer to the struct to be released.
void (*release)(struct ArrowArray *);
// Opaque producer-specific data
void *private_data;
};

#endif // ARROW_C_DATA_INTERFACE

#ifndef ARROW_C_STREAM_INTERFACE
#define ARROW_C_STREAM_INTERFACE

// Stream struct of the Arrow C stream interface. This definition is part of a C
// ABI shared with other Arrow producers and consumers, so the order and the
// types of the members must not be changed.
struct ArrowArrayStream {
// Callbacks providing stream functionality. Each callback receives the stream
// itself as its first argument; `get_schema` and `get_next` write their result
// through the `out` pointer.
// NOTE(review): the `int` return values are presumably error codes with 0
// meaning success, and `get_last_error` presumably describes the most recent
// failure -- confirm against the Arrow C stream interface specification.
int (*get_schema)(struct ArrowArrayStream *, struct ArrowSchema *out);
int (*get_next)(struct ArrowArrayStream *, struct ArrowArray *out);
const char *(*get_last_error)(struct ArrowArrayStream *);

// Release callback; it receives a pointer to the struct to be released.
void (*release)(struct ArrowArrayStream *);

// Opaque producer-specific data
void *private_data;
};

#endif // ARROW_C_STREAM_INTERFACE
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//===-- ArrowUtils.h - IR utils related to Apache Arrow --------*- C++ -*-===//
//
// Licensed under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#ifndef ITERATORS_DIALECT_ITERATORS_IR_ARROWUTILS_H
#define ITERATORS_DIALECT_ITERATORS_IR_ARROWUTILS_H

// Forward declarations of the MLIR classes that this header only mentions by
// name.
namespace mlir {
namespace LLVM {
class LLVMStructType;
} // namespace LLVM
class MLIRContext;
} // namespace mlir

namespace mlir::iterators {

/// Builds the LLVM struct type that corresponds to `struct ArrowArray` of the
/// Arrow C data interface. The official definition of that struct is at
/// https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions.
LLVM::LLVMStructType getArrowArrayType(MLIRContext *context);

/// Builds the LLVM struct type that corresponds to `struct ArrowSchema` of the
/// Arrow C data interface. The official definition of that struct is at
/// https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions.
LLVM::LLVMStructType getArrowSchemaType(MLIRContext *context);

/// Builds the LLVM struct type that corresponds to `struct ArrowArrayStream` of
/// the Arrow C stream interface. The official definition of that struct is at
/// https://arrow.apache.org/docs/format/CStreamInterface.html#structure-definition.
LLVM::LLVMStructType getArrowArrayStreamType(MLIRContext *context);

} // namespace mlir::iterators

#endif // ITERATORS_DIALECT_ITERATORS_IR_ARROWUTILS_H
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@

#include "iterators/Dialect/Iterators/IR/IteratorsOpsDialect.h.inc"

// Forward declaration; the full definition of the LLVM pointer type is not
// needed in this header.
namespace mlir::LLVM {
class LLVMPointerType;
} // namespace mlir::LLVM

// The generated interface declarations are textually included into the
// `mlir::iterators` namespace; op interfaces come first, then type interfaces.
namespace mlir::iterators {
#include "iterators/Dialect/Iterators/IR/IteratorsOpInterfaces.h.inc"
#include "iterators/Dialect/Iterators/IR/IteratorsTypeInterfaces.h.inc"

} // namespace mlir::iterators

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,54 @@ def Iterators_FilterOp : Iterators_Op<"filter",
}];
}

// Op that exposes an Arrow C stream (`struct ArrowArrayStream`) as an iterator
// stream of tabular views. The `TypesMatchWith` constraint requires the operand
// to have the type "LLVM pointer to the struct returned by
// `getArrowArrayStreamType`"; the transformer uses `$_self` only to obtain the
// MLIRContext.
def Iterators_FromArrowArrayStreamOp : Iterators_Op<"from_arrow_array_stream", [
    TypesMatchWith<"type of operand must be an LLVM 'struct ArrowArrayStream'",
                   "result", "arrowStream",
                   "mlir::LLVM::LLVMPointerType::get("
                   " mlir::iterators::getArrowArrayStreamType("
                   " $_self.getContext()))">,
    DeclareOpInterfaceMethods<OpAsmOpInterface, ["getAsmResultNames"]>]> {
  let summary = "Converts an arrow array stream into a stream of tabular views";
  let description = [{
    Wraps an Apache Arrow Array Stream using the Arrow C stream interface in
    an `Iterators_Op`, i.e., converts the `ArrowArrayStream`, which produces a
    stream of `ArrowArrays`, into an `Iterators_Op` that produces a stream of
    `Tabular_TabularView`. This makes it possible to read from any producer of
    Apache Arrow Arrays (a.k.a. "record batches"), including file readers for
    CSV, JSON, Parquet, and others, libraries and languages such as pyarrow
    (and thus Python and pandas), remote processes via Arrow IPC, and many
    others.

    **Limitations:** Currently, only a few numeric data types, only ArrowArrays
    without offset, and no nullable or nested types are supported. These
    limitations are inherited in part from the runtime and in part from
    TabularView. Furthermore, returning TabularViews, which are references,
    limits the lifetime of each returned element to the point at which the next
    element is produced, so it is up to consuming iterator ops to copy any
    buffers that have to live longer. Finally, Arrow array streams have no
    "reset"/"re-open" method, i.e., they can only be consumed once, so the
    lowering to open/next/close, in which re-opening is possible, breaks if
    re-opening is attempted.

    Example:
    ```mlir
    func.func @main(%external_input: !llvm.ptr<!arrow_array_stream>) ->
        !iterators.stream<!tabular.tabular_view<i32>> {
      %tabular_view_stream = iterators.from_arrow_array_stream %external_input
          to !iterators.stream<!tabular.tabular_view<i32>>
      return %tabular_view_stream : !iterators.stream<!tabular.tabular_view<i32>>
    }
    ```
  }];
  let arguments = (ins LLVM_PointerTo<LLVM_AnyStruct>:$arrowStream);
  let results = (outs Iterators_StreamOf<Tabular_TabularView>:$result);
  let assemblyFormat = "$arrowStream attr-dict `to` qualified(type($result))";
  let extraClassDefinition = [{
    /// Implement OpAsmOpInterface.
    void $cppClass::getAsmResultNames(
        llvm::function_ref<void(mlir::Value, llvm::StringRef)> setNameFn) {
      setNameFn(getResult(), "fromarrowstream");
    }
  }];
}

def Iterators_MapOp : Iterators_Op<"map",
[DeclareOpInterfaceMethods<SymbolUserOpInterface>,
DeclareOpInterfaceMethods<OpAsmOpInterface, ["getAsmResultNames"]>]> {
Expand Down
1 change: 1 addition & 0 deletions experimental/iterators/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Components of the iterators library. Each directory listed here is processed
# in order and has to provide its own CMakeLists.txt.
add_subdirectory(CAPI)
add_subdirectory(Conversion)
add_subdirectory(Dialect)
add_subdirectory(Runtime)
add_subdirectory(Utils)
Loading