Skip to content

Commit de436b0

Browse files
committed
Nexus prototype
uv add --editable ../sdk-python uv add --group nexus --editable ../nexus-sdk-python
1 parent 7a1dd4d commit de436b0

File tree

13 files changed

+1253
-358
lines changed

13 files changed

+1253
-358
lines changed

hello_nexus/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from rich.traceback import install
2+
3+
install(show_locals=True)

hello_nexus/caller/app.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import asyncio
2+
import sys
3+
from typing import Any, Type
4+
5+
from temporalio.client import Client
6+
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
7+
8+
from hello_nexus.caller.workflows import (
9+
Echo2CallerWorkflow,
10+
Echo3CallerWorkflow,
11+
EchoCallerWorkflow,
12+
Hello2CallerWorkflow,
13+
HelloCallerWorkflow,
14+
)
15+
16+
interrupt_event = asyncio.Event()
17+
18+
19+
async def execute_workflow(workflow_cls: Type[Any], input: Any) -> None:
20+
client = await Client.connect("localhost:7233", namespace="my-caller-namespace")
21+
task_queue = "my-caller-task-queue"
22+
23+
async with Worker(
24+
client,
25+
task_queue=task_queue,
26+
workflows=[workflow_cls],
27+
workflow_runner=UnsandboxedWorkflowRunner(),
28+
):
29+
print("🟠 Caller worker started")
30+
result = await client.execute_workflow(
31+
workflow_cls.run,
32+
input,
33+
id="my-caller-workflow-id",
34+
task_queue=task_queue,
35+
)
36+
print("🟢 workflow result:", result)
37+
38+
39+
if __name__ == "__main__":
40+
if len(sys.argv) != 2:
41+
print("Usage: python -m nexus.caller.app [echo|hello]")
42+
sys.exit(1)
43+
44+
[wf_name] = sys.argv[1:]
45+
fn = {
46+
"echo": lambda: execute_workflow(EchoCallerWorkflow, "hello"),
47+
"echo2": lambda: execute_workflow(Echo2CallerWorkflow, "hello"),
48+
"echo3": lambda: execute_workflow(Echo3CallerWorkflow, "hello"),
49+
"hello": lambda: execute_workflow(HelloCallerWorkflow, "world"),
50+
"hello2": lambda: execute_workflow(Hello2CallerWorkflow, "world"),
51+
}[wf_name]
52+
53+
loop = asyncio.new_event_loop()
54+
try:
55+
loop.run_until_complete(fn())
56+
except KeyboardInterrupt:
57+
interrupt_event.set()
58+
loop.run_until_complete(loop.shutdown_asyncgens())

hello_nexus/caller/workflows.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
from datetime import timedelta
2+
3+
from temporalio import workflow
4+
from temporalio.exceptions import FailureError
5+
from temporalio.workflow import NexusClient
6+
7+
from hello_nexus.service.interface import (
8+
EchoInput,
9+
EchoOutput,
10+
HelloInput,
11+
HelloOutput,
12+
MyNexusService,
13+
)
14+
15+
16+
class CallerWorkflowBase:
17+
def __init__(self):
18+
self.nexus_client = NexusClient(
19+
MyNexusService, # or string name "my-nexus-service",
20+
"my-nexus-endpoint-name",
21+
schedule_to_close_timeout=timedelta(seconds=30),
22+
)
23+
24+
25+
@workflow.defn
26+
class EchoCallerWorkflow(CallerWorkflowBase):
27+
@workflow.run
28+
async def run(self, message: str) -> EchoOutput:
29+
op_output = await self.nexus_client.execute_operation(
30+
MyNexusService.echo,
31+
EchoInput(message),
32+
)
33+
return op_output
34+
35+
36+
@workflow.defn
37+
class Echo2CallerWorkflow(CallerWorkflowBase):
38+
@workflow.run
39+
async def run(self, message: str) -> EchoOutput:
40+
op_output = await self.nexus_client.execute_operation(
41+
MyNexusService.echo2,
42+
EchoInput(message),
43+
)
44+
return op_output
45+
46+
47+
@workflow.defn
48+
class Echo3CallerWorkflow(CallerWorkflowBase):
49+
@workflow.run
50+
async def run(self, message: str) -> EchoOutput:
51+
op_output = await self.nexus_client.execute_operation(
52+
MyNexusService.echo3,
53+
EchoInput(message),
54+
)
55+
return op_output
56+
57+
58+
@workflow.defn
59+
class HelloCallerWorkflow(CallerWorkflowBase):
60+
@workflow.run
61+
async def run(self, name: str) -> HelloOutput:
62+
handle = await self.nexus_client.start_operation(
63+
MyNexusService.hello,
64+
HelloInput(name),
65+
)
66+
assert handle.cancel()
67+
try:
68+
await handle
69+
except FailureError:
70+
handle = await self.nexus_client.start_operation(
71+
MyNexusService.hello,
72+
HelloInput(name),
73+
)
74+
result = await handle
75+
return result
76+
raise AssertionError("Expected Nexus operation to be cancelled")
77+
78+
79+
@workflow.defn
80+
class Hello2CallerWorkflow(CallerWorkflowBase):
81+
@workflow.run
82+
async def run(self, name: str) -> HelloOutput:
83+
handle = await self.nexus_client.start_operation(
84+
MyNexusService.hello2,
85+
HelloInput(name),
86+
)
87+
return await handle
88+
89+
90+
@workflow.defn
91+
class Hello3CallerWorkflow(CallerWorkflowBase):
92+
@workflow.run
93+
async def run(self, name: str) -> HelloOutput:
94+
handle = await self.nexus_client.start_operation(
95+
MyNexusService.hello3,
96+
HelloInput(name),
97+
)
98+
return await handle

