Skip to content

Commit fdf011e

Browse files
committed
Signal and update design patterns
1 parent 5c9927f commit fdf011e

File tree

4 files changed

+347
-0
lines changed

4 files changed

+347
-0
lines changed
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
import asyncio
2+
import logging
3+
from datetime import timedelta
4+
5+
from temporalio import activity, common, workflow
6+
from temporalio.client import Client, WorkflowHandle
7+
from temporalio.worker import Worker
8+
9+
10+
class WorkflowBase:
11+
def __init__(self) -> None:
12+
self.letters = []
13+
14+
async def get_letters(self, text: str):
15+
for i in range(len(text)):
16+
letter = await workflow.execute_activity(
17+
get_letter,
18+
args=[text, i],
19+
start_to_close_timeout=timedelta(seconds=10),
20+
)
21+
self.letters.append(letter)
22+
23+
24+
@workflow.defn
25+
class AccumulateLettersIncorrect(WorkflowBase):
26+
"""
27+
This workflow implementation is incorrect: the handler execution interleaves with the main
28+
workflow coroutine.
29+
"""
30+
31+
def __init__(self) -> None:
32+
super().__init__()
33+
self.handler_started = False
34+
35+
@workflow.run
36+
async def run(self) -> str:
37+
await workflow.wait_condition(lambda: self.handler_started)
38+
await self.get_letters(
39+
"world!"
40+
) # Bug: handler and main wf are now interleaving
41+
42+
await workflow.wait_condition(lambda: len(self.letters) == len("Hello world!"))
43+
return "".join(self.letters)
44+
45+
@workflow.update
46+
async def put_letters(self, text: str):
47+
self.handler_started = True
48+
await self.get_letters(text)
49+
50+
51+
@workflow.defn
52+
class AccumulateLettersCorrect1(WorkflowBase):
53+
def __init__(self) -> None:
54+
super().__init__()
55+
self.handler_text = asyncio.Future[str]()
56+
57+
@workflow.run
58+
async def run(self) -> str:
59+
handler_text = await self.handler_text
60+
await self.get_letters(handler_text)
61+
await self.get_letters("world!")
62+
await workflow.wait_condition(lambda: len(self.letters) == len("Hello world!"))
63+
return "".join(self.letters)
64+
65+
# Note: sync handler
66+
@workflow.update
67+
def put_letters(self, text: str):
68+
self.handler_text.set_result(text)
69+
70+
71+
@workflow.defn
72+
class AccumulateLettersCorrect2(WorkflowBase):
73+
def __init__(self) -> None:
74+
super().__init__()
75+
self.handler_complete = False
76+
77+
@workflow.run
78+
async def run(self) -> str:
79+
await workflow.wait_condition(lambda: self.handler_complete)
80+
await self.get_letters("world!")
81+
await workflow.wait_condition(lambda: len(self.letters) == len("Hello world!"))
82+
return "".join(self.letters)
83+
84+
@workflow.update
85+
async def put_letters(self, text: str):
86+
await self.get_letters(text)
87+
self.handler_complete = True
88+
89+
90+
@activity.defn
91+
async def get_letter(text: str, i: int) -> str:
92+
return text[i]
93+
94+
95+
async def app(wf: WorkflowHandle):
96+
await wf.execute_update(AccumulateLettersCorrect1.put_letters, args=["Hello "])
97+
print(await wf.result())
98+
99+
100+
async def main():
101+
client = await Client.connect("localhost:7233")
102+
103+
async with Worker(
104+
client,
105+
task_queue="tq",
106+
workflows=[
107+
AccumulateLettersIncorrect,
108+
AccumulateLettersCorrect1,
109+
AccumulateLettersCorrect2,
110+
],
111+
activities=[get_letter],
112+
):
113+
for wf in [
114+
AccumulateLettersIncorrect,
115+
AccumulateLettersCorrect1,
116+
AccumulateLettersCorrect2,
117+
]:
118+
handle = await client.start_workflow(
119+
wf.run,
120+
id="wid",
121+
task_queue="tq",
122+
id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
123+
)
124+
await app(handle)
125+
126+
127+
if __name__ == "__main__":
128+
logging.basicConfig(level=logging.INFO)
129+
asyncio.run(main())
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import asyncio
2+
import logging
3+
import random
4+
5+
from temporalio import common, workflow
6+
from temporalio.client import Client, WorkflowHandle
7+
from temporalio.worker import Worker
8+
9+
Payload = str
10+
11+
12+
class Queue:
13+
def __init__(self) -> None:
14+
self._head = 0
15+
self._futures: dict[int, asyncio.Future[Payload]] = {}
16+
self.lock = asyncio.Lock()
17+
18+
def add(self, item: Payload, position: int):
19+
fut = self._futures.setdefault(position, asyncio.Future())
20+
if not fut.done():
21+
fut.set_result(item)
22+
else:
23+
workflow.logger.warn(f"duplicate message for position {position}")
24+
25+
async def next(self) -> Payload:
26+
async with self.lock:
27+
payload = await self._futures.setdefault(self._head, asyncio.Future())
28+
self._head += 1
29+
return payload
30+
31+
32+
@workflow.defn
33+
class MessageProcessor:
34+
def __init__(self) -> None:
35+
self.queue = Queue()
36+
37+
@workflow.run
38+
async def run(self):
39+
while True:
40+
payload = await self.queue.next()
41+
workflow.logger.info(payload)
42+
if workflow.info().is_continue_as_new_suggested():
43+
workflow.continue_as_new()
44+
45+
@workflow.update
46+
def process_message(self, sequence_number: int, payload: Payload): # sync handler
47+
self.queue.add(payload, sequence_number)
48+
49+
50+
async def app(wf: WorkflowHandle):
51+
sequence_numbers = list(range(10))
52+
random.shuffle(sequence_numbers)
53+
for i in sequence_numbers:
54+
await wf.execute_update(
55+
MessageProcessor.process_message, args=[i, f"payload {i}"]
56+
)
57+
58+
59+
async def main():
60+
client = await Client.connect("localhost:7233")
61+
62+
async with Worker(
63+
client,
64+
task_queue="tq",
65+
workflows=[MessageProcessor],
66+
):
67+
wf = await client.start_workflow(
68+
MessageProcessor.run,
69+
id="wid",
70+
task_queue="tq",
71+
id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
72+
)
73+
await app(wf)
74+
75+
76+
if __name__ == "__main__":
77+
logging.basicConfig(level=logging.INFO)
78+
asyncio.run(main())
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import asyncio
2+
import logging
3+
4+
from temporalio import common, workflow
5+
from temporalio.client import Client, WorkflowHandle
6+
from temporalio.worker import Worker
7+
8+
9+
@workflow.defn
10+
class WashAndDryCycle:
11+
12+
def __init__(self) -> None:
13+
self.wash_complete = False
14+
self.dry_complete = False
15+
16+
@workflow.run
17+
async def run(self):
18+
await workflow.wait_condition(lambda: self.dry_complete)
19+
20+
@workflow.update
21+
async def wash(self):
22+
self.wash_complete = True
23+
workflow.logger.info("washing")
24+
25+
@workflow.update
26+
async def dry(self):
27+
await workflow.wait_condition(lambda: self.wash_complete)
28+
self.dry_complete = True
29+
workflow.logger.info("drying")
30+
31+
32+
async def app(wf: WorkflowHandle):
33+
await asyncio.gather(
34+
wf.execute_update(WashAndDryCycle.dry), wf.execute_update(WashAndDryCycle.wash)
35+
)
36+
37+
38+
async def main():
39+
client = await Client.connect("localhost:7233")
40+
41+
async with Worker(
42+
client,
43+
task_queue="tq",
44+
workflows=[WashAndDryCycle],
45+
):
46+
handle = await client.start_workflow(
47+
WashAndDryCycle.run,
48+
id="wid",
49+
task_queue="tq",
50+
id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
51+
)
52+
await app(handle)
53+
54+
55+
if __name__ == "__main__":
56+
logging.basicConfig(level=logging.INFO)
57+
asyncio.run(main())
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import asyncio
2+
import logging
3+
from datetime import timedelta
4+
5+
from temporalio import activity, common, workflow
6+
from temporalio.client import Client, WorkflowHandle
7+
from temporalio.worker import Worker
8+
9+
10+
@workflow.defn
11+
class WashAndDryCycle:
12+
13+
def __init__(self) -> None:
14+
self.has_detergent = False
15+
self.wash_complete = False
16+
self.non_dryables_removed = False
17+
self.dry_complete = False
18+
19+
@workflow.run
20+
async def run(self):
21+
await workflow.execute_activity(
22+
add_detergent, start_to_close_timeout=timedelta(seconds=10)
23+
)
24+
self.has_detergent = True
25+
await workflow.wait_condition(lambda: self.wash_complete)
26+
await workflow.execute_activity(
27+
remove_non_dryables, start_to_close_timeout=timedelta(seconds=10)
28+
)
29+
self.non_dryables_removed = True
30+
await workflow.wait_condition(lambda: self.dry_complete)
31+
32+
@workflow.update
33+
async def wash(self):
34+
await workflow.wait_condition(lambda: self.has_detergent)
35+
self.wash_complete = True
36+
workflow.logger.info("washing")
37+
38+
@workflow.update
39+
async def dry(self):
40+
await workflow.wait_condition(
41+
lambda: self.wash_complete and self.non_dryables_removed
42+
)
43+
self.dry_complete = True
44+
workflow.logger.info("drying")
45+
46+
47+
@activity.defn
48+
async def add_detergent():
49+
print("adding detergent")
50+
51+
52+
@activity.defn
53+
async def remove_non_dryables():
54+
print("removing non-dryables")
55+
56+
57+
async def app(wf: WorkflowHandle):
58+
await asyncio.gather(
59+
wf.execute_update(WashAndDryCycle.dry), wf.execute_update(WashAndDryCycle.wash)
60+
)
61+
62+
63+
async def main():
64+
client = await Client.connect("localhost:7233")
65+
66+
async with Worker(
67+
client,
68+
task_queue="tq",
69+
workflows=[WashAndDryCycle],
70+
activities=[add_detergent, remove_non_dryables],
71+
):
72+
handle = await client.start_workflow(
73+
WashAndDryCycle.run,
74+
id="wid",
75+
task_queue="tq",
76+
id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
77+
)
78+
await app(handle)
79+
80+
81+
if __name__ == "__main__":
82+
logging.basicConfig(level=logging.INFO)
83+
asyncio.run(main())

0 commit comments

Comments
 (0)