Streaming of SAS-log while the task is running (#26)
* The SAS log will now be streamed back to the DAG log while the task is running, instead of being retrieved after the code has been executed

* New version 0.0.12

---------

Co-authored-by: sdktjj <[email protected]>
torbenjuul and sdktjj authored Dec 20, 2023
1 parent 4ed8b18 commit 1c8f96a
Showing 3 changed files with 62 additions and 22 deletions.
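For context, a minimal sketch of how the streamed log surfaces to a DAG author. Only exec_log and path_type are confirmed by this diff; the other argument names and values (path, the dag_id, the flow path) are illustrative assumptions and may differ from the operator's actual signature.

from datetime import datetime

from airflow import DAG
from sas_airflow_provider.operators.sas_studio import SASStudioOperator

# Illustrative only: with exec_log=True the SAS log is streamed into the Airflow
# task log while the job runs, instead of being dumped after it ends.
with DAG(dag_id="demo_sas_studio", start_date=datetime(2023, 12, 1), schedule=None) as dag:
    run_flow = SASStudioOperator(
        task_id="run_flow",
        path_type="content",         # valid values per this operator: 'compute', 'content' or 'raw'
        path="/Public/my_flow.flw",  # assumed argument name
        exec_log=True,
    )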
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = sas-airflow-provider
version = 0.0.11
version = 0.0.12
author = SAS
author_email = [email protected]
description = Enables execution of Studio Flows and Jobs from Airflow
46 changes: 25 additions & 21 deletions src/sas_airflow_provider/operators/sas_studio.py
@@ -24,7 +24,7 @@
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from sas_airflow_provider.hooks.sas import SasHook
from sas_airflow_provider.util.util import dump_logs, create_or_connect_to_session, end_compute_session
from sas_airflow_provider.util.util import dump_logs, stream_log, create_or_connect_to_session, end_compute_session

# main API URI for Code Gen
URI_BASE = "/studioDevelopment/code"
@@ -145,7 +145,8 @@ def __init__(
self.on_success_callback=[on_success]
self.on_failure_callback=[on_failure]
self.on_retry_callback=[on_retry]



def execute(self, context):
if self.path_type not in ['compute', 'content', 'raw']:
raise AirflowFailException("Path type is invalid. Valid values are 'compute', 'content' or 'raw'")
@@ -199,27 +200,21 @@ def execute(self, context):
except Exception as e:
raise AirflowException(f"SASStudioOperator error: {str(e)}")


# Kick off the JES job.
# Kick off the JES job, wait to get the state
# _run_job_and_wait will poll for new
# SAS log lines and stream them to the DAG log
job, success = self._run_job_and_wait(jr, 10)
job_state = job["state"]

# display logs if needed
if self.exec_log is True:
# Safeguard if we are unable to retrieve the log. We will NOT throw any exceptions
try:
dump_logs(self.connection, job)
except Exception as e:
self.log.info("Unable to retrieve log. Maybe the log is too large.")

job_state= "unknown"
if "state" in job:
job_state = job["state"]

# set output variables
if success and self.output_macro_var_prefix and self.compute_session_id:
try:
self._set_output_variables(context)
except Exception as e:
raise AirflowException(f"SASStudioOperator error: {str(e)}")


# raise exception in Airflow if SAS Studio Flow ended execution with "failed" "canceled" or "timed out" state
# support retry for 'failed' (typically there is an ERROR in the log) and 'timed out'
# do NOT support retry for 'canceled' (typically the SAS Job called ABORT ABEND)
@@ -336,6 +331,7 @@ def _run_job_and_wait(self, job_request: dict, poll_interval: int) -> (dict, bool):
state = "unknown"
countUnknownState = 0
log_location = None
num_log_lines = 0
while state in ["pending", "running"] or (state == "unknown" and ((countUnknownState*poll_interval) <= self.unknown_state_timeout)):
time.sleep(poll_interval)

@@ -348,12 +344,16 @@ def _run_job_and_wait(self, job_request: dict, poll_interval: int) -> (dict, bool):
else:
countUnknownState = 0
job = response.json()
state = job["state"]
if state == "running" and log_location == None:
# Print the log location to the DAG log, in case the user needs access to the SAS log while it is running.
if "logLocation" in job:
log_location = job["logLocation"]
self.log.info(f"While the job is running, the SAS log formatted as JSON can be found at URI: {log_location}?limit=9999999")
if "state" in job:
state = job["state"]
else:
self.log.info(f'Not able to determine state from {uri}. Will set state=unknown and continue checking...')
state = "unknown"

# Get the latest new log lines.
if self.exec_log and state != "unknown":
num_log_lines = stream_log(self.connection, job, num_log_lines)

except Exception as e:
countUnknownState = countUnknownState + 1
self.log.info(f'HTTP Call failed with error "{e}". Will set state=unknown and continue checking...')
@@ -363,6 +363,10 @@ def _run_job_and_wait(self, job_request: dict, poll_interval: int) -> (dict, bool):
# Raise AirflowFailException as we don't know if the job is still running
raise AirflowFailException(f'Unable to retrieve state of job after trying {countUnknownState} times. Will mark task as failed. Please check the SAS-log.')

# Be sure to get the latest log lines after the job has finished.
if self.exec_log:
num_log_lines = stream_log(self.connection, job, num_log_lines)

self.log.info("Job request has completed execution with the status: " + str(state))
success = True
if state in ['failed', 'canceled', 'timed out']:
36 changes: 36 additions & 0 deletions src/sas_airflow_provider/util/util.py
@@ -19,6 +19,7 @@
import json
import requests
import os
import logging


def get_folder_file_contents(session, path: str) -> str:
@@ -106,6 +107,41 @@ def get_uri(links, rel):
return link["uri"]


def stream_log(session, job, start, limit=99999) -> int:
    """
    Stream new lines from the job log to the task log.
    Returns the number of log lines handled so far, so the caller can pass it
    back in as 'start' on the next call and only new lines are printed.
    """
    current_line = start

    log_uri = get_uri(job["links"], "log")
    if not log_uri:
        logging.getLogger(name=None).warning("Warning: failed to retrieve log URI from links")
    else:
        try:
            # Note: if it is a files link (which it will be once the job has finished), the
            # 'start' parameter is not supported, so we need to filter the lines ourselves.
            # The limit parameter is ignored in that case.
            is_files_link = log_uri.startswith("/files/")

            r = session.get(f"{log_uri}/content?start={start}&limit={limit}")
            if r.ok:
                # Parse the JSON log format and print each line
                log_contents = r.text
                jcontents = json.loads(log_contents)
                lines = 0
                for line in jcontents["items"]:
                    if (is_files_link and lines >= start) or not is_files_link:
                        t = line["type"]
                        if t != "title":
                            logging.getLogger(name=None).info(f'{line["line"]}')
                        current_line = current_line + 1
                    lines = lines + 1
            else:
                logging.getLogger(name=None).warning(f"Failed to retrieve part of the log from URI: {log_uri}/content")
        except Exception as e:
            logging.getLogger(name=None).warning("Unable to retrieve parts of the log.")

    return current_line
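For reference, a minimal sketch of the incremental pattern the operator uses with stream_log: each call returns the count of log lines handled so far, which is passed back in as start on the next poll so only new lines are printed. The helpers job_is_running and refresh_job are hypothetical stand-ins for the state polling done in _run_job_and_wait, and the sample payload in the comment simply mirrors the fields the parser reads (items, type, line).

import time

# Hypothetical helpers standing in for the polling in _run_job_and_wait:
# job_is_running(job) would check job["state"], refresh_job(session, job) would re-GET the job URI.
num_log_lines = 0
while job_is_running(job):
    time.sleep(10)
    job = refresh_job(session, job)
    num_log_lines = stream_log(session, job, num_log_lines)

# One final call picks up any lines written between the last poll and completion.
num_log_lines = stream_log(session, job, num_log_lines)

# The log content endpoint returns JSON shaped roughly like this (only the fields
# read above are shown; "title" lines are skipped when printing):
# {"items": [{"type": "title", "line": "..."}, {"type": "note", "line": "NOTE: ..."}]}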



def dump_logs(session, job):
"""
Get the log from the job object
