orso/compute/compiled.pyx (45 changes: 33 additions & 12 deletions)

@@ -214,23 +214,32 @@ cpdef list calculate_column_widths(list rows):
     return widths


-cpdef list extract_columns_to_lists(list rows):
+cpdef list extract_columns_to_lists(list rows, int limit=-1):
     """
     Fast column extraction for Arrow table conversion.
     Converts row-oriented data to column-oriented lists.

     Parameters:
         rows: list of tuples
             Row-oriented data
+        limit: int
+            Maximum number of rows to materialize (-1 means all rows)

     Returns:
         list of lists: Column-oriented data
     """
     cdef Py_ssize_t i, j
     cdef Py_ssize_t num_rows = len(rows)
+    cdef Py_ssize_t empty_cols
+
+    if limit >= 0 and limit < num_rows:
+        num_rows = limit

     if num_rows == 0:
-        return []
+        if len(rows) == 0:
+            return []
+        empty_cols = len(<tuple>rows[0])
+        return [[] for _ in range(empty_cols)]

     cdef tuple first_row = <tuple>rows[0]
     cdef Py_ssize_t num_cols = len(first_row)
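For readers who don't want to trace the Cython, here is a pure-Python sketch of the limit-aware behaviour this hunk introduces. The helper name is made up for illustration; the shipped implementation is the Cython function above.

```python
# Illustrative pure-Python equivalent of extract_columns_to_lists
# (hypothetical helper name, not part of orso's API).
def extract_columns_py(rows, limit=-1):
    num_rows = len(rows)
    if 0 <= limit < num_rows:
        num_rows = limit
    if num_rows == 0:
        # A limit of 0 still reports the column count when rows exist
        return [[] for _ in rows[0]] if rows else []
    num_cols = len(rows[0])
    columns = [[] for _ in range(num_cols)]
    for i in range(num_rows):
        for j in range(num_cols):
            columns[j].append(rows[i][j])
    return columns

print(extract_columns_py([(1, "a"), (2, "b"), (3, "c")], limit=2))
# [[1, 2], ['a', 'b']]
```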
@@ -274,23 +283,35 @@ def process_table(table, row_factory, int max_chunksize) -> list:
         A list of transformed rows.
     """
     cdef list rows = [None] * table.num_rows
-    cdef int64_t i = 0, j
-    cdef list batch_rows
-    cdef list column_data
+    cdef int64_t i = 0, k
+    cdef list columns
+    cdef int num_cols
+    cdef int batch_size
+    cdef object column_method
+    cdef object row_iter
+    cdef object row_values
+    cdef object factory = row_factory

     for batch in table.to_batches(max_chunksize):
-        # Convert batch columns to Python lists (column-oriented)
-        # This is faster than converting to dicts first
         num_cols = batch.num_columns
         batch_size = batch.num_rows
-        columns = [batch.column(j).to_pylist() for j in range(num_cols)]

-        # Reconstruct tuples from columns
-        for j in range(batch_size):
-            row = tuple(columns[col_idx][j] for col_idx in range(num_cols))
-            rows[i] = row_factory(row)
+        if batch_size == 0:
+            continue
+
+        if num_cols == 0:
+            for k in range(batch_size):
+                rows[i] = factory(())
+                i += 1
+            continue
+
+        column_method = batch.column
+        columns = [column_method(col_idx).to_pylist() for col_idx in range(num_cols)]
+        row_iter = zip(*columns)
+
+        for row_values in row_iter:
+            rows[i] = factory(row_values)
             i += 1
     return rows
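The core idiom in the rewrite is transposing column lists back into row tuples with `zip(*columns)` rather than indexing every cell through a per-row generator. A minimal sketch with real PyArrow calls (the sample data is invented):

```python
# Sketch of the transpose idiom used above: column lists -> row tuples.
import pyarrow as pa

batch = pa.RecordBatch.from_pydict({"id": [1, 2, 3], "name": ["a", "b", "c"]})
columns = [batch.column(i).to_pylist() for i in range(batch.num_columns)]
for row_values in zip(*columns):
    print(row_values)  # (1, 'a') then (2, 'b') then (3, 'c')
```

The separate `num_cols == 0` branch matters here: `zip()` over an empty argument list yields no tuples, so zero-column batches would otherwise silently drop their rows.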
orso/converters.py (32 changes: 24 additions & 8 deletions)

@@ -19,6 +19,7 @@
 from orso.row import Row
 from orso.schema import FlatColumn
 from orso.schema import RelationSchema
+from orso.schema import convert_orso_schema_to_arrow_schema

 # Cache for optional dependencies
 _pyarrow = None
@@ -87,17 +88,32 @@ def to_arrow(dataset, size=None):
     except ImportError as import_error:
         raise MissingDependencyError(import_error.name) from import_error

-    if size is not None and size >= 0:
-        dataset = dataset.head(size)
+    dataset.materialize()
+    rows = dataset._rows
+    total_rows = len(rows)

-    if dataset.rowcount == 0:
-        arrays = [list() for _ in range(dataset.columncount)]
+    limit = min(size, total_rows) if size is not None and size >= 0 else total_rows
+
+    column_names = dataset.column_names
+
+    if limit == 0:
+        arrays = [list() for _ in column_names]
     else:
-        # Use Cython for faster column extraction
-        dataset.materialize()
-        arrays = extract_columns_to_lists(dataset._rows)
+        # Use Cython for faster column extraction and respect the requested limit
+        arrays = extract_columns_to_lists(rows, limit if limit >= 0 else -1)

-    return _pyarrow.Table.from_arrays(arrays, dataset.column_names)
+    arrow_schema = None
+    dataset_schema = getattr(dataset, "schema", None)
+    if isinstance(dataset_schema, RelationSchema):
+        try:
+            arrow_schema = convert_orso_schema_to_arrow_schema(dataset_schema)
+        except MissingDependencyError:
+            # Fall back to letting PyArrow infer types if optional dependency missing
+            arrow_schema = None
+
+    if arrow_schema is not None:
+        return _pyarrow.Table.from_arrays(arrays, schema=arrow_schema)
+    return _pyarrow.Table.from_arrays(arrays, column_names)


 def from_arrow(tables, size=None):
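Behaviourally, `to_arrow` now slices at extraction time instead of building a `head()` copy, and attaches an explicit Arrow schema when one can be derived from the dataset. A hedged usage sketch, where `df` is a stand-in for any orso DataFrame:

```python
# Hypothetical usage; `df` stands in for an existing orso DataFrame.
from orso.converters import to_arrow

table = to_arrow(df, size=100)  # materializes at most 100 rows
# If df.schema is a RelationSchema and convert_orso_schema_to_arrow_schema
# succeeds, the table carries explicit column types; otherwise PyArrow
# infers types from the extracted column values.
```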
orso/row.py (68 changes: 44 additions & 24 deletions)

@@ -25,11 +25,13 @@
 import datetime
 import time
 from functools import cached_property
+from threading import RLock
 from typing import Any
 from typing import Dict
 from typing import List
 from typing import Tuple
 from typing import Union
+from weakref import WeakValueDictionary

 import numpy
 import orjson
@@ -45,6 +47,10 @@
 HEADER_PREFIX: bytes = b"\x10\x00"
 MAXIMUM_RECORD_SIZE: int = 16 * 1024 * 1024

+# Cache Row subclasses so we reuse the lightweight tuple wrappers across identical schemas.
+_ROW_CLASS_CACHE: "WeakValueDictionary[Tuple[Tuple[str, ...], bool], type]" = WeakValueDictionary()
+_ROW_CLASS_CACHE_LOCK = RLock()
+

 def extract_columns(table: List[Dict[str, Any]], columns: List[str]) -> Tuple[List[Any], ...]:
     """
@@ -210,27 +216,41 @@ def create_class(
     else:
         fields = tuple(str(s) for s in schema)

-    # Create field map for O(1) lookups in .get() method
-    field_map = {field: i for i, field in enumerate(fields)}
-
-    if tuples_only:
-        # if we're only handling tuples, we can delegate to super, which is faster
-        return type(
-            "RowFactory",
-            (Row,),
-            {
-                "_fields": fields,
-                "_field_map": field_map,
-                "__new__": super().__new__,
-                "as_dict": make_as_dict(fields),
-            },
-        )
-    return type(
-        "RowFactory",
-        (Row,),
-        {
-            "_fields": fields,
-            "_field_map": field_map,
-            "as_dict": make_as_dict(fields),
-        },
-    )
+    cache_key = (fields, tuples_only)
+    cached_factory = _ROW_CLASS_CACHE.get(cache_key)
+    if cached_factory is not None:
+        return cached_factory
+
+    with _ROW_CLASS_CACHE_LOCK:
+        cached_factory = _ROW_CLASS_CACHE.get(cache_key)
+        if cached_factory is not None:
+            return cached_factory
+
+        # Create field map for O(1) lookups in .get() method
+        field_map = {field: i for i, field in enumerate(fields)}
+
+        if tuples_only:
+            # if we're only handling tuples, we can delegate to super, which is faster
+            row_factory = type(
+                "RowFactory",
+                (Row,),
+                {
+                    "_fields": fields,
+                    "_field_map": field_map,
+                    "__new__": super().__new__,
+                    "as_dict": make_as_dict(fields),
+                },
+            )
+        else:
+            row_factory = type(
+                "RowFactory",
+                (Row,),
+                {
+                    "_fields": fields,
+                    "_field_map": field_map,
+                    "as_dict": make_as_dict(fields),
+                },
+            )
+
+        _ROW_CLASS_CACHE[cache_key] = row_factory
+        return row_factory
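The `create_class` changes follow the classic double-checked locking pattern: a lock-free read on the hot path, then a re-check under the lock before building, so concurrent callers never construct the same class twice. A generic sketch of the pattern (hypothetical names, not orso's API):

```python
# Generic double-checked locking over a weak cache (illustrative names).
from threading import RLock
from weakref import WeakValueDictionary

_cache: WeakValueDictionary = WeakValueDictionary()
_lock = RLock()

def get_or_build(key, build):
    cls = _cache.get(key)       # fast path: no lock taken on a cache hit
    if cls is not None:
        return cls
    with _lock:
        cls = _cache.get(key)   # re-check: another thread may have built it
        if cls is None:
            cls = build(key)
            _cache[key] = cls
        return cls              # return a strong reference to the value
```

Because the dictionary is weak, the builder must hand back the object itself; the caller's strong reference, not the cache entry, is what keeps the class alive.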