diff --git a/examples/task_stage_example.py b/examples/task_stage_example.py new file mode 100644 index 0000000..d16f637 --- /dev/null +++ b/examples/task_stage_example.py @@ -0,0 +1,60 @@ +""" +Example usage of TaskStages API + +Demonstrates: +- Read a task stage +- List task stages for a run +- Override a task stage +""" + +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from pytfe import TFEClient, TFEConfig + + +def main(): + client = TFEClient(TFEConfig.from_env()) + + task_stage_id = os.getenv("TFE_TASK_STAGE_ID") + run_id = os.getenv("TFE_RUN_ID") + + if not task_stage_id or not run_id: + print("Please set TFE_TASK_STAGE_ID and TFE_RUN_ID") + return + + print("=== TaskStages Example ===") + + # READ + print("\nReading task stage...") + try: + stage = client.task_stages.read(task_stage_id) + print(f"ID: {stage.id}") + print(f"Stage: {stage.stage}") + print(f"Status: {stage.status}") + print(f"Run: {stage.run.id if stage.run else None}") + except Exception as e: + print(f"Read failed: {e}") + + # LIST + print("\nListing task stages...") + try: + stages = list(client.task_stages.list(run_id)) + for s in stages: + print(f"{s.id} - {s.status}") + except Exception as e: + print(f"List failed: {e}") + + # OVERRIDE + print("\nOverriding task stage...") + try: + client.task_stages.override(task_stage_id, comment="Approved") + print("Override successful") + except Exception as e: + print(f"Override failed: {e}") + + +if __name__ == "__main__": + main() diff --git a/src/pytfe/client.py b/src/pytfe/client.py index 4642d9a..4d93aa5 100644 --- a/src/pytfe/client.py +++ b/src/pytfe/client.py @@ -39,6 +39,7 @@ from .resources.stack_configuration import StackConfigurations from .resources.state_version_outputs import StateVersionOutputs from .resources.state_versions import StateVersions +from .resources.task_stage import TaskStages from .resources.team import Teams from .resources.team_project_access import TeamProjectAccesses from .resources.user import Users @@ -102,6 +103,7 @@ def __init__(self, config: TFEConfig | None = None): self.run_tasks = RunTasks(self._transport) self.run_triggers = RunTriggers(self._transport) self.runs = Runs(self._transport) + self.task_stages = TaskStages(self._transport) self.query_runs = QueryRuns(self._transport) self.run_events = RunEvents(self._transport) self.policies = Policies(self._transport) diff --git a/src/pytfe/models/task_result.py b/src/pytfe/models/task_result.py new file mode 100644 index 0000000..506994b --- /dev/null +++ b/src/pytfe/models/task_result.py @@ -0,0 +1,87 @@ +# Copyright IBM Corp. 2025, 2026 +# SPDX-License-Identifier: MPL-2.0 + +from __future__ import annotations + +from datetime import datetime +from enum import Enum +from typing import TYPE_CHECKING, Any + +from pydantic import BaseModel, ConfigDict, Field + +if TYPE_CHECKING: + from pytfe.models.task_stage import TaskStage + + +class TaskResultStatus(str, Enum): + passed = "passed" + failed = "failed" + pending = "pending" + running = "running" + unreachable = "unreachable" + errored = "errored" + + +class TaskEnforcementLevel(str, Enum): + advisory = "advisory" + mandatory = "mandatory" + + +class TaskResultStatusTimestamps(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + errored_at: datetime | None = Field(None, alias="errored-at") + running_at: datetime | None = Field(None, alias="running-at") + canceled_at: datetime | None = Field(None, alias="canceled-at") + failed_at: datetime | None = Field(None, alias="failed-at") + passed_at: datetime | None = Field(None, alias="passed-at") + + +class TaskResult(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + id: str + status: TaskResultStatus | None = Field(None, alias="status") + message: str | None = Field(None, alias="message") + + status_timestamps: TaskResultStatusTimestamps | None = Field( + None, alias="status-timestamps" + ) + + url: str | None = Field(None, alias="url") + + created_at: datetime | None = Field(None, alias="created-at") + updated_at: datetime | None = Field(None, alias="updated-at") + + task_id: str | None = Field(None, alias="task-id") + task_name: str | None = Field(None, alias="task-name") + task_url: str | None = Field(None, alias="task-url") + + workspace_task_id: str | None = Field(None, alias="workspace-task-id") + workspace_task_enforcement_level: TaskEnforcementLevel | None = Field( + None, alias="workspace-task-enforcement-level" + ) + + agent_pool_id: str | None = Field(None, alias="agent-pool-id") + task_stage: TaskStage | None = Field(None, alias="task-stage") + + @classmethod + def model_validate(cls, *args: Any, **kwargs: Any) -> TaskResult: + if not getattr(cls, "__pydantic_complete__", True): + _rebuild_task_result_model() + return super().model_validate(*args, **kwargs) + + +def _rebuild_task_result_model() -> None: + try: + from pytfe.models.task_stage import TaskStage + + TaskResult.model_rebuild( + raise_errors=False, + _types_namespace={"TaskStage": TaskStage}, + ) + except Exception: + pass + + +_rebuild_task_result_model() diff --git a/src/pytfe/models/task_stage.py b/src/pytfe/models/task_stage.py index 54b9346..bb52831 100644 --- a/src/pytfe/models/task_stage.py +++ b/src/pytfe/models/task_stage.py @@ -3,23 +3,116 @@ from __future__ import annotations -from pydantic import BaseModel, ConfigDict +from datetime import datetime +from enum import Enum +from typing import TYPE_CHECKING + +from pydantic import BaseModel, ConfigDict, Field + +from pytfe.models.policy_evaluation import PolicyEvaluation +from pytfe.models.task_result import TaskResult + +if TYPE_CHECKING: + from pytfe.models.run import Run + + +class Stage(str, Enum): + pre_plan = "pre_plan" + post_plan = "post_plan" + pre_apply = "pre_apply" + post_apply = "post_apply" + + +class TaskStageStatus(str, Enum): + pending = "pending" + running = "running" + passed = "passed" + failed = "failed" + awaiting_override = "awaiting_override" + canceled = "canceled" + errored = "errored" + unreachable = "unreachable" + + +class TaskStageStatusTimestamps(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + errored_at: datetime | None = Field(None, alias="errored-at") + running_at: datetime | None = Field(None, alias="running-at") + canceled_at: datetime | None = Field(None, alias="canceled-at") + failed_at: datetime | None = Field(None, alias="failed-at") + passed_at: datetime | None = Field(None, alias="passed-at") + + +class Permissions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + can_override_policy: bool | None = Field(None, alias="can-override-policy") + can_override_tasks: bool | None = Field(None, alias="can-override-tasks") + can_override: bool | None = Field(None, alias="can-override") + + +class Actions(BaseModel): + model_config = ConfigDict(populate_by_name=True, validate_by_name=True) + + is_overridable: bool | None = Field(None, alias="is-overridable") -# TaskStage represents a HCP Terraform or Terraform Enterprise run's stage where run tasks can occur class TaskStage(BaseModel): model_config = ConfigDict(populate_by_name=True, validate_by_name=True) id: str - # stage: Stage = Field(..., alias="stage") - # status: TaskStageStatus = Field(..., alias="status") - # status_timestamps: TaskStageStatusTimestamps = Field(..., alias="status-timestamps") - # created_at: datetime = Field(..., alias="created-at") - # updated_at: datetime = Field(..., alias="updated-at") - # permissions: Permissions = Field(..., alias="permissions") - # actions: Actions = Field(..., alias="actions") - - # # Relations - # run: Run = Field(..., alias="run") - # task_results: list[TaskResult] = Field(..., alias="task-results") - # policy_evaluations: list[PolicyEvaluation] = Field(..., alias="policy-evaluations") + + stage: Stage = Field(..., alias="stage") + + status: TaskStageStatus = Field( + ..., + alias="status", + ) + + status_timestamps: TaskStageStatusTimestamps = Field( + ..., + alias="status-timestamps", + ) + + created_at: datetime = Field(..., alias="created-at") + updated_at: datetime = Field(..., alias="updated-at") + + permissions: Permissions | None = Field( + None, + alias="permissions", + ) + + actions: Actions | None = Field( + None, + alias="actions", + ) + + # Relationships + run: Run | None = Field( + None, + alias="run", + ) + + task_results: list[TaskResult] | None = Field( + None, + alias="task-results", + ) + + policy_evaluations: list[PolicyEvaluation] | None = Field( + None, + alias="policy-evaluations", + ) + + +def _rebuild_task_stage_model() -> None: + TaskStage.model_rebuild( + raise_errors=False, + _types_namespace={ + "TaskResult": TaskResult, + "PolicyEvaluation": PolicyEvaluation, + }, + ) + + +_rebuild_task_stage_model() diff --git a/src/pytfe/resources/task_stage.py b/src/pytfe/resources/task_stage.py new file mode 100644 index 0000000..0eb628c --- /dev/null +++ b/src/pytfe/resources/task_stage.py @@ -0,0 +1,104 @@ +# Copyright IBM Corp. 2025, 2026 +# SPDX-License-Identifier: MPL-2.0 + +from __future__ import annotations + +from collections.abc import Iterator +from typing import Any + +from ..errors import InvalidRunIDError +from ..models.policy_evaluation import PolicyEvaluation +from ..models.run import Run +from ..models.task_result import TaskResult +from ..models.task_stage import TaskStage +from ..utils import _safe_str, valid_string_id +from ._base import _Service + + +class TaskStages(_Service): + """TaskStages provides access to task stage endpoints.""" + + def _parse_task_stage(self, data: dict[str, Any]) -> TaskStage: + TaskStage.model_rebuild( + raise_errors=False, + _types_namespace={"Run": Run}, + ) + + attributes = data.get("attributes", {}) + + attributes["id"] = _safe_str(data.get("id")) + + relationships = data.get("relationships", {}) + + run_data = relationships.get("run", {}).get("data") + if run_data: + attributes["run"] = Run.model_validate(run_data) + + task_results_data = relationships.get("task-results", {}).get( + "data", + [], + ) + + attributes["task-results"] = [ + TaskResult.model_validate(task_result) for task_result in task_results_data + ] + + policy_evaluations_data = relationships.get( + "policy-evaluations", + {}, + ).get( + "data", + [], + ) + + attributes["policy-evaluations"] = [ + PolicyEvaluation.model_validate(policy_evaluation) + for policy_evaluation in policy_evaluations_data + ] + + return TaskStage.model_validate(attributes) + + # Read + def read(self, task_stage_id: str) -> TaskStage: + if not valid_string_id(task_stage_id): + raise ValueError("Invalid task_stage_id") + + response = self.t.request( + "GET", + f"/api/v2/task-stages/{task_stage_id}", + ) + + data = response.json().get("data", {}) + + return self._parse_task_stage(data) + + # List + def list(self, run_id: str) -> Iterator[TaskStage]: + if not valid_string_id(run_id): + raise InvalidRunIDError() + + path = f"/api/v2/runs/{run_id}/task-stages" + + for item in self._list(path): + yield self._parse_task_stage(item) + + # Override + def override( + self, + task_stage_id: str, + comment: str | None = None, + ) -> TaskStage: + if not valid_string_id(task_stage_id): + raise ValueError("Invalid task_stage_id") + + body: dict[str, Any] | None = {"comment": comment} if comment else None + + response = self.t.request( + "POST", + f"/api/v2/task-stages/{task_stage_id}/actions/override", + json_body=body, + ) + + data = response.json().get("data", {}) + + return self._parse_task_stage(data) diff --git a/tests/test_task_stage.py b/tests/test_task_stage.py new file mode 100644 index 0000000..56a54d0 --- /dev/null +++ b/tests/test_task_stage.py @@ -0,0 +1,186 @@ +import pytest + +from pytfe.client import TFEClient +from pytfe.models.task_stage import TaskStage +from pytfe.resources.task_stage import TaskStages + +# --------------------------- +# Basic existence tests +# --------------------------- + + +def test_task_stage_service_exists(): + client = TFEClient() + assert hasattr(client, "task_stages") + + +def test_task_stage_methods_exist(): + client = TFEClient() + + assert hasattr(client.task_stages, "read") + assert hasattr(client.task_stages, "list") + assert hasattr(client.task_stages, "override") + + +# --------------------------- +# Read method tests +# --------------------------- + + +def test_read_raises_error_when_id_missing(): + client = TFEClient() + + with pytest.raises(ValueError): + client.task_stages.read("") + + +def test_read_calls_request_correctly(mocker): + mock_transport = mocker.Mock() + + mock_response = mocker.Mock() + mock_response.json.return_value = { + "data": { + "id": "ts-123", + "attributes": { + "stage": "pre_plan", + "status": "pending", + "status-timestamps": {}, + "created-at": "2024-01-01T00:00:00Z", + "updated-at": "2024-01-01T00:00:00Z", + }, + } + } + + mock_transport.request.return_value = mock_response + + service = TaskStages(mock_transport) + + result = service.read("ts-123") + + assert isinstance(result, TaskStage) + + mock_transport.request.assert_called_once_with( + "GET", + "/api/v2/task-stages/ts-123", + ) + + +# --------------------------- +# List method tests +# --------------------------- + + +def test_list_with_valid_id_does_not_raise(mocker): + mock_transport = mocker.Mock() + + service = TaskStages(mock_transport) + + service._list = mocker.Mock(return_value=[]) + + result = list(service.list("run-123")) + + assert result == [] + + +def test_list_calls_internal_list(mocker): + mock_transport = mocker.Mock() + + service = TaskStages(mock_transport) + + service._list = mocker.Mock( + return_value=[ + { + "id": "ts-1", + "attributes": { + "stage": "pre_plan", + "status": "pending", + "status-timestamps": {}, + "created-at": "2024-01-01T00:00:00Z", + "updated-at": "2024-01-01T00:00:00Z", + }, + } + ] + ) + + result = list(service.list("run-123")) + + assert len(result) == 1 + assert isinstance(result[0], TaskStage) + + service._list.assert_called_once_with("/api/v2/runs/run-123/task-stages") + + +# --------------------------- +# Override method tests +# --------------------------- + + +def test_override_raises_error_when_id_missing(): + client = TFEClient() + + with pytest.raises(ValueError): + client.task_stages.override("") + + +def test_override_calls_request_without_comment(mocker): + mock_transport = mocker.Mock() + + mock_response = mocker.Mock() + mock_response.json.return_value = { + "data": { + "id": "ts-123", + "attributes": { + "stage": "pre_plan", + "status": "pending", + "status-timestamps": {}, + "created-at": "2024-01-01T00:00:00Z", + "updated-at": "2024-01-01T00:00:00Z", + }, + } + } + + mock_transport.request.return_value = mock_response + + service = TaskStages(mock_transport) + + result = service.override("ts-123") + + assert isinstance(result, TaskStage) + + mock_transport.request.assert_called_once_with( + "POST", + "/api/v2/task-stages/ts-123/actions/override", + json_body=None, + ) + + +def test_override_calls_request_with_comment(mocker): + mock_transport = mocker.Mock() + + mock_response = mocker.Mock() + mock_response.json.return_value = { + "data": { + "id": "ts-123", + "attributes": { + "stage": "pre_plan", + "status": "pending", + "status-timestamps": {}, + "created-at": "2024-01-01T00:00:00Z", + "updated-at": "2024-01-01T00:00:00Z", + }, + } + } + + mock_transport.request.return_value = mock_response + + service = TaskStages(mock_transport) + + result = service.override("ts-123", comment="approved") + + assert isinstance(result, TaskStage) + + mock_transport.request.assert_called_once_with( + "POST", + "/api/v2/task-stages/ts-123/actions/override", + json_body={"comment": "approved"}, + )