diff --git a/agentex/docker-compose.yml b/agentex/docker-compose.yml index b3a935b3..4bedf1b2 100644 --- a/agentex/docker-compose.yml +++ b/agentex/docker-compose.yml @@ -170,6 +170,12 @@ services: - ALLOWED_ORIGINS=http://localhost:3000 - OTEL_EXPORTER_OTLP_ENDPOINT=http://agentex-otel-collector:4317 - OTEL_SERVICE_NAME=agentex-api + - RETENTION_CLEANUP_ENABLED=${RETENTION_CLEANUP_ENABLED:-false} + - RETENTION_CLEANUP_AGENT_ALLOWLIST=${RETENTION_CLEANUP_AGENT_ALLOWLIST:-} + - RETENTION_CLEANUP_IDLE_DAYS=${RETENTION_CLEANUP_IDLE_DAYS:-7} + - RETENTION_CLEANUP_CRON=${RETENTION_CLEANUP_CRON:-0 4 * * *} + - RETENTION_CLEANUP_PAGE_SIZE=${RETENTION_CLEANUP_PAGE_SIZE:-200} + - RETENTION_CLEANUP_MAX_IN_FLIGHT=${RETENTION_CLEANUP_MAX_IN_FLIGHT:-20} ports: - "5003:5003" volumes: @@ -200,6 +206,7 @@ services: alembic upgrade head && popd && python src/temporal/run_healthcheck_workflow.py && + python src/temporal/run_retention_cleanup_schedule.py && echo 'Starting API server...' && uvicorn src.api.app:app --host 0.0.0.0 --port 5003 --reload --reload-dir /app/src --reload-include '*.py' --workers 1 " @@ -228,6 +235,11 @@ services: - MONGODB_URI=mongodb://agentex-mongodb:27017 - MONGODB_DATABASE_NAME=agentex - AGENTEX_SERVER_TASK_QUEUE=agentex-server + - RETENTION_CLEANUP_ENABLED=${RETENTION_CLEANUP_ENABLED:-false} + - RETENTION_CLEANUP_AGENT_ALLOWLIST=${RETENTION_CLEANUP_AGENT_ALLOWLIST:-} + - RETENTION_CLEANUP_IDLE_DAYS=${RETENTION_CLEANUP_IDLE_DAYS:-7} + - RETENTION_CLEANUP_PAGE_SIZE=${RETENTION_CLEANUP_PAGE_SIZE:-200} + - RETENTION_CLEANUP_MAX_IN_FLIGHT=${RETENTION_CLEANUP_MAX_IN_FLIGHT:-20} volumes: - .:/app:cached depends_on: diff --git a/agentex/src/config/environment_variables.py b/agentex/src/config/environment_variables.py index 2d41740a..7d1f2a57 100644 --- a/agentex/src/config/environment_variables.py +++ b/agentex/src/config/environment_variables.py @@ -58,6 +58,12 @@ class EnvVarKeys(str, Enum): AGENTEX_SERVER_TASK_QUEUE = "AGENTEX_SERVER_TASK_QUEUE" ENABLE_HEALTH_CHECK_WORKFLOW = "ENABLE_HEALTH_CHECK_WORKFLOW" WEBHOOK_REQUEST_TIMEOUT = "WEBHOOK_REQUEST_TIMEOUT" + RETENTION_CLEANUP_ENABLED = "RETENTION_CLEANUP_ENABLED" + RETENTION_CLEANUP_AGENT_ALLOWLIST = "RETENTION_CLEANUP_AGENT_ALLOWLIST" + RETENTION_CLEANUP_IDLE_DAYS = "RETENTION_CLEANUP_IDLE_DAYS" + RETENTION_CLEANUP_CRON = "RETENTION_CLEANUP_CRON" + RETENTION_CLEANUP_PAGE_SIZE = "RETENTION_CLEANUP_PAGE_SIZE" + RETENTION_CLEANUP_MAX_IN_FLIGHT = "RETENTION_CLEANUP_MAX_IN_FLIGHT" class Environment(str, Enum): @@ -114,6 +120,12 @@ class EnvironmentVariables(BaseModel): AGENTEX_SERVER_TASK_QUEUE: str | None = None ENABLE_HEALTH_CHECK_WORKFLOW: bool = False WEBHOOK_REQUEST_TIMEOUT: float = 15.0 # Webhook request timeout in seconds + RETENTION_CLEANUP_ENABLED: bool = False + RETENTION_CLEANUP_AGENT_ALLOWLIST: list[str] = [] + RETENTION_CLEANUP_IDLE_DAYS: int = 7 + RETENTION_CLEANUP_CRON: str = "0 4 * * *" + RETENTION_CLEANUP_PAGE_SIZE: int = 200 + RETENTION_CLEANUP_MAX_IN_FLIGHT: int = 20 @classmethod def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: @@ -203,6 +215,28 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None: WEBHOOK_REQUEST_TIMEOUT=float( os.environ.get(EnvVarKeys.WEBHOOK_REQUEST_TIMEOUT, "15.0") ), + RETENTION_CLEANUP_ENABLED=( + os.environ.get(EnvVarKeys.RETENTION_CLEANUP_ENABLED, "false") == "true" + ), + RETENTION_CLEANUP_AGENT_ALLOWLIST=[ + name.strip() + for name in os.environ.get( + EnvVarKeys.RETENTION_CLEANUP_AGENT_ALLOWLIST, "" + ).split(",") + if name.strip() + ], + RETENTION_CLEANUP_IDLE_DAYS=int( + os.environ.get(EnvVarKeys.RETENTION_CLEANUP_IDLE_DAYS, "7") + ), + RETENTION_CLEANUP_CRON=os.environ.get( + EnvVarKeys.RETENTION_CLEANUP_CRON, "0 4 * * *" + ), + RETENTION_CLEANUP_PAGE_SIZE=int( + os.environ.get(EnvVarKeys.RETENTION_CLEANUP_PAGE_SIZE, "200") + ), + RETENTION_CLEANUP_MAX_IN_FLIGHT=int( + os.environ.get(EnvVarKeys.RETENTION_CLEANUP_MAX_IN_FLIGHT, "20") + ), ) refreshed_environment_variables = environment_variables return refreshed_environment_variables diff --git a/agentex/src/domain/repositories/task_repository.py b/agentex/src/domain/repositories/task_repository.py index fd67ea97..03f53d49 100644 --- a/agentex/src/domain/repositories/task_repository.py +++ b/agentex/src/domain/repositories/task_repository.py @@ -1,4 +1,5 @@ from collections.abc import Sequence +from datetime import UTC, datetime, timedelta from typing import Annotated, Literal from fastapi import Depends @@ -94,6 +95,52 @@ async def list_with_join( relationships=relationships, ) + async def list_cleanup_candidate_ids( + self, + *, + idle_days: int, + agent_names: Sequence[str], + after_id: str | None, + limit: int, + ) -> list[str]: + """ + Return ids of tasks eligible for scheduled retention cleanup. + + Cheap, index-friendly PRE-FILTER only — the authoritative idle / status / + unprocessed-events checks live in TaskRetentionService.clean_task. This + deliberately omits a status filter: status is race-prone (a task can flip + to RUNNING between this query and the clean call), so the trustworthy + RUNNING guard is enforced at clean-time. `updated_at < cutoff` is a correct + superset of truly-idle tasks (true idleness also requires the latest Mongo + message to predate the cutoff), so we never under-include. + + Keyset-paginated by id ascending; pass the last returned id as `after_id` + to fetch the next page. Fail-closed: empty `agent_names` returns []. + """ + if not agent_names: + return [] + + cutoff = datetime.now(UTC) - timedelta(days=idle_days) + query = ( + select(TaskORM.id) + .join(TaskAgentORM, TaskORM.id == TaskAgentORM.task_id) + .join(AgentORM, TaskAgentORM.agent_id == AgentORM.id) + .where( + TaskORM.cleaned_at.is_(None), + TaskORM.updated_at < cutoff, + AgentORM.name.in_(agent_names), + ) + .order_by(TaskORM.id.asc()) + .limit(limit) + .distinct() + ) + if after_id is not None: + query = query.where(TaskORM.id > after_id) + + async with self.start_async_db_session(allow_writes=False) as session: + result = await session.execute(query) + return [row[0] for row in result.all()] + async def create(self, agent_id: str, task: TaskEntity) -> TaskEntity: """Create task and establish agent relationships""" diff --git a/agentex/src/domain/use_cases/task_retention_use_case.py b/agentex/src/domain/use_cases/task_retention_use_case.py index ef8ec620..3998a0df 100644 --- a/agentex/src/domain/use_cases/task_retention_use_case.py +++ b/agentex/src/domain/use_cases/task_retention_use_case.py @@ -7,6 +7,7 @@ TaskExportToUrlResultEntity, TaskSnapshotEntity, ) +from src.domain.repositories.task_repository import TaskRepository from src.domain.services.task_retention_service import DTaskRetentionService @@ -20,6 +21,13 @@ class TaskRetentionUseCase: def __init__(self, retention_service: DTaskRetentionService): self.retention_service = retention_service + @property + def task_repository(self) -> TaskRepository: + """Stable accessor for the underlying task repository so callers (e.g. the + Temporal worker) can reuse the same instance without reaching through the + service's internals.""" + return self.retention_service.task_repository + async def export_task(self, task_id: str) -> TaskSnapshotEntity: return await self.retention_service.export_task(task_id) diff --git a/agentex/src/temporal/activities/retention_cleanup_activities.py b/agentex/src/temporal/activities/retention_cleanup_activities.py new file mode 100644 index 00000000..4ebdef03 --- /dev/null +++ b/agentex/src/temporal/activities/retention_cleanup_activities.py @@ -0,0 +1,147 @@ +""" +Temporal activities for the scheduled task-retention cleanup sweep. + +Three activities: +- load_cleanup_config: reads RETENTION_CLEANUP_* env vars at run time so policy + changes take effect on the next scheduled run without recreating the schedule. +- find_cleanup_candidates: cheap pre-filtered, keyset-paginated discovery. +- clean_task: delegates to TaskRetentionUseCase.clean_task; catches ClientError + (the three policy/safety refusals) and maps it to a 'skipped' outcome so the + caller's child workflow completes cleanly. Genuine transient errors propagate + so Temporal retries them. + +Boundary types are JSON-native (the backend data converter does not serialize +Pydantic models). +""" + +from typing import TypedDict + +from src.config.environment_variables import EnvironmentVariables +from src.domain.exceptions import ClientError +from src.domain.repositories.task_repository import TaskRepository +from src.domain.use_cases.task_retention_use_case import TaskRetentionUseCase +from src.utils.logging import make_logger +from temporalio import activity + +logger = make_logger(__name__) + +LOAD_CLEANUP_CONFIG_ACTIVITY = "load_cleanup_config_activity" +FIND_CLEANUP_CANDIDATES_ACTIVITY = "find_cleanup_candidates_activity" +CLEAN_TASK_ACTIVITY = "clean_task_activity" + + +class CleanTaskOutcome(TypedDict): + task_id: str + status: str # "cleaned" | "skipped" + reason: str | None + messages_deleted: int + task_states_deleted: int + events_deleted: int + + +class RetentionCleanupActivities: + def __init__( + self, + task_repository: TaskRepository, + use_case: TaskRetentionUseCase, + ): + self.task_repository = task_repository + self.use_case = use_case + + @activity.defn(name=LOAD_CLEANUP_CONFIG_ACTIVITY) + async def load_cleanup_config(self) -> dict: + """ + Read the current retention-cleanup policy from the environment. + + Policy (allowlist, idle threshold, paging) is intentionally NOT baked into + the Temporal Schedule. The sweep loads it here at run time so changing a + RETENTION_CLEANUP_* env var and restarting the worker takes effect on the + next scheduled run without recreating the schedule. + """ + # Lives on this class (rather than as a free function) only so the worker + # can register it alongside the other activities; it intentionally uses + # none of the injected repositories/use case. + env = EnvironmentVariables.refresh(force_refresh=True) + return { + "enabled": env.RETENTION_CLEANUP_ENABLED, + "idle_days": env.RETENTION_CLEANUP_IDLE_DAYS, + "agent_names": env.RETENTION_CLEANUP_AGENT_ALLOWLIST, + "page_size": env.RETENTION_CLEANUP_PAGE_SIZE, + "max_in_flight": env.RETENTION_CLEANUP_MAX_IN_FLIGHT, + } + + @activity.defn(name=FIND_CLEANUP_CANDIDATES_ACTIVITY) + async def find_cleanup_candidates( + self, + after_id: str | None, + limit: int, + idle_days: int, + agent_names: list[str], + ) -> list[str]: + """ + Return a page of task IDs that are eligible for content cleanup. + + Args: + after_id: Keyset cursor — return only IDs strictly after this value, + or None to start from the beginning. + limit: Maximum number of IDs to return. + idle_days: Minimum number of days a task must have been idle to qualify. + agent_names: Restrict candidates to tasks belonging to these agents. + + Returns: + list[str]: Up to *limit* task IDs ordered by ID, suitable for passing + back as *after_id* on the next page. + """ + logger.info( + "find_cleanup_candidates_started", + extra={"after_id": after_id, "limit": limit}, + ) + result = await self.task_repository.list_cleanup_candidate_ids( + idle_days=idle_days, + agent_names=agent_names, + after_id=after_id, + limit=limit, + ) + logger.info("find_cleanup_candidates_completed", extra={"count": len(result)}) + return result + + @activity.defn(name=CLEAN_TASK_ACTIVITY) + async def clean_task(self, task_id: str, idle_days: int) -> CleanTaskOutcome: + """ + Delete the stored content (messages, states, events) for a single task. + + Args: + task_id: ID of the task to clean. + idle_days: Passed through to the use case for policy checks. + + Returns: + CleanTaskOutcome with ``status`` set to ``"cleaned"`` when content was + deleted, or ``"skipped"`` when the use case refused via ``ClientError`` + (e.g. task is still active, not yet idle long enough, or already + cleaned). Other exceptions propagate so Temporal can retry them. + """ + try: + result = await self.use_case.clean_task( + task_id=task_id, force=False, idle_days=idle_days + ) + return { + "task_id": result.task_id, + "status": "cleaned", + "reason": None, + "messages_deleted": result.messages_deleted, + "task_states_deleted": result.task_states_deleted, + "events_deleted": result.events_deleted, + } + except ClientError as e: + logger.info( + "task_cleanup_skipped", + extra={"task_id": task_id, "reason": str(e)}, + ) + return { + "task_id": task_id, + "status": "skipped", + "reason": str(e), + "messages_deleted": 0, + "task_states_deleted": 0, + "events_deleted": 0, + } diff --git a/agentex/src/temporal/run_retention_cleanup_schedule.py b/agentex/src/temporal/run_retention_cleanup_schedule.py new file mode 100644 index 00000000..6ea789df --- /dev/null +++ b/agentex/src/temporal/run_retention_cleanup_schedule.py @@ -0,0 +1,76 @@ +""" +Create the Temporal Schedule that drives the scheduled task-retention cleanup. + +Runs at startup (mirrors run_healthcheck_workflow.py). No-op unless +RETENTION_CLEANUP_ENABLED is true and Temporal is configured. +Idempotent: if the schedule already exists, it is left as-is. + +The schedule carries NO policy args (no allowlist, idle_days, page_size, or +max_in_flight). Those are read at sweep runtime via the load_cleanup_config +activity so changing a RETENTION_CLEANUP_* env var and restarting the worker +takes effect on the next scheduled run without recreating the schedule. +Only the cron expression and workflow identity are baked into the schedule. + +Fail-closed behaviour is preserved at runtime: if the allowlist is empty the +discovery activity returns no candidates and the sweep completes immediately. +""" + +import asyncio + +from src.adapters.temporal.adapter_temporal import TemporalAdapter +from src.adapters.temporal.client_factory import TemporalClientFactory +from src.adapters.temporal.exceptions import TemporalScheduleAlreadyExistsError +from src.config.dependencies import GlobalDependencies +from src.config.environment_variables import EnvironmentVariables +from src.temporal.run_worker import AGENTEX_SERVER_TASK_QUEUE +from src.temporal.workflows.retention_cleanup_workflow import ( + RetentionCleanupSweepWorkflow, +) +from src.utils.logging import make_logger + +logger = make_logger(__name__) + +SCHEDULE_ID = "retention-cleanup-sweep" +WORKFLOW_ID = "retention-cleanup-sweep" + + +async def main() -> None: + global_dependencies = GlobalDependencies() + await global_dependencies.load() + + env = EnvironmentVariables.refresh() + if not env or not env.RETENTION_CLEANUP_ENABLED: + logger.info("Retention cleanup is not enabled; skipping schedule creation") + return + if not TemporalClientFactory.is_temporal_configured(env): + logger.error("Temporal is not configured; skipping schedule creation") + return + + if not env.RETENTION_CLEANUP_AGENT_ALLOWLIST: + logger.info( + "Retention cleanup agent allowlist is empty; the sweep will discover " + "no candidates at runtime (fail-closed by policy, not by schedule)" + ) + + task_queue = env.AGENTEX_SERVER_TASK_QUEUE or AGENTEX_SERVER_TASK_QUEUE + adapter = TemporalAdapter(temporal_client=global_dependencies.temporal_client) + + try: + await adapter.create_schedule( + schedule_id=SCHEDULE_ID, + workflow=RetentionCleanupSweepWorkflow.run, + workflow_id=WORKFLOW_ID, + args=[], + task_queue=task_queue, + cron_expressions=[env.RETENTION_CLEANUP_CRON], + ) + logger.info( + "Created retention-cleanup schedule (policy read at runtime)", + extra={"cron": env.RETENTION_CLEANUP_CRON}, + ) + except TemporalScheduleAlreadyExistsError: + logger.info("Retention-cleanup schedule already exists; leaving as-is") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/agentex/src/temporal/run_worker.py b/agentex/src/temporal/run_worker.py index d3665094..091e8ca7 100644 --- a/agentex/src/temporal/run_worker.py +++ b/agentex/src/temporal/run_worker.py @@ -14,6 +14,7 @@ from src.adapters.temporal.client_factory import TemporalClientFactory from src.config.dependencies import ( + GlobalDependencies, database_async_read_only_session_maker, database_async_read_write_engine, database_async_read_write_session_maker, @@ -23,7 +24,15 @@ from src.config.environment_variables import EnvironmentVariables from src.domain.repositories.agent_repository import AgentRepository from src.temporal.activities.healthcheck_activities import HealthCheckActivities +from src.temporal.activities.retention_cleanup_activities import ( + RetentionCleanupActivities, +) +from src.temporal.task_retention_factory import build_task_retention_use_case from src.temporal.workflows.healthcheck_workflow import HealthCheckWorkflow +from src.temporal.workflows.retention_cleanup_workflow import ( + RetentionCleanupSweepWorkflow, + RetentionCleanupTaskWorkflow, +) from src.utils.logging import make_logger logger = make_logger(__name__) @@ -121,36 +130,52 @@ async def run_worker( await health_check_worker.shutdown() -def create_health_check_worker( - agent_repo: AgentRepository, http_client: httpx.AsyncClient +def create_agentex_server_worker( + agent_repo: AgentRepository, + http_client: httpx.AsyncClient, + global_dependencies: GlobalDependencies, ) -> asyncio.Task: """ - Create a Health Check worker. + Create the single Temporal worker that serves the `agentex-server` task queue. + + Registers ALL workflows + activities that run on this queue — health checks + AND retention cleanup — in one worker. Workers polling the same task queue + must register the same set of types (the queue is not typed), so these live + together in one worker rather than as separate processes/containers. """ - # Get task queue from environment or use default task_queue = os.environ.get("AGENTEX_SERVER_TASK_QUEUE", AGENTEX_SERVER_TASK_QUEUE) - logger.info("Starting Temporal Health Check Worker") + logger.info("Starting agentex-server Temporal worker") logger.info(f"Task queue: {task_queue}") - # Create activities instance with dependencies health_check_activities = HealthCheckActivities( agent_repo=agent_repo, - http_client=httpx_client(), + http_client=http_client, ) - # Extract activity methods - activities = [ - health_check_activities.check_status_activity, - health_check_activities.update_agent_status_activity, - ] + retention_use_case = build_task_retention_use_case(global_dependencies) + # Reuse the repository the factory already built (avoids a duplicate + # TaskRepository) via the use case's stable accessor. + retention_activities = RetentionCleanupActivities( + task_repository=retention_use_case.task_repository, + use_case=retention_use_case, + ) - # Create and run worker task return asyncio.create_task( run_worker( task_queue=task_queue, - workflows=[HealthCheckWorkflow], - activities=activities, + workflows=[ + HealthCheckWorkflow, + RetentionCleanupSweepWorkflow, + RetentionCleanupTaskWorkflow, + ], + activities=[ + health_check_activities.check_status_activity, + health_check_activities.update_agent_status_activity, + retention_activities.load_cleanup_config, + retention_activities.find_cleanup_candidates, + retention_activities.clean_task, + ], max_workers=50, max_concurrent_activities=50, ) @@ -158,23 +183,22 @@ def create_health_check_worker( async def main() -> None: - """ - Main entry point for the Health Check worker. - """ + """Main entry point for the agentex-server Temporal worker.""" try: - # Initialize global dependencies for this thread await startup_global_dependencies() - # Create session maker + global_dependencies = GlobalDependencies() + engine = database_async_read_write_engine() session_maker = database_async_read_write_session_maker(engine) read_only_session_maker = database_async_read_only_session_maker(engine) agent_repo = AgentRepository(session_maker, read_only_session_maker) - health_check_worker_task = create_health_check_worker( + + worker_task = create_agentex_server_worker( agent_repo=agent_repo, http_client=httpx_client(), + global_dependencies=global_dependencies, ) - # Wait for the worker to complete - await health_check_worker_task + await worker_task except KeyboardInterrupt: logger.info("Received interrupt signal, shutting down worker...") diff --git a/agentex/src/temporal/task_retention_factory.py b/agentex/src/temporal/task_retention_factory.py new file mode 100644 index 00000000..0f9290f6 --- /dev/null +++ b/agentex/src/temporal/task_retention_factory.py @@ -0,0 +1,63 @@ +""" +Construct a TaskRetentionUseCase outside FastAPI's Depends DI, for use inside +Temporal worker processes. Mirrors the manual-wiring pattern in +run_healthcheck_workflow.py (repositories built from session makers). +""" + +from src.adapters.temporal.adapter_temporal import TemporalAdapter +from src.config.dependencies import ( + GlobalDependencies, + database_async_read_only_session_maker, + database_async_read_write_engine, + database_async_read_write_session_maker, + httpx_client, +) +from src.domain.repositories.agent_task_tracker_repository import ( + AgentTaskTrackerRepository, +) +from src.domain.repositories.event_repository import EventRepository +from src.domain.repositories.task_message_repository import TaskMessageRepository +from src.domain.repositories.task_repository import TaskRepository +from src.domain.repositories.task_state_repository import TaskStateRepository +from src.domain.services.task_message_service import TaskMessageService +from src.domain.services.task_retention_service import TaskRetentionService +from src.domain.use_cases.task_retention_use_case import TaskRetentionUseCase + + +def build_task_retention_use_case( + global_dependencies: GlobalDependencies, +) -> TaskRetentionUseCase: + """Wire a TaskRetentionUseCase from an already-loaded GlobalDependencies.""" + engine = database_async_read_write_engine() + rw_session_maker = database_async_read_write_session_maker(engine) + ro_session_maker = database_async_read_only_session_maker(engine) + + task_repository = TaskRepository(rw_session_maker, ro_session_maker) + event_repository = EventRepository(rw_session_maker, ro_session_maker) + agent_task_tracker_repository = AgentTaskTrackerRepository( + rw_session_maker, ro_session_maker + ) + + task_message_repository = TaskMessageRepository( + global_dependencies.mongodb_database + ) + task_state_repository = TaskStateRepository(global_dependencies.mongodb_database) + task_message_service = TaskMessageService( + message_repository=task_message_repository + ) + + temporal_adapter = TemporalAdapter( + temporal_client=global_dependencies.temporal_client + ) + + retention_service = TaskRetentionService( + task_repository=task_repository, + task_message_service=task_message_service, + task_message_repository=task_message_repository, + task_state_repository=task_state_repository, + event_repository=event_repository, + agent_task_tracker_repository=agent_task_tracker_repository, + temporal_adapter=temporal_adapter, + httpx_client=httpx_client(), + ) + return TaskRetentionUseCase(retention_service=retention_service) diff --git a/agentex/src/temporal/workflows/retention_cleanup_workflow.py b/agentex/src/temporal/workflows/retention_cleanup_workflow.py new file mode 100644 index 00000000..4bc8cd32 --- /dev/null +++ b/agentex/src/temporal/workflows/retention_cleanup_workflow.py @@ -0,0 +1,130 @@ +""" +Scheduled task-retention cleanup workflows. + +RetentionCleanupSweepWorkflow: started by a Temporal Schedule. Pulls one page of +candidate task ids, fans out one child workflow per task (bounded by +max_in_flight), aggregates cleaned/skipped/failed counts, then continue_as_new's +to the next page so workflow history stays bounded regardless of backlog size. + +RetentionCleanupTaskWorkflow: per-task child. Invokes the clean activity, which +already maps the policy/safety ClientError refusals to a 'skipped' outcome; only +genuine transient errors surface as activity failures (and are retried). +""" + +import asyncio +from datetime import timedelta + +from src.temporal.activities.retention_cleanup_activities import ( + CLEAN_TASK_ACTIVITY, + FIND_CLEANUP_CANDIDATES_ACTIVITY, + LOAD_CLEANUP_CONFIG_ACTIVITY, +) +from src.utils.logging import make_logger +from temporalio import workflow +from temporalio.common import RetryPolicy + +logger = make_logger(__name__) + + +def _chunked(items: list[str], size: int) -> list[list[str]]: + return [items[i : i + size] for i in range(0, len(items), size)] + + +@workflow.defn +class RetentionCleanupTaskWorkflow: + @workflow.run + async def run(self, args: dict) -> dict: + return await workflow.execute_activity( + CLEAN_TASK_ACTIVITY, + args=[args["task_id"], args["idle_days"]], + start_to_close_timeout=timedelta(seconds=60), + retry_policy=RetryPolicy( + maximum_attempts=3, + initial_interval=timedelta(seconds=1), + backoff_coefficient=2.0, + ), + ) + + +@workflow.defn +class RetentionCleanupSweepWorkflow: + @workflow.run + async def run(self, args: dict | None = None) -> dict: + args = args or {} + + # First page of a sweep: load policy from env (via activity) and carry it + # across continue_as_new pages so a single sweep stays consistent even if + # env changes mid-run. Subsequent pages already have it in args. + if "idle_days" not in args: + config = await workflow.execute_activity( + LOAD_CLEANUP_CONFIG_ACTIVITY, + start_to_close_timeout=timedelta(seconds=15), + retry_policy=RetryPolicy( + maximum_attempts=3, + initial_interval=timedelta(seconds=1), + backoff_coefficient=2.0, + ), + ) + args = {**args, **config} + + idle_days = args["idle_days"] + agent_names = args["agent_names"] + page_size = args.get("page_size", 200) + max_in_flight = args.get("max_in_flight", 20) + after_id = args.get("after_id") + totals = args.get("totals", {"cleaned": 0, "skipped": 0, "failed": 0}) + + if not args.get("enabled", True): + logger.info("retention_cleanup_sweep_disabled", extra=totals) + return totals + + task_ids = await workflow.execute_activity( + FIND_CLEANUP_CANDIDATES_ACTIVITY, + args=[after_id, page_size, idle_days, agent_names], + start_to_close_timeout=timedelta(seconds=30), + retry_policy=RetryPolicy( + maximum_attempts=3, + initial_interval=timedelta(seconds=1), + backoff_coefficient=2.0, + ), + ) + + if not task_ids: + logger.info("retention_cleanup_sweep_completed", extra=totals) + return totals + + # Scope child workflow IDs to this run so a task re-discovered in a later + # sweep (e.g. one that was skipped) doesn't collide with a prior cycle's + # completed child under a REJECT_DUPLICATE workflow-id-reuse policy. + sweep_run_id = workflow.info().run_id[:8] + for batch in _chunked(task_ids, max_in_flight): + results = await asyncio.gather( + *[ + workflow.execute_child_workflow( + RetentionCleanupTaskWorkflow.run, + {"task_id": task_id, "idle_days": idle_days}, + id=f"retention-cleanup-task-{sweep_run_id}-{task_id}", + retry_policy=RetryPolicy(maximum_attempts=1), + ) + for task_id in batch + ], + return_exceptions=True, + ) + for result in results: + if isinstance(result, BaseException): + totals["failed"] += 1 + else: + status = result.get("status", "failed") + totals[status] = totals.get(status, 0) + 1 + + workflow.continue_as_new( + arg={ + "enabled": args.get("enabled", True), + "idle_days": idle_days, + "agent_names": agent_names, + "page_size": page_size, + "max_in_flight": max_in_flight, + "after_id": task_ids[-1], + "totals": totals, + } + ) diff --git a/agentex/tests/integration/test_retention_cleanup_discovery.py b/agentex/tests/integration/test_retention_cleanup_discovery.py new file mode 100644 index 00000000..bccab933 --- /dev/null +++ b/agentex/tests/integration/test_retention_cleanup_discovery.py @@ -0,0 +1,115 @@ +from datetime import UTC, datetime, timedelta + +import pytest +from sqlalchemy import insert +from src.adapters.orm import AgentORM, TaskAgentORM, TaskORM +from src.domain.entities.agents import AgentStatus +from src.domain.entities.tasks import TaskStatus + + +async def _seed_agent(session, agent_id: str, name: str) -> None: + await session.execute( + insert(AgentORM).values( + id=agent_id, + name=name, + description="seed", + acp_url=f"http://{agent_id}:8000", + acp_type="sync", + status=AgentStatus.READY, + ) + ) + + +async def _seed_task( + session, + *, + task_id: str, + agent_id: str, + updated_at: datetime, + cleaned_at: datetime | None, + status: TaskStatus = TaskStatus.COMPLETED, +) -> None: + await session.execute( + insert(TaskORM).values( + id=task_id, + name=task_id, + status=status, + updated_at=updated_at, + cleaned_at=cleaned_at, + ) + ) + await session.execute( + insert(TaskAgentORM).values(task_id=task_id, agent_id=agent_id) + ) + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_discovery_filters_and_keyset_paging(isolated_repositories): + repo = isolated_repositories["task_repository"] + now = datetime.now(UTC) + old = now - timedelta(days=30) + + async with isolated_repositories["postgres_rw_session_factory"]() as session: + await _seed_agent(session, "agent-allowed", "allowed-agent") + await _seed_agent(session, "agent-other", "other-agent") + await _seed_task( + session, + task_id="t-aaa", + agent_id="agent-allowed", + updated_at=old, + cleaned_at=None, + ) + await _seed_task( + session, + task_id="t-bbb", + agent_id="agent-allowed", + updated_at=old, + cleaned_at=None, + ) + await _seed_task( + session, + task_id="t-fresh", + agent_id="agent-allowed", + updated_at=now, + cleaned_at=None, + ) + await _seed_task( + session, + task_id="t-clean", + agent_id="agent-allowed", + updated_at=old, + cleaned_at=old, + ) + await _seed_task( + session, + task_id="t-other", + agent_id="agent-other", + updated_at=old, + cleaned_at=None, + ) + await session.commit() + + ids = await repo.list_cleanup_candidate_ids( + idle_days=7, agent_names=["allowed-agent"], after_id=None, limit=100 + ) + assert ids == ["t-aaa", "t-bbb"] + + page1 = await repo.list_cleanup_candidate_ids( + idle_days=7, agent_names=["allowed-agent"], after_id=None, limit=1 + ) + assert page1 == ["t-aaa"] + page2 = await repo.list_cleanup_candidate_ids( + idle_days=7, agent_names=["allowed-agent"], after_id="t-aaa", limit=1 + ) + assert page2 == ["t-bbb"] + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_discovery_empty_allowlist_returns_nothing(isolated_repositories): + repo = isolated_repositories["task_repository"] + ids = await repo.list_cleanup_candidate_ids( + idle_days=7, agent_names=[], after_id=None, limit=100 + ) + assert ids == [] diff --git a/agentex/tests/unit/config/test_retention_cleanup_env.py b/agentex/tests/unit/config/test_retention_cleanup_env.py new file mode 100644 index 00000000..15c2791d --- /dev/null +++ b/agentex/tests/unit/config/test_retention_cleanup_env.py @@ -0,0 +1,43 @@ +import pytest +from src.config.environment_variables import EnvironmentVariables + + +@pytest.mark.unit +def test_retention_cleanup_env_parses_enabled_and_allowlist(monkeypatch): + monkeypatch.setenv("RETENTION_CLEANUP_ENABLED", "true") + monkeypatch.setenv("RETENTION_CLEANUP_AGENT_ALLOWLIST", "agent-a, agent-b ,agent-c") + monkeypatch.setenv("RETENTION_CLEANUP_IDLE_DAYS", "14") + monkeypatch.setenv("RETENTION_CLEANUP_CRON", "0 3 * * *") + monkeypatch.setenv("RETENTION_CLEANUP_PAGE_SIZE", "50") + monkeypatch.setenv("RETENTION_CLEANUP_MAX_IN_FLIGHT", "5") + + env = EnvironmentVariables.refresh(force_refresh=True) + + assert env.RETENTION_CLEANUP_ENABLED is True + assert env.RETENTION_CLEANUP_AGENT_ALLOWLIST == ["agent-a", "agent-b", "agent-c"] + assert env.RETENTION_CLEANUP_IDLE_DAYS == 14 + assert env.RETENTION_CLEANUP_CRON == "0 3 * * *" + assert env.RETENTION_CLEANUP_PAGE_SIZE == 50 + assert env.RETENTION_CLEANUP_MAX_IN_FLIGHT == 5 + + +@pytest.mark.unit +def test_retention_cleanup_env_defaults(monkeypatch): + for key in ( + "RETENTION_CLEANUP_ENABLED", + "RETENTION_CLEANUP_AGENT_ALLOWLIST", + "RETENTION_CLEANUP_IDLE_DAYS", + "RETENTION_CLEANUP_CRON", + "RETENTION_CLEANUP_PAGE_SIZE", + "RETENTION_CLEANUP_MAX_IN_FLIGHT", + ): + monkeypatch.delenv(key, raising=False) + + env = EnvironmentVariables.refresh(force_refresh=True) + + assert env.RETENTION_CLEANUP_ENABLED is False + assert env.RETENTION_CLEANUP_AGENT_ALLOWLIST == [] # fail-closed + assert env.RETENTION_CLEANUP_IDLE_DAYS == 7 + assert env.RETENTION_CLEANUP_CRON == "0 4 * * *" + assert env.RETENTION_CLEANUP_PAGE_SIZE == 200 + assert env.RETENTION_CLEANUP_MAX_IN_FLIGHT == 20 diff --git a/agentex/tests/unit/temporal/__init__.py b/agentex/tests/unit/temporal/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/agentex/tests/unit/temporal/test_retention_cleanup_activities.py b/agentex/tests/unit/temporal/test_retention_cleanup_activities.py new file mode 100644 index 00000000..34362e6f --- /dev/null +++ b/agentex/tests/unit/temporal/test_retention_cleanup_activities.py @@ -0,0 +1,101 @@ +from datetime import UTC, datetime +from unittest.mock import AsyncMock + +import pytest +from src.domain.entities.task_retention import TaskCleanupResultEntity +from src.domain.exceptions import ClientError +from src.temporal.activities.retention_cleanup_activities import ( + RetentionCleanupActivities, +) + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_find_cleanup_candidates_delegates_to_repo(): + repo = AsyncMock() + repo.list_cleanup_candidate_ids.return_value = ["t1", "t2"] + activities = RetentionCleanupActivities(task_repository=repo, use_case=AsyncMock()) + + result = await activities.find_cleanup_candidates( + after_id=None, limit=200, idle_days=7, agent_names=["a"] + ) + + assert result == ["t1", "t2"] + repo.list_cleanup_candidate_ids.assert_awaited_once_with( + idle_days=7, agent_names=["a"], after_id=None, limit=200 + ) + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_clean_task_cleaned_outcome(): + use_case = AsyncMock() + use_case.clean_task.return_value = TaskCleanupResultEntity( + task_id="t1", + cleaned_at=datetime.now(UTC), + messages_deleted=3, + task_states_deleted=1, + events_deleted=2, + ) + activities = RetentionCleanupActivities( + task_repository=AsyncMock(), use_case=use_case + ) + + outcome = await activities.clean_task(task_id="t1", idle_days=7) + + assert outcome["status"] == "cleaned" + assert outcome["task_id"] == "t1" + assert outcome["messages_deleted"] == 3 + use_case.clean_task.assert_awaited_once_with(task_id="t1", force=False, idle_days=7) + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_clean_task_clienterror_maps_to_skipped(): + use_case = AsyncMock() + use_case.clean_task.side_effect = ClientError( + "Cannot clean task t1: status is RUNNING (active)" + ) + activities = RetentionCleanupActivities( + task_repository=AsyncMock(), use_case=use_case + ) + + outcome = await activities.clean_task(task_id="t1", idle_days=7) + + assert outcome["status"] == "skipped" + assert "RUNNING" in outcome["reason"] + assert outcome["task_id"] == "t1" + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_clean_task_unexpected_error_propagates(): + use_case = AsyncMock() + use_case.clean_task.side_effect = RuntimeError("mongo timeout") + activities = RetentionCleanupActivities( + task_repository=AsyncMock(), use_case=use_case + ) + + with pytest.raises(RuntimeError): + await activities.clean_task(task_id="t1", idle_days=7) + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_load_cleanup_config_reads_env(monkeypatch): + monkeypatch.setenv("RETENTION_CLEANUP_ENABLED", "true") + monkeypatch.setenv("RETENTION_CLEANUP_AGENT_ALLOWLIST", "x,y") + monkeypatch.setenv("RETENTION_CLEANUP_IDLE_DAYS", "9") + monkeypatch.setenv("RETENTION_CLEANUP_PAGE_SIZE", "33") + monkeypatch.setenv("RETENTION_CLEANUP_MAX_IN_FLIGHT", "4") + activities = RetentionCleanupActivities( + task_repository=AsyncMock(), use_case=AsyncMock() + ) + config = await activities.load_cleanup_config() + assert config == { + "enabled": True, + "idle_days": 9, + "agent_names": ["x", "y"], + "page_size": 33, + "max_in_flight": 4, + } diff --git a/agentex/tests/unit/temporal/test_retention_cleanup_workflow.py b/agentex/tests/unit/temporal/test_retention_cleanup_workflow.py new file mode 100644 index 00000000..7c89bea6 --- /dev/null +++ b/agentex/tests/unit/temporal/test_retention_cleanup_workflow.py @@ -0,0 +1,155 @@ +import uuid + +import pytest +from src.temporal.activities.retention_cleanup_activities import ( + CLEAN_TASK_ACTIVITY, + FIND_CLEANUP_CANDIDATES_ACTIVITY, + LOAD_CLEANUP_CONFIG_ACTIVITY, +) +from src.temporal.workflows.retention_cleanup_workflow import ( + RetentionCleanupSweepWorkflow, + RetentionCleanupTaskWorkflow, +) +from temporalio import activity +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import UnsandboxedWorkflowRunner, Worker + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_sweep_cleans_all_pages_and_aggregates(): + pages = {None: ["t1", "t2"], "t2": ["t3"], "t3": []} + + @activity.defn(name=FIND_CLEANUP_CANDIDATES_ACTIVITY) + async def fake_find(after_id, limit, idle_days, agent_names) -> list[str]: + return pages[after_id] + + @activity.defn(name=CLEAN_TASK_ACTIVITY) + async def fake_clean(task_id: str, idle_days: int) -> dict: + if task_id == "t2": + return { + "task_id": task_id, + "status": "skipped", + "reason": "RUNNING", + "messages_deleted": 0, + "task_states_deleted": 0, + "events_deleted": 0, + } + if task_id == "t3": + raise RuntimeError("permanent failure") + return { + "task_id": task_id, + "status": "cleaned", + "reason": None, + "messages_deleted": 1, + "task_states_deleted": 0, + "events_deleted": 0, + } + + async with await WorkflowEnvironment.start_time_skipping() as env: + async with Worker( + env.client, + task_queue="test-retention", + workflows=[RetentionCleanupSweepWorkflow, RetentionCleanupTaskWorkflow], + activities=[fake_find, fake_clean], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + summary = await env.client.execute_workflow( + RetentionCleanupSweepWorkflow.run, + { + "idle_days": 7, + "agent_names": ["a"], + "page_size": 2, + "max_in_flight": 2, + }, + id=f"sweep-{uuid.uuid4()}", + task_queue="test-retention", + ) + + assert summary["cleaned"] == 1 # t1 + assert summary["skipped"] == 1 # t2 + assert summary["failed"] == 1 # t3 + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_sweep_loads_config_from_activity_when_no_args(): + pages = {None: ["t1"], "t1": []} + + @activity.defn(name=LOAD_CLEANUP_CONFIG_ACTIVITY) + async def fake_load() -> dict: + return { + "idle_days": 7, + "agent_names": ["a"], + "page_size": 2, + "max_in_flight": 2, + } + + @activity.defn(name=FIND_CLEANUP_CANDIDATES_ACTIVITY) + async def fake_find(after_id, limit, idle_days, agent_names) -> list[str]: + assert agent_names == ["a"] # policy from load activity flowed through + return pages[after_id] + + @activity.defn(name=CLEAN_TASK_ACTIVITY) + async def fake_clean(task_id: str, idle_days: int) -> dict: + return { + "task_id": task_id, + "status": "cleaned", + "reason": None, + "messages_deleted": 1, + "task_states_deleted": 0, + "events_deleted": 0, + } + + async with await WorkflowEnvironment.start_time_skipping() as env: + async with Worker( + env.client, + task_queue="test-retention-load", + workflows=[RetentionCleanupSweepWorkflow, RetentionCleanupTaskWorkflow], + activities=[fake_load, fake_find, fake_clean], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + summary = await env.client.execute_workflow( + RetentionCleanupSweepWorkflow.run, + id=f"sweep-{uuid.uuid4()}", + task_queue="test-retention-load", + ) + assert summary["cleaned"] == 1 + + +@pytest.mark.unit +@pytest.mark.asyncio +async def test_sweep_noops_when_runtime_config_disabled(): + @activity.defn(name=LOAD_CLEANUP_CONFIG_ACTIVITY) + async def fake_load() -> dict: + return { + "enabled": False, + "idle_days": 7, + "agent_names": ["a"], + "page_size": 2, + "max_in_flight": 2, + } + + @activity.defn(name=FIND_CLEANUP_CANDIDATES_ACTIVITY) + async def fake_find(after_id, limit, idle_days, agent_names) -> list[str]: + raise AssertionError("disabled cleanup should not discover candidates") + + @activity.defn(name=CLEAN_TASK_ACTIVITY) + async def fake_clean(task_id: str, idle_days: int) -> dict: + raise AssertionError("disabled cleanup should not clean tasks") + + async with await WorkflowEnvironment.start_time_skipping() as env: + async with Worker( + env.client, + task_queue="test-retention-disabled", + workflows=[RetentionCleanupSweepWorkflow, RetentionCleanupTaskWorkflow], + activities=[fake_load, fake_find, fake_clean], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + summary = await env.client.execute_workflow( + RetentionCleanupSweepWorkflow.run, + id=f"sweep-{uuid.uuid4()}", + task_queue="test-retention-disabled", + ) + + assert summary == {"cleaned": 0, "skipped": 0, "failed": 0}