Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
eee2117
docs(retention): design for scheduled task-retention cleanup workflow
smoreinis Jun 3, 2026
7336eb5
docs(retention): implementation plan for scheduled cleanup workflow
smoreinis Jun 3, 2026
b2e86b7
feat(retention): add scheduled-cleanup configuration env vars
smoreinis Jun 3, 2026
38c46b3
feat(retention): add keyset-paginated cleanup-candidate discovery query
smoreinis Jun 3, 2026
20f19c0
refactor(retention): use keyword arg for read-only session in discove…
smoreinis Jun 3, 2026
9ef7e5f
feat(retention): add worker-context factory for TaskRetentionUseCase
smoreinis Jun 3, 2026
488106c
feat(retention): add cleanup discovery + clean activities
smoreinis Jun 3, 2026
20cd737
refactor(retention): typed clean outcome + discovery logging
smoreinis Jun 3, 2026
557e3d8
feat(retention): add sweep + per-task cleanup workflows
smoreinis Jun 3, 2026
9c00819
feat(retention): register cleanup workflows + activities on worker
smoreinis Jun 3, 2026
fcf82c6
refactor(retention): reuse factory-built task repository in cleanup w…
smoreinis Jun 3, 2026
616085a
feat(retention): add cleanup schedule bootstrap script
smoreinis Jun 3, 2026
11c0406
feat(retention): wire cleanup worker + schedule bootstrap for local dev
smoreinis Jun 3, 2026
0815518
feat(retention): load cleanup policy at runtime instead of baking int…
smoreinis Jun 3, 2026
652ad55
docs(retention): clarify why load_cleanup_config lives on the activit…
smoreinis Jun 3, 2026
0832994
docs(retention): update design for runtime policy loading
smoreinis Jun 3, 2026
c9ebdda
fix(retention): honor cleanup flag at runtime
smoreinis Jun 4, 2026
935f7cd
refactor(retention): serve cleanup on the single agentex-server worker
smoreinis Jun 4, 2026
76cdf52
fix(retention): pass RETENTION_CLEANUP_ENABLED to the worker for the …
smoreinis Jun 4, 2026
45802a7
fix(retention): address greptile review findings
smoreinis Jun 4, 2026
9df40a6
Merge branch 'main' into feat/retention-scheduled-cleanup
smoreinis Jun 4, 2026
34989ad
Merge branch 'main' into feat/retention-scheduled-cleanup
smoreinis Jun 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions agentex/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ services:
- ALLOWED_ORIGINS=http://localhost:3000
- OTEL_EXPORTER_OTLP_ENDPOINT=http://agentex-otel-collector:4317
- OTEL_SERVICE_NAME=agentex-api
- RETENTION_CLEANUP_ENABLED=${RETENTION_CLEANUP_ENABLED:-false}
- RETENTION_CLEANUP_AGENT_ALLOWLIST=${RETENTION_CLEANUP_AGENT_ALLOWLIST:-}
- RETENTION_CLEANUP_IDLE_DAYS=${RETENTION_CLEANUP_IDLE_DAYS:-7}
- RETENTION_CLEANUP_CRON=${RETENTION_CLEANUP_CRON:-0 4 * * *}
- RETENTION_CLEANUP_PAGE_SIZE=${RETENTION_CLEANUP_PAGE_SIZE:-200}
- RETENTION_CLEANUP_MAX_IN_FLIGHT=${RETENTION_CLEANUP_MAX_IN_FLIGHT:-20}
ports:
- "5003:5003"
volumes:
Expand Down Expand Up @@ -200,6 +206,7 @@ services:
alembic upgrade head &&
popd &&
python src/temporal/run_healthcheck_workflow.py &&
python src/temporal/run_retention_cleanup_schedule.py &&
echo 'Starting API server...' &&
uvicorn src.api.app:app --host 0.0.0.0 --port 5003 --reload --reload-dir /app/src --reload-include '*.py' --workers 1
"
Expand Down Expand Up @@ -228,6 +235,11 @@ services:
- MONGODB_URI=mongodb://agentex-mongodb:27017
- MONGODB_DATABASE_NAME=agentex
- AGENTEX_SERVER_TASK_QUEUE=agentex-server
- RETENTION_CLEANUP_ENABLED=${RETENTION_CLEANUP_ENABLED:-false}
- RETENTION_CLEANUP_AGENT_ALLOWLIST=${RETENTION_CLEANUP_AGENT_ALLOWLIST:-}
- RETENTION_CLEANUP_IDLE_DAYS=${RETENTION_CLEANUP_IDLE_DAYS:-7}
- RETENTION_CLEANUP_PAGE_SIZE=${RETENTION_CLEANUP_PAGE_SIZE:-200}
- RETENTION_CLEANUP_MAX_IN_FLIGHT=${RETENTION_CLEANUP_MAX_IN_FLIGHT:-20}
volumes:
- .:/app:cached
depends_on:
Expand Down
34 changes: 34 additions & 0 deletions agentex/src/config/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ class EnvVarKeys(str, Enum):
AGENTEX_SERVER_TASK_QUEUE = "AGENTEX_SERVER_TASK_QUEUE"
ENABLE_HEALTH_CHECK_WORKFLOW = "ENABLE_HEALTH_CHECK_WORKFLOW"
WEBHOOK_REQUEST_TIMEOUT = "WEBHOOK_REQUEST_TIMEOUT"
RETENTION_CLEANUP_ENABLED = "RETENTION_CLEANUP_ENABLED"
RETENTION_CLEANUP_AGENT_ALLOWLIST = "RETENTION_CLEANUP_AGENT_ALLOWLIST"
RETENTION_CLEANUP_IDLE_DAYS = "RETENTION_CLEANUP_IDLE_DAYS"
RETENTION_CLEANUP_CRON = "RETENTION_CLEANUP_CRON"
RETENTION_CLEANUP_PAGE_SIZE = "RETENTION_CLEANUP_PAGE_SIZE"
RETENTION_CLEANUP_MAX_IN_FLIGHT = "RETENTION_CLEANUP_MAX_IN_FLIGHT"


