diff --git a/.readthedocs.yml b/.readthedocs.yml
new file mode 100644
index 00000000..bfc1d655
--- /dev/null
+++ b/.readthedocs.yml
@@ -0,0 +1,25 @@
+# .readthedocs.yml
+# Read the Docs configuration file
+
+# Details
+# - https://docs.readthedocs.io/en/stable/config-file/v2.html
+
+# Required
+version: 2
+
+build:
+  os: "ubuntu-22.04"
+  tools:
+    python: "3.11"
+
+# Build documentation in the docs/ directory with Sphinx
+sphinx:
+  configuration: docs/conf.py
+
+python:
+  install:
+    - requirements: docs/requirements.txt
+
+# Optionally build your docs in additional formats such as PDF
+# formats:
+#   - pdf
diff --git a/CHANGES.txt b/CHANGES.txt
index 0bbb7465..078296dc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,11 @@ Unreleased
 
 - SQLAlchemy: Refactor ``OBJECT`` type to use SQLAlchemy's JSON type
   infrastructure.
 
+- SQLAlchemy: Added ``insert_bulk`` fast-path ``INSERT`` method for pandas, in
+  order to support efficient batch inserts using CrateDB's "bulk operations" endpoint.
+
+- SQLAlchemy: Add documentation and software tests for usage with Dask.
+
 2023/04/18 0.31.1
 =================
diff --git a/docs/by-example/index.rst b/docs/by-example/index.rst
index 54f90ce8..589beb99 100644
--- a/docs/by-example/index.rst
+++ b/docs/by-example/index.rst
@@ -48,6 +48,7 @@ its corresponding API interfaces, see also :ref:`sqlalchemy-support`.
     sqlalchemy/working-with-types
     sqlalchemy/advanced-querying
     sqlalchemy/inspection-reflection
+    sqlalchemy/dataframe
 
 
 .. _Python DB API: https://peps.python.org/pep-0249/
diff --git a/docs/by-example/sqlalchemy/dataframe.rst b/docs/by-example/sqlalchemy/dataframe.rst
new file mode 100644
index 00000000..a2be1f88
--- /dev/null
+++ b/docs/by-example/sqlalchemy/dataframe.rst
@@ -0,0 +1,258 @@
+.. _sqlalchemy-pandas:
+.. _sqlalchemy-dataframe:
+
+================================
+SQLAlchemy: DataFrame operations
+================================
+
+.. rubric:: Table of Contents
+
+.. contents::
+   :local:
+
+
+About
+=====
+
+This section of the documentation demonstrates support for efficient batch/bulk
+``INSERT`` operations with `pandas`_ and `Dask`_, using the CrateDB SQLAlchemy dialect.
+
+Efficient bulk operations are needed for typical `ETL`_ batch processing and
+data streaming workloads, for example to move data in and out of OLAP data
+warehouses, in contrast to interactive online transaction processing (OLTP)
+applications. The strategy of `batching`_ together a series of records to
+improve performance is also referred to as `chunking`_.
+
+
+Introduction
+============
+
+pandas
+------
+The :ref:`pandas DataFrame <pandas:api.dataframe>` is a structure that contains
+two-dimensional data and its corresponding labels. DataFrames are widely used
+in data science, machine learning, scientific computing, and many other
+data-intensive fields.
+
+DataFrames are similar to SQL tables or the spreadsheets that you work with in
+Excel or Calc. In many cases, DataFrames are faster, easier to use, and more
+powerful than tables or spreadsheets because they are an integral part of the
+`Python`_ and `NumPy`_ ecosystems.
+
+The :ref:`pandas I/O subsystem <pandas:api.io>` for `relational databases`_
+using `SQL`_ is based on `SQLAlchemy`_.
+
+Dask
+----
+`Dask`_ is a flexible library for parallel computing in Python, which scales
+Python code from multi-core local machines to large distributed clusters in
+the cloud.
+Dask provides a familiar user interface by mirroring the APIs of
+other libraries in the PyData ecosystem, including `pandas`_, `scikit-learn`_,
+and `NumPy`_.
+
+A :doc:`dask:dataframe` is a large parallel DataFrame composed of many smaller
+pandas DataFrames, split along the index. These pandas DataFrames may live on
+disk for larger-than-memory computing on a single machine, or on many different
+machines in a cluster. One Dask DataFrame operation triggers many operations on
+the constituent pandas DataFrames.
+
+
+Compatibility notes
+===================
+
+.. NOTE::
+
+    Please note that DataFrame support for pandas and Dask is only validated
+    with Python 3.8 and higher, and SQLAlchemy 1.4 and higher. We recommend
+    using the most recent versions of those libraries.
+
+
+Efficient ``INSERT`` operations with pandas
+===========================================
+
+The package provides an ``insert_bulk`` function to use the
+:meth:`pandas:pandas.DataFrame.to_sql` method more efficiently, based on the
+`CrateDB bulk operations`_ endpoint. It will effectively split your insert
+workload across multiple batches, using a defined chunk size.
+
+    >>> import sqlalchemy as sa
+    >>> from pandas._testing import makeTimeDataFrame
+    >>> from crate.client.sqlalchemy.support import insert_bulk
+    ...
+    >>> # Define the number of records, and the chunk size.
+    >>> INSERT_RECORDS = 42
+    >>> CHUNK_SIZE = 8
+    ...
+    >>> # Create a pandas DataFrame, and connect to CrateDB.
+    >>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
+    >>> engine = sa.create_engine(f"crate://{crate_host}")
+    ...
+    >>> # Insert content of DataFrame using batches of records.
+    >>> # Effectively, this means six batches, because 42 / 8 = 5.25, which rounds up to 6.
+    >>> df.to_sql(
+    ...     name="test-testdrive",
+    ...     con=engine,
+    ...     if_exists="replace",
+    ...     index=False,
+    ...     chunksize=CHUNK_SIZE,
+    ...     method=insert_bulk,
+    ... )
+
+.. TIP::
+
+    You will observe that the optimal chunk size highly depends on the shape of
+    your data, specifically the width of each record, i.e. the number of columns
+    and their individual sizes, which together determine the total size of each
+    batch/chunk.
+
+    A few details should be taken into consideration when determining the optimal
+    chunk size for a specific dataset. The two major ones are outlined below.
+
+    - First, when working with data larger than the main memory available on your
+      machine, each chunk should be small enough to fit into memory, but large
+      enough to minimize the overhead of a single data insert operation. Depending
+      on whether you are running other workloads on the same machine, you should
+      also account for the share of main memory you will assign to each of those
+      workloads, to prevent overloading the system as a whole.
+
+    - Second, as each batch is submitted using HTTP, you should know about the request
+      size limits and other constraints of your HTTP infrastructure, which may include
+      any type of HTTP intermediary relaying information between your database client
+      application and your CrateDB cluster. For example, HTTP proxy servers or load
+      balancers not optimally configured for performance, or web application firewalls
+      and intrusion prevention systems may hamper HTTP communication, sometimes in
+      subtle ways, for example based on request size constraints, or throttling
+      mechanisms. If you are working with very busy systems, and hosting them on shared
+      infrastructure, details like `SNAT port exhaustion`_ may also come into play.
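+
+    In practice, the most reliable way to pick a chunk size is to measure a few
+    candidates empirically. A minimal timing sketch might look like the one below;
+    the connection URI, the synthetic DataFrame, and the candidate chunk sizes are
+    assumptions for illustration only, so please adapt them to your environment::
+
+        import time
+
+        import sqlalchemy as sa
+        from pandas._testing import makeTimeDataFrame
+        from crate.client.sqlalchemy.support import insert_bulk
+
+        # Hypothetical setup: local CrateDB instance and a synthetic DataFrame.
+        engine = sa.create_engine("crate://localhost:4200")
+        df = makeTimeDataFrame(nper=50_000, freq="S")
+
+        # Compare a few candidate chunk sizes by wall-clock time.
+        for chunk_size in (1_000, 5_000, 20_000):
+            start = time.perf_counter()
+            df.to_sql(
+                name="chunksize-experiment",
+                con=engine,
+                if_exists="replace",
+                index=False,
+                chunksize=chunk_size,
+                method=insert_bulk,
+            )
+            print(f"chunksize={chunk_size}: {time.perf_counter() - start:.2f}s")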
+
+    You will need to determine a good chunk size by running corresponding experiments
+    on your own behalf. For that purpose, you can use the `insert_pandas.py`_ program
+    as a blueprint.
+
+    It is a good idea to start your explorations with a chunk size of 5_000, and
+    then see if performance improves when you increase or decrease that figure.
+    People report that 10_000 to 20_000 is their optimal setting, but if you
+    process, for example, just three "small" columns, you may also experiment with
+    `leveling up to 200_000`_, because `the chunksize should not be too small`_.
+    If it is too small, the I/O cost will be too high to overcome the benefit of
+    batching.
+
+    In order to learn more about what wide- vs. long-form (tidy, stacked, narrow)
+    data means in the context of `DataFrame computing`_, let us refer you to `a
+    general introduction <wide-narrow-general_>`_, the corresponding section in
+    the `Data Computing book <wide-narrow-data-computing_>`_, and a `pandas
+    tutorial <wide-narrow-pandas-tutorial_>`_ about the same topic.
+
+
+Efficient ``INSERT`` operations with Dask
+=========================================
+
+The same ``insert_bulk`` function presented in the previous section can also
+be used in the context of `Dask`_, in order to use the
+:func:`dask:dask.dataframe.to_sql` method more efficiently, based on the
+`CrateDB bulk operations`_ endpoint.
+
+The example below will partition your insert workload into equal-sized parts, and
+schedule it to be executed on Dask cluster resources, using a defined number of
+compute partitions. Each worker instance will then insert its partition's records
+in a batched/chunked manner, using a defined chunk size, effectively using the
+pandas implementation introduced in the previous section.
+
+    >>> import dask.dataframe as dd
+    >>> from pandas._testing import makeTimeDataFrame
+    >>> from crate.client.sqlalchemy.support import insert_bulk
+    ...
+    >>> # Define the number of records, the number of computing partitions,
+    >>> # and the chunk size of each database insert operation.
+    >>> INSERT_RECORDS = 100
+    >>> NPARTITIONS = 4
+    >>> CHUNK_SIZE = 25
+    ...
+    >>> # Create a Dask DataFrame.
+    >>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
+    >>> ddf = dd.from_pandas(df, npartitions=NPARTITIONS)
+    ...
+    >>> # Insert content of DataFrame using multiple workers on a
+    >>> # compute cluster, transferred using batches of records.
+    >>> ddf.to_sql(
+    ...     name="test-testdrive",
+    ...     uri=f"crate://{crate_host}",
+    ...     if_exists="replace",
+    ...     index=False,
+    ...     chunksize=CHUNK_SIZE,
+    ...     method=insert_bulk,
+    ...     parallel=True,
+    ... )
+
+
+.. TIP::
+
+    You will observe that optimizing your workload will now also involve determining a
+    good value for the ``NPARTITIONS`` argument, based on the capacity and topology of
+    the available compute resources, and based on workload characteristics or policies
+    like peak- vs. balanced- vs. shared-usage. For example, on a machine or cluster fully
+    dedicated to the problem at hand, you may want to use all available processor cores,
+    while on a shared system, this strategy may not be appropriate.
+
+    If you want to dedicate all available compute resources on your machine, you may want
+    to use the number of CPU cores as the value for the ``NPARTITIONS`` argument. You can
+    find out about the available CPU cores on your machine, for example by running the
+    ``nproc`` command in your terminal.
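+
+    From within Python, a portable way to obtain the same figure is the standard
+    library's ``os.cpu_count()``. A minimal sketch, reusing the ``df`` and ``dd``
+    names from the example above::
+
+        import os
+
+        # Use one partition per available CPU core as a starting point.
+        ncores = os.cpu_count() or 1
+        ddf = dd.from_pandas(df, npartitions=ncores)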
+
+    Depending on the implementation and runtime behavior of the compute task, the optimal
+    number of worker processes, determined by the ``NPARTITIONS`` argument, also needs to
+    be figured out by running a few test iterations. For that purpose, you can use the
+    `insert_dask.py`_ program as a blueprint.
+
+    Adjusting this value in both directions is perfectly fine: if you observe that you are
+    overloading the machine, perhaps because other workloads are scheduled besides the one
+    you are running, try to reduce the value. If fragments/steps of your implementation
+    involve waiting for network or disk I/O, you may want to increase the number of workers
+    beyond the number of available CPU cores, to increase utilization. On the other hand,
+    you should be wary of over-committing resources, as it may slow your system down.
+
+    Before getting more serious with Dask, you are welcome to read and watch the excellent
+    :doc:`dask:best-practices` and :ref:`dask:dataframe.performance` resources, in order to
+    learn about pitfalls to avoid, and more. To find out whether your compute workload
+    scheduling is healthy, you can, for example, use Dask's :doc:`dask:dashboard`.
+
+.. WARNING::
+
+    Because the settings assigned in the example above fit together well, the ``to_sql()``
+    instruction will effectively run four insert operations, executed in parallel, and
+    scheduled optimally on the available cluster resources.
+
+    However, if those settings are not chosen sensibly, you can easily misconfigure the
+    resource scheduling system, and overload the underlying hardware or operating system,
+    virtualized or not. This is why experimenting with different parameters, and a real
+    dataset, is crucial.
+
+
+.. hidden: Disconnect from database
+
+    >>> engine.dispose()
+
+
+.. _batching: https://en.wikipedia.org/wiki/Batch_processing#Common_batch_processing_usage
+.. _chunking: https://en.wikipedia.org/wiki/Chunking_(computing)
+.. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
+.. _Dask: https://en.wikipedia.org/wiki/Dask_(software)
+.. _DataFrame computing: https://realpython.com/pandas-dataframe/
+.. _ETL: https://en.wikipedia.org/wiki/Extract,_transform,_load
+.. _insert_dask.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_dask.py
+.. _insert_pandas.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_pandas.py
+.. _leveling up to 200_000: https://acepor.github.io/2017/08/03/using-chunksize/
+.. _NumPy: https://en.wikipedia.org/wiki/NumPy
+.. _pandas: https://en.wikipedia.org/wiki/Pandas_(software)
+.. _pandas DataFrame: https://pandas.pydata.org/pandas-docs/stable/reference/frame.html
+.. _Python: https://en.wikipedia.org/wiki/Python_(programming_language)
+.. _relational databases: https://en.wikipedia.org/wiki/Relational_database
+.. _scikit-learn: https://en.wikipedia.org/wiki/Scikit-learn
+.. _SNAT port exhaustion: https://learn.microsoft.com/en-us/azure/load-balancer/troubleshoot-outbound-connection
+.. _SQL: https://en.wikipedia.org/wiki/SQL
+.. _SQLAlchemy: https://aosabook.org/en/v2/sqlalchemy.html
+.. _the chunksize should not be too small: https://acepor.github.io/2017/08/03/using-chunksize/
+.. _wide-narrow-general: https://en.wikipedia.org/wiki/Wide_and_narrow_data
+.. _wide-narrow-data-computing: https://dtkaplan.github.io/DataComputingEbook/chap-wide-vs-narrow.html#chap:wide-vs-narrow
+.. _wide-narrow-pandas-tutorial: https://anvil.works/blog/tidy-data
diff --git a/docs/conf.py b/docs/conf.py
index 8267b131..59cc622f 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -12,11 +12,14 @@
 intersphinx_mapping.update({
     'py': ('https://docs.python.org/3/', None),
     'sa': ('https://docs.sqlalchemy.org/en/14/', None),
-    'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None)
+    'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None),
+    'dask': ('https://docs.dask.org/en/stable/', None),
+    'pandas': ('https://pandas.pydata.org/docs/', None),
 })
 
 
 linkcheck_anchors = True
+linkcheck_ignore = [r"https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/.*"]
 
 
 rst_prolog = """
diff --git a/setup.py b/setup.py
index be1b8a5c..5d4ee00b 100644
--- a/setup.py
+++ b/setup.py
@@ -68,8 +68,10 @@ def read(path):
         'zope.testrunner>=5,<7',
         'zc.customdoctests>=1.0.1,<2',
         'createcoverage>=1,<2',
+        'dask',
         'stopit>=1.1.2,<2',
         'flake8>=4,<7',
+        'pandas',
         'pytz',
         # `test_http.py` needs `setuptools.ssl_support`
         'setuptools<57',
diff --git a/src/crate/client/sqlalchemy/support.py b/src/crate/client/sqlalchemy/support.py
new file mode 100644
index 00000000..326e41ce
--- /dev/null
+++ b/src/crate/client/sqlalchemy/support.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8; -*-
+#
+# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
+# license agreements. See the NOTICE file distributed with this work for
+# additional information regarding copyright ownership. Crate licenses
+# this file to you under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License. You may
+# obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# However, if you have executed another commercial license agreement
+# with Crate these terms will supersede the license and you may use the
+# software solely pursuant to the terms of the relevant commercial agreement.
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+def insert_bulk(pd_table, conn, keys, data_iter):
+    """
+    Use CrateDB's "bulk operations" endpoint as a fast path for pandas' and Dask's `to_sql()` [1] method.
+
+    The idea is to break out of SQLAlchemy, compile the insert statement, and use the raw
+    DBAPI connection client, in order to invoke a request using `bulk_parameters` [2]::
+
+        cursor.execute(sql=sql, bulk_parameters=data)
+
+    The vanilla implementation, used by SQLAlchemy, is::
+
+        data = [dict(zip(keys, row)) for row in data_iter]
+        conn.execute(pd_table.table.insert(), data)
+
+    Batch chunking will happen outside of this function, for example [3] demonstrates
+    the relevant code in `pandas.io.sql`.
+
+    [1] https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
+    [2] https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
+    [3] https://github.com/pandas-dev/pandas/blob/v2.0.1/pandas/io/sql.py#L1011-L1027
+    """
+
+    # Compile SQL statement and materialize batch.
+    sql = str(pd_table.table.insert().compile(bind=conn))
+    data = list(data_iter)
+
+    # For debugging and tracing the batches running through this method.
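+    # Note: this only triggers when the DEBUG level has been set directly on this logger.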
+    if logger.level == logging.DEBUG:
+        logger.debug(f"Bulk SQL: {sql}")
+        logger.debug(f"Bulk records: {len(data)}")
+        # logger.debug(f"Bulk data: {data}")
+
+    # Invoke bulk insert operation.
+    cursor = conn._dbapi_connection.cursor()
+    cursor.execute(sql=sql, bulk_parameters=data)
+    cursor.close()
diff --git a/src/crate/client/sqlalchemy/tests/bulk_test.py b/src/crate/client/sqlalchemy/tests/bulk_test.py
index 71949f25..4546d1a4 100644
--- a/src/crate/client/sqlalchemy/tests/bulk_test.py
+++ b/src/crate/client/sqlalchemy/tests/bulk_test.py
@@ -18,14 +18,15 @@
 # However, if you have executed another commercial license agreement
 # with Crate these terms will supersede the license and you may use the
 # software solely pursuant to the terms of the relevant commercial agreement.
-
+import math
+import sys
 from unittest import TestCase, skipIf
 from unittest.mock import patch, MagicMock
 
 import sqlalchemy as sa
 from sqlalchemy.orm import Session
 
-from crate.client.sqlalchemy.sa_version import SA_VERSION, SA_2_0
+from crate.client.sqlalchemy.sa_version import SA_VERSION, SA_2_0, SA_1_4
 
 try:
     from sqlalchemy.orm import declarative_base
@@ -36,8 +37,7 @@
 
 
 fake_cursor = MagicMock(name='fake_cursor')
-FakeCursor = MagicMock(name='FakeCursor', spec=Cursor)
-FakeCursor.return_value = fake_cursor
+FakeCursor = MagicMock(name='FakeCursor', spec=Cursor, return_value=fake_cursor)
 
 
 class SqlAlchemyBulkTest(TestCase):
@@ -168,3 +168,89 @@ def test_bulk_save_modern(self):
             'Callisto', 37,
         )
         self.assertSequenceEqual(expected_bulk_args, bulk_args)
+
+    @skipIf(sys.version_info < (3, 8), "SQLAlchemy/pandas is not supported on Python <3.8")
+    @skipIf(SA_VERSION < SA_1_4, "SQLAlchemy 1.3 is not supported by pandas")
+    @patch('crate.client.connection.Cursor', mock_cursor=FakeCursor)
+    def test_bulk_save_pandas(self, mock_cursor):
+        """
+        Verify bulk INSERT with pandas.
+        """
+        from pandas._testing import makeTimeDataFrame
+        from crate.client.sqlalchemy.support import insert_bulk
+
+        # 42 records / 8 chunksize = 5.25, which means 6 batches will be emitted.
+        INSERT_RECORDS = 42
+        CHUNK_SIZE = 8
+        OPCOUNT = math.ceil(INSERT_RECORDS / CHUNK_SIZE)
+
+        # Create a DataFrame to feed into the database.
+        df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
+
+        dburi = "crate://localhost:4200"
+        engine = sa.create_engine(dburi, echo=True)
+        retval = df.to_sql(
+            name="test-testdrive",
+            con=engine,
+            if_exists="replace",
+            index=False,
+            chunksize=CHUNK_SIZE,
+            method=insert_bulk,
+        )
+        self.assertIsNone(retval)
+
+        # Initializing the query has an overhead of two calls to the cursor object, probably
+        # one initial connection from the DB-API driver, to inquire about the database version,
+        # and another one for SQLAlchemy, which will use it to inspect the table schema using
+        # `information_schema`, and to eventually issue the `CREATE TABLE ...` statement.
+        effective_op_count = mock_cursor.call_count - 2
+
+        # Verify number of batches.
+        self.assertEqual(effective_op_count, OPCOUNT)
+
+    @skipIf(sys.version_info < (3, 8), "SQLAlchemy/Dask is not supported on Python <3.8")
+    @skipIf(SA_VERSION < SA_1_4, "SQLAlchemy 1.3 is not supported by pandas")
+    @patch('crate.client.connection.Cursor', mock_cursor=FakeCursor)
+    def test_bulk_save_dask(self, mock_cursor):
+        """
+        Verify bulk INSERT with Dask.
+        """
+        import dask.dataframe as dd
+        from pandas._testing import makeTimeDataFrame
+        from crate.client.sqlalchemy.support import insert_bulk
+
+        # 42 records / 4 partitions means each partition has a size of 10.5 elements.
+        # Because the chunk size 8 is slightly smaller than 10.5, a partition will not
+        # fit into a single chunk, so two batches will be emitted to the database for each
+        # data partition. 4 partitions * 2 batches = 8 insert operations will be emitted.
+        # Those settings are a perfect example of non-optimal settings, and have been
+        # made so on purpose, in order to demonstrate that using optimal settings
+        # is crucial.
+        INSERT_RECORDS = 42
+        NPARTITIONS = 4
+        CHUNK_SIZE = 8
+        OPCOUNT = math.ceil(INSERT_RECORDS / NPARTITIONS / CHUNK_SIZE) * NPARTITIONS
+
+        # Create a DataFrame to feed into the database.
+        df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
+        ddf = dd.from_pandas(df, npartitions=NPARTITIONS)
+
+        dburi = "crate://localhost:4200"
+        retval = ddf.to_sql(
+            name="test-testdrive",
+            uri=dburi,
+            if_exists="replace",
+            index=False,
+            chunksize=CHUNK_SIZE,
+            method=insert_bulk,
+            parallel=True,
+        )
+        self.assertIsNone(retval)
+
+        # Each of the insert operations incurs another call to the cursor object. This is probably
+        # the initial connection from the DB-API driver, to inquire about the database version.
+        # This compensation formula has been determined empirically / by educated guessing.
+        effective_op_count = (mock_cursor.call_count - 2 * NPARTITIONS) - 2
+
+        # Verify number of batches.
+        self.assertEqual(effective_op_count, OPCOUNT)
diff --git a/src/crate/client/tests.py b/src/crate/client/tests.py
index 953988ab..4ce9c950 100644
--- a/src/crate/client/tests.py
+++ b/src/crate/client/tests.py
@@ -24,6 +24,7 @@
 import json
 import os
 import socket
+import sys
 import unittest
 import doctest
 from pprint import pprint
@@ -40,6 +41,7 @@
     crate_host, crate_path, crate_port, \
     crate_transport_port, docs_path, localhost
 from crate.client import connect
+from .sqlalchemy import SA_VERSION, SA_1_4
 
 from .test_cursor import CursorTest
 from .test_connection import ConnectionTest
@@ -382,12 +384,23 @@ def test_suite():
     s.layer = ensure_cratedb_layer()
     suite.addTest(s)
 
-    s = doctest.DocFileSuite(
+    sqlalchemy_integration_tests = [
         'docs/by-example/sqlalchemy/getting-started.rst',
         'docs/by-example/sqlalchemy/crud.rst',
         'docs/by-example/sqlalchemy/working-with-types.rst',
         'docs/by-example/sqlalchemy/advanced-querying.rst',
         'docs/by-example/sqlalchemy/inspection-reflection.rst',
+    ]
+
+    # Don't run DataFrame integration tests on SQLAlchemy 1.3 or Python 3.7.
+    skip_dataframe = SA_VERSION < SA_1_4 or sys.version_info < (3, 8)
+    if not skip_dataframe:
+        sqlalchemy_integration_tests += [
+            'docs/by-example/sqlalchemy/dataframe.rst',
+        ]
+
+    s = doctest.DocFileSuite(
+        *sqlalchemy_integration_tests,
         module_relative=False,
         setUp=setUpCrateLayerSqlAlchemy,
         tearDown=tearDownDropEntitiesSqlAlchemy,