hello_nexus/clean

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
temporal-delete-all my-target-namespace
2+
temporal-delete-all my-caller-namespace
3+
4+
temporal operator namespace create --namespace my-target-namespace
5+
temporal operator namespace create --namespace my-caller-namespace
6+
7+
sleep 1
8+
9+
temporal operator nexus endpoint create \
10+
--name my-nexus-endpoint-name \
11+
--target-namespace my-target-namespace \
12+
--target-task-queue my-target-task-queue \
13+
--description-file ./hello_nexus/service/description.md
14+

hello_nexus/handler/activities.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import asyncio
2+
3+
from temporalio import activity
4+
5+
from hello_nexus.service.interface import (
6+
HelloInput,
7+
HelloOutput,
8+
)
9+
10+
11+
@activity.defn
12+
async def hello_activity(input: HelloInput) -> HelloOutput:
13+
await asyncio.sleep(1)
14+
return HelloOutput(message=f"hello {input.name}")

hello_nexus/handler/dbclient.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
class MyDBClient:
2+
def execute(self, query: str) -> str:
3+
return "<query result>"
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
"""
2+
Notes:
3+
4+
Sync operations:
5+
---------------
6+
Implementations are free to make arbitrary network calls, or perform CPU-bound
7+
computations such as this one. Total execution duration must not exceed 10s. To
8+
perform Temporal client calls such as signaling/querying/listing workflows, use
9+
self.client.
10+
11+
12+
Workflow operations:
13+
---------------------
14+
The task queue defaults to the task queue being used by the Nexus worker.
15+
"""
16+
17+
from __future__ import annotations
18+
19+
import nexusrpc.handler
20+
import temporalio.common
21+
import temporalio.nexus.handler
22+
23+
from hello_nexus.handler.dbclient import MyDBClient
24+
from hello_nexus.handler.workflows import HelloWorkflow
25+
from hello_nexus.service import interface
26+
from hello_nexus.service.interface import (
27+
EchoInput,
28+
EchoOutput,
29+
HelloInput,
30+
HelloOutput,
31+
)
32+
33+
34+
# Inheriting from the protocol here is optional. Users who do it will get the
35+
# operation definition itself type-checked in situ against the interface (*).
36+
# Call-sites using instances of the operation are always type-checked.
37+
#
38+
# (*) However, in VSCode/Pyright this is done only when type-checking is set to
39+
# 'strict'.
40+
class EchoOperation(nexusrpc.handler.Operation[EchoInput, EchoOutput]):
41+
def __init__(self, service: MyNexusService):
42+
self.service = service
43+
44+
async def start(
45+
self, input: EchoInput, options: nexusrpc.handler.StartOperationOptions
46+
) -> EchoOutput:
47+
return EchoOutput(message=f"Echo {input.message}!")
48+
49+
async def cancel(
50+
self, token: str, options: nexusrpc.handler.CancelOperationOptions
51+
) -> None:
52+
raise NotImplementedError
53+
54+
async def fetch_info(
55+
self, token: str, options: nexusrpc.handler.FetchOperationInfoOptions
56+
) -> nexusrpc.handler.OperationInfo:
57+
raise NotImplementedError
58+
59+
async def fetch_result(
60+
self, token: str, options: nexusrpc.handler.FetchOperationResultOptions
61+
) -> EchoOutput:
62+
raise NotImplementedError
63+
64+
65+
# Inheriting from the protocol here is optional. Users who do it will get the
66+
# operation definition itself type-checked in situ against the interface (*).
67+
# Call-sites using instances of the operation are always type-checked.
68+
#
69+
# (*) However, in VSCode/Pyright this is done only when type-checking is set to
70+
# 'strict'.
71+
class HelloOperation: # (nexusrpc.handler.Operation[HelloInput, HelloOutput]):
72+
def __init__(self, service: "MyNexusService"):
73+
self.service = service
74+
75+
async def start(
76+
self, input: HelloInput, options: nexusrpc.handler.StartOperationOptions
77+
) -> temporalio.nexus.handler.AsyncWorkflowOperationResult[HelloOutput]:
78+
self.service.db_client.execute("<some query>")
79+
workflow_id = "default-workflow-id"
80+
return await temporalio.nexus.handler.start_workflow(
81+
HelloWorkflow.run,
82+
input,
83+
id=workflow_id,
84+
options=options,
85+
)
86+
87+
async def cancel(
88+
self, token: str, options: nexusrpc.handler.CancelOperationOptions
89+
) -> None:
90+
return await temporalio.nexus.handler.cancel_workflow(token, options)
91+
92+
async def fetch_info(
93+
self, token: str, options: nexusrpc.handler.FetchOperationInfoOptions
94+
) -> nexusrpc.handler.OperationInfo:
95+
return await temporalio.nexus.handler.fetch_workflow_info(token, options)
96+
97+
async def fetch_result(
98+
self, token: str, options: nexusrpc.handler.FetchOperationResultOptions
99+
) -> HelloOutput:
100+
return await temporalio.nexus.handler.fetch_workflow_result(token, options)
101+
102+
103+
class EchoOperation3(nexusrpc.handler.AbstractOperation[EchoInput, EchoOutput]):
104+
async def start(
105+
self, input: EchoInput, options: nexusrpc.handler.StartOperationOptions
106+
) -> EchoOutput:
107+
return EchoOutput(message=f"Echo {input.message}! [from base class variant]")
108+
109+
110+
@nexusrpc.handler.service(interface=interface.MyNexusService)
111+
class MyNexusService:
112+
def __init__(self, db_client: MyDBClient):
113+
# An example of something that might be held by the service instance.
114+
self.db_client = db_client
115+
116+
# --------------------------------------------------------------------------
117+
# Operations defined by explicitly implementing the Operation interface.
118+
#
119+
120+
@nexusrpc.handler.operation
121+
def echo(self) -> nexusrpc.handler.Operation[EchoInput, EchoOutput]:
122+
return EchoOperation(self)
123+
124+
@nexusrpc.handler.operation
125+
def hello(self) -> nexusrpc.handler.Operation[HelloInput, HelloOutput]:
126+
return HelloOperation(self)
127+
128+
@nexusrpc.handler.operation
129+
def echo3(self) -> nexusrpc.handler.Operation[EchoInput, EchoOutput]:
130+
return EchoOperation3()
131+
132+
# --------------------------------------------------------------------------
133+
# Operations defined by providing the start method only, using the
134+
# "shorthand" decorators.
135+
#
136+
# Note that a start method defined this way has access to the service
137+
# instance, but not to the operation instance (users who need the latter
138+
# should implement the Operation interface directly).
139+
140+
@nexusrpc.handler.sync_operation
141+
async def echo2(
142+
self, input: EchoInput, _: nexusrpc.handler.StartOperationOptions
143+
) -> EchoOutput:
144+
return EchoOutput(message=f"Echo {input.message} [via shorthand]!")
145+
146+
# --------------------------------------------------------------------------
147+
# Operations defined by providing the start method only, using the
148+
# "shorthand" decorators.
149+
#
150+
# Note that a start method defined this way has access to the service
151+
# instance, but not to the operation instance (users who need the latter
152+
# should implement the Operation interface directly).
153+
154+
@temporalio.nexus.handler.workflow_operation
155+
async def hello2(
156+
self, input: HelloInput, options: nexusrpc.handler.StartOperationOptions
157+
) -> temporalio.nexus.handler.AsyncWorkflowOperationResult[HelloOutput]:
158+
self.db_client.execute("<some query>")
159+
workflow_id = "default-workflow-id"
160+
input.name += " [via shorthand]"
161+
return await temporalio.nexus.handler.start_workflow(
162+
HelloWorkflow.run,
163+
input,
164+
id=workflow_id,
165+
options=options,
166+
)
167+
168+
169+
if __name__ == "__main__":
170+
# Check run-time type annotations resulting from the decorators.
171+
service = MyNexusService(MyDBClient())
172+
print("echo:", temporalio.common._type_hints_from_func(service.echo2().start))
173+
print(
174+
"hello:", temporalio.common._type_hints_from_func(service.hello2().fetch_result)
175+
)

0 commit comments

Comments
 (0)