From 14cc9b7e8270e29535226f74aa7f139f7b481cdf Mon Sep 17 00:00:00 2001
From: darolt
Date: Tue, 26 Sep 2023 10:41:54 -0300
Subject: [PATCH] Release/2.55.0 (#622)

Co-authored-by: Vinicius Chagas
---
 README.rst                  | 14 ++++++-
 pycarol/bigquery.py         | 75 ++++++++++++++++++++++++++++++-------
 pycarol/carol.py            | 23 ++++++++----
 pycarol/schema_generator.py |  4 ++
 test/test_bigquery.py       | 30 +++++++++++++--
 5 files changed, 119 insertions(+), 27 deletions(-)

diff --git a/README.rst b/README.rst
index 469c0c7d..7a1331ac 100644
--- a/README.rst
+++ b/README.rst
@@ -176,6 +176,10 @@ used:
 
     service_account = tm.get_token().service_account
 
+After each execution of ``BQ.query``, the ``BQ`` object will have an attribute called
+``job``. This attribute is of type ``bigquery.job.query.QueryJob`` and may be useful
+for monitoring and debugging jobs.
+
 PyCarol also provides access to the BigQuery Storage API. It allows for much faster
 reading times, but with limited querying capabilities. For instance, only tables are
 readable, so 'ingestion_stg_model_deep_audit' is ok, but 'stg_model_deep_audit' is not (it is a
@@ -188,7 +192,15 @@ view).
 
     bq = BQStorage(Carol())
     table_name = "ingestion_stg_model_deep_audit"
    col_names = ["request_id", "version"]
-    df = bq.query(table_name, col_names, return_dataframe=True)
+    restriction = "branch = '01'"
+    sample_percentage = 10.0
+    df = bq.query(
+        table_name,
+        col_names,
+        row_restriction=restriction,
+        sample_percentage=sample_percentage,
+        return_dataframe=True,
+    )
 
 From Data Models (RT Layer): Filter queries
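A short usage sketch of the ``job`` attribute documented above, assuming a reachable
Carol environment; ``invoice`` is a placeholder table, and the printed attributes are
standard ``QueryJob`` properties from the google-cloud-bigquery client:

    from pycarol import Carol
    from pycarol.bigquery import BQ

    bq = BQ(Carol())
    df = bq.query("select * from invoice limit 10")

    # bq.job now holds the google-cloud-bigquery QueryJob for the last execution.
    print(bq.job.job_id)                 # ID to look the job up in the BigQuery console
    print(bq.job.total_bytes_processed)  # bytes scanned by the query
    print(bq.job.cache_hit)              # True when results came from the query cache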
diff --git a/pycarol/bigquery.py b/pycarol/bigquery.py
index 80e6dd44..b31d5bec 100644
--- a/pycarol/bigquery.py
+++ b/pycarol/bigquery.py
@@ -5,6 +5,7 @@
 from pathlib import Path
 import sys
 import typing as T
+import os
 
 from google.cloud import bigquery, bigquery_storage, bigquery_storage_v1
 from google.cloud.bigquery_storage import types
@@ -242,9 +243,11 @@ def __init__(
         cache_cds: bool = True,
     ):
         self._env = carol.get_current()
+        self._env["app_name"] = carol.app_name
         self._project_id = f"carol-{self._env['env_id'][0:20]}"
         self._dataset_id = f"{self._project_id}.{self._env['env_id']}"
         self._token_manager = TokenManager(carol, service_account, cache_cds)
+        self.job: T.Optional[bigquery.job.query.QueryJob] = None
 
     @staticmethod
     def _generate_client(service_account: T.Dict) -> bigquery.Client:
@@ -253,13 +256,27 @@ def _generate_client(service_account: T.Dict) -> bigquery.Client:
         client = bigquery.Client(project=project, credentials=credentials)
         return client
 
+    def _build_query_job_labels(self) -> T.Dict[str, str]:
+        labels_to_check = {
+            "tenant_id": self._env.get("env_id", ""),
+            "tenant_name": self._env.get("env_name", ""),
+            "organization_id": self._env.get("org_id", ""),
+            "organization_name": self._env.get("org_name", ""),
+            "job_type": "sync",
+            "source": "py_carol",
+            "task_id": os.environ.get("LONGTAKSID", ""),
+            "carol_app_name": self._env.get("app_name", ""),
+            "carol_app_process_name": os.environ.get("ALGORITHM_NAME", ""),
+        }
+        return {k: v for k, v in labels_to_check.items() if v.strip() != ""}
+
     def query(
         self,
         query: str,
         dataset_id: T.Optional[str] = None,
         return_dataframe: bool = True,
         return_job_id: bool = False,
-        retry: retries.Retry = None
+        retry: T.Optional[retries.Retry] = None,
     ) -> T.Union["pandas.DataFrame", T.List[T.Dict[str, T.Any]]]:
         """Run query. This will generate a SA if necessary.
 
@@ -268,9 +285,10 @@ def query(
             query: BigQuery SQL query.
             dataset_id: BigQuery dataset ID, if not provided, it will use the default
                 one.
             return_dataframe: Return dataframe if True.
-            return_job_id : If True, returns an tuple containing the query results with the job-id on BigQuery platform.
-            retry: Custom google.api_core.retry.Retry object to adjust Google`s BigQuery API
-                calls, to custom timeout and exceptions to retry.
+            return_job_id: If True, returns a tuple containing the query results and
+                the job ID on the BigQuery platform.
+            retry: Custom google.api_core.retry.Retry object to adjust Google's
+                BigQuery API calls, e.g. to customize timeouts and retried exceptions.
 
         Returns:
             Query result.
 
@@ -283,24 +301,33 @@ def query(
             bq = BQ(Carol())
             query = 'select * from invoice limit 10'
-
+            # Normal use
             df = bq.query(query, return_dataframe=True)
 
             # Custom retry object
             from google.api_core.retry import Retry
-            df = bq.query(query, return_dataframe=True, retry=Retry(initial=2, multiplier=2, maximum=60, timeout=200))
+            df = bq.query(query, return_dataframe=True, retry=Retry(
+                initial=2, multiplier=2, maximum=60, timeout=200)
+            )
 
             # Getting BigQuery's job ID (useful for debugging in the platform)
-            df, job_id_string = bq.query(query, return_dataframe=True, return_job_id=True)
+            df, job_id_string = bq.query(
+                query, return_dataframe=True, return_job_id=True
+            )
 
         """
         service_account = self._token_manager.get_token().service_account
         client = self._generate_client(service_account)
         dataset_id = dataset_id or self._dataset_id
-        job_config = bigquery.QueryJobConfig(default_dataset=dataset_id)
-        results_job = client.query(query, retry=retry, job_config=job_config) if retry else client.query(query, job_config=job_config)
+        labels = self._build_query_job_labels()
+        job_config = bigquery.QueryJobConfig(default_dataset=dataset_id, labels=labels)
+        if retry is not None:
+            results_job = client.query(query, retry=retry, job_config=job_config)
+        else:
+            results_job = client.query(query, job_config=job_config)
+        self.job = results_job
 
         results = [dict(row) for row in results_job]
 
@@ -310,7 +337,10 @@ def query(
         if "pandas" not in sys.modules and return_dataframe is True:
             raise exceptions.PandasNotFoundException
 
-        return pandas.DataFrame(results) if not return_job_id else (pandas.DataFrame(results), results_job.job_id)
+        if return_job_id:
+            return (pandas.DataFrame(results), results_job.job_id)
+
+        return pandas.DataFrame(results)
 
 
 class BQStorage:
@@ -344,11 +374,15 @@ def _get_read_session(
         client: bigquery_storage.BigQueryReadClient,
         table_name: str,
         columns_names: T.Optional[T.List[str]] = None,
+        row_restriction: T.Optional[str] = None,
+        sample_percentage: T.Optional[float] = None,
     ) -> bigquery_storage_v1.types.ReadSession:
         read_options = None
         if columns_names is not None:
             read_options = types.ReadSession.TableReadOptions(  # type:ignore # noqa:E501 pylint:disable=no-member
-                selected_fields=columns_names
+                selected_fields=columns_names,
+                row_restriction=row_restriction,
+                sample_percentage=sample_percentage,
             )
 
         table_path = f"projects/{self._project_id}/datasets/{self._dataset_id}/tables/{table_name}"  # noqa:E501
@@ -361,7 +395,7 @@ def _get_read_session(
         read_session = client.create_read_session(
             parent=parent,
             read_session=requested_session,
-            max_stream_count=4,
+            max_stream_count=1,
         )
         return read_session
 
@@ -370,6 +404,8 @@ def query(
         table_name: str,
         columns_names: T.Optional[T.List[str]] = None,
         return_dataframe: bool = True,
+        row_restriction: T.Optional[str] = None,
+        sample_percentage: T.Optional[float] = None,
     ) -> T.Union["pandas.DataFrame", T.List[bigquery_storage_v1.reader.ReadRowsPage]]:
         """Read from BigQuery Storage API.
 
@@ -377,6 +413,8 @@ def query(
         Args:
             table_name: name of the table (views are not supported).
             columns_names: names of columns to return.
             return_dataframe: if True, return a pandas DataFrame.
+            row_restriction: SQL filter (a WHERE-clause body); limited to BQ Storage API syntax.
+            sample_percentage: percentage of rows to return (0-100).
 
         Returns:
             Query result.
 
@@ -391,11 +429,20 @@ def query(
             bq = BQStorage(Carol())
             table_name = "ingestion_stg_model_deep_audit"
             col_names = ["request_id", "version"]
-            df = bq.query(table_name, col_names, return_dataframe=True)
+            restriction = "branch = '01'"
+            df = bq.query(
+                table_name, columns_names=col_names, row_restriction=restriction,
+            )
 
         """
         service_account = self._token_manager.get_token().service_account
         client = self._generate_client(service_account)
-        read_session = self._get_read_session(client, table_name, columns_names)
+        read_session = self._get_read_session(
+            client,
+            table_name,
+            columns_names,
+            row_restriction,
+            sample_percentage,
+        )
 
         stream = read_session.streams[0]
         reader = client.read_rows(stream.name)
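A minimal sketch of how the two new Storage API read options combine, assuming a
reachable Carol environment; the table, column, and filter values are placeholders:

    from pycarol import Carol
    from pycarol.bigquery import BQStorage

    bq = BQStorage(Carol())
    df = bq.query(
        "ingestion_stg_model_deep_audit",
        columns_names=["request_id", "version"],  # becomes selected_fields
        row_restriction="branch = '01'",          # server-side WHERE-clause body
        sample_percentage=5.0,                    # read roughly 5% of the rows
    )

Note that ``_get_read_session`` builds ``TableReadOptions`` only when ``columns_names``
is not ``None``, so as written the restriction and sampling take effect only when
columns are also selected.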
diff --git a/pycarol/carol.py b/pycarol/carol.py
index 094d26fa..5f1c4e40 100644
--- a/pycarol/carol.py
+++ b/pycarol/carol.py
@@ -3,6 +3,7 @@
 import json
 import os
 import typing as T
+import urllib3
 import warnings
 
 from dotenv import load_dotenv
@@ -676,14 +677,20 @@ def _retry_session(
         session
     """
     session = session or requests.Session()
-    retry = Retry(
-        total=retries,
-        read=retries,
-        connect=retries,
-        backoff_factor=backoff_factor,
-        status_forcelist=status_forcelist,
-        method_whitelist=method_whitelist,
-    )
+    kwargs = {
+        "total": retries,
+        "read": retries,
+        "connect": retries,
+        "backoff_factor": backoff_factor,
+        "status_forcelist": status_forcelist,
+    }
+    # urllib3 1.26 renamed Retry's "method_whitelist" argument to "allowed_methods".
+    if tuple(int(p) for p in urllib3.__version__.split(".")[:2]) >= (1, 26):
+        kwargs["allowed_methods"] = method_whitelist
+    else:
+        kwargs["method_whitelist"] = method_whitelist
+
+    retry = Retry(**kwargs)
     adapter = requests.adapters.HTTPAdapter(max_retries=retry)
     session.mount("http://", adapter)
     session.mount("https://", adapter)
diff --git a/pycarol/schema_generator.py b/pycarol/schema_generator.py
index 26038306..afd0675b 100644
--- a/pycarol/schema_generator.py
+++ b/pycarol/schema_generator.py
@@ -42,6 +42,10 @@ def get_schema_type_for(cls, t):
     """docstring for get_schema_type_for"""
+    if np.__version__.startswith("1.2"):
+        np.int = np.int64
+        np.float = np.float64
+
     SCHEMA_TYPES = {
         type(None): NullType,
         str: StringType,
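For context on the ``schema_generator`` shim above: NumPy deprecated the ``np.int`` and
``np.float`` aliases in 1.20 and removed them in 1.24, so legacy code that references
them raises ``AttributeError`` on newer 1.2x releases. A more explicit version gate
could look like the following sketch (illustrative only, not how the patch spells it):

    import numpy as np

    # np.int / np.float were deprecated in NumPy 1.20 and removed in 1.24.
    major, minor = (int(p) for p in np.__version__.split(".")[:2])
    if (major, minor) >= (1, 20) and not hasattr(np, "int"):
        np.int = np.int64      # restore the aliases for legacy schema-mapping code
        np.float = np.float64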
diff --git a/test/test_bigquery.py b/test/test_bigquery.py
index 9a8e145d..4898cbac 100644
--- a/test/test_bigquery.py
+++ b/test/test_bigquery.py
@@ -134,12 +134,34 @@ def test_bq_init(manager_mock) -> None:
     """Test the initialization of the BQ class in the pycarol.bigquery module."""
     bq_mock = mock.MagicMock()
     carol_mock = mock.MagicMock()
-    carol_mock.get_current.return_value = {"env_id": "5"}
+    carol_mock.app_name = " "
+    carol_mock.get_current.return_value = {
+        "env_name": "env_name",
+        "env_id": "env_id",
+        "org_name": "org_name",
+        "org_id": "org_id",
+        "org_level": False,
+    }
     pycarol.bigquery.BQ.__init__(bq_mock, carol_mock)
-    assert bq_mock._env == {"env_id": "5"}
-    assert bq_mock._project_id == "carol-5"
-    assert bq_mock._dataset_id == "carol-5.5"
+    assert bq_mock._env == {
+        "env_name": "env_name",
+        "env_id": "env_id",
+        "org_name": "org_name",
+        "org_id": "org_id",
+        "org_level": False,
+        "app_name": " ",
+    }
+    assert bq_mock._project_id == "carol-env_id"
+    assert bq_mock._dataset_id == "carol-env_id.env_id"
     assert bq_mock._token_manager == manager_mock.return_value
+    assert pycarol.bigquery.BQ._build_query_job_labels(bq_mock) == {
+        "tenant_id": "env_id",
+        "tenant_name": "env_name",
+        "organization_id": "org_id",
+        "organization_name": "org_name",
+        "job_type": "sync",
+        "source": "py_carol",
+    }
 
 
 @mock.patch("pycarol.bigquery.Credentials")