Commit 271c7c9

Add sample DT app + screenshots
1 parent d5754dd commit 271c7c9

6 files changed

+358
-0
lines changed

Lines changed: 186 additions & 0 deletions
@@ -0,0 +1,186 @@
# Distributed Tracing Example

This example demonstrates how to set up **distributed tracing** with the
Durable Task Python SDK using [OpenTelemetry](https://opentelemetry.io/)
and [Jaeger](https://www.jaegertracing.io/) as the trace backend.

The sample orchestration showcases three key Durable Task features that
all produce correlated trace spans:

1. **Timers**: a short delay before starting work.
1. **Sub-orchestration**: delegates city-level weather collection to a
   child orchestration.
1. **Activities**: individual activity calls to fetch weather data and
   produce a summary.

## Prerequisites

- [Docker](https://www.docker.com/) (for the emulator and Jaeger)
- Python 3.10+

## Quick Start

### 1. Start the DTS Emulator

```bash
docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:latest
```

### 2. Start Jaeger

Jaeger's all-in-one image accepts OTLP over gRPC on port **4317** and
serves the UI on port **16686**:

```bash
docker run --name jaeger -d \
  -p 4317:4317 \
  -p 16686:16686 \
  jaegertracing/all-in-one:latest
```

PowerShell:

```powershell
docker run --name jaeger -d `
  -p 4317:4317 `
  -p 16686:16686 `
  jaegertracing/all-in-one:latest
```
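
Before moving on, it can help to confirm that both containers are accepting connections. The following is a minimal sketch and not part of the sample: the helper name `port_open` is hypothetical, and it assumes the default host ports from the commands above (8080 for the emulator, 4317 and 16686 for Jaeger).

```python
import socket


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name-resolution failures.
        return False


if __name__ == "__main__":
    # Ports assumed from the docker commands in steps 1 and 2.
    checks = [("DTS emulator", 8080), ("Jaeger OTLP", 4317), ("Jaeger UI", 16686)]
    for name, port in checks:
        status = "reachable" if port_open("localhost", port) else "NOT reachable"
        print(f"{name} (localhost:{port}): {status}")
```

A successful connection only shows that something is listening on the port; it does not prove the service behind it is healthy.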

### 3. Install Dependencies

Create and activate a virtual environment, then install the required
packages:

```bash
python -m venv .venv
```

Bash:

```bash
source .venv/bin/activate
```

PowerShell:

```powershell
.\.venv\Scripts\Activate.ps1
```

Install requirements:

```bash
pip install -r requirements.txt
```

If you are running from a local clone of the repository, install the
local packages in editable mode instead (run from the repo root):

```bash
pip install -e ".[opentelemetry]" -e ./durabletask-azuremanaged
```

### 4. Run the Example

```bash
python app.py
```

Once the orchestration completes, open the Jaeger UI at
<http://localhost:16686>, select the **durabletask-tracing-example**
service, and click **Find Traces** to explore the spans.

## What You Will See in Jaeger

A single trace for the orchestration will contain spans for:

- **`orchestration:weather_report_orchestrator`**: the top-level
  orchestration span.
- **`timer`**: the 2-second timer delay.
- **`orchestration:collect_weather`**: the sub-orchestration span.
- **`activity:get_weather`**: one span per city
  (Tokyo, Seattle, London).
- **`activity:summarize`**: the final summarization activity.

All spans share the same trace ID, so you can follow the full execution
flow from the parent orchestration through the sub-orchestration and
into each activity.

## Configuration

The example reads the following environment variables (all optional):

| Variable | Default | Description |
|---|---|---|
| `ENDPOINT` | `http://localhost:8080` | DTS emulator / scheduler endpoint |
| `TASKHUB` | `default` | Task hub name |
| `OTEL_EXPORTER_OTLP_ENDPOINT` | `http://localhost:4317` | OTLP gRPC endpoint (Jaeger) |
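
As a rough illustration of how these variables and defaults fit together, here is a small sketch that gathers them in one place. The `Settings` class and `load_settings` function are hypothetical names introduced for this illustration; the variable names, the defaults, and the `https://` check mirror what `app.py` does.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    """Hypothetical container for the three settings the example reads."""

    endpoint: str
    taskhub: str
    otlp_endpoint: str

    @property
    def secure_channel(self) -> bool:
        # app.py treats an https:// endpoint as a secured (Azure) endpoint
        # and anything else as the local emulator.
        return self.endpoint.startswith("https://")


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read the settings from env, falling back to the emulator defaults."""
    return Settings(
        endpoint=env.get("ENDPOINT", "http://localhost:8080"),
        taskhub=env.get("TASKHUB", "default"),
        otlp_endpoint=env.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
    )


if __name__ == "__main__":
    print(load_settings())
```

Passing a plain dictionary instead of `os.environ` makes the defaults easy to check without touching the real environment.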

## Important Usage Guidelines for Distributed Tracing

### Install the OpenTelemetry extras

The SDK ships OpenTelemetry as an **optional** dependency. Install it
with the `opentelemetry` extra:

```bash
pip install "durabletask[opentelemetry]"
```

Without these packages the SDK still works, but no trace spans are
emitted.
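
Because missing packages fail silently, a quick check at startup can save some head-scratching. The sketch below is an independent check using only the standard library; it does not reflect how the SDK itself detects the packages, and `module_available` is a hypothetical helper name.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if the named module can be located without fully importing it."""
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:
        # Raised for dotted names when a parent package is missing.
        return False


if __name__ == "__main__":
    # Module paths taken from the imports used in this example.
    for mod in ("opentelemetry.sdk.trace",
                "opentelemetry.exporter.otlp.proto.grpc.trace_exporter"):
        if not module_available(mod):
            print(f"warning: {mod} not found; traces will not be exported")
```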

### Configure the `TracerProvider` before starting the worker

OpenTelemetry only exports spans once a `TracerProvider` with at least
one `SpanProcessor` and exporter has been registered, so configure it
**before** any spans are created. In practice this means setting it up
at the top of your entry-point module, before constructing the worker
or client:

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

resource = Resource.create({"service.name": "my-app"})
provider = TracerProvider(resource=resource)
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True))
)
trace.set_tracer_provider(provider)
```

### Flush spans before exiting

The `BatchSpanProcessor` buffers spans and exports them in the
background. If the process exits before the buffer is flushed, some
spans may be lost. Call `provider.force_flush()` (and optionally add a
short sleep) before your program terminates:

```python
provider.force_flush()
```
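
A plain call at the end of the script is skipped if an exception escapes first, for example when waiting for the orchestration times out. One way to guard against that is a small context manager, sketched below. The `flushing` helper is a hypothetical name, and it assumes only that `provider` exposes `force_flush(timeout_millis)` returning a boolean, as the SDK's `TracerProvider` does.

```python
from contextlib import contextmanager


@contextmanager
def flushing(provider, timeout_millis: int = 5000):
    """Run the enclosed block, then flush buffered spans even if it raised."""
    try:
        yield provider
    finally:
        # force_flush returns False if the export did not finish in time.
        if not provider.force_flush(timeout_millis):
            print("warning: some spans may not have been exported")
```

With the `provider` configured earlier, the worker and client code would then sit inside `with flushing(provider):`.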

### Orchestrator code must remain deterministic

Distributed tracing does **not** change the determinism requirement for
orchestrator functions. Do not create your own OpenTelemetry spans
inside orchestrator code: the SDK creates the orchestration spans
automatically, and because orchestrator code is replayed, hand-written
spans would be emitted again on each replay. Activity functions and
client code are free to create additional spans as needed.
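
To make the activity case concrete, here is a hedged sketch of an activity that wraps its own work in an extra span. The function name `get_weather_traced`, the span name, and the attribute key are all illustrative and not part of the sample. The import is guarded so the activity still runs when the OpenTelemetry extras are absent. Whether the new span nests under the SDK's `activity:` span depends on the SDK making that span current while the activity runs, which is assumed here rather than verified.

```python
from contextlib import nullcontext

try:
    from opentelemetry import trace
    _tracer = trace.get_tracer(__name__)
except ImportError:
    # OpenTelemetry extras not installed: fall back to no tracing.
    _tracer = None


def get_weather_traced(ctx, city: str) -> str:
    """Hypothetical variant of get_weather that records its own span."""
    span_cm = _tracer.start_as_current_span("lookup_weather") if _tracer else nullcontext()
    with span_cm as span:
        if span is not None:
            # Attribute key is illustrative, not a standard convention.
            span.set_attribute("weather.city", city)
        weather_data = {"Tokyo": "Sunny, 22°C", "Seattle": "Rainy, 12°C"}
        return weather_data.get(city, "Unknown")
```

The same pattern must not be copied into `weather_report_orchestrator` or `collect_weather`, for the replay reason given above.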

### Use `BatchSpanProcessor` in production

`SimpleSpanProcessor` exports each span synchronously as it ends, which
adds export latency to every traced operation. Use `BatchSpanProcessor`
for production workloads to avoid that overhead.

### Choose the right exporter for your backend

This example uses the OTLP/gRPC exporter, which is compatible with
Jaeger 1.35+, the OpenTelemetry Collector, and many other backends.
Swap the exporter if your tracing backend uses a different protocol;
for Azure Monitor, for instance, use the Azure Monitor OpenTelemetry
exporter.
Lines changed: 168 additions & 0 deletions
@@ -0,0 +1,168 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Distributed tracing example using OpenTelemetry and Jaeger.

This example demonstrates how to configure OpenTelemetry distributed tracing
with the Durable Task Python SDK. The orchestration showcases timers,
activities, and a sub-orchestration, all producing correlated trace spans
visible in the Jaeger UI.

Prerequisites:
    - DTS emulator running on localhost:8080
    - Jaeger running on localhost:4317 (OTLP gRPC) / localhost:16686 (UI)
    - pip install -r requirements.txt
"""

import os
import time
from datetime import timedelta

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from azure.identity import DefaultAzureCredential

from durabletask import client, task
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker


# ---------------------------------------------------------------------------
# OpenTelemetry configuration: MUST be done before any spans are created
# ---------------------------------------------------------------------------

OTEL_ENDPOINT = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")

resource = Resource.create({"service.name": "durabletask-tracing-example"})
provider = TracerProvider(resource=resource)
provider.add_span_processor(
    BatchSpanProcessor(
        OTLPSpanExporter(endpoint=OTEL_ENDPOINT, insecure=True)
    )
)
trace.set_tracer_provider(provider)


# ---------------------------------------------------------------------------
# Activity functions
# ---------------------------------------------------------------------------

def get_weather(ctx: task.ActivityContext, city: str) -> str:
    """Simulate fetching weather data for a city."""
    # In a real app this would call an external API
    weather_data = {
        "Tokyo": "Sunny, 22°C",
        "Seattle": "Rainy, 12°C",
        "London": "Cloudy, 15°C",
    }
    result = weather_data.get(city, "Unknown")
    print(f" [Activity] get_weather({city}) -> {result}")
    return result


def summarize(ctx: task.ActivityContext, reports: list) -> str:
    """Combine individual weather reports into a summary string."""
    summary = " | ".join(reports)
    print(f" [Activity] summarize -> {summary}")
    return summary


# ---------------------------------------------------------------------------
# Sub-orchestration
# ---------------------------------------------------------------------------

def collect_weather(ctx: task.OrchestrationContext, cities: list):
    """Sub-orchestration that collects weather for a list of cities."""
    results = []
    for city in cities:
        weather = yield ctx.call_activity(get_weather, input=city)
        results.append(f"{city}: {weather}")
    return results


# ---------------------------------------------------------------------------
# Main orchestration
# ---------------------------------------------------------------------------

def weather_report_orchestrator(ctx: task.OrchestrationContext, cities: list):
    """Top-level orchestration demonstrating timers, activities, and sub-orchestrations.

    Flow:
    1. Wait for a short timer (simulating a scheduled delay).
    2. Call a sub-orchestration to collect weather data for each city.
    3. Call an activity to summarize the results.
    """
    # Step 1: Timer. Wait briefly before starting work.
    yield ctx.create_timer(timedelta(seconds=2))
    if not ctx.is_replaying:
        print(" [Orchestrator] Timer fired, starting weather collection")

    # Step 2: Sub-orchestration. Delegate city-level work.
    reports = yield ctx.call_sub_orchestrator(collect_weather, input=cities)

    # Step 3: Activity. Summarize the collected reports.
    summary = yield ctx.call_activity(summarize, input=reports)

    return summary


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    # Use environment variables if provided, otherwise use default emulator values
    taskhub_name = os.getenv("TASKHUB", "default")
    endpoint = os.getenv("ENDPOINT", "http://localhost:8080")

    print(f"Using taskhub: {taskhub_name}")
    print(f"Using endpoint: {endpoint}")
    print(f"OTLP endpoint: {OTEL_ENDPOINT}")

    # Set credential to None for emulator, or DefaultAzureCredential for Azure
    secure_channel = endpoint.startswith("https://")
    credential = DefaultAzureCredential() if secure_channel else None

    with DurableTaskSchedulerWorker(
        host_address=endpoint,
        secure_channel=secure_channel,
        taskhub=taskhub_name,
        token_credential=credential,
    ) as w:
        # Register orchestrators and activities
        w.add_orchestrator(weather_report_orchestrator)
        w.add_orchestrator(collect_weather)
        w.add_activity(get_weather)
        w.add_activity(summarize)
        w.start()
        print("Worker started.")

        # Create client, schedule the orchestration, and wait for completion
        c = DurableTaskSchedulerClient(
            host_address=endpoint,
            secure_channel=secure_channel,
            taskhub=taskhub_name,
            token_credential=credential,
        )

        cities = ["Tokyo", "Seattle", "London"]
        instance_id = c.schedule_new_orchestration(
            weather_report_orchestrator, input=cities,
        )
        print(f"Orchestration started: {instance_id}")

        state = c.wait_for_orchestration_completion(instance_id, timeout=60)
        if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
            print(f"Orchestration completed! Result: {state.serialized_output}")
        elif state:
            print(f"Orchestration failed: {state.failure_details}")

    # Flush any remaining spans to the exporter
    provider.force_flush()
    time.sleep(1)

    print("Done. Open Jaeger at http://localhost:16686 to view traces.")
Three binary files (167 KB, 189 KB, 280 KB), presumably the screenshots named in the commit title; content not shown.
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
durabletask[opentelemetry]
durabletask-azuremanaged
azure-identity
opentelemetry-exporter-otlp-proto-grpc
