Skip to content

Commit e279966

Browse files
committed
context vars
1 parent 41877f0 commit e279966

File tree

4 files changed

+33
-4
lines changed

4 files changed

+33
-4
lines changed

custom_metric/activity.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
import threading
12
import time
23

34
from temporalio import activity
5+
from custom_metric.shared import user_id
46

57

68
@activity.defn
79
def print_and_sleep():
8-
print("In the activity.")
10+
print(f"In the activity. in thread {threading.current_thread().name}")
11+
print(f"User ID: {user_id.get()} in activity {activity.info().activity_id}")
912
time.sleep(1)

custom_metric/shared.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from contextvars import ContextVar
2+
from typing import Optional
3+
4+
user_id: ContextVar[Optional[str]] = ContextVar("user_id", default=None)

custom_metric/worker.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
from concurrent.futures import ThreadPoolExecutor
3+
import threading
34

45
from temporalio import activity
56
from temporalio.client import Client
@@ -13,12 +14,14 @@
1314

1415
from custom_metric.activity import print_and_sleep
1516
from custom_metric.workflow import StartTwoActivitiesWorkflow
17+
from custom_metric.shared import user_id
1618

1719

1820
class SimpleWorkerInterceptor(Interceptor):
1921
def intercept_activity(
2022
self, next: ActivityInboundInterceptor
2123
) -> ActivityInboundInterceptor:
24+
user_id.set(activity.info().activity_id) # Set a user ID for the activity context
2225
return CustomScheduleToStartInterceptor(next)
2326

2427

@@ -32,6 +35,9 @@ async def execute_activity(self, input: ExecuteActivityInput):
3235
# Could do the original schedule time instead of current attempt
3336
# schedule_to_start_second_option = activity.info().started_time - activity.info().scheduled_time
3437

38+
# print the thread name for debugging
39+
print(f"In the activity interceptor. in thread {threading.current_thread().name}")
40+
3541
meter = activity.metric_meter()
3642
histogram = meter.create_histogram_timedelta(
3743
"custom_activity_schedule_to_start_latency",
@@ -60,8 +66,8 @@ async def main():
6066
activities=[print_and_sleep],
6167
# only one activity executor with two concurrently scheduled activities
6268
# to force a nontrivial schedule to start times
63-
activity_executor=ThreadPoolExecutor(1),
64-
max_concurrent_activities=1,
69+
activity_executor=ThreadPoolExecutor(10),
70+
max_concurrent_activities=10,
6571
)
6672

6773
await worker.run()

custom_metric/workflow.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,21 @@ async def run(self):
2121
print_and_sleep,
2222
start_to_close_timeout=timedelta(seconds=5),
2323
)
24-
await asyncio.gather(activity1, activity2)
24+
activity3 = workflow.execute_activity(
25+
print_and_sleep,
26+
start_to_close_timeout=timedelta(seconds=5),
27+
)
28+
activity4 = workflow.execute_activity(
29+
print_and_sleep,
30+
start_to_close_timeout=timedelta(seconds=5),
31+
)
32+
activity5 = workflow.execute_activity(
33+
print_and_sleep,
34+
start_to_close_timeout=timedelta(seconds=5),
35+
)
36+
activity6 = workflow.execute_activity(
37+
print_and_sleep,
38+
start_to_close_timeout=timedelta(seconds=5),
39+
)
40+
await asyncio.gather(activity1, activity2, activity3, activity4, activity5, activity6)
2541
return None

0 commit comments

Comments
 (0)