Skip to content

Commit

Permalink
MAINT: remove hard dependency on odo
Browse files Browse the repository at this point in the history
Odo is currently a hard dependency of warp_prism to convert the sqlalchemy types
into numpy dtypes. Odo is no longer actively maintained and breaks with newer
versions of pandas. This change reimplements the needed functionality in
warp_prism directly without using odo. This PR does leave the odo edge
registration code so that existing users don't see a change in functionality.
  • Loading branch information
Joe Jevnik committed Jan 8, 2020
1 parent dbd61bf commit ed0ecf0
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 16 deletions.
5 changes: 2 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,16 @@
),
],
install_requires=[
'datashape',
'numpy',
'pandas',
'sqlalchemy',
'psycopg2',
'odo',
'toolz',
'networkx<=1.11',
],
extras_require={
'dev': [
'odo',
'networkx<=1.11',
'flake8==3.3.0',
'pycodestyle==2.3.1',
'pyflakes==1.5.0',
Expand Down
120 changes: 107 additions & 13 deletions warp_prism/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from io import BytesIO
import numbers

from datashape import discover
from datashape.predicates import istabular
import numpy as np
from odo import convert
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql as _postgresql
from sqlalchemy.ext.compiler import compiles
from toolz import keymap

from ._warp_prism import (
raw_to_arrays as _raw_to_arrays,
Expand All @@ -18,7 +16,7 @@
__version__ = '0.1.1'


_typeid_map = keymap(np.dtype, _raw_typeid_map)
_typeid_map = {np.dtype(k): v for k, v in _raw_typeid_map.items()}
_object_type_id = _raw_typeid_map['object']


Expand Down Expand Up @@ -66,14 +64,107 @@ def _compile_copy_to_binary_postgres(element, compiler, **kwargs):
)


# Map numpy dtypes to the sqlalchemy column types used when creating tables.
# Timedelta (m8) dtypes become INTERVAL columns: ``second_precision`` carries
# the number of fractional-second digits and ``day_precision`` marks
# day-valued intervals.
# NOTE(review): m8[h], m8[m] and m8[s] all map onto the same
# second_precision=0 interval, so the original unit is presumably not
# preserved on a round trip — confirm against ``_discover_type``.
types = {np.dtype(k): v for k, v in {
    'i8': sa.BigInteger,
    'i4': sa.Integer,
    'i2': sa.SmallInteger,
    'f4': sa.REAL,
    'f8': sa.FLOAT,
    'O': sa.Text,
    'M8[D]': sa.Date,
    'M8[us]': sa.DateTime,
    '?': sa.Boolean,
    "m8[D]": sa.Interval(second_precision=0, day_precision=9),
    "m8[h]": sa.Interval(second_precision=0, day_precision=0),
    "m8[m]": sa.Interval(second_precision=0, day_precision=0),
    "m8[s]": sa.Interval(second_precision=0, day_precision=0),
    "m8[ms]": sa.Interval(second_precision=3, day_precision=0),
    "m8[us]": sa.Interval(second_precision=6, day_precision=0),
    "m8[ns]": sa.Interval(second_precision=9, day_precision=0),
}.items()}

# Inverse of ``types``: map sqlalchemy column types back to numpy dtypes.
# The generic CamelCase types come from reversing ``types``; the uppercase
# SQL-standard aliases and a few extras are layered on explicitly.
_revtypes = dict(map(reversed, types.items()))
_revtypes.update({
    sa.DATETIME: np.dtype('M8[us]'),
    sa.TIMESTAMP: np.dtype('M8[us]'),
    sa.FLOAT: np.dtype('f8'),
    sa.DATE: np.dtype('M8[D]'),
    # BUG FIX: ``sa.BIGINT`` appeared twice in this literal; the duplicate
    # key was redundant (same value) and has been removed.
    sa.BIGINT: np.dtype('i8'),
    sa.INTEGER: np.dtype('i4'),
    sa.types.NullType: np.dtype('O'),
    sa.REAL: np.dtype('f4'),
    sa.Float: np.dtype('f8'),
})

# Float types whose ``precision`` attribute (binary digits) decides between
# float32 and float64 via ``_precision_to_dtype``.
_precision_types = {
    sa.Float,
    _postgresql.base.DOUBLE_PRECISION,
}


def _precision_to_dtype(precision):
if isinstance(precision, numbers.Integral):
if 1 <= precision <= 24:
return np.dtype('f4')
elif 25 <= precision <= 53:
return np.dtype('f8')
raise ValueError('%s is not a supported precision' % precision)


# Map an INTERVAL ``second_precision`` (fractional-second digits) to the
# corresponding numpy timedelta64 unit code.
_units_of_power = {
    0: 's',
    3: 'ms',
    6: 'us',
    9: 'ns'
}


def _discover_type(type_):
    """Infer the numpy dtype for a sqlalchemy column type.

    Parameters
    ----------
    type_ : sa.types.TypeEngine
        The sqlalchemy type of a column.

    Returns
    -------
    dtype : np.dtype
        The numpy dtype to decode the column into.

    Raises
    ------
    ValueError
        If the type is an INTERVAL with unsupported precisions or a NUMERIC.
    NotImplementedError
        If no mapping to a numpy dtype is known.
    """
    if isinstance(type_, sa.Interval):
        # No precision information at all: default to microsecond timedeltas.
        if type_.second_precision is None and type_.day_precision is None:
            return np.dtype('m8[us]')
        elif type_.second_precision == 0 and type_.day_precision == 0:
            return np.dtype('m8[s]')

        if (type_.second_precision in _units_of_power and
                not type_.day_precision):
            unit = _units_of_power[type_.second_precision]
        elif type_.day_precision is not None and type_.day_precision > 0:
            # BUG FIX: guard against ``day_precision is None``, which made
            # the original ``> 0`` comparison raise TypeError on Python 3.
            unit = 'D'
        else:
            # BUG FIX: the original message read 'INTERVAL type_e' and ran
            # 'parameters' and 'second_precision' together; %s also keeps a
            # None precision from breaking the old %d formatting.
            raise ValueError(
                'Cannot infer INTERVAL type with parameters '
                'second_precision=%s, day_precision=%s' %
                (type_.second_precision, type_.day_precision),
            )
        return np.dtype('m8[%s]' % unit)
    if type(type_) in _precision_types and type_.precision is not None:
        return _precision_to_dtype(type_.precision)
    if type_ in _revtypes:
        return _revtypes[type_]
    if type(type_) in _revtypes:
        return _revtypes[type(type_)]
    if isinstance(type_, sa.Numeric):
        raise ValueError('Cannot adapt numeric type to numpy dtype')
    if isinstance(type_, (sa.String, sa.Unicode)):
        return np.dtype('O')
    else:
        # Structural fallback: match subclasses of the known types, including
        # TypeDecorator wrappers via their ``impl`` attribute.
        for k, v in _revtypes.items():
            if isinstance(k, type) and (isinstance(type_, k) or
                                        hasattr(type_, 'impl') and
                                        isinstance(type_.impl, k)):
                return v
            if k == type_:
                return v
        raise NotImplementedError('No SQL-numpy match for type %s' % type_)


def _warp_prism_types(query):
for name, dtype in discover(query).measure.fields:
for col in query.columns:
dtype = _discover_type(col.type)
try:
np_dtype = getattr(dtype, 'ty', dtype).to_numpy_dtype()
if np_dtype.kind == 'U':
yield _object_type_id
else:
yield _typeid_map[np_dtype]
yield _typeid_map[dtype]
except KeyError:
raise TypeError(
'warp_prism cannot query columns of type %s' % dtype,
Expand Down Expand Up @@ -136,7 +227,7 @@ def to_arrays(query, *, bind=None):
return {column_names[n]: v for n, v in enumerate(out)}


null_values = keymap(np.dtype, {
null_values = {np.dtype(k): v for k, v in {
'float32': np.nan,
'float64': np.nan,
'int16': np.nan,
Expand All @@ -145,7 +236,7 @@ def to_arrays(query, *, bind=None):
'bool': np.nan,
'datetime64[ns]': np.datetime64('nat', 'ns'),
'object': None,
})
}.items()}

# alias because ``to_dataframe`` shadows this name
_default_null_values_for_type = null_values
Expand Down Expand Up @@ -216,6 +307,9 @@ def register_odo_dataframe_edge():
If the selectable is not in a postgres database, it will fallback to the
default odo edge.
"""
from odo import convert
from datashape.predicates import istabular

# estimating 8 times faster
df_cost = convert.graph.edge[sa.sql.Select][pd.DataFrame]['cost'] / 8

Expand Down
93 changes: 93 additions & 0 deletions warp_prism/tests/bench.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import numpy as np
from odo import odo
import pandas as pd
from pandas import read_sql
import perf

from warp_prism import to_arrays, to_dataframe
from warp_prism.tests import tmp_db_uri


def setup(setup):
"""Mark that a test needs a benchmark needs a setup function to prepare the
inputes.
"""
def dec(f):
f._setup = setup
return f

return dec


def setup_largefloat(table_uri):
    """Write a one-million-row frame of two random float64 columns to
    ``table_uri`` with odo, returning whatever odo yields for the table.
    """
    frame = pd.DataFrame({
        'a': np.random.rand(1000000),
        'b': np.random.rand(1000000),
    })
    return odo(frame, table_uri)


@setup(setup_largefloat)
def bench_largefloat_warp_prism_to_arrays(table):
    """Time a single ``warp_prism.to_arrays`` call over *table*."""
    now = perf.perf_counter
    begin = now()
    to_arrays(table)  # result discarded; only the elapsed time matters
    return now() - begin


@setup(setup_largefloat)
def bench_largefloat_warp_prism_to_dataframe(table):
    """Time a single ``warp_prism.to_dataframe`` call over *table*."""
    now = perf.perf_counter
    begin = now()
    to_dataframe(table)  # result discarded; only the elapsed time matters
    return now() - begin


@setup(setup_largefloat)
def bench_largefloat_odo_to_dataframe(table):
    """Time odo converting *table* into a ``pd.DataFrame``."""
    now = perf.perf_counter
    begin = now()
    odo(table, pd.DataFrame)  # result discarded; only the elapsed time matters
    return now() - begin


@setup(setup_largefloat)
def bench_largefloat_pandas_to_dataframe(table):
    """Time ``pandas.read_sql`` reading *table* into a DataFrame.

    Returns the elapsed wall-clock seconds for a single read.
    """
    counter = perf.perf_counter
    start = counter()
    # BUG FIX: ``read_sql`` requires a connection; the original
    # ``read_sql(table)`` raised TypeError (missing ``con``).  Select the
    # whole table through the engine it is bound to.
    # NOTE(review): assumes *table* is a bound sqlalchemy Table — confirm
    # against what odo returns in ``setup_largefloat``.
    _ = read_sql(table.select(), table.bind)  # noqa
    return counter() - start


def main():
    """Run every module-level ``bench_*`` function under ``perf``.

    Each benchmark runs against a scratch database from ``tmp_db_uri``;
    failures are printed and the loop continues with the next benchmark.
    """
    from traceback import print_exc

    def wrapper(wrapped):
        # perf passes a loop count as the first argument of a sample
        # function; the benchmarks time themselves, so it is ignored.
        def bench(loops, *args):
            return wrapped(*args)

        return bench

    with tmp_db_uri() as db_uri:
        for k, v in globals().items():
            if k.startswith('bench_'):
                # NOTE(review): a fresh Runner is constructed (and
                # parse_args called) once per benchmark — confirm perf
                # tolerates repeated argument parsing in one process.
                runner = perf.Runner()
                runner.parse_args()
                table_uri = '::'.join((db_uri, k))
                # Benchmarks decorated with ``@setup`` get their input
                # prepared; undecorated ones receive the raw table uri.
                setup = getattr(v, '_setup', lambda table_uri: table_uri)
                try:
                    print('%s: ' % k, end='')
                    print(runner.bench_sample_func(
                        k,
                        wrapper(v),
                        setup(table_uri),
                    ).format())
                except Exception:
                    print('%s: error' % k)
                    print_exc()


if __name__ == '__main__':
main()

0 comments on commit ed0ecf0

Please sign in to comment.