Skip to content

Commit

Permalink
DASC-1168 (#608)
Browse files Browse the repository at this point in the history
* Add MaxRetries support to pyCarol BQ query method

* Docstring adjusts

---------

Co-authored-by: Guilherme Cesar <[email protected]>
  • Loading branch information
Guilherme Cesar dos Santos and Guilherme Cesar authored May 10, 2023
1 parent 191dc01 commit 16f9065
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 7 deletions.
4 changes: 1 addition & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ To install from source:
1. ``pip install -r requirements.txt`` to install the minimal requirements;
2. ``pip install -e . ".[dev]"`` to install the minimal requirements + dev libs;
3. ``pip install -e . ".[pipeline]"`` to install the minimal requirements + pipelines dependencies;
4. ``pip install -e . ".[dev]"`` to install the minimal requirements + dev libs;
5. ``pip install -e . ".[complete]"`` to install all dependencies;
6. etc;
4. ``pip install -e . ".[complete]"`` to install all dependencies;


Initializing pyCarol
Expand Down
23 changes: 19 additions & 4 deletions pycarol/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from google.cloud import bigquery, bigquery_storage, bigquery_storage_v1
from google.cloud.bigquery_storage import types
from google.oauth2.service_account import Credentials
from google.api_core import retry as retries

try:
import pandas
Expand Down Expand Up @@ -257,6 +258,8 @@ def query(
query: str,
dataset_id: T.Optional[str] = None,
return_dataframe: bool = True,
return_job_id: bool = False,
retry: retries.Retry = None
) -> T.Union["pandas.DataFrame", T.List[T.Dict[str, T.Any]]]:
"""Run query. This will generate a SA if necessary.
Expand All @@ -265,37 +268,49 @@ def query(
dataset_id: BigQuery dataset ID, if not provided, it will use the default
one.
return_dataframe: Return dataframe if True.
return_job_id: If True, returns a tuple containing the query results and the job ID on the BigQuery platform.
retry: Custom google.api_core.retry.Retry object to adjust Google's BigQuery API
calls, customizing the timeout and which exceptions are retried.
Returns:
Query result.
Usage:
.. code:: python
from pycarol import BQ, Carol
bq = BQ(Carol())
query = 'select * from invoice limit 10'
#Normal use
df = bq.query(query, return_dataframe=True)
#Custom retry object
from google.api_core.retry import Retry
df = bq.query(query, return_dataframe=True, retry=Retry(initial=2, multiplier=2, maximum=60, timeout=200))
# Getting BigQuery's job ID (useful for debugging in the platform)
df, job_id_string = bq.query(query, return_dataframe=True, return_job_id=True)
"""
service_account = self._token_manager.get_token().service_account
client = self._generate_client(service_account)

dataset_id = dataset_id or self._dataset_id
job_config = bigquery.QueryJobConfig(default_dataset=dataset_id)
results_job = client.query(query, job_config=job_config)
results_job = client.query(query, retry=retry, job_config=job_config) if retry else client.query(query, job_config=job_config)

results = [dict(row) for row in results_job]

if return_dataframe is False:
return results
return results if not return_job_id else (results, results_job.job_id)

if "pandas" not in sys.modules and return_dataframe is True:
raise exceptions.PandasNotFoundException

return pandas.DataFrame(results)
return pandas.DataFrame(results) if not return_job_id else (pandas.DataFrame(results), results_job.job_id)


class BQStorage:
Expand Down

0 comments on commit 16f9065

Please sign in to comment.