diff --git a/orso/compute/compiled.pyx b/orso/compute/compiled.pyx index 228a54d..5154f26 100644 --- a/orso/compute/compiled.pyx +++ b/orso/compute/compiled.pyx @@ -214,7 +214,7 @@ cpdef list calculate_column_widths(list rows): return widths -cpdef list extract_columns_to_lists(list rows): +cpdef list extract_columns_to_lists(list rows, int limit=-1): """ Fast column extraction for Arrow table conversion. Converts row-oriented data to column-oriented lists. @@ -222,15 +222,24 @@ cpdef list extract_columns_to_lists(list rows): Parameters: rows: list of tuples Row-oriented data + limit: int + Maximum number of rows to materialize (-1 means all rows) Returns: list of lists: Column-oriented data """ cdef Py_ssize_t i, j cdef Py_ssize_t num_rows = len(rows) - + cdef Py_ssize_t empty_cols + + if limit >= 0 and limit < num_rows: + num_rows = limit + if num_rows == 0: - return [] + if len(rows) == 0: + return [] + empty_cols = len(rows[0]) + return [[] for _ in range(empty_cols)] cdef tuple first_row = rows[0] cdef Py_ssize_t num_cols = len(first_row) @@ -274,23 +283,35 @@ def process_table(table, row_factory, int max_chunksize) -> list: A list of transformed rows. """ cdef list rows = [None] * table.num_rows - cdef int64_t i = 0, j - cdef list batch_rows - cdef list column_data + cdef int64_t i = 0, k cdef list columns cdef int num_cols cdef int batch_size + cdef object column_method + cdef object row_iter + cdef object row_values + cdef object factory = row_factory for batch in table.to_batches(max_chunksize): # Convert batch columns to Python lists (column-oriented) # This is faster than converting to dicts first num_cols = batch.num_columns batch_size = batch.num_rows - columns = [batch.column(j).to_pylist() for j in range(num_cols)] - - # Reconstruct tuples from columns - for j in range(batch_size): - row = tuple(columns[col_idx][j] for col_idx in range(num_cols)) - rows[i] = row_factory(row) + + if batch_size == 0: + continue + + if num_cols == 0: + for k in range(batch_size): + rows[i] = factory(()) + i += 1 + continue + + column_method = batch.column + columns = [column_method(col_idx).to_pylist() for col_idx in range(num_cols)] + row_iter = zip(*columns) + + for row_values in row_iter: + rows[i] = factory(row_values) i += 1 return rows diff --git a/orso/converters.py b/orso/converters.py index e0f41c0..b620ec8 100644 --- a/orso/converters.py +++ b/orso/converters.py @@ -19,6 +19,7 @@ from orso.row import Row from orso.schema import FlatColumn from orso.schema import RelationSchema +from orso.schema import convert_orso_schema_to_arrow_schema # Cache for optional dependencies _pyarrow = None @@ -87,17 +88,32 @@ def to_arrow(dataset, size=None): except ImportError as import_error: raise MissingDependencyError(import_error.name) from import_error - if size is not None and size >= 0: - dataset = dataset.head(size) + dataset.materialize() + rows = dataset._rows + total_rows = len(rows) - if dataset.rowcount == 0: - arrays = [list() for _ in range(dataset.columncount)] + limit = min(size, total_rows) if size is not None and size >= 0 else total_rows + + column_names = dataset.column_names + + if limit == 0: + arrays = [list() for _ in column_names] else: - # Use Cython for faster column extraction - dataset.materialize() - arrays = extract_columns_to_lists(dataset._rows) + # Use Cython for faster column extraction and respect the requested limit + arrays = extract_columns_to_lists(rows, limit if limit >= 0 else -1) - return _pyarrow.Table.from_arrays(arrays, dataset.column_names) + arrow_schema = None + dataset_schema = getattr(dataset, "schema", None) + if isinstance(dataset_schema, RelationSchema): + try: + arrow_schema = convert_orso_schema_to_arrow_schema(dataset_schema) + except MissingDependencyError: + # Fall back to letting PyArrow infer types if optional dependency missing + arrow_schema = None + + if arrow_schema is not None: + return _pyarrow.Table.from_arrays(arrays, schema=arrow_schema) + return _pyarrow.Table.from_arrays(arrays, column_names) def from_arrow(tables, size=None): diff --git a/orso/row.py b/orso/row.py index cc0488d..61b1677 100644 --- a/orso/row.py +++ b/orso/row.py @@ -25,11 +25,13 @@ import datetime import time from functools import cached_property +from threading import RLock from typing import Any from typing import Dict from typing import List from typing import Tuple from typing import Union +from weakref import WeakValueDictionary import numpy import orjson @@ -45,6 +47,10 @@ HEADER_PREFIX: bytes = b"\x10\x00" MAXIMUM_RECORD_SIZE: int = 16 * 1024 * 1024 +# Cache Row subclasses so we reuse the lightweight tuple wrappers across identical schemas. +_ROW_CLASS_CACHE: "WeakValueDictionary[Tuple[Tuple[str, ...], bool], type]" = WeakValueDictionary() +_ROW_CLASS_CACHE_LOCK = RLock() + def extract_columns(table: List[Dict[str, Any]], columns: List[str]) -> Tuple[List[Any], ...]: """ @@ -210,27 +216,41 @@ def create_class( else: fields = tuple(str(s) for s in schema) - # Create field map for O(1) lookups in .get() method - field_map = {field: i for i, field in enumerate(fields)} - - if tuples_only: - # if we're only handling tuples, we can delegate to super, which is faster - return type( - "RowFactory", - (Row,), - { - "_fields": fields, - "_field_map": field_map, - "__new__": super().__new__, - "as_dict": make_as_dict(fields), - }, - ) - return type( - "RowFactory", - (Row,), - { - "_fields": fields, - "_field_map": field_map, - "as_dict": make_as_dict(fields), - }, - ) + cache_key = (fields, tuples_only) + cached_factory = _ROW_CLASS_CACHE.get(cache_key) + if cached_factory is not None: + return cached_factory + + with _ROW_CLASS_CACHE_LOCK: + cached_factory = _ROW_CLASS_CACHE.get(cache_key) + if cached_factory is not None: + return cached_factory + + # Create field map for O(1) lookups in .get() method + field_map = {field: i for i, field in enumerate(fields)} + + if tuples_only: + # if we're only handling tuples, we can delegate to super, which is faster + row_factory = type( + "RowFactory", + (Row,), + { + "_fields": fields, + "_field_map": field_map, + "__new__": super().__new__, + "as_dict": make_as_dict(fields), + }, + ) + else: + row_factory = type( + "RowFactory", + (Row,), + { + "_fields": fields, + "_field_map": field_map, + "as_dict": make_as_dict(fields), + }, + ) + + _ROW_CLASS_CACHE[cache_key] = row_factory + return row_factory