fixing location for the tabular workflow bq jobs

Carlos Timoteo committed Oct 7, 2023
1 parent c0a5a84 commit 7c05ac6

Showing 7 changed files with 963 additions and 235 deletions.
65 changes: 39 additions & 26 deletions python/pipelines/components/bigquery/component.py
@@ -61,6 +61,7 @@ def bq_stored_procedure_exec(

     query_job = client.query(
         query=query,
+        location=location,
         job_config=job_config)

     query_job.result(timeout=timeout)
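
Note on why `location` is threaded through these calls: BigQuery runs each query as a job in a specific region, and a job submitted without an explicit location is routed to the client's default (often US), which can fail with a "Not found: Dataset" error when the dataset lives elsewhere. A minimal sketch of the pattern, with hypothetical project, dataset, and region names:

# Minimal sketch (hypothetical names): pin a BigQuery job to the region
# where the dataset lives.
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # hypothetical project

query_job = client.query(
    "SELECT COUNT(*) AS n FROM `my-project.my_dataset.my_table`",
    location="europe-west1",  # run the job where the dataset lives
)
for row in query_job.result():
    print(row.n)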
@@ -129,7 +130,9 @@ def bq_clustering_exec(
     )

     query_job = client.query(
-        query=query)
+        query=query,
+        location=location
+    )

     r = query_job.result()

@@ -161,7 +164,9 @@ def bq_evaluate(
     )

     query_job = client.query(
-        query=query)
+        query=query,
+        location=location
+    )

     r = query_job.result()
     r = list(r)
@@ -262,7 +267,10 @@ def list(cls):
     query = f"""
         SELECT * FROM ML.EVALUATE(MODEL `{model_bq_name}`)
     """
-    query_job = client.query(query=query)
+    query_job = client.query(
+        query=query,
+        location=location
+    )

     r = list(query_job.result())[0]
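
The `list(query_job.result())[0]` idiom above works because ML.EVALUATE returns a single metrics row. A hedged sketch of the same read outside the component (model and dataset names hypothetical):

# Sketch (hypothetical names): read the one-row metrics result that
# ML.EVALUATE returns for a BigQuery ML model.
from google.cloud import bigquery

client = bigquery.Client(project="my-project", location="US")
query_job = client.query(
    "SELECT * FROM ML.EVALUATE(MODEL `my-project.my_dataset.my_model`)",
    location="US",
)
row = list(query_job.result())[0]
# Row supports dict-style access; for a k-means model the columns include
# davies_bouldin_index and mean_squared_distance.
print(dict(row))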

@@ -316,15 +324,19 @@ def bq_clustering_predictions(
     destination_table.metadata["table_id"] = f"{bigquery_destination_prefix}_{timestamp}"
     model_uri = f"{model.metadata['projectId']}.{model.metadata['datasetId']}.{model.metadata['modelId']}"

-    client = bigquery.Client(project=project_id, location=location)
+    client = bigquery.Client(
+        project=project_id,
+        location=location
+    )

     query = f"""
         SELECT * FROM ML.PREDICT(MODEL `{model_uri}`,
             TABLE `{bigquery_source}`)
     """

     query_job = client.query(
-        query,
+        query=query,
+        location=location,
         job_config=bigquery.QueryJobConfig(
             destination=destination_table.metadata["table_id"])
     )
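
This hunk also shows how the component materializes ML.PREDICT output: the destination table is set on the job config rather than via a CREATE TABLE AS statement. A sketch of that pattern with hypothetical names:

# Sketch (hypothetical names): write query results straight to a
# destination table via QueryJobConfig.
from google.cloud import bigquery

client = bigquery.Client(project="my-project", location="US")
job_config = bigquery.QueryJobConfig(
    destination="my-project.my_dataset.predictions_20231007",  # hypothetical
    write_disposition="WRITE_TRUNCATE",  # overwrite on re-run (an assumption;
                                         # the component keeps the default)
)
query_job = client.query(
    "SELECT * FROM ML.PREDICT(MODEL `my-project.my_dataset.kmeans_model`, "
    "TABLE `my-project.my_dataset.inference_input`)",
    location="US",
    job_config=job_config,
)
query_job.result()  # block until the destination table is written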
@@ -368,8 +380,8 @@ def bq_flatten_tabular_binary_prediction_table(

     # View table properties
     logging.info(
-        "Got table '{}.{}.{}'.".format(
-            bq_table.project, bq_table.dataset_id, bq_table.table_id)
+        "Got table '{}.{}.{} located at {}'.".format(
+            bq_table.project, bq_table.dataset_id, bq_table.table_id, bq_table.location)
     )

     predictions_column = None
@@ -400,15 +412,15 @@ def bq_flatten_tabular_binary_prediction_table(

     job_config = bigquery.QueryJobConfig()
     job_config.write_disposition = 'WRITE_TRUNCATE'
-    """
-    # Make an API request to create the view.
-    view = bigquery.Table(f"{table.metadata['table_id']}_view")
-    view.view_query = query
-    view = client.create_table(table = view)
-    logging.info(f"Created {view.table_type}: {str(view.reference)}")
-    """
+
+    # Reconstruct a BigQuery client object.
+    client = bigquery.Client(
+        project=project_id,
+        location=bq_table.location
+    )
     query_job = client.query(
-        query
+        query=query,
+        location=bq_table.location
     )

     results = query_job.result()
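
The client is rebuilt here because `bq_table.location` is only known after fetching the table's metadata; pinning the new client and the query job to that region keeps the flatten query in the same location as the predictions table. A sketch of the round trip, with a hypothetical table ID:

# Sketch (hypothetical table ID): discover a table's region, then rebuild
# the client so subsequent jobs default to that region.
from google.cloud import bigquery

client = bigquery.Client(project="my-project")
bq_table = client.get_table("my-project.my_dataset.predictions")  # API request
print(bq_table.location)  # e.g. "US" or "europe-west1"

client = bigquery.Client(project="my-project", location=bq_table.location)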
@@ -451,8 +463,8 @@ def bq_flatten_tabular_regression_table(

     # View table properties
     logging.info(
-        "Got table '{}.{}.{}'.".format(
-            bq_table.project, bq_table.dataset_id, bq_table.table_id)
+        "Got table '{}.{}.{} located at {}'.".format(
+            bq_table.project, bq_table.dataset_id, bq_table.table_id, bq_table.location)
     )

     predictions_column = None
@@ -473,15 +485,15 @@ def bq_flatten_tabular_regression_table(
     """
     job_config = bigquery.QueryJobConfig()
     job_config.write_disposition = 'WRITE_TRUNCATE'
-    """
-    # Make an API request to create the view.
-    view = bigquery.Table(f"{table.metadata['table_id']}_view")
-    view.view_query = query
-    view = client.create_table(table = view)
-    logging.info(f"Created {view.table_type}: {str(view.reference)}")
-    """
+
+    # Reconstruct a BigQuery client object.
+    client = bigquery.Client(
+        project=project_id,
+        location=bq_table.location
+    )
     query_job = client.query(
-        query
+        query=query,
+        location=bq_table.location,
     )

     results = query_job.result()
@@ -548,7 +560,8 @@ def bq_flatten_kmeans_prediction_table(
     logging.info(f"Created {view.table_type}: {str(view.reference)}")
     """
     query_job = client.query(
-        query
+        query=query,
+        location=location
     )

     results = query_job.result()
8 changes: 3 additions & 5 deletions python/pipelines/tabular_pipelines.py
@@ -83,7 +83,7 @@ def prediction_binary_classification_pl(
         location=location,
         source_table=bigquery_source,
         predictions_table=predictions.outputs['destination_table'],
-        bq_unique_key = bq_unique_key,
+        bq_unique_key=bq_unique_key,
         threashold=threashold,
         positive_label=positive_label
     )
@@ -96,8 +96,6 @@ def prediction_binary_classification_pl(
     )


-
-
 @dsl.pipeline()
 def prediction_regression_pl(
     project_id: str,
@@ -114,7 +112,7 @@ def prediction_regression_pl(
     bigquery_source: str,
     bigquery_destination_prefix: str,
     bq_unique_key: str,
-
     job_name_prefix: str,
     machine_type: str = "n1-standard-4",
     max_replica_count: int = 10,
@@ -151,7 +149,7 @@ def prediction_regression_pl(
         location=location,
         source_table=bigquery_source,
         predictions_table=predictions.outputs['destination_table'],
-        bq_unique_key = bq_unique_key
+        bq_unique_key=bq_unique_key
     )

     send_pubsub_activation_msg(
54 changes: 54 additions & 0 deletions sql/procedure/audience_segmentation_inference_preparation.sqlx
@@ -13,8 +13,62 @@
 -- limitations under the License.
 -- This procedure looks back from the day before `inference_date`

+DECLARE lastest_processed_time_ud TIMESTAMP;
+DECLARE lastest_processed_time_uwm TIMESTAMP;
+DECLARE lastest_processed_time_um TIMESTAMP;
+
+SET lastest_processed_time_ud = (SELECT MAX(processed_timestamp) FROM `{{feature_store_project_id}}.{{feature_store_dataset}}.user_segmentation_dimensions` WHERE feature_date = inference_date LIMIT 1);
+SET lastest_processed_time_uwm = (SELECT MAX(processed_timestamp) FROM `{{feature_store_project_id}}.{{feature_store_dataset}}.user_lookback_metrics` WHERE feature_date = inference_date LIMIT 1);
+SET lastest_processed_time_um = (SELECT MAX(processed_timestamp) FROM `{{feature_store_project_id}}.{{feature_store_dataset}}.user_scoped_segmentation_metrics` WHERE feature_date = inference_date LIMIT 1);
+
+SET inference_date = DATE_SUB(inference_date, INTERVAL 1 DAY);
+
+CREATE OR REPLACE TEMP TABLE inference_preparation_ud AS (
+  SELECT DISTINCT
+    UD.user_pseudo_id,
+    MAX(UD.user_id) OVER(user_segmentation_dimensions_window) AS user_id,
+    UD.feature_date,
+    MAX(UD.month_of_the_year) OVER(user_segmentation_dimensions_window) AS month_of_the_year,
+    MAX(UD.week_of_the_year) OVER(user_segmentation_dimensions_window) AS week_of_the_year,
+    MAX(UD.day_of_the_month) OVER(user_segmentation_dimensions_window) AS day_of_the_month,
+    MAX(UD.day_of_week) OVER(user_segmentation_dimensions_window) AS day_of_week,
+    MAX(UD.device_category) OVER(user_segmentation_dimensions_window) AS device_category,
+    MAX(UD.device_mobile_brand_name) OVER(user_segmentation_dimensions_window) AS device_mobile_brand_name,
+    MAX(UD.device_mobile_model_name) OVER(user_segmentation_dimensions_window) AS device_mobile_model_name,
+    MAX(UD.device_os) OVER(user_segmentation_dimensions_window) AS device_os,
+    MAX(UD.device_os_version) OVER(user_segmentation_dimensions_window) AS device_os_version,
+    MAX(UD.device_language) OVER(user_segmentation_dimensions_window) AS device_language,
+    MAX(UD.device_web_browser) OVER(user_segmentation_dimensions_window) AS device_web_browser,
+    MAX(UD.device_web_browser_version) OVER(user_segmentation_dimensions_window) AS device_web_browser_version,
+    MAX(UD.geo_sub_continent) OVER(user_segmentation_dimensions_window) AS geo_sub_continent,
+    MAX(UD.geo_country) OVER(user_segmentation_dimensions_window) AS geo_country,
+    MAX(UD.geo_region) OVER(user_segmentation_dimensions_window) AS geo_region,
+    MAX(UD.geo_city) OVER(user_segmentation_dimensions_window) AS geo_city,
+    MAX(UD.geo_metro) OVER(user_segmentation_dimensions_window) AS geo_metro,
+    MAX(UD.last_traffic_source_medium) OVER(user_segmentation_dimensions_window) AS last_traffic_source_medium,
+    MAX(UD.last_traffic_source_name) OVER(user_segmentation_dimensions_window) AS last_traffic_source_name,
+    MAX(UD.last_traffic_source_source) OVER(user_segmentation_dimensions_window) AS last_traffic_source_source,
+    MAX(UD.first_traffic_source_medium) OVER(user_segmentation_dimensions_window) AS first_traffic_source_medium,
+    MAX(UD.first_traffic_source_name) OVER(user_segmentation_dimensions_window) AS first_traffic_source_name,
+    MAX(UD.first_traffic_source_source) OVER(user_segmentation_dimensions_window) AS first_traffic_source_source,
+    MAX(UD.has_signed_in_with_user_id) OVER(user_segmentation_dimensions_window) AS has_signed_in_with_user_id
+  FROM
+    `{{feature_store_project_id}}.{{feature_store_dataset}}.user_segmentation_dimensions` UD
+  WHERE
+    -- Keep only the most recent feature snapshot for the inference date
+    UD.feature_date = inference_date
+    AND UD.processed_timestamp = lastest_processed_time_ud
+  WINDOW
+    user_segmentation_dimensions_window AS (PARTITION BY UD.user_pseudo_id ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
+);
+
 CREATE TEMP TABLE inference_preparation AS (
   SELECT DISTINCT
     UD.user_pseudo_id,
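
The `{{feature_store_project_id}}` and `{{feature_store_dataset}}` placeholders are template variables substituted when the procedure is deployed. Assuming the rendered file becomes a stored procedure that takes `inference_date` as a parameter (an assumption; only part of the file is shown here), calling it from Python would look roughly like the `bq_stored_procedure_exec` component above:

# Hedged sketch (hypothetical procedure and dataset names): call a deployed
# stored procedure the way bq_stored_procedure_exec does.
from google.cloud import bigquery

client = bigquery.Client(project="my-project", location="US")
query = """
    CALL `my-project.feature_store.audience_segmentation_inference_preparation`(
        CURRENT_DATE()
    )
"""
query_job = client.query(
    query=query,
    location="US",
    job_config=bigquery.QueryJobConfig(),
)
query_job.result(timeout=3600)  # wait up to an hour for the procedure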