class Environment(str, Enum):
Expand Down Expand Up @@ -114,6 +120,12 @@ class EnvironmentVariables(BaseModel):
AGENTEX_SERVER_TASK_QUEUE: str | None = None
ENABLE_HEALTH_CHECK_WORKFLOW: bool = False
WEBHOOK_REQUEST_TIMEOUT: float = 15.0 # Webhook request timeout in seconds
RETENTION_CLEANUP_ENABLED: bool = False
RETENTION_CLEANUP_AGENT_ALLOWLIST: list[str] = []
RETENTION_CLEANUP_IDLE_DAYS: int = 7
RETENTION_CLEANUP_CRON: str = "0 4 * * *"
RETENTION_CLEANUP_PAGE_SIZE: int = 200
RETENTION_CLEANUP_MAX_IN_FLIGHT: int = 20

@classmethod
def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
Expand Down Expand Up @@ -203,6 +215,28 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
WEBHOOK_REQUEST_TIMEOUT=float(
os.environ.get(EnvVarKeys.WEBHOOK_REQUEST_TIMEOUT, "15.0")
),
RETENTION_CLEANUP_ENABLED=(
os.environ.get(EnvVarKeys.RETENTION_CLEANUP_ENABLED, "false") == "true"
),
RETENTION_CLEANUP_AGENT_ALLOWLIST=[
name.strip()
for name in os.environ.get(
EnvVarKeys.RETENTION_CLEANUP_AGENT_ALLOWLIST, ""
).split(",")
if name.strip()
],
RETENTION_CLEANUP_IDLE_DAYS=int(
os.environ.get(EnvVarKeys.RETENTION_CLEANUP_IDLE_DAYS, "7")
),
RETENTION_CLEANUP_CRON=os.environ.get(
EnvVarKeys.RETENTION_CLEANUP_CRON, "0 4 * * *"
),
RETENTION_CLEANUP_PAGE_SIZE=int(
os.environ.get(EnvVarKeys.RETENTION_CLEANUP_PAGE_SIZE, "200")
),
RETENTION_CLEANUP_MAX_IN_FLIGHT=int(
os.environ.get(EnvVarKeys.RETENTION_CLEANUP_MAX_IN_FLIGHT, "20")
),
)
refreshed_environment_variables = environment_variables
return refreshed_environment_variables
Expand Down
47 changes: 47 additions & 0 deletions agentex/src/domain/repositories/task_repository.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections.abc import Sequence
from datetime import UTC, datetime, timedelta
from typing import Annotated, Literal

from fastapi import Depends
Expand Down Expand Up @@ -94,6 +95,52 @@ async def list_with_join(
relationships=relationships,
)

