Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""add_task_creator_columns

Revision ID: a1f73ada66c5
Revises: 6c942325c828
Create Date: 2026-05-21 15:08:51.441535

"""

from collections.abc import Sequence

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "a1f73ada66c5"
down_revision: str | None = "6c942325c828"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
    """Add the task-creator columns, their partial indexes and the CHECK constraint.

    Every step is written so that it can be replayed. ``autocommit_block``
    commits the surrounding transaction when it is entered, so a failure in a
    later step leaves the earlier DDL applied while the Alembic revision has
    not been recorded yet; the next ``upgrade`` run then re-executes this
    function from the top and must not trip over objects that already exist.
    """
    # Raw DDL instead of ``op.add_column`` so a replay after a partial run does
    # not fail with "column already exists". Both columns are added in one
    # ALTER so the table lock is taken once. VARCHAR is what ``sa.String()``
    # without a length renders to on PostgreSQL.
    op.execute(
        sa.text(
            "ALTER TABLE tasks "
            "ADD COLUMN IF NOT EXISTS creator_user_id VARCHAR, "
            "ADD COLUMN IF NOT EXISTS creator_service_account_id VARCHAR"
        )
    )
    with op.get_context().autocommit_block():
        # Partial indexes — the columns are NULL for all pre-migration rows and
        # remain majority-NULL for legacy traffic indefinitely. A WHERE clause
        # keeps the indexes scoped to populated rows so we don't pay storage or
        # write amplification for NULL entries.
        #
        # NOTE(review): a CONCURRENTLY build that fails part-way can leave an
        # INVALID index behind, which ``IF NOT EXISTS`` would then skip on a
        # retry — confirm whether operators should drop such an index by hand
        # before re-running.
        op.execute(
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_tasks_creator_user_id "
            "ON tasks (creator_user_id) WHERE creator_user_id IS NOT NULL"
        )
        op.execute(
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_tasks_creator_service_account_id "
            "ON tasks (creator_service_account_id) "
            "WHERE creator_service_account_id IS NOT NULL"
        )
    # Add the CHECK as NOT VALID so the brief ACCESS EXCLUSIVE lock doesn't have to
    # wait on an existence scan, then VALIDATE under SHARE UPDATE EXCLUSIVE which
    # doesn't block concurrent reads/writes. `tasks` is a high-write table; even the
    # short ACCESS EXCLUSIVE held during a vanilla CHECK addition queues behind
    # in-flight transactions and blocks readers until it releases.
    #
    # Each ALTER must commit before the next one runs — otherwise Alembic's
    # default single-transaction wrapper holds the ACCESS EXCLUSIVE from
    # `NOT VALID` straight through the `VALIDATE` scan, collapsing the
    # two-statement split into one long blocking window. Use autocommit_block
    # to release locks between statements.
    with op.get_context().autocommit_block():
        # PostgreSQL has no ``ADD CONSTRAINT IF NOT EXISTS``; guard the ALTER
        # with a catalog lookup so a replay after a failed VALIDATE (or any
        # later failure) does not abort on "constraint already exists".
        op.execute(
            sa.text(
                """
                DO $$
                BEGIN
                    IF NOT EXISTS (
                        SELECT 1
                        FROM pg_constraint
                        WHERE conname = 'ck_tasks_at_most_one_creator'
                          AND conrelid = CAST('tasks' AS regclass)
                    ) THEN
                        ALTER TABLE tasks
                            ADD CONSTRAINT ck_tasks_at_most_one_creator
                            CHECK (
                                (creator_user_id IS NULL)
                                OR (creator_service_account_id IS NULL)
                            )
                            NOT VALID;
                    END IF;
                END
                $$
                """
            )
        )
        # Validating an already-validated constraint is harmless, so this
        # statement needs no guard of its own.
        op.execute("ALTER TABLE tasks VALIDATE CONSTRAINT ck_tasks_at_most_one_creator")


def downgrade() -> None:
    """Remove the CHECK constraint, the partial indexes and the creator columns.

    Mirrors ``upgrade`` in reverse. ``autocommit_block`` commits the pending
    transaction when it is entered, so the constraint drop is durable before
    the index drops run; if one of those fails, the retry replays this
    function from the top. Every statement therefore uses ``IF EXISTS`` so a
    replay does not abort on an object that is already gone.
    """
    op.execute(
        "ALTER TABLE tasks DROP CONSTRAINT IF EXISTS ck_tasks_at_most_one_creator"
    )
    # DROP INDEX CONCURRENTLY cannot run inside a transaction block.
    with op.get_context().autocommit_block():
        op.execute(
            "DROP INDEX CONCURRENTLY IF EXISTS ix_tasks_creator_service_account_id"
        )
        op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_tasks_creator_user_id")
    # Both columns go in a single ALTER so the table lock is taken once.
    op.execute(
        "ALTER TABLE tasks "
        "DROP COLUMN IF EXISTS creator_service_account_id, "
        "DROP COLUMN IF EXISTS creator_user_id"
    )
3 changes: 2 additions & 1 deletion agentex/database/migrations/migration_history.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
a9959ebcbe98 -> 6c942325c828 (head), adding task cleaned at
6c942325c828 -> a1f73ada66c5 (head), add_task_creator_columns
a9959ebcbe98 -> 6c942325c828, adding task cleaned at
e9c4ff9e6542 -> a9959ebcbe98, finalize_spans_task_id
9ff3ee32c81b -> e9c4ff9e6542, add_tasks_metadata_gin_index
57c5ed4f59ae -> 9ff3ee32c81b, uppercase deployment status enum labels
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,35 @@ async def list_resources(
)
return response["items"]

async def register_resource(
    self,
    principal: AgentexAuthPrincipalContext,
    resource: AgentexResource,
    parent: AgentexResource | None = None,
) -> None:
    """POST a resource registration to the auth service's ``/v1/authz/register``.

    The request body carries ``principal`` as given and the ``model_dump()``
    of ``resource``; a ``parent`` key with the parent's ``model_dump()`` is
    included only when ``parent`` is supplied. The HTTP response is not
    returned to the caller.
    """
    serialized_resource = resource.model_dump()
    request_body: dict = {
        "principal": principal,
        "resource": serialized_resource,
    }
    # Omit the key entirely (rather than sending null) when there is no parent.
    if parent is not None:
        request_body["parent"] = parent.model_dump()

    await HttpRequestHandler.post_with_error_handling(
        self.agentex_auth_url,
        "/v1/authz/register",
        json=request_body,
    )

async def deregister_resource(
    self,
    principal: AgentexAuthPrincipalContext,
    resource: AgentexResource,
) -> None:
    """POST a resource deregistration to the auth service's ``/v1/authz/deregister``.

    The request body carries ``principal`` as given and the ``model_dump()``
    of ``resource``. The HTTP response is not returned to the caller.
    """
    serialized_resource = resource.model_dump()
    request_body = {
        "principal": principal,
        "resource": serialized_resource,
    }
    await HttpRequestHandler.post_with_error_handling(
        self.agentex_auth_url,
        "/v1/authz/deregister",
        json=request_body,
    )


DAgentexAuthorization = Annotated[
AgentexAuthorizationProxy, Depends(AgentexAuthorizationProxy)
Expand Down
28 changes: 28 additions & 0 deletions agentex/src/adapters/authorization/port.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,31 @@ async def list_resources(
filter_operation: AuthorizedOperationType = AuthorizedOperationType.read,
) -> Iterable[str]:
"""List resource_ids for a given principal"""

@abstractmethod
async def register_resource(
    self,
    principal: PrincipalT,
    resource: AgentexResource,
    parent: AgentexResource | None = None,
) -> None:
    """Register a newly-created resource in the authorization graph.

    Atomically writes the relation tuples the schema requires (tenant +
    owner, plus an optional typed parent like ``task.parent_agent``).
    Distinct from ``grant`` because ``grant`` writes a single role
    relation, which is insufficient for schemas that gate access on
    ``tenant->membership``.

    Args:
        principal: The principal on whose behalf the resource is
            registered.
        resource: The resource to register.
        parent: Optional parent resource to link the new resource to;
            ``None`` (the default) registers the resource without one.
    """

@abstractmethod
async def deregister_resource(
    self,
    principal: PrincipalT,
    resource: AgentexResource,
) -> None:
    """Deregister a resource being deleted from the authorization graph.

    Removes every relation tuple written for the resource — keeps the
    graph in sync with the application database on row delete.

    Args:
        principal: The principal on whose behalf the resource is
            deregistered.
        resource: The resource to deregister.
    """
11 changes: 11 additions & 0 deletions agentex/src/adapters/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
JSON,
BigInteger,
Boolean,
CheckConstraint,
Column,
DateTime,
ForeignKey,
Expand Down Expand Up @@ -75,13 +76,23 @@ class TaskORM(BaseORM):
cleaned_at = Column(DateTime(timezone=True), nullable=True)
params = Column(JSONB, nullable=True)
task_metadata = Column(JSONB, nullable=True)
# NB: the runtime DB indexes are partial (`WHERE … IS NOT NULL`) — see
# migration a1f73ada66c5. SQLAlchemy's declarative `index=True` can only
# express a full index, so the ORM and migration intentionally differ on
# the WHERE clause; the migration's index wins.
creator_user_id = Column(String, nullable=True, index=True)
creator_service_account_id = Column(String, nullable=True, index=True)
# Many-to-Many relationship with agents
agents = relationship("AgentORM", secondary="task_agents", back_populates="tasks")

# Indexes for efficient querying
__table_args__ = (
# Index for filtering tasks by status (used in list queries)
Index("ix_tasks_status", "status"),
CheckConstraint(
"creator_user_id IS NULL OR creator_service_account_id IS NULL",
name="ck_tasks_at_most_one_creator",
),
)


Expand Down
9 changes: 9 additions & 0 deletions agentex/src/config/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class EnvVarKeys(str, Enum):
AGENTEX_SERVER_TASK_QUEUE = "AGENTEX_SERVER_TASK_QUEUE"
ENABLE_HEALTH_CHECK_WORKFLOW = "ENABLE_HEALTH_CHECK_WORKFLOW"
WEBHOOK_REQUEST_TIMEOUT = "WEBHOOK_REQUEST_TIMEOUT"
FGAC_TASKS_DUAL_WRITE = "FGAC_TASKS_DUAL_WRITE"


class Environment(str, Enum):
Expand Down Expand Up @@ -114,6 +115,10 @@ class EnvironmentVariables(BaseModel):
AGENTEX_SERVER_TASK_QUEUE: str | None = None
ENABLE_HEALTH_CHECK_WORKFLOW: bool = False
WEBHOOK_REQUEST_TIMEOUT: float = 15.0 # Webhook request timeout in seconds
# AGX1-274: gate the task FGAC dual-write call sites. Off by default so
# rollout is operator-controlled per environment. Mirrors KB's
# ``FGAC_KNOWLEDGE_BASES_DUAL_WRITE`` shape.
FGAC_TASKS_DUAL_WRITE: bool = False

@classmethod
def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
Expand Down Expand Up @@ -203,6 +208,10 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
WEBHOOK_REQUEST_TIMEOUT=float(
os.environ.get(EnvVarKeys.WEBHOOK_REQUEST_TIMEOUT, "15.0")
),
FGAC_TASKS_DUAL_WRITE=(
os.environ.get(EnvVarKeys.FGAC_TASKS_DUAL_WRITE, "false").lower()
== "true"
),
)
refreshed_environment_variables = environment_variables
return refreshed_environment_variables
Expand Down
8 changes: 8 additions & 0 deletions agentex/src/domain/entities/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ class TaskEntity(BaseModel):
None,
title="Task metadata",
)
creator_user_id: str | None = Field(
None,
title="Identity ID of the user who created this task",
)
creator_service_account_id: str | None = Field(
None,
title="Service identity ID of the service account that created this task",
)

# allow extra fields for agents relationships
model_config = ConfigDict(extra="allow")
Expand Down
94 changes: 85 additions & 9 deletions agentex/src/domain/services/authorization_service.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections.abc import Iterable
from typing import Annotated
from typing import Annotated, Any

from fastapi import Depends, Request

Expand All @@ -17,6 +17,11 @@

logger = make_logger(__name__)

# Sentinel for "caller did not pass an explicit principal_context" — None is a
# valid value (the bypass path logs ``for principal None``) so it can't be the
# default. Using a named object reads better at the call site than ``...``.
_UNSET: Any = object()


class AuthorizationService:
def __init__(
Expand All @@ -40,7 +45,11 @@ def is_enabled(self) -> bool:
return self.enabled

async def grant(
self, resource: AgentexResource, *, commit: bool = True, principal_context=...
self,
resource: AgentexResource,
*,
commit: bool = True,
principal_context: Any = _UNSET,
) -> None:
if self._bypass():
logger.info(
Expand All @@ -57,15 +66,19 @@ async def grant(
)
result = await self.gateway.grant(
principal_context
if principal_context is not ...
if principal_context is not _UNSET
else self.principal_context,
resource,
AuthorizedOperationType.create,
)
return result

async def revoke(
self, resource: AgentexResource, *, commit: bool = True, principal_context=...
self,
resource: AgentexResource,
*,
commit: bool = True,
principal_context: Any = _UNSET,
) -> None:
if self._bypass():
logger.info("Authorization bypassed for revoke operation")
Expand All @@ -81,7 +94,7 @@ async def revoke(

result = await self.gateway.revoke(
principal_context
if principal_context is not ...
if principal_context is not _UNSET
else self.principal_context,
resource,
AuthorizedOperationType.delete,
Expand All @@ -96,7 +109,7 @@ async def check(
resource: AgentexResource,
operation: AuthorizedOperationType,
*,
principal_context=...,
principal_context: Any = _UNSET,
) -> bool:
if self._bypass():
logger.info("Authorization bypassed for check operation")
Expand All @@ -105,7 +118,7 @@ async def check(
# Determine which principal context to use
effective_principal = (
principal_context
if principal_context is not ...
if principal_context is not _UNSET
else self.principal_context
)

Expand Down Expand Up @@ -157,12 +170,75 @@ async def check(
)
return result

async def register_resource(
    self,
    resource: AgentexResource,
    *,
    parent: AgentexResource | None = None,
    principal_context: Any = _UNSET,
) -> None:
    """Register a freshly-created resource with the authorization graph.

    Used immediately after persisting a new row to write the tenant +
    owner (and optionally typed parent) relation tuples atomically.
    Distinct from ``grant`` because ``grant`` only writes a single
    role relation, which is insufficient for schemas (e.g. ``task``)
    that require a ``tenant->membership`` gate.

    Args:
        resource: The resource to register.
        parent: Optional parent resource forwarded to the gateway.
        principal_context: Principal to register on behalf of. When
            omitted, ``self.principal_context`` is used; an explicit
            ``None`` is forwarded as-is.

    When ``self._bypass()`` is true the call only logs and returns without
    contacting the gateway.
    """
    if self._bypass():
        logger.info(
            f"Authorization bypassed for register_resource on {resource.type}:{resource.selector}"
        )
        return None

    # Resolve the principal once so the log line reports the same principal
    # that is actually sent to the gateway, including explicit overrides.
    effective_principal = (
        principal_context
        if principal_context is not _UNSET
        else self.principal_context
    )

    logger.info(
        "[authorization_service] Registering resource %s:%s for principal %s (parent=%s)",
        resource.type,
        resource.selector,
        effective_principal,
        parent,
    )
    await self.gateway.register_resource(
        effective_principal,
        resource,
        parent,
    )

async def deregister_resource(
    self,
    resource: AgentexResource,
    *,
    principal_context: Any = _UNSET,
) -> None:
    """Remove every relation tuple written for the resource — used when
    deleting the underlying database row.

    Args:
        resource: The resource to deregister.
        principal_context: Principal to deregister on behalf of. When
            omitted, ``self.principal_context`` is used; an explicit
            ``None`` is forwarded as-is.

    When ``self._bypass()`` is true the call only logs and returns without
    contacting the gateway.
    """
    if self._bypass():
        logger.info(
            f"Authorization bypassed for deregister_resource on {resource.type}:{resource.selector}"
        )
        return None

    # Resolve the principal once so the log line reports the same principal
    # that is actually sent to the gateway, including explicit overrides.
    effective_principal = (
        principal_context
        if principal_context is not _UNSET
        else self.principal_context
    )

    logger.info(
        "[authorization_service] Deregistering resource %s:%s for principal %s",
        resource.type,
        resource.selector,
        effective_principal,
    )
    await self.gateway.deregister_resource(
        effective_principal,
        resource,
    )

async def list_resources(
self,
filter_resource: AgentexResourceType,
filter_operation: AuthorizedOperationType = AuthorizedOperationType.read,
*,
principal_context=...,
principal_context: Any = _UNSET,
) -> Iterable[str] | None:
"""List resource identifiers for which the current principal has *filter_operation* permission."""

Expand All @@ -178,7 +254,7 @@ async def list_resources(
)
result = await self.gateway.list_resources(
principal_context
if principal_context is not ...
if principal_context is not _UNSET
else self.principal_context,
filter_resource,
filter_operation,
Expand Down
Loading
Loading