diff --git a/examples/workspace_run_task.py b/examples/workspace_run_task.py new file mode 100644 index 0000000..cf47294 --- /dev/null +++ b/examples/workspace_run_task.py @@ -0,0 +1,313 @@ +""" +Terraform Cloud/Enterprise Workspace Run Task Management Example + +This example demonstrates comprehensive workspace run task operations using the python-tfe SDK. +It provides a command-line interface for managing workspace run tasks with various operations +including attach/create, read, update, delete, and listing attached tasks. + +Prerequisites: + - Set TFE_TOKEN environment variable with your Terraform Cloud API token + - Ensure you have access to the target organization and workspaces + - Run tasks must exist in the organization before attaching to workspaces + +Basic Usage: + python examples/workspace_run_task.py --help + +Core Operations: + +1. List Workspace Run Tasks (default operation): + python examples/workspace_run_task.py --workspace-id ws-abc123 + python examples/workspace_run_task.py --workspace-id ws-abc123 --page-size 20 + +2. Attach Run Task to Workspace (Create): + python examples/workspace_run_task.py --workspace-id ws-abc123 --run-task-id task-def456 --create --enforcement-level mandatory --stages pre-plan post-plan + +3. Read Workspace Run Task Details: + python examples/workspace_run_task.py --workspace-id ws-abc123 --workspace-task-id wstask-xyz789 + +4. Update Workspace Run Task: + python examples/workspace_run_task.py --workspace-id ws-abc123 --workspace-task-id wstask-xyz789 --update --enforcement-level advisory --stages pre-plan + +5. Delete Workspace Run Task: + python examples/workspace_run_task.py --workspace-id ws-abc123 --workspace-task-id wstask-xyz789 --delete +""" + +from __future__ import annotations + +import argparse +import os + +from pytfe import TFEClient, TFEConfig +from pytfe.models import ( + RunTask, + Stage, + TaskEnforcementLevel, + WorkspaceRunTaskCreateOptions, + WorkspaceRunTaskListOptions, + WorkspaceRunTaskUpdateOptions, +) + +# Ensure models are fully rebuilt to resolve forward references +WorkspaceRunTaskUpdateOptions.model_rebuild() +WorkspaceRunTaskCreateOptions.model_rebuild() + + +def _print_header(title: str) -> None: + """Print a formatted header for operations.""" + print("\n" + "=" * 80) + print(title) + print("=" * 80) + + +def main(): + parser = argparse.ArgumentParser( + description="Workspace Run Task demo for python-tfe SDK" + ) + parser.add_argument( + "--address", default=os.getenv("TFE_ADDRESS", "https://app.terraform.io") + ) + parser.add_argument("--token", default=os.getenv("TFE_TOKEN", "")) + parser.add_argument("--workspace-id", required=True, help="Workspace ID") + parser.add_argument( + "--run-task-id", help="Run Task ID to attach (for create operation)" + ) + parser.add_argument( + "--workspace-task-id", help="Workspace Run Task ID for read/update/delete" + ) + parser.add_argument( + "--create", action="store_true", help="Create/attach a workspace run task" + ) + parser.add_argument( + "--update", action="store_true", help="Update a workspace run task" + ) + parser.add_argument( + "--delete", action="store_true", help="Delete a workspace run task" + ) + parser.add_argument( + "--enforcement-level", + choices=["advisory", "mandatory"], + help="Enforcement level for create/update", + ) + parser.add_argument( + "--stages", + nargs="+", + choices=["pre-plan", "post-plan", "pre-apply", "post-apply"], + help="Stages to run the task in (for create/update)", + ) + parser.add_argument( + "--stage", + choices=["pre-plan", "post-plan", "pre-apply", "post-apply"], + help="Deprecated: Single stage to run the task in (use --stages instead)", + ) + parser.add_argument("--page", type=int, default=1, help="Page number for listing") + parser.add_argument( + "--page-size", type=int, default=10, help="Page size for listing" + ) + args = parser.parse_args() + + cfg = TFEConfig(address=args.address, token=args.token) + client = TFEClient(cfg) + + # Create a new workspace run task (attach run task to workspace) + if args.create: + if not args.run_task_id: + print("Error: --run-task-id is required for creating a workspace run task") + return + + if not args.enforcement_level: + print("Error: --enforcement-level is required for creating") + return + + _print_header("Creating Workspace Run Task") + + # Convert enforcement level string to enum + enforcement_level = ( + TaskEnforcementLevel.MANDATORY + if args.enforcement_level == "mandatory" + else TaskEnforcementLevel.ADVISORY + ) + + # Convert stages to enum + stages = None + if args.stages: + stages = [] + for stage_str in args.stages: + if stage_str == "pre-plan": + stages.append(Stage.PRE_PLAN) + elif stage_str == "post-plan": + stages.append(Stage.POST_PLAN) + elif stage_str == "pre-apply": + stages.append(Stage.PRE_APPLY) + elif stage_str == "post-apply": + stages.append(Stage.POST_APPLY) + + # Deprecated stage support + stage = None + if args.stage: + if args.stage == "pre-plan": + stage = Stage.PRE_PLAN + elif args.stage == "post-plan": + stage = Stage.POST_PLAN + elif args.stage == "pre-apply": + stage = Stage.PRE_APPLY + elif args.stage == "post-apply": + stage = Stage.POST_APPLY + + # Create RunTask object with just ID (minimal required) + run_task = RunTask( + id=args.run_task_id, + name="", # Name not needed for attachment + url="", # URL not needed for attachment + category="task", + enabled=True, + ) + + options = WorkspaceRunTaskCreateOptions( + enforcement_level=enforcement_level, + run_task=run_task, + stages=stages, + stage=stage, + ) + + try: + workspace_task = client.workspace_run_tasks.create( + args.workspace_id, options + ) + print("✓ Successfully attached run task to workspace") + print(f" Workspace Task ID: {workspace_task.id}") + print(f" Enforcement Level: {workspace_task.enforcement_level.value}") + print(f" Stage: {workspace_task.stage.value}") + if workspace_task.stages: + print(f" Stages: {[s.value for s in workspace_task.stages]}") + except Exception as e: + print(f"✗ Failed to create workspace run task: {e}") + + # Update an existing workspace run task + elif args.update: + if not args.workspace_task_id: + print("Error: --workspace-task-id is required for updating") + return + + _print_header("Updating Workspace Run Task") + + # Build update options + enforcement_level = None + if args.enforcement_level: + enforcement_level = ( + TaskEnforcementLevel.MANDATORY + if args.enforcement_level == "mandatory" + else TaskEnforcementLevel.ADVISORY + ) + + # Update stages if provided + stages = None + if args.stages: + stages = [] + for stage_str in args.stages: + if stage_str == "pre-plan": + stages.append(Stage.PRE_PLAN) + elif stage_str == "post-plan": + stages.append(Stage.POST_PLAN) + elif stage_str == "pre-apply": + stages.append(Stage.PRE_APPLY) + elif stage_str == "post-apply": + stages.append(Stage.POST_APPLY) + + options = WorkspaceRunTaskUpdateOptions( + enforcement_level=enforcement_level, stages=stages + ) + + # Update stage if provided (deprecated) + if args.stage: + if args.stage == "pre-plan": + options.stage = Stage.PRE_PLAN + elif args.stage == "post-plan": + options.stage = Stage.POST_PLAN + elif args.stage == "pre-apply": + options.stage = Stage.PRE_APPLY + elif args.stage == "post-apply": + options.stage = Stage.POST_APPLY + + try: + workspace_task = client.workspace_run_tasks.update( + args.workspace_id, args.workspace_task_id, options + ) + print("✓ Successfully updated workspace run task") + print(f" Workspace Task ID: {workspace_task.id}") + print(f" Enforcement Level: {workspace_task.enforcement_level.value}") + print(f" Stage: {workspace_task.stage.value}") + if workspace_task.stages: + print(f" Stages: {[s.value for s in workspace_task.stages]}") + except Exception as e: + print(f"✗ Failed to update workspace run task: {e}") + + # Delete a workspace run task + elif args.delete: + if not args.workspace_task_id: + print("Error: --workspace-task-id is required for deleting") + return + + _print_header("Deleting Workspace Run Task") + + try: + client.workspace_run_tasks.delete(args.workspace_id, args.workspace_task_id) + print( + f"✓ Successfully deleted workspace run task: {args.workspace_task_id}" + ) + except Exception as e: + print(f"✗ Failed to delete workspace run task: {e}") + + # Read a specific workspace run task + elif args.workspace_task_id: + _print_header("Reading Workspace Run Task") + + try: + workspace_task = client.workspace_run_tasks.read( + args.workspace_id, args.workspace_task_id + ) + print("✓ Workspace Run Task Details:") + print(f" ID: {workspace_task.id}") + print(f" Enforcement Level: {workspace_task.enforcement_level.value}") + print(f" Stage (deprecated): {workspace_task.stage.value}") + if workspace_task.stages: + print(f" Stages: {[s.value for s in workspace_task.stages]}") + if workspace_task.run_task: + print(f" Run Task ID: {workspace_task.run_task.id}") + if workspace_task.workspace: + print(f" Workspace ID: {workspace_task.workspace.id}") + except Exception as e: + print(f"✗ Failed to read workspace run task: {e}") + + # List all workspace run tasks (default operation) + else: + _print_header(f"Listing Workspace Run Tasks for Workspace: {args.workspace_id}") + + options = WorkspaceRunTaskListOptions( + page_number=args.page, + page_size=args.page_size, + ) + + try: + count = 0 + for workspace_task in client.workspace_run_tasks.list( + args.workspace_id, options + ): + count += 1 + print(f"\n{count}. Workspace Run Task ID: {workspace_task.id}") + print(f" Enforcement Level: {workspace_task.enforcement_level.value}") + print(f" Stage: {workspace_task.stage.value}") + if workspace_task.stages: + print(f" Stages: {[s.value for s in workspace_task.stages]}") + if workspace_task.run_task: + print(f" Run Task ID: {workspace_task.run_task.id}") + + if count == 0: + print("No workspace run tasks found for this workspace.") + else: + print(f"\n✓ Total workspace run tasks listed: {count}") + except Exception as e: + print(f"✗ Failed to list workspace run tasks: {e}") + + +if __name__ == "__main__": + main() diff --git a/src/pytfe/client.py b/src/pytfe/client.py index 3894886..e4c88d8 100644 --- a/src/pytfe/client.py +++ b/src/pytfe/client.py @@ -31,6 +31,7 @@ from .resources.state_versions import StateVersions from .resources.variable import Variables from .resources.variable_sets import VariableSets, VariableSetVariables +from .resources.workspace_run_task import WorkspaceRunTasks from .resources.workspaces import Workspaces @@ -76,6 +77,7 @@ def __init__(self, config: TFEConfig | None = None): self.state_versions = StateVersions(self._transport) self.state_version_outputs = StateVersionOutputs(self._transport) self.run_tasks = RunTasks(self._transport) + self.workspace_run_tasks = WorkspaceRunTasks(self._transport) self.run_triggers = RunTriggers(self._transport) self.runs = Runs(self._transport) self.query_runs = QueryRuns(self._transport) diff --git a/src/pytfe/errors.py b/src/pytfe/errors.py index 3eac2be..c2a0466 100644 --- a/src/pytfe/errors.py +++ b/src/pytfe/errors.py @@ -304,6 +304,13 @@ def __init__(self, message: str = 'category must be "task"'): super().__init__(message) +class InvalidWorkspaceRunTaskIDError(InvalidValues): + """Raised when an invalid workspace run task ID is provided.""" + + def __init__(self, message: str = "invalid value for workspace run task ID"): + super().__init__(message) + + # Run Trigger errors class RequiredRunTriggerListOpsError(RequiredFieldMissing): """Raised when required run trigger list options are missing.""" diff --git a/src/pytfe/models/__init__.py b/src/pytfe/models/__init__.py index f3eb33b..1a3f394 100644 --- a/src/pytfe/models/__init__.py +++ b/src/pytfe/models/__init__.py @@ -334,6 +334,15 @@ WorkspaceUpdateRemoteStateConsumersOptions, ) +# ── Workspace Run Tasks ────────────────────────────────────────────────────── +from .workspace_run_task import ( + WorkspaceRunTask, + WorkspaceRunTaskCreateOptions, + WorkspaceRunTaskList, + WorkspaceRunTaskListOptions, + WorkspaceRunTaskUpdateOptions, +) + # ── Public surface ──────────────────────────────────────────────────────────── __all__ = [ # OAuth @@ -496,6 +505,12 @@ "WorkspaceTagListOptions", "WorkspaceUpdateOptions", "WorkspaceUpdateRemoteStateConsumersOptions", + # Workspace Run Tasks + "WorkspaceRunTask", + "WorkspaceRunTaskCreateOptions", + "WorkspaceRunTaskList", + "WorkspaceRunTaskListOptions", + "WorkspaceRunTaskUpdateOptions", "RunQueue", "ReadRunQueueOptions", # Runs @@ -545,6 +560,12 @@ "SourceableChoice", "RunTriggerFilterOp", "RunTriggerIncludeOp", + # Workspace Run Tasks + "WorkspaceRunTask", + "WorkspaceRunTaskCreateOptions", + "WorkspaceRunTaskList", + "WorkspaceRunTaskListOptions", + "WorkspaceRunTaskUpdateOptions", # Policy Checks "PolicyCheck", "PolicyCheckIncludeOpt", diff --git a/src/pytfe/models/run_task.py b/src/pytfe/models/run_task.py index 8741162..2d0e9a3 100644 --- a/src/pytfe/models/run_task.py +++ b/src/pytfe/models/run_task.py @@ -1,13 +1,18 @@ from __future__ import annotations from enum import Enum +from typing import TYPE_CHECKING from pydantic import BaseModel, Field from ..models.common import Pagination from .agent import AgentPool from .organization import Organization -from .workspace_run_task import WorkspaceRunTask + +# Use TYPE_CHECKING to avoid circular import issues between RunTask and WorkspaceRunTask +# This allows forward references without importing at runtime +if TYPE_CHECKING: + from .workspace_run_task import WorkspaceRunTask class RunTask(BaseModel): @@ -22,6 +27,8 @@ class RunTask(BaseModel): agent_pool: AgentPool | None = None organization: Organization | None = None + # Workspace run tasks that use this run task + # Added to support the workspace_run_tasks relationship in the API workspace_run_tasks: list[WorkspaceRunTask] = Field(default_factory=list) @@ -38,10 +45,19 @@ class GlobalRunTaskOptions(BaseModel): class Stage(str, Enum): - PRE_PLAN = "pre-plan" - POST_PLAN = "post-plan" - PRE_APPLY = "pre-apply" - POST_APPLY = "post-apply" + """Run task stage enumeration. + + Defines when a run task should execute in the run lifecycle. + + Note: Values use underscore format (e.g., 'pre_plan') to match the + Terraform Cloud API specification. This was changed from hyphen format + (e.g., 'pre-plan') to align with the actual API responses and requests. + """ + + PRE_PLAN = "pre_plan" + POST_PLAN = "post_plan" + PRE_APPLY = "pre_apply" + POST_APPLY = "post_apply" class TaskEnforcementLevel(str, Enum): @@ -91,3 +107,28 @@ class RunTaskUpdateOptions(BaseModel): enabled: bool | None = None global_configuration: GlobalRunTaskOptions | None = None agent_pool: AgentPool | None = None + + +def _rebuild_models() -> None: + """Rebuild models to resolve forward references. + + This function resolves the circular dependency between RunTask and WorkspaceRunTask. + It imports WorkspaceRunTask and rebuilds the Pydantic models so that forward + references (e.g., list["WorkspaceRunTask"]) are properly resolved. + + The try-except ensures that if WorkspaceRunTask hasn't been defined yet, + the models will rebuild later when first used. + """ + try: + from .workspace_run_task import WorkspaceRunTask # noqa: F401 + + RunTask.model_rebuild() + GlobalRunTask.model_rebuild() + GlobalRunTaskOptions.model_rebuild() + RunTaskUpdateOptions.model_rebuild() + except Exception: + # Models will rebuild later when first used + pass + + +_rebuild_models() diff --git a/src/pytfe/models/workspace_run_task.py b/src/pytfe/models/workspace_run_task.py index b5072a6..0af6aac 100644 --- a/src/pytfe/models/workspace_run_task.py +++ b/src/pytfe/models/workspace_run_task.py @@ -1,7 +1,77 @@ from __future__ import annotations -from pydantic import BaseModel +from typing import TYPE_CHECKING + +from pydantic import BaseModel, Field + +from ..models.common import Pagination + +if TYPE_CHECKING: + from .run_task import RunTask, Stage, TaskEnforcementLevel + from .workspace import Workspace class WorkspaceRunTask(BaseModel): + """Represents a run task attached to a workspace.""" + id: str + enforcement_level: TaskEnforcementLevel | None = None + # Deprecated: Use stages property instead + stage: Stage | None = None + stages: list[Stage] = Field(default_factory=list) + + # Relationships + run_task: RunTask | None = None + workspace: Workspace | None = None + + +class WorkspaceRunTaskList(BaseModel): + """Represents a list of workspace run tasks.""" + + items: list[WorkspaceRunTask] = Field(default_factory=list) + pagination: Pagination | None = None + + +class WorkspaceRunTaskListOptions(BaseModel): + """Options for listing workspace run tasks.""" + + page_number: int | None = None + page_size: int | None = None + + +class WorkspaceRunTaskCreateOptions(BaseModel): + """Options for creating a workspace run task.""" + + type: str = Field(default="workspace-tasks") + enforcement_level: TaskEnforcementLevel + run_task: RunTask # Required + # Deprecated: Use stages property instead + stage: Stage | None = None + stages: list[Stage] | None = None + + +class WorkspaceRunTaskUpdateOptions(BaseModel): + """Options for updating a workspace run task.""" + + type: str = Field(default="workspace-tasks") + enforcement_level: TaskEnforcementLevel | None = None + # Deprecated: Use stages property instead + stage: Stage | None = None + stages: list[Stage] | None = None + + +def _rebuild_models() -> None: + """Rebuild models to resolve forward references.""" + try: + from .run_task import RunTask, Stage, TaskEnforcementLevel # noqa: F401 + from .workspace import Workspace # noqa: F401 + + WorkspaceRunTask.model_rebuild() + WorkspaceRunTaskCreateOptions.model_rebuild() + WorkspaceRunTaskUpdateOptions.model_rebuild() + except Exception: + # Models will rebuild later when first used + pass + + +_rebuild_models() diff --git a/src/pytfe/resources/run_task.py b/src/pytfe/resources/run_task.py index 853eab6..dcda0dc 100644 --- a/src/pytfe/resources/run_task.py +++ b/src/pytfe/resources/run_task.py @@ -94,6 +94,9 @@ def _run_task_from(d: dict[str, Any], org: str | None = None) -> RunTask: ) # Handle workspace run tasks relationship + # Added to support the workspace-tasks relationship returned by the API. + # This populates the workspace_run_tasks field on RunTask objects when + # the API includes this relationship (e.g., when using include query params). workspace_run_tasks = [] wrt_data = relationships.get("workspace-tasks", {}).get("data", []) if isinstance(wrt_data, list): @@ -270,17 +273,35 @@ def attach_to_workspace( Attach a run task to a workspace. This is a convenience method that creates a workspace run task relationship. + Delegates to workspace_run_tasks.create(). + + Args: + workspace_id: The workspace ID + run_task_id: The run task ID to attach + enforcement_level: The enforcement level for this task + + Returns: + WorkspaceRunTask: The created workspace run task + + Raises: + InvalidWorkspaceIDError: If workspace_id is invalid + InvalidRunTaskIDError: If run_task_id is invalid """ - # This would typically delegate to workspace_run_tasks.create() - # For now, we'll create a placeholder implementation - # In a real implementation, this would call: - """ + from ..models.workspace_run_task import WorkspaceRunTaskCreateOptions + from .workspace_run_task import WorkspaceRunTasks + + # Create workspace run tasks service + workspace_run_tasks = WorkspaceRunTasks(self.t) + + # Create the run task object with minimal required fields + run_task = RunTask( + id=run_task_id, name="", url="", category="task", enabled=True + ) + + # Create options for attaching the task create_options = WorkspaceRunTaskCreateOptions( enforcement_level=enforcement_level, - run_task=RunTask(id=run_task_id, name="", url="", category="task", enabled=True) + run_task=run_task, ) - return workspace_run_tasks.create(workspace_id, create_options) - """ - # TODO: Implement actual workspace run task creation - raise NotImplementedError("attach_to_workspace method needs to be implemented") + return workspace_run_tasks.create(workspace_id, create_options) diff --git a/src/pytfe/resources/workspace_run_task.py b/src/pytfe/resources/workspace_run_task.py new file mode 100644 index 0000000..adfc221 --- /dev/null +++ b/src/pytfe/resources/workspace_run_task.py @@ -0,0 +1,334 @@ +from __future__ import annotations + +from collections.abc import Iterator +from typing import Any + +from ..errors import ( + InvalidRunTaskIDError, + InvalidWorkspaceIDError, + InvalidWorkspaceRunTaskIDError, +) +from ..models.run_task import RunTask, Stage, TaskEnforcementLevel +from ..models.workspace_run_task import ( + WorkspaceRunTask, + WorkspaceRunTaskCreateOptions, + WorkspaceRunTaskListOptions, + WorkspaceRunTaskUpdateOptions, +) +from ..utils import _safe_str, valid_string_id +from ._base import _Service + + +def _workspace_run_task_from(d: dict[str, Any]) -> WorkspaceRunTask: + """ + Convert JSON API response data to WorkspaceRunTask object. + + Maps the JSON API format to Python model fields, handling: + - Basic attributes (id, enforcement_level, stage, stages) + - Relationships (run_task, workspace) + """ + attr: dict[str, Any] = d.get("attributes", {}) or {} + relationships: dict[str, Any] = d.get("relationships", {}) or {} + + id_str: str = _safe_str(d.get("id")) + + # Parse enforcement level + enforcement_level = TaskEnforcementLevel.ADVISORY # Default + if "enforcement-level" in attr: + try: + enforcement_level = TaskEnforcementLevel(attr["enforcement-level"]) + except ValueError: + enforcement_level = TaskEnforcementLevel.ADVISORY + + # Parse stage (deprecated) + stage = Stage.PRE_PLAN # Default + if "stage" in attr: + try: + stage = Stage(attr["stage"]) + except ValueError: + stage = Stage.PRE_PLAN + + # Parse stages list + stages = [] + if "stages" in attr and isinstance(attr["stages"], list): + for stage_str in attr["stages"]: + if isinstance(stage_str, str): + try: + stages.append(Stage(stage_str)) + except ValueError: + pass # Skip invalid stages + + # Handle run_task relationship + run_task = None + run_task_data = relationships.get("task", {}).get("data") + if run_task_data and isinstance(run_task_data, dict): + run_task = RunTask( + id=_safe_str(run_task_data.get("id")), + name="", # Name not available in relationship data + url="", # URL not available in relationship data + category="task", + enabled=True, + ) + + # Handle workspace relationship + workspace = None + workspace_data = relationships.get("workspace", {}).get("data") + if workspace_data and isinstance(workspace_data, dict): + from ..models.workspace import Workspace + + workspace = Workspace( + id=_safe_str(workspace_data.get("id")), + name="", # Name not available in relationship data + ) + + return WorkspaceRunTask( + id=id_str, + enforcement_level=enforcement_level, + stage=stage, + stages=stages, + run_task=run_task, + workspace=workspace, + ) + + +class WorkspaceRunTasks(_Service): + """ + Workspace Run Tasks service for managing run tasks attached to workspaces. + + API Documentation: + https://developer.hashicorp.com/terraform/cloud-docs/api-docs/workspace-run-tasks + """ + + def list( + self, + workspace_id: str, + options: WorkspaceRunTaskListOptions | None = None, + ) -> Iterator[WorkspaceRunTask]: + """ + List all run tasks attached to a workspace. + + Args: + workspace_id: The ID of the workspace + options: Optional pagination parameters + + Yields: + WorkspaceRunTask objects + + Raises: + InvalidWorkspaceIDError: If workspace_id is invalid + + API Endpoint: + GET /workspaces/:workspace_id/tasks + """ + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError("Invalid workspace ID") + + url = f"/api/v2/workspaces/{workspace_id}/tasks" + params: dict[str, Any] = {} + + if options: + if options.page_number is not None: + params["page[number]"] = options.page_number + if options.page_size is not None: + params["page[size]"] = options.page_size + + while True: + r = self.t.request("GET", url, params=params) + response: dict[str, Any] = r.json() + + # Parse data array + data_list = response.get("data", []) + if not isinstance(data_list, list): + break + + for item in data_list: + yield _workspace_run_task_from(item) + + # Check for next page + links = response.get("links", {}) + next_url = links.get("next") + if not next_url: + break + + # Update URL for next page + url = next_url + params = {} + + def read(self, workspace_id: str, workspace_task_id: str) -> WorkspaceRunTask: + """ + Read a workspace run task by ID. + + Args: + workspace_id: The ID of the workspace + workspace_task_id: The ID of the workspace run task + + Returns: + WorkspaceRunTask object + + Raises: + InvalidWorkspaceIDError: If workspace_id is invalid + InvalidWorkspaceRunTaskIDError: If workspace_task_id is invalid + + API Endpoint: + GET /workspaces/:workspace_id/tasks/:workspace_task_id + """ + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError("Invalid workspace ID") + + if not valid_string_id(workspace_task_id): + raise InvalidWorkspaceRunTaskIDError("Invalid workspace run task ID") + + url = f"/api/v2/workspaces/{workspace_id}/tasks/{workspace_task_id}" + r = self.t.request("GET", url) + response: dict[str, Any] = r.json() + + data = response.get("data", {}) + return _workspace_run_task_from(data) + + def create( + self, + workspace_id: str, + options: WorkspaceRunTaskCreateOptions, + ) -> WorkspaceRunTask: + """ + Create a workspace run task (attach a run task to a workspace). + + The run task must exist in the workspace's organization. + + Args: + workspace_id: The ID of the workspace + options: Creation options including run_task and enforcement_level + + Returns: + Created WorkspaceRunTask object + + Raises: + InvalidWorkspaceIDError: If workspace_id is invalid + InvalidRunTaskIDError: If run_task ID is invalid + + API Endpoint: + POST /workspaces/:workspace_id/tasks + """ + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError("Invalid workspace ID") + + if not options.run_task or not options.run_task.id: + raise InvalidRunTaskIDError("Invalid run task ID") + + url = f"/api/v2/workspaces/{workspace_id}/tasks" + + # Build request payload + payload: dict[str, Any] = { + "data": { + "type": options.type, + "attributes": { + "enforcement-level": options.enforcement_level.value, + }, + "relationships": { + "task": { + "data": { + "type": "tasks", + "id": options.run_task.id, + } + } + }, + } + } + + # Add optional stage (deprecated) + if options.stage is not None: + payload["data"]["attributes"]["stage"] = options.stage.value + + # Add optional stages + if options.stages is not None: + payload["data"]["attributes"]["stages"] = [s.value for s in options.stages] + + r = self.t.request("POST", url, json_body=payload) + response: dict[str, Any] = r.json() + + data = response.get("data", {}) + return _workspace_run_task_from(data) + + def update( + self, + workspace_id: str, + workspace_task_id: str, + options: WorkspaceRunTaskUpdateOptions, + ) -> WorkspaceRunTask: + """ + Update an existing workspace run task. + + Args: + workspace_id: The ID of the workspace + workspace_task_id: The ID of the workspace run task + options: Update options (enforcement_level, stage, stages) + + Returns: + Updated WorkspaceRunTask object + + Raises: + InvalidWorkspaceIDError: If workspace_id is invalid + InvalidWorkspaceRunTaskIDError: If workspace_task_id is invalid + + API Endpoint: + PATCH /workspaces/:workspace_id/tasks/:workspace_task_id + """ + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError("Invalid workspace ID") + + if not valid_string_id(workspace_task_id): + raise InvalidWorkspaceRunTaskIDError("Invalid workspace run task ID") + + url = f"/api/v2/workspaces/{workspace_id}/tasks/{workspace_task_id}" + + # Build request payload + payload: dict[str, Any] = { + "data": { + "type": options.type, + "attributes": {}, + } + } + + # Add optional enforcement level + if options.enforcement_level is not None: + payload["data"]["attributes"]["enforcement-level"] = ( + options.enforcement_level.value + ) + + # Add optional stage (deprecated) + if options.stage is not None: + payload["data"]["attributes"]["stage"] = options.stage.value + + # Add optional stages + if options.stages is not None: + payload["data"]["attributes"]["stages"] = [s.value for s in options.stages] + + r = self.t.request("PATCH", url, json_body=payload) + response: dict[str, Any] = r.json() + + data = response.get("data", {}) + return _workspace_run_task_from(data) + + def delete(self, workspace_id: str, workspace_task_id: str) -> None: + """ + Delete a workspace run task by ID. + + Args: + workspace_id: The ID of the workspace + workspace_task_id: The ID of the workspace run task + + Raises: + InvalidWorkspaceIDError: If workspace_id is invalid + InvalidWorkspaceRunTaskIDError: If workspace_task_id is invalid + + API Endpoint: + DELETE /workspaces/:workspace_id/tasks/:workspace_task_id + """ + if not valid_string_id(workspace_id): + raise InvalidWorkspaceIDError("Invalid workspace ID") + + if not valid_string_id(workspace_task_id): + raise InvalidWorkspaceRunTaskIDError("Invalid workspace run task ID") + + url = f"/api/v2/workspaces/{workspace_id}/tasks/{workspace_task_id}" + self.t.request("DELETE", url) diff --git a/tests/units/test_run_task.py b/tests/units/test_run_task.py index a428d79..fbc5b29 100644 --- a/tests/units/test_run_task.py +++ b/tests/units/test_run_task.py @@ -42,7 +42,7 @@ def test_run_task_from_comprehensive(self): "enabled": True, "global-configuration": { "enabled": True, - "stages": ["pre-plan", "post-apply"], + "stages": ["pre_plan", "post_apply"], "enforcement-level": "mandatory", }, }, @@ -221,7 +221,7 @@ def test_create_run_task(self, run_tasks_service): "hmac_key": "secret-key-123", "global-configuration": { "enabled": True, - "stages": ["pre-plan", "post-plan"], + "stages": ["pre_plan", "post_plan"], "enforcement-level": "mandatory", }, }, diff --git a/tests/units/test_workspace_run_task.py b/tests/units/test_workspace_run_task.py new file mode 100644 index 0000000..fc84784 --- /dev/null +++ b/tests/units/test_workspace_run_task.py @@ -0,0 +1,469 @@ +"""Unit tests for the workspace run task module.""" + +from unittest.mock import Mock + +import pytest + +from pytfe._http import HTTPTransport +from pytfe.errors import ( + InvalidRunTaskIDError, + InvalidWorkspaceIDError, + InvalidWorkspaceRunTaskIDError, +) +from pytfe.models.run_task import RunTask, Stage, TaskEnforcementLevel +from pytfe.models.workspace_run_task import ( + WorkspaceRunTask, + WorkspaceRunTaskCreateOptions, + WorkspaceRunTaskListOptions, + WorkspaceRunTaskUpdateOptions, +) +from pytfe.resources.workspace_run_task import ( + WorkspaceRunTasks, + _workspace_run_task_from, +) + +# Ensure models are fully defined for tests +WorkspaceRunTask.model_rebuild() +WorkspaceRunTaskCreateOptions.model_rebuild() +WorkspaceRunTaskUpdateOptions.model_rebuild() + + +class TestWorkspaceRunTaskFrom: + """Test the _workspace_run_task_from function.""" + + def test_workspace_run_task_from_complete(self): + """Test _workspace_run_task_from with all fields populated.""" + + data = { + "id": "wstask-123", + "attributes": { + "enforcement-level": "mandatory", + "stage": "pre_plan", + "stages": ["pre_plan", "post_plan"], + }, + "relationships": { + "task": {"data": {"id": "task-456", "type": "tasks"}}, + "workspace": {"data": {"id": "ws-789", "type": "workspaces"}}, + }, + } + + result = _workspace_run_task_from(data) + + assert result.id == "wstask-123" + assert result.enforcement_level == TaskEnforcementLevel.MANDATORY + assert result.stage == Stage.PRE_PLAN + assert len(result.stages) == 2 + assert result.stages[0] == Stage.PRE_PLAN + assert result.stages[1] == Stage.POST_PLAN + assert result.run_task is not None + assert result.run_task.id == "task-456" + assert result.workspace is not None + assert result.workspace.id == "ws-789" + + def test_workspace_run_task_from_minimal(self): + """Test _workspace_run_task_from with minimal fields.""" + + data = { + "id": "wstask-minimal", + "attributes": {"enforcement-level": "advisory"}, + } + + result = _workspace_run_task_from(data) + + assert result.id == "wstask-minimal" + assert result.enforcement_level == TaskEnforcementLevel.ADVISORY + # Should have default stage + assert result.stage == Stage.PRE_PLAN + # Should have empty stages list + assert result.stages == [] + # Relationships should be None + assert result.run_task is None + assert result.workspace is None + + def test_workspace_run_task_from_invalid_enforcement_level(self): + """Test _workspace_run_task_from handles invalid enforcement level.""" + + data = { + "id": "wstask-invalid", + "attributes": {"enforcement-level": "invalid-level"}, + } + + result = _workspace_run_task_from(data) + + # Should default to ADVISORY for invalid values + assert result.enforcement_level == TaskEnforcementLevel.ADVISORY + + def test_workspace_run_task_from_invalid_stage(self): + """Test _workspace_run_task_from handles invalid stage.""" + + data = { + "id": "wstask-invalid-stage", + "attributes": { + "enforcement-level": "mandatory", + "stage": "invalid-stage", + "stages": ["pre_plan", "invalid-stage", "post_plan"], + }, + } + + result = _workspace_run_task_from(data) + + # Should default to PRE_PLAN for invalid stage value + assert result.stage == Stage.PRE_PLAN + # Stages list should skip invalid stages + assert len(result.stages) == 2 + assert result.stages[0] == Stage.PRE_PLAN + assert result.stages[1] == Stage.POST_PLAN + + +class TestWorkspaceRunTasks: + """Test the WorkspaceRunTasks service class.""" + + @pytest.fixture + def mock_transport(self): + """Create a mock HTTPTransport.""" + return Mock(spec=HTTPTransport) + + @pytest.fixture + def workspace_run_tasks_service(self, mock_transport): + """Create a WorkspaceRunTasks service with mocked transport.""" + return WorkspaceRunTasks(mock_transport) + + # List Tests + def test_list_with_invalid_workspace_id(self, workspace_run_tasks_service): + """Test list with invalid workspace ID.""" + + with pytest.raises(InvalidWorkspaceIDError): + list(workspace_run_tasks_service.list("")) + + def test_list_success(self, workspace_run_tasks_service, mock_transport): + """Test successful list operation.""" + + mock_response_data = { + "data": [ + { + "id": "wstask-1", + "attributes": { + "enforcement-level": "mandatory", + "stage": "pre_plan", + "stages": ["pre_plan"], + }, + }, + { + "id": "wstask-2", + "attributes": { + "enforcement-level": "advisory", + "stage": "post_plan", + "stages": ["post_plan"], + }, + }, + ], + "links": {}, + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + mock_transport.request.return_value = mock_response + + options = WorkspaceRunTaskListOptions(page_number=1, page_size=10) + results = list(workspace_run_tasks_service.list("ws-123", options)) + + assert len(results) == 2 + assert results[0].id == "wstask-1" + assert results[0].enforcement_level == TaskEnforcementLevel.MANDATORY + assert results[1].id == "wstask-2" + assert results[1].enforcement_level == TaskEnforcementLevel.ADVISORY + + # Verify API call + mock_transport.request.assert_called_once() + call_args = mock_transport.request.call_args + assert call_args[0][0] == "GET" + assert call_args[0][1] == "/api/v2/workspaces/ws-123/tasks" + + def test_list_pagination(self, workspace_run_tasks_service, mock_transport): + """Test list with pagination.""" + + # First page + mock_response_page1_data = { + "data": [ + { + "id": "wstask-1", + "attributes": {"enforcement-level": "mandatory"}, + } + ], + "links": {"next": "workspaces/ws-123/tasks?page[number]=2"}, + } + + # Second page + mock_response_page2_data = { + "data": [ + { + "id": "wstask-2", + "attributes": {"enforcement-level": "advisory"}, + } + ], + "links": {}, + } + + mock_response_1 = Mock() + mock_response_1.json.return_value = mock_response_page1_data + mock_response_2 = Mock() + mock_response_2.json.return_value = mock_response_page2_data + mock_transport.request.side_effect = [mock_response_1, mock_response_2] + + results = list(workspace_run_tasks_service.list("ws-123")) + + assert len(results) == 2 + assert results[0].id == "wstask-1" + assert results[1].id == "wstask-2" + assert mock_transport.request.call_count == 2 + + # Read Tests + def test_read_with_invalid_workspace_id(self, workspace_run_tasks_service): + """Test read with invalid workspace ID.""" + + with pytest.raises(InvalidWorkspaceIDError): + workspace_run_tasks_service.read("", "wstask-123") + + def test_read_with_invalid_task_id(self, workspace_run_tasks_service): + """Test read with invalid workspace task ID.""" + + with pytest.raises(InvalidWorkspaceRunTaskIDError): + workspace_run_tasks_service.read("ws-123", "") + + def test_read_success(self, workspace_run_tasks_service, mock_transport): + """Test successful read operation.""" + + mock_response_data = { + "data": { + "id": "wstask-123", + "attributes": { + "enforcement-level": "mandatory", + "stage": "pre_plan", + "stages": ["pre_plan", "post_plan"], + }, + "relationships": { + "task": {"data": {"id": "task-456", "type": "tasks"}}, + "workspace": {"data": {"id": "ws-789", "type": "workspaces"}}, + }, + } + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + mock_transport.request.return_value = mock_response + + result = workspace_run_tasks_service.read("ws-789", "wstask-123") + + assert result.id == "wstask-123" + assert result.enforcement_level == TaskEnforcementLevel.MANDATORY + assert result.stage == Stage.PRE_PLAN + assert len(result.stages) == 2 + assert result.run_task.id == "task-456" + assert result.workspace.id == "ws-789" + + # Verify API call + mock_transport.request.assert_called_once_with( + "GET", "/api/v2/workspaces/ws-789/tasks/wstask-123" + ) + + # Create Tests + def test_create_with_invalid_workspace_id(self, workspace_run_tasks_service): + """Test create with invalid workspace ID.""" + + run_task = RunTask( + id="task-123", + name="Test Task", + url="https://example.com", + category="task", + enabled=True, + ) + options = WorkspaceRunTaskCreateOptions( + enforcement_level=TaskEnforcementLevel.MANDATORY, + run_task=run_task, + ) + + with pytest.raises(InvalidWorkspaceIDError): + workspace_run_tasks_service.create("", options) + + def test_create_with_invalid_run_task(self, workspace_run_tasks_service): + """Test create with invalid run task.""" + + # Run task with no ID + run_task = RunTask( + id="", name="Test", url="https://example.com", category="task", enabled=True + ) + options = WorkspaceRunTaskCreateOptions( + enforcement_level=TaskEnforcementLevel.MANDATORY, + run_task=run_task, + ) + + with pytest.raises(InvalidRunTaskIDError): + workspace_run_tasks_service.create("ws-123", options) + + def test_create_success(self, workspace_run_tasks_service, mock_transport): + """Test successful create operation.""" + + mock_response_data = { + "data": { + "id": "wstask-new", + "attributes": { + "enforcement-level": "mandatory", + "stage": "pre_plan", + "stages": ["pre_plan", "post_plan"], + }, + } + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + mock_transport.request.return_value = mock_response + + run_task = RunTask( + id="task-123", + name="Test Task", + url="https://example.com", + category="task", + enabled=True, + ) + options = WorkspaceRunTaskCreateOptions( + enforcement_level=TaskEnforcementLevel.MANDATORY, + run_task=run_task, + stages=[Stage.PRE_PLAN, Stage.POST_PLAN], + ) + + result = workspace_run_tasks_service.create("ws-789", options) + + assert result.id == "wstask-new" + assert result.enforcement_level == TaskEnforcementLevel.MANDATORY + + # Verify API call + mock_transport.request.assert_called_once() + call_args = mock_transport.request.call_args + assert call_args[0][0] == "POST" + assert call_args[0][1] == "/api/v2/workspaces/ws-789/tasks" + payload = call_args[1]["json_body"] + assert payload["data"]["type"] == "workspace-tasks" + assert payload["data"]["attributes"]["enforcement-level"] == "mandatory" + assert payload["data"]["attributes"]["stages"] == ["pre_plan", "post_plan"] + assert payload["data"]["relationships"]["task"]["data"]["id"] == "task-123" + + def test_create_with_deprecated_stage( + self, workspace_run_tasks_service, mock_transport + ): + """Test create with deprecated stage attribute.""" + + mock_response_data = { + "data": { + "id": "wstask-new", + "attributes": { + "enforcement-level": "advisory", + "stage": "post_plan", + }, + } + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + mock_transport.request.return_value = mock_response + + run_task = RunTask( + id="task-123", + name="Test Task", + url="https://example.com", + category="task", + enabled=True, + ) + options = WorkspaceRunTaskCreateOptions( + enforcement_level=TaskEnforcementLevel.ADVISORY, + run_task=run_task, + stage=Stage.POST_PLAN, + ) + + result = workspace_run_tasks_service.create("ws-789", options) + + assert result.id == "wstask-new" + + # Verify API call includes stage + payload = mock_transport.request.call_args[1]["json_body"] + assert payload["data"]["attributes"]["stage"] == "post_plan" + + # Update Tests + def test_update_with_invalid_workspace_id(self, workspace_run_tasks_service): + """Test update with invalid workspace ID.""" + + options = WorkspaceRunTaskUpdateOptions( + enforcement_level=TaskEnforcementLevel.ADVISORY + ) + + with pytest.raises(InvalidWorkspaceIDError): + workspace_run_tasks_service.update("", "wstask-123", options) + + def test_update_with_invalid_task_id(self, workspace_run_tasks_service): + """Test update with invalid workspace task ID.""" + + options = WorkspaceRunTaskUpdateOptions( + enforcement_level=TaskEnforcementLevel.ADVISORY + ) + + with pytest.raises(InvalidWorkspaceRunTaskIDError): + workspace_run_tasks_service.update("ws-123", "", options) + + def test_update_success(self, workspace_run_tasks_service, mock_transport): + """Test successful update operation.""" + + mock_response_data = { + "data": { + "id": "wstask-123", + "attributes": { + "enforcement-level": "advisory", + "stage": "pre_plan", + "stages": ["pre_plan", "post_plan"], + }, + } + } + + mock_response = Mock() + mock_response.json.return_value = mock_response_data + mock_transport.request.return_value = mock_response + + options = WorkspaceRunTaskUpdateOptions( + enforcement_level=TaskEnforcementLevel.ADVISORY, + stages=[Stage.PRE_PLAN, Stage.POST_PLAN], + ) + + result = workspace_run_tasks_service.update("ws-789", "wstask-123", options) + + assert result.id == "wstask-123" + assert result.enforcement_level == TaskEnforcementLevel.ADVISORY + + # Verify API call + mock_transport.request.assert_called_once() + call_args = mock_transport.request.call_args + assert call_args[0][0] == "PATCH" + assert call_args[0][1] == "/api/v2/workspaces/ws-789/tasks/wstask-123" + payload = call_args[1]["json_body"] + assert payload["data"]["attributes"]["enforcement-level"] == "advisory" + assert payload["data"]["attributes"]["stages"] == ["pre_plan", "post_plan"] + + # Delete Tests + def test_delete_with_invalid_workspace_id(self, workspace_run_tasks_service): + """Test delete with invalid workspace ID.""" + + with pytest.raises(InvalidWorkspaceIDError): + workspace_run_tasks_service.delete("", "wstask-123") + + def test_delete_with_invalid_task_id(self, workspace_run_tasks_service): + """Test delete with invalid workspace task ID.""" + + with pytest.raises(InvalidWorkspaceRunTaskIDError): + workspace_run_tasks_service.delete("ws-123", "") + + def test_delete_success(self, workspace_run_tasks_service, mock_transport): + """Test successful delete operation.""" + + workspace_run_tasks_service.delete("ws-789", "wstask-123") + + # Verify API call + mock_transport.request.assert_called_once_with( + "DELETE", "/api/v2/workspaces/ws-789/tasks/wstask-123" + )