Skip to content

support ingress array datatype #105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion c-questdb-client
Submodule c-questdb-client updated 105 files
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import shutil
import platform
import numpy as np

from setuptools import setup, find_packages
from setuptools.extension import Extension
Expand Down Expand Up @@ -83,7 +84,8 @@ def ingress_extension():
["src/questdb/ingress.pyx"],
include_dirs=[
"c-questdb-client/include",
"pystr-to-utf8/include"],
"pystr-to-utf8/include",
np.get_include()],
library_dirs=lib_paths,
libraries=libraries,
extra_compile_args=extra_compile_args,
Expand Down
49 changes: 46 additions & 3 deletions src/questdb/dataframe.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ cdef auto_flush_t auto_flush_blank() noexcept nogil:
af.last_flush_ms = NULL
return af


cdef bint should_auto_flush(
const auto_flush_mode_t* af_mode,
line_sender_buffer* ls_buf,
Expand Down Expand Up @@ -73,7 +72,8 @@ cdef enum col_target_t:
col_target_column_f64 = 5
col_target_column_str = 6
col_target_column_ts = 7
col_target_at = 8
col_target_column_array = 8
col_target_at = 9


cdef dict _TARGET_NAMES = {
Expand All @@ -85,6 +85,7 @@ cdef dict _TARGET_NAMES = {
col_target_t.col_target_column_f64: "float",
col_target_t.col_target_column_str: "string",
col_target_t.col_target_column_ts: "timestamp",
col_target_t.col_target_column_array: "array",
col_target_t.col_target_at: "designated timestamp",
}

Expand Down Expand Up @@ -125,6 +126,7 @@ cdef enum col_source_t:
col_source_str_lrg_utf8_arrow = 406000
col_source_dt64ns_numpy = 501000
col_source_dt64ns_tz_arrow = 502000
col_source_array_numpy = 503000


cdef bint col_source_needs_gil(col_source_t source) noexcept nogil:
Expand Down Expand Up @@ -213,6 +215,9 @@ cdef dict _TARGET_TO_SOURCES = {
col_source_t.col_source_dt64ns_numpy,
col_source_t.col_source_dt64ns_tz_arrow,
},
col_target_t.col_target_column_array: {
col_source_t.col_source_array_numpy,
},
col_target_t.col_target_at: {
col_source_t.col_source_dt64ns_numpy,
col_source_t.col_source_dt64ns_tz_arrow,
Expand All @@ -227,7 +232,8 @@ cdef tuple _FIELD_TARGETS = (
col_target_t.col_target_column_i64,
col_target_t.col_target_column_f64,
col_target_t.col_target_column_str,
col_target_t.col_target_column_ts)
col_target_t.col_target_column_ts,
col_target_t.col_target_column_array)


# Targets that map directly from a meta target.
Expand Down Expand Up @@ -349,6 +355,9 @@ cdef enum col_dispatch_code_t:
col_dispatch_code_at__dt64ns_tz_arrow = \
col_target_t.col_target_at + col_source_t.col_source_dt64ns_tz_arrow

col_dispatch_code_column_array__array_numpy = \
col_target_t.col_target_column_array + col_source_t.col_source_array_numpy


# Int values in order for sorting (as needed for API's sequential coupling).
cdef enum meta_target_t:
Expand Down Expand Up @@ -932,6 +941,8 @@ cdef void_int _dataframe_series_sniff_pyobj(
col.setup.source = col_source_t.col_source_float_pyobj
elif PyUnicode_CheckExact(obj):
col.setup.source = col_source_t.col_source_str_pyobj
elif PyArray_CheckExact(obj):
col.setup.source = col_source_t.col_source_array_numpy
elif PyBytes_CheckExact(obj):
raise IngressError(
IngressErrorCode.BadDataFrame,
Expand Down Expand Up @@ -2016,6 +2027,36 @@ cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_numpy(
_ensure_has_gil(gs)
raise c_err_to_py(err)

cimport numpy as cnp
cnp.import_array()

# Serialize the current cell of a Python-object column as an N-dimensional
# float64 array field named `col.name` into `ls_buf`.
#
# Parameters:
#   ls_buf: destination line-sender buffer.
#   b:      string conversion scratch buffer (not used by this serializer).
#   col:    column state; the cell is read from `col.cursor`.
#   gs:     saved thread state, handed to `_ensure_has_gil`.
#
# Raises:
#   IngressError(BadDataFrame): the cell is not an exact `numpy.ndarray`.
#   IngressError(ArrayWriteToBufferError): the dtype is not float64, or the
#       array is zero-dimensional.
#   IngressError(ArrayLargeDimError): the array has more than 32 dimensions.
#   The error converted by `c_err_to_py` if the C client rejects the array.
cdef void_int _dataframe_serialize_cell_column_array__array_numpy(
        line_sender_buffer* ls_buf,
        qdb_pystr_buf* b,
        col_t* col,
        PyThreadState** gs) except -1:
    cdef PyObject** access = <PyObject**>col.cursor.chunk.buffers[1]
    cdef PyObject* cell = access[col.cursor.offset]
    cdef cnp.ndarray arr
    cdef PyArray_Descr* dtype_ptr
    cdef size_t rank
    cdef const uint8_t* data_ptr
    cdef line_sender_error* err = NULL

    # Everything below touches Python objects (ref-counted `arr`, `arr.dtype`,
    # raising exceptions), so the GIL must be held for the whole function and
    # not only on the C-error path.
    # NOTE(review): assumes `_ensure_has_gil` is a no-op when the GIL is
    # already held -- confirm against its definition.
    _ensure_has_gil(gs)

    # The column sniffer only inspects a sample object; every cell must be
    # re-validated here because the `<cnp.ndarray>` cast below is unchecked.
    # NOTE(review): `None` cells are rejected rather than skipped -- confirm
    # the intended null handling for array columns.
    if not PyArray_CheckExact(cell):
        raise IngressError(
            IngressErrorCode.BadDataFrame,
            'Expected a numpy array, got an object of type %s'
            % type(<object>cell).__name__)

    arr = <cnp.ndarray> cell
    dtype_ptr = cnp.PyArray_DESCR(arr)
    if dtype_ptr.type_num != NPY_FLOAT64:
        raise IngressError(IngressErrorCode.ArrayWriteToBufferError,
                           'Only support float64 array, got: %s' % str(arr.dtype))

    rank = cnp.PyArray_NDIM(arr)
    if rank == 0:
        raise IngressError(IngressErrorCode.ArrayWriteToBufferError, 'Zero-dimensional arrays are not supported')
    if rank > 32:
        raise IngressError(IngressErrorCode.ArrayLargeDimError, f'Max dimensions 32, got {rank}')

    data_ptr = <const uint8_t *> cnp.PyArray_DATA(arr)

    # Shape and strides are passed through as-is, so the C client receives the
    # array's own memory layout rather than a contiguous copy.
    if not line_sender_buffer_column_f64_arr(
            ls_buf,
            col.name,
            rank,
            <const size_t*> cnp.PyArray_DIMS(arr),
            <const ssize_t*> cnp.PyArray_STRIDES(arr),
            data_ptr,
            cnp.PyArray_NBYTES(arr),
            &err):
        # The GIL is already held (acquired at the top of the function).
        raise c_err_to_py(err)

cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(
line_sender_buffer* ls_buf,
Expand Down Expand Up @@ -2173,6 +2214,8 @@ cdef void_int _dataframe_serialize_cell(
_dataframe_serialize_cell_column_str__str_i32_cat(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_numpy:
_dataframe_serialize_cell_column_ts__dt64ns_numpy(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_column_array__array_numpy:
_dataframe_serialize_cell_column_array__array_numpy(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_tz_arrow:
_dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(ls_buf, b, col, gs)
elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64ns_numpy:
Expand Down
2 changes: 2 additions & 0 deletions src/questdb/extra_cpython.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ cdef extern from "Python.h":

bint PyLong_CheckExact(PyObject* o)

bint PyArray_CheckExact(PyObject * o)

bint PyFloat_CheckExact(PyObject* o)

double PyFloat_AS_DOUBLE(PyObject* o)
Expand Down
48 changes: 33 additions & 15 deletions src/questdb/ingress.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd

class IngressErrorCode(Enum):
Expand All @@ -54,8 +55,17 @@ class IngressErrorCode(Enum):
HttpNotSupported = ...
ServerFlushError = ...
ConfigError = ...
ArrayLargeDimError = ...
ArrayInternalError = ...
ArrayWriteToBufferError = ...
LineProtocolVersionError = ...
BadDataFrame = ...

class LineProtocolVersion(Enum):
    """
    Line protocol version.

    Accepted by the ``line_protocol_version`` argument of ``Buffer`` and
    returned by ``Sender.default_line_protocol_version()``.
    """
    # Member values are elided in this stub.
    LineProtocolVersionV1 = ...
    LineProtocolVersionV2 = ...

class IngressError(Exception):
"""An error whilst using the ``Sender`` or constructing its ``Buffer``."""

Expand Down Expand Up @@ -194,7 +204,7 @@ class SenderTransaction:
*,
symbols: Optional[Dict[str, Optional[str]]] = None,
columns: Optional[
Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime]]
Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray]]
] = None,
at: Union[ServerTimestamp, TimestampNanos, datetime],
) -> SenderTransaction:
Expand Down Expand Up @@ -300,7 +310,7 @@ class Buffer:

"""

def __init__(self, init_buf_size: int = 65536, max_name_len: int = 127):
def __init__(self, init_buf_size: int = 65536, max_name_len: int = 127, line_protocol_version: LineProtocolVersion = LineProtocolVersion.LineProtocolVersionV2):
"""
Create a new buffer with the an initial capacity and max name length.
:param int init_buf_size: Initial capacity of the buffer in bytes.
Expand Down Expand Up @@ -345,19 +355,19 @@ class Buffer:
"""
The current number of bytes currently in the buffer.

Equivalent (but cheaper) to ``len(str(sender))``.
Equivalent (but cheaper) to ``len(bytes(buffer))``.
"""

def __str__(self) -> str:
"""Return the constructed buffer as a string. Use for debugging."""
def __bytes__(self) -> bytes:
"""Return the constructed buffer as bytes. Use for debugging."""

def row(
self,
table_name: str,
*,
symbols: Optional[Dict[str, Optional[str]]] = None,
columns: Optional[
Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime]]
Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray]]
] = None,
at: Union[ServerTimestamp, TimestampNanos, datetime],
) -> Buffer:
Expand Down Expand Up @@ -806,6 +816,7 @@ class Sender:
auto_flush_rows: Optional[int] = None,
auto_flush_bytes: bool = False,
auto_flush_interval: int = 1000,
disable_line_protocol_validation: bool = False,
init_buf_size: int = 65536,
max_name_len: int = 127,
): ...
Expand All @@ -831,6 +842,7 @@ class Sender:
auto_flush_rows: Optional[int] = None,
auto_flush_bytes: bool = False,
auto_flush_interval: int = 1000,
disable_line_protocol_validation: bool = False,
init_buf_size: int = 65536,
max_name_len: int = 127,
) -> Sender:
Expand Down Expand Up @@ -866,6 +878,7 @@ class Sender:
auto_flush_rows: Optional[int] = None,
auto_flush_bytes: bool = False,
auto_flush_interval: int = 1000,
disable_line_protocol_validation: bool = False,
init_buf_size: int = 65536,
max_name_len: int = 127,
) -> Sender:
Expand Down Expand Up @@ -925,6 +938,11 @@ class Sender:
Time interval threshold for the auto-flush logic, or None if disabled.
"""

def default_line_protocol_version(self) -> LineProtocolVersion:
    """
    Return the QuestDB server's recommended default line protocol version.

    :return: A ``LineProtocolVersion`` member.
    """

def establish(self):
"""
Prepare the sender for use.
Expand All @@ -941,20 +959,20 @@ class Sender:
def __enter__(self) -> Sender:
"""Call :func:`Sender.establish` at the start of a ``with`` block."""

def __str__(self) -> str:
def __len__(self) -> int:
"""
Inspect the contents of the internal buffer.

The ``str`` value returned represents the unsent data.
Number of bytes of unsent data in the internal buffer.

Also see :func:`Sender.__len__`.
Equivalent (but cheaper) to ``len(bytes(sender))``.
"""

def __len__(self) -> int:
def __bytes__(self) -> bytes:
"""
Number of bytes of unsent data in the internal buffer.
Inspect the contents of the internal buffer.

The ``bytes`` value returned represents the unsent data.

Equivalent (but cheaper) to ``len(str(sender))``.
Also see :func:`Sender.__len__`.
"""

def transaction(self, table_name: str) -> SenderTransaction:
Expand All @@ -968,7 +986,7 @@ class Sender:
*,
symbols: Optional[Dict[str, str]] = None,
columns: Optional[
Dict[str, Union[bool, int, float, str, TimestampMicros, datetime]]
Dict[str, Union[bool, int, float, str, TimestampMicros, datetime, np.ndarray]]
] = None,
at: Union[TimestampNanos, datetime, ServerTimestamp],
) -> Sender:
Expand Down
Loading
Loading