async def list_cleanup_candidate_ids(
self,
*,
idle_days: int,
agent_names: Sequence[str],
after_id: str | None,
limit: int,
) -> list[str]:
"""
Return ids of tasks eligible for scheduled retention cleanup.

Cheap, index-friendly PRE-FILTER only — the authoritative idle / status /
unprocessed-events checks live in TaskRetentionService.clean_task. This
deliberately omits a status filter: status is race-prone (a task can flip
to RUNNING between this query and the clean call), so the trustworthy
RUNNING guard is enforced at clean-time. `updated_at < cutoff` is a correct
superset of truly-idle tasks (true idleness also requires the latest Mongo
message to predate the cutoff), so we never under-include.

Keyset-paginated by id ascending; pass the last returned id as `after_id`
to fetch the next page. Fail-closed: empty `agent_names` returns [].
"""
if not agent_names:
return []

cutoff = datetime.now(UTC) - timedelta(days=idle_days)
query = (
select(TaskORM.id)
.join(TaskAgentORM, TaskORM.id == TaskAgentORM.task_id)
.join(AgentORM, TaskAgentORM.agent_id == AgentORM.id)
.where(
TaskORM.cleaned_at.is_(None),
TaskORM.updated_at < cutoff,
AgentORM.name.in_(agent_names),
)
.order_by(TaskORM.id.asc())
.limit(limit)
.distinct()
)
if after_id is not None:
query = query.where(TaskORM.id > after_id)

async with self.start_async_db_session(allow_writes=False) as session:
result = await session.execute(query)
return [row[0] for row in result.all()]

async def create(self, agent_id: str, task: TaskEntity) -> TaskEntity:
"""Create task and establish agent relationships"""

Expand Down
8 changes: 8 additions & 0 deletions agentex/src/domain/use_cases/task_retention_use_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
TaskExportToUrlResultEntity,
TaskSnapshotEntity,
)
from src.domain.repositories.task_repository import TaskRepository
from src.domain.services.task_retention_service import DTaskRetentionService


Expand All @@ -20,6 +21,13 @@ class TaskRetentionUseCase:
def __init__(self, retention_service: DTaskRetentionService):
self.retention_service = retention_service

@property
def task_repository(self) -> TaskRepository:
    """Return the task repository owned by the retention service.

    Gives callers (e.g. the Temporal worker) a stable way to reuse the same
    repository instance instead of reaching into the service's internals.
    """
    service = self.retention_service
    return service.task_repository

async def export_task(self, task_id: str) -> TaskSnapshotEntity:
return await self.retention_service.export_task(task_id)

Expand Down
147 changes: 147 additions & 0 deletions agentex/src/temporal/activities/retention_cleanup_activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""
Temporal activities for the scheduled task-retention cleanup sweep.

Three activities:
- load_cleanup_config: reads RETENTION_CLEANUP_* env vars at run time so policy
changes take effect on the next scheduled run without recreating the schedule.
- find_cleanup_candidates: cheap pre-filtered, keyset-paginated discovery.
- clean_task: delegates to TaskRetentionUseCase.clean_task; catches ClientError
(the three policy/safety refusals) and maps it to a 'skipped' outcome so the
caller's child workflow completes cleanly. Genuine transient errors propagate
so Temporal retries them.

