Skip to content

Commit

Permalink
Release/2.55.0 (#622)
Browse files Browse the repository at this point in the history
Co-authored-by: Vinicius Chagas <[email protected]>
  • Loading branch information
darolt and chagasVinicius authored Sep 26, 2023
1 parent 16f9065 commit 14cc9b7
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 27 deletions.
14 changes: 13 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ used:
service_account = tm.get_token().service_account
After each execution of ``BQ.query``, the ``BQ`` object will have an attribute called
``job``. This attribute is of type ``bigquery.job.query.QueryJob`` and may be useful for
monitoring/debug jobs.

PyCarol also provides access to the BigQuery Storage API. It allows for much faster reading
times, but with limited querying capabilities. For instance, only tables are readable,
so 'ingestion_stg_model_deep_audit' is ok, but 'stg_model_deep_audit' is not (it is a
Expand All @@ -188,7 +192,15 @@ view).
bq = BQStorage(Carol())
table_name = "ingestion_stg_model_deep_audit"
col_names = ["request_id", "version"]
df = bq.query(table_name, col_names, return_dataframe=True)
restriction = "branch = '01'"
sample_percentage = 10.0
df = bq.query(
    table_name,
    col_names,
    row_restriction=restriction,
    sample_percentage=sample_percentage,
return_dataframe=True
)
From Data Models (RT Layer): Filter queries
Expand Down
75 changes: 61 additions & 14 deletions pycarol/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path
import sys
import typing as T
import os

from google.cloud import bigquery, bigquery_storage, bigquery_storage_v1
from google.cloud.bigquery_storage import types
Expand Down Expand Up @@ -242,9 +243,11 @@ def __init__(
cache_cds: bool = True,
):
self._env = carol.get_current()
self._env["app_name"] = carol.app_name
self._project_id = f"carol-{self._env['env_id'][0:20]}"
self._dataset_id = f"{self._project_id}.{self._env['env_id']}"
self._token_manager = TokenManager(carol, service_account, cache_cds)
self.job: T.Optional[bigquery.job.query.QueryJob] = None

@staticmethod
def _generate_client(service_account: T.Dict) -> bigquery.Client:
Expand All @@ -253,13 +256,27 @@ def _generate_client(service_account: T.Dict) -> bigquery.Client:
client = bigquery.Client(project=project, credentials=credentials)
return client

def _build_query_job_labels(self) -> T.Dict[str, str]:
labels_to_check = {
"tenant_id": self._env.get("env_id", ""),
"tenant_name": self._env.get("env_name", ""),
"organization_id": self._env.get("org_id", ""),
"organization_name": self._env.get("org_name", ""),
"job_type": "sync",
"source": "py_carol",
"task_id": os.environ.get("LONGTAKSID", ""),
"carol_app_name": self._env.get("app_name", ""),
"carol_app_process_name": os.environ.get("ALGORITHM_NAME", ""),
}
return {k: v for k, v in labels_to_check.items() if v.strip() != ""}

def query(
self,
query: str,
dataset_id: T.Optional[str] = None,
return_dataframe: bool = True,
return_job_id: bool = False,
retry: retries.Retry = None
retry: T.Optional[retries.Retry] = None,
) -> T.Union["pandas.DataFrame", T.List[T.Dict[str, T.Any]]]:
"""Run query. This will generate a SA if necessary.
Expand All @@ -268,9 +285,10 @@ def query(
dataset_id: BigQuery dataset ID, if not provided, it will use the default
one.
return_dataframe: Return dataframe if True.
return_job_id : If True, returns a tuple containing the query results and the job ID on the BigQuery platform.
retry: Custom google.api_core.retry.Retry object to adjust Google`s BigQuery API
calls, to custom timeout and exceptions to retry.
return_job_id : If True, returns a tuple containing the query results and
    the job ID on the BigQuery platform.
retry: Custom google.api_core.retry.Retry object to adjust Google`s BigQuery
API calls, to custom timeout and exceptions to retry.
Returns:
Query result.
Expand All @@ -283,24 +301,33 @@ def query(
bq = BQ(Carol())
query = 'select * from invoice limit 10'
#Normal use
df = bq.query(query, return_dataframe=True)
#Custom retry object
from google.api_core.retry import Retry
df = bq.query(query, return_dataframe=True, retry=Retry(initial=2, multiplier=2, maximum=60, timeout=200))
df = bq.query(query, return_dataframe=True, retry=Retry(
initial=2, multiplier=2, maximum=60, timeout=200)
)
#Getting BigQuery`s Job-id (Util for debugging in platform)
df, job_id_string = bq.query(query, return_dataframe=True, return_job_id=True)
df, job_id_string = bq.query(
query, return_dataframe=True, return_job_id=True
)
"""
service_account = self._token_manager.get_token().service_account
client = self._generate_client(service_account)

dataset_id = dataset_id or self._dataset_id
job_config = bigquery.QueryJobConfig(default_dataset=dataset_id)
results_job = client.query(query, retry=retry, job_config=job_config) if retry else client.query(query, job_config=job_config)
labels = self._build_query_job_labels()
job_config = bigquery.QueryJobConfig(default_dataset=dataset_id, labels=labels)
if retry is not None:
results_job = client.query(query, retry=retry, job_config=job_config)
else:
results_job = client.query(query, job_config=job_config)
self.job = results_job

results = [dict(row) for row in results_job]

Expand All @@ -310,7 +337,10 @@ def query(
if "pandas" not in sys.modules and return_dataframe is True:
raise exceptions.PandasNotFoundException

return pandas.DataFrame(results) if not return_job_id else (pandas.DataFrame(results), results_job.job_id)
if return_job_id:
return (pandas.DataFrame(results), results_job.job_id)

return pandas.DataFrame(results)


class BQStorage:
Expand Down Expand Up @@ -344,11 +374,15 @@ def _get_read_session(
client: bigquery_storage.BigQueryReadClient,
table_name: str,
columns_names: T.Optional[T.List[str]] = None,
row_restriction: T.Optional[str] = None,
sample_percentage: T.Optional[float] = None,
) -> bigquery_storage_v1.types.ReadSession:
read_options = None
if columns_names is not None:
read_options = types.ReadSession.TableReadOptions( # type:ignore # noqa:E501 pylint:disable=no-member
selected_fields=columns_names
selected_fields=columns_names,
row_restriction=row_restriction,
sample_percentage=sample_percentage,
)

table_path = f"projects/{self._project_id}/datasets/{self._dataset_id}/tables/{table_name}" # noqa:E501
Expand All @@ -361,7 +395,7 @@ def _get_read_session(
read_session = client.create_read_session(
parent=parent,
read_session=requested_session,
max_stream_count=4,
max_stream_count=1,
)
return read_session

Expand All @@ -370,13 +404,17 @@ def query(
table_name: str,
columns_names: T.Optional[T.List[str]] = None,
return_dataframe: bool = True,
row_restriction: T.Optional[str] = None,
sample_percentage: T.Optional[float] = None,
) -> T.Union["pandas.DataFrame", T.List[bigquery_storage_v1.reader.ReadRowsPage]]:
"""Read from BigQuery Storage API.
Args:
table_name: name of the table (views are not supported).
columns_names: names of columns to return.
return_dataframe: if True, return a pandas DataFrame.
row_restriction: SQL WHERE clause. Limited to BQ Storage API.
sample_percentage: percentage of rows to sample (a value between 0 and 100).
Returns:
Query result.
Expand All @@ -391,11 +429,20 @@ def query(
bq = BQStorage(Carol())
table_name = "ingestion_stg_model_deep_audit"
col_names = ["request_id", "version"]
df = bq.query(table_name, col_names, return_dataframe=True)
filter = "branch = '01'"
df = bq.query(
table_name, column_names=col_names, row_restriction=filter,
)
"""
service_account = self._token_manager.get_token().service_account
client = self._generate_client(service_account)
read_session = self._get_read_session(client, table_name, columns_names)
read_session = self._get_read_session(
client,
table_name,
columns_names,
row_restriction,
sample_percentage,
)

stream = read_session.streams[0]
reader = client.read_rows(stream.name)
Expand Down
23 changes: 15 additions & 8 deletions pycarol/carol.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import os
import typing as T
import urllib3
import warnings

from dotenv import load_dotenv
Expand Down Expand Up @@ -676,14 +677,20 @@ def _retry_session(
session
"""
session = session or requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
method_whitelist=method_whitelist,
)
kwargs = {
"total": retries,
"read": retries,
"connect": retries,
"backoff_factor": backoff_factor,
"status_forcelist": status_forcelist,
}

if urllib3.__version__ >= "1.26.0":
kwargs["allowed_methods"] = method_whitelist
else:
kwargs["method_whitelist"] = method_whitelist

retry = Retry(**kwargs)
adapter = requests.adapters.HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
Expand Down
4 changes: 4 additions & 0 deletions pycarol/schema_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ def get_schema_type_for(cls, t):

"""docstring for get_schema_type_for"""

if np.__version__.startswith("1.2"):
np.int = np.int64
np.float = np.float64

SCHEMA_TYPES = {
type(None): NullType,
str: StringType,
Expand Down
30 changes: 26 additions & 4 deletions test/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,34 @@ def test_bq_init(manager_mock) -> None:
"""Test the initialization of the BQ class in the pycarol.bigquery module."""
bq_mock = mock.MagicMock()
carol_mock = mock.MagicMock()
carol_mock.get_current.return_value = {"env_id": "5"}
carol_mock.app_name = " "
carol_mock.get_current.return_value = {
"env_name": "env_name",
"env_id": "env_id",
"org_name": "org_name",
"org_id": "org_id",
"org_level": False,
}
pycarol.bigquery.BQ.__init__(bq_mock, carol_mock)
assert bq_mock._env == {"env_id": "5"}
assert bq_mock._project_id == "carol-5"
assert bq_mock._dataset_id == "carol-5.5"
assert bq_mock._env == {
"env_name": "env_name",
"env_id": "env_id",
"org_name": "org_name",
"org_id": "org_id",
"org_level": False,
"app_name": " ",
}
assert bq_mock._project_id == "carol-env_id"
assert bq_mock._dataset_id == "carol-env_id.env_id"
assert bq_mock._token_manager == manager_mock.return_value
assert pycarol.bigquery.BQ._build_query_job_labels(bq_mock) == {
"tenant_id": "env_id",
"tenant_name": "env_name",
"organization_id": "org_id",
"organization_name": "org_name",
"job_type": "sync",
"source": "py_carol",
}


@mock.patch("pycarol.bigquery.Credentials")
Expand Down

0 comments on commit 14cc9b7

Please sign in to comment.