Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python DB API 2.0 driver #30

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions chdb/dbapi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from .converters import escape_dict, escape_sequence, escape_string
from .constants import FIELD_TYPE
from .err import (
Warning, Error, InterfaceError, DataError,
DatabaseError, OperationalError, IntegrityError, InternalError,
NotSupportedError, ProgrammingError)
from . import connections as _orig_conn

VERSION = (0, 1, 0, None)
if VERSION[3] is not None:
VERSION_STRING = "%d.%d.%d_%s" % VERSION
else:
VERSION_STRING = "%d.%d.%d" % VERSION[:3]

threadsafety = 1
apilevel = "2.0"
paramstyle = "format"


class DBAPISet(frozenset):

def __ne__(self, other):
if isinstance(other, set):
return frozenset.__ne__(self, other)
else:
return other not in self

def __eq__(self, other):
if isinstance(other, frozenset):
return frozenset.__eq__(self, other)
else:
return other in self

def __hash__(self):
return frozenset.__hash__(self)


# TODO it's in pep249 find out meaning and usage of this
# https://www.python.org/dev/peps/pep-0249/#string
STRING = DBAPISet([FIELD_TYPE.ENUM, FIELD_TYPE.STRING,
FIELD_TYPE.VAR_STRING])
BINARY = DBAPISet([FIELD_TYPE.BLOB, FIELD_TYPE.LONG_BLOB,
FIELD_TYPE.MEDIUM_BLOB, FIELD_TYPE.TINY_BLOB])
NUMBER = DBAPISet([FIELD_TYPE.DECIMAL, FIELD_TYPE.DOUBLE, FIELD_TYPE.FLOAT,
FIELD_TYPE.INT24, FIELD_TYPE.LONG, FIELD_TYPE.LONGLONG,
FIELD_TYPE.TINY, FIELD_TYPE.YEAR])
DATE = DBAPISet([FIELD_TYPE.DATE, FIELD_TYPE.NEWDATE])
TIME = DBAPISet([FIELD_TYPE.TIME])
TIMESTAMP = DBAPISet([FIELD_TYPE.TIMESTAMP, FIELD_TYPE.DATETIME])
DATETIME = TIMESTAMP
ROWID = DBAPISet()


def Binary(x):
"""Return x as a binary type."""
return bytes(x)


def Connect(*args, **kwargs):
"""
Connect to the database; see connections.Connection.__init__() for
more information.
"""
from .connections import Connection
return Connection(*args, **kwargs)


if _orig_conn.Connection.__init__.__doc__ is not None:
Connect.__doc__ = _orig_conn.Connection.__init__.__doc__
del _orig_conn


def get_client_info(): # for MySQLdb compatibility
version = VERSION
if VERSION[3] is None:
version = VERSION[:3]
return '.'.join(map(str, version))


connect = Connection = Connect

NULL = "NULL"

__version__ = get_client_info()
206 changes: 206 additions & 0 deletions chdb/dbapi/connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
import json
from . import err
from .cursors import Cursor
from . import converters

DEBUG = False
VERBOSE = False


class Connection(object):
"""
Representation of a connection with chdb.

The proper way to get an instance of this class is to call
connect().

Accepts several arguments:

:param cursorclass: Custom cursor class to use.

See `Connection <https://www.python.org/dev/peps/pep-0249/#connection-objects>`_ in the
specification.
"""

_closed = False

def __init__(self, cursorclass=Cursor):

self._resp = None

# 1. pre-process params in init
self.encoding = 'utf8'

self.cursorclass = cursorclass

self._result = None
self._affected_rows = 0

self.connect()

def connect(self):
self._closed = False
self._execute_command("select 1;")
self._read_query_result()

def close(self):
"""
Send the quit message and close the socket.

See `Connection.close() <https://www.python.org/dev/peps/pep-0249/#Connection.close>`_
in the specification.

:raise Error: If the connection is already closed.
"""
if self._closed:
raise err.Error("Already closed")
self._closed = True

@property
def open(self):
"""Return True if the connection is open"""
return not self._closed