Boundary types are JSON-native (the backend data converter does not serialize
Pydantic models).
"""

from typing import TypedDict

from src.config.environment_variables import EnvironmentVariables
from src.domain.exceptions import ClientError
from src.domain.repositories.task_repository import TaskRepository
from src.domain.use_cases.task_retention_use_case import TaskRetentionUseCase
from src.utils.logging import make_logger
from temporalio import activity

# Module-level logger for the retention-cleanup activities.
logger = make_logger(__name__)

# Temporal activity names, passed to @activity.defn(name=...) on the methods of
# RetentionCleanupActivities. Changing a value changes the registered name.
LOAD_CLEANUP_CONFIG_ACTIVITY = "load_cleanup_config_activity"
FIND_CLEANUP_CANDIDATES_ACTIVITY = "find_cleanup_candidates_activity"
CLEAN_TASK_ACTIVITY = "clean_task_activity"


class CleanTaskOutcome(TypedDict):
task_id: str
status: str # "cleaned" | "skipped"
reason: str | None
messages_deleted: int
task_states_deleted: int
events_deleted: int


class RetentionCleanupActivities:
    """Temporal activities used by the scheduled retention-cleanup sweep.

    Holds the task repository (for candidate discovery) and the retention use
    case (for the actual clean). All activity inputs and outputs are plain
    JSON-native values.
    """

    def __init__(
        self,
        task_repository: TaskRepository,
        use_case: TaskRetentionUseCase,
    ):
        self.task_repository = task_repository
        self.use_case = use_case

    @activity.defn(name=LOAD_CLEANUP_CONFIG_ACTIVITY)
    async def load_cleanup_config(self) -> dict:
        """
        Read the current retention-cleanup policy from the environment.

        Policy (allowlist, idle threshold, paging) is intentionally NOT baked into
        the Temporal Schedule. The sweep loads it here at run time so changing a
        RETENTION_CLEANUP_* env var and restarting the worker takes effect on the
        next scheduled run without recreating the schedule.

        Returns:
            dict with keys ``enabled``, ``idle_days``, ``agent_names``,
            ``page_size`` and ``max_in_flight``.

        Raises:
            RuntimeError: if the environment could not be loaded (refresh
                returned None).
        """
        # Lives on this class (rather than as a free function) only so the worker
        # can register it alongside the other activities; it intentionally uses
        # none of the injected repositories/use case.
        env = EnvironmentVariables.refresh(force_refresh=True)
        if env is None:
            # refresh() is typed Optional. Fail with an explicit error rather
            # than an opaque AttributeError; the exception still propagates so
            # the activity is reported as failed and no policy is guessed.
            raise RuntimeError(
                "EnvironmentVariables.refresh() returned None; "
                "cannot load retention cleanup policy"
            )
        return {
            "enabled": env.RETENTION_CLEANUP_ENABLED,
            "idle_days": env.RETENTION_CLEANUP_IDLE_DAYS,
            "agent_names": env.RETENTION_CLEANUP_AGENT_ALLOWLIST,
            "page_size": env.RETENTION_CLEANUP_PAGE_SIZE,
            "max_in_flight": env.RETENTION_CLEANUP_MAX_IN_FLIGHT,
        }

    @activity.defn(name=FIND_CLEANUP_CANDIDATES_ACTIVITY)
    async def find_cleanup_candidates(
        self,
        after_id: str | None,
        limit: int,
        idle_days: int,
        agent_names: list[str],
    ) -> list[str]:
        """
        Return a page of task IDs that are eligible for content cleanup.

        Args:
            after_id: Keyset cursor — return only IDs strictly after this value,
                or None to start from the beginning.
            limit: Maximum number of IDs to return.
            idle_days: Minimum number of days a task must have been idle to qualify.
            agent_names: Restrict candidates to tasks belonging to these agents.

        Returns:
            list[str]: Up to *limit* task IDs ordered by ID, suitable for passing
            back as *after_id* on the next page.
        """
        logger.info(
            "find_cleanup_candidates_started",
            extra={"after_id": after_id, "limit": limit},
        )
        result = await self.task_repository.list_cleanup_candidate_ids(
            idle_days=idle_days,
            agent_names=agent_names,
            after_id=after_id,
            limit=limit,
        )
        logger.info("find_cleanup_candidates_completed", extra={"count": len(result)})
        return result

    @activity.defn(name=CLEAN_TASK_ACTIVITY)
    async def clean_task(self, task_id: str, idle_days: int) -> CleanTaskOutcome:
        """
        Delete the stored content (messages, states, events) for a single task.

        Args:
            task_id: ID of the task to clean.
            idle_days: Passed through to the use case for policy checks.

        Returns:
            CleanTaskOutcome with ``status`` set to ``"cleaned"`` when content was
            deleted, or ``"skipped"`` when the use case refused via ``ClientError``
            (e.g. task is still active, not yet idle long enough, or already
            cleaned). Other exceptions propagate so Temporal can retry them.
        """
        try:
            # force=False keeps the use case's own policy/safety checks active.
            result = await self.use_case.clean_task(
                task_id=task_id, force=False, idle_days=idle_days
            )
            return {
                "task_id": result.task_id,
                "status": "cleaned",
                "reason": None,
                "messages_deleted": result.messages_deleted,
                "task_states_deleted": result.task_states_deleted,
                "events_deleted": result.events_deleted,
            }
        except ClientError as e:
            # A refusal is an expected outcome, not a failure: report it as
            # "skipped" so the caller completes cleanly instead of retrying.
            logger.info(
                "task_cleanup_skipped",
                extra={"task_id": task_id, "reason": str(e)},
            )
            return {
                "task_id": task_id,
                "status": "skipped",
                "reason": str(e),
                "messages_deleted": 0,
                "task_states_deleted": 0,
                "events_deleted": 0,
            }
76 changes: 76 additions & 0 deletions agentex/src/temporal/run_retention_cleanup_schedule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""
Create the Temporal Schedule that drives the scheduled task-retention cleanup.

Runs at startup (mirrors run_healthcheck_workflow.py). No-op unless
RETENTION_CLEANUP_ENABLED is true and Temporal is configured.
Idempotent: if the schedule already exists, it is left as-is.

The schedule carries NO policy args (no allowlist, idle_days, page_size, or
max_in_flight). Those are read at sweep runtime via the load_cleanup_config
activity so changing a RETENTION_CLEANUP_* env var and restarting the worker
takes effect on the next scheduled run without recreating the schedule.
Only the cron expression and workflow identity are baked into the schedule.

Fail-closed behaviour is preserved at runtime: if the allowlist is empty the
discovery activity returns no candidates and the sweep completes immediately.
"""

