- 
                Notifications
    You must be signed in to change notification settings 
- Fork 89
Expense example #201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
          
     Open
      
        
      
            jssmith
  wants to merge
  18
  commits into
  main
  
    
      
        
          
  
    
      Choose a base branch
      
     
    
      
        
      
      
        
          
          
        
        
          
            
              
              
              
  
           
        
        
          
            
              
              
           
        
       
     
  
        
          
            
          
            
          
        
       
    
      
from
expense-example
  
      
      
   
  
    
  
  
  
 
  
      
    base: main
Could not load branches
            
              
  
    Branch not found: {{ refName }}
  
            
                
      Loading
              
            Could not load tags
            
            
              Nothing to show
            
              
  
            
                
      Loading
              
            Are you sure you want to change the base?
            Some commits from the old base branch may be removed from the timeline,
            and old review comments may become outdated.
          
          
  
     Open
                    Expense example #201
Changes from all commits
      Commits
    
    
            Show all changes
          
          
            18 commits
          
        
        Select commit
          Hold shift + click to select a range
      
      437f80e
              
                ported expense report test from Go
              
              
                jssmith 089127c
              
                cleanup
              
              
                jssmith a822d57
              
                move specification documents
              
              
                jssmith 392c9b8
              
                fix UI specification
              
              
                jssmith b008ff8
              
                testing cleanup
              
              
                jssmith 3a6a45b
              
                test reorg
              
              
                jssmith d939798
              
                cleanup
              
              
                jssmith af1d0e8
              
                add top-level readme
              
              
                jssmith 3f950d8
              
                Merge remote-tracking branch 'origin/main' into expense-example
              
              
                jssmith 8dbb91e
              
                remove expense from pyproject defaults
              
              
                jssmith 2f5d5d3
              
                change exception logging
              
              
                jssmith 231a8de
              
                switch from async activity completion to signals
              
              
                jssmith ff80141
              
                ui cleanup
              
              
                jssmith 71c11b3
              
                add expense group to ci
              
              
                jssmith 4bf6493
              
                lint fixes
              
              
                jssmith 8a0229a
              
                remove unicode for windows compatibility
              
              
                jssmith f15ec1c
              
                fixing ci
              
              
                jssmith a1297b1
              
                test fix
              
              
                jssmith File filter
Filter by extension
Conversations
          Failed to load comments.   
        
        
          
      Loading
        
  Jump to
        
          Jump to file
        
      
      
          Failed to load files.   
        
        
          
      Loading
        
  Diff view
