diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index 4db900e776..9a2039d396 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -97,6 +97,7 @@
     _read_gbq_colab,
     from_glob_path,
     read_arrow,
+    read_avro,
     read_csv,
     read_gbq,
     read_gbq_function,
@@ -105,6 +106,7 @@
     read_gbq_query,
     read_gbq_table,
     read_json,
+    read_orc,
     read_pandas,
     read_parquet,
     read_pickle,
@@ -461,6 +463,8 @@ def reset_session():
     read_pandas,
     read_parquet,
     read_pickle,
+    read_orc,
+    read_avro,
     remote_function,
     to_datetime,
     to_timedelta,
@@ -496,6 +500,8 @@ def reset_session():
     "read_pandas",
     "read_parquet",
     "read_pickle",
+    "read_orc",
+    "read_avro",
     "remote_function",
     "to_datetime",
     "to_timedelta",
diff --git a/bigframes/pandas/io/api.py b/bigframes/pandas/io/api.py
index 7296cd2b7f..d0ad88ec4b 100644
--- a/bigframes/pandas/io/api.py
+++ b/bigframes/pandas/io/api.py
@@ -600,6 +600,38 @@ def read_parquet(
 read_parquet.__doc__ = inspect.getdoc(bigframes.session.Session.read_parquet)
 
 
+def read_orc(
+    path: str | IO["bytes"],
+    *,
+    engine: str = "auto",
+    write_engine: constants.WriteEngineType = "default",
+) -> bigframes.dataframe.DataFrame:
+    return global_session.with_default_session(
+        bigframes.session.Session.read_orc,
+        path,
+        engine=engine,
+        write_engine=write_engine,
+    )
+
+
+read_orc.__doc__ = inspect.getdoc(bigframes.session.Session.read_orc)
+
+
+def read_avro(
+    path: str | IO["bytes"],
+    *,
+    engine: str = "auto",
+) -> bigframes.dataframe.DataFrame:
+    return global_session.with_default_session(
+        bigframes.session.Session.read_avro,
+        path,
+        engine=engine,
+    )
+
+
+read_avro.__doc__ = inspect.getdoc(bigframes.session.Session.read_avro)
+
+
 def read_gbq_function(
     function_name: str,
     is_row_processor: bool = False,
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 75be3022d7..3ce50933f5 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -1371,6 +1371,104 @@ def read_parquet(
             )
             return self._read_pandas(pandas_obj, write_engine=write_engine)
 
+    def read_orc(
+        self,
+        path: str | IO["bytes"],
+        *,
+        engine: str = "auto",
+        write_engine: constants.WriteEngineType = "default",
+    ) -> dataframe.DataFrame:
+        """Load an ORC file to a BigQuery DataFrames DataFrame.
+
+        Args:
+            path (str or IO):
+                The path or buffer to the ORC file. Can be a local path or Google Cloud Storage URI.
+                Paths containing a wildcard (``*``) require ``engine="bigquery"``.
+            engine (str, default "auto"):
+                The engine used to read the file. Supported values: `auto`, `bigquery`, `pyarrow`.
+                `bigquery` loads the file with a BigQuery load job; `auto` and
+                `pyarrow` read it with ``pandas.read_orc``.
+            write_engine (str, default "default"):
+                The write engine used to persist the data to BigQuery if needed.
+
+        Returns:
+            bigframes.dataframe.DataFrame:
+                A new DataFrame representing the data from the ORC file.
+
+        Raises:
+            ValueError:
+                If ``engine`` is not a supported value, or if ``path`` contains a
+                wildcard and ``engine`` is `auto` or `pyarrow`.
+        """
+        bigframes.session.validation.validate_engine_compatibility(
+            engine=engine,
+            write_engine=write_engine,
+        )
+        if engine == "bigquery":
+            job_config = bigquery.LoadJobConfig()
+            job_config.source_format = bigquery.SourceFormat.ORC
+            job_config.labels = {"bigframes-api": "read_orc"}
+            table_id = self._loader.load_file(path, job_config=job_config)
+            return self._loader.read_gbq_table(table_id)
+        elif engine in ("auto", "pyarrow"):
+            if isinstance(path, str) and "*" in path:
+                raise ValueError(
+                    "The provided path contains a wildcard character (*), which is not "
+                    "supported by the current engine. To read files from wildcard paths, "
+                    "please use the 'bigquery' engine by setting `engine='bigquery'` in "
+                    "your configuration."
+                )
+
+            read_orc_kwargs: Dict[str, Any] = {}
+            # `dtype_backend` was added in pandas 2.0; pandas 1.x would reject it.
+            if not pandas.__version__.startswith("1."):
+                read_orc_kwargs["dtype_backend"] = "pyarrow"
+
+            pandas_obj = pandas.read_orc(path, **read_orc_kwargs)
+            return self._read_pandas(pandas_obj, write_engine=write_engine)
+        else:
+            raise ValueError(
+                f"Unsupported engine: {engine!r}. Supported values: 'auto', 'bigquery', 'pyarrow'."
+            )
+
+    def read_avro(
+        self,
+        path: str | IO["bytes"],
+        *,
+        engine: str = "auto",
+    ) -> dataframe.DataFrame:
+        """Load an Avro file to a BigQuery DataFrames DataFrame.
+
+        The file is always loaded with a BigQuery load job that has Avro logical
+        types enabled.
+
+        Args:
+            path (str or IO):
+                The path or buffer to the Avro file. Can be a local path or Google Cloud Storage URI.
+            engine (str, default "auto"):
+                The engine used to read the file. Supported values: `auto`, `bigquery`.
+                Both load the file through BigQuery, the only engine available for Avro.
+
+        Returns:
+            bigframes.dataframe.DataFrame:
+                A new DataFrame representing the data from the Avro file.
+
+        Raises:
+            ValueError:
+                If ``engine`` is not `auto` or `bigquery`.
+        """
+        if engine not in ("auto", "bigquery"):
+            raise ValueError(
+                f"Unsupported engine: {engine!r}. Supported values: 'auto', 'bigquery'."
+            )
+
+        job_config = bigquery.LoadJobConfig()
+        job_config.use_avro_logical_types = True
+        job_config.source_format = bigquery.SourceFormat.AVRO
+        job_config.labels = {"bigframes-api": "read_avro"}
+        table_id = self._loader.load_file(path, job_config=job_config)
+        return self._loader.read_gbq_table(table_id)
+
     def read_json(
         self,
         path_or_buf: str | IO["bytes"],
diff --git a/test_types.orc b/test_types.orc
new file mode 100644
index 0000000000..3da1defa54
Binary files /dev/null and b/test_types.orc differ
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index 45e98cd960..a042589c5b 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -1934,6 +1934,135 @@ def test_read_parquet_gcs(
     bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)
 
 
+@pytest.mark.parametrize(
+    ("engine", "filename"),
+    (
+        pytest.param(
+            "bigquery",
+            "000000000000.orc",
+            id="bigquery",
+        ),
+        pytest.param(
+            "auto",
+            "000000000000.orc",
+            id="auto",
+        ),
+        pytest.param(
+            "pyarrow",
+            "000000000000.orc",
+            id="pyarrow",
+        ),
+        pytest.param(
+            "bigquery",
+            "*.orc",
+            id="bigquery_wildcard",
+        ),
+        pytest.param(
+            "auto",
+            "*.orc",
+            id="auto_wildcard",
+            # strict: an unexpected pass means the wildcard guard stopped raising.
+            marks=pytest.mark.xfail(
+                raises=ValueError,
+                strict=True,
+            ),
+        ),
+    ),
+)
+def test_read_orc_gcs(
+    session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename
+):
+    """Write scalars to ORC on GCS and check ``read_orc`` round-trips them."""
+    pytest.importorskip(
+        "pandas",
+        minversion="2.0.0",
+        reason="pandas<2 does not handle nullable int columns well",
+    )
+    scalars_df, _ = scalars_dfs
+    write_path = gcs_folder + test_read_orc_gcs.__name__ + "000000000000.orc"
+    read_path = gcs_folder + test_read_orc_gcs.__name__ + filename
+
+    df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
+    df_in = df_in.drop(
+        columns=[
+            "geography_col",
+            "time_col",
+            "datetime_col",
+            "duration_col",
+            "timestamp_col",
+        ]
+    )
+    df_write = df_in.reset_index(drop=False)
+    df_write.index.name = f"ordering_id_{random.randrange(1_000_000)}"
+    df_write.to_orc(write_path)
+
+    df_out = (
+        session.read_orc(read_path, engine=engine)
+        .set_index(df_write.index.name)
+        .sort_index()
+        .set_index(typing.cast(str, df_in.index.name))
+    )
+
+    assert df_out.size != 0
+    pd_df_in = df_in.to_pandas()
+    pd_df_out = df_out.to_pandas()
+    bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)
+
+
+@pytest.mark.parametrize(
+    ("engine", "filename"),
+    (
+        pytest.param(
+            "bigquery",
+            "000000000000.avro",
+            id="bigquery",
+        ),
+        pytest.param(
+            "bigquery",
+            "*.avro",
+            id="bigquery_wildcard",
+        ),
+    ),
+)
+def test_read_avro_gcs(
+    session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename
+):
+    """Extract scalars to Avro on GCS and check ``read_avro`` round-trips them."""
+    scalars_df, _ = scalars_dfs
+    write_uri = gcs_folder + test_read_avro_gcs.__name__ + "*.avro"
+    read_uri = gcs_folder + test_read_avro_gcs.__name__ + filename
+
+    df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
+    # datetime round-trips back as str in avro
+    df_in = df_in.drop(columns=["geography_col", "duration_col", "datetime_col"])
+    df_write = df_in.reset_index(drop=False)
+    index_name = f"ordering_id_{random.randrange(1_000_000)}"
+    df_write.index.name = index_name
+
+    # Create a BigQuery table
+    table_id = df_write.to_gbq()
+
+    # Extract to GCS as Avro
+    client = session.bqclient
+    extract_job_config = bigquery.ExtractJobConfig()
+    extract_job_config.destination_format = "AVRO"
+    extract_job_config.use_avro_logical_types = True
+
+    client.extract_table(table_id, write_uri, job_config=extract_job_config).result()
+
+    df_out = (
+        session.read_avro(read_uri, engine=engine)
+        .set_index(index_name)
+        .sort_index()
+        .set_index(typing.cast(str, df_in.index.name))
+    )
+
+    assert df_out.size != 0
+    pd_df_in = df_in.to_pandas()
+    pd_df_out = df_out.to_pandas()
+    bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)
+
+
 @pytest.mark.parametrize(
     "compression",
     [