Skip to content

Commit 38ce5c8

Browse files
committed
Nexus samples
1 parent 5d64065 commit 38ce5c8

File tree

15 files changed

+2926
-2000
lines changed

15 files changed

+2926
-2000
lines changed

hello_nexus/basic/README.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
Usually you will want to create a service definition to formalize the service contract.
2+
However it is possible to define a Nexus service and operation handlers without creating a
3+
service definition. This sample demonstrates how to do that. This may be appropriate if
4+
you want to call a Nexus operation that is being executed by a Worker in the same
5+
namespace as the caller: in other words, if the Nexus operation is playing a role similar
6+
to an Activity.
7+
8+
### Instructions
9+
10+
Start a Temporal server.
11+
12+
Run the following:
13+
14+
```
15+
temporal operator namespace create --namespace my-target-namespace
16+
temporal operator namespace create --namespace my-caller-namespace
17+
18+
temporal operator nexus endpoint create \
19+
--name my-nexus-endpoint-name-python \
20+
--target-namespace my-target-namespace \
21+
--target-task-queue my-target-task-queue-python \
22+
--description-file ./hello_nexus/basic/service/description.md
23+
```
24+
25+
In one terminal, run the Temporal worker in the handler namespace:
26+
```
27+
uv run hello_nexus/basic/handler/worker.py
28+
```
29+
30+
In another terminal, run the Temporal worker in the caller namespace and start the caller
31+
workflow:
32+
```
33+
uv run hello_nexus/basic/caller/app.py
34+
```

