-
Notifications
You must be signed in to change notification settings - Fork 107
Expand file tree
/
Copy pathworkflow.py
More file actions
88 lines (69 loc) · 2.58 KB
/
workflow.py
File metadata and controls
88 lines (69 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""ReAct agent using the LangGraph Functional API with Temporal.
The Functional API expresses the ReAct loop as a plain `while` loop, making
the control flow explicit and easy to extend.
"""
from datetime import timedelta
from langgraph.func import entrypoint, task
from temporalio import workflow
from temporalio.contrib.langgraph import entrypoint as temporal_entrypoint
@task
def agent_think(query: str, history: list[str]) -> dict:
"""The agent decides the next action based on query and tool history.
In production, replace this with an LLM call (e.g., Claude with tools).
"""
tool_results = [h for h in history if h.startswith("[Tool]")]
if len(tool_results) == 0:
return {
"action": "tool",
"tool_name": "get_weather",
"tool_input": "San Francisco",
}
elif len(tool_results) == 1:
return {
"action": "tool",
"tool_name": "get_population",
"tool_input": "San Francisco",
}
else:
facts = "; ".join(tool_results)
return {
"action": "final",
"answer": (f"Here's what I found about San Francisco: {facts}"),
}
@task
def execute_tool(tool_name: str, tool_input: str) -> str:
"""Execute a tool by name. In production, dispatch to real implementations."""
tool_registry = {
"get_weather": lambda inp: f"[Tool] Weather in {inp}: 72°F and sunny.",
"get_population": lambda inp: f"[Tool] {inp} population: ~870,000 residents.",
}
handler = tool_registry.get(tool_name)
if handler:
return handler(tool_input)
return f"[Tool] Unknown tool: {tool_name}"
@entrypoint()
async def react_agent_entrypoint(query: str) -> dict:
"""ReAct agent loop: think -> act -> observe -> repeat."""
history: list[str] = []
while True:
decision = await agent_think(query, history)
if decision["action"] == "final":
return {"answer": decision["answer"], "steps": len(history)}
result = await execute_tool(decision["tool_name"], decision["tool_input"])
history.append(result)
all_tasks = [agent_think, execute_tool]
activity_options = {
"agent_think": {
"execute_in": "activity",
"start_to_close_timeout": timedelta(seconds=30),
},
"execute_tool": {
"execute_in": "activity",
"start_to_close_timeout": timedelta(seconds=30),
},
}
@workflow.defn
class ReactAgentFunctionalWorkflow:
@workflow.run
async def run(self, query: str) -> dict:
return await temporal_entrypoint("react-agent").ainvoke(query)