diff --git a/README.md b/README.md index cdc7e302b60d..30e4fe06608e 100644 --- a/README.md +++ b/README.md @@ -135,8 +135,9 @@ This allows you to combine the flexibility of Python with the scale and performa ## Backends -Ibis supports nearly 20 backends: +Ibis supports over 20 backends: +- [Apache Beam](https://ibis-project.org/backends/beam/) - [Apache DataFusion](https://ibis-project.org/backends/datafusion/) - [Apache Druid](https://ibis-project.org/backends/druid/) - [Apache Flink](https://ibis-project.org/backends/flink) diff --git a/docs/backends/beam.qmd b/docs/backends/beam.qmd new file mode 100644 index 000000000000..fe69b0142d7f --- /dev/null +++ b/docs/backends/beam.qmd @@ -0,0 +1,465 @@ +--- +title: "Apache Beam" +description: "Apache Beam backend for Ibis" +--- + +## Overview + +The Apache Beam backend for Ibis provides support for running SQL queries using Apache Beam's SQL capabilities. This backend allows you to use Ibis expressions with Beam SQL, enabling portable data processing pipelines that can run on various runners including Apache Flink, Apache Spark, and Google Cloud Dataflow. + +## Installation + +To use the Beam backend, install Ibis with the beam extra: + +```bash +pip install 'ibis-framework[beam]' +``` + +## Connection + +Connect to a Beam pipeline: + +```python +import ibis +import apache_beam as beam + +# Create a Beam pipeline +pipeline = beam.Pipeline() + +# Connect Ibis to the pipeline +con = ibis.beam.connect(pipeline) +``` + +## Basic Usage + +### Creating Tables + +Create tables from pandas DataFrames: + +```python +import pandas as pd + +# Create sample data +data = pd.DataFrame({ + 'id': [1, 2, 3, 4, 5], + 'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'], + 'age': [25, 30, 35, 40, 45], + 'salary': [50000, 60000, 70000, 80000, 90000] +}) + +# Create a temporary view +employees = con.create_view('employees', data, temp=True) +``` + +### Querying Data + +Use Ibis expressions to query your data: + +```python +# Simple selection +result = employees.select('name', 'age') + +# Filtering +young_employees = employees.filter(employees.age < 35) + +# Aggregations +avg_salary = employees.salary.mean() +salary_by_age = employees.group_by('age').agg( + avg_salary=employees.salary.mean(), + count=employees.count() +) +``` + +### Executing Queries + +Execute queries and get results: + +```python +# Execute and get pandas DataFrame +df = con.execute(avg_salary) +print(df) + +# Compile to SQL +sql = con.compile(salary_by_age) +print(sql) +``` + +## Features + +### Supported Operations + +The Beam backend supports most standard Ibis operations: + +- **Selection and Projection**: `select()`, column access +- **Filtering**: `filter()`, boolean operations +- **Aggregations**: `sum()`, `mean()`, `count()`, `min()`, `max()` +- **Grouping**: `group_by()`, `agg()` +- **Joins**: `join()`, `left_join()`, `right_join()`, `outer_join()` +- **Sorting**: `order_by()` +- **Window Functions**: `row_number()`, `rank()`, `dense_rank()` + +### Data Types + +The backend supports standard SQL data types: + +- **Numeric**: `INTEGER`, `BIGINT`, `FLOAT`, `DOUBLE` +- **String**: `VARCHAR`, `CHAR` +- **Boolean**: `BOOLEAN` +- **Date/Time**: `DATE`, `TIME`, `TIMESTAMP` +- **Complex**: `ARRAY`, `MAP`, `ROW` (struct) + +### Limitations + +- **Temporary Tables Only**: Currently, only temporary views are supported for in-memory data +- **Limited Schema Discovery**: Schema discovery for external tables requires manual specification +- **No NaN/Infinity**: Beam SQL doesn't support NaN or Infinity 
values +- **Limited UDF Support**: User-defined functions have limited support + +## Advanced Usage + +### Working with Catalogs and Databases + +With the upcoming Apache Iceberg catalog support in Beam SQL, you can now work with catalogs and databases: + +#### Creating Catalogs + +```python +# Create an Iceberg catalog +con.raw_sql("CREATE CATALOG iceberg_catalog WITH (type = 'iceberg', warehouse = 's3://my-bucket/warehouse')") + +# Create a Hive catalog +con.raw_sql("CREATE CATALOG hive_catalog WITH (type = 'hive', metastore_uri = 'thrift://localhost:9083', warehouse = 'hdfs://localhost:9000/warehouse')") + +# Set current catalog +con.raw_sql("SET catalog = 'iceberg_catalog'") +``` + +#### Creating Databases + +```python +# Create database in current catalog +con.raw_sql("CREATE DATABASE analytics") + +# Create database in specific catalog +con.raw_sql("CREATE DATABASE staging IN CATALOG iceberg_catalog") + +# Create database with properties +con.raw_sql("CREATE DATABASE analytics WITH (location = 's3://my-bucket/analytics')") + +# Set current database +con.raw_sql("SET database = 'analytics'") +``` + +#### Catalog and Database Management + +```python +# List catalogs +catalogs = con.list_catalogs() +print(catalogs) # ['iceberg_catalog', 'hive_catalog'] + +# List databases +databases = con.list_databases() +print(databases) # ['analytics', 'staging'] + +# Get current catalog and database +print(con.current_catalog) # 'iceberg_catalog' +print(con.current_database) # 'analytics' + +# Drop database +con.drop_database('staging') + +# Drop catalog +con.drop_catalog('hive_catalog') +``` + +### Working with External Data + +For external data sources, you can create tables with explicit schemas: + +```python +import ibis.expr.schema as sch + +# Define schema +schema = sch.Schema({ + 'id': 'int64', + 'name': 'string', + 'created_at': 'timestamp' +}) + +# Create table with properties +table = con.create_table( + 'external_table', + schema=schema, + tbl_properties={ + 'connector': 'filesystem', + 'path': '/path/to/data', + 'format': 'parquet' + } +) +``` + +### Apache Iceberg Integration + +With the new catalog support, you can leverage Apache Iceberg's advanced features: + +```python +# Complete Iceberg setup +con.raw_sql("CREATE CATALOG iceberg_catalog WITH (type = 'iceberg', warehouse = 's3://my-bucket/warehouse')") +con.raw_sql("SET catalog = 'iceberg_catalog'") +con.raw_sql("CREATE DATABASE analytics") +con.raw_sql("SET database = 'analytics'") + +# Create Iceberg table with partitioning +con.raw_sql(""" +CREATE TABLE user_events ( + user_id BIGINT, + event_type STRING, + event_time TIMESTAMP, + properties MAP +) PARTITIONED BY (event_time) +""") + +# Use Ibis with Iceberg tables +events = con.table('user_events') +result = events.filter(events.event_type == 'click').group_by('user_id').agg( + click_count=events.count() +) +``` + +### Pipeline Execution + +The Beam backend integrates with Beam's pipeline execution model: + +```python +# Your Ibis operations are automatically converted to Beam transforms +result = con.execute(complex_query) + +# The pipeline can be run on various runners +with beam.Pipeline(runner=beam.runners.DirectRunner()) as pipeline: + con = ibis.beam.connect(pipeline) + # ... 
your Ibis operations +``` + +## Examples + +### Data Processing Pipeline + +```python +import ibis +import apache_beam as beam +import pandas as pd + +# Create pipeline +pipeline = beam.Pipeline() +con = ibis.beam.connect(pipeline) + +# Load data +sales_data = pd.DataFrame({ + 'product_id': [1, 2, 3, 1, 2, 3], + 'quantity': [10, 5, 8, 12, 7, 9], + 'price': [100, 200, 150, 110, 190, 160], + 'date': ['2023-01-01', '2023-01-01', '2023-01-01', + '2023-01-02', '2023-01-02', '2023-01-02'] +}) + +sales = con.create_view('sales', sales_data, temp=True) + +# Calculate daily revenue +daily_revenue = sales.group_by('date').agg( + total_revenue=(sales.quantity * sales.price).sum(), + total_quantity=sales.quantity.sum() +).order_by('date') + +# Execute +result = con.execute(daily_revenue) +print(result) +``` + +### Complex Analytics + +```python +# Calculate running totals +sales_with_running_total = sales.order_by('date').mutate( + running_total=sales.quantity.cumsum() +) + +# Window functions +sales_with_rank = sales.mutate( + price_rank=sales.price.rank().over( + ibis.window(order_by=sales.price, group_by=sales.date) + ) +) + +# Execute complex query +result = con.execute(sales_with_rank) +print(result) +``` + +## Integration with Beam Runners + +The Beam backend works with all Beam runners and supports configuration through SQL SET statements: + +- **DirectRunner**: For local development and testing +- **FlinkRunner**: For Apache Flink clusters +- **SparkRunner**: For Apache Spark clusters +- **DataflowRunner**: For Google Cloud Dataflow +- **PortableRunner**: For portable execution + +### Configuring Runners with SET Statements + +You can configure runners and their options using SQL SET statements: + +```python +import ibis +import apache_beam as beam + +# Create a pipeline +pipeline = beam.Pipeline() +con = ibis.beam.connect(pipeline) + +# Configure for DataflowRunner +con.raw_sql("SET runner = 'dataflow'") +con.raw_sql("SET dataflow.project = 'my-gcp-project'") +con.raw_sql("SET dataflow.region = 'us-central1'") +con.raw_sql("SET dataflow.staging_location = 'gs://my-bucket/staging'") +con.raw_sql("SET dataflow.temp_location = 'gs://my-bucket/temp'") +con.raw_sql("SET dataflow.num_workers = '5'") +con.raw_sql("SET dataflow.max_num_workers = '10'") +con.raw_sql("SET dataflow.machine_type = 'n1-standard-4'") +con.raw_sql("SET dataflow.use_public_ips = 'false'") +con.raw_sql("SET dataflow.enable_streaming_engine = 'true'") + +# Create a configured pipeline +configured_pipeline = con.create_configured_pipeline() + +# Use the configured pipeline for your job +with configured_pipeline as pipeline: + con = ibis.beam.connect(pipeline) + # ... 
your Ibis operations +``` + +### DataflowRunner Configuration Options + +The following Dataflow-specific options can be set using `SET dataflow.option = 'value'`: + +#### Required Options +- `project`: GCP project ID +- `staging_location`: GCS bucket for staging files +- `temp_location`: GCS bucket for temporary files + +#### Optional Options +- `region`: GCP region (default: us-central1) +- `service_account`: Service account email +- `network`: VPC network name +- `subnetwork`: VPC subnetwork name +- `use_public_ips`: Use public IPs (true/false) +- `num_workers`: Initial number of workers +- `max_num_workers`: Maximum number of workers +- `machine_type`: Machine type for workers +- `disk_size_gb`: Disk size in GB +- `disk_type`: Disk type (pd-standard, pd-ssd) +- `worker_machine_type`: Worker machine type +- `worker_disk_type`: Worker disk type +- `worker_disk_size_gb`: Worker disk size +- `autoscaling_algorithm`: Autoscaling algorithm (THROUGHPUT_BASED, NONE) +- `enable_streaming_engine`: Enable streaming engine (true/false) +- `flexrs_goal`: FlexRS goal (COST_OPTIMIZED, SPEED_OPTIMIZED) +- `dataflow_kms_key`: KMS key for encryption +- `labels`: Comma-separated key=value pairs + +### Other Runner Examples + +```python +# FlinkRunner configuration +con.raw_sql("SET runner = 'flink'") +con.raw_sql("SET pipeline.flink_master = 'localhost:8081'") + +# SparkRunner configuration +con.raw_sql("SET runner = 'spark'") +con.raw_sql("SET pipeline.spark_master = 'local[*]'") + +# DirectRunner (default) +con.raw_sql("SET runner = 'direct'") +``` + +### Pipeline Options + +You can also set general pipeline options: + +```python +# Streaming pipeline +con.raw_sql("SET pipeline.streaming = 'true'") + +# Save main session +con.raw_sql("SET pipeline.save_main_session = 'true'") + +# Setup file +con.raw_sql("SET pipeline.setup_file = '/path/to/setup.py'") + +# Job name +con.raw_sql("SET job_name = 'my-beam-job'") +``` + +## Upcoming Features + +The Beam backend is designed to be forward-compatible with upcoming Apache Beam features: + +### Apache Iceberg Catalog Support + +Based on [Apache Beam PR #36325](https://github.com/apache/beam/pull/36325), Beam SQL will soon support: + +- **CREATE CATALOG** statements for managing external data sources +- **Apache Iceberg** integration with full catalog support +- **Database management** within catalogs +- **Schema evolution** and time travel capabilities +- **ACID transactions** for data consistency + +### Current Implementation + +The current implementation provides: + +- ✅ **Placeholder support** for catalog and database operations +- ✅ **SQL statement parsing** for CREATE CATALOG and CREATE DATABASE +- ✅ **Configuration management** through SET statements +- ✅ **Forward compatibility** with upcoming Beam features + +### Migration Path + +When the full catalog support becomes available in Apache Beam: + +1. **No code changes required** - the API remains the same +2. **Enhanced functionality** - catalogs and databases will work with real backends +3. **Better performance** - native catalog integration +4. **Full Iceberg features** - schema evolution, time travel, etc. + +## Troubleshooting + +### Common Issues + +1. **Schema Errors**: Ensure schemas are properly defined for external tables +2. **Type Mismatches**: Check that data types are compatible with Beam SQL +3. **Pipeline Execution**: Make sure the pipeline is properly configured for your runner +4. **Catalog Not Found**: Ensure catalogs are created before use +5. 
**Database Not Found**: Verify database exists in the specified catalog + +### Debugging + +Enable verbose logging to debug issues: + +```python +import logging +logging.basicConfig(level=logging.DEBUG) + +# Your Ibis operations will show detailed SQL compilation +result = con.execute(query) +``` + +### Version Compatibility + +- **Apache Beam 2.50+**: Full catalog and database support +- **Apache Beam 2.40-2.49**: Placeholder implementation with forward compatibility +- **Earlier versions**: Basic SQL support only + +## Contributing + +The Beam backend is part of the Ibis project. Contributions are welcome! Please see the [contributing guide](https://ibis-project.org/contribute/) for more information. diff --git a/ibis/backends/beam/__init__.py b/ibis/backends/beam/__init__.py new file mode 100644 index 000000000000..fe142d3127a5 --- /dev/null +++ b/ibis/backends/beam/__init__.py @@ -0,0 +1,1391 @@ +from __future__ import annotations + +import itertools +import re +import zoneinfo +from typing import TYPE_CHECKING, Any + +import sqlglot as sg +import sqlglot.expressions as sge + +import ibis +import ibis.backends.sql.compilers as sc +import ibis.common.exceptions as exc +import ibis.expr.operations as ops +import ibis.expr.schema as sch +import ibis.expr.types as ir +from ibis import util +from ibis.backends import ( + CanCreateDatabase, + HasCurrentCatalog, + HasCurrentDatabase, + NoUrl, + PyArrowExampleLoader, + SupportsTempTables, +) +from ibis.backends.beam.ddl import ( + CreateDatabase, + CreateTableWithSchema, + DropDatabase, + DropTable, + DropView, + InsertSelect, + RenameTable, +) +from ibis.backends.sql import SQLBackend +from ibis.expr.operations.udf import InputType +from ibis.util import gen_name + +if TYPE_CHECKING: + from collections.abc import Mapping + from pathlib import Path + + import pandas as pd + import pyarrow as pa + import apache_beam as beam + from apache_beam.pipeline import Pipeline + from apache_beam.runners.runner import PipelineResult + + from ibis.expr.api import Watermark + +_INPUT_TYPE_TO_FUNC_TYPE = {InputType.PYTHON: "general", InputType.PANDAS: "pandas"} + + +class Backend( + SupportsTempTables, + SQLBackend, + CanCreateDatabase, + HasCurrentCatalog, + HasCurrentDatabase, + NoUrl, + PyArrowExampleLoader, +): + name = "beam" + compiler = sc.beam.compiler + supports_temporary_tables = True + supports_python_udfs = True + + def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: + """No-op.""" + + def do_connect(self, pipeline: Pipeline) -> None: + """Create a Beam `Backend` for use with Ibis. + + Parameters + ---------- + pipeline + A Beam pipeline. + + Examples + -------- + >>> import ibis + >>> import apache_beam as beam + >>> pipeline = beam.Pipeline() + >>> ibis.beam.connect(pipeline) # doctest: +ELLIPSIS + + """ + self._pipeline = pipeline + self._temp_tables = {} + self._pipeline_options = {} + self._runner_config = {} + + @util.experimental + @classmethod + def from_connection(cls, pipeline: Pipeline, /) -> Backend: + """Create a Beam `Backend` from an existing pipeline. + + Parameters + ---------- + pipeline + A Beam pipeline. + """ + return ibis.beam.connect(pipeline) + + def disconnect(self) -> None: + pass + + def raw_sql(self, query: str) -> PipelineResult: + """Execute raw SQL query using Beam SQL. 
+ + Parameters + ---------- + query + SQL query string + + Returns + ------- + PipelineResult + Result of the pipeline execution + """ + import apache_beam as beam + from apache_beam.sql import SqlTransform + + # Check if this is a SET statement for configuration + if self._is_set_statement(query): + return self._handle_set_statement(query) + + # Check if this is a CREATE CATALOG statement + if self._is_create_catalog_statement(query): + return self._handle_create_catalog_statement(query) + + # Check if this is a CREATE DATABASE statement + if self._is_create_database_statement(query): + return self._handle_create_database_statement(query) + + # For regular SQL queries, create a transform from the SQL query + sql_transform = SqlTransform(query) + + # Apply the transform to the pipeline + result = self._pipeline | sql_transform + + return result + + def _is_set_statement(self, query: str) -> bool: + """Check if the query is a SET statement for configuration. + + Parameters + ---------- + query + SQL query string + + Returns + ------- + bool + True if this is a SET statement + """ + query_upper = query.strip().upper() + return query_upper.startswith('SET ') + + def _handle_set_statement(self, query: str) -> PipelineResult: + """Handle SET statements for pipeline configuration. + + Parameters + ---------- + query + SET statement string + + Returns + ------- + PipelineResult + Dummy result for SET statements + """ + import apache_beam as beam + import re + + # Parse SET statement: SET key = value + pattern = r'SET\s+(\w+)\s*=\s*(.+)' + match = re.match(pattern, query.strip(), re.IGNORECASE) + + if not match: + raise exc.IbisError(f"Invalid SET statement: {query}") + + key = match.group(1).lower() + value = match.group(2).strip().strip("'\"") + + # Handle different types of configuration + if key == 'runner': + self._configure_runner(value) + elif key == 'catalog': + # Set current catalog + self._current_catalog = value + elif key == 'database': + # Set current database + self._current_database = value + elif key.startswith('catalog.'): + # Catalog-specific options + option_key = key.replace('catalog.', '') + if not hasattr(self, '_catalog_options'): + self._catalog_options = {} + self._catalog_options[option_key] = value + elif key.startswith('pipeline.'): + # Pipeline options + option_key = key.replace('pipeline.', '') + self._pipeline_options[option_key] = value + elif key.startswith('dataflow.'): + # Dataflow-specific options + option_key = key.replace('dataflow.', '') + self._runner_config[option_key] = value + else: + # Generic configuration + self._pipeline_options[key] = value + + # Return a dummy result for SET statements + return beam.Create([{'status': 'SET', 'key': key, 'value': value}]) + + def _configure_runner(self, runner_name: str) -> None: + """Configure the Beam runner. + + Parameters + ---------- + runner_name + Name of the runner to configure + """ + import apache_beam as beam + + runner_name = runner_name.lower() + + if runner_name == 'dataflow': + self._runner_config['runner'] = 'DataflowRunner' + elif runner_name == 'flink': + self._runner_config['runner'] = 'FlinkRunner' + elif runner_name == 'spark': + self._runner_config['runner'] = 'SparkRunner' + elif runner_name == 'direct': + self._runner_config['runner'] = 'DirectRunner' + elif runner_name == 'portable': + self._runner_config['runner'] = 'PortableRunner' + else: + raise exc.IbisError(f"Unknown runner: {runner_name}") + + def get_pipeline_options(self) -> dict: + """Get current pipeline options. 
+ + Returns + ------- + dict + Dictionary of pipeline options + """ + return self._pipeline_options.copy() + + def get_runner_config(self) -> dict: + """Get current runner configuration. + + Returns + ------- + dict + Dictionary of runner configuration + """ + return self._runner_config.copy() + + def list_catalogs(self, *, like: str | None = None) -> list[str]: + """List available catalogs. + + Parameters + ---------- + like + Pattern to filter catalog names + + Returns + ------- + list[str] + List of catalog names + """ + # With upcoming Apache Iceberg catalog support, we can list actual catalogs + try: + if hasattr(self._pipeline, 'list_catalogs'): + catalogs = self._pipeline.list_catalogs() + else: + # Fallback to default catalog + catalogs = ["default"] + except Exception: + catalogs = ["default"] + + return self._filter_with_like(catalogs, like) + + def create_catalog( + self, + name: str, + /, + *, + catalog_type: str = "iceberg", + properties: dict | None = None, + force: bool = False, + ) -> None: + """Create a new catalog. + + Parameters + ---------- + name + Name of the new catalog. + catalog_type + Type of catalog (e.g., 'iceberg', 'hive'). + properties + Properties of the catalog. + force + If `False`, an exception is raised if the catalog already exists. + """ + # With upcoming Apache Iceberg catalog support, we can create actual catalogs + try: + if hasattr(self._pipeline, 'create_catalog'): + self._pipeline.create_catalog(name, catalog_type=catalog_type, properties=properties) + else: + # For now, create a placeholder catalog entry + if not hasattr(self, '_catalogs'): + self._catalogs = {} + self._catalogs[name] = {'type': catalog_type, 'properties': properties or {}} + except Exception as e: + if not force: + raise exc.IbisError(f"Failed to create catalog '{name}': {e}") + + def drop_catalog( + self, name: str, /, *, force: bool = False + ) -> None: + """Drop a catalog. + + Parameters + ---------- + name + Name of the catalog to drop. + force + If `False`, an exception is raised if the catalog does not exist. + """ + try: + if hasattr(self._pipeline, 'drop_catalog'): + self._pipeline.drop_catalog(name) + else: + # For now, remove from placeholder catalogs + if hasattr(self, '_catalogs') and name in self._catalogs: + del self._catalogs[name] + except Exception as e: + if not force: + raise exc.IbisError(f"Failed to drop catalog '{name}': {e}") + + def _is_create_catalog_statement(self, query: str) -> bool: + """Check if the query is a CREATE CATALOG statement. + + Parameters + ---------- + query + SQL query string + + Returns + ------- + bool + True if this is a CREATE CATALOG statement + """ + query_upper = query.strip().upper() + return query_upper.startswith('CREATE CATALOG ') + + def _is_create_database_statement(self, query: str) -> bool: + """Check if the query is a CREATE DATABASE statement. + + Parameters + ---------- + query + SQL query string + + Returns + ------- + bool + True if this is a CREATE DATABASE statement + """ + query_upper = query.strip().upper() + return query_upper.startswith('CREATE DATABASE ') + + def _handle_create_catalog_statement(self, query: str) -> PipelineResult: + """Handle CREATE CATALOG statements. 
+ + Parameters + ---------- + query + CREATE CATALOG statement string + + Returns + ------- + PipelineResult + Dummy result for CREATE CATALOG statements + """ + import apache_beam as beam + import re + + # Parse CREATE CATALOG statement + # Example: CREATE CATALOG my_catalog WITH (type = 'iceberg', warehouse = 's3://bucket/warehouse') + pattern = r'CREATE CATALOG\s+(\w+)(?:\s+WITH\s*\(([^)]+)\))?' + match = re.match(pattern, query.strip(), re.IGNORECASE) + + if not match: + raise exc.IbisError(f"Invalid CREATE CATALOG statement: {query}") + + catalog_name = match.group(1) + properties_str = match.group(2) if match.group(2) else "" + + # Parse properties + properties = {} + if properties_str: + # Simple property parsing: key = 'value', key2 = 'value2' + prop_pattern = r"(\w+)\s*=\s*'([^']+)'" + for prop_match in re.finditer(prop_pattern, properties_str): + key = prop_match.group(1) + value = prop_match.group(2) + properties[key] = value + + # Determine catalog type from properties + catalog_type = properties.get('type', 'iceberg') + + # Create the catalog + self.create_catalog(catalog_name, catalog_type=catalog_type, properties=properties) + + # Return a dummy result + return beam.Create([{'status': 'CREATE CATALOG', 'catalog': catalog_name, 'type': catalog_type}]) + + def _handle_create_database_statement(self, query: str) -> PipelineResult: + """Handle CREATE DATABASE statements. + + Parameters + ---------- + query + CREATE DATABASE statement string + + Returns + ------- + PipelineResult + Dummy result for CREATE DATABASE statements + """ + import apache_beam as beam + import re + + # Parse CREATE DATABASE statement + # Example: CREATE DATABASE my_database IN CATALOG my_catalog + pattern = r'CREATE DATABASE\s+(\w+)(?:\s+IN\s+CATALOG\s+(\w+))?(?:\s+WITH\s*\(([^)]+)\))?' + match = re.match(pattern, query.strip(), re.IGNORECASE) + + if not match: + raise exc.IbisError(f"Invalid CREATE DATABASE statement: {query}") + + database_name = match.group(1) + catalog_name = match.group(2) if match.group(2) else self.current_catalog + properties_str = match.group(3) if match.group(3) else "" + + # Parse properties + properties = {} + if properties_str: + # Simple property parsing: key = 'value', key2 = 'value2' + prop_pattern = r"(\w+)\s*=\s*'([^']+)'" + for prop_match in re.finditer(prop_pattern, properties_str): + key = prop_match.group(1) + value = prop_match.group(2) + properties[key] = value + + # Create the database + self.create_database(database_name, db_properties=properties, catalog=catalog_name) + + # Return a dummy result + return beam.Create([{'status': 'CREATE DATABASE', 'database': database_name, 'catalog': catalog_name}]) + + def create_configured_pipeline(self) -> Pipeline: + """Create a new pipeline with the configured options. 
+ + Returns + ------- + Pipeline + Configured Beam pipeline + """ + import apache_beam as beam + from apache_beam.options.pipeline_options import PipelineOptions + + # Create pipeline options + options = PipelineOptions() + + # Apply pipeline options + for key, value in self._pipeline_options.items(): + setattr(options, key, value) + + # Apply runner configuration + if 'runner' in self._runner_config: + runner_name = self._runner_config['runner'] + if runner_name == 'DataflowRunner': + from apache_beam.options.pipeline_options import GoogleCloudOptions + options.view_as(GoogleCloudOptions) + elif runner_name == 'FlinkRunner': + from apache_beam.runners.flink.flink_runner import FlinkRunner + options.view_as(FlinkRunner) + elif runner_name == 'SparkRunner': + from apache_beam.runners.spark.spark_runner import SparkRunner + options.view_as(SparkRunner) + + # Apply Dataflow-specific options + if 'runner' in self._runner_config and self._runner_config['runner'] == 'DataflowRunner': + from apache_beam.options.pipeline_options import GoogleCloudOptions + gcp_options = options.view_as(GoogleCloudOptions) + + for key, value in self._runner_config.items(): + if key == 'runner': + continue + elif key == 'project': + gcp_options.project = value + elif key == 'region': + gcp_options.region = value + elif key == 'staging_location': + gcp_options.staging_location = value + elif key == 'temp_location': + gcp_options.temp_location = value + elif key == 'service_account': + gcp_options.service_account = value + elif key == 'network': + gcp_options.network = value + elif key == 'subnetwork': + gcp_options.subnetwork = value + elif key == 'use_public_ips': + gcp_options.use_public_ips = value.lower() == 'true' + elif key == 'num_workers': + gcp_options.num_workers = int(value) + elif key == 'max_num_workers': + gcp_options.max_num_workers = int(value) + elif key == 'machine_type': + gcp_options.machine_type = value + elif key == 'disk_size_gb': + gcp_options.disk_size_gb = int(value) + elif key == 'disk_type': + gcp_options.disk_type = value + elif key == 'worker_machine_type': + gcp_options.worker_machine_type = value + elif key == 'worker_disk_type': + gcp_options.worker_disk_type = value + elif key == 'worker_disk_size_gb': + gcp_options.worker_disk_size_gb = int(value) + elif key == 'autoscaling_algorithm': + gcp_options.autoscaling_algorithm = value + elif key == 'enable_streaming_engine': + gcp_options.enable_streaming_engine = value.lower() == 'true' + elif key == 'flexrs_goal': + gcp_options.flexrs_goal = value + elif key == 'dataflow_kms_key': + gcp_options.dataflow_kms_key = value + elif key == 'labels': + # Parse labels as key=value pairs + labels = {} + for label_pair in value.split(','): + if '=' in label_pair: + k, v = label_pair.split('=', 1) + labels[k.strip()] = v.strip() + gcp_options.labels = labels + + # Create and return the pipeline + return beam.Pipeline(options=options) + + def _get_schema_using_query(self, query: str) -> sch.Schema: + """Get schema from a SQL query. 
+ + Parameters + ---------- + query + SQL query string + + Returns + ------- + sch.Schema + Schema of the query result + """ + # For Beam SQL, we need to analyze the query to determine the schema + # This is a simplified implementation - in practice, you'd need to + # parse the SQL and determine the output schema + import apache_beam as beam + from apache_beam.sql import SqlTransform + + # Create a temporary transform to analyze the schema + sql_transform = SqlTransform(query) + + # This is a placeholder - actual implementation would need to + # analyze the transform to determine output schema + # For now, return an empty schema + return sch.Schema({}) + + def list_databases(self, *, like: str | None = None) -> list[str]: + """List available databases. + + Parameters + ---------- + like + Pattern to filter database names + + Returns + ------- + list[str] + List of database names + """ + # With upcoming Apache Iceberg catalog support, we can now list actual databases + # For now, we'll use a placeholder implementation that will be enhanced + # when the catalog support is available in Beam + try: + # Try to use catalog functionality if available + if hasattr(self._pipeline, 'list_databases'): + databases = self._pipeline.list_databases() + else: + # Fallback to default database + databases = ["default"] + except Exception: + # If catalog functionality is not available yet, use default + databases = ["default"] + + return self._filter_with_like(databases, like) + + @property + def current_catalog(self) -> str: + """Get current catalog name.""" + return getattr(self, '_current_catalog', "default") + + @property + def current_database(self) -> str: + """Get current database name.""" + return getattr(self, '_current_database', "default") + + def create_database( + self, + name: str, + /, + *, + db_properties: dict | None = None, + catalog: str | None = None, + force: bool = False, + ) -> None: + """Create a new database. + + Parameters + ---------- + name + Name of the new database. + db_properties + Properties of the database. + catalog + Name of the catalog in which the new database will be created. + force + If `False`, an exception is raised if the database already exists. + """ + # With upcoming Apache Iceberg catalog support, we can create actual databases + try: + # Try to use catalog functionality if available + if hasattr(self._pipeline, 'create_database'): + self._pipeline.create_database(name, properties=db_properties, catalog=catalog) + else: + # For now, create a placeholder database entry + if not hasattr(self, '_databases'): + self._databases = set() + self._databases.add(name) + except Exception as e: + if not force: + raise exc.IbisError(f"Failed to create database '{name}': {e}") + # If force=True, silently ignore the error + + def drop_database( + self, name: str, /, *, catalog: str | None = None, force: bool = False + ) -> None: + """Drop a database. + + Parameters + ---------- + name + Database to drop. + catalog + Name of the catalog from which the database will be dropped. + force + If `False`, an exception is raised if the database does not exist. + """ + # Beam SQL doesn't support database dropping in the traditional sense + # This is a no-op for now + pass + + def list_tables( + self, + *, + like: str | None = None, + database: str | None = None, + catalog: str | None = None, + temp: bool = False, + ) -> list[str]: + """Return the list of table/view names. + + Parameters + ---------- + like + A pattern in Python's regex format. 
+ temp + Whether to list temporary tables or permanent tables. + database + The database to list tables of, if not the current one. + catalog + The catalog to list tables of, if not the current one. + + Returns + ------- + list[str] + The list of the table/view names that match the pattern `like`. + """ + if temp: + tables = list(self._temp_tables.keys()) + else: + # Beam SQL doesn't have a registry of permanent tables + # This would need to be implemented based on your specific use case + tables = [] + + return self._filter_with_like(tables, like) + + def list_views( + self, + like: str | None = None, + temp: bool = False, + ) -> list[str]: + """Return the list of view names. + + Parameters + ---------- + like + A pattern in Python's regex format. + temp + Whether to list temporary views or permanent views. + + Returns + ------- + list[str] + The list of the view names that match the pattern `like`. + """ + # Beam SQL doesn't distinguish between tables and views in the same way + # as traditional databases + return self.list_tables(like=like, temp=temp) + + def table( + self, + name: str, + /, + *, + database: str | None = None, + catalog: str | None = None, + ) -> ir.Table: + """Return a table expression from a table or view in the database. + + Parameters + ---------- + name + Table name. + database + Database in which the table resides. + catalog + Catalog in which the table resides. + + Returns + ------- + Table + Table named `name` from `database` + """ + if database is not None and not isinstance(database, str): + raise exc.IbisTypeError( + f"`database` must be a string; got {type(database)}" + ) + + # Check if it's a temporary table + if name in self._temp_tables: + schema = self._temp_tables[name] + else: + # For permanent tables, we'd need to implement schema discovery + # This is a placeholder implementation + schema = sch.Schema({}) + + node = ops.DatabaseTable( + name, + schema=schema, + source=self, + namespace=ops.Namespace(catalog=catalog, database=database), + ) + return node.to_expr() + + def get_schema( + self, + table_name: str, + *, + catalog: str | None = None, + database: str | None = None, + ) -> sch.Schema: + """Return a Schema object for the indicated table and database. + + Parameters + ---------- + table_name + Table name. + catalog + Catalog name. + database + Database name. 
+ + Returns + ------- + sch.Schema + Ibis schema + """ + # Check if it's a temporary table + if table_name in self._temp_tables: + return self._temp_tables[table_name] + + # For permanent tables, we'd need to implement schema discovery + # This is a placeholder implementation + return sch.Schema({}) + + @property + def version(self) -> str: + """Get Beam version.""" + import apache_beam as beam + return beam.__version__ + + def _register_udfs(self, expr: ir.Expr) -> None: + """Register UDFs for the expression.""" + for udf_node in expr.op().find(ops.ScalarUDF): + register_func = getattr( + self, f"_register_{udf_node.__input_type__.name.lower()}_udf" + ) + register_func(udf_node) + + def _register_udf(self, udf_node: ops.ScalarUDF): + """Register a UDF with Beam.""" + import apache_beam as beam + from ibis.backends.beam.datatypes import BeamType + + name = type(udf_node).__name__ + + # Register the UDF with Beam + # This is a simplified implementation + beam.udf.register_udf(name, udf_node.__func__) + + _register_pandas_udf = _register_udf + _register_python_udf = _register_udf + + def compile( + self, + expr: ir.Expr, + /, + *, + limit: str | None = None, + params: Mapping[ir.Expr, Any] | None = None, + pretty: bool = False, + **_: Any, + ) -> str: + """Compile an Ibis expression to Beam SQL.""" + return super().compile( + expr, params=params, pretty=pretty + ) # Discard `limit` and other kwargs + + def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: + """Register an in-memory table.""" + if null_columns := op.schema.null_fields: + raise exc.IbisTypeError( + f"{self.name} cannot yet reliably handle `null` typed columns; " + f"got null typed columns: {null_columns}" + ) + self.create_view(op.name, op.data.to_frame(), schema=op.schema, temp=True) + + def execute( + self, + expr: ir.Expr, + /, + *, + params: Mapping[ir.Scalar, Any] | None = None, + limit: int | str | None = None, + **kwargs: Any, + ) -> pd.DataFrame | pd.Series | Any: + """Execute an Ibis expression and return a pandas `DataFrame`, `Series`, or scalar. + + Parameters + ---------- + expr + Ibis expression to execute. + params + Mapping of scalar parameter expressions to value. + limit + An integer to effect a specific row limit. + kwargs + Keyword arguments + """ + self._run_pre_execute_hooks(expr) + + sql = self.compile(expr.as_table(), params=params, **kwargs) + + # Execute the SQL using Beam SQL + import apache_beam as beam + from apache_beam.sql import SqlTransform + + # Create a transform from the SQL + sql_transform = SqlTransform(sql) + + # Apply the transform and collect results + result = self._pipeline | sql_transform | beam.Map(lambda row: dict(row)) + + # Convert to pandas DataFrame + # This is a simplified implementation + df = pd.DataFrame(list(result)) + + return expr.__pandas_result__(df) + + def create_table( + self, + name: str, + /, + obj: pd.DataFrame | pa.Table | ir.Table | None = None, + *, + schema: sch.Schema | None = None, + database: str | None = None, + catalog: str | None = None, + tbl_properties: dict | None = None, + watermark: Watermark | None = None, + primary_key: str | list[str] | None = None, + temp: bool = False, + overwrite: bool = False, + ) -> ir.Table: + """Create a new table in Beam. + + Parameters + ---------- + name + Name of the new table. + obj + An Ibis table expression, pandas DataFrame, or PyArrow Table. + schema + The schema for the new table. + database + Name of the database where the table will be created. 
+ catalog + Name of the catalog where the table will be created. + tbl_properties + Table properties. + watermark + Watermark strategy for the table. + primary_key + Primary key columns. + temp + Whether a table is temporary or not. + overwrite + Whether to clobber existing data. + + Returns + ------- + Table + The table that was created. + """ + import pandas as pd + import pyarrow as pa + import pyarrow_hotfix # noqa: F401 + + import ibis.expr.types as ir + + if obj is None and schema is None: + raise exc.IbisError("`schema` or `obj` is required") + + if overwrite: + if self.list_tables(like=name, temp=temp): + self.drop_table( + name, catalog=catalog, database=database, temp=temp, force=True + ) + + # Handle in-memory data + if obj is not None: + if not isinstance(obj, ir.Table): + obj = ibis.memtable(obj) + + op = obj.op() + if isinstance(op, ops.InMemoryTable): + dataframe = op.data.to_frame() + else: + raise exc.IbisError( + "`obj` is of type ibis.expr.types.Table but it is not in-memory. " + "Currently, only in-memory tables are supported." + ) + + return self.create_view( + name, + obj=dataframe, + schema=schema, + database=database, + catalog=catalog, + temp=temp, + overwrite=overwrite, + ) + + # Handle external data + else: # obj is None, schema is not None + if not tbl_properties: + raise exc.IbisError( + "`tbl_properties` is required when creating table with schema" + ) + + statement = CreateTableWithSchema( + table_name=name, + schema=schema, + tbl_properties=tbl_properties, + watermark=watermark, + primary_key=primary_key, + temporary=temp, + database=database, + catalog=catalog, + ) + sql = statement.compile() + self.raw_sql(sql) + + return self.table(name, database=database, catalog=catalog) + + def drop_table( + self, + name: str, + /, + *, + database: str | None = None, + catalog: str | None = None, + temp: bool = False, + force: bool = False, + ) -> None: + """Drop a table. + + Parameters + ---------- + name + Name of the table to drop. + database + Name of the database where the table exists. + catalog + Name of the catalog where the table exists. + temp + Whether the table is temporary or not. + force + If `False`, an exception is raised if the table does not exist. + """ + if temp and name in self._temp_tables: + del self._temp_tables[name] + else: + statement = DropTable( + table_name=name, + database=database, + catalog=catalog, + must_exist=not force, + temporary=temp, + ) + sql = statement.compile() + self.raw_sql(sql) + + def rename_table( + self, + old_name: str, + new_name: str, + force: bool = True, + ) -> None: + """Rename an existing table. + + Parameters + ---------- + old_name + The old name of the table. + new_name + The new name of the table. + force + If `False`, an exception is raised if the table does not exist. + """ + statement = RenameTable( + old_name=old_name, + new_name=new_name, + must_exist=not force, + ) + sql = statement.compile() + self.raw_sql(sql) + + def create_view( + self, + name: str, + /, + obj: pd.DataFrame | ir.Table, + *, + schema: sch.Schema | None = None, + database: str | None = None, + catalog: str | None = None, + force: bool = False, + temp: bool = False, + overwrite: bool = False, + ) -> ir.Table: + """Create a new view from a dataframe or table. + + Parameters + ---------- + name + Name of the new view. + obj + An Ibis table expression that will be used to create the view. + schema + The schema for the new view. + database + Name of the database where the view will be created. 
+ catalog + Name of the catalog where the table exists. + force + If `False`, an exception is raised if the table is already present. + temp + Whether the table is temporary or not. + overwrite + If `True`, remove the existing view, and create a new one. + + Returns + ------- + Table + The view that was created. + """ + import pandas as pd + + if overwrite and self.list_views(like=name, temp=temp): + self.drop_view( + name, database=database, catalog=catalog, temp=temp, force=True + ) + + if isinstance(obj, pd.DataFrame): + # Store the schema for temporary tables + if temp: + if schema is None: + schema = sch.Schema.from_pandas(obj) + self._temp_tables[name] = schema + elif isinstance(obj, ir.Table): + query_expression = self.compile(obj) + stmt = sge.Create( + kind="VIEW", + this=sg.table( + name, db=database, catalog=catalog, quoted=self.compiler.quoted + ), + expression=query_expression, + exists=force, + properties=sge.Properties(expressions=[sge.TemporaryProperty()]) + if temp + else None, + ) + self.raw_sql(stmt.sql(self.name)) + else: + raise exc.IbisError(f"Unsupported `obj` type: {type(obj)}") + + return self.table(name, database=database, catalog=catalog) + + def drop_view( + self, + name: str, + /, + *, + database: str | None = None, + catalog: str | None = None, + temp: bool = False, + force: bool = False, + ) -> None: + """Drop a view. + + Parameters + ---------- + name + Name of the view to drop. + database + Name of the database where the view exists. + catalog + Name of the catalog where the view exists. + temp + Whether the view is temporary or not. + force + If `False`, an exception is raised if the view does not exist. + """ + if temp and name in self._temp_tables: + del self._temp_tables[name] + else: + statement = DropView( + name=name, + database=database, + catalog=catalog, + must_exist=(not force), + temporary=temp, + ) + sql = statement.compile() + self.raw_sql(sql) + + def _read_file( + self, + file_type: str, + path: str | Path, + schema: sch.Schema | None = None, + table_name: str | None = None, + ) -> ir.Table: + """Register a file as a table in the current database. + + Parameters + ---------- + file_type + File type, e.g., parquet, csv, json. + path + The data source. + schema + The schema for the new table. + table_name + An optional name to use for the created table. 
+ + Returns + ------- + ir.Table + The just-registered table + """ + if schema is None: + raise ValueError( + f"`schema` must be explicitly provided when calling `read_{file_type}`" + ) + + table_name = table_name or gen_name(f"read_{file_type}") + tbl_properties = { + "connector": "filesystem", + "path": path, + "format": file_type, + } + + return self.create_table( + table_name, schema=schema, tbl_properties=tbl_properties + ) + + def read_parquet( + self, + path: str | Path, + /, + *, + schema: sch.Schema | None = None, + table_name: str | None = None, + ) -> ir.Table: + """Register a parquet file as a table in the current database.""" + return self._read_file( + file_type="parquet", path=path, schema=schema, table_name=table_name + ) + + def read_csv( + self, + path: str | Path, + /, + *, + schema: sch.Schema | None = None, + table_name: str | None = None, + ) -> ir.Table: + """Register a csv file as a table in the current database.""" + return self._read_file( + file_type="csv", path=path, schema=schema, table_name=table_name + ) + + def read_json( + self, + path: str | Path, + /, + *, + schema: sch.Schema | None = None, + table_name: str | None = None, + ) -> ir.Table: + """Register a json file as a table in the current database.""" + return self._read_file( + file_type="json", path=path, schema=schema, table_name=table_name + ) + + def insert( + self, + name: str, + /, + obj: pa.Table | pd.DataFrame | ir.Table | list | dict, + *, + database: str | None = None, + catalog: str | None = None, + overwrite: bool = False, + ) -> PipelineResult: + """Insert data into a table. + + Parameters + ---------- + name + The name of the table to insert data into. + obj + The source data or expression to insert. + database + Name of the attached database that the table is located in. + catalog + Name of the attached catalog that the table is located in. + overwrite + If `True` then replace existing contents of table. + + Returns + ------- + PipelineResult + The pipeline result. + """ + import pandas as pd + import pyarrow as pa + import pyarrow_hotfix # noqa: F401 + + if isinstance(obj, ir.Table): + statement = InsertSelect( + name, + self.compile(obj), + database=database, + catalog=catalog, + overwrite=overwrite, + ) + return self.raw_sql(statement.compile()) + + identifier = sg.table( + name, db=database, catalog=catalog, quoted=self.compiler.quoted + ).sql(self.dialect) + + if isinstance(obj, pa.Table): + obj = obj.to_pandas() + if isinstance(obj, dict): + obj = pd.DataFrame.from_dict(obj) + if isinstance(obj, pd.DataFrame): + # Convert DataFrame to Beam PCollection and insert + import apache_beam as beam + + # Create a PCollection from the DataFrame + pcoll = self._pipeline | beam.Create(obj.to_dict('records')) + + # Apply insert transform + result = pcoll | beam.io.WriteToText(identifier) + return result + + if isinstance(obj, list): + # Convert list to Beam PCollection and insert + import apache_beam as beam + + pcoll = self._pipeline | beam.Create(obj) + result = pcoll | beam.io.WriteToText(identifier) + return result + + raise ValueError( + "No operation is being performed. Either the obj parameter " + "is not a pandas DataFrame or is not a ibis Table." + f"The given obj is of type {type(obj).__name__} ." 
+ ) + + def to_pyarrow( + self, + expr: ir.Expr, + /, + *, + params: Mapping[ir.Scalar, Any] | None = None, + limit: int | str | None = None, + **kwargs: Any, + ) -> pa.Table: + """Convert expression result to PyArrow Table.""" + import pyarrow as pa + import pyarrow_hotfix # noqa: F401 + + pyarrow_batches = iter( + self.to_pyarrow_batches(expr, params=params, limit=limit, **kwargs) + ) + + first_batch = next(pyarrow_batches, None) + + if first_batch is None: + pa_table = expr.as_table().schema().to_pyarrow().empty_table() + else: + pa_table = pa.Table.from_batches( + itertools.chain((first_batch,), pyarrow_batches) + ) + return expr.__pyarrow_result__(pa_table) + + def to_pyarrow_batches( + self, + expr: ir.Table, + /, + *, + params: Mapping[ir.Scalar, Any] | None = None, + chunk_size: int | None = None, + limit: int | str | None = None, + **kwargs: Any, + ): + """Convert expression result to PyArrow batches.""" + import pyarrow as pa + import pyarrow_hotfix # noqa: F401 + + ibis_table = expr.as_table() + + # Execute the expression and convert to PyArrow + df = self.execute(ibis_table, limit=limit, **kwargs) + if limit: + df = df.head(limit) + + ibis_schema = ibis_table.schema() + arrow_schema = ibis_schema.to_pyarrow() + arrow_table = pa.Table.from_pandas(df, schema=arrow_schema) + return arrow_table.to_reader() diff --git a/ibis/backends/beam/datatypes.py b/ibis/backends/beam/datatypes.py new file mode 100644 index 000000000000..6ee147f28881 --- /dev/null +++ b/ibis/backends/beam/datatypes.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import ibis.expr.datatypes as dt +import ibis.expr.schema as sch +from ibis.formats import SchemaMapper, TypeMapper + +if TYPE_CHECKING: + pass + + +class BeamRowSchema(SchemaMapper): + """Schema mapper for Beam row types.""" + + @classmethod + def from_ibis(cls, schema: sch.Schema | None) -> dict: + """Convert Ibis schema to Beam row schema representation. + + Parameters + ---------- + schema + Ibis schema to convert + + Returns + ------- + dict + Beam row schema representation + """ + if schema is None: + return None + + return { + name: BeamType.from_ibis(dtype) + for name, dtype in schema.fields.items() + } + + +class BeamType(TypeMapper): + """Type mapper for Beam SQL types.""" + + @classmethod + def to_ibis(cls, typ: str) -> dt.DataType: + """Convert a Beam type string to an Ibis type. 
+ + Parameters + ---------- + typ + Beam type string + + Returns + ------- + dt.DataType + Corresponding Ibis type + """ + # Map Beam SQL types to Ibis types + # This is a simplified mapping - actual implementation would need + # to handle more complex types and nullable information + + typ_lower = typ.lower() + + if typ_lower in ("string", "varchar", "char"): + return dt.String() + elif typ_lower in ("boolean", "bool"): + return dt.Boolean() + elif typ_lower in ("tinyint", "int8"): + return dt.Int8() + elif typ_lower in ("smallint", "int16"): + return dt.Int16() + elif typ_lower in ("integer", "int", "int32"): + return dt.Int32() + elif typ_lower in ("bigint", "int64"): + return dt.Int64() + elif typ_lower in ("float", "real", "float32"): + return dt.Float32() + elif typ_lower in ("double", "float64"): + return dt.Float64() + elif typ_lower in ("date"): + return dt.Date() + elif typ_lower in ("time"): + return dt.Time() + elif typ_lower.startswith("timestamp"): + # Extract precision if present + if "(" in typ_lower: + precision = int(typ_lower.split("(")[1].split(")")[0]) + else: + precision = 6 # default precision + return dt.Timestamp(scale=precision) + elif typ_lower.startswith("array"): + # Handle array types - this is simplified + return dt.Array(dt.String()) + elif typ_lower.startswith("map"): + # Handle map types - this is simplified + return dt.Map(dt.String(), dt.String()) + elif typ_lower.startswith("row"): + # Handle struct/row types - this is simplified + return dt.Struct({}) + else: + return super().to_ibis(typ) + + @classmethod + def from_ibis(cls, dtype: dt.DataType) -> str: + """Convert an Ibis type to a Beam type string. + + Parameters + ---------- + dtype + Ibis data type + + Returns + ------- + str + Beam type string + """ + nullable = dtype.nullable + + if dtype.is_string(): + return "VARCHAR" + elif dtype.is_boolean(): + return "BOOLEAN" + elif dtype.is_int8(): + return "TINYINT" + elif dtype.is_int16(): + return "SMALLINT" + elif dtype.is_int32(): + return "INTEGER" + elif dtype.is_int64(): + return "BIGINT" + elif dtype.is_uint8(): + return "TINYINT" + elif dtype.is_uint16(): + return "SMALLINT" + elif dtype.is_uint32(): + return "INTEGER" + elif dtype.is_uint64(): + return "BIGINT" + elif dtype.is_float16(): + return "FLOAT" + elif dtype.is_float32(): + return "FLOAT" + elif dtype.is_float64(): + return "DOUBLE" + elif dtype.is_date(): + return "DATE" + elif dtype.is_time(): + return "TIME" + elif dtype.is_timestamp(): + # Include precision if specified + precision = dtype.scale if dtype.scale is not None else 6 + return f"TIMESTAMP({precision})" + elif dtype.is_array(): + element_type = cls.from_ibis(dtype.value_type) + return f"ARRAY<{element_type}>" + elif dtype.is_map(): + key_type = cls.from_ibis(dtype.key_type) + value_type = cls.from_ibis(dtype.value_type) + return f"MAP<{key_type}, {value_type}>" + elif dtype.is_struct(): + fields = [] + for name, field_type in dtype.items(): + field_type_str = cls.from_ibis(field_type) + fields.append(f"{name} {field_type_str}") + return f"ROW({', '.join(fields)})" + else: + return super().from_ibis(dtype) + + @classmethod + def to_string(cls, dtype: dt.DataType) -> str: + """Convert Ibis type to string representation. 
+ + Parameters + ---------- + dtype + Ibis data type + + Returns + ------- + str + String representation of the type + """ + return cls.from_ibis(dtype) diff --git a/ibis/backends/beam/ddl.py b/ibis/backends/beam/ddl.py new file mode 100644 index 000000000000..d61cbd65c845 --- /dev/null +++ b/ibis/backends/beam/ddl.py @@ -0,0 +1,366 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import sqlglot as sg + +import ibis.common.exceptions as exc +import ibis.expr.schema as sch +from ibis.backends.sql.datatypes import BeamType +from ibis.backends.sql.ddl import DDL, DML, CreateDDL, DropObject +from ibis.util import promote_list + +if TYPE_CHECKING: + from collections.abc import Sequence + + from ibis.expr.api import Watermark + + +class BeamBase: + """Base class for Beam SQL DDL operations.""" + + dialect = "beam" + + def format_dtype(self, dtype): + """Format data type for Beam SQL.""" + sql_string = BeamType.from_ibis(dtype) + if dtype.is_timestamp(): + return ( + f"TIMESTAMP({dtype.scale})" if dtype.scale is not None else "TIMESTAMP" + ) + else: + return sql_string + " NOT NULL" * (not dtype.nullable) + + def format_properties(self, props): + """Format table properties for Beam SQL.""" + tokens = [] + for k, v in sorted(props.items()): + tokens.append(f" '{k}'='{v}'") + return "(\n{}\n)".format(",\n".join(tokens)) + + def format_watermark_strategy(self, watermark: Watermark) -> str: + """Format watermark strategy for Beam SQL.""" + if watermark.allowed_delay is None: + return watermark.time_col + return f"{watermark.time_col} - INTERVAL '{watermark.allowed_delay}'" + + def format_schema_with_watermark( + self, + schema: sch.Schema, + watermark: Watermark | None = None, + primary_keys: Sequence[str] | None = None, + ) -> str: + """Format schema with watermark and primary keys.""" + elements = [ + f"{self.quote(name)} {self.format_dtype(t)}" + for name, t in zip(schema.names, schema.types) + ] + + if watermark is not None: + elements.append( + f"WATERMARK FOR {watermark.time_col} AS {self.format_watermark_strategy(watermark)}" + ) + + if primary_keys is not None and primary_keys: + # Beam SQL primary key support + comma_separated_keys = ", ".join(f"`{key}`" for key in primary_keys) + elements.append(f"PRIMARY KEY ({comma_separated_keys})") + + return "({})".format(",\n ".join(elements)) + + +class CreateTableWithSchema(BeamBase, CreateDDL): + """Create table with schema DDL for Beam SQL.""" + + def __init__( + self, + table_name: str, + schema: sch.Schema, + database=None, + catalog=None, + can_exist=False, + external=False, + partition=None, + primary_key: str | Sequence[str] | None = None, + tbl_properties=None, + temporary: bool = False, + watermark: Watermark | None = None, + ): + self.can_exist = can_exist + self.catalog = catalog + self.database = database + self.partition = partition + self.primary_keys = promote_list(primary_key) + self.schema = schema + self.table_name = table_name + self.tbl_properties = tbl_properties + self.temporary = temporary + self.watermark = watermark + + # Check if `primary_keys` is a subset of the columns in `schema`. + if self.primary_keys and not set(self.primary_keys) <= set(schema.names): + raise exc.IbisError( + "`primary_key` must be a subset of the columns in `schema`. 
\n" + f"\t primary_key= {primary_key} \n" + f"\t schema.names= {schema.names}" + ) + + @property + def _prefix(self) -> str: + """Get the CREATE TABLE prefix.""" + modifier = " TEMPORARY" if self.temporary else "" + return f"CREATE{modifier} TABLE" + + def _create_line(self) -> str: + """Get the CREATE TABLE line.""" + scoped_name = self.scoped_name(self.table_name, self.database, self.catalog) + return f"{self._prefix} {self._if_exists()}{scoped_name}" + + @property + def _pieces(self): + """Get the table definition pieces.""" + if self.partition is not None: + main_schema = self.schema + part_schema = self.partition + if not isinstance(part_schema, sch.Schema): + part_fields = {name: self.schema[name] for name in part_schema} + part_schema = sch.Schema(part_fields) + + to_delete = {name for name in self.partition if name in self.schema} + fields = { + name: dtype + for name, dtype in main_schema.items() + if name not in to_delete + } + main_schema = sch.Schema(fields) + + yield self.format_schema_with_watermark( + main_schema, self.watermark, self.primary_keys + ) + yield f"PARTITIONED BY {self.format_schema(part_schema)}" + else: + yield self.format_schema_with_watermark( + self.schema, self.watermark, self.primary_keys + ) + + if self.tbl_properties: + yield f"WITH {self.format_properties(self.tbl_properties)}" + + @property + def pieces(self): + """Get all DDL pieces.""" + yield self._create_line() + yield from filter(None, self._pieces) + + def compile(self): + """Compile the DDL statement.""" + return "\n".join(self.pieces) + + +class CreateView(BeamBase, CreateDDL): + """Create view DDL for Beam SQL.""" + + def __init__( + self, + name: str, + query_expression: str, + database: str | None = None, + catalog: str | None = None, + can_exist: bool = False, + temporary: bool = False, + ): + super().__init__( + table_name=name, + database=database, + can_exist=can_exist, + ) + self.name = name + self.query_expression = query_expression + self.catalog = catalog + self.temporary = temporary + + @property + def _prefix(self): + """Get the CREATE VIEW prefix.""" + if self.temporary: + return "CREATE TEMPORARY VIEW" + else: + return "CREATE VIEW" + + def _create_line(self): + """Get the CREATE VIEW line.""" + scoped_name = self.scoped_name(self.name, self.database, self.catalog) + return f"{self._prefix} {self._if_exists()}{scoped_name}" + + @property + def pieces(self): + """Get all DDL pieces.""" + yield self._create_line() + yield f"AS {self.query_expression}" + + def compile(self): + """Compile the DDL statement.""" + return "\n".join(self.pieces) + + +class DropTable(BeamBase, DropObject): + """Drop table DDL for Beam SQL.""" + + _object_type = "TABLE" + + def __init__( + self, + table_name: str, + database: str | None = None, + catalog: str | None = None, + must_exist: bool = True, + temporary: bool = False, + ): + super().__init__(must_exist=must_exist) + self.table_name = table_name + self.database = database + self.catalog = catalog + self.temporary = temporary + + def _object_name(self): + """Get the object name.""" + return self.scoped_name(self.table_name, self.database, self.catalog) + + def compile(self): + """Compile the DDL statement.""" + temporary = "TEMPORARY " if self.temporary else "" + if_exists = "" if self.must_exist else "IF EXISTS " + object_name = self._object_name() + return f"DROP {temporary}{self._object_type} {if_exists}{object_name}" + + +class DropView(DropTable): + """Drop view DDL for Beam SQL.""" + + _object_type = "VIEW" + + def __init__( + self, + name: 
str, + database: str | None = None, + catalog: str | None = None, + must_exist: bool = True, + temporary: bool = False, + ): + super().__init__( + table_name=name, + database=database, + catalog=catalog, + must_exist=must_exist, + temporary=temporary, + ) + + +class RenameTable(BeamBase, DDL): + """Rename table DDL for Beam SQL.""" + + def __init__(self, old_name: str, new_name: str, must_exist: bool = True): + self.old_name = old_name + self.new_name = new_name + self.must_exist = must_exist + + def compile(self): + """Compile the DDL statement.""" + if_exists = "" if self.must_exist else "IF EXISTS" + return f"ALTER TABLE {if_exists} {self.old_name} RENAME TO {self.new_name}" + + +class _DatabaseObject: + """Base class for database objects.""" + + def _object_name(self): + """Get the object name.""" + name = sg.to_identifier(self.name, quoted=True).sql(dialect=self.dialect) + if self.catalog: + catalog = sg.to_identifier(self.catalog, quoted=True).sql( + dialect=self.dialect + ) + return f"{catalog}.{name}" + else: + return name + + +class CreateDatabase(BeamBase, _DatabaseObject, CreateDDL): + """Create database DDL for Beam SQL.""" + + def __init__( + self, + name: str, + db_properties: dict | None, + catalog: str | None = None, + can_exist: bool = False, + ): + self.name = name + self.db_properties = db_properties + self.catalog = catalog + self.can_exist = can_exist + + def _format_db_properties(self) -> str: + """Format database properties.""" + return ( + f"WITH {self.format_properties(self.db_properties)}" + if self.db_properties + else "" + ) + + def compile(self): + """Compile the DDL statement.""" + create_decl = "CREATE DATABASE" + create_line = f"{create_decl} {self._if_exists()}{self._object_name()}" + + return f"{create_line}\n{self._format_db_properties()}" + + +class DropDatabase(BeamBase, _DatabaseObject, DropObject): + """Drop database DDL for Beam SQL.""" + + _object_type = "DATABASE" + + def __init__(self, name: str, catalog: str | None = None, must_exist: bool = True): + super().__init__(must_exist=must_exist) + self.name = name + self.catalog = catalog + + +class InsertSelect(BeamBase, DML): + """Insert select DML for Beam SQL.""" + + def __init__( + self, + table_name, + select_expr, + database: str | None = None, + catalog: str | None = None, + partition=None, + partition_schema=None, + overwrite=False, + ): + self.table_name = table_name + self.database = database + self.catalog = catalog + self.select = select_expr + self.partition = partition + self.partition_schema = partition_schema + self.overwrite = overwrite + + def compile(self): + """Compile the DML statement.""" + if self.overwrite: + cmd = "INSERT OVERWRITE" + else: + cmd = "INSERT INTO" + + if self.partition is not None: + part = self.format_partition(self.partition, self.partition_schema) + partition = f" {part} " + else: + partition = "" + + select_query = self.select + scoped_name = self.scoped_name(self.table_name, self.database, self.catalog) + return f"{cmd} {scoped_name}{partition}\n{select_query}" diff --git a/ibis/backends/beam/tests/__init__.py b/ibis/backends/beam/tests/__init__.py new file mode 100644 index 000000000000..ef4990e73ba4 --- /dev/null +++ b/ibis/backends/beam/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for the Beam backend.""" diff --git a/ibis/backends/beam/tests/conftest.py b/ibis/backends/beam/tests/conftest.py new file mode 100644 index 000000000000..e0292e0df36e --- /dev/null +++ b/ibis/backends/beam/tests/conftest.py @@ -0,0 +1,66 @@ +"""Configuration for Beam backend 
tests.""" + +import pytest + +import ibis +import ibis.expr.types as ir +from ibis.backends.beam import Backend + + +@pytest.fixture +def beam_backend(): + """Create a Beam backend for testing.""" + import apache_beam as beam + + # Create a test pipeline + pipeline = beam.Pipeline() + + # Create the backend + backend = Backend() + backend.do_connect(pipeline) + + return backend + + +@pytest.fixture +def simple_table(beam_backend): + """Create a simple test table.""" + import pandas as pd + + data = pd.DataFrame({ + 'id': [1, 2, 3, 4, 5], + 'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'], + 'age': [25, 30, 35, 40, 45], + 'salary': [50000, 60000, 70000, 80000, 90000] + }) + + # Create a temporary view + table = beam_backend.create_view( + 'employees', + data, + temp=True + ) + + return table + + +@pytest.fixture +def empty_table(beam_backend): + """Create an empty test table.""" + import pandas as pd + + data = pd.DataFrame({ + 'id': [], + 'name': [], + 'age': [], + 'salary': [] + }) + + # Create a temporary view + table = beam_backend.create_view( + 'empty_employees', + data, + temp=True + ) + + return table diff --git a/ibis/backends/beam/tests/test_catalog_database.py b/ibis/backends/beam/tests/test_catalog_database.py new file mode 100644 index 000000000000..a7fc9c81c520 --- /dev/null +++ b/ibis/backends/beam/tests/test_catalog_database.py @@ -0,0 +1,297 @@ +"""Tests for catalog and database functionality in Beam backend.""" + +import pytest + +import ibis +from ibis.backends.beam import Backend + + +def test_create_catalog_sql(): + """Test creating catalogs using SQL CREATE CATALOG statements.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Test basic catalog creation + result = backend.raw_sql("CREATE CATALOG my_catalog") + assert result is not None + + # Test catalog creation with properties + result = backend.raw_sql("CREATE CATALOG iceberg_catalog WITH (type = 'iceberg', warehouse = 's3://my-bucket/warehouse')") + assert result is not None + + # Test catalog creation with multiple properties + result = backend.raw_sql("CREATE CATALOG hive_catalog WITH (type = 'hive', metastore_uri = 'thrift://localhost:9083', warehouse = 'hdfs://localhost:9000/warehouse')") + assert result is not None + + +def test_create_database_sql(): + """Test creating databases using SQL CREATE DATABASE statements.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Test basic database creation + result = backend.raw_sql("CREATE DATABASE my_database") + assert result is not None + + # Test database creation in specific catalog + result = backend.raw_sql("CREATE DATABASE my_database IN CATALOG my_catalog") + assert result is not None + + # Test database creation with properties + result = backend.raw_sql("CREATE DATABASE analytics_db WITH (location = 's3://my-bucket/analytics')") + assert result is not None + + +def test_catalog_management(): + """Test catalog management operations.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Create catalogs + backend.create_catalog("iceberg_catalog", catalog_type="iceberg", properties={"warehouse": "s3://bucket/warehouse"}) + backend.create_catalog("hive_catalog", catalog_type="hive", properties={"metastore_uri": "thrift://localhost:9083"}) + + # List catalogs + catalogs = backend.list_catalogs() + assert "iceberg_catalog" in catalogs + assert 
"hive_catalog" in catalogs + + # Drop catalog + backend.drop_catalog("hive_catalog") + catalogs = backend.list_catalogs() + assert "hive_catalog" not in catalogs + assert "iceberg_catalog" in catalogs + + +def test_database_management(): + """Test database management operations.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Create databases + backend.create_database("analytics_db", db_properties={"location": "s3://bucket/analytics"}) + backend.create_database("staging_db", catalog="my_catalog") + + # List databases + databases = backend.list_databases() + assert "analytics_db" in databases + assert "staging_db" in databases + + # Drop database + backend.drop_database("staging_db") + databases = backend.list_databases() + assert "staging_db" not in databases + assert "analytics_db" in databases + + +def test_set_catalog_database(): + """Test setting current catalog and database using SET statements.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Set current catalog + backend.raw_sql("SET catalog = 'my_iceberg_catalog'") + assert backend.current_catalog == "my_iceberg_catalog" + + # Set current database + backend.raw_sql("SET database = 'analytics'") + assert backend.current_database == "analytics" + + # Set catalog-specific options + backend.raw_sql("SET catalog.warehouse = 's3://my-bucket/warehouse'") + backend.raw_sql("SET catalog.type = 'iceberg'") + + +def test_iceberg_catalog_setup(): + """Test complete Iceberg catalog setup.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Create Iceberg catalog + backend.raw_sql("CREATE CATALOG iceberg_catalog WITH (type = 'iceberg', warehouse = 's3://my-bucket/warehouse')") + + # Set as current catalog + backend.raw_sql("SET catalog = 'iceberg_catalog'") + + # Create database in the catalog + backend.raw_sql("CREATE DATABASE analytics IN CATALOG iceberg_catalog") + + # Set as current database + backend.raw_sql("SET database = 'analytics'") + + # Verify configuration + assert backend.current_catalog == "iceberg_catalog" + assert backend.current_database == "analytics" + + +def test_hive_catalog_setup(): + """Test complete Hive catalog setup.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Create Hive catalog + backend.raw_sql("CREATE CATALOG hive_catalog WITH (type = 'hive', metastore_uri = 'thrift://localhost:9083', warehouse = 'hdfs://localhost:9000/warehouse')") + + # Set as current catalog + backend.raw_sql("SET catalog = 'hive_catalog'") + + # Create database in the catalog + backend.raw_sql("CREATE DATABASE staging IN CATALOG hive_catalog") + + # Set as current database + backend.raw_sql("SET database = 'staging'") + + # Verify configuration + assert backend.current_catalog == "hive_catalog" + assert backend.current_database == "staging" + + +def test_catalog_with_dataflow(): + """Test catalog setup with DataflowRunner configuration.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Configure DataflowRunner + backend.raw_sql("SET runner = 'dataflow'") + backend.raw_sql("SET dataflow.project = 'my-gcp-project'") + backend.raw_sql("SET dataflow.region = 'us-central1'") + backend.raw_sql("SET dataflow.staging_location = 'gs://my-bucket/staging'") + backend.raw_sql("SET 
dataflow.temp_location = 'gs://my-bucket/temp'") + + # Create Iceberg catalog + backend.raw_sql("CREATE CATALOG iceberg_catalog WITH (type = 'iceberg', warehouse = 'gs://my-bucket/warehouse')") + + # Set catalog and database + backend.raw_sql("SET catalog = 'iceberg_catalog'") + backend.raw_sql("SET database = 'analytics'") + + # Create database + backend.raw_sql("CREATE DATABASE analytics IN CATALOG iceberg_catalog") + + # Verify configuration + runner_config = backend.get_runner_config() + assert runner_config['runner'] == 'DataflowRunner' + assert runner_config['project'] == 'my-gcp-project' + assert backend.current_catalog == "iceberg_catalog" + assert backend.current_database == "analytics" + + +def test_catalog_properties_parsing(): + """Test parsing of catalog properties from SQL.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Test complex properties + result = backend.raw_sql("CREATE CATALOG complex_catalog WITH (type = 'iceberg', warehouse = 's3://bucket/warehouse', s3_endpoint = 'https://s3.amazonaws.com', s3_access_key = 'AKIAIOSFODNN7EXAMPLE', s3_secret_key = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY')") + assert result is not None + + # Test with spaces in values + result = backend.raw_sql("CREATE CATALOG spaced_catalog WITH (type = 'hive', warehouse = 'hdfs://localhost:9000/warehouse', description = 'My Hive Catalog')") + assert result is not None + + +def test_database_properties_parsing(): + """Test parsing of database properties from SQL.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Test database with properties + result = backend.raw_sql("CREATE DATABASE props_db WITH (location = 's3://bucket/databases/props_db', comment = 'Database with properties')") + assert result is not None + + # Test database in catalog with properties + result = backend.raw_sql("CREATE DATABASE catalog_db IN CATALOG my_catalog WITH (location = 's3://bucket/catalog_db')") + assert result is not None + + +def test_invalid_sql_statements(): + """Test error handling for invalid SQL statements.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Test invalid CREATE CATALOG + with pytest.raises(Exception): + backend.raw_sql("CREATE CATALOG") + + # Test invalid CREATE DATABASE + with pytest.raises(Exception): + backend.raw_sql("CREATE DATABASE") + + # Test malformed properties + with pytest.raises(Exception): + backend.raw_sql("CREATE CATALOG bad_catalog WITH (type = iceberg)") # Missing quotes + + +def test_catalog_database_integration(): + """Test integration between catalog and database operations.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Create multiple catalogs + backend.raw_sql("CREATE CATALOG prod_catalog WITH (type = 'iceberg', warehouse = 's3://prod-bucket/warehouse')") + backend.raw_sql("CREATE CATALOG dev_catalog WITH (type = 'iceberg', warehouse = 's3://dev-bucket/warehouse')") + + # Create databases in different catalogs + backend.raw_sql("CREATE DATABASE analytics IN CATALOG prod_catalog") + backend.raw_sql("CREATE DATABASE staging IN CATALOG prod_catalog") + backend.raw_sql("CREATE DATABASE test IN CATALOG dev_catalog") + + # List all catalogs and databases + catalogs = backend.list_catalogs() + databases = backend.list_databases() + + assert "prod_catalog" in catalogs + assert "dev_catalog" 
in catalogs + assert "analytics" in databases + assert "staging" in databases + assert "test" in databases + + # Switch between catalogs and databases + backend.raw_sql("SET catalog = 'dev_catalog'") + backend.raw_sql("SET database = 'test'") + + assert backend.current_catalog == "dev_catalog" + assert backend.current_database == "test" + + # Switch back to production + backend.raw_sql("SET catalog = 'prod_catalog'") + backend.raw_sql("SET database = 'analytics'") + + assert backend.current_catalog == "prod_catalog" + assert backend.current_database == "analytics" diff --git a/ibis/backends/beam/tests/test_compiler.py b/ibis/backends/beam/tests/test_compiler.py new file mode 100644 index 000000000000..baf0f6baa8d2 --- /dev/null +++ b/ibis/backends/beam/tests/test_compiler.py @@ -0,0 +1,127 @@ +"""Tests for the Beam SQL compiler.""" + +import pytest + +import ibis +from ibis.backends.beam import Backend + + +def test_beam_compiler_basic(): + """Test basic Beam SQL compilation.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Test basic SQL compilation + expr = ibis.literal(1) + sql = backend.compile(expr) + assert sql is not None + + +def test_beam_compiler_aggregation(): + """Test Beam SQL aggregation compilation.""" + import apache_beam as beam + import pandas as pd + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Create test data + data = pd.DataFrame({ + 'id': [1, 2, 3, 4, 5], + 'value': [10, 20, 30, 40, 50] + }) + + # Create table + table = backend.create_view('test_table', data, temp=True) + + # Test aggregation + expr = table.value.sum() + sql = backend.compile(expr) + assert sql is not None + assert 'SUM' in sql.upper() + + +def test_beam_compiler_filter(): + """Test Beam SQL filter compilation.""" + import apache_beam as beam + import pandas as pd + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Create test data + data = pd.DataFrame({ + 'id': [1, 2, 3, 4, 5], + 'value': [10, 20, 30, 40, 50] + }) + + # Create table + table = backend.create_view('test_table', data, temp=True) + + # Test filter + expr = table.filter(table.value > 20) + sql = backend.compile(expr) + assert sql is not None + assert 'WHERE' in sql.upper() + + +def test_beam_compiler_join(): + """Test Beam SQL join compilation.""" + import apache_beam as beam + import pandas as pd + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Create test data + data1 = pd.DataFrame({ + 'id': [1, 2, 3], + 'name': ['A', 'B', 'C'] + }) + + data2 = pd.DataFrame({ + 'id': [1, 2, 4], + 'value': [100, 200, 400] + }) + + # Create tables + table1 = backend.create_view('table1', data1, temp=True) + table2 = backend.create_view('table2', data2, temp=True) + + # Test join + expr = table1.join(table2, 'id') + sql = backend.compile(expr) + assert sql is not None + assert 'JOIN' in sql.upper() + + +def test_beam_compiler_groupby(): + """Test Beam SQL groupby compilation.""" + import apache_beam as beam + import pandas as pd + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Create test data + data = pd.DataFrame({ + 'category': ['A', 'A', 'B', 'B', 'C'], + 'value': [10, 20, 30, 40, 50] + }) + + # Create table + table = backend.create_view('test_table', data, temp=True) + + # Test groupby + expr = table.group_by('category').agg(value_sum=table.value.sum()) + sql = backend.compile(expr) + assert sql is not None + assert 
'GROUP BY' in sql.upper() + assert 'SUM' in sql.upper() diff --git a/ibis/backends/beam/tests/test_dataflow_example.py b/ibis/backends/beam/tests/test_dataflow_example.py new file mode 100644 index 000000000000..e59c7169841c --- /dev/null +++ b/ibis/backends/beam/tests/test_dataflow_example.py @@ -0,0 +1,233 @@ +"""Example of using SET statements to configure DataflowRunner.""" + +import pytest + +import ibis +from ibis.backends.beam import Backend + + +def test_dataflow_runner_setup_example(): + """Example of setting up DataflowRunner with SET statements.""" + import apache_beam as beam + + # Create initial pipeline + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Configure DataflowRunner using SET statements + setup_commands = [ + # Set the runner + "SET runner = 'dataflow'", + + # Required Dataflow options + "SET dataflow.project = 'my-gcp-project'", + "SET dataflow.staging_location = 'gs://my-bucket/staging'", + "SET dataflow.temp_location = 'gs://my-bucket/temp'", + + # Optional Dataflow options + "SET dataflow.region = 'us-central1'", + "SET dataflow.service_account = 'my-service-account@my-gcp-project.iam.gserviceaccount.com'", + "SET dataflow.network = 'my-network'", + "SET dataflow.subnetwork = 'my-subnetwork'", + "SET dataflow.use_public_ips = 'false'", + "SET dataflow.num_workers = '3'", + "SET dataflow.max_num_workers = '10'", + "SET dataflow.machine_type = 'n1-standard-4'", + "SET dataflow.disk_size_gb = '100'", + "SET dataflow.disk_type = 'pd-ssd'", + "SET dataflow.autoscaling_algorithm = 'THROUGHPUT_BASED'", + "SET dataflow.enable_streaming_engine = 'true'", + "SET dataflow.flexrs_goal = 'COST_OPTIMIZED'", + "SET dataflow.labels = 'env=prod,team=data,version=1.0'", + + # Pipeline options + "SET pipeline.streaming = 'true'", + "SET pipeline.save_main_session = 'true'", + "SET job_name = 'my-dataflow-job'" + ] + + # Execute all SET statements + for command in setup_commands: + result = backend.raw_sql(command) + # Verify the command was processed + assert result is not None + + # Verify configuration + runner_config = backend.get_runner_config() + pipeline_options = backend.get_pipeline_options() + + # Check runner configuration + assert runner_config['runner'] == 'DataflowRunner' + assert runner_config['project'] == 'my-gcp-project' + assert runner_config['staging_location'] == 'gs://my-bucket/staging' + assert runner_config['temp_location'] == 'gs://my-bucket/temp' + assert runner_config['region'] == 'us-central1' + assert runner_config['service_account'] == 'my-service-account@my-gcp-project.iam.gserviceaccount.com' + assert runner_config['network'] == 'my-network' + assert runner_config['subnetwork'] == 'my-subnetwork' + assert runner_config['use_public_ips'] == 'false' + assert runner_config['num_workers'] == '3' + assert runner_config['max_num_workers'] == '10' + assert runner_config['machine_type'] == 'n1-standard-4' + assert runner_config['disk_size_gb'] == '100' + assert runner_config['disk_type'] == 'pd-ssd' + assert runner_config['autoscaling_algorithm'] == 'THROUGHPUT_BASED' + assert runner_config['enable_streaming_engine'] == 'true' + assert runner_config['flexrs_goal'] == 'COST_OPTIMIZED' + assert runner_config['labels'] == 'env=prod,team=data,version=1.0' + + # Check pipeline options + assert pipeline_options['streaming'] == 'true' + assert pipeline_options['save_main_session'] == 'true' + assert pipeline_options['job_name'] == 'my-dataflow-job' + + # Create configured pipeline + configured_pipeline = 
backend.create_configured_pipeline() + assert configured_pipeline is not None + + # The configured pipeline can now be used for actual data processing + return configured_pipeline + + +def test_dataflow_with_ibis_operations(): + """Example of using DataflowRunner with Ibis operations.""" + import apache_beam as beam + import pandas as pd + + # Create initial pipeline + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Configure for Dataflow + backend.raw_sql("SET runner = 'dataflow'") + backend.raw_sql("SET dataflow.project = 'my-gcp-project'") + backend.raw_sql("SET dataflow.staging_location = 'gs://my-bucket/staging'") + backend.raw_sql("SET dataflow.temp_location = 'gs://my-bucket/temp'") + backend.raw_sql("SET dataflow.region = 'us-central1'") + backend.raw_sql("SET dataflow.num_workers = '5'") + + # Create sample data + data = pd.DataFrame({ + 'user_id': [1, 2, 3, 4, 5], + 'event_type': ['click', 'view', 'click', 'purchase', 'view'], + 'timestamp': ['2023-01-01 10:00:00', '2023-01-01 10:01:00', + '2023-01-01 10:02:00', '2023-01-01 10:03:00', '2023-01-01 10:04:00'], + 'value': [10.0, 0.0, 15.0, 100.0, 0.0] + }) + + # Create table + events = backend.create_view('events', data, temp=True) + + # Perform analytics + click_events = events.filter(events.event_type == 'click') + total_clicks = click_events.count() + avg_click_value = click_events.value.mean() + + # Compile to SQL (this would be executed on Dataflow) + click_sql = backend.compile(click_events) + count_sql = backend.compile(total_clicks) + avg_sql = backend.compile(avg_click_value) + + # Verify SQL compilation + assert click_sql is not None + assert count_sql is not None + assert avg_sql is not None + + # Create configured pipeline for actual execution + configured_pipeline = backend.create_configured_pipeline() + assert configured_pipeline is not None + + # In a real scenario, you would run the pipeline: + # with configured_pipeline as pipeline: + # con = ibis.beam.connect(pipeline) + # result = con.execute(avg_click_value) + # print(result) + + +def test_dataflow_streaming_example(): + """Example of configuring DataflowRunner for streaming.""" + import apache_beam as beam + + # Create initial pipeline + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Configure for streaming Dataflow + streaming_setup = [ + "SET runner = 'dataflow'", + "SET dataflow.project = 'my-gcp-project'", + "SET dataflow.staging_location = 'gs://my-bucket/staging'", + "SET dataflow.temp_location = 'gs://my-bucket/temp'", + "SET dataflow.region = 'us-central1'", + "SET dataflow.enable_streaming_engine = 'true'", + "SET dataflow.num_workers = '3'", + "SET dataflow.max_num_workers = '20'", + "SET dataflow.autoscaling_algorithm = 'THROUGHPUT_BASED'", + "SET pipeline.streaming = 'true'", + "SET pipeline.save_main_session = 'true'" + ] + + # Execute setup + for command in streaming_setup: + backend.raw_sql(command) + + # Verify streaming configuration + runner_config = backend.get_runner_config() + pipeline_options = backend.get_pipeline_options() + + assert runner_config['enable_streaming_engine'] == 'true' + assert runner_config['autoscaling_algorithm'] == 'THROUGHPUT_BASED' + assert pipeline_options['streaming'] == 'true' + assert pipeline_options['save_main_session'] == 'true' + + # Create configured pipeline + configured_pipeline = backend.create_configured_pipeline() + assert configured_pipeline is not None + + +def test_dataflow_cost_optimization(): + """Example of configuring 
DataflowRunner for cost optimization.""" + import apache_beam as beam + + # Create initial pipeline + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Configure for cost-optimized Dataflow + cost_optimized_setup = [ + "SET runner = 'dataflow'", + "SET dataflow.project = 'my-gcp-project'", + "SET dataflow.staging_location = 'gs://my-bucket/staging'", + "SET dataflow.temp_location = 'gs://my-bucket/temp'", + "SET dataflow.region = 'us-central1'", + "SET dataflow.flexrs_goal = 'COST_OPTIMIZED'", + "SET dataflow.machine_type = 'n1-standard-1'", + "SET dataflow.disk_type = 'pd-standard'", + "SET dataflow.disk_size_gb = '50'", + "SET dataflow.num_workers = '1'", + "SET dataflow.max_num_workers = '5'", + "SET dataflow.autoscaling_algorithm = 'NONE'" + ] + + # Execute setup + for command in cost_optimized_setup: + backend.raw_sql(command) + + # Verify cost optimization configuration + runner_config = backend.get_runner_config() + + assert runner_config['flexrs_goal'] == 'COST_OPTIMIZED' + assert runner_config['machine_type'] == 'n1-standard-1' + assert runner_config['disk_type'] == 'pd-standard' + assert runner_config['disk_size_gb'] == '50' + assert runner_config['num_workers'] == '1' + assert runner_config['max_num_workers'] == '5' + assert runner_config['autoscaling_algorithm'] == 'NONE' + + # Create configured pipeline + configured_pipeline = backend.create_configured_pipeline() + assert configured_pipeline is not None diff --git a/ibis/backends/beam/tests/test_datatypes.py b/ibis/backends/beam/tests/test_datatypes.py new file mode 100644 index 000000000000..c4572005a544 --- /dev/null +++ b/ibis/backends/beam/tests/test_datatypes.py @@ -0,0 +1,93 @@ +"""Tests for Beam SQL datatypes.""" + +import pytest + +import ibis.expr.datatypes as dt +from ibis.backends.beam.datatypes import BeamType + + +def test_beam_type_to_ibis(): + """Test conversion from Beam types to Ibis types.""" + # Test basic types + assert isinstance(BeamType.to_ibis("VARCHAR"), dt.String) + assert isinstance(BeamType.to_ibis("BOOLEAN"), dt.Boolean) + assert isinstance(BeamType.to_ibis("INTEGER"), dt.Int32) + assert isinstance(BeamType.to_ibis("BIGINT"), dt.Int64) + assert isinstance(BeamType.to_ibis("FLOAT"), dt.Float32) + assert isinstance(BeamType.to_ibis("DOUBLE"), dt.Float64) + assert isinstance(BeamType.to_ibis("DATE"), dt.Date) + assert isinstance(BeamType.to_ibis("TIME"), dt.Time) + assert isinstance(BeamType.to_ibis("TIMESTAMP"), dt.Timestamp) + + +def test_beam_type_from_ibis(): + """Test conversion from Ibis types to Beam types.""" + # Test basic types + assert BeamType.from_ibis(dt.String()) == "VARCHAR" + assert BeamType.from_ibis(dt.Boolean()) == "BOOLEAN" + assert BeamType.from_ibis(dt.Int32()) == "INTEGER" + assert BeamType.from_ibis(dt.Int64()) == "BIGINT" + assert BeamType.from_ibis(dt.Float32()) == "FLOAT" + assert BeamType.from_ibis(dt.Float64()) == "DOUBLE" + assert BeamType.from_ibis(dt.Date()) == "DATE" + assert BeamType.from_ibis(dt.Time()) == "TIME" + assert BeamType.from_ibis(dt.Timestamp()) == "TIMESTAMP(6)" + + +def test_beam_type_timestamp_precision(): + """Test timestamp precision handling.""" + # Test with precision + assert BeamType.from_ibis(dt.Timestamp(scale=3)) == "TIMESTAMP(3)" + + # Test without precision (default) + assert BeamType.from_ibis(dt.Timestamp()) == "TIMESTAMP(6)" + + +def test_beam_type_array(): + """Test array type handling.""" + # Test array type + array_type = dt.Array(dt.String()) + beam_type = BeamType.from_ibis(array_type) + assert 
beam_type == "ARRAY" + + # Test nested array + nested_array = dt.Array(dt.Array(dt.Int32())) + beam_type = BeamType.from_ibis(nested_array) + assert beam_type == "ARRAY>" + + +def test_beam_type_map(): + """Test map type handling.""" + # Test map type + map_type = dt.Map(dt.String(), dt.Int32()) + beam_type = BeamType.from_ibis(map_type) + assert beam_type == "MAP" + + +def test_beam_type_struct(): + """Test struct type handling.""" + # Test struct type + struct_type = dt.Struct({ + 'name': dt.String(), + 'age': dt.Int32(), + 'active': dt.Boolean() + }) + beam_type = BeamType.from_ibis(struct_type) + assert 'name VARCHAR' in beam_type + assert 'age INTEGER' in beam_type + assert 'active BOOLEAN' in beam_type + assert beam_type.startswith('ROW(') + assert beam_type.endswith(')') + + +def test_beam_type_nullable(): + """Test nullable type handling.""" + # Test nullable types + nullable_string = dt.String(nullable=True) + beam_type = BeamType.from_ibis(nullable_string) + assert beam_type == "VARCHAR" + + # Test non-nullable types + non_nullable_string = dt.String(nullable=False) + beam_type = BeamType.from_ibis(non_nullable_string) + assert beam_type == "VARCHAR NOT NULL" diff --git a/ibis/backends/beam/tests/test_set_statements.py b/ibis/backends/beam/tests/test_set_statements.py new file mode 100644 index 000000000000..88f89cbedaa5 --- /dev/null +++ b/ibis/backends/beam/tests/test_set_statements.py @@ -0,0 +1,245 @@ +"""Tests for SET statement functionality in Beam backend.""" + +import pytest + +import ibis +from ibis.backends.beam import Backend + + +def test_set_runner(): + """Test setting the runner using SET statement.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Test setting DataflowRunner + result = backend.raw_sql("SET runner = 'dataflow'") + assert backend.get_runner_config()['runner'] == 'DataflowRunner' + + # Test setting FlinkRunner + result = backend.raw_sql("SET runner = 'flink'") + assert backend.get_runner_config()['runner'] == 'FlinkRunner' + + # Test setting DirectRunner + result = backend.raw_sql("SET runner = 'direct'") + assert backend.get_runner_config()['runner'] == 'DirectRunner' + + +def test_set_dataflow_options(): + """Test setting Dataflow-specific options.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Set DataflowRunner + backend.raw_sql("SET runner = 'dataflow'") + + # Set Dataflow options + backend.raw_sql("SET dataflow.project = 'my-gcp-project'") + backend.raw_sql("SET dataflow.region = 'us-central1'") + backend.raw_sql("SET dataflow.staging_location = 'gs://my-bucket/staging'") + backend.raw_sql("SET dataflow.temp_location = 'gs://my-bucket/temp'") + backend.raw_sql("SET dataflow.num_workers = '5'") + backend.raw_sql("SET dataflow.max_num_workers = '10'") + backend.raw_sql("SET dataflow.machine_type = 'n1-standard-4'") + backend.raw_sql("SET dataflow.use_public_ips = 'false'") + backend.raw_sql("SET dataflow.enable_streaming_engine = 'true'") + + config = backend.get_runner_config() + assert config['project'] == 'my-gcp-project' + assert config['region'] == 'us-central1' + assert config['staging_location'] == 'gs://my-bucket/staging' + assert config['temp_location'] == 'gs://my-bucket/temp' + assert config['num_workers'] == '5' + assert config['max_num_workers'] == '10' + assert config['machine_type'] == 'n1-standard-4' + assert config['use_public_ips'] == 'false' + assert config['enable_streaming_engine'] 
== 'true' + + +def test_set_pipeline_options(): + """Test setting pipeline options.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Set pipeline options + backend.raw_sql("SET pipeline.streaming = 'true'") + backend.raw_sql("SET pipeline.save_main_session = 'true'") + backend.raw_sql("SET pipeline.setup_file = '/path/to/setup.py'") + + options = backend.get_pipeline_options() + assert options['streaming'] == 'true' + assert options['save_main_session'] == 'true' + assert options['setup_file'] == '/path/to/setup.py' + + +def test_set_generic_options(): + """Test setting generic options.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Set generic options + backend.raw_sql("SET streaming = 'true'") + backend.raw_sql("SET job_name = 'my-beam-job'") + + options = backend.get_pipeline_options() + assert options['streaming'] == 'true' + assert options['job_name'] == 'my-beam-job' + + +def test_set_with_quotes(): + """Test SET statements with quoted values.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Test with single quotes + backend.raw_sql("SET runner = 'dataflow'") + assert backend.get_runner_config()['runner'] == 'DataflowRunner' + + # Test with double quotes + backend.raw_sql('SET runner = "flink"') + assert backend.get_runner_config()['runner'] == 'FlinkRunner' + + # Test with spaces and quotes + backend.raw_sql("SET dataflow.project = 'my-gcp-project'") + assert backend.get_runner_config()['project'] == 'my-gcp-project' + + +def test_set_invalid_statement(): + """Test invalid SET statements.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Test invalid SET statement + with pytest.raises(Exception): + backend.raw_sql("SET invalid statement") + + # Test unknown runner + with pytest.raises(Exception): + backend.raw_sql("SET runner = 'unknown'") + + +def test_create_configured_pipeline(): + """Test creating a pipeline with configured options.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Configure for Dataflow + backend.raw_sql("SET runner = 'dataflow'") + backend.raw_sql("SET dataflow.project = 'my-gcp-project'") + backend.raw_sql("SET dataflow.region = 'us-central1'") + backend.raw_sql("SET dataflow.staging_location = 'gs://my-bucket/staging'") + backend.raw_sql("SET dataflow.temp_location = 'gs://my-bucket/temp'") + backend.raw_sql("SET dataflow.num_workers = '5'") + + # Create configured pipeline + configured_pipeline = backend.create_configured_pipeline() + assert configured_pipeline is not None + + # Verify the pipeline has options + options = configured_pipeline._options + assert options is not None + + +def test_set_labels(): + """Test setting Dataflow labels.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = Backend() + backend.do_connect(pipeline) + + # Set DataflowRunner + backend.raw_sql("SET runner = 'dataflow'") + + # Set labels + backend.raw_sql("SET dataflow.labels = 'env=prod,team=data,version=1.0'") + + config = backend.get_runner_config() + assert config['labels'] == 'env=prod,team=data,version=1.0' + + +def test_complete_dataflow_setup(): + """Test complete Dataflow setup with multiple SET statements.""" + import apache_beam as beam + + pipeline = beam.Pipeline() + backend = 
Backend() + backend.do_connect(pipeline) + + # Complete Dataflow configuration + setup_statements = [ + "SET runner = 'dataflow'", + "SET dataflow.project = 'my-gcp-project'", + "SET dataflow.region = 'us-central1'", + "SET dataflow.staging_location = 'gs://my-bucket/staging'", + "SET dataflow.temp_location = 'gs://my-bucket/temp'", + "SET dataflow.service_account = 'my-service-account@my-gcp-project.iam.gserviceaccount.com'", + "SET dataflow.network = 'my-network'", + "SET dataflow.subnetwork = 'my-subnetwork'", + "SET dataflow.use_public_ips = 'false'", + "SET dataflow.num_workers = '3'", + "SET dataflow.max_num_workers = '10'", + "SET dataflow.machine_type = 'n1-standard-4'", + "SET dataflow.disk_size_gb = '100'", + "SET dataflow.disk_type = 'pd-ssd'", + "SET dataflow.autoscaling_algorithm = 'THROUGHPUT_BASED'", + "SET dataflow.enable_streaming_engine = 'true'", + "SET dataflow.flexrs_goal = 'COST_OPTIMIZED'", + "SET dataflow.labels = 'env=prod,team=data'", + "SET pipeline.streaming = 'true'", + "SET pipeline.save_main_session = 'true'" + ] + + # Execute all SET statements + for statement in setup_statements: + result = backend.raw_sql(statement) + + # Verify configuration + runner_config = backend.get_runner_config() + pipeline_options = backend.get_pipeline_options() + + assert runner_config['runner'] == 'DataflowRunner' + assert runner_config['project'] == 'my-gcp-project' + assert runner_config['region'] == 'us-central1' + assert runner_config['staging_location'] == 'gs://my-bucket/staging' + assert runner_config['temp_location'] == 'gs://my-bucket/temp' + assert runner_config['service_account'] == 'my-service-account@my-gcp-project.iam.gserviceaccount.com' + assert runner_config['network'] == 'my-network' + assert runner_config['subnetwork'] == 'my-subnetwork' + assert runner_config['use_public_ips'] == 'false' + assert runner_config['num_workers'] == '3' + assert runner_config['max_num_workers'] == '10' + assert runner_config['machine_type'] == 'n1-standard-4' + assert runner_config['disk_size_gb'] == '100' + assert runner_config['disk_type'] == 'pd-ssd' + assert runner_config['autoscaling_algorithm'] == 'THROUGHPUT_BASED' + assert runner_config['enable_streaming_engine'] == 'true' + assert runner_config['flexrs_goal'] == 'COST_OPTIMIZED' + assert runner_config['labels'] == 'env=prod,team=data' + + assert pipeline_options['streaming'] == 'true' + assert pipeline_options['save_main_session'] == 'true' + + # Create configured pipeline + configured_pipeline = backend.create_configured_pipeline() + assert configured_pipeline is not None diff --git a/ibis/backends/sql/compilers/__init__.py b/ibis/backends/sql/compilers/__init__.py index d49d2d68dc4e..73ce2ce3dc87 100644 --- a/ibis/backends/sql/compilers/__init__.py +++ b/ibis/backends/sql/compilers/__init__.py @@ -2,6 +2,7 @@ __all__ = [ "AthenaCompiler", + "BeamCompiler", "BigQueryCompiler", "ClickHouseCompiler", "DataFusionCompiler", @@ -23,6 +24,7 @@ ] from ibis.backends.sql.compilers.athena import AthenaCompiler +from ibis.backends.sql.compilers.beam import BeamCompiler from ibis.backends.sql.compilers.bigquery import BigQueryCompiler from ibis.backends.sql.compilers.clickhouse import ClickHouseCompiler from ibis.backends.sql.compilers.databricks import DatabricksCompiler diff --git a/ibis/backends/sql/compilers/beam.py b/ibis/backends/sql/compilers/beam.py new file mode 100644 index 000000000000..230431642ae4 --- /dev/null +++ b/ibis/backends/sql/compilers/beam.py @@ -0,0 +1,295 @@ +"""Beam SQL Ibis expression to SQL compiler.""" 
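+# NOTE: Beam SQL is Calcite-based; it has no NaN/Infinity literals and uses
+# 1-based array indexing, which the visitors below account for.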
+ +from __future__ import annotations + +import sqlglot as sg +import sqlglot.expressions as sge + +import ibis.common.exceptions as com +import ibis.expr.datatypes as dt +import ibis.expr.operations as ops +from ibis.backends.sql.compilers.base import NULL, STAR, AggGen, SQLGlotCompiler +from ibis.backends.sql.datatypes import BeamType +from ibis.backends.sql.dialects import Beam +from ibis.backends.sql.rewrites import ( + exclude_unsupported_window_frame_from_ops, + exclude_unsupported_window_frame_from_rank, + exclude_unsupported_window_frame_from_row_number, + split_select_distinct_with_order_by, +) + + +class BeamAggGen(AggGen): + """Aggregate function generator for Beam SQL.""" + + def aggregate(self, compiler, name, *args, where=None, order_by=()): + """Generate aggregate function calls.""" + if order_by: + raise com.UnsupportedOperationError( + "ordering of order-sensitive aggregations via `order_by` is " + "not supported for this backend" + ) + + func = compiler.f[name] + if where is not None: + # Filter the aggregation + filtered_args = [] + for arg in args: + if hasattr(arg, 'op') and isinstance(arg.op(), ops.Value): + # Apply WHERE clause to the argument + filtered_arg = compiler.visit_Filter( + ops.Filter(table=arg, predicates=[where]) + ) + filtered_args.append(filtered_arg) + else: + filtered_args.append(arg) + args = filtered_args + + return func(*args) + + +class BeamCompiler(SQLGlotCompiler): + """Beam SQL compiler.""" + + quoted = True + dialect = Beam + type_mapper = BeamType + + agg = BeamAggGen() + + rewrites = ( + exclude_unsupported_window_frame_from_row_number, + exclude_unsupported_window_frame_from_ops, + exclude_unsupported_window_frame_from_rank, + *SQLGlotCompiler.rewrites, + ) + post_rewrites = (split_select_distinct_with_order_by,) + + UNSUPPORTED_OPS = ( + ops.AnalyticVectorizedUDF, + ops.ApproxMedian, + ops.ArrayFlatten, + ops.ArrayStringJoin, + ops.ArgMax, + ops.ArgMin, + ops.Correlation, + ops.CountDistinctStar, + ops.Covariance, + ops.DateDiff, + ops.FindInSet, + ops.IsInf, + ops.IsNan, + ops.Levenshtein, + ops.Median, + ops.NthValue, + ops.ReductionVectorizedUDF, + ops.RegexSplit, + ops.RowID, + ops.Translate, + ops.StringToTime, + ops.Kurtosis, + ) + + SIMPLE_OPS = { + ops.All: "min", + ops.Any: "max", + ops.ApproxCountDistinct: "approx_count_distinct", + ops.ArrayDistinct: "array_distinct", + ops.ArrayLength: "cardinality", + ops.ArrayPosition: "array_position", + ops.ArrayRemove: "array_remove", + ops.ArraySort: "array_sort", + ops.ArrayUnion: "array_union", + ops.ExtractDayOfYear: "dayofyear", + ops.MapKeys: "map_keys", + ops.MapValues: "map_values", + ops.Power: "power", + ops.RegexSearch: "regexp", + ops.StrRight: "right", + ops.StringLength: "char_length", + ops.StringToDate: "to_date", + ops.StringToTimestamp: "to_timestamp", + ops.TypeOf: "typeof", + } + + @property + def NAN(self): + """Beam SQL doesn't support NaN.""" + raise NotImplementedError("Beam SQL does not support NaN") + + @property + def POS_INF(self): + """Beam SQL doesn't support Infinity.""" + raise NotImplementedError("Beam SQL does not support Infinity") + + NEG_INF = POS_INF + + @staticmethod + def _generate_groups(groups): + """Generate GROUP BY clauses.""" + return groups + + def visit_ArrayCollect(self, op, *, arg, where, order_by, include_null, distinct): + """Visit ArrayCollect operation.""" + if order_by: + raise com.UnsupportedOperationError( + "ArrayCollect with order_by is not supported in Beam SQL" + ) + + if distinct: + func_name = "array_agg_distinct" + else: + 
func_name = "array_agg" + + if where is not None: + # Apply filter to the argument + filtered_arg = self.visit_Filter( + ops.Filter(table=arg, predicates=[where]) + ) + return self.f[func_name](filtered_arg) + + return self.f[func_name](arg) + + def visit_ArrayConcat(self, op, *, arg): + """Visit ArrayConcat operation.""" + return self.f.array_concat(*arg) + + def visit_ArrayContains(self, op, *, arg, other): + """Visit ArrayContains operation.""" + return self.f.array_contains(arg, other) + + def visit_ArrayIndex(self, op, *, arg, index): + """Visit ArrayIndex operation.""" + return self.f.array_get(arg, index + 1) # Beam SQL is 1-indexed + + def visit_ArrayRepeat(self, op, *, arg, times): + """Visit ArrayRepeat operation.""" + return self.f.array_repeat(arg, times) + + def visit_ArraySlice(self, op, *, arg, start, stop): + """Visit ArraySlice operation.""" + return self.f.array_slice(arg, start + 1, stop) # Beam SQL is 1-indexed + + def visit_ArraySort(self, op, *, arg, key=None): + """Visit ArraySort operation.""" + if key is not None: + raise com.UnsupportedOperationError( + "ArraySort with key function is not supported in Beam SQL" + ) + return self.f.array_sort(arg) + + def visit_ArrayUnion(self, op, *, left, right): + """Visit ArrayUnion operation.""" + return self.f.array_union(left, right) + + def visit_ArrayDistinct(self, op, *, arg): + """Visit ArrayDistinct operation.""" + return self.f.array_distinct(arg) + + def visit_ArrayLength(self, op, *, arg): + """Visit ArrayLength operation.""" + return self.f.cardinality(arg) + + def visit_ArrayPosition(self, op, *, arg, other): + """Visit ArrayPosition operation.""" + return self.f.array_position(arg, other) + + def visit_ArrayRemove(self, op, *, arg, other): + """Visit ArrayRemove operation.""" + return self.f.array_remove(arg, other) + + def visit_ArrayFlatten(self, op, *, arg): + """Visit ArrayFlatten operation.""" + raise com.UnsupportedOperationError( + "ArrayFlatten is not supported in Beam SQL" + ) + + def visit_ArrayStringJoin(self, op, *, arg, sep): + """Visit ArrayStringJoin operation.""" + raise com.UnsupportedOperationError( + "ArrayStringJoin is not supported in Beam SQL" + ) + + def visit_MapKeys(self, op, *, arg): + """Visit MapKeys operation.""" + return self.f.map_keys(arg) + + def visit_MapValues(self, op, *, arg): + """Visit MapValues operation.""" + return self.f.map_values(arg) + + def visit_MapGet(self, op, *, arg, key, default=None): + """Visit MapGet operation.""" + if default is not None: + return self.f.coalesce(self.f.map_get(arg, key), default) + return self.f.map_get(arg, key) + + def visit_MapContains(self, op, *, arg, key): + """Visit MapContains operation.""" + return self.f.map_contains(arg, key) + + def visit_MapMerge(self, op, *, left, right): + """Visit MapMerge operation.""" + return self.f.map_merge(left, right) + + def visit_MapConcat(self, op, *, arg): + """Visit MapConcat operation.""" + return self.f.map_concat(*arg) + + def visit_MapFromArrays(self, op, *, keys, values): + """Visit MapFromArrays operation.""" + return self.f.map_from_arrays(keys, values) + + def visit_MapFromEntries(self, op, *, arg): + """Visit MapFromEntries operation.""" + return self.f.map_from_entries(arg) + + def visit_MapEntries(self, op, *, arg): + """Visit MapEntries operation.""" + return self.f.map_entries(arg) + + def visit_MapKeys(self, op, *, arg): + """Visit MapKeys operation.""" + return self.f.map_keys(arg) + + def visit_MapValues(self, op, *, arg): + """Visit MapValues operation.""" + return 
self.f.map_values(arg) + + def visit_MapGet(self, op, *, arg, key, default=None): + """Visit MapGet operation.""" + if default is not None: + return self.f.coalesce(self.f.map_get(arg, key), default) + return self.f.map_get(arg, key) + + def visit_MapContains(self, op, *, arg, key): + """Visit MapContains operation.""" + return self.f.map_contains(arg, key) + + def visit_MapMerge(self, op, *, left, right): + """Visit MapMerge operation.""" + return self.f.map_merge(left, right) + + def visit_MapConcat(self, op, *, arg): + """Visit MapConcat operation.""" + return self.f.map_concat(*arg) + + def visit_MapFromArrays(self, op, *, keys, values): + """Visit MapFromArrays operation.""" + return self.f.map_from_arrays(keys, values) + + def visit_MapFromEntries(self, op, *, arg): + """Visit MapFromEntries operation.""" + return self.f.map_from_entries(arg) + + def visit_MapEntries(self, op, *, arg): + """Visit MapEntries operation.""" + return self.f.map_entries(arg) + + def visit_Strip(self, op, *, arg): + """Visit Strip operation.""" + # Beam SQL doesn't have BTRIM, so we use a combination of left and right trim + return self.visit_RStrip(op, arg=self.visit_LStrip(op, arg=arg)) + + +compiler = BeamCompiler() diff --git a/ibis/backends/sql/datatypes.py b/ibis/backends/sql/datatypes.py index ed1815890867..e6c39b9c06f3 100644 --- a/ibis/backends/sql/datatypes.py +++ b/ibis/backends/sql/datatypes.py @@ -1393,6 +1393,10 @@ class AthenaType(SqlglotType): dialect = "athena" +class BeamType(SqlglotType): + dialect = "beam" + + TYPE_MAPPERS: dict[str, SqlglotType] = { mapper.dialect: mapper for mapper in set(get_subclasses(SqlglotType)) - {SqlglotType, BigQueryUDFType} diff --git a/ibis/backends/sql/dialects.py b/ibis/backends/sql/dialects.py index 85c67a719a68..bcc7ae4c729b 100644 --- a/ibis/backends/sql/dialects.py +++ b/ibis/backends/sql/dialects.py @@ -23,6 +23,8 @@ from sqlglot.dialects.dialect import rename_func from sqlglot.helper import find_new_name, seq_get +from ibis.backends.sql.dialects.beam import Beam + class ClickHouse(_ClickHouse): class Generator(_ClickHouse.Generator): diff --git a/ibis/backends/sql/dialects/beam.py b/ibis/backends/sql/dialects/beam.py new file mode 100644 index 000000000000..41947858d934 --- /dev/null +++ b/ibis/backends/sql/dialects/beam.py @@ -0,0 +1,160 @@ +"""Beam SQL dialect for SQLGlot.""" + +from __future__ import annotations + +import sqlglot as sg +from sqlglot.dialects import Dialect + + +class Beam(Dialect): + """Beam SQL dialect.""" + + class Tokenizer(sg.Tokenizer): + """Beam SQL tokenizer.""" + + KEYWORDS = { + **sg.Tokenizer.KEYWORDS, + "ARRAY": "ARRAY", + "MAP": "MAP", + "ROW": "ROW", + "STRUCT": "STRUCT", + "WATERMARK": "WATERMARK", + "PARTITIONED": "PARTITIONED", + "BY": "BY", + "WITH": "WITH", + "PROPERTIES": "PROPERTIES", + "CONNECTOR": "CONNECTOR", + "FORMAT": "FORMAT", + "PATH": "PATH", + "SCHEMA": "SCHEMA", + "PRIMARY": "PRIMARY", + "KEY": "KEY", + "NOT": "NOT", + "ENFORCED": "ENFORCED", + } + + class Parser(sg.Parser): + """Beam SQL parser.""" + + FUNCTIONS = { + **sg.Parser.FUNCTIONS, + "array_agg": "ARRAY_AGG", + "array_agg_distinct": "ARRAY_AGG_DISTINCT", + "array_concat": "ARRAY_CONCAT", + "array_contains": "ARRAY_CONTAINS", + "array_get": "ARRAY_GET", + "array_repeat": "ARRAY_REPEAT", + "array_slice": "ARRAY_SLICE", + "array_sort": "ARRAY_SORT", + "array_union": "ARRAY_UNION", + "array_distinct": "ARRAY_DISTINCT", + "cardinality": "CARDINALITY", + "array_position": "ARRAY_POSITION", + "array_remove": "ARRAY_REMOVE", + "map_keys": 
"MAP_KEYS", + "map_values": "MAP_VALUES", + "map_get": "MAP_GET", + "map_contains": "MAP_CONTAINS", + "map_merge": "MAP_MERGE", + "map_concat": "MAP_CONCAT", + "map_from_arrays": "MAP_FROM_ARRAYS", + "map_from_entries": "MAP_FROM_ENTRIES", + "map_entries": "MAP_ENTRIES", + "approx_count_distinct": "APPROX_COUNT_DISTINCT", + "dayofyear": "DAYOFYEAR", + "power": "POWER", + "regexp": "REGEXP", + "right": "RIGHT", + "char_length": "CHAR_LENGTH", + "to_date": "TO_DATE", + "to_timestamp": "TO_TIMESTAMP", + "typeof": "TYPEOF", + } + + class Generator(sg.Generator): + """Beam SQL generator.""" + + TYPE_MAPPING = { + **sg.Generator.TYPE_MAPPING, + sg.DataType.Type.TINYINT: "TINYINT", + sg.DataType.Type.SMALLINT: "SMALLINT", + sg.DataType.Type.INT: "INTEGER", + sg.DataType.Type.BIGINT: "BIGINT", + sg.DataType.Type.FLOAT: "FLOAT", + sg.DataType.Type.DOUBLE: "DOUBLE", + sg.DataType.Type.BOOLEAN: "BOOLEAN", + sg.DataType.Type.STRING: "VARCHAR", + sg.DataType.Type.CHAR: "CHAR", + sg.DataType.Type.VARCHAR: "VARCHAR", + sg.DataType.Type.DATE: "DATE", + sg.DataType.Type.TIME: "TIME", + sg.DataType.Type.TIMESTAMP: "TIMESTAMP", + sg.DataType.Type.ARRAY: "ARRAY", + sg.DataType.Type.MAP: "MAP", + sg.DataType.Type.STRUCT: "ROW", + } + + def array_sql(self, expression): + """Generate ARRAY type SQL.""" + return f"ARRAY<{self.sql(expression, 'expressions')[0]}>" + + def map_sql(self, expression): + """Generate MAP type SQL.""" + key_type = self.sql(expression, 'expressions')[0] + value_type = self.sql(expression, 'expressions')[1] + return f"MAP<{key_type}, {value_type}>" + + def struct_sql(self, expression): + """Generate ROW type SQL.""" + fields = [] + for field in expression.expressions: + field_name = field.alias or field.this + field_type = self.sql(field, 'expressions')[0] + fields.append(f"{field_name} {field_type}") + return f"ROW({', '.join(fields)})" + + def watermark_sql(self, expression): + """Generate WATERMARK SQL.""" + time_col = expression.this + strategy = expression.expressions[0] if expression.expressions else None + if strategy: + return f"WATERMARK FOR {time_col} AS {strategy}" + return f"WATERMARK FOR {time_col}" + + def partitioned_sql(self, expression): + """Generate PARTITIONED BY SQL.""" + columns = self.sql(expression, 'expressions') + return f"PARTITIONED BY ({', '.join(columns)})" + + def properties_sql(self, expression): + """Generate WITH PROPERTIES SQL.""" + props = [] + for prop in expression.expressions: + key = prop.this + value = prop.expressions[0] if prop.expressions else None + if value: + props.append(f"'{key}'='{value}'") + else: + props.append(f"'{key}'") + return f"WITH ({', '.join(props)})" + + def primary_key_sql(self, expression): + """Generate PRIMARY KEY SQL.""" + columns = self.sql(expression, 'expressions') + return f"PRIMARY KEY ({', '.join(columns)})" + + def connector_sql(self, expression): + """Generate CONNECTOR SQL.""" + return f"CONNECTOR = '{expression.this}'" + + def format_sql(self, expression): + """Generate FORMAT SQL.""" + return f"FORMAT = '{expression.this}'" + + def path_sql(self, expression): + """Generate PATH SQL.""" + return f"PATH = '{expression.this}'" + + def schema_sql(self, expression): + """Generate SCHEMA SQL.""" + return f"SCHEMA = '{expression.this}'" diff --git a/pyproject.toml b/pyproject.toml index 977b58b5d537..94de6e928b95 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -149,6 +149,14 @@ flink = [ "pandas>=1.5.3,<3", "rich>=12.4.4", ] +beam = [ + "apache-beam>=2.50.0", + "pyarrow>=10.0.1", + "pyarrow-hotfix>=0.4", + 
"numpy>=1.23.2,<3", + "pandas>=1.5.3,<3", + "rich>=12.4.4", +] impala = [ "impyla>=0.17", "pyarrow>=10.0.1", @@ -311,6 +319,7 @@ docs = [ [project.entry-points."ibis.backends"] athena = "ibis.backends.athena" +beam = "ibis.backends.beam" bigquery = "ibis.backends.bigquery" clickhouse = "ibis.backends.clickhouse" databricks = "ibis.backends.databricks" @@ -468,6 +477,7 @@ markers = [ "notyet: This requires upstream to implement/fix something. We can't/won't workaround on the ibis side", "never: The backend will never support this / pass this test. Don't bother trying to fix it", "athena: Amazon Athena tests", + "beam: Apache Beam tests", "bigquery: BigQuery tests", "clickhouse: ClickHouse tests", "databricks: Databricks SQL tests",