# [WIP] CHIP-10: PySpark + Notebook Integration for Chronon Feature Development #981

**Draft:** camweston-stripe wants to merge 32 commits into `main` from `camweston/CHIP-10`
## Commits (32, all by camweston-stripe)

- `04e900d` Initial cherry pick of CHIP-10 from stripe chronon repo
- `be70595` Remove stripe specific comment
- `8b0bd1a` Add base README file
- `a24bc2f` Remvoe s3 logic
- `959d3f0` Update readme
- `295cc2b` Update readme
- `848ba9f` Fix imports in utils.py
- `e45de5f` Fix comments in constants
- `7357519` add new line to end of constants.py
- `380b928` Change output namespace
- `e45f9dc` Add setup step for log file
- `9ef3955` Reformat databricks.py to be flake8 compliant
- `6f80b89` Reformat executables to be flake8 compliant
- `f5c83eb` Reformat executables to be flake8 compliant
- `5346d5b` Reformat executables to be flake8 compliant
- `a69a5b0` Reformat utils to be flake8 compliant
- `6f72c4e` Remove stripe specific logic from Databricks Table Utils
- `a015583` Remove stripe specific logic from Databricks Table Utils
- `6792e0e` add explanation to databricks constants provider
- `6e859e0` Run scalafmt on new scala code
- `54c1828` Remove validate functions as the spark validator does not exist is OS…
- `c300197` Remove constants provider + validate logic from python api
- `d7fd828` Adjust table utils logic
- `695c6ef` Remove unused imports from databricks table utils
- `01d05f7` Fix table utils naming in pyspark utils
- `bb98830` Fix linting in python + scala files
- `34e001e` Had to remove Databricks table utils as we don't have table utils set…
- `b9d13da` Remove unused imports
- `605ca75` Trying to fix linting issue for pyspark utils
- `3685525` Trying to fix linting issue for pyspark utils
- `e34a33f` Update readme
- `56b858f` Merge branch 'main' of https://github.com/airbnb/chronon into camwest…
---

**Files changed**

**New file (+312 lines): README**
# Chronon Python Interface for PySpark Environments

## Table of Contents
1. [Introduction](#introduction)
2. [Architecture Overview](#architecture-overview)
3. [Core Components](#core-components)
4. [Flow of Execution](#flow-of-execution)
5. [Extending the Framework](#extending-the-framework)
6. [Setup and Dependencies](#setup-and-dependencies)

## Introduction

The Chronon PySpark interface provides a clean, object-oriented framework for executing Chronon feature definitions directly within a PySpark environment, such as a Databricks notebook. It streamlines the developer experience by removing the need to switch between multiple tools, allowing rapid prototyping and iteration of Chronon feature engineering workflows.

This library enables users to:
- Run and analyze GroupBy and Join operations in a type-safe manner
- Execute feature computations within notebook environments such as Databricks
- Implement platform-specific behavior while preserving a consistent interface
- Access JVM-based functionality directly from Python code
## Architecture Overview

### The Python-JVM Bridge

At the core of this implementation is the interaction between Python and the Java Virtual Machine (JVM):

```
Python Environment                  |              JVM Environment
                                    |
┌─────────────────────┐             |             ┌─────────────────────┐
│ Python Thrift Obj   │             |             │ Scala Thrift Obj    │
│ (GroupBy, Join)     │─────┐       |       ┌────▶│ (GroupBy, Join)     │
└─────────────────────┘     │       |       │     └─────────────────────┘
           │                │       |       │                │
           ▼                │       |       │                ▼
┌─────────────────────┐     │       |       │     ┌─────────────────────┐
│ thrift_simple_json()│     │       |       │     │ PySparkUtils.parse  │
└─────────────────────┘     │       |       │     └─────────────────────┘
           │                │       |       │                │
           ▼                │       |       │                ▼
┌─────────────────────┐     │       |       │     ┌─────────────────────┐
│ JSON String         │─────┼───────┼───────┼────▶│ Java Objects        │
└─────────────────────┘     │       |       │     └─────────────────────┘
                            │       |       │                │
                            │       |       │                ▼
┌─────────────────────┐     │       |       │     ┌─────────────────────┐
│ PySparkExecutable   │     │       |       │     │ PySparkUtils.run    │
│ .run()              │─────┼───────┼───────┼────▶│ GroupBy/Join        │
└─────────────────────┘     │       |       │     └─────────────────────┘
           ▲                │       |       │                │
           │                │       |       │                ▼
┌─────────────────────┐     │  Py4J Socket  │     ┌─────────────────────┐
│ Python DataFrame    │◀────┴───────┼───────┴─────│ JVM DataFrame       │
└─────────────────────┘             |             └─────────────────────┘
```

- **Py4J**: Enables Python code to dynamically access Java objects, methods, and fields across the JVM boundary
- **PySpark**: Uses Py4J to communicate with the Spark JVM, translating Python calls into Spark's Java/Scala APIs
- **Thrift Objects**: Chronon features defined as Python thrift objects are converted to Java thrift objects for execution

This design gives Python users access to the full power of Chronon's JVM-based computation engine without leaving a single, centralized Python environment.
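To make the crossing concrete, the left-to-right hop in the diagram amounts to something like this minimal sketch (the `thrift_simple_json` import path and the `PySparkUtils` method name are assumptions based on the diagram, not confirmed APIs):

```python
from ai.chronon.repo.serializer import thrift_simple_json  # import path assumed
from pyspark.sql import SparkSession

def to_java_thrift(group_by, spark: SparkSession):
    """Convert a Python thrift GroupBy into its Java counterpart on the JVM."""
    json_str = thrift_simple_json(group_by)  # Python thrift object -> JSON string
    jvm = spark.sparkContext._jvm            # Py4J gateway into the Spark JVM
    # JSON string -> Java thrift object, parsed on the JVM side (method name assumed)
    return jvm.ai.chronon.spark.PySparkUtils.parseGroupBy(json_str)
```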
## Core Components

The framework is built around several core abstractions:

### PySparkExecutable

An abstract base class that provides common functionality for executing Chronon features via PySpark:

```python
class PySparkExecutable(Generic[T], ABC):
    """
    Abstract base class defining common functionality for executing features via PySpark.
    """
```

- Handles initialization with the feature object and SparkSession
- Provides utilities for updating dates in sources and queries
- Manages the execution of underlying join sources
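For orientation, here is a condensed sketch of the base class, using the attributes and method names shown in the class diagram further down; the constructor details are assumptions rather than the exact implementation:

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

from py4j.java_gateway import JVMView
from pyspark.sql import SparkSession

T = TypeVar("T")  # the Python thrift type being executed: GroupBy or Join

class PySparkExecutable(Generic[T], ABC):
    def __init__(self, obj: T, spark: SparkSession):
        self.obj: T = obj
        self.spark: SparkSession = spark
        self.jvm: JVMView = spark.sparkContext._jvm  # Py4J view into the Spark JVM
        self.platform = self.get_platform()          # platform-specific hooks

    @abstractmethod
    def get_platform(self) -> "PlatformInterface":
        """Return the platform implementation (e.g. DatabricksPlatform)."""
```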
### Specialized Executables

Two specialized interfaces extend the base executable for different Chronon types:

- **GroupByExecutable**: Interface for executing GroupBy objects
- **JoinExecutable**: Interface for executing Join objects

These interfaces define type-specific behaviors for running and analyzing features.
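Continuing the sketch above (the `run`/`analyze` signatures are assumed from the class diagram below and the usage example at the end of this README):

```python
from abc import ABC, abstractmethod

from ai.chronon.api.ttypes import GroupBy, Join
from pyspark.sql import DataFrame

class GroupByExecutable(PySparkExecutable[GroupBy], ABC):
    @abstractmethod
    def run(self, start_date: str, end_date: str) -> DataFrame:
        """Compute the GroupBy over the date range and return the result."""

    @abstractmethod
    def analyze(self, start_date: str, end_date: str) -> None:
        """Run Chronon's analysis of the GroupBy over the date range."""

class JoinExecutable(PySparkExecutable[Join], ABC):
    @abstractmethod
    def run(self, start_date: str, end_date: str) -> DataFrame: ...

    @abstractmethod
    def analyze(self, start_date: str, end_date: str) -> None: ...
```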
### Platform Interface

A key abstraction that enables platform-specific behavior:

```python
class PlatformInterface(ABC):
    """
    Interface for platform-specific operations.
    """
```

This interface defines operations that vary by platform (Databricks, Jupyter, etc.) and must be implemented by platform-specific classes.

### Platform-Specific Implementations

Concrete implementations for specific notebook environments:

- **DatabricksPlatform**: Implements platform-specific operations for Databricks
- **DatabricksGroupBy**: Executes GroupBy objects in Databricks
- **DatabricksJoin**: Executes Join objects in Databricks
```
┌─────────────────────────┐
│   PySparkExecutable     │
│   (Generic[T], ABC)     │
├─────────────────────────┤
│ - obj: T                │
│ - spark: SparkSession   │
│ - jvm: JVMView          │
├─────────────────────────┤
│ + get_platform()        │
│ # _update_query_dates() │
│ # _update_source_dates()│
│ # print_with_timestamp()│
└───────────────┬─────────┘
                │
     ┌──────────┴───────────┐
     │                      │
┌────▼──────────────┐  ┌────▼──────────────┐
│ GroupByExecutable │  │  JoinExecutable   │
├───────────────────┤  ├───────────────────┤
│                   │  │                   │
├───────────────────┤  ├───────────────────┤
│ + run()           │  │ + run()           │
│ + analyze()       │  │ + analyze()       │
└────────┬──────────┘  └────────┬──────────┘
         │                      │
         │                      │
┌────────▼──────────┐  ┌────────▼──────────┐
│ DatabricksGroupBy │  │  DatabricksJoin   │
├───────────────────┤  ├───────────────────┤
│                   │  │                   │
├───────────────────┤  ├───────────────────┤
│ + get_platform()  │  │ + get_platform()  │
└───────────────────┘  └───────────────────┘

┌─────────────────────────────┐
│      PlatformInterface      │
│            (ABC)            │
├─────────────────────────────┤
│ - spark: SparkSession       │
├─────────────────────────────┤
│ + register_udfs()           │
│ + get_executable_join_cls() │
│ + start_log_capture()       │
│ + end_log_capture()         │
│ + log_operation()           │
│ + drop_table_if_exists()    │
└───────────┬─────────────────┘
            │
            │
┌───────────▼────────────────┐
│     DatabricksPlatform     │
├────────────────────────────┤
│ - dbutils: DBUtils         │
├────────────────────────────┤
│ + register_udfs()          │
│ + get_executable_join_cls()│
│ + start_log_capture()      │
│ + end_log_capture()        │
│ + get_databricks_user()    │
└────────────────────────────┘
```
## Flow of Execution

When a user calls a method like `DatabricksGroupBy(group_by, py_spark_session).run()`, the following sequence occurs:

1. **Object Preparation**:
   - The Python thrift object (GroupBy, Join) is copied and updated with appropriate dates (this interface is intended for prototyping over small date ranges, not for full backfills)
   - Underlying join sources are executed if needed

2. **JVM Conversion**:
   - The Python thrift object is converted to a JSON representation
   - The JSON is parsed into a Java thrift object on the JVM side via Py4J

3. **Execution**:
   - The JVM executes the computation using Spark
   - Results are captured in a Spark DataFrame on the JVM side

4. **Result Return**:
   - The JVM DataFrame is wrapped in a Python DataFrame object
   - The Python DataFrame is returned to the user

5. **Log Handling**:
   - Throughout the process, JVM logs are captured
   - Logs are printed in the notebook upon completion or on errors
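Put together as pseudocode, `run()` looks roughly like the sketch below, written as it would appear inside a `GroupByExecutable` subclass. This is illustrative only: `parse_group_by` and `run_group_by` are hypothetical stand-ins for the real `PySparkUtils` entry points, and the helper signatures are assumptions.

```python
from pyspark.sql import DataFrame

def run(self, start_date: str, end_date: str) -> DataFrame:
    # 1. Object preparation: copy the Python thrift object and narrow its dates.
    obj = self._update_query_dates(self.obj, start_date, end_date)  # signature assumed

    # 2. JVM conversion: Python thrift -> JSON -> Java thrift, via Py4J.
    json_str = thrift_simple_json(obj)  # helper named in the architecture diagram
    java_obj = self.jvm.ai.chronon.spark.PySparkUtils.parse_group_by(json_str)  # hypothetical

    # 5. Log handling brackets the whole computation.
    token = self.platform.start_log_capture(job_name="group_by_run")
    try:
        # 3. Execution: the JVM runs the computation with Spark.
        java_df = self.jvm.ai.chronon.spark.PySparkUtils.run_group_by(java_obj)  # hypothetical
    finally:
        self.platform.end_log_capture(token)

    # 4. Result return: wrap the JVM DataFrame for the Python caller.
    return DataFrame(java_df, self.spark)  # PySpark >= 3.3; 3.1 takes a SQLContext
```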
## Extending the Framework

### Implementing a New Platform Interface

To add support for a new notebook environment (e.g., Jupyter), follow these steps:

1. **Create a new platform implementation**:

```python
class JupyterPlatform(PlatformInterface):
    def __init__(self, spark: SparkSession):
        super().__init__(spark)
        # Initialize Jupyter-specific components

    @override
    def register_udfs(self) -> None:
        # Register any necessary UDFs for Jupyter.
        # Recall that UDFs are registered with the shared Spark SQL engine,
        # so you can register Python and/or Scala UDFs and use them from both Spark sessions.
        pass

    @override
    def get_executable_join_cls(self) -> type[JoinExecutable]:
        # Return the Jupyter-specific join executable class
        return JupyterJoin

    @override
    def start_log_capture(self, job_name: str) -> Any:
        # Start capturing logs in Jupyter
        pass

    @override
    def end_log_capture(self, capture_token: Any) -> None:
        # End log capturing and print the logs in Jupyter
        pass
```
2. **Create concrete executable implementations**:

```python
class JupyterGroupBy(GroupByExecutable):
    def __init__(self, group_by: GroupBy, spark_session: SparkSession):
        super().__init__(group_by, spark_session)
        # You can pass Jupyter-specific parameters in to set metadata
        # that lets you customize things like:
        # - Which namespace is written to
        # - Table name prefixing (the Databricks implementation prefixes table names with the notebook username)
        # - The root directory of your existing feature definitions, if you want to import features defined in an IDE
        self.obj: GroupBy = self.platform.set_metadata(obj=self.obj)

    @override
    def get_platform(self) -> PlatformInterface:
        return JupyterPlatform(self.spark)

class JupyterJoin(JoinExecutable):
    def __init__(self, join: Join, spark_session: SparkSession):
        super().__init__(join, spark_session)
        # Same Jupyter-specific metadata customization as above:
        # namespace, table name prefixing, and the root directory for imported feature definitions.
        self.obj: Join = self.platform.set_metadata(obj=self.obj)

    @override
    def get_platform(self) -> PlatformInterface:
        return JupyterPlatform(self.spark)
```

### Key Methods to Override

When implementing a platform interface, pay special attention to these methods:

- **start_log_capture()** and **end_log_capture()**: Implement platform-specific log capturing; a sketch of one approach follows.
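As one example, a hypothetical file-offset scheme (assuming, per the Setup section below, that the JVM writes all Chronon logs to a single file) records where the log file ends when the job starts and prints everything appended by the time it finishes:

```python
import os

class FileOffsetLogCapture:
    """Hypothetical helper: capture JVM logs appended to a single log file."""

    def __init__(self, log_file: str):
        self.log_file = log_file

    def start_log_capture(self, job_name: str) -> int:
        # Remember where the log file ends now; everything written after
        # this offset belongs to the job we are about to run.
        return os.path.getsize(self.log_file) if os.path.exists(self.log_file) else 0

    def end_log_capture(self, capture_token: int) -> None:
        # Print everything the JVM appended since start_log_capture().
        with open(self.log_file, "r") as f:
            f.seek(capture_token)
            print(f.read())
```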
## Setup and Dependencies

### Requirements

1. **Spark Dependencies**: The Chronon Java/Scala JARs must be included in your Spark cluster

2. **Python Dependencies**:
   - pyspark (tested on both 3.1 and 3.3)

3. **Log File**: You will need to make sure that your Chronon JVM logs are written to a single file. This is generally platform-specific.

### Example Setup

Here's a minimal example of setting up and using the Chronon Python interface in a Databricks notebook. It assumes that you have already included the necessary JARs in your cluster dependencies.

```python
# Import the required modules
from pyspark.sql import SparkSession
from ai.chronon.pyspark.databricks import DatabricksGroupBy, DatabricksJoin
from ai.chronon.api.ttypes import GroupBy, Join
from ai.chronon.group_by import Aggregation, Operation, Window, TimeUnit

# Define your GroupBy or Join object
my_group_by = GroupBy(...)

# Create an executable (`spark` is the SparkSession provided by the Databricks notebook runtime)
executable = DatabricksGroupBy(my_group_by, spark)

# Run the executable over a small date range (dates use the yyyyMMdd partition format)
result_df = executable.run(start_date='20250101', end_date='20250107')
```

---
**Empty file added.**

**New file (+19 lines): constants.py**
```python
from __future__ import annotations
from typing import Optional

# --------------------------------------------------------------------------
# Company Specific Constants
# --------------------------------------------------------------------------

PARTITION_COLUMN_FORMAT: str = '%Y%m%d'

# --------------------------------------------------------------------------
# Platform Specific Constants
# --------------------------------------------------------------------------

# --------------------------------------------------------------------------
# Databricks Constants
# --------------------------------------------------------------------------
DATABRICKS_OUTPUT_NAMESPACE: Optional[str] = None
DATABRICKS_JVM_LOG_FILE: str = "/databricks/chronon_logfile.log"
DATABRICKS_ROOT_DIR_FOR_IMPORTED_FEATURES: str = "src"
```
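As a quick, hypothetical illustration of how these constants relate to the README example above, `PARTITION_COLUMN_FORMAT` produces the `yyyyMMdd`-style dates passed to `run()`:

```python
from datetime import datetime, timedelta

# Build a one-week prototyping window in the partition-column format,
# matching run(start_date='20250101', end_date='20250107') above.
end = datetime(2025, 1, 7)
start_date = (end - timedelta(days=6)).strftime(PARTITION_COLUMN_FORMAT)  # '20250101'
end_date = end.strftime(PARTITION_COLUMN_FORMAT)                          # '20250107'
```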
---

**Review comment:** I like those placeholders