Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions examples/organization_audit_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env python3
"""Organization audit configuration operations example.

Demonstrates:
1. read() - read organization audit configuration
2. test() - send a test audit event
3. update() - update organization audit configuration
"""

import os

from pytfe import TFEClient, TFEConfig
from pytfe.errors import TFEError
from pytfe.models.organization_audit_configuration import (
OrganizationAuditConfigAuditTrails,
OrganizationAuditConfigurationOptions,
)


def main() -> None:
client = TFEClient(TFEConfig.from_env())

organization_name = os.getenv("TFE_ORG", "example-org")

try:
print("[READ] Reading organization audit configuration")
read_result = client.organization_audit_configurations.read(organization_name)
print(f"[READ] id={read_result.id}, updated_at={read_result.updated_at}")
if read_result.audit_trails is not None:
print(f"[READ] audit_trails_enabled={read_result.audit_trails.enabled}")

print("[TEST] Sending test audit event")
test_result = client.organization_audit_configurations.test(organization_name)
print(f"[TEST] request_id={test_result.request_id}")

print("[UPDATE] Updating organization audit configuration")
options = OrganizationAuditConfigurationOptions(
audit_trails=OrganizationAuditConfigAuditTrails(enabled=True)
)
update_result = client.organization_audit_configurations.update(
organization_name,
options,
)
print(f"[UPDATE] id={update_result.id}, updated_at={update_result.updated_at}")

except TFEError as exc:
print(f"API error: {exc}")
print("Check TFE_TOKEN, TFE_ADDRESS, and TFE_ORG.")
finally:
client.close()


if __name__ == "__main__":
main()
4 changes: 4 additions & 0 deletions src/pytfe/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .resources.notification_configuration import NotificationConfigurations
from .resources.oauth_client import OAuthClients
from .resources.oauth_token import OAuthTokens
from .resources.organization_audit_configuration import OrganizationAuditConfigurations
from .resources.organization_membership import OrganizationMemberships
from .resources.organization_token import OrganizationTokens
from .resources.organizations import Organizations
Expand Down Expand Up @@ -81,6 +82,9 @@ def __init__(self, config: TFEConfig | None = None):
self.plans = Plans(self._transport)
self.organizations = Organizations(self._transport)
self.organization_memberships = OrganizationMemberships(self._transport)
self.organization_audit_configurations = OrganizationAuditConfigurations(
self._transport
)
self.explorer = Explorer(
self._transport
) # org Explorer queries and saved views
Expand Down
129 changes: 129 additions & 0 deletions src/pytfe/models/organization_audit_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from __future__ import annotations

from datetime import datetime

from pydantic import BaseModel, ConfigDict, Field

from .organization import Organization


class OrganizationAuditConfigAuditTrails(BaseModel):
"""Audit Trails configuration."""

model_config = ConfigDict(extra="forbid")

enabled: bool = Field(..., description="Whether Audit Trails is enabled")


class OrganizationAuditConfigAuditStreaming(BaseModel):
"""HCP Audit Log Streaming configuration."""

model_config = ConfigDict(populate_by_name=True, extra="forbid")

enabled: bool = Field(..., description="Whether HCP Audit Log Streaming is enabled")
organization_id: str | None = Field(None, alias="organization-id")
use_default_organization: bool = Field(
...,
alias="use-default-organization",
)


class OrganizationAuditConfigPermissions(BaseModel):
"""Permissions for managing audit configuration."""

model_config = ConfigDict(populate_by_name=True, extra="forbid")

can_enable_hcp_audit_log_streaming: bool = Field(
...,
alias="can-enable-hcp-audit-log-streaming",
)
can_set_hcp_audit_log_streaming_organization: bool = Field(
...,
alias="can-set-hcp-audit-log-streaming-organization-id",
)
can_use_default_audit_log_streaming_organization: bool = Field(
...,
alias="can-use-default-audit-log-streaming-organization",
)


class OrganizationAuditConfigTimestamps(BaseModel):
"""Timestamp fields for organization audit configuration events."""

model_config = ConfigDict(populate_by_name=True, extra="forbid")

audit_trails_disabled_at: datetime | None = Field(
None,
alias="audit-trails-disabled-at",
)
audit_trails_enabled_at: datetime | None = Field(
None,
alias="audit-trails-enabled-at",
)
audit_trails_last_failure: datetime | None = Field(
None,
alias="audit-trails-last-failure",
)
audit_trails_last_success: datetime | None = Field(
None,
alias="audit-trails-last-success",
)
hcp_audit_log_streaming_disabled_at: datetime | None = Field(
None,
alias="hcp-audit-log-streaming-disabled-at",
)
hcp_audit_log_streaming_enabled_at: datetime | None = Field(
None,
alias="hcp-audit-log-streaming-enabled-at",
)
hcp_audit_log_streaming_last_failure: datetime | None = Field(
None,
alias="hcp-audit-log-streaming-last-failure",
)
hcp_audit_log_streaming_last_success: datetime | None = Field(
None,
alias="hcp-audit-log-streaming-last-success",
)


