Skip to content

Commit

Permalink
Added support for job_name_prefix: an optional parameter used to iden…
Browse files Browse the repository at this point in the history
…tify the compute server session in SAS Workload Orchestrator (#41)

* Added parameter job_name_prefix in SASStudioOperator to be used for identify a compute session in SAS Workload Orchestrator (SWO)

* New version 0.0.17
  • Loading branch information
torbenjuul authored Sep 5, 2024
1 parent 7be7439 commit 010a535
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 7 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = sas-airflow-provider
version = 0.0.16
version = 0.0.17
author = SAS
author_email = [email protected]
description = Enables execution of Studio Flows and Jobs from Airflow
Expand Down
12 changes: 11 additions & 1 deletion src/sas_airflow_provider/operators/sas_create_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ class SASComputeCreateSession(BaseOperator):
:param compute_context_name: (optional) Name of the Compute context to use. If not provided, a
suitable default is used.
:param session_name: (optional) name to give the created session. If not provided, a suitable default is used
:param http_timeout: (optional) Timeout for https requests. Default value is (30.05, 300), meaning a connect timeout sligthly above 30 seoconds and
a read timeout of 300 seconds where the operator will wait for the server to send a response.
:param job_name_prefix: (optional) string. Specify a name that you want the compute session to identify as in SAS Workload Orchestrator (SWO).
If job_name_prefix is not specified the default prefix is determined by Viya (currently 'sas-compute-server-').
If the value cannot be parsed by Viya to create a valid k8s pod name, the default value will be used as well.
job_name_prefix is supported from Viya Stable 2024.07
"""

ui_color = "#CCE5FF"
Expand All @@ -47,6 +53,8 @@ def __init__(
connection_name=None,
compute_context_name="SAS Studio compute context",
session_name="Airflow-Session",
http_timeout=(30.05, 300),
job_name_prefix=None,
**kwargs,
) -> None:

Expand All @@ -56,6 +64,8 @@ def __init__(
self.compute_context_name = compute_context_name
self.session_name = session_name
self.compute_session_id=""
self.http_timeout=http_timeout
self.job_name_prefix = job_name_prefix

def execute(self, context):
try:
Expand All @@ -74,7 +84,7 @@ def _connect_compute(self):
# connect to compute if we are not connected, and set our compute session id
if not self.compute_session_id:
self.log.info("Creating or connecting to compute session")
sesh = create_or_connect_to_session(self.connection, self.compute_context_name, self.session_name)
sesh = create_or_connect_to_session(self.connection,self.compute_context_name,self.session_name,self.http_timeout,self.job_name_prefix)
self.compute_session_id = sesh["id"]
self.log.info(f"Created session with id {self.compute_session_id}")

12 changes: 10 additions & 2 deletions src/sas_airflow_provider/operators/sas_studio.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@ class SASStudioOperator(BaseOperator):
temporary unobtainable. When unknown_state_timeout is reached without the state being retrievable, the operator
will throw an AirflowFailException and the task will be marked as failed.
Default value is 0, meaning the task will fail immediately if the state could not be retrieved.
:para http_timeout: (optional) Timeout for https requests. Default value is (30.05, 300), meaning a connect timeout sligthly above 30 seoconds and
:param http_timeout: (optional) Timeout for https requests. Default value is (30.05, 300), meaning a connect timeout sligthly above 30 seoconds and
a read timeout of 300 seconds where the operator will wait for the server to send a response.
:param job_name_prefix: (optional) string. Specify a name that you want the compute session to identify as in SAS Workload Orchestrator (SWO).
If job_name_prefix is not specified the default prefix is determined by Viya (currently 'sas-compute-server-').
If the value cannot be parsed by Viya to create a valid k8s pod name, the default value will be used as well.
job_name_prefix is supported from Viya Stable 2024.07
"""

ui_color = "#CCE5FF"
Expand All @@ -115,6 +119,7 @@ def __init__(
compute_session_id="",
output_macro_var_prefix="",
unknown_state_timeout=0,
job_name_prefix=None,
http_timeout=(30.05, 300),
**kwargs,
) -> None:
Expand All @@ -134,6 +139,7 @@ def __init__(
self.macro_vars = macro_vars
self.connection = None
self.allways_reuse_session = allways_reuse_session
self.job_name_prefix = job_name_prefix

self.external_managed_session = False
self.compute_session_id = None
Expand Down Expand Up @@ -176,7 +182,9 @@ def execute(self, context):
compute_session = create_or_connect_to_session(self.connection,
self.compute_context_name,
AIRFLOW_SESSION_NAME if self.allways_reuse_session else None,
http_timeout=self.http_timeout)
http_timeout=self.http_timeout,
job_name_prefix=self.job_name_prefix
)
self.compute_session_id = compute_session["id"]
else:
self.log.info(f"Compute Session {self.compute_session_id} was provided")
Expand Down
13 changes: 10 additions & 3 deletions src/sas_airflow_provider/util/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,15 @@ def find_named_compute_session(session: requests.Session, name: str, http_timeou
return sessions["items"][0]
return {}

def create_or_connect_to_session(session: requests.Session, context_name: str, name = None, http_timeout=None) -> dict:
def create_or_connect_to_session(session: requests.Session, context_name: str, name = None, http_timeout=None, job_name_prefix = None) -> dict:
"""
Connect to an existing compute session by name. If that named session does not exist,
one is created using the context name supplied
:param session: rest session that includes oauth token
:param context_name: the context name to use to create the session if the session was not found
:param name: name of session to find
:param http_timeout: Timeout for http connection
:param job_name_prefix: (optional) string. The name that you want the compute session to identify as in SAS Workload Orchestrator (SWO). job_name_prefix is supported from Viya Stable 2024.07 and forward
:return: session object
"""
Expand All @@ -212,10 +213,16 @@ def create_or_connect_to_session(session: requests.Session, context_name: str, n
# create session with given context
uri = f'/compute/contexts/{sas_context["id"]}/sessions'
if name != None:
session_request = {"version": 1, "name": name}
if job_name_prefix != None:
session_request = {"version": 1, "name": name, "attributes":{"jobNamePrefix":job_name_prefix}}
else:
session_request = {"version": 1, "name": name}
else:
# Create a unnamed session
session_request = {"version": 1}
if job_name_prefix != None:
session_request = {"version": 1, "attributes":{"jobNamePrefix":job_name_prefix}}
else:
session_request = {"version": 1}

headers = {"Content-Type": "application/vnd.sas.compute.session.request+json"}

Expand Down

0 comments on commit 010a535

Please sign in to comment.