SQLFrame: Add basic and all-types examples
SQLFrame [1] implements the PySpark [2] DataFrame API in order to enable
running transformation pipelines directly on database engines - no Spark
clusters or dependencies required.

[1] https://pypi.org/project/sqlframe/
[2] https://spark.apache.org/docs/latest/api/python/
Showing 11 changed files with 545 additions and 0 deletions.
.github/workflows/dataframe-sqlframe.yml
@@ -0,0 +1,74 @@
name: SQLFrame

on:
  pull_request:
    branches: ~
    paths:
    - 'dataframe-sqlframe.yml'
    - 'by-dataframe/sqlframe/**'
    - '/requirements.txt'
  push:
    branches: [ main ]
    paths:
    - 'dataframe-sqlframe.yml'
    - 'by-dataframe/sqlframe/**'
    - '/requirements.txt'

  # Allow job to be triggered manually.
  workflow_dispatch:

  # Run job each night after CrateDB nightly has been published.
  schedule:
    - cron: '0 3 * * *'

# Cancel in-progress jobs when pushing to the same branch.
concurrency:
  cancel-in-progress: true
  group: ${{ github.workflow }}-${{ github.ref }}

jobs:
  test:
    name: "
     Python: ${{ matrix.python-version }}
     CrateDB: ${{ matrix.cratedb-version }}
     on ${{ matrix.os }}"
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: [ 'ubuntu-latest' ]
        python-version: [ '3.9', '3.13' ]
        cratedb-version: [ 'nightly' ]

    services:
      cratedb:
        image: crate/crate:${{ matrix.cratedb-version }}
        ports:
          - 4200:4200
          - 5432:5432
        env:
          CRATE_HEAP_SIZE: 4g

    steps:

      - name: Acquire sources
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          architecture: x64
          cache: 'pip'
          cache-dependency-path: |
            requirements.txt
            by-dataframe/sqlframe/requirements.txt
            by-dataframe/sqlframe/requirements-test.txt

      - name: Install utilities
        run: |
          pip install -r requirements.txt

      - name: Validate by-dataframe/sqlframe
        run: |
          ngr test --accept-no-venv by-dataframe/sqlframe
by-dataframe/sqlframe/.gitignore
@@ -0,0 +1 @@
*.csv
by-dataframe/sqlframe/README.md
@@ -0,0 +1,77 @@
# Verify the `sqlframe` library with CrateDB

Turning PySpark Into a Universal DataFrame API

## About

This folder includes software integration tests for verifying
that the [SQLFrame] Python library works well with [CrateDB].

SQLFrame implements the [PySpark] DataFrame API in order to enable running
transformation pipelines directly on database engines - no Spark clusters
or dependencies required.

## What's Inside

- `example_basic.py`: A few examples that read CrateDB's `sys.summits` table,
  plus an example that inquires existing tables.

- `example_types.py`: An example that exercises all data types supported by
  CrateDB; a condensed sketch follows after the synopsis below.

## Synopsis

```shell
pip install --upgrade sqlframe
```
```python
from psycopg2 import connect
from sqlframe import activate
from sqlframe.base.functions import col

# Define database connection parameters, suitable for CrateDB on localhost.
# For CrateDB Cloud, use `crate://<username>:<password>@<host>`.
conn = connect(
    dbname="crate",
    user="crate",
    password="",
    host="localhost",
    port="5432",
)
# Activate SQLFrame to run directly on CrateDB.
activate("postgres", conn=conn)

# Import after activation, so that `pyspark.sql` is provided by SQLFrame.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Invoke query.
df = spark.sql(
    spark.table("sys.summits")
    .where(col("region").ilike("ortler%"))
    .sort(col("height").desc())
    .limit(3)
)
print(df.sql())
df.show()
```

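For a taste of the all-types example, here is a minimal, hypothetical sketch of the kind of roundtrip `example_types.py` performs. It is not a copy of that file: the record layout and type annotations are assumptions, and it presumes that `createDataFrame` is available through SQLFrame's PySpark emulation.

```python
from psycopg2 import connect
from sqlframe import activate

# Connect and activate SQLFrame, exactly as in the synopsis above.
conn = connect(dbname="crate", user="crate", password="", host="localhost", port="5432")
activate("postgres", conn=conn)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A hypothetical record touching a few of CrateDB's data types.
record = {
    "name": "Ortler",               # TEXT
    "height": 3905,                 # INTEGER
    "coordinates": [10.54, 46.51],  # ARRAY(DOUBLE)
    "is_summit": True,              # BOOLEAN
}

# Build a single-row DataFrame, print the SQL which SQLFrame generates,
# and evaluate it against CrateDB.
df = spark.createDataFrame([record])
print(df.sql())
df.show()
```
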
## Tests

Set up a sandbox and install the packages.
```bash
pip install uv
uv venv .venv
source .venv/bin/activate
uv pip install -r requirements.txt -r requirements-test.txt
```

Run the integration tests.
```bash
pytest
```
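
Alternatively, run an example program directly. This sketch assumes a CrateDB
instance listening on `localhost:5432`, for example one started with Docker,
mirroring the image and port mappings of the CI workflow above.
```bash
# Start CrateDB in a separate terminal.
docker run --rm -it -p 4200:4200 -p 5432:5432 crate/crate:nightly

# Run the basic examples end-to-end.
python example_basic.py
```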

[CrateDB]: https://cratedb.com/database
[PySpark]: https://spark.apache.org/docs/latest/api/python/
[SQLFrame]: https://pypi.org/project/sqlframe/
by-dataframe/sqlframe/example_basic.py
@@ -0,0 +1,98 @@
""" | ||
Using `sqlframe` with CrateDB: Basic usage. | ||
pip install --upgrade sqlframe | ||
A few basic operations using the `sqlframe` library with CrateDB. | ||
- https://pypi.org/project/sqlframe/ | ||
""" | ||

from psycopg2 import connect
from sqlframe import activate
from sqlframe.base.functions import col

# Local helper module shipped alongside this example.
from patch import monkeypatch


def connect_spark():
    # Connect to database.
    conn = connect(
        dbname="crate",
        user="crate",
        password="",
        host="localhost",
        port="5432",
    )
    # Activate SQLFrame to run directly on CrateDB.
    activate("postgres", conn=conn)

    # Import after activation, so that `pyspark.sql` is provided by SQLFrame.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    return spark


def sqlframe_select_sys_summits():
    """
    Query CrateDB's built-in `sys.summits` table.
    """
    spark = connect_spark()
    df = spark.sql(
        spark.table("sys.summits")
        .where(col("region").ilike("ortler%"))
        .sort(col("height").desc())
        .limit(3)
    )
    print(df.sql())
    df.show()
    return df


def sqlframe_export_sys_summits_pandas():
    """
    Query CrateDB's built-in `sys.summits` table, returning a pandas DataFrame.
    """
    spark = connect_spark()
    df = spark.sql(
        spark.table("sys.summits")
        .where(col("region").ilike("ortler%"))
        .sort(col("height").desc())
        .limit(3)
    ).toPandas()
    return df


def sqlframe_export_sys_summits_csv():
    """
    Query CrateDB's built-in `sys.summits` table, saving the output to CSV.
    """
    spark = connect_spark()
    df = spark.sql(
        spark.table("sys.summits")
        .where(col("region").ilike("ortler%"))
        .sort(col("height").desc())
        .limit(3)
    )
    df.write.csv("summits.csv", mode="overwrite")
    return df


def sqlframe_get_table_names():
    """
    Inquire the table names of the system schema `sys`.
    """
    spark = connect_spark()
    tables = spark.catalog.listTables(dbName="sys")
    return tables


# Apply the monkeypatches from the local `patch` module before running.
monkeypatch()


if __name__ == "__main__":
    print(sqlframe_select_sys_summits())
    print(sqlframe_export_sys_summits_pandas())
    print(sqlframe_export_sys_summits_csv())
    print(sqlframe_get_table_names())