# profiling package and results for pandas extensions (#11)
# Pull request description

## Branch summary

The below package was used to generate the following performance results for the Ohio-Pandas integration extensions to `DataFrame` – `pg_copy_to` & `pg_copy_from`.

## Results summary

In general, _all_ solutions involving the Ohio extensions require significantly less RAM; and, (except for the `buffer_size=1` case, which is no longer the default), Ohio extensions are also significantly faster.

Unlike Pandas built-in methods `to_sql` and `read_sql`, doing a custom PostgreSQL `COPY` via `io.StringIO` *is* competitive, but still significantly slower and more RAM-intensive than Ohio extensions. (And Ohio extensions have the added benefit of being one-liners, simple method invocations, and "built-in" to `DataFrame`, once the integration is loaded.)
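For context, here's a hedged sketch of what those one-liners look like in use. `pg_copy_from`'s signature follows the profilers in this change set; `pg_copy_to`'s arguments are an assumption, and both require `ohio` plus a live PostgreSQL engine, so the calls are wrapped in a function here:

```python
def round_trip(df, engine, table_name, query):
    # Importing the integration module registers the extension methods
    # on DataFrame. (Deferred here because it requires ohio installed.)
    import ohio.ext.pandas  # noqa

    import pandas

    # DataFrame -> database table, via PostgreSQL COPY
    # (argument shape assumed for illustration):
    df.pg_copy_to(table_name, engine)

    # database query -> DataFrame, via PostgreSQL COPY:
    return pandas.DataFrame.pg_copy_from(query, engine, buffer_size=100)
```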

## Profiling

The profiling package, `prof`, is executable as a package, (like a module):

    $ python -m prof …

I added a management command wrapper largely for added documentation and cleanliness:

    $ manage profile …

Generally, I used variations of the following command:

    $ time manage profile - --random --count 3 --plot ./plots/ ../dirtyduck/risks_aggregation.csv | tee profile-$(date +%s).txt

*I.e.*:

* Profile everything 3 times
* Plot averages with error bars
* Use some dirtyduck data – (it doesn't really matter what the CSV is, but it should be big, and the package was tested against something like this, with lots of floats and ints, and one as_of_dates column)
* It's probably best to randomize the order in which things are profiled
* It takes a long time to run, so GNU `time` reports on that.
* Store console (debugging) output in a file.

## Results

These take a really long time to run; so, while you could just run the command as above and get plots for every group, I generally filtered down to whichever of the two groups I was trying to test.

### Copy from DataFrame to database table

    $ time manage profile - --tag-filter "to database" --random --count 3 --plot ./plots/ ../dirtyduck/risks_aggregation.csv | tee profile-$(date +%s).txt

![image](https://user-images.githubusercontent.com/530998/55442312-a9ce9980-5573-11e9-9d5f-fe021ab94cce.png)

Basically, `to_sql` is a lost cause, (even playing with the `multi` parameter). However, the `StringIO` method comes close; and, it's unclear how close, at this scale.

So, ignoring `to_sql`:

    $ time manage profile - --tag-filter "to database" --filter "^(copy|ohio)" --random --count 3 --plot ./plots/ ../dirtyduck/risks_aggregation.csv | tee profile-$(date +%s).txt

![image](https://user-images.githubusercontent.com/530998/55442423-2d888600-5574-11e9-8127-33b53640726b.png)

Now we can see more clearly what was apparent in the previous plot: the `StringIO` version is significantly slower. (This makes sense, since it has to encode _all_ the data into a string before beginning to `COPY` it into the database.)
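That buffering can be sketched in miniature, (an assumed stand-in, not the profiled code): the entire CSV payload is materialized in one `StringIO` before any of it could be handed to `COPY`.

```python
import csv
import io

# Encode every row into a single in-memory buffer up front -- peak RAM
# therefore scales with the full size of the payload.
rows = [(i, i * 0.5) for i in range(1000)]

buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerows(rows)

buffer.seek(0)
payload = buffer.getvalue()

# Only now would a real implementation hand `buffer` to, e.g.,
# cursor.copy_expert('COPY ... FROM STDIN WITH CSV', buffer).
print(len(payload.splitlines()))  # 1000 -- every row resident at once
```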

The error bars on RAM usage are a mess, however. It doesn't help that I'm comparing a ton of different configurations of Ohio, (including unreleased ones, which this profiling has almost certainly proven to be unnecessary).

These RAM usage measurements were tough to make seem "true." Running all profilers, by default, in a subprocess made a huge difference; but, for whatever reason, you can still get a wide variance – perhaps because I'm just running these on my laptop, and even at three trials, a weird one can throw things off.

In conclusion, I'd say that Ohio – especially with the current default configuration – does fare better on RAM usage, as well. And, that's just with this particular dataset. Unlike the `StringIO` version's, the overhead of Ohio's solution shouldn't increase linearly with the size of the payload.

### Copy from database table to DataFrame

    $ time manage profile - --tag-filter "to dataframe" --random --count 3 --plot ./plots/ ../dirtyduck/risks_aggregation.csv | tee profile-$(date +%s).txt

![image](https://user-images.githubusercontent.com/530998/55442862-f7e49c80-5575-11e9-9243-dc0ee33ee773.png)

Pandas – `read_sql` – is again a huge outlier in slowness and RAM usage.

    $ time manage profile - -f "(ohio|stringio)" --tag-filter "to dataframe" --random --count 3 --plot ./plots/ ../dirtyduck/risks_aggregation.csv | tee profile-$(date +%s).txt

![image](https://user-images.githubusercontent.com/530998/55442929-4134ec00-5576-11e9-99f3-16fcd79c2727.png)

Unlike in the previous context, here `StringIO` does _much_ worse on RAM. And, again, it's significantly slower, (if not tremendously so).

The Ohio solutions all do about the same, (and it's unclear which of them is definitely best).

---

All that said, I'm happy to run more trials. And, with this package, others are welcome to do the same! 😸 

# Pull request addendum

All right, I have:

* moved some experimental stuff into dependent branch [experiment/profiling](profiling...experiment/profiling)
* cleaned up some utility stuff and added missing doc strings
* replaced custom `histogram` with `collections.Counter`
* added a profiler of `DataFrame > StringIO > COPY` that goes through a `DataFrame` accessor, such that we're comparing apples-to-apples

The results of the latter are interesting.

![image](https://user-images.githubusercontent.com/530998/55506410-fd47f280-561a-11e9-9d6d-a49bccc6a758.png)

As we saw before, the "generic" version, which uses `df.to_csv(string_io)`, is *way* slow. It has similar RAM usage; but, that's misleading, since there might be some weird (and/or bad) RAM usage going on when Pandas initiates the extension.

The novelty of the new profiler, which goes through a Pandas extension – `df.stringio_copy_to` – is that it should get no advantages with RAM. And it's ludicrous how much RAM it does use. (And, again, we can expect this to increase as the payload scales.)

What's interesting is that the StringIO accessor is as *fast* as it is. It's just using `csv.writer`.
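For reference, an accessor of roughly the shape profiled here can be registered through pandas's public extension API – the name `stringio_copy` and its internals below are hypothetical, not the profiler's actual code:

```python
import csv
import io

import pandas
from pandas.api.extensions import register_dataframe_accessor


@register_dataframe_accessor('stringio_copy')
class StringIOCopyAccessor:
    """hypothetical accessor: encode a frame into one in-memory CSV buffer"""

    def __init__(self, df):
        self._df = df

    def to_buffer(self):
        # Like the profiled StringIO strategy, this writes *all* rows
        # before returning -- RAM scales with the payload.
        buffer = io.StringIO()
        writer = csv.writer(buffer)
        writer.writerows(self._df.itertuples(index=False))
        buffer.seek(0)
        return buffer


df = pandas.DataFrame({'x': [1, 2], 'y': [3.0, 4.0]})
print(df.stringio_copy.to_buffer().getvalue())
```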

So …

1. `DataFrame.to_csv(StringIO)` might be _terrible_, (at least without special configuration).
2. We're still gaining what we set out to gain from `pg_copy_to` – low, stable RAM usage, and a *huge* speed increase (thanks to `COPY`). The speed differential between `pg_copy_to` and the `StringIO` version isn't huge, (and the former is hugely preferable for its RAM usage). **But**, for future speed improvement, we might consider things like replacing `PipeTextIO` in `pg_copy_to` with another `ohio` utility, (as noted in that method's comments). (Or, an `asyncio` implementation of `PipeTextIO` _might_ help, as well.)
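The `PipeTextIO` idea referenced above can be illustrated with a toy stand-in, (assumptions: this is *not* ohio's implementation): a bounded queue lets a producer thread encode rows while a consumer drains them concurrently, so only about `buffer_size` chunks are ever in memory at once.

```python
import queue
import threading


class TinyTextPipe:
    """toy text pipe: bounded hand-off between a writer and a reader"""

    _EOF = object()

    def __init__(self, buffer_size=10):
        # the bounded queue caps how much encoded text is in flight
        self._queue = queue.Queue(maxsize=buffer_size)

    def write(self, text):
        self._queue.put(text)  # blocks when the buffer is full

    def close(self):
        self._queue.put(self._EOF)

    def __iter__(self):
        while True:
            chunk = self._queue.get()
            if chunk is self._EOF:
                return
            yield chunk


def produce(pipe, rows):
    # encode one row at a time -- never the whole payload
    for (key, value) in rows:
        pipe.write(f"{key},{value}\n")
    pipe.close()


pipe = TinyTextPipe(buffer_size=2)
threading.Thread(target=produce, args=(pipe, [(1, 'a'), (2, 'b'), (3, 'c')])).start()
print(''.join(pipe))  # the consumer drains chunks as they're produced
```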

# Pull request URL

See: #11

# Change sets

* initial profiling (ipython) scripting & local results

* stand-alone profiling script & results for pg_copy_from

* moved profile/ directory to prof/ to avoid conflict with built-in module `profile`

* refactored profiling `prof` into executable package

* added management command for profiling

* completed profiling package and execution profile of pandas extensions

* profilers for copying dataframe to database
* profilers moved to subpackage
* profiler group tagging
* profilers executed in subprocess to ensure memory measurements
* automatic plotting of memory & time results for each profiler group
* randomize profiler order in group
* run multiple trials
* filter by profiler function name and/or by group tag
* removed in-development artifacts (txt, yaml, png)
* made management command wrapper handle arguments and errors correctly

* updated profiling package requirements

* removed profilers of experimental feature `iter_size`

* functional decorator for `Wrapper` class

* use built-in `collections.Counter` in place of custom `histogram`

* ``__doc__`` for ``tool`` & ``util``

* profiler for StringIO -> COPY via DataFrame accessor

This profiler permits a proper comparison to pg_copy_to, (and exposes
potential opportunities for improvement).
jesteria authored Apr 3, 2019
1 parent c6b68fd commit 1f208d9
Showing 10 changed files with 1,249 additions and 0 deletions.
18 changes: 18 additions & 0 deletions manage.py
@@ -13,6 +13,24 @@ def test(self, args):
"""test the codebase"""
return (self.local.FG, self.local['tox'][args.remainder])

@local('remainder', metavar='...', nargs=REMAINDER,
help='for help with underlying command: "manage profile - -h"')
def profile(self, args):
"""profile the codebase"""
# -flags intended for subprocess must be distinguished by
# preceding empty flag / argument "-" or "--"
# (Or else argparse will complain.)
# But, we don't want to send these on to the subcommand.
if args.remainder and args.remainder[0] in '--':
remainder = args.remainder[1:]
else:
remainder = args.remainder[:]

try:
yield (self.local.FG, self.local['python']['-m', 'prof'][remainder])
except self.local.ProcessExecutionError as exc:
raise SystemExit(exc.retcode)

@local('part', choices=('major', 'minor', 'patch'),
help="part of the version to be bumped")
def bump(self, args):
Empty file added prof/__init__.py
Empty file.
5 changes: 5 additions & 0 deletions prof/__main__.py
@@ -0,0 +1,5 @@
from .profile import main


if __name__ == '__main__':
main(prog='python -m prof')
194 changes: 194 additions & 0 deletions prof/profile.py
@@ -0,0 +1,194 @@
import argparse
import datetime
import itertools
import multiprocessing
import pathlib
import random
import re

import matplotlib.pyplot as plt
import numpy
import pandas

import ohio.ext.pandas # noqa

from .tool import (
free,
handle_method,
loadconfig,
profiler,
report,
report_input,
report_tag,
report_trial,
results,
save_child_results,
)

# profiler submodules
import prof.profilers.copy_from # noqa
import prof.profilers.copy_to # noqa


PROFILE_DIMS = ('memory', 'time')
PROFILE_UNITS = ('mb', 's')
PROFILE_LABELS = tuple(
f'{dim} ({unit})' for (dim, unit) in zip(PROFILE_DIMS, PROFILE_UNITS)
)


def main(prog=None, argv=None):
start_datetime = datetime.datetime.now()

parser = argparse.ArgumentParser(prog=prog,
description="profile the codebase")

parser.add_argument('-r', '--random', action='store_true',
help="randomize profiler ordering")
parser.add_argument('-f', '--filter', action='append', dest='filters',
metavar='FILTER', type=re.compile,
help="select only profilers matching regular expression(s)")
parser.add_argument('-t', '--tag-filter', action='append', dest='tag_filters',
metavar='FILTER', type=re.compile,
help="select only profilers with tags matching regular expression(s)")

exec_group = parser.add_argument_group("execution arguments")
exec_group.add_argument('-c', '--count', type=int, default=1,
help="number of trials (default: 1)")
exec_group.add_argument('--table', default='profiling', dest='table_name',
help="name to give to table in temporary database")
exec_group.add_argument('-p', '--plot', default='profile', metavar='PLOT_NAME',
help="plot results to path with file name derived from "
"base directory and/or name specified (default: profile)")
exec_group.add_argument('-np', '--no-plot', action='store_false', dest='plot',
help="do not plot results")
exec_group.add_argument('--subprocess', action='store_true', default=True,
help="execute each trial run in a new child process (the default)")
exec_group.add_argument('--no-subprocess', action='store_false', dest='subprocess',
help="execute all runs in the same (parent) process")

p_required = parser.add_mutually_exclusive_group(required=True)
p_required.add_argument('-l', '--list',
action='store_false', default=True, dest='execute',
help="list profilers without executing")
p_required.add_argument('data_path', type=pathlib.Path, nargs='?',
help="path to csv data file to load as input")

args = parser.parse_args(argv)

loadconfig.set(args)

tagged_profilers = profiler.filtered()

if args.execute:
report_input()
print()

trial_num = args.count if args.execute else 1

for trial_count in range(trial_num):
if trial_num > 1:
if trial_count > 0:
print()

report_trial(trial_count)
print()

for (tag_index, (tag, profilers)) in enumerate(tagged_profilers.items()):
if tag_index > 0:
print()

if tag is None:
if tag_index > 0:
# only report untagged if any *are* tagged
report_tag("<untagged>")
print()
else:
report_tag(tag)
print()

if args.random:
random.shuffle(profilers)

for (index, method) in enumerate(profilers):
if args.execute and index > 0:
print()
free()
print()

desc = method.__doc__
if args.execute:
desc = 'begin: ' + desc if desc else 'begin ...'

report(method, desc)

if args.execute:
if args.subprocess:
shared_results = multiprocessing.Array('f', len(PROFILE_DIMS))

proc = multiprocessing.Process(
target=handle_method,
args=(method, shared_results, PROFILE_DIMS),
)
proc.start()
proc.join()

save_child_results(method, shared_results, PROFILE_DIMS)
else:
method()

if args.plot and args.execute:
for (tag, profilers) in tagged_profilers.items():
tag_results = {
profiler.__name__: results.get(profiler)
for profiler in profilers
}

# NOTE: MultiIndex would perhaps be better
df = pandas.DataFrame(tag_results).transpose()
df_means = df.applymap(numpy.mean)
df_stds = df.applymap(numpy.std)

axis = None
plot_colors = 'bgrcmyk'
plot_fmts = 'ov^<>1234spP*+xXDd|_'
for (index, (label, color, symbol)) in enumerate(zip(
df.index,
itertools.cycle(plot_colors),
itertools.cycle(plot_fmts),
)):
data_slice = df_means[index:(index + 1)]
error_slice = df_stds[index:(index + 1)]
axis = data_slice.plot.scatter(
x=PROFILE_DIMS[0],
y=PROFILE_DIMS[1],
xerr=error_slice[PROFILE_DIMS[0]],
yerr=error_slice[PROFILE_DIMS[1]],
ax=axis,
label=label,
color=color,
marker=symbol,
)

axis.set(xlabel=PROFILE_LABELS[0], ylabel=PROFILE_LABELS[1])

plt.title(tag)

filename_base = pathlib.Path(args.plot)
if filename_base.is_dir():
filename_base /= 'profile' # append default filename base

tag_slug = tag and tag.replace(' ', '-')[:30]
tag_tag = f'-{tag_slug}' if tag_slug else ''
date_suffix = int(start_datetime.timestamp())

plot_filename = f'{filename_base}{tag_tag}-{date_suffix}.svg'

plt.savefig(plot_filename)

print()
print('[plot]', "saved:", plot_filename)


if __name__ == '__main__':
main()
Empty file added prof/profilers/__init__.py
Empty file.
163 changes: 163 additions & 0 deletions prof/profilers/copy_from.py
@@ -0,0 +1,163 @@
import io

import pandas
import sqlalchemy

import prof.tool
from prof.tool import (
dtypes,
loaddata,
loadquery,
mprof,
sizecheck,
time,
)


profiler = prof.tool.profiler('copy from database to dataframe')


@profiler
@loadquery
@loaddata
@dtypes
@mprof
@time
@sizecheck
def ohio_pg_copy_from_1(engine, query):
"""pg_copy_from(buffer_size=1) {COPY → PipeTextIO → pandas.read_csv}"""
return pandas.DataFrame.pg_copy_from(query,
engine,
parse_dates=['as_of_date'],
buffer_size=1)


@profiler
@loadquery
@loaddata
@dtypes
@mprof
@time
@sizecheck
def ohio_pg_copy_from_10(engine, query):
"""pg_copy_from(buffer_size=10) {COPY → PipeTextIO → pandas.read_csv}"""
return pandas.DataFrame.pg_copy_from(query,
engine,
parse_dates=['as_of_date'],
buffer_size=10)


@profiler
@loadquery
@loaddata
@dtypes
@mprof
@time
@sizecheck
def ohio_pg_copy_from_10_stream_results(engine, query):
"""pg_copy_from(buffer_size=10) {stream_results | COPY → PipeTextIO → pandas.read_csv}"""
engine1 = sqlalchemy.create_engine(engine.url, execution_options=dict(stream_results=True,
max_row_buffer=10))
try:
return pandas.DataFrame.pg_copy_from(query,
engine1,
parse_dates=['as_of_date'],
buffer_size=10)
finally:
engine1.dispose()


@profiler
@loadquery
@loaddata
@dtypes
@mprof
@time
@sizecheck
def ohio_pg_copy_from_100(engine, query):
"""pg_copy_from(buffer_size=100) {COPY → PipeTextIO → pandas.read_csv}"""
return pandas.DataFrame.pg_copy_from(query,
engine,
parse_dates=['as_of_date'],
buffer_size=100)


@profiler
@loadquery
@loaddata
@dtypes
@mprof
@time
@sizecheck
def ohio_pg_copy_from_1000(engine, query):
"""pg_copy_from(buffer_size=1000) {COPY → PipeTextIO → pandas.read_csv}"""
return pandas.DataFrame.pg_copy_from(query,
engine,
parse_dates=['as_of_date'],
buffer_size=1000)


@profiler
@loadquery
@loaddata
@dtypes
@mprof
@time
@sizecheck
def pandas_read_csv_stringio(engine, query):
"""COPY → StringIO → pandas.read_csv"""
connection = engine.raw_connection()
cursor = connection.cursor()
buffer = io.StringIO()
cursor.copy_expert(
f'COPY ({query}) TO STDOUT WITH CSV HEADER',
buffer,
)
buffer.seek(0)
return pandas.read_csv(
buffer,
parse_dates=['as_of_date'],
)


@profiler
@loadquery
@loaddata
@dtypes
@mprof
@time
@sizecheck
def pandas_read_sql_chunks_100_stream_results(engine, query):
"""pandas.read_sql(chunksize=100) {stream_results}"""
engine1 = sqlalchemy.create_engine(engine.url, execution_options=dict(stream_results=True,
max_row_buffer=100))
try:
chunks = pandas.read_sql(query, engine1, parse_dates=['as_of_date'], chunksize=100)
return pandas.concat(chunks, copy=False)
finally:
engine1.dispose()


@profiler
@loadquery
@loaddata
@dtypes
@mprof
@time
@sizecheck
def pandas_read_sql_chunks_100(engine, query):
"""pandas.read_sql(chunksize=100)"""
chunks = pandas.read_sql(query, engine, parse_dates=['as_of_date'], chunksize=100)
return pandas.concat(chunks, copy=False)


@profiler
@loadquery
@loaddata
@dtypes
@mprof
@time
@sizecheck
def pandas_read_sql(engine, query):
"""pandas.read_sql"""
return pandas.read_sql(query, engine, parse_dates=['as_of_date'])
