
fix(messages): stamp agent messages with workflow.now() for monotonic ordering#356

Merged

declan-scale merged 3 commits into next from declan-scale/consistent-timestamps on May 13, 2026
Conversation

@declan-scale
Contributor

declan-scale commented May 13, 2026

Summary

Pairs with scaleapi/scale-agentex#233 (server now respects caller-supplied created_at). Without an SDK-side timestamp, the workflow-side user-echo and the activity-side assistant-shell messages.create calls can land at the server within the same millisecond, causing the assistant turn to sort before the user message that triggered it on reload. This PR closes the gap on the SDK side.

  • MessagesModule.create / create_batch auto-inject workflow.now() inside a Temporal workflow; caller-supplied values are respected; outside workflows the value stays None so the server wall-clock applies (sync / plain-async agents are unaffected).
  • CreateMessageParams / CreateMessagesBatchParams and MessagesService thread created_at to the SDK client (omit sentinel when None).
  • StreamingTaskMessageContext + streaming_task_message_context factory accept created_at and forward to the initial messages.create in open().
  • Auto-send dispatchers (adk.providers.litellm / .openai) capture workflow.now() at the workflow→activity boundary and thread it through ChatCompletion*AutoSendParams / RunAgent(Streamed)AutoSendParams. Inside the activity the timestamp is consumed once on the first streaming context per turn — later contexts in the same turn naturally separate via network/processing latency plus PR 233's ms-stagger.
  • workflow_now_if_in_workflow() helper in lib.utils.temporal.

Test plan

  • rye run pytest tests/lib --ignore=tests/lib/test_payload_codec.py — 266 passed, 2 skipped (the ignored file has pre-existing failures unrelated to this change)
  • rye run ruff check on every touched file — clean
  • rye run pyright on every touched src file — clean (fixed an unrelated ruff-format regression that had moved 4 # type: ignore comments off-target in providers/openai.py)
  • New tests:
    • tests/lib/test_temporal_utils.py — workflow_now_if_in_workflow helper, inside/outside workflow
    • tests/lib/test_auto_send_params_created_at.py — every auto-send + messages activity param model accepts created_at
    • tests/lib/adk/test_messages_module.py — MessagesModule auto-injects workflow.now() inside workflow, respects caller-supplied values, omits outside workflow
    • tests/lib/adk/test_messages_service.py — service forwards created_at to the SDK client and passes omit when None
    • extends tests/lib/core/services/adk/test_streaming.py with two StreamingTaskMessageContext.open() cases (with and without created_at)

🤖 Generated with Claude Code

Greptile Summary

  • Introduces workflow_now_if_in_workflow() to stamp messages with Temporal's deterministic clock at the workflow→activity boundary, fixing a race condition where assistant messages could sort before user messages on reload.
  • Threads created_at through MessagesModule, MessagesService, StreamingTaskMessageContext, and all auto-send provider paths (LiteLLM and OpenAI); uses an omit sentinel when None so outside-workflow agents are unaffected.
  • Uses a single-fire _make_created_at_dispenser closure in the OpenAI service so only the first streaming context per activity invocation receives the workflow timestamp; subsequent messages rely on server wall-clock + server-side ms-stagger from scaleapi/scale-agentex#233.
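The single-fire behavior can be sketched as a closure like the following; `make_created_at_dispenser` is an illustrative stand-in for the private helper, not its actual implementation:

```python
from datetime import datetime
from typing import Callable, Optional

def make_created_at_dispenser(
    created_at: Optional[datetime],
) -> Callable[[], Optional[datetime]]:
    # Hands out the workflow timestamp exactly once; every later call
    # returns None, so subsequent messages fall back to the server
    # wall-clock (separated by latency plus the server-side ms-stagger).
    remaining = created_at

    def take() -> Optional[datetime]:
        nonlocal remaining
        value, remaining = remaining, None
        return value

    return take
```

One dispenser is created per activity invocation and shared across that turn's streaming contexts, so only the first context is stamped.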

Confidence Score: 5/5

Safe to merge — no correctness, security, or data-integrity issues found.

The change is a clean, layered threading of an optional created_at field. The helper correctly delegates to workflow.now() only inside a Temporal workflow, the omit sentinel is applied consistently so outside-workflow agents are unaffected, and the single-fire dispenser in the OpenAI service correctly stamps only the first message per turn. Tests cover all key branches. No P1 or P0 findings.

No files require special attention.

Important Files Changed

| Filename | Overview |
|----------|----------|
| src/agentex/lib/utils/temporal.py | Adds workflow_now_if_in_workflow() — returns workflow.now() inside a Temporal workflow, None elsewhere. Simple, correct, well-tested. |
| src/agentex/lib/adk/_modules/messages.py | Auto-injects workflow_now_if_in_workflow() into created_at for both create and create_batch; respects caller-supplied values; correctly threads through both the activity-dispatch and direct-service-call branches. |
| src/agentex/lib/core/services/adk/providers/openai.py | Adds _make_created_at_dispenser closure to stamp only the first streaming context per turn; correctly applied across all streaming context creation sites in both run_agent_auto_send and run_agent_streamed_auto_send. |
| src/agentex/lib/core/services/adk/streaming.py | Adds created_at to StreamingTaskMessageContext.__init__ and forwards it via the omit sentinel in open(). |
| src/agentex/lib/core/services/adk/messages.py | Forwards created_at to the SDK client with omit sentinel when None; logic is consistent across create_message and create_messages_batch. |
| src/agentex/lib/core/temporal/activities/adk/messages_activities.py | Adds created_at: datetime |
| tests/lib/adk/test_messages_module.py | New tests covering auto-injection inside workflow, no injection outside workflow, and respect for caller-supplied timestamps. |
| tests/lib/core/services/adk/test_streaming.py | Extends existing streaming tests with two new cases validating created_at forwarding and omit sentinel in StreamingTaskMessageContext.open(). |

Sequence Diagram

sequenceDiagram
    participant W as Temporal Workflow
    participant MM as MessagesModule
    participant AA as ActivityHelpers
    participant MA as MessagesActivities
    participant MS as MessagesService
    participant SDK as Agentex SDK Client

    W->>MM: messages.create(task_id, content)
    MM->>MM: "created_at = workflow_now_if_in_workflow()"
    MM->>AA: "execute_activity(CreateMessageParams{created_at})"
    AA->>MA: create_message(params)
    MA->>MS: create_message(task_id, content, created_at)
    MS->>SDK: messages.create(task_id, content, created_at)
    SDK-->>W: TaskMessage

    Note over W,SDK: LiteLLM/OpenAI auto-send path
    W->>W: "params = AutoSendParams(created_at=workflow_now_if_in_workflow())"
    W->>AA: execute_activity(params)
    AA->>MS: chat_completion_auto_send(created_at)
    MS->>MS: _make_created_at_dispenser(created_at)
    MS->>SDK: "messages.create(created_at=workflow_now) [first msg]"
    MS->>SDK: "messages.create(created_at=omit) [subsequent msgs]"
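The auto-send leg of the diagram can be sketched end to end; `AutoSendParams` and `build_auto_send_params` here are simplified stand-ins for the real ChatCompletion*/RunAgent* param models, not the SDK's actual types:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class AutoSendParams:
    # Simplified stand-in for ChatCompletion*AutoSendParams et al.
    task_id: str
    created_at: Optional[datetime] = None

def build_auto_send_params(
    task_id: str, workflow_now: Optional[datetime]
) -> AutoSendParams:
    # Captured on the workflow side of the workflow->activity boundary,
    # so the activity sees the deterministic workflow time instead of
    # re-reading a clock after the hop.
    return AutoSendParams(task_id=task_id, created_at=workflow_now)
```

Inside the activity, the dispenser then consumes this timestamp on the first streaming context only.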

Reviews (2): Last reviewed commit: "refactor: address PR review comments on ..."

declan-scale and others added 2 commits May 13, 2026 13:03
… ordering

Pairs with scaleapi/scale-agentex#233 (server now respects caller-supplied
created_at). Without an SDK-side timestamp, the user-echo and the assistant-
shell messages.create calls can land at the server within the same ms, and
the assistant turn sorts before the user message that triggered it on reload.

- MessagesModule.create / create_batch auto-inject workflow.now() inside a
  Temporal workflow; caller-supplied values are respected; outside workflows
  remains None so the server wall-clock applies (sync / plain-async agents).
- CreateMessageParams / CreateMessagesBatchParams and MessagesService thread
  created_at to the SDK client (omit sentinel when None).
- StreamingTaskMessageContext + factory accept created_at and forward to the
  initial messages.create at open().
- Auto-send dispatchers (adk.providers.litellm / .openai) capture workflow.now()
  at the workflow->activity boundary and thread it through ChatCompletion*
  AutoSendParams / RunAgent(Streamed)AutoSendParams. Inside the activity the
  timestamp is consumed once on the first streaming context per turn; later
  contexts in the same turn fall back to server wall-clock (naturally separated
  by network/processing latency plus PR 233's ms-stagger).
- workflow_now_if_in_workflow() helper in lib.utils.temporal.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…reaming test

The streaming test was querying client.states.list immediately after
create_task returned, before the agent's on_task_create handler ran on the
server. The sibling test_send_event_and_poll handles this with a 1s sleep;
the streaming variant was missing it and was passing by luck on faster
runners.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Drop redundant in_temporal_workflow() guard in MessagesModule.create /
  create_batch; workflow_now_if_in_workflow() already returns None outside
  a workflow.
- Extract the inline _take_created_at closure in providers/openai.py into
  a module-level _make_created_at_dispenser helper, eliminating the
  duplicated block between run_agent_auto_send and run_agent_streamed_auto_send.
- Drop @pytest.mark.asyncio from tests/lib/adk/test_messages_module.py to
  match the project's asyncio_mode="auto" convention used in the sibling
  test_messages_service.py.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@declan-scale declan-scale merged commit afe5265 into next May 13, 2026
38 checks passed
@declan-scale declan-scale deleted the declan-scale/consistent-timestamps branch May 13, 2026 20:31
@stainless-app stainless-app Bot mentioned this pull request May 13, 2026
