diff --git a/examples/organization_audit_configuration.py b/examples/organization_audit_configuration.py new file mode 100644 index 0000000..3153510 --- /dev/null +++ b/examples/organization_audit_configuration.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +"""Organization audit configuration operations example. + +Demonstrates: +1. read() - read organization audit configuration +2. test() - send a test audit event +3. update() - update organization audit configuration +""" + +import os + +from pytfe import TFEClient, TFEConfig +from pytfe.errors import TFEError +from pytfe.models.organization_audit_configuration import ( + OrganizationAuditConfigAuditTrails, + OrganizationAuditConfigurationOptions, +) + + +def main() -> None: + client = TFEClient(TFEConfig.from_env()) + + organization_name = os.getenv("TFE_ORG", "example-org") + + try: + print("[READ] Reading organization audit configuration") + read_result = client.organization_audit_configurations.read(organization_name) + print(f"[READ] id={read_result.id}, updated_at={read_result.updated_at}") + if read_result.audit_trails is not None: + print(f"[READ] audit_trails_enabled={read_result.audit_trails.enabled}") + + print("[TEST] Sending test audit event") + test_result = client.organization_audit_configurations.test(organization_name) + print(f"[TEST] request_id={test_result.request_id}") + + print("[UPDATE] Updating organization audit configuration") + options = OrganizationAuditConfigurationOptions( + audit_trails=OrganizationAuditConfigAuditTrails(enabled=True) + ) + update_result = client.organization_audit_configurations.update( + organization_name, + options, + ) + print(f"[UPDATE] id={update_result.id}, updated_at={update_result.updated_at}") + + except TFEError as exc: + print(f"API error: {exc}") + print("Check TFE_TOKEN, TFE_ADDRESS, and TFE_ORG.") + finally: + client.close() + + +if __name__ == "__main__": + main() diff --git a/src/pytfe/client.py b/src/pytfe/client.py index 4aead33..f0f7fd2 100644 --- a/src/pytfe/client.py +++ b/src/pytfe/client.py @@ -14,6 +14,7 @@ from .resources.notification_configuration import NotificationConfigurations from .resources.oauth_client import OAuthClients from .resources.oauth_token import OAuthTokens +from .resources.organization_audit_configuration import OrganizationAuditConfigurations from .resources.organization_membership import OrganizationMemberships from .resources.organization_token import OrganizationTokens from .resources.organizations import Organizations @@ -81,6 +82,9 @@ def __init__(self, config: TFEConfig | None = None): self.plans = Plans(self._transport) self.organizations = Organizations(self._transport) self.organization_memberships = OrganizationMemberships(self._transport) + self.organization_audit_configurations = OrganizationAuditConfigurations( + self._transport + ) self.explorer = Explorer( self._transport ) # org Explorer queries and saved views diff --git a/src/pytfe/models/organization_audit_configuration.py b/src/pytfe/models/organization_audit_configuration.py new file mode 100644 index 0000000..a045396 --- /dev/null +++ b/src/pytfe/models/organization_audit_configuration.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel, ConfigDict, Field + +from .organization import Organization + + +class OrganizationAuditConfigAuditTrails(BaseModel): + """Audit Trails configuration.""" + + model_config = ConfigDict(extra="forbid") + + enabled: bool = Field(..., description="Whether Audit Trails is enabled") + + +class OrganizationAuditConfigAuditStreaming(BaseModel): + """HCP Audit Log Streaming configuration.""" + + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + enabled: bool = Field(..., description="Whether HCP Audit Log Streaming is enabled") + organization_id: str | None = Field(None, alias="organization-id") + use_default_organization: bool = Field( + ..., + alias="use-default-organization", + ) + + +class OrganizationAuditConfigPermissions(BaseModel): + """Permissions for managing audit configuration.""" + + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + can_enable_hcp_audit_log_streaming: bool = Field( + ..., + alias="can-enable-hcp-audit-log-streaming", + ) + can_set_hcp_audit_log_streaming_organization: bool = Field( + ..., + alias="can-set-hcp-audit-log-streaming-organization-id", + ) + can_use_default_audit_log_streaming_organization: bool = Field( + ..., + alias="can-use-default-audit-log-streaming-organization", + ) + + +class OrganizationAuditConfigTimestamps(BaseModel): + """Timestamp fields for organization audit configuration events.""" + + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + audit_trails_disabled_at: datetime | None = Field( + None, + alias="audit-trails-disabled-at", + ) + audit_trails_enabled_at: datetime | None = Field( + None, + alias="audit-trails-enabled-at", + ) + audit_trails_last_failure: datetime | None = Field( + None, + alias="audit-trails-last-failure", + ) + audit_trails_last_success: datetime | None = Field( + None, + alias="audit-trails-last-success", + ) + hcp_audit_log_streaming_disabled_at: datetime | None = Field( + None, + alias="hcp-audit-log-streaming-disabled-at", + ) + hcp_audit_log_streaming_enabled_at: datetime | None = Field( + None, + alias="hcp-audit-log-streaming-enabled-at", + ) + hcp_audit_log_streaming_last_failure: datetime | None = Field( + None, + alias="hcp-audit-log-streaming-last-failure", + ) + hcp_audit_log_streaming_last_success: datetime | None = Field( + None, + alias="hcp-audit-log-streaming-last-success", + ) + + +class OrganizationAuditConfiguration(BaseModel): + """Organization audit configuration resource.""" + + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + id: str + audit_trails: OrganizationAuditConfigAuditTrails | None = Field( + None, + alias="audit-trails", + ) + hcp_audit_log_streaming: OrganizationAuditConfigAuditStreaming | None = Field( + None, + alias="hcp-audit-log-streaming", + ) + permissions: OrganizationAuditConfigPermissions | None = None + timestamps: OrganizationAuditConfigTimestamps | None = None + updated_at: datetime | None = Field(None, alias="updated-at") + organization: Organization | None = None + + +class OrganizationAuditConfigurationTest(BaseModel): + """Result payload for sending a test audit event.""" + + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + request_id: str | None = Field(None, alias="request-id") + + +class OrganizationAuditConfigurationOptions(BaseModel): + """Options for updating organization audit configuration.""" + + model_config = ConfigDict(populate_by_name=True, extra="forbid") + + audit_trails: OrganizationAuditConfigAuditTrails | None = Field( + None, + alias="audit-trails", + ) + hcp_audit_log_streaming: OrganizationAuditConfigAuditStreaming | None = Field( + None, + alias="hcp-audit-log-streaming", + ) diff --git a/src/pytfe/resources/organization_audit_configuration.py b/src/pytfe/resources/organization_audit_configuration.py new file mode 100644 index 0000000..ee51ef5 --- /dev/null +++ b/src/pytfe/resources/organization_audit_configuration.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +from typing import Any +from urllib.parse import quote + +from ..errors import ERR_INVALID_ORG +from ..models.organization import Organization +from ..models.organization_audit_configuration import ( + OrganizationAuditConfiguration, + OrganizationAuditConfigurationOptions, + OrganizationAuditConfigurationTest, +) +from ..utils import valid_string_id +from ._base import _Service + + +class OrganizationAuditConfigurations(_Service): + """Organization audit configuration service.""" + + def read(self, organization: str) -> OrganizationAuditConfiguration: + """Read an organization's audit configuration by organization name.""" + if not valid_string_id(organization): + raise ValueError(ERR_INVALID_ORG) + + path = f"/api/v2/organizations/{quote(organization)}/audit-configuration" + response = self.t.request("GET", path) + payload = response.json() or {} + data = payload.get("data") + if not isinstance(data, dict): + raise ValueError("Invalid response format") + + return self._parse_audit_configuration(data) + + def test(self, organization: str) -> OrganizationAuditConfigurationTest: + """Send a test audit event for an organization.""" + if not valid_string_id(organization): + raise ValueError(ERR_INVALID_ORG) + + path = f"/api/v2/organizations/{quote(organization)}/audit-configuration/test" + response = self.t.request("POST", path) + payload = response.json() or {} + + if isinstance(payload, dict) and "request-id" in payload: + return OrganizationAuditConfigurationTest.model_validate(payload) + + data = payload.get("data") if isinstance(payload, dict) else None + if isinstance(data, dict): + if "request-id" in data: + return OrganizationAuditConfigurationTest.model_validate(data) + attrs = data.get("attributes") + if isinstance(attrs, dict) and "request-id" in attrs: + return OrganizationAuditConfigurationTest.model_validate(attrs) + + return OrganizationAuditConfigurationTest.model_validate({}) + + def update( + self, + organization: str, + options: OrganizationAuditConfigurationOptions, + ) -> OrganizationAuditConfiguration: + """Update an organization's audit configuration.""" + if not valid_string_id(organization): + raise ValueError(ERR_INVALID_ORG) + + attrs = options.model_dump(by_alias=True, exclude_none=True) + body: dict[str, Any] = { + "data": { + "type": "audit-configurations", + "attributes": attrs, + } + } + + path = f"/api/v2/organizations/{quote(organization)}/audit-configuration" + response = self.t.request("PATCH", path, json_body=body) + payload = response.json() or {} + data = payload.get("data") + if not isinstance(data, dict): + raise ValueError("Invalid response format") + + return self._parse_audit_configuration(data) + + def _parse_audit_configuration( + self, data: dict[str, Any] + ) -> OrganizationAuditConfiguration: + attrs = data.get("attributes", {}) + relationships = data.get("relationships", {}) + + org = None + org_data = relationships.get("organization", {}).get("data") + if isinstance(org_data, dict): + org = Organization(id=org_data.get("id")) + + return OrganizationAuditConfiguration.model_validate( + { + "id": data.get("id", ""), + "audit-trails": attrs.get("audit-trails"), + "hcp-audit-log-streaming": attrs.get("hcp-audit-log-streaming"), + "permissions": attrs.get("permissions"), + "timestamps": attrs.get("timestamps"), + "updated-at": attrs.get("updated-at"), + "organization": org, + } + ) diff --git a/tests/units/test_organization_audit_configuration.py b/tests/units/test_organization_audit_configuration.py new file mode 100644 index 0000000..5603335 --- /dev/null +++ b/tests/units/test_organization_audit_configuration.py @@ -0,0 +1,159 @@ +"""Unit tests for organization audit configuration service.""" + +import os +import sys +from unittest.mock import Mock, patch + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) + +from pytfe._http import HTTPTransport +from pytfe.errors import ERR_INVALID_ORG +from pytfe.models.organization_audit_configuration import ( + OrganizationAuditConfigAuditStreaming, + OrganizationAuditConfigAuditTrails, + OrganizationAuditConfiguration, + OrganizationAuditConfigurationOptions, + OrganizationAuditConfigurationTest, +) +from pytfe.resources.organization_audit_configuration import ( + OrganizationAuditConfigurations, +) + + +class TestOrganizationAuditConfigurations: + @pytest.fixture + def mock_transport(self): + return Mock(spec=HTTPTransport) + + @pytest.fixture + def service(self, mock_transport): + return OrganizationAuditConfigurations(mock_transport) + + def test_read_success(self, service): + mock_response_data = { + "data": { + "id": "acfg-123", + "attributes": { + "audit-trails": {"enabled": True}, + "hcp-audit-log-streaming": { + "enabled": False, + "organization-id": "org-123", + "use-default-organization": True, + }, + "updated-at": "2025-01-01T00:00:00Z", + }, + "relationships": { + "organization": {"data": {"id": "org-123", "type": "organizations"}} + }, + } + } + mock_response = Mock() + mock_response.json.return_value = mock_response_data + + with patch.object(service, "t") as mock_t: + mock_t.request.return_value = mock_response + result = service.read("test-org") + + assert isinstance(result, OrganizationAuditConfiguration) + assert result.id == "acfg-123" + assert result.audit_trails is not None + assert result.audit_trails.enabled is True + assert result.organization is not None + assert result.organization.id == "org-123" + + call_args = mock_t.request.call_args + assert call_args[0][0] == "GET" + assert ( + call_args[0][1] == "/api/v2/organizations/test-org/audit-configuration" + ) + + def test_read_validation_errors(self, service): + with pytest.raises(ValueError, match=ERR_INVALID_ORG): + service.read("") + + with pytest.raises(ValueError, match=ERR_INVALID_ORG): + service.read(None) + + def test_test_success(self, service): + mock_response = Mock() + mock_response.json.return_value = {"request-id": "req-123"} + + with patch.object(service, "t") as mock_t: + mock_t.request.return_value = mock_response + result = service.test("test-org") + + assert isinstance(result, OrganizationAuditConfigurationTest) + assert result.request_id == "req-123" + + call_args = mock_t.request.call_args + assert call_args[0][0] == "POST" + assert ( + call_args[0][1] + == "/api/v2/organizations/test-org/audit-configuration/test" + ) + + def test_test_validation_errors(self, service): + with pytest.raises(ValueError, match=ERR_INVALID_ORG): + service.test("") + + with pytest.raises(ValueError, match=ERR_INVALID_ORG): + service.test(None) + + def test_update_success(self, service): + mock_response_data = { + "data": { + "id": "acfg-123", + "attributes": { + "audit-trails": {"enabled": True}, + "hcp-audit-log-streaming": { + "enabled": True, + "organization-id": "org-123", + "use-default-organization": False, + }, + "updated-at": "2025-01-01T00:00:00Z", + }, + "relationships": { + "organization": {"data": {"id": "org-123", "type": "organizations"}} + }, + } + } + mock_response = Mock() + mock_response.json.return_value = mock_response_data + + with patch.object(service, "t") as mock_t: + mock_t.request.return_value = mock_response + + options = OrganizationAuditConfigurationOptions( + audit_trails=OrganizationAuditConfigAuditTrails(enabled=True), + hcp_audit_log_streaming=OrganizationAuditConfigAuditStreaming( + enabled=True, + organization_id="org-123", + use_default_organization=False, + ), + ) + + result = service.update("test-org", options) + assert isinstance(result, OrganizationAuditConfiguration) + + call_args = mock_t.request.call_args + assert call_args[0][0] == "PATCH" + assert ( + call_args[0][1] == "/api/v2/organizations/test-org/audit-configuration" + ) + assert call_args[1]["json_body"]["data"]["type"] == "audit-configurations" + attrs = call_args[1]["json_body"]["data"]["attributes"] + assert attrs["audit-trails"]["enabled"] is True + assert attrs["hcp-audit-log-streaming"]["organization-id"] == "org-123" + + def test_update_validation_errors(self, service): + options = OrganizationAuditConfigurationOptions( + audit_trails=OrganizationAuditConfigAuditTrails(enabled=False) + ) + + with pytest.raises(ValueError, match=ERR_INVALID_ORG): + service.update("", options) + + with pytest.raises(ValueError, match=ERR_INVALID_ORG): + service.update(None, options)