Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
_read_gbq_colab,
from_glob_path,
read_arrow,
read_avro,
read_csv,
read_gbq,
read_gbq_function,
Expand All @@ -105,6 +106,7 @@
read_gbq_query,
read_gbq_table,
read_json,
read_orc,
read_pandas,
read_parquet,
read_pickle,
Expand Down Expand Up @@ -461,6 +463,8 @@ def reset_session():
read_pandas,
read_parquet,
read_pickle,
read_orc,
read_avro,
remote_function,
to_datetime,
to_timedelta,
Expand Down Expand Up @@ -496,6 +500,8 @@ def reset_session():
"read_pandas",
"read_parquet",
"read_pickle",
"read_orc",
"read_avro",
"remote_function",
"to_datetime",
"to_timedelta",
Expand Down
32 changes: 32 additions & 0 deletions bigframes/pandas/io/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,38 @@ def read_parquet(
read_parquet.__doc__ = inspect.getdoc(bigframes.session.Session.read_parquet)


def read_orc(
    path: str | IO["bytes"],
    *,
    engine: str = "auto",
    write_engine: constants.WriteEngineType = "default",
) -> bigframes.dataframe.DataFrame:
    # Thin module-level alias: forward to the default session's implementation,
    # creating a session on first use if none exists.
    session_read_orc = bigframes.session.Session.read_orc
    forwarded = {"engine": engine, "write_engine": write_engine}
    return global_session.with_default_session(session_read_orc, path, **forwarded)


# Reuse the session-level docstring so help(bigframes.pandas.read_orc) matches.
read_orc.__doc__ = inspect.getdoc(bigframes.session.Session.read_orc)


def read_avro(
    path: str | IO["bytes"],
    *,
    engine: str = "auto",
) -> bigframes.dataframe.DataFrame:
    # Thin module-level alias: forward to the default session's implementation,
    # creating a session on first use if none exists.
    session_read_avro = bigframes.session.Session.read_avro
    return global_session.with_default_session(session_read_avro, path, engine=engine)


# Reuse the session-level docstring so help(bigframes.pandas.read_avro) matches.
read_avro.__doc__ = inspect.getdoc(bigframes.session.Session.read_avro)


def read_gbq_function(
function_name: str,
is_row_processor: bool = False,
Expand Down
81 changes: 81 additions & 0 deletions bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1371,6 +1371,87 @@ def read_parquet(
)
return self._read_pandas(pandas_obj, write_engine=write_engine)

def read_orc(
    self,
    path: str | IO["bytes"],
    *,
    engine: str = "auto",
    write_engine: constants.WriteEngineType = "default",
) -> dataframe.DataFrame:
    """Load an ORC file to a BigQuery DataFrames DataFrame.

    Args:
        path (str or IO):
            The path or buffer to the ORC file. Can be a local path or Google Cloud Storage URI.
        engine (str, default "auto"):
            The engine used to read the file. Supported values: `auto`, `bigquery`, `pyarrow`.
        write_engine (str, default "default"):
            The write engine used to persist the data to BigQuery if needed.

    Returns:
        bigframes.dataframe.DataFrame:
            A new DataFrame representing the data from the ORC file.
    """
    bigframes.session.validation.validate_engine_compatibility(
        engine=engine,
        write_engine=write_engine,
    )

    if engine == "bigquery":
        # Server-side path: BigQuery ingests the ORC file directly via a load job.
        load_config = bigquery.LoadJobConfig()
        load_config.source_format = bigquery.SourceFormat.ORC
        load_config.labels = {"bigframes-api": "read_orc"}
        loaded_table = self._loader.load_file(path, job_config=load_config)
        return self._loader.read_gbq_table(loaded_table)

    if engine not in ("auto", "pyarrow"):
        raise ValueError(
            f"Unsupported engine: {repr(engine)}. Supported values: 'auto', 'bigquery', 'pyarrow'."
        )

    # Local path: pandas reads the file, then the frame is uploaded to BigQuery.
    # Wildcards are a server-side (load job) feature only.
    if isinstance(path, str) and "*" in path:
        raise ValueError(
            "The provided path contains a wildcard character (*), which is not "
            "supported by the current engine. To read files from wildcard paths, "
            "please use the 'bigquery' engine by setting `engine='bigquery'` in "
            "your configuration."
        )

    local_read_kwargs: Dict[str, Any] = {}
    # pandas 2.x supports pyarrow-backed dtypes, which round-trip more faithfully.
    if not pandas.__version__.startswith("1."):
        local_read_kwargs["dtype_backend"] = "pyarrow"

    local_frame = pandas.read_orc(path, **local_read_kwargs)
    return self._read_pandas(local_frame, write_engine=write_engine)

def read_avro(
    self,
    path: str | IO["bytes"],
    *,
    engine: str = "auto",
) -> dataframe.DataFrame:
    """Load an Avro file to a BigQuery DataFrames DataFrame.

    Args:
        path (str or IO):
            The path or buffer to the Avro file. Can be a local path or Google Cloud Storage URI.
        engine (str, default "auto"):
            The engine used to read the file. Only `bigquery` is supported for Avro.

    Returns:
        bigframes.dataframe.DataFrame:
            A new DataFrame representing the data from the Avro file.
    """
    # Avro has no local (pandas/pyarrow) path here, so "auto" resolves to BigQuery.
    if engine not in ("auto", "bigquery"):
        raise ValueError(
            f"Unsupported engine: {repr(engine)}. Supported values: 'auto', 'bigquery'."
        )

    load_config = bigquery.LoadJobConfig()
    load_config.source_format = bigquery.SourceFormat.AVRO
    # Decode Avro logical types (timestamps, dates, ...) to their BigQuery
    # equivalents rather than the raw underlying Avro types.
    load_config.use_avro_logical_types = True
    load_config.labels = {"bigframes-api": "read_avro"}

    loaded_table = self._loader.load_file(path, job_config=load_config)
    return self._loader.read_gbq_table(loaded_table)

def read_json(
self,
path_or_buf: str | IO["bytes"],
Expand Down
Binary file added test_types.orc
Binary file not shown.
125 changes: 125 additions & 0 deletions tests/system/small/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1934,6 +1934,131 @@ def test_read_parquet_gcs(
bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)


# Round-trip test: write scalars to GCS as ORC, read back through each engine,
# and compare against the original frame.
@pytest.mark.parametrize(
    ("engine", "filename"),
    (
        pytest.param(
            "bigquery",
            "000000000000.orc",
            id="bigquery",
        ),
        pytest.param(
            "auto",
            "000000000000.orc",
            id="auto",
        ),
        pytest.param(
            "pyarrow",
            "000000000000.orc",
            id="pyarrow",
        ),
        pytest.param(
            "bigquery",
            "*.orc",
            id="bigquery_wildcard",
        ),
        # Wildcard paths are only supported by the BigQuery load-job engine;
        # the local (auto/pyarrow) path raises ValueError for them.
        pytest.param(
            "auto",
            "*.orc",
            id="auto_wildcard",
            marks=pytest.mark.xfail(
                raises=ValueError,
            ),
        ),
    ),
)
def test_read_orc_gcs(
    session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename
):
    pytest.importorskip(
        "pandas",
        minversion="2.0.0",
        reason="pandas<2 does not handle nullable int columns well",
    )
    scalars_df, _ = scalars_dfs
    # Write a single concrete object; read back via the parametrized filename
    # (which may be a wildcard matching that same object).
    write_path = gcs_folder + test_read_orc_gcs.__name__ + "000000000000.orc"
    read_path = gcs_folder + test_read_orc_gcs.__name__ + filename

    df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
    # Drop columns whose types do not round-trip through ORC.
    df_in = df_in.drop(
        columns=[
            "geography_col",
            "time_col",
            "datetime_col",
            "duration_col",
            "timestamp_col",
        ]
    )
    # Persist the original index as a column, and use a randomized ordering-id
    # index name to avoid collisions between concurrent test runs.
    df_write = df_in.reset_index(drop=False)
    df_write.index.name = f"ordering_id_{random.randrange(1_000_000)}"
    df_write.to_orc(write_path)

    # Restore the original row order via the ordering id, then restore the
    # original index column before comparing.
    df_out = (
        session.read_orc(read_path, engine=engine)
        .set_index(df_write.index.name)
        .sort_index()
        .set_index(typing.cast(str, df_in.index.name))
    )

    assert df_out.size != 0
    pd_df_in = df_in.to_pandas()
    pd_df_out = df_out.to_pandas()
    bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)


# Round-trip test: extract a BigQuery table to GCS as Avro, read it back with
# read_avro, and compare against the original frame. Avro only supports the
# BigQuery engine, so only that engine is parametrized.
@pytest.mark.parametrize(
    ("engine", "filename"),
    (
        pytest.param(
            "bigquery",
            "000000000000.avro",
            id="bigquery",
        ),
        pytest.param(
            "bigquery",
            "*.avro",
            id="bigquery_wildcard",
        ),
    ),
)
def test_read_avro_gcs(
    session: bigframes.Session, scalars_dfs, gcs_folder, engine, filename
):
    scalars_df, _ = scalars_dfs
    # Extract always writes with a wildcard URI; reads use the parametrized name.
    write_uri = gcs_folder + test_read_avro_gcs.__name__ + "*.avro"
    read_uri = gcs_folder + test_read_avro_gcs.__name__ + filename

    df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
    # datetime round-trips back as str in avro
    df_in = df_in.drop(columns=["geography_col", "duration_col", "datetime_col"])
    # Persist the original index as a column; randomized name avoids collisions
    # between concurrent test runs.
    df_write = df_in.reset_index(drop=False)
    index_name = f"ordering_id_{random.randrange(1_000_000)}"
    df_write.index.name = index_name

    # Create a BigQuery table
    table_id = df_write.to_gbq()

    # Extract to GCS as Avro
    client = session.bqclient
    extract_job_config = bigquery.ExtractJobConfig()
    extract_job_config.destination_format = "AVRO"
    # Keep timestamps/dates as logical types so they round-trip through read_avro.
    extract_job_config.use_avro_logical_types = True

    client.extract_table(table_id, write_uri, job_config=extract_job_config).result()

    # Restore the original row order via the ordering id, then restore the
    # original index column before comparing.
    df_out = (
        session.read_avro(read_uri, engine=engine)
        .set_index(index_name)
        .sort_index()
        .set_index(typing.cast(str, df_in.index.name))
    )

    assert df_out.size != 0
    pd_df_in = df_in.to_pandas()
    pd_df_out = df_out.to_pandas()
    bigframes.testing.utils.assert_frame_equal(pd_df_in, pd_df_out)


@pytest.mark.parametrize(
"compression",
[
Expand Down
Loading