The file System_Patching.md provides an overview of the functionality that will be implemented in this workshop.
This file contains the basic implementation as a set of activities to be executed in the workflow.
For the sake of simplicity, some activity executions are skipped; a comment in the code marks each one.
Open the file and review the implementation.
Every step defined in the file System_Patching.md is implemented as an activity, except step 2 (SendApprovalRequestActivity and wait for approval), which is missing and will be added as part of the first exercise.
- The folder `workflow` contains the implementation of the workflow(s) and the activities.
- `temporal_client.py` is responsible for sending the request to start the workflow and for interacting with it (e.g. sending signals).
- `temporal_worker.py` is responsible for running the workflow and activities; this is our application.
- `activities.py` contains the activity implementations.
- `types.py` contains the workflow and activity input/output types.
Every time you start working on a new exercise (`system_patch_workflow_vx.py`) you will need to:
- Stop the worker `temporal_worker.py` and register the new workflow implementation, which corresponds to the function name (e.g. `SystemPatchWorkflow_V2` -> `SystemPatchWorkflow_V3`).
- Change the workflow type in `temporal_client.py` to send the request for the new workflow type (same as above).

You won't have to do this if you are just modifying your own implementation.
Each exercise’s final implementation is in the next version of the file (v1, v2, and so on).
Make sure you can run the workflow `system_patch_workflow_v1.py`:
- Start the Temporal server
- Start the worker `temporal_worker.py`
- Start the client `temporal_client.py`
- Check the UI
During this exercise, you will implement step 2: "Execute SendApprovalRequestActivity. If not approved, terminate."
You can use the file system_patch_workflow_v1.py as a starting point:
- Add the invocation of `SendApprovalRequestActivity`.
- In order to interact with the workflow and send the approval response, you need to add a signal or update handler to the workflow. The main difference is that a signal is asynchronous, while an update is synchronous.
- https://docs.temporal.io/develop/python/message-passing#signals
- https://docs.temporal.io/develop/python/message-passing#updates
- Additionally, you can add a timeout to `workflow.wait_condition` to react if the approval response takes too long.
  - If the wait times out, it raises a `TimeoutError`, which you have to catch and handle.
  - To fail the workflow, you can re-raise the error as a `temporalio.exceptions.ApplicationError`.
  - If the timeout is reached, you can decide to either fail the workflow or complete it.
- Modify the client to send the signal or update to the workflow after the workflow is started.
Run the code: restart the worker and the client. The workflow should complete after several seconds; go to the UI and check the progress.
Spend some time inspecting the workflow history in the UI.
Note that the implementation of some activities is designed to fail on the first attempt (e.g. `check_service_health_activity`):
```python
@activity.defn
async def check_service_health_activity(self, activity_input: CheckServiceHealthActivityInput) -> bool:
    await self.simulate_activity_execution()
    # Simulate a health check failure on the first attempt
    if activity_input.hostname == "cluster1_host1" and activity.info().attempt == 1:
        raise Exception("Service health check failed: " + str(activity_input))
    return True
```
Open the workflow history in the UI, click on the ActivityTaskStarted and ActivityTaskCompleted events, and check the information in the fields `Attempt` and `LastFailure`.
Imagine you need to periodically check the steps that have been completed for each execution.
Workflow queries allow us to retrieve state from a Workflow Execution.
You can use the file system_patch_workflow_v2.py as a starting point:
- Create an internal variable to store the steps, e.g. `completed_steps = []`, and modify the workflow to append each completed step to this variable.
- Add a query handler to the workflow to return it.
- Modify the client to call the query handler every x seconds to get the completed steps.
Run the code: restart the worker and the client. The workflow should complete after several seconds; the client logs should show the progress, and you can also check it in the UI.
Queries should only return in-memory data and must not modify the workflow history (e.g. don't run an activity from a query method).
Temporal allows you to set timeouts on activities and to send heartbeats from them. This is useful to react quickly to worker failures and to resume the activity execution from the point where it was interrupted.
There are different types of activity timeouts. For this exercise, let's focus on start_to_close timeout and heartbeats.
Some activities can be long-running; let's use `perform_update_activity` for this exercise.
You can use the file system_patch_workflow_v3.py as a starting point.
- Review the `perform_update_activity` activity options, where the method `perform_update_activity` is called; note that `start_to_close_timeout` is set to 10 seconds and `maximum_attempts=3`.
- Modify the activity implementation to add an `await asyncio.sleep` of more than 10 seconds.
Run the code
- Restart the worker and the client
- Open the workflow execution in the UI and go to the pending activities tab.
- Note that the activity is retried after the timeout.
- You can modify the default retry policy and set `initial_interval`, `backoff_coefficient`, etc.
- After 3 retries the activity fails with `ActivityTaskTimedOut`.
- Add a log at the start of the activity `perform_update_activity`:

  ```python
  activity.logger.info("***Starting perform_update_activity***")
  ```
Run the code
- Restart the worker and the client
- Check the worker logs; note that the activity is retried after the timeout plus the retry delay (`initial_interval * backoff_coefficient^(attempt - 1)`).
- In the UI you can see that the activity execution latency is > 30 seconds (from schedule to close).
Let's see what happens if a worker crashes while executing an activity.
Temporal allows you to send heartbeats from activities; this is useful to detect worker failures quickly and to resume the activity execution from the last recorded progress.
- Modify `perform_update_activity`:
  - Remove the `await asyncio.sleep` added in 3.1.
  - Add a loop that iterates 9 times and sends a heartbeat every second.
  - Add the logic to continue the activity execution from the last heartbeat if the activity is retried.
- Modify the activity options to set the heartbeat timeout to 2 seconds: `heartbeat_timeout=timedelta(seconds=2)`.
Run the code
- Run the client to start the workflow
- Run the worker and restart it after the first heartbeat.
- the script sh_restart_worker.sh might be useful.
- The activity will be re-scheduled after 2 seconds (heartbeat timeout) and start running again once the worker is up.
- Check `LastFailure` in the UI.
Temporal allows running multiple activities (or child workflows, as we will see later) in parallel. This is useful to reduce the execution time of the workflow when there are multiple independent tasks to execute.
You can use the file system_patch_workflow_v4.py as a starting point:
- Modify the workflow to process target clusters (search for "#4. For each `targetCluster`") and hosts (methods `_process_pilot_hosts` and `_process_remaining_hosts`) in parallel; use `asyncio.gather` to run the branches in parallel.
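The pattern can be demonstrated with plain asyncio; inside a Temporal workflow the same `asyncio.gather` call applies, with the stand-in coroutine below replaced by your `_process_*` methods or activity invocations (all names here are illustrative):

```python
import asyncio


async def _process_cluster(name: str) -> str:
    # Stand-in for the per-cluster work (pilot hosts, remaining hosts, ...).
    await asyncio.sleep(0.1)
    return f"{name}: patched"


async def patch_all_clusters(clusters) -> list:
    # asyncio.gather runs all branches concurrently and returns their
    # results in input order. The same call is safe inside a Temporal
    # workflow, since the SDK drives a deterministic event loop.
    return await asyncio.gather(*(_process_cluster(c) for c in clusters))


print(asyncio.run(patch_all_clusters(["cluster1", "cluster2"])))
# → ['cluster1: patched', 'cluster2: patched']
```

With two clusters the total wall time is roughly one sleep, not two, which is the latency reduction you should see in the UI.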
Run the code
- Restart the worker and the client
- Check the UI, note that the workflow execution time is reduced compared to the previous execution.
Click the gear icon ⚙️ in the bottom-right corner of the UI to add or remove columns
Child workflows are a feature that allows you to spawn a workflow from another workflow. There are several use cases for this feature, for example representing each resource (such as a cluster or a host) as a workflow.
You can use the file system_patch_workflow_v5.py as a starting point.
Modify the workflow to spawn a child workflow for each target cluster:
- Refactor the method `SystemPatchWorkflow_V5._process_target_cluster` to spawn a child workflow for each target cluster.
Run the code
- Restart the worker and the client
- Check the UI; the Relationships tab shows the workflow execution tree.
- Additionally, in the new child workflow code, you can refactor the method `_process_host` and move the implementation to a new child workflow (as we did with clusters).
The new requirement is that if the update of one of the hosts fails, we want to cancel the update of the other hosts in the same cluster.
In Temporal "Cancellation is done using asyncio task cancellation".
You can use the file system_patch_workflow_v6.py as a starting point:
We will first make one of the child workflows fail, and then add the logic to the workflow to cancel the other child workflows.
- Modify one of the methods that performs operations on the host to make it fail (e.g. `check_host_preconditions_activity`). There are a couple of options:
  - Add an `await asyncio.sleep(t)` (where t > start_to_close) to make it time out; after 3 retries the activity will be marked as failed and the error will be propagated to the caller, the caller being the child workflow SystemPatchWorkflow_Host_V7 in this case.
  - Throw a non-retryable error from the activity; this will make the activity fail immediately:

    ```python
    await asyncio.sleep(3)
    raise ApplicationError("-", non_retryable=True)
    ```
- Modify the method `SystemPatchWorkflow_Cluster_V7._process_hosts` to capture the exception (`ChildWorkflowFailure`) and cancel the other child workflows.
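The cancellation mechanism itself is plain asyncio task cancellation, which can be sketched outside Temporal like this. Inside the workflow, each task would wrap a `workflow.execute_child_workflow(...)` call and the caught exception would be the child workflow failure; the names below are invented for illustration:

```python
import asyncio


async def _patch_host(host: str, fail: bool) -> str:
    if fail:
        raise RuntimeError(f"{host} failed")
    await asyncio.sleep(10)  # long-running work we want to be able to cancel
    return f"{host} ok"


async def patch_hosts() -> list:
    # One task per host; in the workflow these would be child workflow calls.
    tasks = [
        asyncio.create_task(_patch_host("host1", fail=True)),
        asyncio.create_task(_patch_host("host2", fail=False)),
    ]
    # Wait until the first task raises, then cancel the rest.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in pending:
        task.cancel()  # in Temporal, this sends a cancellation request
    outcomes = []
    for task in tasks:
        try:
            outcomes.append(await task)
        except RuntimeError as err:
            outcomes.append(f"failed: {err}")
        except asyncio.CancelledError:
            outcomes.append("cancelled")
    return outcomes


print(asyncio.run(patch_hosts()))
# → ['failed: host1 failed', 'cancelled']
```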
Run the code
- Restart the worker and the client
- Check the UI, one of the child workflows should fail and the other should be cancelled.
Workers are the piece of software that executes our code in the form of workflow and activity tasks. To do so, they open long-poll (gRPC) connections to the server to poll for tasks.
There are two main components in the worker:
- pollers: Pollers are responsible for polling the server for tasks.
- executors: Executors are responsible for executing the tasks.
Another important part of the worker is the worker cache. Workers cache workflow executions, and Temporal uses sticky execution to dispatch tasks to the worker that has the execution in cache, preventing a full replay of the workflow on every workflow task.
Observability is key to understanding how our workers are performing and whether they are overloaded.
For this exercise we have integrated Prometheus and Grafana with the client and workers to explore the SDK metrics.
This exercise demonstrates how the worker configuration can impact end-to-end workflow latency.
- To start the Prometheus and Grafana containers, run the script start-grafana_prometheus.sh
  - Open the Grafana UI at http://localhost:3000; there is already a preloaded dashboard with the metrics.
- We have modified the worker (temporal_worker_v8.py) to throttle the number of concurrent activity tasks; check the configuration.
- The client (temporal_client_v8.py) is configured to start 5 workflows.
- After running the client and the worker, check the Grafana dashboard and the UI; note that the schedule-to-start latency is high for activities.
  - Play with the worker configuration and see how it impacts the workflow execution latencies.
Local activities:
-
Workflow versioning: Renaming the workflow type is one of the approaches you can take to add changes to an existing implementation, but not the only one:
-
Activity Timeouts:
-
Workflow - Cancellation vs Termination:
-
Testing
-
Workflow Execution Limits
-
Worker performance