Diff view
There are no files selected for viewing
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| # Expense | ||
|  | ||
| This sample workflow processes an expense request. It demonstrates human-in-the loop processing using Temporal's signal mechanism. | ||
|  | ||
| ## Overview | ||
|  | ||
| This sample demonstrates the following workflow: | ||
|  | ||
| 1. **Create Expense**: The workflow executes the `create_expense_activity` to initialize a new expense report in the external system. | ||
|  | ||
| 2. **Register for Decision**: The workflow calls `register_for_decision_activity`, which registers the workflow with the external UI system so it can receive signals when decisions are made. | ||
|  | ||
| 3. **Wait for Signal**: The workflow uses `workflow.wait_condition()` to wait for an external signal containing the approval/rejection decision. | ||
|  | ||
| 4. **Signal-Based Completion**: When a human approves or rejects the expense, the external UI system sends a signal to the workflow using `workflow_handle.signal()`, providing the decision result. | ||
|  | ||
| 5. **Process Payment**: Once the workflow receives the approval decision via signal, it executes the `payment_activity` to complete the simulated expense processing. | ||
|  | ||
| This pattern enables human-in-the-loop workflows where workflows can wait as long as necessary for external decisions using Temporal's durable signal mechanism. | ||
|  | ||
| ## Steps To Run Sample | ||
|  | ||
| * You need a Temporal service running. See the main [README.md](../README.md) for more details. | ||
| * Start the sample expense system UI: | ||
| ```bash | ||
| uv run -m expense.ui | ||
| ``` | ||
| * Start workflow and activity workers: | ||
| ```bash | ||
| uv run -m expense.worker | ||
| ``` | ||
| * Start expense workflow execution: | ||
| ```bash | ||
| # Start workflow and return immediately (default) | ||
| uv run -m expense.starter | ||
|  | ||
| # Start workflow and wait for completion | ||
| uv run -m expense.starter --wait | ||
|  | ||
| # Start workflow with custom expense ID | ||
| uv run -m expense.starter --expense-id "my-expense-123" | ||
|  | ||
| # Start workflow with custom ID and wait for completion | ||
| uv run -m expense.starter --wait --expense-id "my-expense-123" | ||
| ``` | ||
| * When you see the console print out that the expense is created, go to [localhost:8099/list](http://localhost:8099/list) to approve the expense. | ||
| * You should see the workflow complete after you approve the expense. You can also reject the expense. | ||
|  | ||
| ## Running Tests | ||
|  | ||
| ```bash | ||
| # Run all expense tests | ||
| uv run -m pytest tests/expense/ -v | ||
|  | ||
| # Run specific test categories | ||
| uv run -m pytest tests/expense/test_expense_workflow.py -v # Workflow tests | ||
| uv run -m pytest tests/expense/test_expense_activities.py -v # Activity tests | ||
| uv run -m pytest tests/expense/test_expense_integration.py -v # Integration tests | ||
| uv run -m pytest tests/expense/test_ui.py -v # UI tests | ||
|  | ||
| # Run a specific test | ||
| uv run -m pytest tests/expense/test_expense_workflow.py::TestWorkflowPaths::test_workflow_approved_complete_flow -v | ||
| ``` | ||
|  | ||
| ## Key Concepts Demonstrated | ||
|  | ||
| * **Human-in-the-Loop Workflows**: Long-running workflows that wait for human interaction | ||
| * **Workflow Signals**: Using `workflow.signal()` and `workflow.wait_condition()` for external communication | ||
| * **Signal-Based Completion**: External systems sending signals to workflows for asynchronous decision-making | ||
| * **External System Integration**: Communication between workflows and external systems via web services and signals | ||
| * **HTTP Client Lifecycle Management**: Proper resource management with worker-scoped HTTP clients | ||
|  | ||
| ## Troubleshooting | ||
|  | ||
| If you see the workflow failed, the cause may be a port conflict. You can try to change to a different port number in `__init__.py`. Then rerun everything. | ||
|  | ||
| ## Files | ||
|  | ||
| * `workflow.py` - The main expense processing workflow with signal handling | ||
| * `activities.py` - Three activities: create expense, register for decision, process payment | ||
| * `ui.py` - A demonstration expense approval system web UI with signal sending | ||
| * `worker.py` - Worker to run workflows and activities with HTTP client lifecycle management | ||
| * `starter.py` - Client to start workflow executions with optional completion waiting | 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| EXPENSE_SERVER_HOST = "localhost" | ||
| EXPENSE_SERVER_PORT = 8099 | ||
| EXPENSE_SERVER_HOST_PORT = f"http://{EXPENSE_SERVER_HOST}:{EXPENSE_SERVER_PORT}" | 
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| from typing import Optional | ||
|  | ||
| import httpx | ||
| from temporalio import activity | ||
| from temporalio.exceptions import ApplicationError | ||
|  | ||
| from expense import EXPENSE_SERVER_HOST_PORT | ||
|  | ||
| # Module-level HTTP client, managed by worker lifecycle | ||
| _http_client: Optional[httpx.AsyncClient] = None | ||
|  | ||
|  | ||
| async def initialize_http_client() -> None: | ||
| """Initialize the global HTTP client. Called by worker setup.""" | ||
| global _http_client | ||
| if _http_client is None: | ||
| _http_client = httpx.AsyncClient() | ||
|  | ||
|  | ||
| async def cleanup_http_client() -> None: | ||
| """Cleanup the global HTTP client. Called by worker shutdown.""" | ||
| global _http_client | ||
| if _http_client is not None: | ||
| await _http_client.aclose() | ||
| _http_client = None | ||
|  | ||
|  | ||
| def get_http_client() -> httpx.AsyncClient: | ||
| """Get the global HTTP client.""" | ||
| if _http_client is None: | ||
| raise RuntimeError( | ||
| "HTTP client not initialized. Call initialize_http_client() first." | ||
| ) | ||
| return _http_client | ||
|  | ||
|  | ||
| @activity.defn | ||
| async def create_expense_activity(expense_id: str) -> None: | ||
| if not expense_id: | ||
| raise ValueError("expense id is empty") | ||
|  | ||
| client = get_http_client() | ||
| try: | ||
| response = await client.get( | ||
| f"{EXPENSE_SERVER_HOST_PORT}/create", | ||
| params={"is_api_call": "true", "id": expense_id}, | ||
| ) | ||
| response.raise_for_status() | ||
| except httpx.HTTPStatusError as e: | ||
| if 400 <= e.response.status_code < 500: | ||
| raise ApplicationError( | ||
| f"Client error: {e.response.status_code} {e.response.text}", | ||
| non_retryable=True, | ||
| ) from e | ||
| raise | ||
|  | ||
| body = response.text | ||
|  | ||
| if body == "SUCCEED": | ||
| activity.logger.info(f"Expense created. ExpenseID: {expense_id}") | ||
| return | ||
|  | ||
| raise Exception(body) | ||
|  | ||
|  | ||
| @activity.defn | ||
| async def register_for_decision_activity(expense_id: str) -> None: | ||
| """ | ||
| Register the expense for decision. This activity registers the workflow | ||
| with the external system so it can receive signals when decisions are made. | ||
| """ | ||
| if not expense_id: | ||
| raise ValueError("expense id is empty") | ||
|  | ||
| logger = activity.logger | ||
| http_client = get_http_client() | ||
|  | ||
| # Get workflow info to register with the UI system | ||
| activity_info = activity.info() | ||
| workflow_id = activity_info.workflow_id | ||
|  | ||
| # Register the workflow ID with the UI system so it can send signals | ||
| try: | ||
| response = await http_client.post( | ||
| f"{EXPENSE_SERVER_HOST_PORT}/registerWorkflow", | ||
| params={"id": expense_id}, | ||
| data={"workflow_id": workflow_id}, | ||
| ) | ||
| response.raise_for_status() | ||
| logger.info(f"Registered expense for decision. ExpenseID: {expense_id}") | ||
| except Exception as e: | ||
| logger.error(f"Failed to register workflow with UI system: {e}") | ||
| raise | ||
|  | ||
|  | ||
| @activity.defn | ||
| async def payment_activity(expense_id: str) -> None: | ||
| if not expense_id: | ||
| raise ValueError("expense id is empty") | ||
|  | ||
| client = get_http_client() | ||
| try: | ||
| response = await client.post( | ||
| f"{EXPENSE_SERVER_HOST_PORT}/action", | ||
| data={"is_api_call": "true", "type": "payment", "id": expense_id}, | ||
| ) | ||
| response.raise_for_status() | ||
| except httpx.HTTPStatusError as e: | ||
| if 400 <= e.response.status_code < 500: | ||
| raise ApplicationError( | ||
| f"Client error: {e.response.status_code} {e.response.text}", | ||
| non_retryable=True, | ||
| ) from e | ||
| raise | ||
|  | ||
| body = response.text | ||
|  | ||
| if body == "SUCCEED": | ||
| activity.logger.info(f"payment_activity succeed ExpenseID: {expense_id}") | ||
| return | ||
|  | ||
| raise Exception(body) | ||
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
              | Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| import argparse | ||
| import asyncio | ||
| import uuid | ||
|  | ||
| from temporalio.client import Client | ||
|  | ||
| from .workflow import SampleExpenseWorkflow | ||
|  | ||
|  | ||
| async def main(): | ||
| parser = argparse.ArgumentParser(description="Start an expense workflow") | ||
| parser.add_argument( | ||
| "--wait", | ||
| action="store_true", | ||
| help="Wait for workflow completion (default: start and return immediately)", | ||
| ) | ||
| parser.add_argument( | ||
| "--expense-id", | ||
| type=str, | ||
| help="Expense ID to use (default: generate random UUID)", | ||
| ) | ||
| args = parser.parse_args() | ||
|  | ||
| # The client is a heavyweight object that should be created once per process. | ||
| client = await Client.connect("localhost:7233") | ||
|  | ||
| expense_id = args.expense_id or str(uuid.uuid4()) | ||
| workflow_id = f"expense_{expense_id}" | ||
|  | ||
| # Start the workflow | ||
| handle = await client.start_workflow( | ||
| SampleExpenseWorkflow.run, | ||
| expense_id, | ||
| id=workflow_id, | ||
| task_queue="expense", | ||
| ) | ||
|  | ||
| print(f"Started workflow WorkflowID {handle.id} RunID {handle.result_run_id}") | ||
| print(f"Workflow will register itself with UI system for expense {expense_id}") | ||
|  | ||
| if args.wait: | ||
| print("Waiting for workflow to complete...") | ||
| result = await handle.result() | ||
| print(f"Workflow completed with result: {result}") | ||
| return result | ||
| else: | ||
| print("Workflow started. Use --wait flag to wait for completion.") | ||
| return None | ||
|  | ||
|  | ||
| if __name__ == "__main__": | ||
| asyncio.run(main()) | 
      
      Oops, something went wrong.
        
    
  
      
      Oops, something went wrong.
        
    
  
  Add this suggestion to a batch that can be applied as a single commit.
  This suggestion is invalid because no changes were made to the code.
  Suggestions cannot be applied while the pull request is closed.
  Suggestions cannot be applied while viewing a subset of changes.
  Only one suggestion per line can be applied in a batch.
  Add this suggestion to a batch that can be applied as a single commit.
  Applying suggestions on deleted lines is not supported.
  You must change the existing code in this line in order to create a valid suggestion.
  Outdated suggestions cannot be applied.
  This suggestion has been applied or marked resolved.
  Suggestions cannot be applied from pending reviews.
  Suggestions cannot be applied on multi-line comments.
  Suggestions cannot be applied while the pull request is queued to merge.
  Suggestion cannot be applied right now. Please check back later.
  
    
  
    
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically 4xx is probably a
ApplicationErrorwithnon_retryable=True, but that's a bit pedantic. But with this setup, an activity that, say, has invalid auth will retry forever