class OrganizationAuditConfiguration(BaseModel):
"""Organization audit configuration resource."""

model_config = ConfigDict(populate_by_name=True, extra="forbid")

id: str
audit_trails: OrganizationAuditConfigAuditTrails | None = Field(
None,
alias="audit-trails",
)
hcp_audit_log_streaming: OrganizationAuditConfigAuditStreaming | None = Field(
None,
alias="hcp-audit-log-streaming",
)
permissions: OrganizationAuditConfigPermissions | None = None
timestamps: OrganizationAuditConfigTimestamps | None = None
updated_at: datetime | None = Field(None, alias="updated-at")
organization: Organization | None = None


class OrganizationAuditConfigurationTest(BaseModel):
"""Result payload for sending a test audit event."""

model_config = ConfigDict(populate_by_name=True, extra="forbid")

request_id: str | None = Field(None, alias="request-id")


class OrganizationAuditConfigurationOptions(BaseModel):
"""Options for updating organization audit configuration."""

model_config = ConfigDict(populate_by_name=True, extra="forbid")

audit_trails: OrganizationAuditConfigAuditTrails | None = Field(
None,
alias="audit-trails",
)
hcp_audit_log_streaming: OrganizationAuditConfigAuditStreaming | None = Field(
None,
alias="hcp-audit-log-streaming",
)
103 changes: 103 additions & 0 deletions src/pytfe/resources/organization_audit_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from __future__ import annotations

from typing import Any
from urllib.parse import quote

from ..errors import ERR_INVALID_ORG
from ..models.organization import Organization
from ..models.organization_audit_configuration import (
OrganizationAuditConfiguration,
OrganizationAuditConfigurationOptions,
OrganizationAuditConfigurationTest,
)
from ..utils import valid_string_id
from ._base import _Service


class OrganizationAuditConfigurations(_Service):
"""Organization audit configuration service."""

def read(self, organization: str) -> OrganizationAuditConfiguration:
"""Read an organization's audit configuration by organization name."""
if not valid_string_id(organization):
raise ValueError(ERR_INVALID_ORG)

path = f"/api/v2/organizations/{quote(organization)}/audit-configuration"
response = self.t.request("GET", path)
payload = response.json() or {}
data = payload.get("data")
if not isinstance(data, dict):
raise ValueError("Invalid response format")

return self._parse_audit_configuration(data)

def test(self, organization: str) -> OrganizationAuditConfigurationTest:
"""Send a test audit event for an organization."""
if not valid_string_id(organization):
raise ValueError(ERR_INVALID_ORG)

path = f"/api/v2/organizations/{quote(organization)}/audit-configuration/test"
response = self.t.request("POST", path)
payload = response.json() or {}

if isinstance(payload, dict) and "request-id" in payload:
return OrganizationAuditConfigurationTest.model_validate(payload)

data = payload.get("data") if isinstance(payload, dict) else None
if isinstance(data, dict):
if "request-id" in data:
return OrganizationAuditConfigurationTest.model_validate(data)
attrs = data.get("attributes")
if isinstance(attrs, dict) and "request-id" in attrs:
return OrganizationAuditConfigurationTest.model_validate(attrs)

return OrganizationAuditConfigurationTest.model_validate({})

def update(
self,
organization: str,
options: OrganizationAuditConfigurationOptions,
) -> OrganizationAuditConfiguration:
"""Update an organization's audit configuration."""
if not valid_string_id(organization):
raise ValueError(ERR_INVALID_ORG)

attrs = options.model_dump(by_alias=True, exclude_none=True)
body: dict[str, Any] = {
"data": {
"type": "audit-configurations",
"attributes": attrs,
}
}

path = f"/api/v2/organizations/{quote(organization)}/audit-configuration"
response = self.t.request("PATCH", path, json_body=body)
payload = response.json() or {}
data = payload.get("data")
if not isinstance(data, dict):
raise ValueError("Invalid response format")

return self._parse_audit_configuration(data)

def _parse_audit_configuration(
self, data: dict[str, Any]
) -> OrganizationAuditConfiguration:
attrs = data.get("attributes", {})
relationships = data.get("relationships", {})

org = None
org_data = relationships.get("organization", {}).get("data")
if isinstance(org_data, dict):
org = Organization(id=org_data.get("id"))

return OrganizationAuditConfiguration.model_validate(
{
"id": data.get("id", ""),
"audit-trails": attrs.get("audit-trails"),
"hcp-audit-log-streaming": attrs.get("hcp-audit-log-streaming"),
"permissions": attrs.get("permissions"),
"timestamps": attrs.get("timestamps"),
"updated-at": attrs.get("updated-at"),
"organization": org,
}
)
Loading
Loading