Add fast-path INSERT method insert_bulk for SQLAlchemy/pandas/Dask #553

Merged 5 commits on Jul 6, 2023
25 changes: 25 additions & 0 deletions .readthedocs.yml
@@ -0,0 +1,25 @@
# .readthedocs.yml
# Read the Docs configuration file

# Details
# - https://docs.readthedocs.io/en/stable/config-file/v2.html

# Required
version: 2

build:
  os: "ubuntu-22.04"
  tools:
    python: "3.11"

# Build documentation in the docs/ directory with Sphinx
sphinx:
  configuration: docs/conf.py

python:
  install:
    - requirements: docs/requirements.txt

# Optionally build your docs in additional formats such as PDF
# formats:
#   - pdf
5 changes: 5 additions & 0 deletions CHANGES.txt
@@ -17,6 +17,11 @@ Unreleased

- SQLAlchemy: Refactor ``OBJECT`` type to use SQLAlchemy's JSON type infrastructure.

- SQLAlchemy: Added ``insert_bulk`` fast-path ``INSERT`` method for pandas, in
order to support efficient batch inserts using CrateDB's "bulk operations" endpoint.

- SQLAlchemy: Add documentation and software tests for usage with Dask.


2023/04/18 0.31.1
=================
1 change: 1 addition & 0 deletions docs/by-example/index.rst
@@ -48,6 +48,7 @@ its corresponding API interfaces, see also :ref:`sqlalchemy-support`.
sqlalchemy/working-with-types
sqlalchemy/advanced-querying
sqlalchemy/inspection-reflection
sqlalchemy/dataframe


.. _Python DB API: https://peps.python.org/pep-0249/
258 changes: 258 additions & 0 deletions docs/by-example/sqlalchemy/dataframe.rst
@@ -0,0 +1,258 @@
.. _sqlalchemy-pandas:
.. _sqlalchemy-dataframe:

================================
SQLAlchemy: DataFrame operations
================================

.. rubric:: Table of Contents

.. contents::
:local:


About
=====

This section of the documentation demonstrates support for efficient batch/bulk
``INSERT`` operations with `pandas`_ and `Dask`_, using the CrateDB SQLAlchemy dialect.

Efficient bulk operations are needed for typical `ETL`_ batch processing and
data streaming workloads, for example to move data in and out of OLAP data
warehouses, as contrasted to interactive online transaction processing (OLTP)
applications. The strategy of `batching`_ together series of records to improve
performance is also referred to as `chunking`_.


Introduction
============

pandas
------
The :ref:`pandas DataFrame <pandas:api.dataframe>` is a structure that contains
two-dimensional data and its corresponding labels. DataFrames are widely used
in data science, machine learning, scientific computing, and many other
data-intensive fields.

DataFrames are similar to SQL tables or the spreadsheets that you work with in
Excel or Calc. In many cases, DataFrames are faster, easier to use, and more
powerful than tables or spreadsheets because they are an integral part of the
`Python`_ and `NumPy`_ ecosystems.

The :ref:`pandas I/O subsystem <pandas:api.io>` for `relational databases`_
using `SQL`_ is based on `SQLAlchemy`_.

Dask
----
`Dask`_ is a flexible library for parallel computing in Python, which scales
Python code from multi-core local machines to large distributed clusters in
the cloud. Dask provides a familiar user interface by mirroring the APIs of
other libraries in the PyData ecosystem, including `pandas`_, `scikit-learn`_,
and `NumPy`_.

A :doc:`dask:dataframe` is a large parallel DataFrame composed of many smaller
pandas DataFrames, split along the index. These pandas DataFrames may live on
disk for larger-than-memory computing on a single machine, or on many different
machines in a cluster. One Dask DataFrame operation triggers many operations on
the constituent pandas DataFrames.


Compatibility notes
===================

.. NOTE::

Please note that DataFrame support for pandas and Dask is only validated
with Python 3.8 and higher, and SQLAlchemy 1.4 and higher. We recommend
using the most recent versions of those libraries.


Efficient ``INSERT`` operations with pandas
===========================================

The package provides an ``insert_bulk`` function to use the
:meth:`pandas:pandas.DataFrame.to_sql` method more efficiently, based on the
`CrateDB bulk operations`_ endpoint. It will effectively split your insert
workload across multiple batches, using a defined chunk size.

>>> import sqlalchemy as sa
>>> from pandas._testing import makeTimeDataFrame
>>> from crate.client.sqlalchemy.support import insert_bulk
...
>>> # Define number of records, and chunk size.
>>> INSERT_RECORDS = 42
>>> CHUNK_SIZE = 8
...
>>> # Create a pandas DataFrame, and connect to CrateDB.
>>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
>>> engine = sa.create_engine(f"crate://{crate_host}")

Review comment: Can we add here the full SQLAlchemy URI, ``crate://user:password@crate_host:port``?

Author reply (amotl): Thanks for your suggestion. It is technically not possible, because this reStructuredText file is not only documentation, but is also part of the test suite, being tested using Python's doctests, in order to make sure the code examples don't break. Apologies.

...
>>> # Insert content of DataFrame using batches of records.
>>> # With 42 records and a chunk size of 8, this yields six batches (42 / 8 = 5.25).
>>> df.to_sql(
... name="test-testdrive",
... con=engine,
... if_exists="replace",
... index=False,
... chunksize=CHUNK_SIZE,
... method=insert_bulk,
... )
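
If you want to convince yourself that all records have arrived, a verification
step along the following lines can be used. This is a sketch, not part of the
doctest-verified examples above; it assumes SQLAlchemy 1.4 or higher and the
table name used in the example::

    # Hypothetical verification step: make the freshly inserted records
    # visible, and count them.
    with engine.connect() as connection:
        connection.exec_driver_sql('REFRESH TABLE "test-testdrive"')
        count = connection.exec_driver_sql(
            'SELECT COUNT(*) FROM "test-testdrive"').scalar_one()
        print(count)  # Should report 42, i.e. INSERT_RECORDS.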

.. TIP::

You will observe that the optimal chunk size depends strongly on the shape of
your data, specifically the width of each record, i.e. the number of columns
and their individual sizes, which together determine the total size of
each batch/chunk.

Review comment: Can we also add that the chunk should be small enough to fit into memory, but large enough to minimize the overhead of data ingestion?

Author reply (amotl): Thank you. It is there, a few more lines below.


A few details should be taken into consideration when determining the optimal
chunk size for a specific dataset. We are outlining the two major ones.

- First, when working with data larger than the main memory available on your
machine, each chunk should be small enough to fit into memory, but large
enough to minimize the overhead of a single data insert operation. Depending
on whether you are running other workloads on the same machine, you should
also account for the total share of heap memory you will assign to each domain,
to prevent overloading the system as a whole.

- Second, as each batch is submitted using HTTP, you should know about the request
size limits and other constraints of your HTTP infrastructure, which may include
any type of HTTP intermediary relaying information between your database client
application and your CrateDB cluster. For example, HTTP proxy servers or load
balancers that are not optimally configured for performance, as well as web
application firewalls and intrusion prevention systems, may hamper HTTP
communication in subtle ways, for example through request size constraints or
throttling mechanisms. If you are working with very busy systems hosted on shared
infrastructure, details like `SNAT port exhaustion`_ may also come into play.

You will need to determine a good chunk size by running corresponding experiments
with your own data. For that purpose, you can use the `insert_pandas.py`_ program
as a blueprint.

It is a good idea to start your explorations with a chunk size of 5_000, and
then see if performance improves when you increase or decrease that figure.
People are reporting that 10_000-20_000 is their optimal setting, but if you
process, for example, just three "small" columns, you may also experiment with
`leveling up to 200_000`_, because `the chunksize should not be too small`_.
If it is too small, the I/O cost will be too high to overcome the benefit of
batching.
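
As a starting point for such experiments, a timing loop along the following
lines can be used. It is a minimal sketch, loosely modeled on the idea behind
`insert_pandas.py`_; the connection URL, the table name, and the set of probed
chunk sizes are assumptions you will want to adapt to your environment::

    import time

    import sqlalchemy as sa
    from pandas._testing import makeTimeDataFrame

    from crate.client.sqlalchemy.support import insert_bulk

    # Assumptions: CrateDB reachable on localhost:4200, and a synthetic
    # DataFrame of 50_000 records, generated like in the examples above.
    engine = sa.create_engine("crate://localhost:4200")
    df = makeTimeDataFrame(nper=50_000, freq="S")

    # Time the same insert workload for a handful of candidate chunk sizes.
    for chunk_size in (1_000, 5_000, 10_000, 20_000):
        start = time.perf_counter()
        df.to_sql(
            name="chunksize_experiment",
            con=engine,
            if_exists="replace",
            index=False,
            chunksize=chunk_size,
            method=insert_bulk,
        )
        print(f"chunksize={chunk_size}: {time.perf_counter() - start:.2f}s")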

In order to learn more about what wide- vs. long-form (tidy, stacked, narrow)
data means in the context of `DataFrame computing`_, let us refer you to `a
general introduction <wide-narrow-general_>`_, the corresponding section in
the `Data Computing book <wide-narrow-data-computing_>`_, and a `pandas
tutorial <wide-narrow-pandas-tutorial_>`_ about the same topic.


Efficient ``INSERT`` operations with Dask
=========================================

The same ``insert_bulk`` function presented in the previous section can also
be used in the context of `Dask`_, in order to use the
:func:`dask:dask.dataframe.to_sql` method more efficiently, based on the
`CrateDB bulk operations`_ endpoint.

The example below will partition your insert workload into equal-sized parts, and
schedule it to be executed on Dask cluster resources, using a defined number of
compute partitions. Each worker instance will then insert its partition's records
in a batched/chunked manner, using a defined chunk size, effectively using the
pandas implementation introduced in the previous section.

>>> import dask.dataframe as dd
>>> from pandas._testing import makeTimeDataFrame
>>> from crate.client.sqlalchemy.support import insert_bulk
...
>>> # Define the number of records, the number of computing partitions,
>>> # and the chunk size of each database insert operation.
>>> INSERT_RECORDS = 100
>>> NPARTITIONS = 4
>>> CHUNK_SIZE = 25
...
>>> # Create a Dask DataFrame.
>>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
>>> ddf = dd.from_pandas(df, npartitions=NPARTITIONS)
...
>>> # Insert content of DataFrame using multiple workers on a
>>> # compute cluster, transferred using batches of records.
>>> ddf.to_sql(
... name="test-testdrive",
... uri=f"crate://{crate_host}",
... if_exists="replace",
... index=False,
... chunksize=CHUNK_SIZE,
... method=insert_bulk,
... parallel=True,
... )


.. TIP::

You will observe that optimizing your workload will now also involve determining a
good value for the ``NPARTITIONS`` argument, based on the capacity and topology of
the available compute resources, and based on workload characteristics or policies
like peak- vs. balanced- vs. shared-usage. For example, on a machine or cluster fully
dedicated to the problem at hand, you may want to use all available processor cores,
while on a shared system, this strategy may not be appropriate.

If you want to dedicate all available compute resources on your machine, you may want
to use the number of CPU cores as the value for the ``NPARTITIONS`` argument. You can find
out about the available CPU cores on your machine, for example by running the ``nproc``
command in your terminal; a minimal sketch for this is shown below, after this tip.

Depending on the implementation and runtime behavior of the compute task, the optimal
number of worker processes, determined by the ``NPARTITIONS`` argument, also needs to be
figured out by running a few test iterations. For that purpose, you can use the
`insert_dask.py`_ program as a blueprint.

Adjusting this value in both directions is perfectly fine: If you observe that you are
overloading the machine, maybe because there are workloads scheduled other than the one
you are running, try to reduce the value. If fragments/steps of your implementation
involve waiting for network or disk I/O, you may want to increase the number of workers
beyond the number of available CPU cores, to increase utilization. On the other hand,
you should be wary of over-committing resources, as doing so may slow your
system down.

Before getting more serious with Dask, you are welcome to read and watch the excellent
:doc:`dask:best-practices` and :ref:`dask:dataframe.performance` resources, in order to
learn about pitfalls to avoid, and more. To find out whether your compute workload
scheduling is healthy, you can, for example, use Dask's :doc:`dask:dashboard`.
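
To illustrate the points about processor cores and the dashboard from the tip above,
here is a minimal sketch. It derives ``NPARTITIONS`` from the number of local CPU
cores, and starts an explicit local Dask cluster; the latter assumes that the
``dask.distributed`` package is available, for example by installing Dask with the
``distributed`` extra::

    import os

    import dask.dataframe as dd
    from dask.distributed import Client, LocalCluster  # needs `dask[distributed]`
    from pandas._testing import makeTimeDataFrame

    # Equivalent to consulting `nproc`: use one partition per local CPU core.
    NPARTITIONS = os.cpu_count() or 1

    # An explicit local cluster gives you control over the number of worker
    # processes, and serves the Dask dashboard (provided its dependencies,
    # like `bokeh`, are installed) for inspecting how the workload is scheduled.
    client = Client(LocalCluster(n_workers=NPARTITIONS, threads_per_worker=1))
    print(client.dashboard_link)

    df = makeTimeDataFrame(nper=100, freq="S")
    ddf = dd.from_pandas(df, npartitions=NPARTITIONS)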

.. WARNING::

Because the settings assigned in the example above fit together well, the ``to_sql()``
instruction will effectively run four insert operations, executed in parallel, and
scheduled optimally on the available cluster resources.

However, if those settings are not chosen sensibly, you can easily misconfigure the resource
scheduling system, and overload the underlying hardware or operating system, whether
virtualized or not. This is why experimenting with different parameters, and a real dataset, is crucial.



.. hidden: Disconnect from database

>>> engine.dispose()


.. _batching: https://en.wikipedia.org/wiki/Batch_processing#Common_batch_processing_usage
.. _chunking: https://en.wikipedia.org/wiki/Chunking_(computing)
.. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
.. _Dask: https://en.wikipedia.org/wiki/Dask_(software)
.. _DataFrame computing: https://realpython.com/pandas-dataframe/
.. _ETL: https://en.wikipedia.org/wiki/Extract,_transform,_load
.. _insert_dask.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_dask.py
.. _insert_pandas.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_pandas.py
.. _leveling up to 200_000: https://acepor.github.io/2017/08/03/using-chunksize/
.. _NumPy: https://en.wikipedia.org/wiki/NumPy
.. _pandas: https://en.wikipedia.org/wiki/Pandas_(software)
.. _pandas DataFrame: https://pandas.pydata.org/pandas-docs/stable/reference/frame.html
.. _Python: https://en.wikipedia.org/wiki/Python_(programming_language)
.. _relational databases: https://en.wikipedia.org/wiki/Relational_database
.. _scikit-learn: https://en.wikipedia.org/wiki/Scikit-learn
.. _SNAT port exhaustion: https://learn.microsoft.com/en-us/azure/load-balancer/troubleshoot-outbound-connection
.. _SQL: https://en.wikipedia.org/wiki/SQL
.. _SQLAlchemy: https://aosabook.org/en/v2/sqlalchemy.html
.. _the chunksize should not be too small: https://acepor.github.io/2017/08/03/using-chunksize/
.. _wide-narrow-general: https://en.wikipedia.org/wiki/Wide_and_narrow_data
.. _wide-narrow-data-computing: https://dtkaplan.github.io/DataComputingEbook/chap-wide-vs-narrow.html#chap:wide-vs-narrow
.. _wide-narrow-pandas-tutorial: https://anvil.works/blog/tidy-data
5 changes: 4 additions & 1 deletion docs/conf.py
@@ -12,11 +12,14 @@
intersphinx_mapping.update({
'py': ('https://docs.python.org/3/', None),
'sa': ('https://docs.sqlalchemy.org/en/14/', None),
'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None)
'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None),
'dask': ('https://docs.dask.org/en/stable/', None),
'pandas': ('https://pandas.pydata.org/docs/', None),
})


linkcheck_anchors = True
linkcheck_ignore = [r"https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/.*"]


rst_prolog = """
2 changes: 2 additions & 0 deletions setup.py
@@ -68,8 +68,10 @@ def read(path):
'zope.testrunner>=5,<7',
'zc.customdoctests>=1.0.1,<2',
'createcoverage>=1,<2',
'dask',

Review comment: I am not sure, but shouldn't we have dask[complete] here, to install all Dask dependencies (e.g. dask.dataframe)?

Author reply (amotl, Jul 5, 2023): This list of dependencies is only used as requirements for running the test suite. We do not automatically install pandas or Dask together with the library, as runtime dependencies. As such, I think it is fine to use a minimal set of dependencies, which is enough to satisfy the requirements of the test suite.

'stopit>=1.1.2,<2',
'flake8>=4,<7',
'pandas',
'pytz',
# `test_http.py` needs `setuptools.ssl_support`
'setuptools<57',
62 changes: 62 additions & 0 deletions src/crate/client/sqlalchemy/support.py
@@ -0,0 +1,62 @@
# -*- coding: utf-8; -*-
#
# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
# license agreements. See the NOTICE file distributed with this work for
# additional information regarding copyright ownership. Crate licenses
# this file to you under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. You may
# obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# However, if you have executed another commercial license agreement
# with Crate these terms will supersede the license and you may use the
# software solely pursuant to the terms of the relevant commercial agreement.
import logging


logger = logging.getLogger(__name__)


def insert_bulk(pd_table, conn, keys, data_iter):
    """
    Use CrateDB's "bulk operations" endpoint as a fast path for pandas' and Dask's `to_sql()` [1] method.

    The idea is to break out of SQLAlchemy, compile the insert statement, and use the raw
    DBAPI connection client, in order to invoke a request using `bulk_parameters` [2]::

        cursor.execute(sql=sql, bulk_parameters=data)

    The vanilla implementation, used by SQLAlchemy, is::

        data = [dict(zip(keys, row)) for row in data_iter]
        conn.execute(pd_table.table.insert(), data)

    Batch chunking will happen outside of this function, for example [3] demonstrates
    the relevant code in `pandas.io.sql`.

    [1] https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
    [2] https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
    [3] https://github.com/pandas-dev/pandas/blob/v2.0.1/pandas/io/sql.py#L1011-L1027
    """

    # Compile SQL statement and materialize batch.
    sql = str(pd_table.table.insert().compile(bind=conn))
    data = list(data_iter)

    # For debugging and tracing the batches running through this method.
    if logger.level == logging.DEBUG:
        logger.debug(f"Bulk SQL: {sql}")
        logger.debug(f"Bulk records: {len(data)}")
        # logger.debug(f"Bulk data: {data}")

    # Invoke bulk insert operation.
    cursor = conn._dbapi_connection.cursor()
    cursor.execute(sql=sql, bulk_parameters=data)
    cursor.close()
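
For orientation, this is approximately what the function hands over to the DBAPI
cursor for one chunk of a three-column DataFrame. It is an illustrative sketch only:
the exact statement text is produced by SQLAlchemy's statement compiler, and the
column names and values shown here are made up.

    # Sketch of the payload shape; not part of the implementation above.
    sql = 'INSERT INTO "test-testdrive" ("A", "B", "C") VALUES (?, ?, ?)'
    data = [
        (0.47, -1.18, 0.53),  # one tuple per DataFrame row in the current chunk
        (0.06, 0.52, -0.08),
        (1.30, -0.25, 0.77),
    ]
    # cursor.execute(sql=sql, bulk_parameters=data)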