diff --git a/c-questdb-client b/c-questdb-client index 242c1f3..b091bf6 160000 --- a/c-questdb-client +++ b/c-questdb-client @@ -1 +1 @@ -Subproject commit 242c1f3c6a830ce28ca515168bc90306c9c96ab4 +Subproject commit b091bf62cede3153a74bb3f09ad104026b5b1e7f diff --git a/setup.py b/setup.py index 2d0ec2b..da0a10a 100755 --- a/setup.py +++ b/setup.py @@ -5,6 +5,7 @@ import os import shutil import platform +import numpy as np from setuptools import setup, find_packages from setuptools.extension import Extension @@ -83,7 +84,8 @@ def ingress_extension(): ["src/questdb/ingress.pyx"], include_dirs=[ "c-questdb-client/include", - "pystr-to-utf8/include"], + "pystr-to-utf8/include", + np.get_include()], library_dirs=lib_paths, libraries=libraries, extra_compile_args=extra_compile_args, diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index 7601587..8e3d85a 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -27,7 +27,6 @@ cdef auto_flush_t auto_flush_blank() noexcept nogil: af.last_flush_ms = NULL return af - cdef bint should_auto_flush( const auto_flush_mode_t* af_mode, line_sender_buffer* ls_buf, @@ -73,7 +72,8 @@ cdef enum col_target_t: col_target_column_f64 = 5 col_target_column_str = 6 col_target_column_ts = 7 - col_target_at = 8 + col_target_column_array = 8 + col_target_at = 9 cdef dict _TARGET_NAMES = { @@ -85,6 +85,7 @@ cdef dict _TARGET_NAMES = { col_target_t.col_target_column_f64: "float", col_target_t.col_target_column_str: "string", col_target_t.col_target_column_ts: "timestamp", + col_target_t.col_target_column_array: "array", col_target_t.col_target_at: "designated timestamp", } @@ -125,6 +126,7 @@ cdef enum col_source_t: col_source_str_lrg_utf8_arrow = 406000 col_source_dt64ns_numpy = 501000 col_source_dt64ns_tz_arrow = 502000 + col_source_array_numpy = 503000 cdef bint col_source_needs_gil(col_source_t source) noexcept nogil: @@ -213,6 +215,9 @@ cdef dict _TARGET_TO_SOURCES = { col_source_t.col_source_dt64ns_numpy, col_source_t.col_source_dt64ns_tz_arrow, }, + col_target_t.col_target_column_array: { + col_source_t.col_source_array_numpy, + }, col_target_t.col_target_at: { col_source_t.col_source_dt64ns_numpy, col_source_t.col_source_dt64ns_tz_arrow, @@ -227,7 +232,8 @@ cdef tuple _FIELD_TARGETS = ( col_target_t.col_target_column_i64, col_target_t.col_target_column_f64, col_target_t.col_target_column_str, - col_target_t.col_target_column_ts) + col_target_t.col_target_column_ts, + col_target_t.col_target_column_array) # Targets that map directly from a meta target. @@ -349,6 +355,9 @@ cdef enum col_dispatch_code_t: col_dispatch_code_at__dt64ns_tz_arrow = \ col_target_t.col_target_at + col_source_t.col_source_dt64ns_tz_arrow + col_dispatch_code_column_array__array_numpy = \ + col_target_t.col_target_column_array + col_source_t.col_source_array_numpy + # Int values in order for sorting (as needed for API's sequential coupling). cdef enum meta_target_t: @@ -932,6 +941,8 @@ cdef void_int _dataframe_series_sniff_pyobj( col.setup.source = col_source_t.col_source_float_pyobj elif PyUnicode_CheckExact(obj): col.setup.source = col_source_t.col_source_str_pyobj + elif PyArray_CheckExact(obj): + col.setup.source = col_source_t.col_source_array_numpy elif PyBytes_CheckExact(obj): raise IngressError( IngressErrorCode.BadDataFrame, @@ -2016,6 +2027,36 @@ cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_numpy( _ensure_has_gil(gs) raise c_err_to_py(err) +cimport numpy as cnp +cnp.import_array() + +cdef void_int _dataframe_serialize_cell_column_array__array_numpy( + line_sender_buffer* ls_buf, + qdb_pystr_buf* b, + col_t* col, + PyThreadState** gs) except -1: + cdef PyObject** access = col.cursor.chunk.buffers[1] + cdef PyObject* cell = access[col.cursor.offset] + cdef cnp.ndarray arr = cell + cdef PyArray_Descr* dtype_ptr = cnp.PyArray_DESCR(arr) + if dtype_ptr.type_num != NPY_FLOAT64: + raise IngressError(IngressErrorCode.ArrayWriteToBufferError, + 'Only support float64 array, got: %s' % str(arr.dtype)) + cdef: + size_t rank = cnp.PyArray_NDIM(arr) + const uint8_t* data_ptr + line_sender_error * err = NULL + if rank == 0: + raise IngressError(IngressErrorCode.ArrayWriteToBufferError, 'Zero-dimensional arrays are not supported') + if rank > 32: + raise IngressError(IngressErrorCode.ArrayLargeDimError, f'Max dimensions 32, got {rank}') + data_ptr = cnp.PyArray_DATA(arr) + + if not line_sender_buffer_column_f64_arr( + ls_buf, col.name, rank, cnp.PyArray_DIMS(arr), + cnp.PyArray_STRIDES(arr), data_ptr, cnp.PyArray_NBYTES(arr), &err): + _ensure_has_gil(gs) + raise c_err_to_py(err) cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow( line_sender_buffer* ls_buf, @@ -2173,6 +2214,8 @@ cdef void_int _dataframe_serialize_cell( _dataframe_serialize_cell_column_str__str_i32_cat(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_numpy: _dataframe_serialize_cell_column_ts__dt64ns_numpy(ls_buf, b, col, gs) + elif dc == col_dispatch_code_t.col_dispatch_code_column_array__array_numpy: + _dataframe_serialize_cell_column_array__array_numpy(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_column_ts__dt64ns_tz_arrow: _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow(ls_buf, b, col, gs) elif dc == col_dispatch_code_t.col_dispatch_code_at__dt64ns_numpy: diff --git a/src/questdb/extra_cpython.pxd b/src/questdb/extra_cpython.pxd index 3e79456..bc339c6 100644 --- a/src/questdb/extra_cpython.pxd +++ b/src/questdb/extra_cpython.pxd @@ -54,6 +54,8 @@ cdef extern from "Python.h": bint PyLong_CheckExact(PyObject* o) + bint PyArray_CheckExact(PyObject * o) + bint PyFloat_CheckExact(PyObject* o) double PyFloat_AS_DOUBLE(PyObject* o) diff --git a/src/questdb/ingress.pyi b/src/questdb/ingress.pyi index cc642c0..16b6dae 100644 --- a/src/questdb/ingress.pyi +++ b/src/questdb/ingress.pyi @@ -38,6 +38,7 @@ from datetime import datetime, timedelta from enum import Enum from typing import Any, Dict, List, Optional, Union +import numpy as np import pandas as pd class IngressErrorCode(Enum): @@ -54,8 +55,17 @@ class IngressErrorCode(Enum): HttpNotSupported = ... ServerFlushError = ... ConfigError = ... + ArrayLargeDimError = ... + ArrayInternalError = ... + ArrayWriteToBufferError = ... + LineProtocolVersionError = ... BadDataFrame = ... +class LineProtocolVersion(Enum): + """Line protocol version.""" + LineProtocolVersionV1 = ... + LineProtocolVersionV2 = ... + class IngressError(Exception): """An error whilst using the ``Sender`` or constructing its ``Buffer``.""" @@ -194,7 +204,7 @@ class SenderTransaction: *, symbols: Optional[Dict[str, Optional[str]]] = None, columns: Optional[ - Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime]] + Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray]] ] = None, at: Union[ServerTimestamp, TimestampNanos, datetime], ) -> SenderTransaction: @@ -300,7 +310,7 @@ class Buffer: """ - def __init__(self, init_buf_size: int = 65536, max_name_len: int = 127): + def __init__(self, init_buf_size: int = 65536, max_name_len: int = 127, line_protocol_version: LineProtocolVersion = LineProtocolVersion.LineProtocolVersionV2): """ Create a new buffer with the an initial capacity and max name length. :param int init_buf_size: Initial capacity of the buffer in bytes. @@ -345,11 +355,11 @@ class Buffer: """ The current number of bytes currently in the buffer. - Equivalent (but cheaper) to ``len(str(sender))``. + Equivalent (but cheaper) to ``len(bytes(buffer))``. """ - def __str__(self) -> str: - """Return the constructed buffer as a string. Use for debugging.""" + def __bytes__(self) -> bytes: + """Return the constructed buffer as bytes. Use for debugging.""" def row( self, @@ -357,7 +367,7 @@ class Buffer: *, symbols: Optional[Dict[str, Optional[str]]] = None, columns: Optional[ - Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime]] + Dict[str, Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray]] ] = None, at: Union[ServerTimestamp, TimestampNanos, datetime], ) -> Buffer: @@ -806,6 +816,7 @@ class Sender: auto_flush_rows: Optional[int] = None, auto_flush_bytes: bool = False, auto_flush_interval: int = 1000, + disable_line_protocol_validation: bool = False, init_buf_size: int = 65536, max_name_len: int = 127, ): ... @@ -831,6 +842,7 @@ class Sender: auto_flush_rows: Optional[int] = None, auto_flush_bytes: bool = False, auto_flush_interval: int = 1000, + disable_line_protocol_validation: bool = False, init_buf_size: int = 65536, max_name_len: int = 127, ) -> Sender: @@ -866,6 +878,7 @@ class Sender: auto_flush_rows: Optional[int] = None, auto_flush_bytes: bool = False, auto_flush_interval: int = 1000, + disable_line_protocol_validation: bool = False, init_buf_size: int = 65536, max_name_len: int = 127, ) -> Sender: @@ -925,6 +938,11 @@ class Sender: Time interval threshold for the auto-flush logic, or None if disabled. """ + def default_line_protocol_version(self) -> LineProtocolVersion: + """ + Returns the QuestDB server's recommended default line protocol version. + """ + def establish(self): """ Prepare the sender for use. @@ -941,20 +959,20 @@ class Sender: def __enter__(self) -> Sender: """Call :func:`Sender.establish` at the start of a ``with`` block.""" - def __str__(self) -> str: + def __len__(self) -> int: """ - Inspect the contents of the internal buffer. - - The ``str`` value returned represents the unsent data. + Number of bytes of unsent data in the internal buffer. - Also see :func:`Sender.__len__`. + Equivalent (but cheaper) to ``len(bytes(sender))``. """ - def __len__(self) -> int: + def __bytes__(self) -> bytes: """ - Number of bytes of unsent data in the internal buffer. + Inspect the contents of the internal buffer. + + The ``bytes`` value returned represents the unsent data. - Equivalent (but cheaper) to ``len(str(sender))``. + Also see :func:`Sender.__len__`. """ def transaction(self, table_name: str) -> SenderTransaction: @@ -968,7 +986,7 @@ class Sender: *, symbols: Optional[Dict[str, str]] = None, columns: Optional[ - Dict[str, Union[bool, int, float, str, TimestampMicros, datetime]] + Dict[str, Union[bool, int, float, str, TimestampMicros, datetime, np.ndarray]] ] = None, at: Union[TimestampNanos, datetime, ServerTimestamp], ) -> Sender: diff --git a/src/questdb/ingress.pyx b/src/questdb/ingress.pyx index 9af2238..2e99a4c 100644 --- a/src/questdb/ingress.pyx +++ b/src/questdb/ingress.pyx @@ -78,16 +78,25 @@ from enum import Enum from typing import List, Tuple, Dict, Union, Any, Optional, Callable, \ Iterable import pathlib +from cpython.bytes cimport PyBytes_FromStringAndSize import sys import os +cimport numpy as cnp +import numpy as np +cdef extern from "numpy/ndarraytypes.h": + ctypedef struct PyArray_Descr: + int type_num + enum: NPY_FLOAT64 +cnp.import_array() # This value is automatically updated by the `bump2version` tool. # If you need to update it, also update the search definition in # .bumpversion.cfg. VERSION = '2.0.4' +MAX_ARRAY_DIM = 32 cdef bint _has_gil(PyThreadState** gs): return gs[0] == NULL @@ -106,7 +115,6 @@ cdef void _ensure_has_gil(PyThreadState** gs): PyEval_RestoreThread(gs[0]) gs[0] = NULL - class IngressErrorCode(Enum): """Category of Error.""" CouldNotResolveAddr = line_sender_error_could_not_resolve_addr @@ -120,12 +128,20 @@ class IngressErrorCode(Enum): HttpNotSupported = line_sender_error_http_not_supported ServerFlushError = line_sender_error_server_flush_error ConfigError = line_sender_error_config_error - BadDataFrame = line_sender_error_server_flush_error + 1 + ArrayLargeDimError = line_sender_error_array_large_dim + ArrayInternalError = line_sender_error_array_view_internal_error + ArrayWriteToBufferError = line_sender_error_array_view_write_to_buffer_error + LineProtocolVersionError = line_sender_error_line_protocol_version_error + BadDataFrame = line_sender_error_line_protocol_version_error + 1 def __str__(self) -> str: """Return the name of the enum.""" return self.name +class LineProtocolVersion(Enum): + """Line protocol version.""" + LineProtocolVersionV1 = line_protocol_version_1 + LineProtocolVersionV2 = line_protocol_version_2 class IngressError(Exception): """An error whilst using the ``Sender`` or constructing its ``Buffer``.""" @@ -162,6 +178,14 @@ cdef inline object c_err_code_to_py(line_sender_error_code code): return IngressErrorCode.ServerFlushError elif code == line_sender_error_config_error: return IngressErrorCode.ConfigError + elif code == line_sender_error_array_large_dim: + return IngressErrorCode.ArrayLargeDimError + elif code == line_sender_error_array_view_internal_error: + return IngressErrorCode.ArrayInternalError + elif code == line_sender_error_array_view_write_to_buffer_error: + return IngressErrorCode.ArrayWriteToBufferError + elif code == line_sender_error_line_protocol_version_error: + return IngressErrorCode.LineProtocolVersionError else: raise ValueError('Internal error converting error code.') @@ -607,7 +631,7 @@ cdef class SenderTransaction: symbols: Optional[Dict[str, Optional[str]]]=None, columns: Optional[Dict[ str, - Union[None, bool, int, float, str, TimestampMicros, datetime]] + Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray]] ]=None, at: Union[ServerTimestamp, TimestampNanos, datetime]): """ @@ -688,7 +712,6 @@ cdef class SenderTransaction: self._sender._in_txn = False self._complete = True - cdef class Buffer: """ Construct QuestDB-flavored InfluxDB Line Protocol (ILP) messages. @@ -759,18 +782,22 @@ cdef class Buffer: cdef size_t _max_name_len cdef object _row_complete_sender - def __cinit__(self, init_buf_size: int=65536, max_name_len: int=127): + def __cinit__(self, init_buf_size: int=65536, max_name_len: int=127, line_protocol_version: LineProtocolVersion=LineProtocolVersion.LineProtocolVersionV2): """ Create a new buffer with the an initial capacity and max name length. :param int init_buf_size: Initial capacity of the buffer in bytes. :param int max_name_len: Maximum length of a table or column name. """ - self._cinit_impl(init_buf_size, max_name_len) + self._cinit_impl(init_buf_size, max_name_len, line_protocol_version.value) - cdef inline _cinit_impl(self, size_t init_buf_size, size_t max_name_len): + cdef inline _cinit_impl(self, size_t init_buf_size, size_t max_name_len, line_protocol_version version): self._impl = line_sender_buffer_with_max_name_len(max_name_len) self._b = qdb_pystr_buf_new() line_sender_buffer_reserve(self._impl, init_buf_size) + cdef line_sender_error* err = NULL + if not line_sender_buffer_set_line_protocol_version(self._impl, version, &err): + raise c_err_to_py(err) + self._init_buf_size = init_buf_size self._max_name_len = max_name_len self._row_complete_sender = None @@ -825,18 +852,17 @@ cdef class Buffer: """ The current number of bytes currently in the buffer. - Equivalent (but cheaper) to ``len(str(sender))``. + Equivalent (but cheaper) to ``len(bytes(buffer))``. """ return line_sender_buffer_size(self._impl) - def __str__(self) -> str: - """Return the constructed buffer as a string. Use for debugging.""" - return self._to_str() + def __bytes__(self) -> bytes: + """Return the constructed buffer as bytes. Use for debugging.""" + return self._to_bytes() - cdef inline object _to_str(self): - cdef size_t size = 0 - cdef const char* utf8 = line_sender_buffer_peek(self._impl, &size) - return PyUnicode_FromStringAndSize(utf8, size) + cdef inline object _to_bytes(self): + cdef line_sender_buffer_view view = line_sender_buffer_peek(self._impl) + return PyBytes_FromStringAndSize( view.buf, view.len) cdef inline void_int _set_marker(self) except -1: cdef line_sender_error* err = NULL @@ -905,6 +931,27 @@ cdef class Buffer: if not line_sender_buffer_column_ts_micros(self._impl, c_name, ts._value, &err): raise c_err_to_py(err) + cdef inline void_int _column_numpy( + self, line_sender_column_name c_name, cnp.ndarray arr) except -1: + cdef PyArray_Descr * dtype_ptr = cnp.PyArray_DESCR(arr) + if dtype_ptr.type_num != NPY_FLOAT64: + raise IngressError(IngressErrorCode.ArrayWriteToBufferError, 'Only support float64 array, got: %s' % str(arr.dtype)) + cdef: + size_t rank = cnp.PyArray_NDIM(arr) + const uint8_t * data_ptr + line_sender_error * err = NULL + + if rank == 0: + raise IngressError(IngressErrorCode.ArrayWriteToBufferError, 'Zero-dimensional arrays are not supported') + if rank > MAX_ARRAY_DIM: + raise IngressError(IngressErrorCode.ArrayLargeDimError, f'Max dimensions {MAX_ARRAY_DIM}, got {rank}') + data_ptr = cnp.PyArray_DATA(arr) + + if not line_sender_buffer_column_f64_arr( + self._impl, c_name, rank, cnp.PyArray_DIMS(arr), + cnp.PyArray_STRIDES(arr), data_ptr, cnp.PyArray_NBYTES(arr), &err): + raise c_err_to_py(err) + cdef inline void_int _column_dt( self, line_sender_column_name c_name, datetime dt) except -1: cdef line_sender_error* err = NULL @@ -925,6 +972,8 @@ cdef class Buffer: self._column_str(c_name, value) elif isinstance(value, TimestampMicros): self._column_ts(c_name, value) + elif PyArray_CheckExact( value): + self._column_numpy(c_name, value) elif isinstance(value, datetime): self._column_dt(c_name, value) else: @@ -934,7 +983,8 @@ cdef class Buffer: 'float', 'str', 'TimestampMicros', - 'datetime.datetime')) + 'datetime.datetime' + 'np.ndarray')) raise TypeError( f'Unsupported type: {_fqn(type(value))}. Must be one of: {valid}') @@ -1016,7 +1066,7 @@ cdef class Buffer: symbols: Optional[Dict[str, Optional[str]]]=None, columns: Optional[Dict[ str, - Union[None, bool, int, float, str, TimestampMicros, datetime]] + Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray]] ]=None, at: Union[ServerTimestamp, TimestampNanos, datetime]): """ @@ -1706,6 +1756,7 @@ cdef object parse_conf_str( 'auto_flush_rows': str, 'auto_flush_bytes': str, 'auto_flush_interval': str, + 'disable_line_protocol_version': str, 'init_buf_size': int, 'max_name_len': int, } @@ -1736,6 +1787,8 @@ cdef class Sender: cdef size_t _init_buf_size cdef size_t _max_name_len cdef bint _in_txn + cdef line_protocol_version _line_protocol_version + cdef bint _auto_detect_line_protocol_version cdef void_int _set_sender_fields( self, @@ -1759,6 +1812,7 @@ cdef class Sender: object auto_flush_rows, object auto_flush_bytes, object auto_flush_interval, + str default_line_protocol_version, object init_buf_size, object max_name_len) except -1: """ @@ -1906,11 +1960,40 @@ cdef class Sender: auto_flush_interval, &self._auto_flush_mode) + # default line protocol version is v2 for tcp/tcps and auto-detection for http/https + if self._c_protocol == line_sender_protocol_tcp or self._c_protocol == line_sender_protocol_tcps: + self._line_protocol_version = line_protocol_version_2 + self._auto_detect_line_protocol_version = False + else: + self._auto_detect_line_protocol_version = True + + if default_line_protocol_version is not None: + if default_line_protocol_version == "v1": + self._line_protocol_version = line_protocol_version_1 + self._auto_detect_line_protocol_version = False + if not line_sender_opts_disable_line_protocol_validation(self._opts, &err): + raise c_err_to_py(err) + elif default_line_protocol_version == "v2": + self._line_protocol_version = line_protocol_version_2 + self._auto_detect_line_protocol_version = False + if not line_sender_opts_disable_line_protocol_validation(self._opts, &err): + raise c_err_to_py(err) + elif default_line_protocol_version != "auto": + raise IngressError( + IngressErrorCode.ConfigError, + '"default_line_protocol_version" must be None, "auto", "v1" or "v2"' + + f'not {default_line_protocol_version!r}') + self._init_buf_size = init_buf_size or 65536 self._max_name_len = max_name_len or 127 - self._buffer = Buffer( - init_buf_size=self._init_buf_size, - max_name_len=self._max_name_len) + + # self._buffer will be constructed after establish connection for http/https. + if self._c_protocol == line_sender_protocol_tcp or self._c_protocol == line_sender_protocol_tcps: + self._buffer = Buffer( + init_buf_size=self._init_buf_size, + max_name_len=self._max_name_len, + line_protocol_version=LineProtocolVersion(self._line_protocol_version)) + self._last_flush_ms = calloc(1, sizeof(int64_t)) def __cinit__(self): @@ -1948,6 +2031,7 @@ cdef class Sender: object auto_flush_rows=None, # Default 75000 (HTTP) or 600 (TCP) object auto_flush_bytes=None, # Default off object auto_flush_interval=None, # Default 1000 milliseconds + object default_line_protocol_version=None, # Default auto object init_buf_size=None, # 64KiB object max_name_len=None): # 127 @@ -1991,6 +2075,7 @@ cdef class Sender: auto_flush_rows, auto_flush_bytes, auto_flush_interval, + default_line_protocol_version, init_buf_size, max_name_len) finally: @@ -2018,6 +2103,7 @@ cdef class Sender: object auto_flush_rows=None, # Default 75000 (HTTP) or 600 (TCP) object auto_flush_bytes=None, # Default off object auto_flush_interval=None, # Default 1000 milliseconds + object default_line_protocol_version=None, # Default auto object init_buf_size=None, # 64KiB object max_name_len=None): # 127 """ @@ -2072,6 +2158,7 @@ cdef class Sender: 'auto_flush_rows': auto_flush_rows, 'auto_flush_bytes': auto_flush_bytes, 'auto_flush_interval': auto_flush_interval, + 'default_line_protocol_version': default_line_protocol_version, 'init_buf_size': init_buf_size, 'max_name_len': max_name_len, }.items(): @@ -2112,6 +2199,7 @@ cdef class Sender: params.get('auto_flush_rows'), params.get('auto_flush_bytes'), params.get('auto_flush_interval'), + params.get('default_line_protocol_version'), params.get('init_buf_size'), params.get('max_name_len')) @@ -2140,6 +2228,7 @@ cdef class Sender: object auto_flush_rows=None, # Default 75000 (HTTP) or 600 (TCP) object auto_flush_bytes=None, # Default off object auto_flush_interval=None, # Default 1000 milliseconds + object default_line_protocol_version=None, # Default auto object init_buf_size=None, # 64KiB object max_name_len=None): # 127 """ @@ -2179,6 +2268,7 @@ cdef class Sender: auto_flush_rows=auto_flush_rows, auto_flush_bytes=auto_flush_bytes, auto_flush_interval=auto_flush_interval, + default_line_protocol_version=default_line_protocol_version, init_buf_size=init_buf_size, max_name_len=max_name_len) @@ -2192,7 +2282,8 @@ cdef class Sender: """ return Buffer( init_buf_size=self._init_buf_size, - max_name_len=self._max_name_len) + max_name_len=self._max_name_len, + line_protocol_version=self.default_line_protocol_version()) @property def init_buf_size(self) -> int: @@ -2247,6 +2338,15 @@ cdef class Sender: return None return timedelta(milliseconds=self._auto_flush_mode.interval) + def default_line_protocol_version(self) -> LineProtocolVersion: + if self._auto_detect_line_protocol_version: + if self._impl == NULL: + raise IngressError( + IngressErrorCode.InvalidApiCall, + 'default_line_protocol_version() can\'t be called: Not connected.') + return LineProtocolVersion(line_sender_default_line_protocol_version(self._impl)) + return LineProtocolVersion(self._line_protocol_version) + def establish(self): """ Prepare the sender for use. @@ -2260,13 +2360,23 @@ cdef class Sender: method will return only *after* the handshake(s) is/are complete. """ cdef line_sender_error* err = NULL + cdef PyThreadState * gs = NULL if self._opts == NULL: raise IngressError( IngressErrorCode.InvalidApiCall, 'establish() can\'t be called after close().') + _ensure_doesnt_have_gil(&gs) self._impl = line_sender_build(self._opts, &err) + _ensure_has_gil(&gs) if self._impl == NULL: raise c_err_to_py(err) + + if self._buffer is None: + self._buffer = Buffer( + init_buf_size=self._init_buf_size, + max_name_len=self._max_name_len, + line_protocol_version=self.default_line_protocol_version()) + line_sender_opts_free(self._opts) self._opts = NULL @@ -2281,21 +2391,21 @@ cdef class Sender: self.establish() return self - def __str__(self) -> str: + def __bytes__(self) -> bytes: """ Inspect the contents of the internal buffer. - The ``str`` value returned represents the unsent data. + The ``bytes`` value returned represents the unsent data. Also see :func:`Sender.__len__`. """ - return str(self._buffer) + return bytes(self._buffer) def __len__(self) -> int: """ Number of bytes of unsent data in the internal buffer. - Equivalent (but cheaper) to ``len(str(sender))``. + Equivalent (but cheaper) to ``len(bytes(sender))``. """ return len(self._buffer) @@ -2311,7 +2421,7 @@ cdef class Sender: symbols: Optional[Dict[str, str]]=None, columns: Optional[Dict[ str, - Union[bool, int, float, str, TimestampMicros, datetime]]]=None, + Union[bool, int, float, str, TimestampMicros, datetime, np.ndarray]]]=None, at: Union[TimestampNanos, datetime, ServerTimestamp]): """ Write a row to the internal buffer. diff --git a/src/questdb/line_sender.pxd b/src/questdb/line_sender.pxd index 50490ab..43f1703 100644 --- a/src/questdb/line_sender.pxd +++ b/src/questdb/line_sender.pxd @@ -22,7 +22,7 @@ ## ################################################################################ -from libc.stdint cimport int64_t, uint16_t, uint64_t +from libc.stdint cimport int64_t, uint16_t, uint64_t, uint8_t, uint32_t, int32_t cdef extern from "questdb/ingress/line_sender.h": cdef struct line_sender_error: @@ -40,6 +40,10 @@ cdef extern from "questdb/ingress/line_sender.h": line_sender_error_http_not_supported, line_sender_error_server_flush_error, line_sender_error_config_error, + line_sender_error_array_large_dim + line_sender_error_array_view_internal_error + line_sender_error_array_view_write_to_buffer_error + line_sender_error_line_protocol_version_error cdef enum line_sender_protocol: line_sender_protocol_tcp, @@ -47,6 +51,10 @@ cdef extern from "questdb/ingress/line_sender.h": line_sender_protocol_http, line_sender_protocol_https, + cdef enum line_protocol_version: + line_protocol_version_1 = 1, + line_protocol_version_2 = 2, + cdef enum line_sender_ca: line_sender_ca_webpki_roots, line_sender_ca_os_roots, @@ -102,6 +110,10 @@ cdef extern from "questdb/ingress/line_sender.h": size_t len const char* buf + cdef struct line_sender_buffer_view: + size_t len + const uint8_t* buf + bint line_sender_column_name_init( line_sender_column_name* name, size_t len, @@ -124,6 +136,12 @@ cdef extern from "questdb/ingress/line_sender.h": size_t max_name_len ) noexcept nogil + bint line_sender_buffer_set_line_protocol_version( + line_sender_buffer* buffer, + line_protocol_version version, + line_sender_error** err_out + ) noexcept nogil + void line_sender_buffer_free( line_sender_buffer* buffer ) noexcept nogil @@ -171,9 +189,8 @@ cdef extern from "questdb/ingress/line_sender.h": const line_sender_buffer* buffer ) noexcept nogil - const char* line_sender_buffer_peek( - const line_sender_buffer* buffer, - size_t* len_out + line_sender_buffer_view line_sender_buffer_peek( + const line_sender_buffer* buffer ) noexcept nogil bint line_sender_buffer_table( @@ -217,6 +234,17 @@ cdef extern from "questdb/ingress/line_sender.h": line_sender_error** err_out ) noexcept nogil + bint line_sender_buffer_column_f64_arr( + line_sender_buffer* buffer, + line_sender_column_name name, + size_t rank, + const size_t* shapes, + const ssize_t* strides, + const uint8_t* data_buffer, + size_t data_buffer_len, + line_sender_error** err_out + ) noexcept nogil + bint line_sender_buffer_column_ts_nanos( line_sender_buffer* buffer, line_sender_column_name name, @@ -311,6 +339,11 @@ cdef extern from "questdb/ingress/line_sender.h": line_sender_error** err_out ) noexcept nogil + bint line_sender_opts_disable_line_protocol_validation( + line_sender_opts* opts, + line_sender_error** err_out + ) noexcept nogil + bint line_sender_opts_auth_timeout( line_sender_opts* opts, uint64_t millis, @@ -381,6 +414,9 @@ cdef extern from "questdb/ingress/line_sender.h": line_sender_error** err_out ) noexcept nogil + line_protocol_version line_sender_default_line_protocol_version( + const line_sender * sender); + bint line_sender_must_close( const line_sender* sender ) noexcept nogil diff --git a/test/common_tools.py b/test/common_tools.py new file mode 100644 index 0000000..69da3ae --- /dev/null +++ b/test/common_tools.py @@ -0,0 +1,53 @@ + +import struct +import numpy as np + +ARRAY_TYPE_TAGS = { + np.float64: 10, +} + +import math +import struct + +def _float_binary_bytes(value: float, text_format: bool = False) -> bytes: + if text_format: + if math.isnan(value): + return b'=NaN' + elif math.isinf(value): + return f'={"-Infinity" if value < 0 else "Infinity"}'.encode('utf-8') + else: + return f'={value}'.encode('utf-8').replace(b'+', b'') + else: + return b'==' + struct.pack(' bytes: + header = b'=' + format_type = struct.pack(' len(buf): + break + index = new_index + continue + + if index > 0 and buf[index] == ord('\n') and buf[index - 1] != ord('\\'): + new_msgs.append(buf[head:index]) + head = index + 1 + + index += 1 + self.msgs.extend(new_msgs) return new_msgs + def _parse_binary_data(self, buf, index): + if buf[index] != ord('=') or index + 1 >= len(buf) or buf[index + 1] != ord('='): + return index + + index += 2 # skip "==" + if index >= len(buf): + return index + binary_type = buf[index] + index += 1 + + if binary_type == 16: + index += 8 + elif binary_type == 14: + # dims + if index + 1 >= len(buf): + return index + index += 1 + if index >= len(buf): + return index + dims = buf[index] + index += 1 + + total_elements = 1 + for _ in range(dims): + if index + 4 > len(buf): + return index + dim_size = struct.unpack('.: .*insert null .*boolean col'): - _dataframe(df2, table_name='tbl1', at=qi.ServerTimestamp) - - def test_bool_obj_col(self): - df = pd.DataFrame({'a': pd.Series([ - True, False, False, - False, True, False], - dtype='object')}) - buf = _dataframe(df, table_name='tbl1', at=qi.ServerTimestamp) - self.assertEqual( - buf, - 'tbl1 a=t\n' + - 'tbl1 a=f\n' + - 'tbl1 a=f\n' + - 'tbl1 a=f\n' + - 'tbl1 a=t\n' + - 'tbl1 a=f\n') - - df2 = pd.DataFrame({'a': pd.Series([ - True, False, 'false'], - dtype='object')}) - with self.assertRaisesRegex( - qi.IngressError, - 'serialize .* column .a. .* 2 .*false.*bool'): - _dataframe(df2, table_name='tbl1', at=qi.ServerTimestamp) - - df3 = pd.DataFrame({'a': pd.Series([ - None, True, False], - dtype='object')}) - with self.assertRaisesRegex( - qi.IngressError, - 'serialize.*\\(None\\): Cannot insert null.*boolean column'): - _dataframe(df3, table_name='tbl1', at=qi.ServerTimestamp) - - def test_datetime64_numpy_col(self): - df = pd.DataFrame({ - 'a': pd.Series([ - pd.Timestamp('2019-01-01 00:00:00'), - pd.Timestamp('2019-01-01 00:00:01'), - pd.Timestamp('2019-01-01 00:00:02'), - pd.Timestamp('2019-01-01 00:00:03'), - pd.Timestamp('2019-01-01 00:00:04'), - pd.Timestamp('2019-01-01 00:00:05'), - None, - float('nan'), - pd.NA], - dtype='datetime64[ns]'), - 'b': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']}) - buf = _dataframe(df, table_name='tbl1', at=qi.ServerTimestamp) - self.assertEqual( - buf, - 'tbl1 a=1546300800000000t,b="a"\n' + - 'tbl1 a=1546300801000000t,b="b"\n' + - 'tbl1 a=1546300802000000t,b="c"\n' + - 'tbl1 a=1546300803000000t,b="d"\n' + - 'tbl1 a=1546300804000000t,b="e"\n' + - 'tbl1 a=1546300805000000t,b="f"\n' + - 'tbl1 b="g"\n' + - 'tbl1 b="h"\n' + - 'tbl1 b="i"\n') - - df = pd.DataFrame({'a': pd.Series([ - pd.Timestamp('1970-01-01 00:00:00'), - pd.Timestamp('1970-01-01 00:00:01'), - pd.Timestamp('1970-01-01 00:00:02')])}) - buf = _dataframe(df, table_name='tbl1', at=qi.ServerTimestamp) - self.assertEqual( - buf, - 'tbl1 a=0t\n' + - 'tbl1 a=1000000t\n' + - 'tbl1 a=2000000t\n') - - def test_datetime64_tz_arrow_col(self): - df = pd.DataFrame({ - 'a': [ - pd.Timestamp( - year=2019, month=1, day=1, - hour=0, minute=0, second=0, tz=_TZ), - pd.Timestamp( - year=2019, month=1, day=1, - hour=0, minute=0, second=1, tz=_TZ), - None, - pd.Timestamp( - year=2019, month=1, day=1, - hour=0, minute=0, second=3, tz=_TZ)], - 'b': ['sym1', 'sym2', 'sym3', 'sym4']}) - buf = _dataframe(df, table_name='tbl1', symbols=['b'], at=qi.ServerTimestamp) - self.assertEqual( - buf, - # Note how these are 5hr offset from `test_datetime64_numpy_col`. - 'tbl1,b=sym1 a=1546318800000000t\n' + - 'tbl1,b=sym2 a=1546318801000000t\n' + - 'tbl1,b=sym3\n' + - 'tbl1,b=sym4 a=1546318803000000t\n') - - # Not epoch 0. - df = pd.DataFrame({ - 'a': [ - pd.Timestamp( - year=1970, month=1, day=1, - hour=0, minute=0, second=0, tz=_TZ), - pd.Timestamp( - year=1970, month=1, day=1, - hour=0, minute=0, second=1, tz=_TZ), - pd.Timestamp( - year=1970, month=1, day=1, - hour=0, minute=0, second=2, tz=_TZ)], - 'b': ['sym1', 'sym2', 'sym3']}) - buf = _dataframe(df, table_name='tbl1', symbols=['b'], at=qi.ServerTimestamp) - self.assertEqual( - buf, - # Note how these are 5hr offset from `test_datetime64_numpy_col`. - 'tbl1,b=sym1 a=18000000000t\n' + - 'tbl1,b=sym2 a=18001000000t\n' + - 'tbl1,b=sym3 a=18002000000t\n') - - # Actual epoch 0. - df = pd.DataFrame({ - 'a': [ - pd.Timestamp( - year=1969, month=12, day=31, - hour=19, minute=0, second=0, tz=_TZ), - pd.Timestamp( - year=1969, month=12, day=31, - hour=19, minute=0, second=1, tz=_TZ), - pd.Timestamp( - year=1969, month=12, day=31, - hour=19, minute=0, second=2, tz=_TZ)], - 'b': ['sym1', 'sym2', 'sym3']}) - buf = _dataframe(df, table_name='tbl1', symbols=['b'], at=qi.ServerTimestamp) - self.assertEqual( - buf, - 'tbl1,b=sym1 a=0t\n' + - 'tbl1,b=sym2 a=1000000t\n' + - 'tbl1,b=sym3 a=2000000t\n') - - df2 = pd.DataFrame({ - 'a': [ - pd.Timestamp( - year=1900, month=1, day=1, - hour=0, minute=0, second=0, tz=_TZ)], - 'b': ['sym1']}) - buf = _dataframe(df2, table_name='tbl1', symbols=['b'], at=qi.ServerTimestamp) - - # Accounting for different datatime library differences. - # Mostly, here assert that negative timestamps are allowed. - self.assertIn( - buf, - ['tbl1,b=sym1 a=-2208970800000000t\n', - 'tbl1,b=sym1 a=-2208971040000000t\n']) - - def test_datetime64_numpy_at(self): - df = pd.DataFrame({ - 'a': pd.Series([ - pd.Timestamp('2019-01-01 00:00:00'), - pd.Timestamp('2019-01-01 00:00:01'), - pd.Timestamp('2019-01-01 00:00:02'), - pd.Timestamp('2019-01-01 00:00:03'), - pd.Timestamp('2019-01-01 00:00:04'), - pd.Timestamp('2019-01-01 00:00:05'), - float('nan'), - None, - pd.NaT], - dtype='datetime64[ns]'), - 'b': [1, 2, 3, 4, 5, 6, 7, 8, 9]}) - buf = _dataframe(df, table_name='tbl1', at='a') - self.assertEqual( - buf, - 'tbl1 b=1i 1546300800000000000\n' + - 'tbl1 b=2i 1546300801000000000\n' + - 'tbl1 b=3i 1546300802000000000\n' + - 'tbl1 b=4i 1546300803000000000\n' + - 'tbl1 b=5i 1546300804000000000\n' + - 'tbl1 b=6i 1546300805000000000\n' + - 'tbl1 b=7i\n' + - 'tbl1 b=8i\n' + - 'tbl1 b=9i\n') - - df = pd.DataFrame({ - 'a': pd.Series([ + 1.7976931348623157e308], # f64 max + dtype='float64')}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a' + _float_binary_bytes(1.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1 a' + _float_binary_bytes(2.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1 a' + _float_binary_bytes(3.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1 a' + _float_binary_bytes(0.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1 a' + _float_binary_bytes(float('inf'), self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1 a' + _float_binary_bytes(float('-inf'), self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1 a' + _float_binary_bytes(float('NAN'), self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1 a' + _float_binary_bytes(1.7976931348623157e308, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n') + + def test_u8_arrow_col(self): + df = pd.DataFrame({ + 'a': pd.Series([ + 1, 2, 3, + 0, + None, + 255], # u8 max + dtype=pd.UInt8Dtype()), + 'b': ['a', 'b', 'c', 'd', 'e', 'f']}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a=1i,b="a"\n' + + b'tbl1 a=2i,b="b"\n' + + b'tbl1 a=3i,b="c"\n' + + b'tbl1 a=0i,b="d"\n' + + b'tbl1 b="e"\n' + + b'tbl1 a=255i,b="f"\n') + + def test_i8_arrow_col(self): + df = pd.DataFrame({ + 'a': pd.Series([ + 1, 2, 3, + -128, # i8 min + 0, + None, + 127], # i8 max + dtype=pd.Int8Dtype()), + 'b': ['a', 'b', 'c', 'd', 'e', 'f', 'g']}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a=1i,b="a"\n' + + b'tbl1 a=2i,b="b"\n' + + b'tbl1 a=3i,b="c"\n' + + b'tbl1 a=-128i,b="d"\n' + + b'tbl1 a=0i,b="e"\n' + + b'tbl1 b="f"\n' + + b'tbl1 a=127i,b="g"\n') + + def test_u16_arrow_col(self): + df = pd.DataFrame({ + 'a': pd.Series([ + 1, 2, 3, + 0, + None, + 65535], # u16 max + dtype=pd.UInt16Dtype()), + 'b': ['a', 'b', 'c', 'd', 'e', 'f']}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + ('tbl1 a=1i,b="a"\n' + + 'tbl1 a=2i,b="b"\n' + + 'tbl1 a=3i,b="c"\n' + + 'tbl1 a=0i,b="d"\n' + + 'tbl1 b="e"\n' + + 'tbl1 a=65535i,b="f"\n').encode('utf-8')) + + def test_i16_arrow_col(self): + df = pd.DataFrame({ + 'a': pd.Series([ + 1, 2, 3, + -32768, # i16 min + 0, + None, + 32767], # i16 max + dtype=pd.Int16Dtype()), + 'b': ['a', 'b', 'c', 'd', 'e', 'f', 'g']}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a=1i,b="a"\n' + + b'tbl1 a=2i,b="b"\n' + + b'tbl1 a=3i,b="c"\n' + + b'tbl1 a=-32768i,b="d"\n' + + b'tbl1 a=0i,b="e"\n' + + b'tbl1 b="f"\n' + + b'tbl1 a=32767i,b="g"\n') + + def test_u32_arrow_col(self): + df = pd.DataFrame({ + 'a': pd.Series([ + 1, 2, 3, + 0, + None, + 4294967295], # u32 max + dtype=pd.UInt32Dtype()), + 'b': ['a', 'b', 'c', 'd', 'e', 'f']}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a=1i,b="a"\n' + + b'tbl1 a=2i,b="b"\n' + + b'tbl1 a=3i,b="c"\n' + + b'tbl1 a=0i,b="d"\n' + + b'tbl1 b="e"\n' + + b'tbl1 a=4294967295i,b="f"\n') + + def test_i32_arrow_col(self): + df = pd.DataFrame({ + 'a': pd.Series([ + 1, 2, 3, + -2147483648, # i32 min + 0, + None, + 2147483647], # i32 max + dtype=pd.Int32Dtype()), + 'b': ['a', 'b', 'c', 'd', 'e', 'f', 'g']}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a=1i,b="a"\n' + + b'tbl1 a=2i,b="b"\n' + + b'tbl1 a=3i,b="c"\n' + + b'tbl1 a=-2147483648i,b="d"\n' + + b'tbl1 a=0i,b="e"\n' + + b'tbl1 b="f"\n' + + b'tbl1 a=2147483647i,b="g"\n') + + def test_u64_arrow_col(self): + df = pd.DataFrame({ + 'a': pd.Series([ + 1, 2, 3, + 0, + None, + 9223372036854775807], # i64 max + dtype=pd.UInt64Dtype()), + 'b': ['a', 'b', 'c', 'd', 'e', 'f']}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a=1i,b="a"\n' + + b'tbl1 a=2i,b="b"\n' + + b'tbl1 a=3i,b="c"\n' + + b'tbl1 a=0i,b="d"\n' + + b'tbl1 b="e"\n' + + b'tbl1 a=9223372036854775807i,b="f"\n') + + df2 = pd.DataFrame({'a': pd.Series([ + 1, 2, 3, + 0, + 9223372036854775808], # i64 max + 1 + dtype=pd.UInt64Dtype())}) + with self.assertRaisesRegex( + qi.IngressError, + '.* serialize .* column .a. .* 4 .*9223372036854775808.*int64.*'): + _dataframe(self.version, df2, table_name='tbl1', at=qi.ServerTimestamp) + + def test_i64_arrow_col(self): + df = pd.DataFrame({ + 'a': pd.Series([ + 1, 2, 3, + -9223372036854775808, # i64 min + 0, + None, + 9223372036854775807], # i64 max + dtype=pd.Int64Dtype()), + 'b': ['a', 'b', 'c', 'd', 'e', 'f', 'g']}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a=1i,b="a"\n' + + b'tbl1 a=2i,b="b"\n' + + b'tbl1 a=3i,b="c"\n' + + b'tbl1 a=-9223372036854775808i,b="d"\n' + + b'tbl1 a=0i,b="e"\n' + + b'tbl1 b="f"\n' + + b'tbl1 a=9223372036854775807i,b="g"\n') + + def test_f32_arrow_col(self): + df = pd.DataFrame({ + 'a': pd.Series([ + 1.0, 2.0, 3.0, + 0.0, + float('inf'), + float('-inf'), + float('nan'), + 3.4028234663852886e38, # f32 max + None], + dtype=pd.Float32Dtype()), + 'b': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a' + _float_binary_bytes(1.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b="a"\n' + + b'tbl1 a' + _float_binary_bytes(2.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b="b"\n' + + b'tbl1 a' + _float_binary_bytes(3.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b="c"\n' + + b'tbl1 a' + _float_binary_bytes(0.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b="d"\n' + + b'tbl1 a' + _float_binary_bytes(float('inf'), self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b="e"\n' + + b'tbl1 a' + _float_binary_bytes(float('-inf'), self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b="f"\n' + + b'tbl1 b="g"\n' + # This one is wierd: `nan` gets 0 in the bitmask. + b'tbl1 a' + _float_binary_bytes(3.4028234663852886e38, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b="h"\n' + + b'tbl1 b="i"\n') + + def test_f64_arrow_col(self): + df = pd.DataFrame({ + 'a': pd.Series([ + 1.0, 2.0, 3.0, + 0.0, + float('inf'), + float('-inf'), + float('nan'), + 1.7976931348623157e308, # f64 max + None], + dtype=pd.Float64Dtype()), + 'b': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a' + _float_binary_bytes(1.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b="a"\n' + + b'tbl1 a' + _float_binary_bytes(2.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b="b"\n' + + b'tbl1 a' + _float_binary_bytes(3.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b="c"\n' + + b'tbl1 a' + _float_binary_bytes(0.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b="d"\n' + + b'tbl1 a' + _float_binary_bytes(float('inf'), self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b="e"\n' + + b'tbl1 a' + _float_binary_bytes(float('-inf'), self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b="f"\n' + + b'tbl1 b="g"\n' + # This one is wierd: `nan` gets 0 in the bitmask. + b'tbl1 a' + _float_binary_bytes(1.7976931348623157e308, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b="h"\n' + + b'tbl1 b="i"\n') + + def test_bool_numpy_col(self): + df = pd.DataFrame({'a': pd.Series([ + True, False, False, + False, True, False], + dtype='bool')}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a=t\n' + + b'tbl1 a=f\n' + + b'tbl1 a=f\n' + + b'tbl1 a=f\n' + + b'tbl1 a=t\n' + + b'tbl1 a=f\n') + + def test_bool_arrow_col(self): + df = pd.DataFrame({'a': pd.Series([ + True, False, False, + False, True, False, + True, True, True, + False, False, False], + dtype='boolean')}) # Note `boolean` != `bool`. + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a=t\n' + + b'tbl1 a=f\n' + + b'tbl1 a=f\n' + + b'tbl1 a=f\n' + + b'tbl1 a=t\n' + + b'tbl1 a=f\n' + + b'tbl1 a=t\n' + + b'tbl1 a=t\n' + + b'tbl1 a=t\n' + + b'tbl1 a=f\n' + + b'tbl1 a=f\n' + + b'tbl1 a=f\n') + + df2 = pd.DataFrame({'a': pd.Series([ + True, False, False, + None, True, False], + dtype='boolean')}) + with self.assertRaisesRegex( + qi.IngressError, + 'Failed.*at row index 3 .*.: .*insert null .*boolean col'): + _dataframe(self.version, df2, table_name='tbl1', at=qi.ServerTimestamp) + + def test_bool_obj_col(self): + df = pd.DataFrame({'a': pd.Series([ + True, False, False, + False, True, False], + dtype='object')}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a=t\n' + + b'tbl1 a=f\n' + + b'tbl1 a=f\n' + + b'tbl1 a=f\n' + + b'tbl1 a=t\n' + + b'tbl1 a=f\n') + + df2 = pd.DataFrame({'a': pd.Series([ + True, False, 'false'], + dtype='object')}) + with self.assertRaisesRegex( + qi.IngressError, + 'serialize .* column .a. .* 2 .*false.*bool'): + _dataframe(self.version, df2, table_name='tbl1', at=qi.ServerTimestamp) + + df3 = pd.DataFrame({'a': pd.Series([ + None, True, False], + dtype='object')}) + with self.assertRaisesRegex( + qi.IngressError, + 'serialize.*\\(None\\): Cannot insert null.*boolean column'): + _dataframe(self.version, df3, table_name='tbl1', at=qi.ServerTimestamp) + + def test_datetime64_numpy_col(self): + df = pd.DataFrame({ + 'a': pd.Series([ + pd.Timestamp('2019-01-01 00:00:00'), + pd.Timestamp('2019-01-01 00:00:01'), + pd.Timestamp('2019-01-01 00:00:02'), + pd.Timestamp('2019-01-01 00:00:03'), + pd.Timestamp('2019-01-01 00:00:04'), + pd.Timestamp('2019-01-01 00:00:05'), + None, + float('nan'), + pd.NA], + dtype='datetime64[ns]'), + 'b': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a=1546300800000000t,b="a"\n' + + b'tbl1 a=1546300801000000t,b="b"\n' + + b'tbl1 a=1546300802000000t,b="c"\n' + + b'tbl1 a=1546300803000000t,b="d"\n' + + b'tbl1 a=1546300804000000t,b="e"\n' + + b'tbl1 a=1546300805000000t,b="f"\n' + + b'tbl1 b="g"\n' + + b'tbl1 b="h"\n' + + b'tbl1 b="i"\n') + + df = pd.DataFrame({'a': pd.Series([ pd.Timestamp('1970-01-01 00:00:00'), pd.Timestamp('1970-01-01 00:00:01'), - pd.Timestamp('1970-01-01 00:00:02')], - dtype='datetime64[ns]'), - 'b': [1, 2, 3]}) - buf = _dataframe(df, table_name='tbl1', at='a') - self.assertEqual( - buf, - 'tbl1 b=1i 0\n' + - 'tbl1 b=2i 1000000000\n' + - 'tbl1 b=3i 2000000000\n') - - def test_datetime64_tz_arrow_at(self): - df = pd.DataFrame({ - 'a': [ - pd.Timestamp( - year=2019, month=1, day=1, - hour=0, minute=0, second=0, tz=_TZ), - pd.Timestamp( - year=2019, month=1, day=1, - hour=0, minute=0, second=1, tz=_TZ), - None, - pd.Timestamp( - year=2019, month=1, day=1, - hour=0, minute=0, second=3, tz=_TZ)], - 'b': ['sym1', 'sym2', 'sym3', 'sym4']}) - buf = _dataframe(df, table_name='tbl1', symbols=['b'], at='a') - self.assertEqual( - buf, - # Note how these are 5hr offset from `test_datetime64_numpy_col`. - 'tbl1,b=sym1 1546318800000000000\n' + - 'tbl1,b=sym2 1546318801000000000\n' + - 'tbl1,b=sym3\n' + - 'tbl1,b=sym4 1546318803000000000\n') - - df2 = pd.DataFrame({ - 'a': [ - pd.Timestamp( - year=1900, month=1, day=1, - hour=0, minute=0, second=0, tz=_TZ)], - 'b': ['sym1']}) - with self.assertRaisesRegex( - qi.IngressError, "Failed.*'a'.*-220897.* is neg"): - _dataframe(df2, table_name='tbl1', symbols=['b'], at='a') - - def _test_pyobjstr_table(self, dtype): - df = pd.DataFrame({ - '../bad col name/../it does not matter...': - pd.Series([ + pd.Timestamp('1970-01-01 00:00:02')])}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a=0t\n' + + b'tbl1 a=1000000t\n' + + b'tbl1 a=2000000t\n') + + def test_datetime64_tz_arrow_col(self): + df = pd.DataFrame({ + 'a': [ + pd.Timestamp( + year=2019, month=1, day=1, + hour=0, minute=0, second=0, tz=_TZ), + pd.Timestamp( + year=2019, month=1, day=1, + hour=0, minute=0, second=1, tz=_TZ), + None, + pd.Timestamp( + year=2019, month=1, day=1, + hour=0, minute=0, second=3, tz=_TZ)], + 'b': ['sym1', 'sym2', 'sym3', 'sym4']}) + buf = _dataframe(self.version, df, table_name='tbl1', symbols=['b'], at=qi.ServerTimestamp) + self.assertEqual( + buf, + # Note how these are 5hr offset from `test_datetime64_numpy_col`. + b'tbl1,b=sym1 a=1546318800000000t\n' + + b'tbl1,b=sym2 a=1546318801000000t\n' + + b'tbl1,b=sym3\n' + + b'tbl1,b=sym4 a=1546318803000000t\n') + + # Not epoch 0. + df = pd.DataFrame({ + 'a': [ + pd.Timestamp( + year=1970, month=1, day=1, + hour=0, minute=0, second=0, tz=_TZ), + pd.Timestamp( + year=1970, month=1, day=1, + hour=0, minute=0, second=1, tz=_TZ), + pd.Timestamp( + year=1970, month=1, day=1, + hour=0, minute=0, second=2, tz=_TZ)], + 'b': ['sym1', 'sym2', 'sym3']}) + buf = _dataframe(self.version, df, table_name='tbl1', symbols=['b'], at=qi.ServerTimestamp) + self.assertEqual( + buf, + # Note how these are 5hr offset from `test_datetime64_numpy_col`. + b'tbl1,b=sym1 a=18000000000t\n' + + b'tbl1,b=sym2 a=18001000000t\n' + + b'tbl1,b=sym3 a=18002000000t\n') + + # Actual epoch 0. + df = pd.DataFrame({ + 'a': [ + pd.Timestamp( + year=1969, month=12, day=31, + hour=19, minute=0, second=0, tz=_TZ), + pd.Timestamp( + year=1969, month=12, day=31, + hour=19, minute=0, second=1, tz=_TZ), + pd.Timestamp( + year=1969, month=12, day=31, + hour=19, minute=0, second=2, tz=_TZ)], + 'b': ['sym1', 'sym2', 'sym3']}) + buf = _dataframe(self.version, df, table_name='tbl1', symbols=['b'], at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1,b=sym1 a=0t\n' + + b'tbl1,b=sym2 a=1000000t\n' + + b'tbl1,b=sym3 a=2000000t\n') + + df2 = pd.DataFrame({ + 'a': [ + pd.Timestamp( + year=1900, month=1, day=1, + hour=0, minute=0, second=0, tz=_TZ)], + 'b': ['sym1']}) + buf = _dataframe(self.version, df2, table_name='tbl1', symbols=['b'], at=qi.ServerTimestamp) + + # Accounting for different datatime library differences. + # Mostly, here assert that negative timestamps are allowed. + self.assertIn( + buf, + [b'tbl1,b=sym1 a=-2208970800000000t\n', + b'tbl1,b=sym1 a=-2208971040000000t\n']) + + def test_datetime64_numpy_at(self): + df = pd.DataFrame({ + 'a': pd.Series([ + pd.Timestamp('2019-01-01 00:00:00'), + pd.Timestamp('2019-01-01 00:00:01'), + pd.Timestamp('2019-01-01 00:00:02'), + pd.Timestamp('2019-01-01 00:00:03'), + pd.Timestamp('2019-01-01 00:00:04'), + pd.Timestamp('2019-01-01 00:00:05'), + float('nan'), + None, + pd.NaT], + dtype='datetime64[ns]'), + 'b': [1, 2, 3, 4, 5, 6, 7, 8, 9]}) + buf = _dataframe(self.version, df, table_name='tbl1', at='a') + self.assertEqual( + buf, + b'tbl1 b=1i 1546300800000000000\n' + + b'tbl1 b=2i 1546300801000000000\n' + + b'tbl1 b=3i 1546300802000000000\n' + + b'tbl1 b=4i 1546300803000000000\n' + + b'tbl1 b=5i 1546300804000000000\n' + + b'tbl1 b=6i 1546300805000000000\n' + + b'tbl1 b=7i\n' + + b'tbl1 b=8i\n' + + b'tbl1 b=9i\n') + + df = pd.DataFrame({ + 'a': pd.Series([ + pd.Timestamp('1970-01-01 00:00:00'), + pd.Timestamp('1970-01-01 00:00:01'), + pd.Timestamp('1970-01-01 00:00:02')], + dtype='datetime64[ns]'), + 'b': [1, 2, 3]}) + buf = _dataframe(self.version, df, table_name='tbl1', at='a') + self.assertEqual( + buf, + b'tbl1 b=1i 0\n' + + b'tbl1 b=2i 1000000000\n' + + b'tbl1 b=3i 2000000000\n') + + def test_datetime64_tz_arrow_at(self): + df = pd.DataFrame({ + 'a': [ + pd.Timestamp( + year=2019, month=1, day=1, + hour=0, minute=0, second=0, tz=_TZ), + pd.Timestamp( + year=2019, month=1, day=1, + hour=0, minute=0, second=1, tz=_TZ), + None, + pd.Timestamp( + year=2019, month=1, day=1, + hour=0, minute=0, second=3, tz=_TZ)], + 'b': ['sym1', 'sym2', 'sym3', 'sym4']}) + buf = _dataframe(self.version, df, table_name='tbl1', symbols=['b'], at='a') + self.assertEqual( + buf, + # Note how these are 5hr offset from `test_datetime64_numpy_col`. + b'tbl1,b=sym1 1546318800000000000\n' + + b'tbl1,b=sym2 1546318801000000000\n' + + b'tbl1,b=sym3\n' + + b'tbl1,b=sym4 1546318803000000000\n') + + df2 = pd.DataFrame({ + 'a': [ + pd.Timestamp( + year=1900, month=1, day=1, + hour=0, minute=0, second=0, tz=_TZ)], + 'b': ['sym1']}) + with self.assertRaisesRegex( + qi.IngressError, "Failed.*'a'.*-220897.* is neg"): + _dataframe(self.version, df2, table_name='tbl1', symbols=['b'], at='a') + + def _test_pyobjstr_table(self, dtype): + df = pd.DataFrame({ + '../bad col name/../it does not matter...': + pd.Series([ + 'a', # ASCII + 'b' * 127, # Max table name length. + 'q❤️p', # Mixed ASCII and UCS-2 + '嚜꓂', # UCS-2, 3 bytes for UTF-8. + '💩🦞'], # UCS-4, 4 bytes for UTF-8. + dtype=dtype), + 'b': [1, 2, 3, 4, 5]}) + buf = _dataframe(self.version, df, table_name_col=0, at=qi.ServerTimestamp) + self.assertEqual( + buf, + ('a b=1i\n' + + ('b' * 127) + ' b=2i\n' + + 'q❤️p b=3i\n' + + '嚜꓂ b=4i\n' + + '💩🦞 b=5i\n').encode("utf-8")) + + with self.assertRaisesRegex( + qi.IngressError, "Too long"): + _dataframe(self.version, + pd.DataFrame({'a': pd.Series(['b' * 128], dtype=dtype)}), + table_name_col='a', at=qi.ServerTimestamp) + + with self.assertRaisesRegex( + qi.IngressError, 'Failed.*Expected a table name, got a null.*'): + _dataframe(self.version, + pd.DataFrame({ + '.': pd.Series(['x', None], dtype=dtype), + 'b': [1, 2]}), + table_name_col='.', at=qi.ServerTimestamp) + + with self.assertRaisesRegex( + qi.IngressError, 'Failed.*Expected a table name, got a null.*'): + _dataframe(self.version, + pd.DataFrame({ + '.': pd.Series(['x', float('nan')], dtype=dtype), + 'b': [1, 2]}), + table_name_col='.', at=qi.ServerTimestamp) + + with self.assertRaisesRegex( + qi.IngressError, 'Failed.*Expected a table name, got a null.*'): + _dataframe(self.version, + pd.DataFrame({ + '.': pd.Series(['x', pd.NA], dtype=dtype), + 'b': [1, 2]}), + table_name_col='.', at=qi.ServerTimestamp) + + with self.assertRaisesRegex( + qi.IngressError, "''.*must have a non-zero length"): + _dataframe(self.version, + pd.DataFrame({ + '/': pd.Series([''], dtype=dtype), + 'b': [1]}), + table_name_col='/', at=qi.ServerTimestamp) + + with self.assertRaisesRegex( + qi.IngressError, "'tab..1'.*invalid dot `\\.` at position 4"): + _dataframe(self.version, + pd.DataFrame({ + '/': pd.Series(['tab..1'], dtype=dtype), + 'b': [1]}), + table_name_col='/', at=qi.ServerTimestamp) + + def test_obj_str_table(self): + self._test_pyobjstr_table('object') + + with self.assertRaisesRegex( + qi.IngressError, 'table name .*got an object of type int'): + _dataframe(self.version, + pd.DataFrame({ + '.': pd.Series(['x', 42], dtype='object'), + 'z': [1, 2]}), + table_name_col='.', at=qi.ServerTimestamp) + + def test_obj_string_table(self): + self._test_pyobjstr_table('string') + + self.assertEqual( + _dataframe(self.version, + pd.DataFrame({ + '.': pd.Series(['x', 42], dtype='string'), + 'z': [1, 2]}), + table_name_col='.', at=qi.ServerTimestamp), + b'x z=1i\n' + + b'42 z=2i\n') + + def _test_pyobjstr_numpy_symbol(self, dtype): + df = pd.DataFrame({'a': pd.Series([ 'a', # ASCII - 'b' * 127, # Max table name length. 'q❤️p', # Mixed ASCII and UCS-2 + '❤️' * 1200, # Over the 1024 buffer prealloc. + 'Questo è un qualcosa', # Non-ASCII UCS-1 + 'щось', # UCS-2, 2 bytes for UTF-8. + '', # Empty string '嚜꓂', # UCS-2, 3 bytes for UTF-8. '💩🦞'], # UCS-4, 4 bytes for UTF-8. - dtype=dtype), - 'b': [1, 2, 3, 4, 5]}) - buf = _dataframe(df, table_name_col=0, at=qi.ServerTimestamp) - self.assertEqual( - buf, - 'a b=1i\n' + - ('b' * 127) + ' b=2i\n' + - 'q❤️p b=3i\n' + - '嚜꓂ b=4i\n' + - '💩🦞 b=5i\n') - - with self.assertRaisesRegex( - qi.IngressError, "Too long"): - _dataframe( - pd.DataFrame({'a': pd.Series(['b' * 128], dtype=dtype)}), - table_name_col='a', at=qi.ServerTimestamp) - - with self.assertRaisesRegex( - qi.IngressError, 'Failed.*Expected a table name, got a null.*'): - _dataframe( - pd.DataFrame({ - '.': pd.Series(['x', None], dtype=dtype), - 'b': [1, 2]}), - table_name_col='.', at=qi.ServerTimestamp) - - with self.assertRaisesRegex( - qi.IngressError, 'Failed.*Expected a table name, got a null.*'): - _dataframe( - pd.DataFrame({ - '.': pd.Series(['x', float('nan')], dtype=dtype), - 'b': [1, 2]}), - table_name_col='.', at=qi.ServerTimestamp) - - with self.assertRaisesRegex( - qi.IngressError, 'Failed.*Expected a table name, got a null.*'): - _dataframe( - pd.DataFrame({ - '.': pd.Series(['x', pd.NA], dtype=dtype), - 'b': [1, 2]}), - table_name_col='.', at=qi.ServerTimestamp) - - with self.assertRaisesRegex( - qi.IngressError, "''.*must have a non-zero length"): - _dataframe( - pd.DataFrame({ - '/': pd.Series([''], dtype=dtype), - 'b': [1]}), - table_name_col='/', at=qi.ServerTimestamp) - - with self.assertRaisesRegex( - qi.IngressError, "'tab..1'.*invalid dot `\\.` at position 4"): - _dataframe( - pd.DataFrame({ - '/': pd.Series(['tab..1'], dtype=dtype), - 'b': [1]}), - table_name_col='/', at=qi.ServerTimestamp) - - def test_obj_str_table(self): - self._test_pyobjstr_table('object') - - with self.assertRaisesRegex( - qi.IngressError, 'table name .*got an object of type int'): - _dataframe( - pd.DataFrame({ - '.': pd.Series(['x', 42], dtype='object'), - 'z': [1, 2]}), - table_name_col='.', at=qi.ServerTimestamp) - - def test_obj_string_table(self): - self._test_pyobjstr_table('string') - - self.assertEqual( - _dataframe( - pd.DataFrame({ - '.': pd.Series(['x', 42], dtype='string'), - 'z': [1, 2]}), - table_name_col='.', at=qi.ServerTimestamp), - 'x z=1i\n' + - '42 z=2i\n') - - def _test_pyobjstr_numpy_symbol(self, dtype): - df = pd.DataFrame({'a': pd.Series([ - 'a', # ASCII - 'q❤️p', # Mixed ASCII and UCS-2 - '❤️' * 1200, # Over the 1024 buffer prealloc. - 'Questo è un qualcosa', # Non-ASCII UCS-1 - 'щось', # UCS-2, 2 bytes for UTF-8. - '', # Empty string - '嚜꓂', # UCS-2, 3 bytes for UTF-8. - '💩🦞'], # UCS-4, 4 bytes for UTF-8. - dtype=dtype)}) - buf = _dataframe(df, table_name='tbl1', symbols=True, at=qi.ServerTimestamp) - self.assertEqual( - buf, - 'tbl1,a=a\n' + - 'tbl1,a=q❤️p\n' + - 'tbl1,a=' + ('❤️' * 1200) + '\n' + - 'tbl1,a=Questo\\ è\\ un\\ qualcosa\n' + - 'tbl1,a=щось\n' + - 'tbl1,a=\n' + - 'tbl1,a=嚜꓂\n' + - 'tbl1,a=💩🦞\n') - - for null_obj in (None, float('nan'), pd.NA): + dtype=dtype)}) + buf = _dataframe(self.version, df, table_name='tbl1', symbols=True, at=qi.ServerTimestamp) self.assertEqual( + buf, + ('tbl1,a=a\n' + + 'tbl1,a=q❤️p\n' + + 'tbl1,a=' + ('❤️' * 1200) + '\n' + + 'tbl1,a=Questo\\ è\\ un\\ qualcosa\n' + + 'tbl1,a=щось\n' + + 'tbl1,a=\n' + + 'tbl1,a=嚜꓂\n' + + 'tbl1,a=💩🦞\n').encode("utf-8")) + + for null_obj in (None, float('nan'), pd.NA): + self.assertEqual( + _dataframe( + self.version, + pd.DataFrame({ + 'x': pd.Series(['a', null_obj], dtype=dtype), + 'y': [1, 2]}), + table_name='tbl1', symbols=[0], at=qi.ServerTimestamp), + b'tbl1,x=a y=1i\n' + + b'tbl1 y=2i\n') + + def test_obj_str_numpy_symbol(self): + self._test_pyobjstr_numpy_symbol('object') + + with self.assertRaisesRegex( + qi.IngressError, 'Expected a string, got an .* type int'): _dataframe( + self.version, pd.DataFrame({ - 'x': pd.Series(['a', null_obj], dtype=dtype), + 'x': pd.Series(['x', 42], dtype='object'), + 'y': [1, 2]}), + table_name='tbl1', symbols=[0], at=qi.ServerTimestamp) + + def test_obj_string_numpy_symbol(self): + self._test_pyobjstr_numpy_symbol('string') + + self.assertEqual( + _dataframe( + self.version, + pd.DataFrame({ + 'x': pd.Series(['x', 42], dtype='string'), 'y': [1, 2]}), table_name='tbl1', symbols=[0], at=qi.ServerTimestamp), - 'tbl1,x=a y=1i\n' + - 'tbl1 y=2i\n') - - def test_obj_str_numpy_symbol(self): - self._test_pyobjstr_numpy_symbol('object') - - with self.assertRaisesRegex( - qi.IngressError, 'Expected a string, got an .* type int'): - _dataframe( - pd.DataFrame({ - 'x': pd.Series(['x', 42], dtype='object'), - 'y': [1, 2]}), - table_name='tbl1', symbols=[0], at=qi.ServerTimestamp) - - def test_obj_string_numpy_symbol(self): - self._test_pyobjstr_numpy_symbol('string') - - self.assertEqual( - _dataframe( - pd.DataFrame({ - 'x': pd.Series(['x', 42], dtype='string'), - 'y': [1, 2]}), - table_name='tbl1', symbols=[0], at=qi.ServerTimestamp), - 'tbl1,x=x y=1i\n' + - 'tbl1,x=42 y=2i\n') - - def test_str_numpy_col(self): - df = pd.DataFrame({'a': pd.Series([ - 'a', # ASCII - 'q❤️p', # Mixed ASCII and UCS-2 - '❤️' * 1200, # Over the 1024 buffer prealloc. - 'Questo è un qualcosa', # Non-ASCII UCS-1 - 'щось', # UCS-2, 2 bytes for UTF-8. - '', # Empty string - '嚜꓂', # UCS-2, 3 bytes for UTF-8. - '💩🦞'], # UCS-4, 4 bytes for UTF-8. - dtype='str')}) - buf = _dataframe(df, table_name='tbl1', at=qi.ServerTimestamp) - self.assertEqual( - buf, - 'tbl1 a="a"\n' + - 'tbl1 a="q❤️p"\n' + - 'tbl1 a="' + ('❤️' * 1200) + '"\n' + - 'tbl1 a="Questo è un qualcosa"\n' + - 'tbl1 a="щось"\n' + - 'tbl1 a=""\n' + - 'tbl1 a="嚜꓂"\n' + - 'tbl1 a="💩🦞"\n') - - def test_str_arrow_table(self): - df = pd.DataFrame({ - '../bad col name/../it does not matter...': pd.Series([ - 'a', # ASCII - 'b' * 127, # Max table name length. - 'q❤️p', # Mixed ASCII and UCS-2 - '嚜꓂', # UCS-2, 3 bytes for UTF-8. - '💩🦞'], # UCS-4, 4 bytes for UTF-8. - dtype='string[pyarrow]'), - 'b': [1, 2, 3, 4, 5]}) - buf = _dataframe(df, table_name_col=0, at=qi.ServerTimestamp) - self.assertEqual( - buf, - 'a b=1i\n' + - ('b' * 127) + ' b=2i\n' + - 'q❤️p b=3i\n' + - '嚜꓂ b=4i\n' + - '💩🦞 b=5i\n') - - with self.assertRaisesRegex( - qi.IngressError, "Too long"): - _dataframe( - pd.DataFrame({ - 'a': pd.Series(['b' * 128], dtype='string[pyarrow]')}), - table_name_col='a', at = qi.ServerTimestamp) - - with self.assertRaisesRegex( - qi.IngressError, "Failed .*.*Table name cannot be null"): - _dataframe( - pd.DataFrame({ - '.': pd.Series(['x', None], dtype='string[pyarrow]'), - 'b': [1, 2]}), - table_name_col='.', at = qi.ServerTimestamp) - - with self.assertRaisesRegex( - qi.IngressError, "''.*must have a non-zero length"): - _dataframe( - pd.DataFrame({ - '/': pd.Series([''], dtype='string[pyarrow]')}), - table_name_col='/', at = qi.ServerTimestamp) - - with self.assertRaisesRegex( - qi.IngressError, "'tab..1'.*invalid dot `\\.` at position 4"): - _dataframe( - pd.DataFrame({ - '/': pd.Series(['tab..1'], dtype='string[pyarrow]')}), - table_name_col='/', at = qi.ServerTimestamp) - - def test_str_arrow_symbol(self): - df = pd.DataFrame({ - 'a': pd.Series([ - 'a', # ASCII - 'q❤️p', # Mixed ASCII and UCS-2 - '❤️' * 1200, # Over the 1024 buffer prealloc. - 'Questo è un qualcosa', # Non-ASCII UCS-1 - 'щось', # UCS-2, 2 bytes for UTF-8. - '', # Empty string - None, - '嚜꓂', # UCS-2, 3 bytes for UTF-8. - '💩🦞'], # UCS-4, 4 bytes for UTF-8. - dtype='string[pyarrow]'), - 'b': [1, 2, 3, 4, 5, 6, 7, 8, 9]}) - buf = _dataframe(df, table_name='tbl1', symbols=True, at = qi.ServerTimestamp) - self.assertEqual( - buf, - 'tbl1,a=a b=1i\n' + - 'tbl1,a=q❤️p b=2i\n' + - 'tbl1,a=' + ('❤️' * 1200) + ' b=3i\n' + - 'tbl1,a=Questo\\ è\\ un\\ qualcosa b=4i\n' + - 'tbl1,a=щось b=5i\n' + - 'tbl1,a= b=6i\n' + - 'tbl1 b=7i\n' + - 'tbl1,a=嚜꓂ b=8i\n' + - 'tbl1,a=💩🦞 b=9i\n') - - def test_str_arrow_col(self): - df = pd.DataFrame({ - 'a': pd.Series([ - 'a', # ASCII - 'q❤️p', # Mixed ASCII and UCS-2 - '❤️' * 1200, # Over the 1024 buffer prealloc. - 'Questo è un qualcosa', # Non-ASCII UCS-1 - 'щось', # UCS-2, 2 bytes for UTF-8. - '', # Empty string - None, - '嚜꓂', # UCS-2, 3 bytes for UTF-8. - '💩🦞'], # UCS-4, 4 bytes for UTF-8. - dtype='string[pyarrow]'), - 'b': [1, 2, 3, 4, 5, 6, 7, 8, 9]}) - buf = _dataframe(df, table_name='tbl1', symbols=False, at = qi.ServerTimestamp) - self.assertEqual( - buf, - 'tbl1 a="a",b=1i\n' + - 'tbl1 a="q❤️p",b=2i\n' + - 'tbl1 a="' + ('❤️' * 1200) + '",b=3i\n' + - 'tbl1 a="Questo è un qualcosa",b=4i\n' + - 'tbl1 a="щось",b=5i\n' + - 'tbl1 a="",b=6i\n' + - 'tbl1 b=7i\n' + - 'tbl1 a="嚜꓂",b=8i\n' + - 'tbl1 a="💩🦞",b=9i\n') - - def test_pyobj_int_col(self): - int64_min = -2**63 - int64_max = 2**63 - 1 - self.assertEqual( - _dataframe( - pd.DataFrame({ - 'a': pd.Series([ - 1, 2, 3, None, float('nan'), pd.NA, 7, - 0, - int64_min, - int64_max], dtype='object'), - 'b': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}), - table_name='tbl1', at = qi.ServerTimestamp), - 'tbl1 a=1i,b=1i\n' + - 'tbl1 a=2i,b=2i\n' + - 'tbl1 a=3i,b=3i\n' + - 'tbl1 b=4i\n' + - 'tbl1 b=5i\n' + - 'tbl1 b=6i\n' + - 'tbl1 a=7i,b=7i\n' + - 'tbl1 a=0i,b=8i\n' + - 'tbl1 a=' + str(int64_min) + 'i,b=9i\n' + - 'tbl1 a=' + str(int64_max) + 'i,b=10i\n') - - with self.assertRaisesRegex( - qi.IngressError, "1 \\('STRING'\\): .*type int, got.*str\\."): - _dataframe( - pd.DataFrame({ - 'a': pd.Series([1, 'STRING'], dtype='object'), - 'b': [1, 2]}), - table_name='tbl1', at = qi.ServerTimestamp) - - out_of_range = [int64_min - 1, int64_max + 1] - for num in out_of_range: + b'tbl1,x=x y=1i\n' + + b'tbl1,x=42 y=2i\n') + + def test_str_numpy_col(self): + df = pd.DataFrame({'a': pd.Series([ + 'a', # ASCII + 'q❤️p', # Mixed ASCII and UCS-2 + '❤️' * 1200, # Over the 1024 buffer prealloc. + 'Questo è un qualcosa', # Non-ASCII UCS-1 + 'щось', # UCS-2, 2 bytes for UTF-8. + '', # Empty string + '嚜꓂', # UCS-2, 3 bytes for UTF-8. + '💩🦞'], # UCS-4, 4 bytes for UTF-8. + dtype='str')}) + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + ('tbl1 a="a"\n' + + 'tbl1 a="q❤️p"\n' + + 'tbl1 a="' + ('❤️' * 1200) + '"\n' + + 'tbl1 a="Questo è un qualcosa"\n' + + 'tbl1 a="щось"\n' + + 'tbl1 a=""\n' + + 'tbl1 a="嚜꓂"\n' + + 'tbl1 a="💩🦞"\n').encode("utf-8")) + + def test_str_arrow_table(self): + df = pd.DataFrame({ + '../bad col name/../it does not matter...': pd.Series([ + 'a', # ASCII + 'b' * 127, # Max table name length. + 'q❤️p', # Mixed ASCII and UCS-2 + '嚜꓂', # UCS-2, 3 bytes for UTF-8. + '💩🦞'], # UCS-4, 4 bytes for UTF-8. + dtype='string[pyarrow]'), + 'b': [1, 2, 3, 4, 5]}) + buf = _dataframe(self.version, df, table_name_col=0, at=qi.ServerTimestamp) + self.assertEqual( + buf, + ('a b=1i\n' + + ('b' * 127) + ' b=2i\n' + + 'q❤️p b=3i\n' + + '嚜꓂ b=4i\n' + + '💩🦞 b=5i\n').encode("utf-8")) + + with self.assertRaisesRegex( + qi.IngressError, "Too long"): + _dataframe( + self.version, + pd.DataFrame({ + 'a': pd.Series(['b' * 128], dtype='string[pyarrow]')}), + table_name_col='a', at = qi.ServerTimestamp) + + with self.assertRaisesRegex( + qi.IngressError, "Failed .*.*Table name cannot be null"): + _dataframe( + self.version, + pd.DataFrame({ + '.': pd.Series(['x', None], dtype='string[pyarrow]'), + 'b': [1, 2]}), + table_name_col='.', at = qi.ServerTimestamp) + + with self.assertRaisesRegex( + qi.IngressError, "''.*must have a non-zero length"): + _dataframe( + self.version, + pd.DataFrame({ + '/': pd.Series([''], dtype='string[pyarrow]')}), + table_name_col='/', at = qi.ServerTimestamp) + + with self.assertRaisesRegex( + qi.IngressError, "'tab..1'.*invalid dot `\\.` at position 4"): + _dataframe( + self.version, + pd.DataFrame({ + '/': pd.Series(['tab..1'], dtype='string[pyarrow]')}), + table_name_col='/', at = qi.ServerTimestamp) + + def test_str_arrow_symbol(self): + df = pd.DataFrame({ + 'a': pd.Series([ + 'a', # ASCII + 'q❤️p', # Mixed ASCII and UCS-2 + '❤️' * 1200, # Over the 1024 buffer prealloc. + 'Questo è un qualcosa', # Non-ASCII UCS-1 + 'щось', # UCS-2, 2 bytes for UTF-8. + '', # Empty string + None, + '嚜꓂', # UCS-2, 3 bytes for UTF-8. + '💩🦞'], # UCS-4, 4 bytes for UTF-8. + dtype='string[pyarrow]'), + 'b': [1, 2, 3, 4, 5, 6, 7, 8, 9]}) + buf = _dataframe(self.version, df, table_name='tbl1', symbols=True, at = qi.ServerTimestamp) + self.assertEqual( + buf, + ('tbl1,a=a b=1i\n' + + 'tbl1,a=q❤️p b=2i\n' + + 'tbl1,a=' + ('❤️' * 1200) + ' b=3i\n' + + 'tbl1,a=Questo\\ è\\ un\\ qualcosa b=4i\n' + + 'tbl1,a=щось b=5i\n' + + 'tbl1,a= b=6i\n' + + 'tbl1 b=7i\n' + + 'tbl1,a=嚜꓂ b=8i\n' + + 'tbl1,a=💩🦞 b=9i\n').encode('utf-8')) + + def test_str_arrow_col(self): + df = pd.DataFrame({ + 'a': pd.Series([ + 'a', # ASCII + 'q❤️p', # Mixed ASCII and UCS-2 + '❤️' * 1200, # Over the 1024 buffer prealloc. + 'Questo è un qualcosa', # Non-ASCII UCS-1 + 'щось', # UCS-2, 2 bytes for UTF-8. + '', # Empty string + None, + '嚜꓂', # UCS-2, 3 bytes for UTF-8. + '💩🦞'], # UCS-4, 4 bytes for UTF-8. + dtype='string[pyarrow]'), + 'b': [1, 2, 3, 4, 5, 6, 7, 8, 9]}) + buf = _dataframe(self.version, df, table_name='tbl1', symbols=False, at = qi.ServerTimestamp) + self.assertEqual( + buf, + ('tbl1 a="a",b=1i\n' + + 'tbl1 a="q❤️p",b=2i\n' + + 'tbl1 a="' + ('❤️' * 1200) + '",b=3i\n' + + 'tbl1 a="Questo è un qualcosa",b=4i\n' + + 'tbl1 a="щось",b=5i\n' + + 'tbl1 a="",b=6i\n' + + 'tbl1 b=7i\n' + + 'tbl1 a="嚜꓂",b=8i\n' + + 'tbl1 a="💩🦞",b=9i\n').encode('utf-8')) + + def test_pyobj_int_col(self): + int64_min = -2**63 + int64_max = 2**63 - 1 + self.assertEqual( + _dataframe( + self.version, + pd.DataFrame({ + 'a': pd.Series([ + 1, 2, 3, None, float('nan'), pd.NA, 7, + 0, + int64_min, + int64_max], dtype='object'), + 'b': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}), + table_name='tbl1', at = qi.ServerTimestamp), + ('tbl1 a=1i,b=1i\n' + + 'tbl1 a=2i,b=2i\n' + + 'tbl1 a=3i,b=3i\n' + + 'tbl1 b=4i\n' + + 'tbl1 b=5i\n' + + 'tbl1 b=6i\n' + + 'tbl1 a=7i,b=7i\n' + + 'tbl1 a=0i,b=8i\n' + + 'tbl1 a=' + str(int64_min) + 'i,b=9i\n' + + 'tbl1 a=' + str(int64_max) + 'i,b=10i\n').encode('utf-8')) + + with self.assertRaisesRegex( + qi.IngressError, "1 \\('STRING'\\): .*type int, got.*str\\."): + _dataframe( + self.version, + pd.DataFrame({ + 'a': pd.Series([1, 'STRING'], dtype='object'), + 'b': [1, 2]}), + table_name='tbl1', at = qi.ServerTimestamp) + + out_of_range = [int64_min - 1, int64_max + 1] + for num in out_of_range: + with self.assertRaisesRegex( + qi.IngressError, "index 1 .*922337203685477.*int too big"): + _dataframe( + self.version, + pd.DataFrame({ + 'a': pd.Series([1, num], dtype='object'), + 'b': [1, 2]}), + table_name='tbl1', at = qi.ServerTimestamp) + + def test_pyobj_float_col(self): + self.assertEqual( + _dataframe( + self.version, + pd.DataFrame({ + 'a': pd.Series( + [1.0, 2.0, 3.0, None, float('nan'), pd.NA, 7.0], + dtype='object'), + 'b': [1, 2, 3, 4, 5, 6, 7]}), + table_name='tbl1', at = qi.ServerTimestamp), + b'tbl1 a' + _float_binary_bytes(1.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b=1i\n' + + b'tbl1 a' + _float_binary_bytes(2.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b=2i\n' + + b'tbl1 a' + _float_binary_bytes(3.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b=3i\n' + + b'tbl1 b=4i\n' + + b'tbl1 a' + _float_binary_bytes(float('NaN'), self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b=5i\n' + + b'tbl1 b=6i\n' + + b'tbl1 a' + _float_binary_bytes(7.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',b=7i\n') + with self.assertRaisesRegex( - qi.IngressError, "index 1 .*922337203685477.*int too big"): + qi.IngressError, "1 \\('STRING'\\): .*type float, got.*str\\."): _dataframe( + self.version, pd.DataFrame({ - 'a': pd.Series([1, num], dtype='object'), + 'a': pd.Series([1.0, 'STRING'], dtype='object'), 'b': [1, 2]}), table_name='tbl1', at = qi.ServerTimestamp) - def test_pyobj_float_col(self): - self.assertEqual( - _dataframe( - pd.DataFrame({ - 'a': pd.Series( - [1.0, 2.0, 3.0, None, float('nan'), pd.NA, 7.0], - dtype='object'), - 'b': [1, 2, 3, 4, 5, 6, 7]}), - table_name='tbl1', at = qi.ServerTimestamp), - 'tbl1 a=1.0,b=1i\n' + - 'tbl1 a=2.0,b=2i\n' + - 'tbl1 a=3.0,b=3i\n' + - 'tbl1 b=4i\n' + - 'tbl1 a=NaN,b=5i\n' + - 'tbl1 b=6i\n' + - 'tbl1 a=7.0,b=7i\n') - - with self.assertRaisesRegex( - qi.IngressError, "1 \\('STRING'\\): .*type float, got.*str\\."): - _dataframe( - pd.DataFrame({ - 'a': pd.Series([1.0, 'STRING'], dtype='object'), - 'b': [1, 2]}), - table_name='tbl1', at = qi.ServerTimestamp) - - def test_bad_category(self): - # We only support string categories - # (unless anyone asks for additional ones). - # We want to test others are rejected. - with self.assertRaisesRegex( - qi.IngressError, "Bad column 'a'.*got a category of .*int64"): - _dataframe( - pd.DataFrame({'a': pd.Series([1, 2, 3, 2], dtype='category')}), - table_name='tbl1', at = qi.ServerTimestamp) - - def _test_cat_table(self, count): - slist = [f's{i}' for i in range(count)] - - df = pd.DataFrame({ - 'a': pd.Series(slist, dtype='category'), - 'b': list(range(len(slist)))}) - - buf = _dataframe(df, table_name_col=0, at = qi.ServerTimestamp) - exp = ''.join( - f'{s} b={i}i\n' - for i, s in enumerate(slist)) - self.assertEqual(buf, exp) - - slist[2] = None - df2 = pd.DataFrame({ - 'a': pd.Series(slist, dtype='category'), - 'b': list(range(len(slist)))}) - with self.assertRaisesRegex( - qi.IngressError, 'Table name cannot be null'): - _dataframe(df2, table_name_col=0, at = qi.ServerTimestamp) - - def test_cat_i8_table(self): - self._test_cat_table(30) - self._test_cat_table(127) - - def test_cat_i16_table(self): - self._test_cat_table(128) - self._test_cat_table(4000) - self._test_cat_table(32767) - - def test_cat_i32_table(self): - self._test_cat_table(32768) - self._test_cat_table(40000) - - def _test_cat_symbol(self, count): - slist = [f's{i}' for i in range(count)] - - df = pd.DataFrame({ - 'a': pd.Series(slist, dtype='category'), - 'b': list(range(len(slist)))}) - - buf = _dataframe(df, table_name='tbl1', symbols=True, at = qi.ServerTimestamp) - exp = ''.join( - f'tbl1,a={s} b={i}i\n' - for i, s in enumerate(slist)) - self.assertEqual(buf, exp) - - slist[2] = None - df2 = pd.DataFrame({ - 'a': pd.Series(slist, dtype='category'), - 'b': list(range(len(slist)))}) - - exp2 = exp.replace('tbl1,a=s2 b=2i\n', 'tbl1 b=2i\n') - buf2 = _dataframe(df2, table_name='tbl1', symbols=True, at = qi.ServerTimestamp) - self.assertEqual(buf2, exp2) - - def test_cat_i8_symbol(self): - self._test_cat_symbol(30) - self._test_cat_symbol(127) - - def test_cat_i16_symbol(self): - self._test_cat_symbol(128) - self._test_cat_symbol(4000) - self._test_cat_symbol(32767) - - def test_cat_i32_symbol(self): - self._test_cat_symbol(32768) - self._test_cat_symbol(40000) - - def _test_cat_str(self, count): - slist = [f's{i}' for i in range(count)] - - df = pd.DataFrame({ - 'a': pd.Series(slist, dtype='category'), - 'b': list(range(len(slist)))}) - - buf = _dataframe(df, table_name='tbl1', symbols=False, at = qi.ServerTimestamp) - exp = ''.join( - f'tbl1 a="{s}",b={i}i\n' - for i, s in enumerate(slist)) - self.assertEqual(buf, exp) - - slist[2] = None - df2 = pd.DataFrame({ - 'a': pd.Series(slist, dtype='category'), - 'b': list(range(len(slist)))}) - - exp2 = exp.replace('tbl1 a="s2",b=2i\n', 'tbl1 b=2i\n') - buf2 = _dataframe(df2, table_name='tbl1', symbols=False, at = qi.ServerTimestamp) - self.assertEqual(buf2, exp2) - - def test_cat_i8_str(self): - self._test_cat_str(30) - self._test_cat_str(127) - - def test_cat_i16_str(self): - self._test_cat_str(128) - self._test_cat_str(4000) - self._test_cat_str(32767) - - def test_cat_i32_str(self): - self._test_cat_str(32768) - self._test_cat_str(40000) - - def test_all_nulls_pyobj_col(self): - df = pd.DataFrame({ - 'a': [None, pd.NA, float('nan')], - 'b': [1, 2, 3]}) - buf = _dataframe(df, table_name='tbl1', at = qi.ServerTimestamp) - self.assertEqual( - buf, - 'tbl1 b=1i\n' + - 'tbl1 b=2i\n' + - 'tbl1 b=3i\n') - - def test_strided_numpy_column(self): - two_d = np.array([ - [1, 10], - [2, 20], - [3, 30]], dtype='int64') - col2 = two_d[:, 1] - col2.flags['WRITEABLE'] = False - - # Checking our test case setup. - mv = memoryview(col2) - self.assertEqual(mv.contiguous, False) - self.assertEqual(mv.strides, (16,)) - - df = pd.DataFrame(col2, copy=False) - df.columns = ['a'] - - with self.assertRaisesRegex( - qi.IngressError, "Bad column 'a': .*not.*contiguous"): - _dataframe(df, table_name='tbl1', at = qi.ServerTimestamp) - - def test_serializing_in_chunks(self): - df = pd.DataFrame({ - 'a': pd.Series(np.arange(30), dtype='int64'), - 'b': pd.Series(np.arange(30), dtype='Int64')}) - parts = [ - df.iloc[:10], - df.iloc[10:20], - df.iloc[20:]] - for index, part in enumerate(parts): - buf = _dataframe(part, table_name='tbl1', at = qi.ServerTimestamp) + def test_bad_category(self): + # We only support string categories + # (unless anyone asks for additional ones). + # We want to test others are rejected. + with self.assertRaisesRegex( + qi.IngressError, "Bad column 'a'.*got a category of .*int64"): + _dataframe( + self.version, + pd.DataFrame({'a': pd.Series([1, 2, 3, 2], dtype='category')}), + table_name='tbl1', at = qi.ServerTimestamp) + + def _test_cat_table(self, count): + slist = [f's{i}' for i in range(count)] + + df = pd.DataFrame({ + 'a': pd.Series(slist, dtype='category'), + 'b': list(range(len(slist)))}) + + buf = _dataframe(self.version, df, table_name_col=0, at = qi.ServerTimestamp) + exp = ''.join( + f'{s} b={i}i\n' + for i, s in enumerate(slist)) + self.assertEqual(buf, exp.encode("utf-8")) + + slist[2] = None + df2 = pd.DataFrame({ + 'a': pd.Series(slist, dtype='category'), + 'b': list(range(len(slist)))}) + with self.assertRaisesRegex( + qi.IngressError, 'Table name cannot be null'): + _dataframe(self.version, df2, table_name_col=0, at = qi.ServerTimestamp) + + def test_cat_i8_table(self): + self._test_cat_table(30) + self._test_cat_table(127) + + def test_cat_i16_table(self): + self._test_cat_table(128) + self._test_cat_table(4000) + self._test_cat_table(32767) + + def test_cat_i32_table(self): + self._test_cat_table(32768) + self._test_cat_table(40000) + + def _test_cat_symbol(self, count): + slist = [f's{i}' for i in range(count)] + + df = pd.DataFrame({ + 'a': pd.Series(slist, dtype='category'), + 'b': list(range(len(slist)))}) + + buf = _dataframe(self.version, df, table_name='tbl1', symbols=True, at = qi.ServerTimestamp) exp = ''.join( - f'tbl1 a={i}i,b={i}i\n' - for i in range(index * 10, (index + 1) * 10)) + f'tbl1,a={s} b={i}i\n' + for i, s in enumerate(slist)) + self.assertEqual(buf, exp.encode("utf-8")) + + slist[2] = None + df2 = pd.DataFrame({ + 'a': pd.Series(slist, dtype='category'), + 'b': list(range(len(slist)))}) + + exp2 = exp.replace('tbl1,a=s2 b=2i\n', 'tbl1 b=2i\n') + buf2 = _dataframe(self.version, df2, table_name='tbl1', symbols=True, at = qi.ServerTimestamp) + self.assertEqual(buf2, exp2.encode("utf-8")) + + def test_cat_i8_symbol(self): + self._test_cat_symbol(30) + self._test_cat_symbol(127) + + def test_cat_i16_symbol(self): + self._test_cat_symbol(128) + self._test_cat_symbol(4000) + self._test_cat_symbol(32767) + + def test_cat_i32_symbol(self): + self._test_cat_symbol(32768) + self._test_cat_symbol(40000) + + def _test_cat_str(self, count): + slist = [f's{i}' for i in range(count)] + + df = pd.DataFrame({ + 'a': pd.Series(slist, dtype='category'), + 'b': list(range(len(slist)))}) + + buf = _dataframe(self.version, df, table_name='tbl1', symbols=False, at = qi.ServerTimestamp) + exp = ''.join( + f'tbl1 a="{s}",b={i}i\n' + for i, s in enumerate(slist)) + self.assertEqual(buf, exp.encode("utf-8")) + + slist[2] = None + df2 = pd.DataFrame({ + 'a': pd.Series(slist, dtype='category'), + 'b': list(range(len(slist)))}) + + exp2 = exp.replace('tbl1 a="s2",b=2i\n', 'tbl1 b=2i\n') + buf2 = _dataframe(self.version, df2, table_name='tbl1', symbols=False, at = qi.ServerTimestamp) + self.assertEqual(buf2, exp2.encode("utf-8")) + + def test_cat_i8_str(self): + self._test_cat_str(30) + self._test_cat_str(127) + + def test_cat_i16_str(self): + self._test_cat_str(128) + self._test_cat_str(4000) + self._test_cat_str(32767) + + def test_cat_i32_str(self): + self._test_cat_str(32768) + self._test_cat_str(40000) + + def test_all_nulls_pyobj_col(self): + df = pd.DataFrame({ + 'a': [None, pd.NA, float('nan')], + 'b': [1, 2, 3]}) + buf = _dataframe(self.version, df, table_name='tbl1', at = qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 b=1i\n' + + b'tbl1 b=2i\n' + + b'tbl1 b=3i\n') + + def test_strided_numpy_column(self): + two_d = np.array([ + [1, 10], + [2, 20], + [3, 30]], dtype='int64') + col2 = two_d[:, 1] + col2.flags['WRITEABLE'] = False + + # Checking our test case setup. + mv = memoryview(col2) + self.assertEqual(mv.contiguous, False) + self.assertEqual(mv.strides, (16,)) + + df = pd.DataFrame(col2, copy=False) + df.columns = ['a'] + + with self.assertRaisesRegex( + qi.IngressError, "Bad column 'a': .*not.*contiguous"): + _dataframe(self.version, df, table_name='tbl1', at = qi.ServerTimestamp) + + def test_serializing_in_chunks(self): + df = pd.DataFrame({ + 'a': pd.Series(np.arange(30), dtype='int64'), + 'b': pd.Series(np.arange(30), dtype='Int64')}) + parts = [ + df.iloc[:10], + df.iloc[10:20], + df.iloc[20:]] + for index, part in enumerate(parts): + buf = _dataframe(self.version, part, table_name='tbl1', at = qi.ServerTimestamp) + exp = ''.join( + f'tbl1 a={i}i,b={i}i\n' + for i in range(index * 10, (index + 1) * 10)) + self.assertEqual(buf, exp.encode("utf-8")) + + def test_arrow_chunked_array(self): + # We build a table with chunked arrow arrays as columns. + chunks_a = [ + pa.array([1, 2, 3], type=pa.int16()), + pa.array([4, 5, 6], type=pa.int16()), + pa.array([], type=pa.int16()), + pa.array([7, 8, 9], type=pa.int16())] + chunked_a = pa.chunked_array(chunks_a) + chunks_b = [ + pa.array([10, 20], type=pa.int32()), + pa.array([], type=pa.int32()), + pa.array([30, 40, 50, 60], type=pa.int32()), + pa.array([70, 80, 90], type=pa.int32())] + chunked_b = pa.chunked_array(chunks_b) + arr_tab = pa.Table.from_arrays([chunked_a, chunked_b], names=['a', 'b']) + + # NOTE! + # This does *not* preserve the chunking of the arrow arrays. + df = arr_tab.to_pandas() + buf = _dataframe(self.version, df, table_name='tbl1', at = qi.ServerTimestamp) + exp = ( + b'tbl1 a=1i,b=10i\n' + + b'tbl1 a=2i,b=20i\n' + + b'tbl1 a=3i,b=30i\n' + + b'tbl1 a=4i,b=40i\n' + + b'tbl1 a=5i,b=50i\n' + + b'tbl1 a=6i,b=60i\n' + + b'tbl1 a=7i,b=70i\n' + + b'tbl1 a=8i,b=80i\n' + + b'tbl1 a=9i,b=90i\n') self.assertEqual(buf, exp) - def test_arrow_chunked_array(self): - # We build a table with chunked arrow arrays as columns. - chunks_a = [ - pa.array([1, 2, 3], type=pa.int16()), - pa.array([4, 5, 6], type=pa.int16()), - pa.array([], type=pa.int16()), - pa.array([7, 8, 9], type=pa.int16())] - chunked_a = pa.chunked_array(chunks_a) - chunks_b = [ - pa.array([10, 20], type=pa.int32()), - pa.array([], type=pa.int32()), - pa.array([30, 40, 50, 60], type=pa.int32()), - pa.array([70, 80, 90], type=pa.int32())] - chunked_b = pa.chunked_array(chunks_b) - arr_tab = pa.Table.from_arrays([chunked_a, chunked_b], names=['a', 'b']) - - # NOTE! - # This does *not* preserve the chunking of the arrow arrays. - df = arr_tab.to_pandas() - buf = _dataframe(df, table_name='tbl1', at = qi.ServerTimestamp) - exp = ( - 'tbl1 a=1i,b=10i\n' + - 'tbl1 a=2i,b=20i\n' + - 'tbl1 a=3i,b=30i\n' + - 'tbl1 a=4i,b=40i\n' + - 'tbl1 a=5i,b=50i\n' + - 'tbl1 a=6i,b=60i\n' + - 'tbl1 a=7i,b=70i\n' + - 'tbl1 a=8i,b=80i\n' + - 'tbl1 a=9i,b=90i\n') - self.assertEqual(buf, exp) - - if not hasattr(pd, 'ArrowDtype'): - # We don't have pandas ArrowDtype, so we can't test the rest. - return - - # To preserve the chunking we need to use a special pandas type: - pandarrow_a = pd.array(chunked_a, dtype='int16[pyarrow]') - pandarrow_b = pd.array(chunked_b, dtype='int32[pyarrow]') - df = pd.DataFrame({'a': pandarrow_a, 'b': pandarrow_b}) - - # Note that this dtype is experimental (currently), - # so we don't support it yet.. but we have everything in place should we - # need to, so - as for now - we just test that we raise a nice error. - with self.assertRaisesRegex( - qi.IngressError, - "Unsupported dtype int16\[pyarrow\] for column 'a'.*github"): - _dataframe(df, table_name='tbl1', at = qi.ServerTimestamp) - - @unittest.skipIf(not fastparquet, 'fastparquet not installed') - @with_tmp_dir - def test_parquet_roundtrip(self, tmpdir): - pa_parquet_path = tmpdir / 'test_pa.parquet' - fp_parquet_path = tmpdir / 'test_fp.parquet' - df = pd.DataFrame({ - 's': pd.Categorical(['a', 'b', 'a', 'c', 'a']), - 'a': pd.Series([1, 2, 3, 4, 5], dtype='int16'), - 'b': pd.Series([10, 20, 30, None, 50], dtype='UInt8'), - 'c': [0.5, float('nan'), 2.5, 3.5, None]}) - df.to_parquet(pa_parquet_path, engine='pyarrow') - df.to_parquet(fp_parquet_path, engine='fastparquet') - pa2pa_df = pd.read_parquet(pa_parquet_path, engine='pyarrow') - pa2fp_df = pd.read_parquet(pa_parquet_path, engine='fastparquet') - fp2pa_df = pd.read_parquet(fp_parquet_path, engine='pyarrow') - fp2fp_df = pd.read_parquet(fp_parquet_path, engine='fastparquet') - - exp_dtypes = ['category', 'int16', 'UInt8', 'float64'] - self.assertEqual(list(df.dtypes), exp_dtypes) - - def df_eq(exp_df, deser_df, exp_dtypes): - self.assertEqual(list(deser_df.dtypes), exp_dtypes) - if not exp_df.equals(deser_df): - print('\nexp_df:') - print(exp_df) - print('\ndeser_df:') - print(deser_df) - self.assertTrue(exp_df.equals(deser_df)) - - # fastparquet doesn't roundtrip with pyarrow parquet properly. - # It decays categories to object and UInt8 to float64. - # We need to set up special case expected results for that. - fallback_exp_dtypes = [ - np.dtype('O'), - np.dtype('int16'), - np.dtype('float64'), - np.dtype('float64')] - fallback_df = df.astype({'s': 'object', 'b': 'float64'}) - - df_eq(df, pa2pa_df, exp_dtypes) - df_eq(df, pa2fp_df, exp_dtypes) - df_eq(fallback_df, fp2pa_df, fallback_exp_dtypes) - df_eq(df, fp2fp_df, exp_dtypes) - - exp = ( - 'tbl1,s=a a=1i,b=10i,c=0.5\n' + - 'tbl1,s=b a=2i,b=20i,c=NaN\n' + - 'tbl1,s=a a=3i,b=30i,c=2.5\n' + - 'tbl1,s=c a=4i,c=3.5\n' + - 'tbl1,s=a a=5i,b=50i,c=NaN\n') - - fallback_exp = ( - 'tbl1 s="a",a=1i,b=10.0,c=0.5\n' + - 'tbl1 s="b",a=2i,b=20.0,c=NaN\n' + - 'tbl1 s="a",a=3i,b=30.0,c=2.5\n' + - 'tbl1 s="c",a=4i,b=NaN,c=3.5\n' + - 'tbl1 s="a",a=5i,b=50.0,c=NaN\n') - - self.assertEqual(_dataframe(df, table_name='tbl1', at=qi.ServerTimestamp), exp) - self.assertEqual(_dataframe(pa2pa_df, table_name='tbl1', at=qi.ServerTimestamp), exp) - self.assertEqual(_dataframe(pa2fp_df, table_name='tbl1', at=qi.ServerTimestamp), exp) - self.assertEqual(_dataframe(fp2pa_df, table_name='tbl1', at=qi.ServerTimestamp), fallback_exp) - self.assertEqual(_dataframe(fp2fp_df, table_name='tbl1', at=qi.ServerTimestamp), exp) + if not hasattr(pd, 'ArrowDtype'): + # We don't have pandas ArrowDtype, so we can't test the rest. + return + # To preserve the chunking we need to use a special pandas type: + pandarrow_a = pd.array(chunked_a, dtype='int16[pyarrow]') + pandarrow_b = pd.array(chunked_b, dtype='int32[pyarrow]') + df = pd.DataFrame({'a': pandarrow_a, 'b': pandarrow_b}) + + # Note that this dtype is experimental (currently), + # so we don't support it yet.. but we have everything in place should we + # need to, so - as for now - we just test that we raise a nice error. + with self.assertRaisesRegex( + qi.IngressError, + "Unsupported dtype int16\[pyarrow\] for column 'a'.*github"): + _dataframe(self.version, df, table_name='tbl1', at = qi.ServerTimestamp) + + @unittest.skipIf(not fastparquet, 'fastparquet not installed') + @with_tmp_dir + def test_parquet_roundtrip(self, tmpdir): + pa_parquet_path = tmpdir / 'test_pa.parquet' + fp_parquet_path = tmpdir / 'test_fp.parquet' + df = pd.DataFrame({ + 's': pd.Categorical(['a', 'b', 'a', 'c', 'a']), + 'a': pd.Series([1, 2, 3, 4, 5], dtype='int16'), + 'b': pd.Series([10, 20, 30, None, 50], dtype='UInt8'), + 'c': [0.5, float('nan'), 2.5, 3.5, None]}) + df.to_parquet(pa_parquet_path, engine='pyarrow') + df.to_parquet(fp_parquet_path, engine='fastparquet') + pa2pa_df = pd.read_parquet(pa_parquet_path, engine='pyarrow') + pa2fp_df = pd.read_parquet(pa_parquet_path, engine='fastparquet') + fp2pa_df = pd.read_parquet(fp_parquet_path, engine='pyarrow') + fp2fp_df = pd.read_parquet(fp_parquet_path, engine='fastparquet') + + exp_dtypes = ['category', 'int16', 'UInt8', 'float64'] + self.assertEqual(list(df.dtypes), exp_dtypes) + + def df_eq(exp_df, deser_df, exp_dtypes): + self.assertEqual(list(deser_df.dtypes), exp_dtypes) + if not exp_df.equals(deser_df): + print('\nexp_df:') + print(exp_df) + print('\ndeser_df:') + print(deser_df) + self.assertTrue(exp_df.equals(deser_df)) + + # fastparquet doesn't roundtrip with pyarrow parquet properly. + # It decays categories to object and UInt8 to float64. + # We need to set up special case expected results for that. + fallback_exp_dtypes = [ + np.dtype('O'), + np.dtype('int16'), + np.dtype('float64'), + np.dtype('float64')] + fallback_df = df.astype({'s': 'object', 'b': 'float64'}) + + df_eq(df, pa2pa_df, exp_dtypes) + df_eq(df, pa2fp_df, exp_dtypes) + df_eq(fallback_df, fp2pa_df, fallback_exp_dtypes) + df_eq(df, fp2fp_df, exp_dtypes) + + exp = ( + b'tbl1,s=a a=1i,b=10i,c' + _float_binary_bytes(0.5, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1,s=b a=2i,b=20i,c' + _float_binary_bytes(float('NaN'), self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1,s=a a=3i,b=30i,c' + _float_binary_bytes(2.5, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1,s=c a=4i,c' + _float_binary_bytes(3.5, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1,s=a a=5i,b=50i,c' + _float_binary_bytes(float('NaN'), self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n') + + fallback_exp = ( + b'tbl1 s="a",a=1i,b' + _float_binary_bytes(10.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',c' + + _float_binary_bytes(0.5, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1 s="b",a=2i,b' + _float_binary_bytes(20.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',c' + + _float_binary_bytes(float('NaN'), self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1 s="a",a=3i,b' + _float_binary_bytes(30.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',c' + + _float_binary_bytes(2.5, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1 s="c",a=4i,b' + _float_binary_bytes(float('NaN'), self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',c' + + _float_binary_bytes(3.5, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n' + + b'tbl1 s="a",a=5i,b' + _float_binary_bytes(50.0, self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b',c' + + _float_binary_bytes(float('NaN'), self.version == qi.LineProtocolVersion.LineProtocolVersionV1) + b'\n') + + self.assertEqual(_dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp), exp) + self.assertEqual(_dataframe(self.version, pa2pa_df, table_name='tbl1', at=qi.ServerTimestamp), exp) + self.assertEqual(_dataframe(self.version, pa2fp_df, table_name='tbl1', at=qi.ServerTimestamp), exp) + self.assertEqual(_dataframe(self.version, fp2pa_df, table_name='tbl1', at=qi.ServerTimestamp), fallback_exp) + self.assertEqual(_dataframe(self.version, fp2fp_df, table_name='tbl1', at=qi.ServerTimestamp), exp) + + def test_f64_np_array(self): + df = pd.DataFrame({ + 'a': [np.array([1.0], np.float64), np.array([2.0], np.float64), np.array([3.0], np.float64)]}) + + if self.version == qi.LineProtocolVersion.LineProtocolVersionV1: + with self.assertRaisesRegex( + qi.IngressError, + "line protocol version v1 does not support array datatype"): + _ = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + else: + buf = _dataframe(self.version, df, table_name='tbl1', at=qi.ServerTimestamp) + self.assertEqual( + buf, + b'tbl1 a=' + _array_binary_bytes(np.array([1.0], np.float64)) + b'\n' + + b'tbl1 a=' + _array_binary_bytes(np.array([2.0], np.float64)) + b'\n' + + b'tbl1 a=' + _array_binary_bytes(np.array([3.0], np.float64)) + b'\n') + +class TestPandasLineProtocolVersionV1(TestPandasBase.TestPandas): + name = 'init' + version = qi.LineProtocolVersion.LineProtocolVersionV1 + +class TestPandasLineProtocolVersionV2(TestPandasBase.TestPandas): + name = 'init' + version = qi.LineProtocolVersion.LineProtocolVersionV2 if __name__ == '__main__': if os.environ.get('TEST_QUESTDB_PROFILE') == '1':