diff --git a/application_sdk/activities/common/utils.py b/application_sdk/activities/common/utils.py index 163febf6a..3a6167ba0 100644 --- a/application_sdk/activities/common/utils.py +++ b/application_sdk/activities/common/utils.py @@ -7,6 +7,7 @@ import asyncio import glob import os +import re from datetime import timedelta from functools import wraps from typing import Any, Awaitable, Callable, List, Optional, TypeVar, cast @@ -22,6 +23,9 @@ logger = get_logger(__name__) +# Compiled regex pattern for removing timestamp suffix from workflow IDs +TIMESTAMP_PATTERN = re.compile(r"-\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$") + F = TypeVar("F", bound=Callable[..., Awaitable[Any]]) @@ -63,6 +67,7 @@ def build_output_path() -> str: """Build a standardized output path for workflow artifacts. This method creates a consistent output path format across all workflows using the WORKFLOW_OUTPUT_PATH_TEMPLATE constant. + For scheduled workflows, it removes any timestamp suffix from the workflow_id to ensure consistent output paths. Returns: str: The standardized output path. @@ -71,9 +76,19 @@ def build_output_path() -> str: >>> build_output_path() "artifacts/apps/appName/workflows/wf-123/run-456" """ + # Sanitize workflow_id to remove any schedule/timestamp suffix + raw_workflow_id = get_workflow_id() + + # Remove timestamp suffix (e.g., '-YYYY-MM-DDTHH:MM:SSZ') if present + sanitized_workflow_id = TIMESTAMP_PATTERN.sub("", raw_workflow_id) + + # Fallback to raw workflow_id if sanitization results in empty string + if not sanitized_workflow_id: + sanitized_workflow_id = "unknown-workflow" + return WORKFLOW_OUTPUT_PATH_TEMPLATE.format( application_name=APPLICATION_NAME, - workflow_id=get_workflow_id(), + workflow_id=sanitized_workflow_id, run_id=get_workflow_run_id(), ) diff --git a/tests/unit/activities/common/test_utils.py b/tests/unit/activities/common/test_utils.py index 9db41d555..0bcb39ee1 100644 --- a/tests/unit/activities/common/test_utils.py +++ b/tests/unit/activities/common/test_utils.py @@ -8,6 +8,7 @@ from application_sdk.activities.common.utils import ( auto_heartbeater, + build_output_path, get_object_store_prefix, get_workflow_id, send_periodic_heartbeat, @@ -36,6 +37,37 @@ def test_get_workflow_id_activity_error(self, mock_activity): get_workflow_id() +class TestBuildOutputPath: + """Test cases for build_output_path function.""" + + @patch("application_sdk.constants.APPLICATION_NAME", "test-app") + @patch("application_sdk.activities.common.utils.activity") + def test_build_output_path_standard(self, mock_activity): + """Standard case: typical workflow and run IDs.""" + mock_activity.info.return_value.workflow_id = "wf-123" + mock_activity.info.return_value.workflow_run_id = "run-456" + result = build_output_path() + assert result == "artifacts/apps/test-app/workflows/wf-123/run-456" + + @patch("application_sdk.constants.APPLICATION_NAME", "test-app") + @patch("application_sdk.activities.common.utils.activity") + def test_build_output_path_scheduled(self, mock_activity): + """Scheduled run: workflow ID with timestamp suffix.""" + mock_activity.info.return_value.workflow_id = "wf-123-2025-09-30T12:34:56Z" + mock_activity.info.return_value.workflow_run_id = "run-456" + result = build_output_path() + assert result == "artifacts/apps/test-app/workflows/wf-123/run-456" + + @patch("application_sdk.constants.APPLICATION_NAME", "test-app") + @patch("application_sdk.activities.common.utils.activity") + def test_build_output_path_empty_workflow_id(self, mock_activity): + """Defensive: workflow ID is empty string.""" + mock_activity.info.return_value.workflow_id = "" + mock_activity.info.return_value.workflow_run_id = "run-000" + result = build_output_path() + assert result == "artifacts/apps/test-app/workflows/unknown-workflow/run-000" + + class TestGetObjectStorePrefix: """Test cases for get_object_store_prefix function - Real World Scenarios."""