fix(messages): stamp agent messages with workflow.now() for monotonic ordering #356
Merged
Conversation
… ordering

Pairs with scaleapi/scale-agentex#233 (server now respects caller-supplied created_at). Without an SDK-side timestamp, the user-echo and the assistant-shell messages.create calls can land at the server within the same ms, and on reload the assistant turn sorts before the user message that triggered it.

- MessagesModule.create / create_batch auto-inject workflow.now() inside a Temporal workflow; caller-supplied values are respected; outside workflows the value remains None so the server wall-clock applies (sync / plain-async agents).
- CreateMessageParams / CreateMessagesBatchParams and MessagesService thread created_at to the SDK client (omit sentinel when None).
- StreamingTaskMessageContext + factory accept created_at and forward it to the initial messages.create at open().
- Auto-send dispatchers (adk.providers.litellm / .openai) capture workflow.now() at the workflow->activity boundary and thread it through ChatCompletion*AutoSendParams / RunAgent(Streamed)AutoSendParams. Inside the activity the timestamp is consumed once on the first streaming context per turn; later contexts in the same turn fall back to server wall-clock (naturally separated by network/processing latency plus PR 233's ms-stagger).
- workflow_now_if_in_workflow() helper in lib.utils.temporal.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
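The helper this commit introduces can be sketched roughly as follows. `workflow.in_workflow()` and `workflow.now()` are real `temporalio` APIs; the guarded import is an assumption about how the actual `lib.utils.temporal` module is written, not a copy of it:

```python
from datetime import datetime
from typing import Optional

try:
    from temporalio import workflow
except ImportError:  # temporalio absent: we can never be inside a workflow
    workflow = None  # type: ignore[assignment]


def workflow_now_if_in_workflow() -> Optional[datetime]:
    """Return Temporal's deterministic workflow clock when called inside a
    workflow, or None so the server wall-clock applies (sync / plain-async
    agents are unaffected)."""
    if workflow is not None and workflow.in_workflow():
        return workflow.now()
    return None
```

Using `workflow.now()` rather than `datetime.now()` matters because workflow code must be deterministic on replay; the activity-side wall clock would break that.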
…reaming test

The streaming test was querying client.states.list immediately after create_task returned, before the agent's on_task_create handler ran on the server. The sibling test_send_event_and_poll handles this with a 1s sleep; the streaming variant was missing it and was passing by luck on faster runners.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
smoreinis
approved these changes
May 13, 2026
- Drop redundant in_temporal_workflow() guard in MessagesModule.create / create_batch; workflow_now_if_in_workflow() already returns None outside a workflow.
- Extract the inline _take_created_at closure in providers/openai.py into a module-level _make_created_at_dispenser helper, eliminating the duplicated block between run_agent_auto_send and run_agent_streamed_auto_send.
- Drop @pytest.mark.asyncio from tests/lib/adk/test_messages_module.py to match the project's asyncio_mode="auto" convention used in the sibling test_messages_service.py.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
Pairs with scaleapi/scale-agentex#233 (server now respects caller-supplied `created_at`). Without an SDK-side timestamp, the workflow-side user-echo and the activity-side assistant-shell `messages.create` calls can land at the server within the same millisecond, and on reload the assistant turn sorts before the user message that triggered it. This PR closes the gap on the SDK side.

- `MessagesModule.create` / `create_batch` auto-inject `workflow.now()` inside a Temporal workflow; caller-supplied values are respected; outside workflows the value stays `None` so the server wall-clock applies (sync / plain-async agents are unaffected).
- `CreateMessageParams` / `CreateMessagesBatchParams` and `MessagesService` thread `created_at` to the SDK client (`omit` sentinel when `None`).
- `StreamingTaskMessageContext` + the `streaming_task_message_context` factory accept `created_at` and forward it to the initial `messages.create` in `open()`.
- Auto-send dispatchers (`adk.providers.litellm` / `.openai`) capture `workflow.now()` at the workflow→activity boundary and thread it through `ChatCompletion*AutoSendParams` / `RunAgent(Streamed)AutoSendParams`. Inside the activity the timestamp is consumed once on the first streaming context per turn; later contexts in the same turn naturally separate via network/processing latency plus PR 233's ms-stagger.
- `workflow_now_if_in_workflow()` helper in `lib.utils.temporal`.

Test plan
- `rye run pytest tests/lib --ignore=tests/lib/test_payload_codec.py`: 266 passed, 2 skipped (the ignored file has pre-existing failures unrelated to this change)
- `rye run ruff check` on every touched file: clean
- `rye run pyright` on every touched src file: clean (fixed an unrelated ruff-format regression that had moved 4 `# type: ignore` comments off-target in `providers/openai.py`)
- `tests/lib/test_temporal_utils.py`: `workflow_now_if_in_workflow` helper, inside/outside workflow
- `tests/lib/test_auto_send_params_created_at.py`: every auto-send + messages activity param model accepts `created_at`
- `tests/lib/adk/test_messages_module.py`: `MessagesModule` auto-injects `workflow.now()` inside a workflow, respects caller-supplied values, omits outside a workflow
- `tests/lib/adk/test_messages_service.py`: service forwards `created_at` to the SDK client and passes `omit` when `None`
- `tests/lib/core/services/adk/test_streaming.py`: two `StreamingTaskMessageContext.open()` cases (with and without `created_at`)

🤖 Generated with Claude Code
Greptile Summary
- Introduces `workflow_now_if_in_workflow()` to stamp messages with Temporal's deterministic clock at the workflow→activity boundary, fixing a race condition where assistant messages could sort before user messages on reload.
- Threads `created_at` through `MessagesModule`, `MessagesService`, `StreamingTaskMessageContext`, and all auto-send provider paths (LiteLLM and OpenAI); uses an `omit` sentinel when `None` so outside-workflow agents are unaffected.
- Adds the `_make_created_at_dispenser` closure in the OpenAI service so only the first streaming context per activity invocation receives the workflow timestamp; subsequent messages rely on server wall-clock + the server-side ms-stagger from PR #233.

Confidence Score: 5/5
Safe to merge — no correctness, security, or data-integrity issues found.
The change is a clean, layered threading of an optional created_at field. The helper correctly delegates to workflow.now() only inside a Temporal workflow, the omit sentinel is applied consistently so outside-workflow agents are unaffected, and the single-fire dispenser in the OpenAI service correctly stamps only the first message per turn. Tests cover all key branches. No P1 or P0 findings.
No files require special attention.
Important Files Changed
Sequence Diagram
```mermaid
sequenceDiagram
    participant W as Temporal Workflow
    participant MM as MessagesModule
    participant AA as ActivityHelpers
    participant MA as MessagesActivities
    participant MS as MessagesService
    participant SDK as Agentex SDK Client
    W->>MM: messages.create(task_id, content)
    MM->>MM: "created_at = workflow_now_if_in_workflow()"
    MM->>AA: "execute_activity(CreateMessageParams{created_at})"
    AA->>MA: create_message(params)
    MA->>MS: create_message(task_id, content, created_at)
    MS->>SDK: messages.create(task_id, content, created_at)
    SDK-->>W: TaskMessage
    Note over W,SDK: LiteLLM/OpenAI auto-send path
    W->>W: "params = AutoSendParams(created_at=workflow_now_if_in_workflow())"
    W->>AA: execute_activity(params)
    AA->>MS: chat_completion_auto_send(created_at)
    MS->>MS: _make_created_at_dispenser(created_at)
    MS->>SDK: "messages.create(created_at=workflow_now) [first msg]"
    MS->>SDK: "messages.create(created_at=omit) [subsequent msgs]"
```