import asyncio

from src.adapters.temporal.adapter_temporal import TemporalAdapter
from src.adapters.temporal.client_factory import TemporalClientFactory
from src.adapters.temporal.exceptions import TemporalScheduleAlreadyExistsError
from src.config.dependencies import GlobalDependencies
from src.config.environment_variables import EnvironmentVariables
from src.temporal.run_worker import AGENTEX_SERVER_TASK_QUEUE
from src.temporal.workflows.retention_cleanup_workflow import (
RetentionCleanupSweepWorkflow,
)
from src.utils.logging import make_logger

# Module-level logger for the schedule bootstrap script.
logger = make_logger(__name__)

# Id of the Temporal Schedule that main() creates (schedule_id argument).
SCHEDULE_ID = "retention-cleanup-sweep"
# Workflow id main() passes to create_schedule (workflow_id argument).
WORKFLOW_ID = "retention-cleanup-sweep"


async def main() -> None:
    """Create the retention-cleanup Temporal Schedule if the feature is enabled.

    Does nothing (beyond logging) when RETENTION_CLEANUP_ENABLED is false or
    Temporal is not configured. These checks only need the environment, so they
    run BEFORE global dependencies are loaded: a disabled deployment returns
    without constructing or loading GlobalDependencies.

    Idempotent: an already-existing schedule is left untouched.
    """
    env = EnvironmentVariables.refresh()
    if not env or not env.RETENTION_CLEANUP_ENABLED:
        logger.info("Retention cleanup is not enabled; skipping schedule creation")
        return
    if not TemporalClientFactory.is_temporal_configured(env):
        logger.error("Temporal is not configured; skipping schedule creation")
        return

    if not env.RETENTION_CLEANUP_AGENT_ALLOWLIST:
        # Not an error: the schedule is still created, the sweep just finds
        # nothing to clean until an allowlist is configured.
        logger.info(
            "Retention cleanup agent allowlist is empty; the sweep will discover "
            "no candidates at runtime (fail-closed by policy, not by schedule)"
        )

    # Load dependencies only now that a schedule will actually be created.
    global_dependencies = GlobalDependencies()
    await global_dependencies.load()

    task_queue = env.AGENTEX_SERVER_TASK_QUEUE or AGENTEX_SERVER_TASK_QUEUE
    adapter = TemporalAdapter(temporal_client=global_dependencies.temporal_client)

    try:
        # No policy args are baked in: only the cron expression and workflow
        # identity live on the schedule; policy is read at sweep runtime.
        await adapter.create_schedule(
            schedule_id=SCHEDULE_ID,
            workflow=RetentionCleanupSweepWorkflow.run,
            workflow_id=WORKFLOW_ID,
            args=[],
            task_queue=task_queue,
            cron_expressions=[env.RETENTION_CLEANUP_CRON],
        )
        logger.info(
            "Created retention-cleanup schedule (policy read at runtime)",
            extra={"cron": env.RETENTION_CLEANUP_CRON},
        )
    except TemporalScheduleAlreadyExistsError:
        logger.info("Retention-cleanup schedule already exists; leaving as-is")

if __name__ == "__main__":
    # Script entry point: run the async bootstrap to completion.
    asyncio.run(main())
Loading
Loading