Release/2.55.0 #622

Merged
merged 7 commits on Sep 26, 2023
14 changes: 13 additions & 1 deletion README.rst
@@ -176,6 +176,10 @@ used:
service_account = tm.get_token().service_account


After each execution of ``BQ.query``, the ``BQ`` object exposes an attribute called
``job``. This attribute is of type ``bigquery.job.query.QueryJob`` and may be useful
for monitoring and debugging jobs.
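
For instance (a minimal sketch, assuming a standard ``Carol()`` login), the last
job can be inspected right after a query:

bq = BQ(Carol())
df = bq.query("select * from invoice limit 10")
print(bq.job.job_id)                 # job ID for locating the job in BigQuery
print(bq.job.total_bytes_processed)  # bytes scanned by the query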

PyCarol also provides access to the BigQuery Storage API. It allows much faster
reading times, but with limited querying capabilities. For instance, only tables are
readable, so 'ingestion_stg_model_deep_audit' works, but 'stg_model_deep_audit' does
not (it is a
@@ -188,7 +192,15 @@ view).
bq = BQStorage(Carol())
table_name = "ingestion_stg_model_deep_audit"
col_names = ["request_id", "version"]
df = bq.query(table_name, col_names, return_dataframe=True)
restriction = "branch = '01'"
sample_percentage = 10  # percentage of rows to read (0-100); 10 is illustrative
df = bq.query(
table_name,
col_names,
row_restriction=restriction,
sample_percentage=sample_percentage,
return_dataframe=True
)
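
Note that ``row_restriction`` takes a SQL WHERE-style predicate and
``sample_percentage`` is interpreted as a percentage of rows between 0 and 100; the
values in the example above are illustrative.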


From Data Models (RT Layer): Filter queries
75 changes: 61 additions & 14 deletions pycarol/bigquery.py
@@ -5,6 +5,7 @@
from pathlib import Path
import sys
import typing as T
import os

from google.cloud import bigquery, bigquery_storage, bigquery_storage_v1
from google.cloud.bigquery_storage import types
@@ -242,9 +243,11 @@ def __init__(
cache_cds: bool = True,
):
self._env = carol.get_current()
self._env["app_name"] = carol.app_name
self._project_id = f"carol-{self._env['env_id'][0:20]}"
self._dataset_id = f"{self._project_id}.{self._env['env_id']}"
self._token_manager = TokenManager(carol, service_account, cache_cds)
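# ``job`` stores the most recent QueryJob after each call to ``query``.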
self.job: T.Optional[bigquery.job.query.QueryJob] = None

@staticmethod
def _generate_client(service_account: T.Dict) -> bigquery.Client:
@@ -253,13 +256,27 @@ def _generate_client(service_account: T.Dict) -> bigquery.Client:
client = bigquery.Client(project=project, credentials=credentials)
return client

def _build_query_job_labels(self) -> T.Dict[str, str]:
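# Build job labels from the current environment; blank values are dropped by
# the filter below. Illustrative result (hypothetical values), assuming the
# task-related environment variables are unset:
# {"tenant_id": "env_id", "tenant_name": "env_name", "organization_id": "org_id",
#  "organization_name": "org_name", "job_type": "sync", "source": "py_carol"}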
labels_to_check = {
"tenant_id": self._env.get("env_id", ""),
"tenant_name": self._env.get("env_name", ""),
"organization_id": self._env.get("org_id", ""),
"organization_name": self._env.get("org_name", ""),
"job_type": "sync",
"source": "py_carol",
"task_id": os.environ.get("LONGTAKSID", ""),
"carol_app_name": self._env.get("app_name", ""),
"carol_app_process_name": os.environ.get("ALGORITHM_NAME", ""),
}
return {k: v for k, v in labels_to_check.items() if v.strip() != ""}

def query(
self,
query: str,
dataset_id: T.Optional[str] = None,
return_dataframe: bool = True,
return_job_id: bool = False,
retry: retries.Retry = None
retry: T.Optional[retries.Retry] = None,
) -> T.Union["pandas.DataFrame", T.List[T.Dict[str, T.Any]]]:
"""Run query. This will generate a SA if necessary.

@@ -268,9 +285,10 @@ def query(
dataset_id: BigQuery dataset ID, if not provided, it will use the default
one.
return_dataframe: Return dataframe if True.
return_job_id : If True, returns an tuple containing the query results with the job-id on BigQuery platform.
retry: Custom google.api_core.retry.Retry object to adjust Google`s BigQuery API
calls, to custom timeout and exceptions to retry.
return_job_id: If True, returns a tuple containing the query results and
the job ID on the BigQuery platform.
retry: Custom google.api_core.retry.Retry object to adjust Google's BigQuery
API calls, with a custom timeout and exceptions to retry.

Returns:
Query result.
@@ -283,24 +301,33 @@

bq = BQ(Carol())
query = 'select * from invoice limit 10'

# Normal use
df = bq.query(query, return_dataframe=True)

# Custom retry object
from google.api_core.retry import Retry
df = bq.query(query, return_dataframe=True, retry=Retry(initial=2, multiplier=2, maximum=60, timeout=200))
df = bq.query(query, return_dataframe=True, retry=Retry(
initial=2, multiplier=2, maximum=60, timeout=200)
)

# Getting BigQuery's job ID (useful for debugging in the platform)
df, job_id_string = bq.query(query, return_dataframe=True, return_job_id=True)
df, job_id_string = bq.query(
query, return_dataframe=True, return_job_id=True
)

"""
service_account = self._token_manager.get_token().service_account
client = self._generate_client(service_account)

dataset_id = dataset_id or self._dataset_id
job_config = bigquery.QueryJobConfig(default_dataset=dataset_id)
results_job = client.query(query, retry=retry, job_config=job_config) if retry else client.query(query, job_config=job_config)
labels = self._build_query_job_labels()
job_config = bigquery.QueryJobConfig(default_dataset=dataset_id, labels=labels)
if retry is not None:
results_job = client.query(query, retry=retry, job_config=job_config)
else:
results_job = client.query(query, job_config=job_config)
self.job = results_job  # keep the QueryJob for later inspection (see README)

results = [dict(row) for row in results_job]

@@ -310,7 +337,10 @@
if "pandas" not in sys.modules and return_dataframe is True:
raise exceptions.PandasNotFoundException

return pandas.DataFrame(results) if not return_job_id else (pandas.DataFrame(results), results_job.job_id)
if return_job_id:
return (pandas.DataFrame(results), results_job.job_id)

return pandas.DataFrame(results)


class BQStorage:
@@ -344,11 +374,15 @@ def _get_read_session(
client: bigquery_storage.BigQueryReadClient,
table_name: str,
columns_names: T.Optional[T.List[str]] = None,
row_restriction: T.Optional[str] = None,
sample_percentage: T.Optional[float] = None,
) -> bigquery_storage_v1.types.ReadSession:
read_options = None
if columns_names is not None or row_restriction is not None or sample_percentage is not None:
read_options = types.ReadSession.TableReadOptions( # type:ignore # noqa:E501 pylint:disable=no-member
selected_fields=columns_names
selected_fields=columns_names,
row_restriction=row_restriction,
sample_percentage=sample_percentage,
)

table_path = f"projects/{self._project_id}/datasets/{self._dataset_id}/tables/{table_name}" # noqa:E501
@@ -361,7 +395,7 @@
read_session = client.create_read_session(
parent=parent,
read_session=requested_session,
max_stream_count=4,
max_stream_count=1,
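# a single stream is requested because only streams[0] is consumed below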
)
return read_session

@@ -370,13 +404,17 @@ def query(
table_name: str,
columns_names: T.Optional[T.List[str]] = None,
return_dataframe: bool = True,
row_restriction: T.Optional[str] = None,
sample_percentage: T.Optional[float] = None,
) -> T.Union["pandas.DataFrame", T.List[bigquery_storage_v1.reader.ReadRowsPage]]:
"""Read from BigQuery Storage API.

Args:
table_name: name of the table (views are not supported).
columns_names: names of columns to return.
return_dataframe: if True, return a pandas DataFrame.
row_restriction: SQL WHERE-style filter clause (limited to what the BQ
Storage API supports).
sample_percentage: percentage of rows to return (0-100).

Returns:
Query result.
@@ -391,11 +429,20 @@
bq = BQStorage(Carol())
table_name = "ingestion_stg_model_deep_audit"
col_names = ["request_id", "version"]
df = bq.query(table_name, col_names, return_dataframe=True)
filter = "branch = '01'"
df = bq.query(
table_name, column_names=col_names, row_restriction=filter,
)
"""
service_account = self._token_manager.get_token().service_account
client = self._generate_client(service_account)
read_session = self._get_read_session(client, table_name, columns_names)
read_session = self._get_read_session(
client,
table_name,
columns_names,
row_restriction,
sample_percentage,
)

stream = read_session.streams[0]
reader = client.read_rows(stream.name)
23 changes: 15 additions & 8 deletions pycarol/carol.py
@@ -3,6 +3,7 @@
import json
import os
import typing as T
import urllib3
import warnings

from dotenv import load_dotenv
@@ -676,14 +677,20 @@ def _retry_session(
session
"""
session = session or requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
method_whitelist=method_whitelist,
)
kwargs = {
"total": retries,
"read": retries,
"connect": retries,
"backoff_factor": backoff_factor,
"status_forcelist": status_forcelist,
}

# urllib3 1.26 renamed Retry's ``method_whitelist`` to ``allowed_methods``.
# Compare the version numerically; a plain string comparison misorders
# releases such as "1.3" and "1.26".
if tuple(int(part) for part in urllib3.__version__.split(".")[:2]) >= (1, 26):
kwargs["allowed_methods"] = method_whitelist
else:
kwargs["method_whitelist"] = method_whitelist

retry = Retry(**kwargs)
adapter = requests.adapters.HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
4 changes: 4 additions & 0 deletions pycarol/schema_generator.py
@@ -42,6 +42,10 @@ def get_schema_type_for(cls, t):

"""docstring for get_schema_type_for"""

# numpy deprecated and later removed the ``np.int``/``np.float`` aliases;
# restore them when absent so the SCHEMA_TYPES mapping below keeps working.
if not hasattr(np, "int"):
np.int = np.int64
np.float = np.float64

SCHEMA_TYPES = {
type(None): NullType,
str: StringType,
30 changes: 26 additions & 4 deletions test/test_bigquery.py
@@ -134,12 +134,34 @@ def test_bq_init(manager_mock) -> None:
"""Test the initialization of the BQ class in the pycarol.bigquery module."""
bq_mock = mock.MagicMock()
carol_mock = mock.MagicMock()
carol_mock.get_current.return_value = {"env_id": "5"}
carol_mock.app_name = " "
carol_mock.get_current.return_value = {
"env_name": "env_name",
"env_id": "env_id",
"org_name": "org_name",
"org_id": "org_id",
"org_level": False,
}
pycarol.bigquery.BQ.__init__(bq_mock, carol_mock)
assert bq_mock._env == {"env_id": "5"}
assert bq_mock._project_id == "carol-5"
assert bq_mock._dataset_id == "carol-5.5"
assert bq_mock._env == {
"env_name": "env_name",
"env_id": "env_id",
"org_name": "org_name",
"org_id": "org_id",
"org_level": False,
"app_name": " ",
}
assert bq_mock._project_id == "carol-env_id"
assert bq_mock._dataset_id == "carol-env_id.env_id"
assert bq_mock._token_manager == manager_mock.return_value
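# app_name was set to " " above; _build_query_job_labels strips blank values,
# so carol_app_name (and the unset task/algorithm env vars) must be absent.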
assert pycarol.bigquery.BQ._build_query_job_labels(bq_mock) == {
"tenant_id": "env_id",
"tenant_name": "env_name",
"organization_id": "org_id",
"organization_name": "org_name",
"job_type": "sync",
"source": "py_carol",
}


@mock.patch("pycarol.bigquery.Credentials")