antmendoza/temporal-python-workshop

System Patch Workflow

The file System_Patching.md provides an overview of the functionality that will be implemented in this workshop.

This file contains the basic implementation as a set of activities to be executed in the workflow.

For simplicity, some activity executions are skipped; a comment in the code marks each one.

Open the file and review the implementation.

Every step described in System_Patching.md is implemented as an activity, except step 2 (SendApprovalRequestActivity and wait for approval), which is missing and will be added in the first exercise.

Project structure

  • The folder workflow contains the implementation of the workflows and the activities.
  • temporal_client.py sends the request to start the workflow and interacts with it (e.g. sends signals).
  • temporal_worker.py runs the workflow and the activities. This is our application.
  • activities.py contains the activity implementations.
  • types.py defines the workflow and activity input/output types.

Every time you start working on a new exercise (system_patch_workflow_vx.py) you will need to:

  • stop the worker temporal_worker.py and register the new workflow implementation, which is identified by its name (e.g. SystemPatchWorkflow_V2 -> SystemPatchWorkflow_V3).
  • change the workflow type in temporal_client.py so the request is sent for the new workflow type (same as above).

You don't need to do this if you are only modifying your own implementation.

Workshop

Each exercise’s final implementation is in the next version of the file (v1, v2, and so on).

Exercise 0 - Environment setup (~10 min)

Make sure you can run the workflow system_patch_workflow_v1.py.

Exercise 1 - Message passing (Signal/Update) (~15 min)

During this exercise, you will implement step 2: "Execute SendApprovalRequestActivity. If not approved, terminate."
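The control flow of step 2 can be sketched in plain asyncio before wiring it into the workflow. This is only a model, not the workshop solution: ApprovalGate and demo are hypothetical names, and sending the approval request is left out. In the Temporal Python SDK the handler would typically be a method decorated with @workflow.signal (or @workflow.update), and the wait would be expressed with workflow.wait_condition.

```python
import asyncio
from typing import Optional


class ApprovalGate:
    """Plain-asyncio stand-in for a workflow that waits for an approval message."""

    def __init__(self) -> None:
        self._approved: Optional[bool] = None
        self._decided = asyncio.Event()

    def approve(self, approved: bool) -> None:
        # Plays the role of the signal/update handler: it only records state.
        self._approved = approved
        self._decided.set()

    async def run(self) -> str:
        # Step 2: block until a decision arrives, then branch on it.
        await self._decided.wait()
        if not self._approved:
            return "terminated: not approved"
        # The remaining patching steps would run here.
        return "patched"


async def demo(approved: bool) -> str:
    gate = ApprovalGate()
    task = asyncio.create_task(gate.run())
    await asyncio.sleep(0)   # let run() reach the wait
    gate.approve(approved)   # the "client" delivers the decision
    return await task


if __name__ == "__main__":
    print(asyncio.run(demo(True)))
    print(asyncio.run(demo(False)))
```

The handler does nothing but store the decision; the branching stays in run(), which keeps the main flow easy to follow.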

You can use the file system_patch_workflow_v1.py as a starting point.

Run the code

  • Restart the worker and the client. The workflow should complete after several seconds.
  • Go to the UI and check the progress.

Exercise 1.1 Inspect the workflow history

Spend some time inspecting the workflow history in the UI.

Note that some activities are designed to fail on their first attempt (e.g. check_service_health_activity).

activities.py

@activity.defn
async def check_service_health_activity(self, activity_input: CheckServiceHealthActivityInput) -> bool:
    await self.simulate_activity_execution()

    # Simulate a health check failure
    if activity_input.hostname == "cluster1_host1" and activity.info().attempt == 1:
        raise Exception("Service health check failed: " + str(activity_input))

    return True

Open the workflow history in the UI, click the activity's ActivityTaskStarted and ActivityTaskCompleted events, and check the Attempt and LastFailure fields.

activity_last_failure

Exercise 2 - Workflow query (~10 min)

Imagine you need to check periodically which steps have been completed for each execution.

Workflow queries allow us to retrieve state from a Workflow Execution.

You can use the file system_patch_workflow_v2.py as a starting point.

Run the code

  • Restart the worker and the client. The workflow should complete after several seconds.
  • Go to the UI and check the progress. The client logs should also show the progress.

Queries should only return in-memory data and must not modify the workflow history (e.g. don't run an activity from a query method).
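A minimal sketch of what "in-memory only" means, written in plain Python. PatchProgress and its method names are hypothetical; in the Temporal Python SDK the read method would be a workflow method decorated with @workflow.query.

```python
from typing import List


class PatchProgress:
    """Hypothetical holder for the in-memory state a query would expose."""

    def __init__(self) -> None:
        self._completed_steps: List[str] = []

    def mark_completed(self, step: str) -> None:
        # Called by the workflow code as each step finishes.
        self._completed_steps.append(step)

    def get_completed_steps(self) -> List[str]:
        # Query-style read: return a copy, mutate nothing, start no work.
        return list(self._completed_steps)


if __name__ == "__main__":
    progress = PatchProgress()
    progress.mark_completed("check_service_health")
    print(progress.get_completed_steps())
```

Returning a copy means a caller cannot change the workflow's state through the query result.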

Exercise 3 - Activity Timeouts and heartbeat (~15 min)

Temporal lets you set timeouts on activities and send heartbeats from them. This helps detect worker failures quickly and resume an activity from the point where it was interrupted.

There are different types of activity timeouts. For this exercise, let's focus on start_to_close timeout and heartbeats.

Some activities can be long-running. Let's use perform_update_activity for this exercise.

You can use the file system_patch_workflow_v3.py as starting point.

Exercise 3.1 Start to close timeout

  • Review the activity options where perform_update_activity is called. Note that start_to_close_timeout is set to 10 seconds and maximum_attempts=3.
  • Modify the activity implementation to add an await asyncio.sleep(...) call longer than 10 seconds.

Run the code

  • Restart the worker and the client
  • Open the workflow execution in the UI and go to the pending activities tab.
    • Note that the activity is retried after the timeout.
      • You can modify the default retry policy and set initial_interval, backoff_coefficient etc.

retry.png

  • After 3 attempts (maximum_attempts=3) the activity fails with ActivityTaskTimedOut.

activity_timeout.png

  • Add a log to the start of the activity perform_update_activity
    • activity.logger.info("***Starting perform_update_activity***")

Run the code

  • Restart the worker and the client
  • Check the worker logs. Each retry starts after the timeout plus a retry delay, which grows as initial_interval * backoff_coefficient^(retry - 1).

activity_timeout_2.png

  • In the UI we can see that the activity execution latency is > 30 seconds (from schedule to close)

activity_execution_latency_1.png
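The latency above can be estimated with a little arithmetic. The sketch below assumes Temporal's documented default retry policy (initial_interval of 1 second, backoff_coefficient of 2.0, maximum_interval of 100 seconds) and that every attempt runs into the start-to-close timeout; the function names are hypothetical and the result ignores queueing overhead.

```python
from typing import List


def retry_delays(
    maximum_attempts: int,
    initial_interval: float = 1.0,
    backoff_coefficient: float = 2.0,
    maximum_interval: float = 100.0,
) -> List[float]:
    """Seconds waited before each retry (there is no wait before attempt 1)."""
    delays = []
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * backoff_coefficient ** retry
        delays.append(min(delay, maximum_interval))
    return delays


def worst_case_latency(start_to_close_timeout: float, maximum_attempts: int) -> float:
    """Total time if every attempt hits the start-to-close timeout."""
    return start_to_close_timeout * maximum_attempts + sum(retry_delays(maximum_attempts))


if __name__ == "__main__":
    print(retry_delays(3))            # [1.0, 2.0]
    print(worst_case_latency(10, 3))  # 33.0
```

With a 10-second timeout and 3 attempts this gives 33 seconds, which is consistent with the latency of more than 30 seconds shown in the UI.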

Let's see what happens if a worker crashes while executing an activity.

Exercise 3.2 Heartbeat

Temporal lets you send heartbeats from activities. The server uses them to detect worker failures quickly, so the activity can be rescheduled without waiting for the full start_to_close timeout.

Add a heartbeat to detect worker failures.

Run the code

  • Run the client to start the workflow
  • Run the worker and restart it after the first heartbeat.
  • The activity will be re-scheduled after 2 seconds (heartbeat timeout) and start running again once the worker is up.

heartbeat.png

  • Check LastFailure in the UI.

activity_last_failure_hb.png
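Heartbeats can also carry progress, so a retried attempt continues instead of starting over. The following is a plain-Python model of that idea, not SDK code: perform_update, WorkerCrash, the package names and the crash_before parameter are all hypothetical. In the Temporal Python SDK the progress would be recorded with activity.heartbeat(...) and read on the next attempt from activity.info().heartbeat_details.

```python
from typing import Callable, List, Optional


class WorkerCrash(Exception):
    """Simulates the worker process dying in the middle of an activity."""


def perform_update(
    packages: List[str],
    last_heartbeat: Optional[int],
    heartbeat: Callable[[int], None],
    crash_before: Optional[int] = None,
) -> List[str]:
    """Process packages, resuming after the index stored in the last heartbeat."""
    start = 0 if last_heartbeat is None else last_heartbeat + 1
    processed = []
    for index in range(start, len(packages)):
        if crash_before is not None and index == crash_before:
            raise WorkerCrash(f"worker lost before package {index}")
        processed.append(packages[index])
        heartbeat(index)  # record progress so a retry can skip finished work
    return processed


def demo() -> List[str]:
    packages = ["kernel", "openssl", "glibc", "curl"]
    checkpoints: List[int] = []
    try:
        # Attempt 1: the worker "crashes" before the third package.
        perform_update(packages, None, checkpoints.append, crash_before=2)
    except WorkerCrash:
        pass  # the server would notice the missing heartbeat and retry
    last = checkpoints[-1] if checkpoints else None
    # Attempt 2: resume from the last recorded checkpoint.
    return perform_update(packages, last, checkpoints.append)


if __name__ == "__main__":
    print(demo())  # ['glibc', 'curl']
```

The second attempt only processes the packages that were not checkpointed before the crash.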

Exercise 4 - Concurrency (~10 min)

Temporal allows running multiple activities (or child workflows, as we will see later) in parallel. This reduces the execution time of the workflow when there are multiple independent tasks to execute.
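Because Python workflows are asyncio code, the idea can be tried outside Temporal first. The sketch below is a plain-asyncio illustration: patch_host is a hypothetical stand-in for an activity call, and the host names other than cluster1_host1 are made up.

```python
import asyncio
import time
from typing import List

HOSTS = ["cluster1_host1", "cluster1_host2", "cluster1_host3"]


async def patch_host(hostname: str) -> str:
    # Stand-in for an activity call; the sleep simulates the work.
    await asyncio.sleep(0.1)
    return f"{hostname}: patched"


async def patch_sequentially(hosts: List[str]) -> List[str]:
    # One host at a time: total time is the sum of all calls.
    return [await patch_host(host) for host in hosts]


async def patch_concurrently(hosts: List[str]) -> List[str]:
    # gather starts every coroutine and returns results in input order.
    return list(await asyncio.gather(*(patch_host(host) for host in hosts)))


def timed(coro) -> float:
    """Run a coroutine to completion and return the elapsed seconds."""
    start = time.perf_counter()
    asyncio.run(coro)
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {timed(patch_sequentially(HOSTS)):.2f}s")
    print(f"concurrent: {timed(patch_concurrently(HOSTS)):.2f}s")
```

The concurrent version takes roughly as long as the slowest single call rather than the sum of all calls.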

You can use the file system_patch_workflow_v4.py as a starting point.

Run the code

  • Restart the worker and the client
  • Check the UI, note that the workflow execution time is reduced compared to the previous execution.

workflow_execution_latency.png

Click the gear icon ⚙️ in the bottom-right corner of the UI to add or remove columns

Exercise 5 - Child workflows (~20 min)

Child workflows are a feature that lets you spawn a workflow from another workflow. There are several use cases for this feature, for example representing each resource (such as a cluster or a host) as its own workflow.

You can use the file system_patch_workflow_v5.py as a starting point.

Modify the workflow to spawn a child workflow for each target cluster.

Run the code

  • Restart the worker and the client

  • In the UI, open the Relationships tab to see the workflow execution tree.

    execution_tree.png

  • Additionally, in the new child workflow code, you can refactor the method _process_host and move the implementation to a new child workflow (as we did with clusters)

Exercise 6 - Cancellation and cancellation scope (~15 min)

The new requirement is that if updating one of the hosts fails, we want to cancel the update of the other hosts in the same cluster.

In the Temporal Python SDK, "Cancellation is done using asyncio task cancellation".

You can use the file system_patch_workflow_v6.py as a starting point.

We will first make one of the child workflows fail, and then add logic to the workflow to cancel the other child workflows.

  • Modify one of the methods that perform operations on the host to make it fail (e.g. check_host_preconditions_activity). There are a couple of options:
    • Add an await asyncio.sleep(t) (where t > start_to_close) to make it time out. After 3 attempts the activity will be marked as failed and the error will be propagated to the caller, which in this case is the child workflow SystemPatchWorkflow_Host_V7.
    • Raise a non-retryable error from the activity. This makes the activity fail immediately.
    await asyncio.sleep(3)
    raise ApplicationError("-", non_retryable=True)
    
  • Modify the method SystemPatchWorkflow_Cluster_V7._process_hosts to catch the exception (ChildWorkflowFailure) and cancel the other child workflows.
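The "cancel the siblings on first failure" pattern can be sketched with plain asyncio tasks. This is an illustration of the control flow only, under the assumption that each host update is represented by one task; patch_host and process_hosts are hypothetical names, and RuntimeError stands in for the failure the workflow would actually catch.

```python
import asyncio
from typing import Dict


async def patch_host(hostname: str, fail: bool = False) -> str:
    if fail:
        await asyncio.sleep(0.01)
        raise RuntimeError(f"{hostname}: update failed")
    await asyncio.sleep(1)  # long-running work that should get cancelled
    return f"{hostname}: patched"


async def process_hosts(hosts: Dict[str, bool]) -> Dict[str, str]:
    """Start one task per host; on the first failure, cancel the rest."""
    tasks = {
        name: asyncio.create_task(patch_host(name, fail))
        for name, fail in hosts.items()
    }
    done, pending = await asyncio.wait(
        list(tasks.values()), return_when=asyncio.FIRST_EXCEPTION
    )
    if any(task.exception() is not None for task in done):
        for task in pending:
            task.cancel()
        # Wait for the cancellations to be delivered before reporting.
        await asyncio.gather(*pending, return_exceptions=True)

    outcome = {}
    for name, task in tasks.items():
        if task.cancelled():
            outcome[name] = "cancelled"
        elif task.exception() is not None:
            outcome[name] = "failed"
        else:
            outcome[name] = "completed"
    return outcome


if __name__ == "__main__":
    hosts = {"cluster1_host1": True, "cluster1_host2": False}
    print(asyncio.run(process_hosts(hosts)))
```

Waiting on the cancelled tasks before returning ensures every sibling has actually stopped, which mirrors what you should see in the UI: one failed child and the rest cancelled.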

Run the code

  • Restart the worker and the client
  • Check the UI, one of the child workflows should fail and the other should be cancelled.

cancel_child.png

Exercise 7 - Worker configuration

Workers are the piece of software that executes our code in the form of workflow and activity tasks. To do so, they open long-poll connections (gRPC) to the server to poll for tasks.

There are two main components in the worker:

  • pollers: Pollers are responsible for polling the server for tasks.
  • executors: Executors are responsible for executing the tasks.

Another important part of the worker is the worker cache, where workers keep workflow executions. Temporal uses sticky executions to dispatch tasks to the worker that has the execution in its cache, which avoids a full replay of the workflow for each workflow task.

Observability is key to understanding how our workers are performing and whether they are overloaded.

For this exercise we have integrated Prometheus and Grafana with the client and workers to explore the SDK metrics.

This exercise demonstrates how the worker configuration can impact end-to-end workflow latency.

  • To start the Prometheus and Grafana containers, run the script start-grafana_prometheus.sh.

  • We have modified the worker (temporal_worker_v8.py) to throttle the number of concurrent activity tasks, check the configuration.

  • The client (temporal_client_v8.py) is configured to start 5 workflows.

  • After running the client and the worker, check the Grafana dashboard and the UI. Note that the schedule-to-start latency is high for activities.

schedule_to_start_latency.png

  • Play with the worker configuration and see how it impacts the workflow execution latencies.
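Why throttling raises schedule-to-start latency can be seen in a small asyncio simulation. This is a model, not worker code: simulate is a hypothetical function and the semaphore merely stands in for the worker's executor slots. In the Temporal Python SDK the corresponding knob is a Worker option such as max_concurrent_activities.

```python
import asyncio
import time
from typing import List


async def simulate(
    task_count: int, max_concurrent: int, work_seconds: float = 0.05
) -> List[float]:
    """Return each task's wait (seconds) between being scheduled and starting."""
    slots = asyncio.Semaphore(max_concurrent)  # models the executor slots
    scheduled_at = time.perf_counter()
    waits: List[float] = []

    async def run_task() -> None:
        async with slots:
            # The task only "starts" once a slot is free.
            waits.append(time.perf_counter() - scheduled_at)
            await asyncio.sleep(work_seconds)

    await asyncio.gather(*(run_task() for _ in range(task_count)))
    return waits


if __name__ == "__main__":
    for slots in (1, 5):
        waits = asyncio.run(simulate(task_count=5, max_concurrent=slots))
        print(f"slots={slots} max wait={max(waits):.2f}s")
```

With a single slot the last task waits for all the others to finish; with as many slots as tasks, every task starts almost immediately.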
