Skip to content

Commit

Permalink
Use lazy formatting for logging
Browse files Browse the repository at this point in the history
  • Loading branch information
mjurkus committed Nov 13, 2023
1 parent 61950d6 commit f10b9be
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(
self._run_dataflow()

def process_path(self, path):
logger.info(f"Processing path {path}")
logger.info("Processing path %s", path)
dataset = pq.ParquetDataset(path, use_legacy_dataset=False)
batches = []
for fragment in dataset.fragments:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,35 +234,30 @@ def _await_path_materialization(
MaterializationJobStatus.WAITING,
MaterializationJobStatus.RUNNING,
):
logger.info(
f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
f"(of {total_pods}) running..."
)
logger.info("%s materialization for pods %d-%d (of %d) running...",
feature_view.name, batch_start, batch_end, total_pods)
sleep(30)
logger.info(
f"{feature_view.name} materialization for pods {batch_start}-{batch_end} "
f"(of {total_pods}) complete with status {job.status()}"
)

logger.info("%s materialization for pods %d-%d (of %d) complete with status %s",
feature_view.name, batch_start, batch_end, total_pods, job.status())
except BaseException as e:
if self.batch_engine_config.print_pod_logs_on_failure:
self._print_pod_logs(job.job_id(), feature_view, batch_start)

logger.info(f"Deleting job {job.job_id()}")
logger.info("Deleting job %s", job.job_id())
try:
self.batch_v1.delete_namespaced_job(job.job_id(), self.namespace)
except ApiException as ae:
logger.warning(f"Could not delete job due to API Error: {ae.body}")
logger.warning("Could not delete job due to API Error: %s", ae.body)
raise e
finally:
logger.info(f"Deleting configmap {self._configmap_name(job_id)}")
logger.info("Deleting configmap %s", self._configmap_name(job_id))
try:
self.v1.delete_namespaced_config_map(
self._configmap_name(job_id), self.namespace
)
except ApiException as ae:
logger.warning(
f"Could not delete configmap due to API Error: {ae.body}"
)
logger.warning("Could not delete configmap due to API Error: %s", ae.body)

return job

Expand All @@ -272,13 +267,13 @@ def _print_pod_logs(self, job_id, feature_view, offset=0):
label_selector=f"job-name={job_id}",
).items
for i, pod in enumerate(pods_list):
logger.info(f"Logging output for {feature_view.name} pod {offset + i}")
logger.info("Logging output for %s pod %d", feature_view.name, offset + i)
try:
logger.info(
self.v1.read_namespaced_pod_log(pod.metadata.name, self.namespace)
)
except ApiException as e:
logger.warning(f"Could not retrieve pod logs due to: {e.body}")
logger.warning("Could not retrieve pod logs due to: %s", e.body)

def _create_kubernetes_job(self, job_id, paths, feature_view):
try:
Expand All @@ -293,7 +288,7 @@ def _create_kubernetes_job(self, job_id, paths, feature_view):
self.batch_engine_config.env,
)

logger.info(f"Created job `dataflow-{job_id}` in namespace `{self.namespace}`")
logger.info("Created job `dataflow-%s` in namespace `%s`", job_id, self.namespace)
except FailToCreateError as failures:
return BytewaxMaterializationJob(job_id, self.namespace, error=failures)

Expand Down

0 comments on commit f10b9be

Please sign in to comment.