Commit 4686d8f

Query level api
1 parent 72d4729 commit 4686d8f

3 files changed: +60 -2 lines changed


src/pyarrow/bigquery/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -1,2 +1,2 @@
-from .read import reader, read_table  # noqa
+from .read import reader, read_table, reader_query, read_query  # noqa
 from .write import writer, write_table  # noqa
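After this change, the package re-exports four read entry points alongside the two write entry points. A quick orientation sketch (the grouping comments are mine, not from the commit):

import pyarrow.bigquery as bq

bq.reader, bq.read_table        # table-level reads
bq.reader_query, bq.read_query  # query-level reads, added by this commit
bq.writer, bq.write_table       # writes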

src/pyarrow/bigquery/read.py

Lines changed: 41 additions & 0 deletions
@@ -7,6 +7,7 @@
 import shutil

 from google.cloud import bigquery_storage
+from google.cloud import bigquery

 import pyarrow as pa
 import pyarrow.feather as fa
@@ -154,6 +155,28 @@ def reader(
     logger.debug(f"Time taken to read: {time.time()-t0:.2f}")


+def reader_query(
+    project: str,
+    query: str,
+    *,
+    worker_count: int = multiprocessing.cpu_count(),
+    worker_type: type[threading.Thread] | type[multiprocessing.Process] = threading.Thread,
+    batch_size: int = 100,
+):
+    client = bigquery.Client(project=project)
+    job = client.query(query)
+    job.result()
+
+    source = f"{job.destination.project}.{job.destination.dataset_id}.{job.destination.table_id}"
+    return reader(
+        source=source,
+        project=project,
+        worker_count=worker_count,
+        worker_type=worker_type,
+        batch_size=batch_size,
+    )
+
+
 def read_table(
     source: str,
     *,
@@ -175,3 +198,21 @@ def read_table(
             batch_size=batch_size,
         )
     )
+
+def read_query(
+    project: str,
+    query: str,
+    *,
+    worker_count: int = multiprocessing.cpu_count(),
+    worker_type: type[threading.Thread] | type[multiprocessing.Process] = threading.Thread,
+    batch_size: int = 100,
+):
+    return pa.concat_tables(
+        reader_query(
+            project=project,
+            query=query,
+            worker_count=worker_count,
+            worker_type=worker_type,
+            batch_size=batch_size
+        )
+    )
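In short, reader_query runs the query with a bigquery.Client, blocks on job.result(), and then streams the job's destination table through the existing reader; read_query concatenates that stream into one table. A minimal usage sketch (the project ID and query below are placeholders, not taken from the commit):

import pyarrow as pa
import pyarrow.bigquery as bq

PROJECT = "my-gcp-project"  # placeholder: any project you can run queries in
QUERY = "SELECT 1 AS test"

# Streaming form: yields pyarrow tables as batches arrive.
parts = [t for t in bq.reader_query(project=PROJECT, query=QUERY)]

# One-shot form: the same stream concatenated into a single table.
table = bq.read_query(project=PROJECT, query=QUERY)
assert isinstance(table, pa.Table)

The design leans on a BigQuery behavior: a completed SELECT job exposes a destination table (an anonymous cached table when none is set explicitly), so query results can be read over the same Storage API path as any ordinary table.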

tests/integration/test_upload.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
 import pyarrow.bigquery as bq


-LOCATION = f"{os.environ['GCP_PROJECT']}.test.{uuid.uuid4()}"
+PROJECT = os.environ['GCP_PROJECT']
+LOCATION = f"{PROJECT}.test.{uuid.uuid4()}"


 @pytest.fixture(autouse=True)
@@ -29,6 +30,22 @@ def test_simple():
     assert table_back.sort_by("test").equals(table.sort_by("test"))


+def test_reader_query():
+    table = pa.Table.from_arrays([[1, 2, 3, 4]], names=["test"])
+
+    bq.write_table(table, LOCATION, table_create=True)
+
+    query = f'SELECT * FROM `{LOCATION}`'
+    table_back1 = pa.concat_tables([t for t in bq.reader_query(project=PROJECT, query=query)])
+
+    table_back2 = bq.read_query(project=PROJECT, query=query)
+
+    assert table_back1.schema == table_back2.schema == table.schema
+
+    assert table_back1.sort_by("test").equals(table.sort_by("test"))
+    assert table_back2.sort_by("test").equals(table.sort_by("test"))
+
+
 def test_context():
     table = pa.Table.from_arrays([[1, 2, 3, 4]], names=["test"])
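For completeness, one way to run just the new integration test (a sketch: it assumes pytest is installed, Google credentials are configured, and GCP_PROJECT is set, as the module above requires):

import os
import subprocess

os.environ.setdefault("GCP_PROJECT", "my-gcp-project")  # placeholder project
subprocess.run(
    ["pytest", "tests/integration/test_upload.py::test_reader_query", "-v"],
    check=True,
)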
