diff --git a/.github/dependabot.yml b/.github/dependabot.yml
index 444a5c5c..15ad44bb 100644
--- a/.github/dependabot.yml
+++ b/.github/dependabot.yml
@@ -33,6 +33,11 @@ updates:
     schedule:
       interval: "daily"
 
+  - directory: "/by-dataframe/sqlframe"
+    package-ecosystem: "pip"
+    schedule:
+      interval: "daily"
+
   - directory: "/by-language/csharp-npgsql"
     package-ecosystem: "nuget"
     schedule:
diff --git a/.github/workflows/dataframe-sqlframe.yml b/.github/workflows/dataframe-sqlframe.yml
new file mode 100644
index 00000000..1635a488
--- /dev/null
+++ b/.github/workflows/dataframe-sqlframe.yml
@@ -0,0 +1,74 @@
+name: SQLFrame
+
+on:
+  pull_request:
+    branches: ~
+    paths:
+    - '.github/workflows/dataframe-sqlframe.yml'
+    - 'by-dataframe/sqlframe/**'
+    - 'requirements.txt'
+  push:
+    branches: [ main ]
+    paths:
+    - '.github/workflows/dataframe-sqlframe.yml'
+    - 'by-dataframe/sqlframe/**'
+    - 'requirements.txt'
+
+  # Allow job to be triggered manually.
+  workflow_dispatch:
+
+  # Run job each night after CrateDB nightly has been published.
+  schedule:
+    - cron: '0 3 * * *'
+
+# Cancel in-progress jobs when pushing to the same branch.
+concurrency:
+  cancel-in-progress: true
+  group: ${{ github.workflow }}-${{ github.ref }}
+
+jobs:
+  test:
+    name: "
+     Python: ${{ matrix.python-version }}
+     CrateDB: ${{ matrix.cratedb-version }}
+     on ${{ matrix.os }}"
+    runs-on: ${{ matrix.os }}
+    strategy:
+      fail-fast: false
+      matrix:
+        os: [ 'ubuntu-latest' ]
+        python-version: [ '3.9', '3.13' ]
+        cratedb-version: [ 'nightly' ]
+
+    services:
+      cratedb:
+        image: crate/crate:${{ matrix.cratedb-version }}
+        ports:
+          - 4200:4200
+          - 5432:5432
+        env:
+          CRATE_HEAP_SIZE: 4g
+
+    steps:
+
+      - name: Acquire sources
+        uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+          architecture: x64
+          cache: 'pip'
+          cache-dependency-path: |
+            requirements.txt
+            by-dataframe/sqlframe/requirements.txt
+            by-dataframe/sqlframe/requirements-test.txt
+
+      - name: Install utilities
+        run: |
+          pip install -r requirements.txt
+
+      - name: Validate by-dataframe/sqlframe
+        run: |
+          ngr test --accept-no-venv by-dataframe/sqlframe
diff --git a/by-dataframe/sqlframe/.gitignore b/by-dataframe/sqlframe/.gitignore
new file mode 100644
index 00000000..afed0735
--- /dev/null
+++ b/by-dataframe/sqlframe/.gitignore
@@ -0,0 +1 @@
+*.csv
diff --git a/by-dataframe/sqlframe/README.md b/by-dataframe/sqlframe/README.md
new file mode 100644
index 00000000..fd13fb01
--- /dev/null
+++ b/by-dataframe/sqlframe/README.md
@@ -0,0 +1,79 @@
+# Verify the `sqlframe` library with CrateDB
+
+Turning PySpark Into a Universal DataFrame API
+
+## About
+
+This folder includes software integration tests for verifying
+that the [SQLFrame] Python library works well with [CrateDB].
+
+SQLFrame implements the [PySpark] DataFrame API, enabling transformation
+pipelines to run directly on database engines, with no Spark cluster
+or dependencies required.
+
+## What's Inside
+
+- `example_basic.py`: A few examples that read CrateDB's `sys.summits` table,
+  plus an example that lists existing tables.
+
+- `example_types.py`: An example that exercises all data types supported by
+  CrateDB.
+
+## Synopsis
+
+```shell
+pip install --upgrade sqlframe
+```
+```python
+from psycopg2 import connect
+from sqlframe import activate
+from sqlframe.base.functions import col
+
+# Define database connection parameters, suitable for CrateDB on localhost.
+# For CrateDB Cloud, use `crate://:@`.
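+# Note: CrateDB speaks the PostgreSQL wire protocol, so the stock `psycopg2`
+# driver can connect on port 5432 without a CrateDB-specific adapter.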
+conn = connect(
+    dbname="crate",
+    user="crate",
+    password="",
+    host="localhost",
+    port="5432",
+)
+# Activate SQLFrame to run directly on CrateDB.
+activate("postgres", conn=conn)
+
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.getOrCreate()
+
+# Invoke query.
+df = spark.sql(
+    spark.table("sys.summits")
+    .where(col("region").ilike("ortler%"))
+    .sort(col("height").desc())
+    .limit(3)
+)
+print(df.sql())
+df.show()
+```
+
+## Tests
+
+Set up a sandbox and install the packages.
+```bash
+pip install uv
+uv venv .venv
+source .venv/bin/activate
+uv pip install -r requirements.txt -r requirements-test.txt
+```
+
+Run the integration tests.
+```bash
+pytest
+```
+
+
+[CrateDB]: https://cratedb.com/database
+[PySpark]: https://spark.apache.org/docs/latest/api/python/
+[SQLFrame]: https://pypi.org/project/sqlframe/
diff --git a/by-dataframe/sqlframe/example_basic.py b/by-dataframe/sqlframe/example_basic.py
new file mode 100644
index 00000000..eabc618b
--- /dev/null
+++ b/by-dataframe/sqlframe/example_basic.py
@@ -0,0 +1,98 @@
+"""
+Using `sqlframe` with CrateDB: Basic usage.
+
+    pip install --upgrade sqlframe
+
+A few basic operations using the `sqlframe` library with CrateDB.
+
+- https://pypi.org/project/sqlframe/
+"""
+
+from psycopg2 import connect
+from sqlframe import activate
+from sqlframe.base.functions import col
+
+from patch import monkeypatch
+
+
+def connect_spark():
+    # Connect to database.
+    conn = connect(
+        dbname="crate",
+        user="crate",
+        password="",
+        host="localhost",
+        port="5432",
+    )
+    # Activate SQLFrame to run directly on CrateDB.
+    activate("postgres", conn=conn)
+
+    from pyspark.sql import SparkSession
+
+    spark = SparkSession.builder.getOrCreate()
+    return spark
+
+
+def sqlframe_select_sys_summits():
+    """
+    Query CrateDB's built-in `sys.summits` table.
+    """
+    spark = connect_spark()
+    df = spark.sql(
+        spark.table("sys.summits")
+        .where(col("region").ilike("ortler%"))
+        .sort(col("height").desc())
+        .limit(3)
+    )
+    print(df.sql())
+    df.show()
+    return df
+
+
+def sqlframe_export_sys_summits_pandas():
+    """
+    Query CrateDB's built-in `sys.summits` table, returning a pandas dataframe.
+    """
+    spark = connect_spark()
+    df = spark.sql(
+        spark.table("sys.summits")
+        .where(col("region").ilike("ortler%"))
+        .sort(col("height").desc())
+        .limit(3)
+    ).toPandas()
+    return df
+
+
+def sqlframe_export_sys_summits_csv():
+    """
+    Query CrateDB's built-in `sys.summits` table, saving the output to CSV.
+    """
+    spark = connect_spark()
+    df = spark.sql(
+        spark.table("sys.summits")
+        .where(col("region").ilike("ortler%"))
+        .sort(col("height").desc())
+        .limit(3)
+    )
+    df.write.csv("summits.csv", mode="overwrite")
+    return df
+
+
+def sqlframe_get_table_names():
+    """
+    Inquire the table names of the system schema `sys`.
+    """
+    spark = connect_spark()
+    tables = spark.catalog.listTables(dbName="sys")
+    return tables
+
+
+# Apply the CrateDB-specific patches to `sqlglot` and `sqlframe`.
+monkeypatch()
+
+
+if __name__ == "__main__":
+    print(sqlframe_select_sys_summits())
+    print(sqlframe_export_sys_summits_pandas())
+    print(sqlframe_export_sys_summits_csv())
+    print(sqlframe_get_table_names())
diff --git a/by-dataframe/sqlframe/example_types.py b/by-dataframe/sqlframe/example_types.py
new file mode 100644
index 00000000..92731e8d
--- /dev/null
+++ b/by-dataframe/sqlframe/example_types.py
@@ -0,0 +1,142 @@
+"""
+Using `sqlframe` with CrateDB: All data types.
+
+    pip install --upgrade sqlframe
+
+An end-to-end lifecycle, defining a table, inserting data, and querying it.
+This example uses all data types supported by CrateDB.
+
+- https://pypi.org/project/sqlframe/
+- https://cratedb.com/docs/crate/reference/en/latest/general/ddl/data-types.html#supported-types
+"""
+
+import datetime as dt
+from copy import deepcopy
+from unittest import mock
+
+from psycopg2 import connect
+from sqlframe import activate
+
+from patch import monkeypatch
+
+
+def connect_spark():
+    # Connect to database.
+    conn = connect(
+        dbname="crate",
+        user="crate",
+        password="",
+        host="localhost",
+        port="5432",
+    )
+    # Activate SQLFrame to run directly on CrateDB.
+    activate("postgres", conn=conn)
+
+    from pyspark.sql import SparkSession
+
+    spark = SparkSession.builder.getOrCreate()
+    return spark
+
+
+# The record that is inserted into the database.
+RECORD_IN = dict(
+    null_integer=None,
+    integer=42,
+    bigint=42,
+    float=42.42,
+    double=42.42,
+    decimal=42.42,
+    bit="01010101",
+    bool=True,
+    text="foobar",
+    character="foo",
+    timestamp_tz="1970-01-02T00:00:00+01:00",
+    timestamp_notz="1970-01-02T00:00:00",
+    ip="127.0.0.1",
+    array=["foo", "bar"],
+    object={"foo": "bar"},
+    geopoint=[85.43, 66.23],
+    geoshape="POLYGON ((5 5, 10 5, 10 10, 5 10, 5 5))",
+    float_vector=[1.0, 2.0, 3.0],
+)
+
+# When queried, a few values come back canonicalized.
+RECORD_OUT = deepcopy(RECORD_IN)
+RECORD_OUT.update(
+    dict(
+        character="foo  ",
+        timestamp_tz=dt.datetime(1970, 1, 1, 23, 0, tzinfo=dt.timezone.utc),
+        timestamp_notz=dt.datetime(1970, 1, 2, 0, 0, tzinfo=dt.timezone.utc),
+        # FIXME: `geopoint` comes back as string, `'(85.42999997735023,66.22999997343868)'`
+        # geopoint=[pytest.approx(85.43), pytest.approx(66.23)],
+        geopoint=mock.ANY,
+        geoshape={
+            "coordinates": [
+                [[5.0, 5.0], [5.0, 10.0], [10.0, 10.0], [10.0, 5.0], [5.0, 5.0]]
+            ],
+            "type": "Polygon",
+        },
+    )
+)
+
+
+def sqlframe_ddl_dml_dql():
+    """
+    Validate all data types supported by CrateDB.
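+
+    Runs the full DDL/DML/DQL cycle: create the table `testdrive.example`,
+    insert one record through the DataFrame API, and read it back.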
+
+    https://cratedb.com/docs/crate/reference/en/latest/general/ddl/data-types.html#supported-types
+    """
+    spark = connect_spark()
+    # Use a raw cursor for SQL statements not covered by the DataFrame API.
+    run_sql = spark._connection.cursor().execute
+
+    FRAME_IN = spark.createDataFrame([RECORD_IN])
+
+    # DDL
+    run_sql("DROP TABLE IF EXISTS testdrive.example")
+    run_sql("""
+        CREATE TABLE testdrive.example (
+            -- Numeric types
+            null_integer INT,
+            integer INT,
+            bigint BIGINT,
+            float FLOAT,
+            double DOUBLE,
+            decimal DECIMAL(8, 2),
+            -- Other scalar types
+            bit BIT(8),
+            bool BOOLEAN,
+            text TEXT,
+            character CHARACTER(5),
+            timestamp_tz TIMESTAMP WITH TIME ZONE,
+            timestamp_notz TIMESTAMP WITHOUT TIME ZONE,
+            ip IP,
+            -- Container types
+            "array" ARRAY(STRING),
+            "object" OBJECT(DYNAMIC),
+            -- Geospatial types
+            geopoint GEO_POINT,
+            geoshape GEO_SHAPE,
+            -- Vector type
+            "float_vector" FLOAT_VECTOR(3)
+        );
+    """)
+
+    # DML
+    FRAME_IN.write.saveAsTable("testdrive.example", mode="append")
+
+    # DQL
+    run_sql("REFRESH TABLE testdrive.example")
+    data = spark.sql("SELECT * FROM testdrive.example").collect()
+    return data
+
+
+# Apply the CrateDB-specific patches to `sqlglot` and `sqlframe`.
+monkeypatch()
+
+
+if __name__ == "__main__":
+    print(sqlframe_ddl_dml_dql())
diff --git a/by-dataframe/sqlframe/patch.py b/by-dataframe/sqlframe/patch.py
new file mode 100644
index 00000000..e7bdb39c
--- /dev/null
+++ b/by-dataframe/sqlframe/patch.py
@@ -0,0 +1,70 @@
+import json
+import typing as t
+from collections import OrderedDict
+
+from sqlframe.base.session import _BaseSession
+from sqlframe.postgres import PostgresCatalog, PostgresSession
+from sqlglot import Generator, TokenType, exp
+from sqlglot.dialects import Postgres
+
+
+def monkeypatch():
+    """
+    Monkeypatch `sqlglot` and `sqlframe` until a dedicated CrateDB dialect has been submitted.
+    """
+    Postgres.Tokenizer.KEYWORDS.update(
+        {
+            "GEO_POINT": TokenType.GEOGRAPHY,
+            "GEO_SHAPE": TokenType.GEOGRAPHY,
+        }
+    )
+
+    def var_map_sql(
+        self: Generator,
+        expression: t.Union[exp.Map, exp.VarMap],
+        map_func_name: str = "MAP",
+    ) -> str:
+        """
+        CrateDB accepts values for `OBJECT` types serialized as JSON.
+        """
+        keys = expression.args["keys"]
+        values = expression.args["values"]
+        data = OrderedDict()
+        for key, value in zip(keys.expressions, values.expressions):
+            data[str(key).strip("'")] = str(value).strip("'")
+        return "'{}'".format(json.dumps(data))
+
+    def cast_sql(
+        self, expression: exp.Cast, safe_prefix: t.Optional[str] = None
+    ) -> str:
+        """
+        Omit CASTing with CrateDB: some values reflect as `TEXT`, which is wrong.
+
+        TODO: REVIEW: Is it sane to do it this way?
+        Can the type mapping be improved somehow?
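+        Until then, emitting the bare expression lets CrateDB infer the
+        value types itself, instead of forcing a possibly incorrect CAST.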
+        """
+        # Retained from the original implementation for reference; unused here.
+        format_sql = self.sql(expression, "format")
+        format_sql = f" FORMAT {format_sql}" if format_sql else ""
+        to_sql = self.sql(expression, "to")
+        to_sql = f" {to_sql}" if to_sql else ""
+        action = self.sql(expression, "action")
+        action = f" {action}" if action else ""
+        # Original:
+        # return f"{safe_prefix or ''}CAST({self.sql(expression, 'this')} AS{to_sql}{format_sql}{action})"
+        # CrateDB adjusted:
+        return f"{self.sql(expression, 'this')}"
+
+    Postgres.Generator.TRANSFORMS.update(
+        {
+            exp.Map: var_map_sql,
+            exp.VarMap: var_map_sql,
+        }
+    )
+    Generator.cast_sql = cast_sql
+    # CrateDB's catalog is always named `crate`.
+    PostgresCatalog.currentCatalog = lambda _: "crate"
+    # Use the generic session initializer instead of the Postgres-specific one.
+    PostgresSession.__init__ = _BaseSession.__init__
diff --git a/by-dataframe/sqlframe/pyproject.toml b/by-dataframe/sqlframe/pyproject.toml
new file mode 100644
index 00000000..b7997815
--- /dev/null
+++ b/by-dataframe/sqlframe/pyproject.toml
@@ -0,0 +1,12 @@
+[tool.pytest.ini_options]
+minversion = "2.0"
+addopts = """
+  -rfEXs -p pytester --strict-markers --verbosity=3
+  --capture=no
+  """
+log_level = "DEBUG"
+log_cli_level = "DEBUG"
+testpaths = ["*.py"]
+xfail_strict = true
+markers = [
+]
diff --git a/by-dataframe/sqlframe/requirements-test.txt b/by-dataframe/sqlframe/requirements-test.txt
new file mode 100644
index 00000000..508a3d0d
--- /dev/null
+++ b/by-dataframe/sqlframe/requirements-test.txt
@@ -0,0 +1 @@
+pytest<9
diff --git a/by-dataframe/sqlframe/requirements.txt b/by-dataframe/sqlframe/requirements.txt
new file mode 100644
index 00000000..e27ac7ed
--- /dev/null
+++ b/by-dataframe/sqlframe/requirements.txt
@@ -0,0 +1,2 @@
+sqlframe[pandas]<3.10
+psycopg2-binary<2.10
diff --git a/by-dataframe/sqlframe/test.py b/by-dataframe/sqlframe/test.py
new file mode 100644
index 00000000..e59013b4
--- /dev/null
+++ b/by-dataframe/sqlframe/test.py
@@ -0,0 +1,77 @@
+from pathlib import Path
+
+from sqlframe.base.catalog import Table
+
+from example_basic import (
+    sqlframe_export_sys_summits_csv,
+    sqlframe_export_sys_summits_pandas,
+    sqlframe_get_table_names,
+    sqlframe_select_sys_summits,
+)
+from example_types import RECORD_OUT, sqlframe_ddl_dml_dql
+
+
+def test_sys_summits():
+    """
+    Read built-in data from CrateDB's `sys` table through `sqlframe`.
+    """
+    data = sqlframe_select_sys_summits().collect()
+    assert data[0]["mountain"] == "Ortler"
+
+
+def test_get_table_names():
+    """
+    Validate inquiry of tables from the `sys` schema.
+    """
+    data = sqlframe_get_table_names()
+    assert (
+        Table(
+            name="nodes",
+            catalog="crate",
+            namespace=["sys"],
+            description=None,
+            tableType="MANAGED",
+            isTemporary=False,
+        )
+        in data
+    )
+    assert (
+        Table(
+            name="shards",
+            catalog="crate",
+            namespace=["sys"],
+            description=None,
+            tableType="MANAGED",
+            isTemporary=False,
+        )
+        in data
+    )
+    assert len(data) > 10
+
+
+def test_export_sys_summits_pandas():
+    """
+    Validate exporting to a pandas data frame works.
+    """
+    data = sqlframe_export_sys_summits_pandas()
+    assert list(data["mountain"]) == ["Ortler", "Königspitze", "Monte Cevedale"]
+
+
+def test_export_sys_summits_csv():
+    """
+    Validate exporting to CSV works.
+    """
+    sqlframe_export_sys_summits_csv()
+    data = Path("summits.csv").read_text()
+    assert "classification,coordinates,country" in data
+    assert "Ortler" in data
+    assert "Königspitze" in data
+
+
+def test_ddl_dml_dql():
+    """
+    Validate an end-to-end lifecycle, defining a table, inserting data, and querying it.
+    This example uses all data types supported by CrateDB.
+    """
+    data = sqlframe_ddl_dml_dql()
+    assert data[0].asDict() == RECORD_OUT