def commit(self):
"""
Commit changes to stable storage.

See `Connection.commit() <https://www.python.org/dev/peps/pep-0249/#commit>`_
in the specification.
"""
return

def rollback(self):
"""
Roll back the current transaction.

See `Connection.rollback() <https://www.python.org/dev/peps/pep-0249/#rollback>`_
in the specification.
"""
return

def cursor(self, cursor=None):
"""
Create a new cursor to execute queries with.

:param cursor: The type of cursor to create; current only :py:class:`Cursor`
None means use Cursor.
"""
if cursor:
return cursor(self)
return self.cursorclass(self)

# The following methods are INTERNAL USE ONLY (called from Cursor)
def query(self, sql):
if isinstance(sql, str):
sql = sql.encode(self.encoding, 'surrogateescape')
self._execute_command(sql)
self._affected_rows = self._read_query_result()
return self._affected_rows

def _execute_command(self, sql):
"""
:raise InterfaceError: If the connection is closed.
:raise ValueError: If no username was specified.
"""
if self._closed:
raise err.InterfaceError("Connection closed")

if isinstance(sql, str):
sql = sql.encode(self.encoding)

if isinstance(sql, bytearray):
sql = bytes(sql)

# drop last command return
if self._resp is not None:
self._resp = None

if DEBUG:
print("DEBUG: query:", sql)
try:
import chdb
self._resp = chdb.query(sql, output_format="JSON").data()
except Exception as error:
raise err.InterfaceError("query err: %s" % error)

def escape(self, obj, mapping=None):
"""Escape whatever value you pass to it.

Non-standard, for internal use; do not use this in your applications.
"""
if isinstance(obj, str):
return "'" + self.escape_string(obj) + "'"
if isinstance(obj, (bytes, bytearray)):
ret = self._quote_bytes(obj)
return ret
return converters.escape_item(obj, mapping=mapping)

def escape_string(self, s):
return converters.escape_string(s)

def _quote_bytes(self, s):
return converters.escape_bytes(s)

def _read_query_result(self):
self._result = None
result = CHDBResult(self)
result.read()
self._result = result
return result.affected_rows

def __enter__(self):
"""Context manager that returns a Cursor"""
return self.cursor()

def __exit__(self, exc, value, traceback):
"""On successful exit, commit. On exception, rollback"""
if exc:
self.rollback()
else:
self.commit()

@property
def resp(self):
return self._resp


class CHDBResult(object):
def __init__(self, connection):
"""
:type connection: Connection
"""
self.connection = connection
self.affected_rows = 0
self.insert_id = None
self.warning_count = 0
self.message = None
self.field_count = 0
self.description = None
self.rows = None
self.has_next = None

def read(self):
try:
data = json.loads(self.connection.resp)
except Exception as error:
raise err.InterfaceError("Unsupported response format:" % error)

try:
self.field_count = len(data["meta"])
description = []
for meta in data["meta"]:
fields = [meta["name"], meta["type"]]
description.append(tuple(fields))
self.description = tuple(description)

rows = []
for line in data["data"]:
row = []
for i in range(self.field_count):
column_data = converters.convert_column_data(self.description[i][1], line[self.description[i][0]])
row.append(column_data)
rows.append(tuple(row))
self.rows = tuple(rows)
except Exception as error:
raise err.InterfaceError("Read return data err:" % error)
32 changes: 32 additions & 0 deletions chdb/dbapi/constants/FIELD_TYPE.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
DECIMAL = 0
TINY = 1
SHORT = 2
LONG = 3
FLOAT = 4
DOUBLE = 5
NULL = 6
TIMESTAMP = 7
LONGLONG = 8
INT24 = 9
DATE = 10
TIME = 11
DATETIME = 12
YEAR = 13
NEWDATE = 14
VARCHAR = 15
BIT = 16
JSON = 245
NEWDECIMAL = 246
ENUM = 247
SET = 248
TINY_BLOB = 249
MEDIUM_BLOB = 250
LONG_BLOB = 251
BLOB = 252
VAR_STRING = 253
STRING = 254
GEOMETRY = 255

CHAR = TINY
INTERVAL = ENUM

Empty file.
Loading