hello_nexus/basic/caller/app.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import asyncio
2+
import uuid
3+
4+
from temporalio.client import Client
5+
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
6+
7+
from hello_nexus.basic.caller.workflows import CallerWorkflow
8+
9+
10+
async def main() -> None:
11+
client = await Client.connect(
12+
"localhost:7233",
13+
namespace="my-caller-namespace",
14+
)
15+
task_queue = "my-caller-task-queue"
16+
17+
async with Worker(
18+
client,
19+
task_queue=task_queue,
20+
workflows=[CallerWorkflow],
21+
# TODO(dan): isinstance(op, nexusrpc.contract.Operation) is failing under the
22+
# sandbox in temporalio/worker/_interceptor.py
23+
workflow_runner=UnsandboxedWorkflowRunner(),
24+
):
25+
result = await client.execute_workflow(
26+
CallerWorkflow.run,
27+
arg="world",
28+
id=str(uuid.uuid4()),
29+
task_queue=task_queue,
30+
)
31+
print("🟢 workflow result:", result)
32+
33+
34+
if __name__ == "__main__":
35+
loop = asyncio.new_event_loop()
36+
try:
37+
loop.run_until_complete(main())
38+
except KeyboardInterrupt:
39+
loop.run_until_complete(loop.shutdown_asyncgens())
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from temporalio import workflow
2+
from temporalio.workflow import NexusServiceClient
3+
4+
from hello_nexus.basic.service import (
5+
MyInput,
6+
MyNexusService,
7+
)
8+
9+
10+
@workflow.defn
11+
class CallerWorkflow:
12+
def __init__(self):
13+
self.nexus_service_client = NexusServiceClient(
14+
MyNexusService,
15+
endpoint="my-nexus-endpoint-name-python",
16+
)
17+
18+
@workflow.run
19+
async def run(self, name: str) -> None:
20+
await self.nexus_service_client.execute_operation(
21+
MyNexusService.my_sync_operation,
22+
MyInput(name),
23+
)
24+
await self.nexus_service_client.start_operation(
25+
MyNexusService.my_workflow_run_operation,
26+
MyInput(name),
27+
)
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from __future__ import annotations
2+
3+
4+
class MyDBClient:
5+
"""
6+
This class represents a resource that your Nexus operation handlers may need when they
7+
are handling Nexus requests, but which is only available when the Nexus worker is
8+
started. Notice that:
9+
10+
(a) The user's service handler class __init__ constructor takes a MyDBClient instance
11+
(see hello_nexus.handler.MyNexusService)
12+
13+
(b) The user is responsible for instantiating the service handler class when they
14+
start the worker (see hello_nexus.handler.worker), so they can pass any
15+
necessary resources (such as this database client) to the service handler.
16+
"""
17+
18+
@classmethod
19+
def connect(cls) -> MyDBClient:
20+
return cls()
21+
22+
def execute(self, query: str) -> str:
23+
return "query-result"
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
"""
2+
This file demonstrates how to define operation handlers by implementing the `start`
3+
method only, using the "shorthand" decorators sync_operation_handler and
4+
workflow_run_operation_handler.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
import uuid
10+
11+
import temporalio.common
12+
import temporalio.nexus.handler
13+
from nexusrpc.handler import (
14+
StartOperationContext,
15+
service_handler,
16+
sync_operation_handler,
17+
)
18+
from temporalio.client import WorkflowHandle
19+
20+
from hello_nexus.basic.handler.db_client import MyDBClient
21+
from hello_nexus.basic.handler.workflows import WorkflowStartedByNexusOperation
22+
from hello_nexus.basic.service import (
23+
MyInput,
24+
MyNexusService,
25+
MyOutput,
26+
)
27+
28+
29+
@service_handler(service=MyNexusService)
30+
class MyNexusServiceHandler:
31+
# You can create an __init__ method accepting what is needed by your operation
32+
# handlers to handle requests. You typically instantiate your service handler class
33+
# when starting your worker. See hello_nexus/basic/handler/worker.py.
34+
def __init__(self, connected_db_client: MyDBClient):
35+
# `connected_db_client` is intended as an example of something that might be
36+
# required by your operation handlers when handling requests, but is only
37+
# available at worker-start time.
38+
self.connected_db_client = connected_db_client
39+
40+
# This is a nexus operation that is backed by a Temporal workflow. The start method
41+
# starts a workflow, and returns a nexus operation token that the handler can use to
42+
# obtain a workflow handle (for example if a cancel request is subsequently sent by
43+
# the caller). The Temporal server takes care of delivering the workflow result to the
44+
# calling workflow. The task queue defaults to the task queue being used by the Nexus
45+
# worker.
46+
@temporalio.nexus.handler.workflow_run_operation_handler
47+
async def my_workflow_run_operation(
48+
self, ctx: StartOperationContext, input: MyInput
49+
) -> WorkflowHandle[WorkflowStartedByNexusOperation, MyOutput]:
50+
# You could use self.connected_db_client here.
51+
return await temporalio.nexus.handler.start_workflow(
52+
ctx,
53+
WorkflowStartedByNexusOperation.run,
54+
input,
55+
id=str(uuid.uuid4()),
56+
)
57+
58+
# This is a sync operation. The `start` method actually returns the result.
59+
# Implementations are free to make arbitrary network calls, or perform CPU-bound
60+
# computations such as this one. Total execution duration must not exceed 10s.
61+
@sync_operation_handler
62+
async def my_sync_operation(
63+
self, ctx: StartOperationContext, input: MyInput
64+
) -> MyOutput:
65+
# You could use self.connected_db_client here.
66+
return MyOutput(message=f"Hello {input.name}!")
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
"""
2+
Notes:
3+
4+
Sync operations:
5+
---------------
6+
Implementations are free to make arbitrary network calls, or perform CPU-bound
7+
computations such as this one. Total execution duration must not exceed 10s. To
8+
perform Temporal client calls such as signaling/querying/listing workflows, use
9+
self.client.
10+
11+
12+
Workflow operations:
13+
---------------------
14+
The task queue defaults to the task queue being used by the Nexus worker.
15+
"""
16+
17+
from __future__ import annotations
18+
19+
import uuid
20+
21+
import temporalio.common
22+
import temporalio.nexus.handler
23+
from nexusrpc.handler import (
24+
CancelOperationContext,
25+
FetchOperationInfoContext,
26+
FetchOperationResultContext,
27+
OperationHandler,
28+
OperationInfo,
29+
StartOperationContext,
30+
StartOperationResultAsync,
31+
StartOperationResultSync,
32+
operation_handler,
33+
service_handler,
34+
)
35+
36+
from hello_nexus.basic.handler.db_client import MyDBClient
37+
from hello_nexus.basic.handler.service_handler import (
38+
MyInput,
39+
MyNexusService,
40+
MyOutput,
41+
)
42+
from hello_nexus.basic.handler.workflows import WorkflowStartedByNexusOperation
43+
44+
45+
@service_handler(service=MyNexusService)
46+
class MyNexusServiceHandlerUsingOperationHandlerClasses:
47+
# You can create an __init__ method accepting what is needed by your operation
48+
# handlers to handle requests. You typically instantiate your service handler class
49+
# when starting your worker. See hello_nexus/basic/handler/worker.py.
50+
def __init__(self, connected_db_client: MyDBClient):
51+
# `connected_db_client` is intended as an example of something that might be
52+
# required by your operation handlers when handling requests, but is only
53+
# available at worker-start time.
54+
self.connected_db_client = connected_db_client
55+
56+
@operation_handler
57+
def my_sync_operation(self) -> OperationHandler[MyInput, MyOutput]:
58+
# Pass any required arguments to the OperationHandler __init__ method here.
59+
return MySyncOperation()
60+
61+
@operation_handler
62+
def my_workflow_run_operation(
63+
self,
64+
) -> OperationHandler[MyInput, MyOutput]:
65+
# Pass any required arguments to the OperationHandler __init__ method here.
66+
return MyWorkflowRunOperation()
67+
68+
69+
class MySyncOperation(OperationHandler[MyInput, MyOutput]):
70+
# You can add an __init__ method taking any required arguments, since you are in
71+
# control of instantiating the OperationHandler inside the operation handler method
72+
# above decorated with @operation_handler.
73+
74+
async def start(
75+
self, ctx: StartOperationContext, input: MyInput
76+
) -> StartOperationResultSync[MyOutput]:
77+
output = MyOutput(message=f"Hello {input.name}!")
78+
return StartOperationResultSync(output)
79+
80+
async def fetch_info(
81+
self,
82+
ctx: FetchOperationInfoContext,
83+
token: str,
84+
) -> OperationInfo:
85+
raise NotImplementedError(
86+
"fetch_info is not supported when a Nexus operation is called by a Temporal workflow"
87+
)
88+
89+
async def fetch_result(
90+
self,
91+
ctx: FetchOperationResultContext,
92+
token: str,
93+
) -> MyOutput:
94+
raise NotImplementedError(
95+
"fetch_result is not supported when a Nexus operation is called by a Temporal workflow, "
96+
"but this sample does not demonstrate result fetching"
97+
)
98+
99+
async def cancel(
100+
self,
101+
ctx: CancelOperationContext,
102+
token: str,
103+
) -> None:
104+
raise NotImplementedError(
105+
"cancel is supported when a Nexus operation is called by a Temporal workflow, "
106+
"but this sample does not demonstrate cancellation"
107+
)
108+
109+
110+
class MyWorkflowRunOperation(OperationHandler[MyInput, MyOutput]):
111+
async def start(
112+
self, ctx: StartOperationContext, input: MyInput
113+
) -> StartOperationResultAsync:
114+
workflow_id = str(uuid.uuid4())
115+
wf_handle = await temporalio.nexus.handler.start_workflow(
116+
ctx,
117+
WorkflowStartedByNexusOperation.run,
118+
input,
119+
id=workflow_id,
120+
)
121+
# TODO(dan)
122+
token = temporalio.nexus.token.WorkflowOperationToken.from_workflow_handle(
123+
wf_handle
124+
).encode()
125+
return StartOperationResultAsync(token)
126+
127+
async def cancel(self, ctx: CancelOperationContext, token: str) -> None:
128+
return await temporalio.nexus.handler.cancel_workflow(ctx, token)
129+
130+
async def fetch_info(
131+
self, ctx: FetchOperationInfoContext, token: str
132+
) -> OperationInfo:
133+
raise NotImplementedError(
134+
"fetch_info is not supported when a Nexus operation is called by a Temporal workflow"
135+
)
136+
137+
async def fetch_result(
138+
self, ctx: FetchOperationResultContext, token: str
139+
) -> MyOutput:
140+
raise NotImplementedError(
141+
"fetch_result is not supported when a Nexus operation is called by a Temporal workflow, "
142+
"but this sample does not demonstrate result fetching"
143+
)

0 commit comments

Comments
 (0)