32 commits
04e900d
Initial cherry pick of CHIP-10 from stripe chronon repo
camweston-stripe Apr 24, 2025
be70595
Remove stripe specific comment
camweston-stripe Apr 24, 2025
8b0bd1a
Add base README file
camweston-stripe Apr 24, 2025
a24bc2f
Remvoe s3 logic
camweston-stripe Apr 24, 2025
959d3f0
Update readme
camweston-stripe Apr 24, 2025
295cc2b
Update readme
camweston-stripe Apr 24, 2025
848ba9f
Fix imports in utils.py
camweston-stripe Apr 24, 2025
e45de5f
Fix comments in constants
camweston-stripe Apr 24, 2025
7357519
add new line to end of constants.py
camweston-stripe Apr 24, 2025
380b928
Change output namespace
camweston-stripe Apr 24, 2025
e45f9dc
Add setup step for log file
camweston-stripe Apr 24, 2025
9ef3955
Reformat databricks.py to be flake8 compliant
camweston-stripe Apr 24, 2025
6f80b89
Reformat executables to be flake8 compliant
camweston-stripe Apr 25, 2025
f5c83eb
Reformat executables to be flake8 compliant
camweston-stripe Apr 25, 2025
5346d5b
Reformat executables to be flake8 compliant
camweston-stripe Apr 25, 2025
a69a5b0
Reformat utils to be flake8 compliant
camweston-stripe Apr 25, 2025
6f72c4e
Remove stripe specific logic from Databricks Table Utils
camweston-stripe Apr 25, 2025
a015583
Remove stripe specific logic from Databricks Table Utils
camweston-stripe Apr 25, 2025
6792e0e
add explanation to databricks constants provider
camweston-stripe Apr 25, 2025
6e859e0
Run scalafmt on new scala code
camweston-stripe Apr 25, 2025
54c1828
Remove validate functions as the spark validator does not exist is OS…
camweston-stripe Apr 28, 2025
c300197
Remove constants provider + validate logic from python api
camweston-stripe Apr 28, 2025
d7fd828
Adjust table utils logic
camweston-stripe Apr 28, 2025
695c6ef
Remove unused imports from databricks table utils
camweston-stripe Apr 28, 2025
01d05f7
Fix table utils naming in pyspark utils
camweston-stripe Apr 28, 2025
bb98830
Fix linting in python + scala files
camweston-stripe Apr 28, 2025
34e001e
Had to remove Databricks table utils as we don't have table utils set…
camweston-stripe Apr 28, 2025
b9d13da
Remove unused imports
camweston-stripe Apr 28, 2025
605ca75
Trying to fix linting issue for pyspark utils
camweston-stripe Apr 28, 2025
3685525
Trying to fix linting issue for pyspark utils
camweston-stripe Apr 28, 2025
e34a33f
Update readme
camweston-stripe Apr 28, 2025
56b858f
Merge branch 'main' of https://github.com/airbnb/chronon into camwest…
camweston-stripe Aug 6, 2025
312 changes: 312 additions & 0 deletions api/py/ai/chronon/pyspark/README.md
@@ -0,0 +1,312 @@
# Chronon Python Interface for PySpark Environments

## Table of Contents
1. [Introduction](#introduction)
2. [Architecture Overview](#architecture-overview)
3. [Core Components](#core-components)
4. [Flow of Execution](#flow-of-execution)
5. [Extending the Framework](#extending-the-framework)
6. [Setup and Dependencies](#setup-and-dependencies)

## Introduction

The Chronon PySpark Interface provides a clean, object-oriented framework for executing Chronon feature definitions directly within a PySpark environment, such as a Databricks notebook. It streamlines the developer experience by removing the need to switch between multiple tools, enabling rapid prototyping and iteration of Chronon feature engineering workflows.

This library enables users to:
- Run and analyze GroupBy and Join operations in a type-safe manner
- Execute feature computations within notebook environments like Databricks
- Implement platform-specific behavior while preserving a consistent interface
- Access JVM-based functionality directly from Python code

## Architecture Overview

### The Python-JVM Bridge

At the core of this implementation is the interaction between Python and the Java Virtual Machine (JVM):

```
Python Environment | JVM Environment
|
┌─────────────────────┐ | ┌─────────────────────┐
│ Python Thrift Obj │ | │ Scala Thrift Obj │
│ (GroupBy, Join) │─────┐ | ┌────▶│ (GroupBy, Join) │
└─────────────────────┘ │ | │ └─────────────────────┘
│ │ | │ │
▼ │ | │ ▼
┌─────────────────────┐ │ | │ ┌─────────────────────┐
│ thrift_simple_json()│ │ | │ │ PySparkUtils.parse │
└─────────────────────┘ │ | │ └─────────────────────┘
│ │ | │ │
▼ │ | │ ▼
┌─────────────────────┐ │ | │ ┌─────────────────────┐
│ JSON String │─────┼──────┼──────┼────▶│ Java Objects │
└─────────────────────┘ │ | │ └─────────────────────┘
│ | │ │
│ | │ ▼
┌─────────────────────┐ │ | │ ┌─────────────────────┐
│ PySparkExecutable │ │ | │ │ PySparkUtils.run │
│ .run() │─────┼──────┼──────┼────▶│ GroupBy/Join │
└─────────────────────┘ │ | │ └─────────────────────┘
▲ │ | │ │
│ │ | │ ▼
┌─────────────────────┐ │ Py4J Socket │ ┌─────────────────────┐
│ Python DataFrame │◀────┴──────┼──────┴─────│ JVM DataFrame │
└─────────────────────┘ | └─────────────────────┘
|
|
|
|
|

```

- **Py4J**: Enables Python code to dynamically access Java objects, methods, and fields across the JVM boundary
- **PySpark**: Uses Py4J to communicate with the Spark JVM, translating Python calls into Spark's Java/Scala APIs
- **Thrift Objects**: Chronon features defined as Python thrift objects are converted to Java thrift objects for execution

This design ensures that Python users can access the full power of Chronon's JVM-based computation engine from a single, centralized Python environment.
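
For concreteness, here is a minimal sketch of the hand-off pictured above, assuming Apache Thrift's Python bindings are available. The JVM-side entry point (`PySparkUtils.parseGroupBy`) is named after the diagram and is illustrative rather than the exact class path:

```python
# Sketch only: JVM-side names follow the diagram above and may differ in your build.
from thrift.TSerialization import serialize
from thrift.protocol.TJSONProtocol import TSimpleJSONProtocolFactory
from pyspark.sql import SparkSession


def to_jvm_group_by(py_group_by, spark: SparkSession):
    # 1. Serialize the Python thrift object (e.g. a GroupBy) to simple JSON.
    json_str = serialize(py_group_by, TSimpleJSONProtocolFactory()).decode("utf-8")
    # 2. Cross the Py4J boundary and parse the JSON into a Java thrift object.
    jvm = spark.sparkContext._jvm
    return jvm.ai.chronon.spark.PySparkUtils.parseGroupBy(json_str)  # illustrative path
```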

## Core Components

The framework is built around several core abstractions:

### PySparkExecutable

An abstract base class that provides common functionality for executing Chronon features via PySpark:

```python
class PySparkExecutable(Generic[T], ABC):
"""
Abstract base class defining common functionality for executing features via PySpark.
"""
```

- Handles initialization with object and SparkSession
- Provides utilities for updating dates in sources and queries
- Manages the execution of underlying join sources
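
A hedged sketch of the base class's shape, reconstructed from the class diagram further down (attribute and method names come from the diagram; the real signatures may differ):

```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

from pyspark.sql import SparkSession

T = TypeVar("T")  # a GroupBy or Join thrift object


class PySparkExecutable(Generic[T], ABC):
    """Sketch of the base executable; see the class diagram for the full surface."""

    def __init__(self, obj: T, spark: SparkSession):
        self.obj = obj
        self.spark = spark
        self.jvm = spark.sparkContext._jvm   # Py4J JVMView used for all JVM calls
        self.platform = self.get_platform()  # platform-specific behavior (Databricks, Jupyter, ...)

    @abstractmethod
    def get_platform(self) -> "PlatformInterface":
        """Return the platform implementation for the current notebook environment."""
```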

### Specialized Executables

Two specialized interfaces extend the base executable for different Chronon types:

- **GroupByExecutable**: Interface for executing GroupBy objects
- **JoinExecutable**: Interface for executing Join objects

These interfaces define type-specific behaviors for running and analyzing features.
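
Building on the sketch above, the two interfaces can be pictured roughly as follows (the `run()` signature is assumed from the Example Setup section, and `analyze()` is assumed to mirror it):

```python
from abc import abstractmethod

from pyspark.sql import DataFrame

from ai.chronon.api.ttypes import GroupBy, Join


class GroupByExecutable(PySparkExecutable[GroupBy]):
    @abstractmethod
    def run(self, start_date: str, end_date: str) -> DataFrame: ...

    @abstractmethod
    def analyze(self, start_date: str, end_date: str) -> None: ...


class JoinExecutable(PySparkExecutable[Join]):
    @abstractmethod
    def run(self, start_date: str, end_date: str) -> DataFrame: ...

    @abstractmethod
    def analyze(self, start_date: str, end_date: str) -> None: ...
```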

### Platform Interface

A key abstraction that enables platform-specific behavior:

```python
class PlatformInterface(ABC):
"""
Interface for platform-specific operations.
"""
```

This interface defines operations that vary by platform (Databricks, Jupyter, etc.) and must be implemented by platform-specific classes.
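
Its shape, reconstructed as a sketch from the class diagram below (method names come from the diagram; parameter lists are assumptions based on the Jupyter example later in this document):

```python
from abc import ABC, abstractmethod
from typing import Any

from pyspark.sql import SparkSession


class PlatformInterface(ABC):
    """Sketch: operations every notebook platform must provide."""

    def __init__(self, spark: SparkSession):
        self.spark = spark

    @abstractmethod
    def register_udfs(self) -> None: ...

    @abstractmethod
    def get_executable_join_cls(self) -> "type[JoinExecutable]": ...

    @abstractmethod
    def start_log_capture(self, job_name: str) -> Any: ...

    @abstractmethod
    def end_log_capture(self, capture_token: Any) -> None: ...

    # The class diagram also lists log_operation() and drop_table_if_exists() helpers.
```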

### Platform-Specific Implementations

Concrete implementations for specific notebook environments:

- **DatabricksPlatform**: Implements platform-specific operations for Databricks
- **DatabricksGroupBy**: Executes GroupBy objects in Databricks
- **DatabricksJoin**: Executes Join objects in Databricks

```
┌─────────────────────────┐
│ PySparkExecutable │
│ (Generic[T], ABC) │
├─────────────────────────┤
│ - obj: T │
│ - spark: SparkSession │
│ - jvm: JVMView │
├─────────────────────────┤
│ + get_platform() │
│ # _update_query_dates() │
│ # _update_source_dates()│
│ # print_with_timestamp()│
└───────────────┬─────────┘
┌───────────┴────────────┐
│ │
┌───▼───────────────┐ ┌───▼───────────────┐
│ GroupByExecutable│ │ JoinExecutable │
├───────────────────┤ ├───────────────────┤
│ │ │ │
├───────────────────┤ ├───────────────────┤
│ + run() │ │ + run() │
│ + analyze() │ │ + analyze() │
└────────┬──────────┘ └────────┬──────────┘
│ │
│ │
┌────────▼──────────┐ ┌────────▼──────────┐
│ DatabricksGroupBy │ │ DatabricksJoin │
├───────────────────┤ ├───────────────────┤
│ │ │ │
├───────────────────┤ ├───────────────────┤
│ + get_platform() │ │ + get_platform() │
└───────────────────┘ └───────────────────┘

┌─────────────────────────────┐
│ PlatformInterface │
│ (ABC) │
├─────────────────────────────┤
│ - spark: SparkSession │
├─────────────────────────────┤
│ + register_udfs() │
│ + get_executable_join_cls() │
│ + start_log_capture() │
│ + end_log_capture() │
│ + log_operation() │
│ + drop_table_if_exists() │
└───────────┬─────────────────┘
┌───────────▼────────────────┐
│ DatabricksPlatform │
├────────────────────────────┤
│ - dbutils: DBUtils │
├────────────────────────────┤
│ + register_udfs() │
│ + get_executable_join_cls()│
│ + start_log_capture() │
│ + end_log_capture() │
│ + get_databricks_user() │
└────────────────────────────┘
```

## Flow of Execution

When a user calls a method like `DatabricksGroupBy(group_by, py_spark_session).run()`, the following sequence occurs (a simplified code sketch follows the list):

1. **Object Preparation**:
   - The Python thrift object (GroupBy, Join) is copied and updated with the requested dates (this interface is intended for prototyping over small date ranges, not for full backfills)
- Underlying join sources are executed if needed

2. **JVM Conversion**:
- The Python thrift object is converted to JSON representation
- The JSON is parsed into a Java thrift object on the JVM side via Py4J

3. **Execution**:
- The JVM executes the computation using Spark
- Results are captured in a Spark DataFrame on the JVM side

4. **Result Return**:
- The JVM DataFrame is wrapped in a Python DataFrame object
- The Python DataFrame is returned to the user

5. **Log Handling**:
- Throughout the process, JVM logs are captured
- Logs are printed in the notebook upon completion or errors
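
Putting steps 1 through 4 together, a simplified walkthrough looks like this. It is illustrative only (not the library's actual internals), reuses the `to_jvm_group_by()` helper sketched in the Architecture Overview, and step 5 is handled by wrapping the whole call in `start_log_capture()` / `end_log_capture()`:

```python
import copy

from pyspark.sql import DataFrame, SparkSession


def run_group_by_sketch(group_by, spark: SparkSession, start_date: str, end_date: str) -> DataFrame:
    """Illustrative walkthrough of steps 1-4 above for a GroupBy."""
    # 1. Object preparation: copy the Python thrift object and narrow its date range
    #    (the real implementation does this via _update_query_dates() / _update_source_dates()).
    obj = copy.deepcopy(group_by)

    # 2. JVM conversion: Python thrift -> JSON -> Java thrift, via Py4J.
    java_group_by = to_jvm_group_by(obj, spark)

    # 3. Execution: the JVM runs the computation with Spark and hands back a JVM DataFrame.
    jvm = spark.sparkContext._jvm
    jvm_df = jvm.ai.chronon.spark.PySparkUtils.runGroupBy(java_group_by, start_date, end_date)  # illustrative

    # 4. Result return: wrap the JVM DataFrame for the Python side
    #    (Spark >= 3.3 accepts a SparkSession here; older versions expect a SQLContext).
    return DataFrame(jvm_df, spark)
```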

## Extending the Framework

### Implementing a New Platform Interface

To add support for a new notebook environment (e.g., Jupyter), follow these steps:

1. **Create a new platform implementation**:

```python
class JupyterPlatform(PlatformInterface):
def __init__(self, spark: SparkSession):
super().__init__(spark)
# Initialize Jupyter-specific components

@override
def register_udfs(self) -> None:
# Register any necessary UDFs for Jupyter
        # Recall that UDFs are registered to the shared Spark SQL engine,
        # so you can register Python and/or Scala UDFs and use them from both Spark sessions
pass

@override
def get_executable_join_cls(self) -> type[JoinExecutable]:
# Return the Jupyter-specific join executable class
return JupyterJoin

@override
def start_log_capture(self, job_name: str) -> Any:
# Start capturing logs in Jupyter
pass

@override
def end_log_capture(self, capture_token: Any) -> None:
# End log capturing and print the logs in Jupyter
pass
```

2. **Create concrete executable implementations**:

```python
class JupyterGroupBy(GroupByExecutable):
def __init__(self, group_by: GroupBy, spark_session: SparkSession):
super().__init__(group_by, spark_session)
        # You can pass Jupyter-specific parameters in to set metadata,
        # which lets you customize things like:
# - What namespace is written to
# - Table name prefixing (in the Databricks implementation we prefix the table name with the notebook username)
        # - Root dir for your existing feature definitions, if you want to import features that were defined in an IDE
self.obj: GroupBy = self.platform.set_metadata(obj=self.obj)

@override
def get_platform(self) -> PlatformInterface:
return JupyterPlatform(self.spark)

class JupyterJoin(JoinExecutable):
def __init__(self, join: Join, spark_session: SparkSession):
super().__init__(join, spark_session)
        # You can pass Jupyter-specific parameters in to set metadata,
        # which lets you customize things like:
# - What namespace is written to
# - Table name prefixing (in the Databricks implementation we prefix the table name with the notebook username)
        # - Root dir for your existing feature definitions, if you want to import features that were defined in an IDE
self.obj: Join = self.platform.set_metadata(obj=self.obj)

@override
def get_platform(self) -> PlatformInterface:
return JupyterPlatform(self.spark)
```

### Key Methods to Override

When implementing a platform interface, pay special attention to these methods:

- **start_log_capture()** and **end_log_capture()**: Implement platform-specific log capturing (one possible approach is sketched below)
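
One possible (hypothetical) strategy, assuming the JVM logs land in a single file as described in the Setup section: record the file's byte offset when the job starts, then print everything written after that offset when it ends.

```python
import os

# Assumed path; constants.py sets the Databricks default to /databricks/chronon_logfile.log.
LOG_FILE = "/tmp/chronon_logfile.log"


def start_log_capture(job_name: str) -> int:
    """Return the current end-of-file offset as the capture token."""
    return os.path.getsize(LOG_FILE) if os.path.exists(LOG_FILE) else 0


def end_log_capture(capture_token: int) -> None:
    """Print everything the JVM wrote to the log file since start_log_capture()."""
    with open(LOG_FILE, "r") as f:
        f.seek(capture_token)
        print(f.read())
```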

## Setup and Dependencies

### Requirements

1. **Spark Dependencies**: The Chronon Java/Scala JARs must be included in your Spark cluster

2. **Python Dependencies**:
- pyspark (tested on both 3.1 and 3.3)

3. **Log File**: You will need to make sure that your Chronon JVM logs are written to a single file. How to do this is generally platform-specific.

### Example Setup

Here's a minimal example of setting up and using the Chronon Python interface in a Databricks notebook. It assumes that you have already included the necessary JARs in your cluster dependencies.

```python
# Import the required modules
from pyspark.sql import SparkSession
from ai.chronon.pyspark.databricks import DatabricksGroupBy, DatabricksJoin
from ai.chronon.api.ttypes import GroupBy, Join
from ai.chronon.group_by import Aggregation, Operation, Window, TimeUnit

# Define your GroupBy or Join object
my_group_by = GroupBy(...)

# Create an executable
executable = DatabricksGroupBy(my_group_by, spark)

# Run the executable
result_df = executable.run(start_date='20250101', end_date='20250107')
```
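
A Join can be executed the same way; this is a hypothetical continuation of the example above using the `DatabricksJoin` import:

```python
# Hypothetical follow-on: define and run a Join with the same pattern.
my_join = Join(...)

join_executable = DatabricksJoin(my_join, spark)
join_result_df = join_executable.run(start_date='20250101', end_date='20250107')
```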

---
Empty file.
19 changes: 19 additions & 0 deletions api/py/ai/chronon/pyspark/constants.py
@@ -0,0 +1,19 @@
from __future__ import annotations
from typing import Optional

# --------------------------------------------------------------------------
# Company Specific Constants
Collaborator: I like those placeholders
# --------------------------------------------------------------------------

PARTITION_COLUMN_FORMAT: str = '%Y%m%d'

# --------------------------------------------------------------------------
# Platform Specific Constants
# --------------------------------------------------------------------------

# --------------------------------------------------------------------------
# Databricks Constants
# --------------------------------------------------------------------------
DATABRICKS_OUTPUT_NAMESPACE: Optional[str] = None
DATABRICKS_JVM_LOG_FILE: str = "/databricks/chronon_logfile.log"
DATABRICKS_ROOT_DIR_FOR_IMPORTED_FEATURES: str = "src"