diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index bfb1353e..a17f284e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -31,7 +31,7 @@ jobs:
with:
python-version: ${{ matrix.python }}
- run: uv tool install poethepoet
- - run: uv sync --group=dsl --group=encryption --group=trio-async
+ - run: uv sync --group=dsl --group=encryption --group=trio-async --group=expense
- run: poe lint
- run: mkdir junit-xml
- run: poe test -s --junit-xml=junit-xml/${{ matrix.python }}--${{ matrix.os }}.xml
diff --git a/README.md b/README.md
index 8bd9986a..b2a06548 100644
--- a/README.md
+++ b/README.md
@@ -65,6 +65,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [custom_metric](custom_metric) - Custom metric to record the workflow type in the activity schedule to start latency.
* [dsl](dsl) - DSL workflow that executes steps defined in a YAML file.
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
+* [expense](expense) - Human-in-the-loop processing and asynchronous activity completion.
* [gevent_async](gevent_async) - Combine gevent and Temporal.
* [langchain](langchain) - Orchestrate workflows for LangChain.
* [message_passing/introduction](message_passing/introduction/) - Introduction to queries, signals, and updates.
diff --git a/expense/README.md b/expense/README.md
new file mode 100644
index 00000000..8c15f133
--- /dev/null
+++ b/expense/README.md
@@ -0,0 +1,83 @@
+# Expense
+
+This sample workflow processes an expense request. It demonstrates human-in-the loop processing using Temporal's signal mechanism.
+
+## Overview
+
+This sample demonstrates the following workflow:
+
+1. **Create Expense**: The workflow executes the `create_expense_activity` to initialize a new expense report in the external system.
+
+2. **Register for Decision**: The workflow calls `register_for_decision_activity`, which registers the workflow with the external UI system so it can receive signals when decisions are made.
+
+3. **Wait for Signal**: The workflow uses `workflow.wait_condition()` to wait for an external signal containing the approval/rejection decision.
+
+4. **Signal-Based Completion**: When a human approves or rejects the expense, the external UI system sends a signal to the workflow using `workflow_handle.signal()`, providing the decision result.
+
+5. **Process Payment**: Once the workflow receives the approval decision via signal, it executes the `payment_activity` to complete the simulated expense processing.
+
+This pattern enables human-in-the-loop workflows where workflows can wait as long as necessary for external decisions using Temporal's durable signal mechanism.
+
+## Steps To Run Sample
+
+* You need a Temporal service running. See the main [README.md](../README.md) for more details.
+* Start the sample expense system UI:
+ ```bash
+ uv run -m expense.ui
+ ```
+* Start workflow and activity workers:
+ ```bash
+ uv run -m expense.worker
+ ```
+* Start expense workflow execution:
+ ```bash
+ # Start workflow and return immediately (default)
+ uv run -m expense.starter
+
+ # Start workflow and wait for completion
+ uv run -m expense.starter --wait
+
+ # Start workflow with custom expense ID
+ uv run -m expense.starter --expense-id "my-expense-123"
+
+ # Start workflow with custom ID and wait for completion
+ uv run -m expense.starter --wait --expense-id "my-expense-123"
+ ```
+* When you see the console print out that the expense is created, go to [localhost:8099/list](http://localhost:8099/list) to approve the expense.
+* You should see the workflow complete after you approve the expense. You can also reject the expense.
+
+## Running Tests
+
+```bash
+# Run all expense tests
+uv run -m pytest tests/expense/ -v
+
+# Run specific test categories
+uv run -m pytest tests/expense/test_expense_workflow.py -v # Workflow tests
+uv run -m pytest tests/expense/test_expense_activities.py -v # Activity tests
+uv run -m pytest tests/expense/test_expense_integration.py -v # Integration tests
+uv run -m pytest tests/expense/test_ui.py -v # UI tests
+
+# Run a specific test
+uv run -m pytest tests/expense/test_expense_workflow.py::TestWorkflowPaths::test_workflow_approved_complete_flow -v
+```
+
+## Key Concepts Demonstrated
+
+* **Human-in-the-Loop Workflows**: Long-running workflows that wait for human interaction
+* **Workflow Signals**: Using `workflow.signal()` and `workflow.wait_condition()` for external communication
+* **Signal-Based Completion**: External systems sending signals to workflows for asynchronous decision-making
+* **External System Integration**: Communication between workflows and external systems via web services and signals
+* **HTTP Client Lifecycle Management**: Proper resource management with worker-scoped HTTP clients
+
+## Troubleshooting
+
+If you see the workflow failed, the cause may be a port conflict. You can try to change to a different port number in `__init__.py`. Then rerun everything.
+
+## Files
+
+* `workflow.py` - The main expense processing workflow with signal handling
+* `activities.py` - Three activities: create expense, register for decision, process payment
+* `ui.py` - A demonstration expense approval system web UI with signal sending
+* `worker.py` - Worker to run workflows and activities with HTTP client lifecycle management
+* `starter.py` - Client to start workflow executions with optional completion waiting
\ No newline at end of file
diff --git a/expense/__init__.py b/expense/__init__.py
new file mode 100644
index 00000000..c87f215d
--- /dev/null
+++ b/expense/__init__.py
@@ -0,0 +1,3 @@
+EXPENSE_SERVER_HOST = "localhost"
+EXPENSE_SERVER_PORT = 8099
+EXPENSE_SERVER_HOST_PORT = f"http://{EXPENSE_SERVER_HOST}:{EXPENSE_SERVER_PORT}"
diff --git a/expense/activities.py b/expense/activities.py
new file mode 100644
index 00000000..0a401605
--- /dev/null
+++ b/expense/activities.py
@@ -0,0 +1,122 @@
+from typing import Optional
+
+import httpx
+from temporalio import activity
+from temporalio.exceptions import ApplicationError
+
+from expense import EXPENSE_SERVER_HOST_PORT
+
+# Module-level HTTP client, managed by worker lifecycle
+_http_client: Optional[httpx.AsyncClient] = None
+
+
+async def initialize_http_client() -> None:
+ """Initialize the global HTTP client. Called by worker setup."""
+ global _http_client
+ if _http_client is None:
+ _http_client = httpx.AsyncClient()
+
+
+async def cleanup_http_client() -> None:
+ """Cleanup the global HTTP client. Called by worker shutdown."""
+ global _http_client
+ if _http_client is not None:
+ await _http_client.aclose()
+ _http_client = None
+
+
+def get_http_client() -> httpx.AsyncClient:
+ """Get the global HTTP client."""
+ if _http_client is None:
+ raise RuntimeError(
+ "HTTP client not initialized. Call initialize_http_client() first."
+ )
+ return _http_client
+
+
+@activity.defn
+async def create_expense_activity(expense_id: str) -> None:
+ if not expense_id:
+ raise ValueError("expense id is empty")
+
+ client = get_http_client()
+ try:
+ response = await client.get(
+ f"{EXPENSE_SERVER_HOST_PORT}/create",
+ params={"is_api_call": "true", "id": expense_id},
+ )
+ response.raise_for_status()
+ except httpx.HTTPStatusError as e:
+ if 400 <= e.response.status_code < 500:
+ raise ApplicationError(
+ f"Client error: {e.response.status_code} {e.response.text}",
+ non_retryable=True,
+ ) from e
+ raise
+
+ body = response.text
+
+ if body == "SUCCEED":
+ activity.logger.info(f"Expense created. ExpenseID: {expense_id}")
+ return
+
+ raise Exception(body)
+
+
+@activity.defn
+async def register_for_decision_activity(expense_id: str) -> None:
+ """
+ Register the expense for decision. This activity registers the workflow
+ with the external system so it can receive signals when decisions are made.
+ """
+ if not expense_id:
+ raise ValueError("expense id is empty")
+
+ logger = activity.logger
+ http_client = get_http_client()
+
+ # Get workflow info to register with the UI system
+ activity_info = activity.info()
+ workflow_id = activity_info.workflow_id
+
+ # Register the workflow ID with the UI system so it can send signals
+ try:
+ response = await http_client.post(
+ f"{EXPENSE_SERVER_HOST_PORT}/registerWorkflow",
+ params={"id": expense_id},
+ data={"workflow_id": workflow_id},
+ )
+ response.raise_for_status()
+ logger.info(f"Registered expense for decision. ExpenseID: {expense_id}")
+ except Exception as e:
+ logger.error(f"Failed to register workflow with UI system: {e}")
+ raise
+
+
+@activity.defn
+async def payment_activity(expense_id: str) -> None:
+ if not expense_id:
+ raise ValueError("expense id is empty")
+
+ client = get_http_client()
+ try:
+ response = await client.post(
+ f"{EXPENSE_SERVER_HOST_PORT}/action",
+ data={"is_api_call": "true", "type": "payment", "id": expense_id},
+ )
+ response.raise_for_status()
+ except httpx.HTTPStatusError as e:
+ if 400 <= e.response.status_code < 500:
+ raise ApplicationError(
+ f"Client error: {e.response.status_code} {e.response.text}",
+ non_retryable=True,
+ ) from e
+ raise
+
+ body = response.text
+
+ if body == "SUCCEED":
+ activity.logger.info(f"payment_activity succeed ExpenseID: {expense_id}")
+ return
+
+ raise Exception(body)
diff --git a/expense/starter.py b/expense/starter.py
new file mode 100644
index 00000000..15434323
--- /dev/null
+++ b/expense/starter.py
@@ -0,0 +1,52 @@
+import argparse
+import asyncio
+import uuid
+
+from temporalio.client import Client
+
+from .workflow import SampleExpenseWorkflow
+
+
+async def main():
+ parser = argparse.ArgumentParser(description="Start an expense workflow")
+ parser.add_argument(
+ "--wait",
+ action="store_true",
+ help="Wait for workflow completion (default: start and return immediately)",
+ )
+ parser.add_argument(
+ "--expense-id",
+ type=str,
+ help="Expense ID to use (default: generate random UUID)",
+ )
+ args = parser.parse_args()
+
+ # The client is a heavyweight object that should be created once per process.
+ client = await Client.connect("localhost:7233")
+
+ expense_id = args.expense_id or str(uuid.uuid4())
+ workflow_id = f"expense_{expense_id}"
+
+ # Start the workflow
+ handle = await client.start_workflow(
+ SampleExpenseWorkflow.run,
+ expense_id,
+ id=workflow_id,
+ task_queue="expense",
+ )
+
+ print(f"Started workflow WorkflowID {handle.id} RunID {handle.result_run_id}")
+ print(f"Workflow will register itself with UI system for expense {expense_id}")
+
+ if args.wait:
+ print("Waiting for workflow to complete...")
+ result = await handle.result()
+ print(f"Workflow completed with result: {result}")
+ return result
+ else:
+ print("Workflow started. Use --wait flag to wait for completion.")
+ return None
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/expense/ui.py b/expense/ui.py
new file mode 100644
index 00000000..866d1847
--- /dev/null
+++ b/expense/ui.py
@@ -0,0 +1,194 @@
+import asyncio
+from enum import Enum
+from typing import Dict, Optional
+
+import uvicorn
+from fastapi import FastAPI, Form, Query
+from fastapi.responses import HTMLResponse, PlainTextResponse, RedirectResponse
+from temporalio.client import Client
+
+from expense import EXPENSE_SERVER_HOST, EXPENSE_SERVER_PORT
+
+
+class ExpenseState(str, Enum):
+ CREATED = "CREATED"
+ APPROVED = "APPROVED"
+ REJECTED = "REJECTED"
+ COMPLETED = "COMPLETED"
+
+
+# Use memory store for this sample expense system
+all_expenses: Dict[str, ExpenseState] = {}
+workflow_map: Dict[str, str] = {} # Maps expense_id to workflow_id
+
+app = FastAPI()
+
+# Global client - will be initialized when starting the server
+workflow_client: Optional[Client] = None
+
+
+@app.get("/", response_class=HTMLResponse)
+@app.get("/list", response_class=HTMLResponse)
+async def list_handler():
+ html = """
+
SAMPLE EXPENSE SYSTEM
+ HOME
+ All expense requests:
+
+ Expense ID Status Action
+ """
+
+ # Sort keys for consistent display
+ for expense_id in sorted(all_expenses.keys()):
+ state = all_expenses[expense_id]
+ action_link = ""
+ if state == ExpenseState.CREATED:
+ action_link = (
+ f'"
+ " "
+ f'"
+ )
+ html += f"{expense_id} {state} {action_link} "
+
+ html += "
"
+ return html
+
+
+@app.post("/action")
+async def action_handler(
+ type: str = Form(...), id: str = Form(...), is_api_call: str = Form("false")
+):
+ if id not in all_expenses:
+ if is_api_call == "true":
+ return PlainTextResponse("ERROR:INVALID_ID")
+ else:
+ return PlainTextResponse("Invalid ID")
+
+ old_state = all_expenses[id]
+
+ if type == "approve":
+ all_expenses[id] = ExpenseState.APPROVED
+ elif type == "reject":
+ all_expenses[id] = ExpenseState.REJECTED
+ elif type == "payment":
+ all_expenses[id] = ExpenseState.COMPLETED
+ else:
+ if is_api_call == "true":
+ return PlainTextResponse("ERROR:INVALID_TYPE")
+ else:
+ return PlainTextResponse("Invalid action type")
+
+ if is_api_call == "true" or type == "payment":
+ # For API calls and payment, just return success
+ if old_state == ExpenseState.CREATED and all_expenses[id] in [
+ ExpenseState.APPROVED,
+ ExpenseState.REJECTED,
+ ]:
+ # Report state change
+ await notify_expense_state_change(id, all_expenses[id])
+
+ print(f"Set state for {id} from {old_state} to {all_expenses[id]}")
+ return PlainTextResponse("SUCCEED")
+ else:
+ # For UI calls, notify and redirect to list
+ if old_state == ExpenseState.CREATED and all_expenses[id] in [
+ ExpenseState.APPROVED,
+ ExpenseState.REJECTED,
+ ]:
+ await notify_expense_state_change(id, all_expenses[id])
+
+ print(f"Set state for {id} from {old_state} to {all_expenses[id]}")
+ return RedirectResponse(url="/list", status_code=303)
+
+
+@app.get("/create")
+async def create_handler(id: str = Query(...), is_api_call: str = Query("false")):
+ if id in all_expenses:
+ if is_api_call == "true":
+ return PlainTextResponse("ERROR:ID_ALREADY_EXISTS")
+ else:
+ return PlainTextResponse("ID already exists")
+
+ all_expenses[id] = ExpenseState.CREATED
+
+ if is_api_call == "true":
+ print(f"Created new expense id: {id}")
+ return PlainTextResponse("SUCCEED")
+ else:
+ print(f"Created new expense id: {id}")
+ return await list_handler()
+
+
+@app.get("/status")
+async def status_handler(id: str = Query(...)):
+ if id not in all_expenses:
+ return PlainTextResponse("ERROR:INVALID_ID")
+
+ state = all_expenses[id]
+ print(f"Checking status for {id}: {state}")
+ return PlainTextResponse(state.value)
+
+
+@app.post("/registerWorkflow")
+async def register_workflow_handler(id: str = Query(...), workflow_id: str = Form(...)):
+ if id not in all_expenses:
+ return PlainTextResponse("ERROR:INVALID_ID")
+
+ curr_state = all_expenses[id]
+ if curr_state != ExpenseState.CREATED:
+ return PlainTextResponse("ERROR:INVALID_STATE")
+
+ print(f"Registered workflow for ID={id}, workflow_id={workflow_id}")
+ workflow_map[id] = workflow_id
+ return PlainTextResponse("SUCCEED")
+
+
+async def notify_expense_state_change(expense_id: str, state: str):
+ if expense_id not in workflow_map:
+ print(f"Invalid id: {expense_id}")
+ return
+
+ if workflow_client is None:
+ print("Workflow client not initialized")
+ return
+
+ workflow_id = workflow_map[expense_id]
+ try:
+ # Send signal to workflow
+ handle = workflow_client.get_workflow_handle(workflow_id)
+ await handle.signal("expense_decision_signal", state)
+ print(
+ f"Successfully sent signal to workflow: {workflow_id} with decision: {state}"
+ )
+ except Exception as err:
+ print(f"Failed to send signal to workflow with error: {err}")
+
+
+async def main():
+ global workflow_client
+
+ # Initialize the workflow client
+ workflow_client = await Client.connect("localhost:7233")
+
+ print(
+ f"Expense system UI available at http://{EXPENSE_SERVER_HOST}:{EXPENSE_SERVER_PORT}"
+ )
+
+ # Start the FastAPI server
+ config = uvicorn.Config(
+ app, host="0.0.0.0", port=EXPENSE_SERVER_PORT, log_level="info"
+ )
+ server = uvicorn.Server(config)
+ await server.serve()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/expense/worker.py b/expense/worker.py
new file mode 100644
index 00000000..20a891a3
--- /dev/null
+++ b/expense/worker.py
@@ -0,0 +1,44 @@
+import asyncio
+
+from temporalio.client import Client
+from temporalio.worker import Worker
+
+from .activities import (
+ cleanup_http_client,
+ create_expense_activity,
+ initialize_http_client,
+ payment_activity,
+ register_for_decision_activity,
+)
+from .workflow import SampleExpenseWorkflow
+
+
+async def main():
+ # The client and worker are heavyweight objects that should be created once per process.
+ client = await Client.connect("localhost:7233")
+
+ # Initialize HTTP client before starting worker
+ await initialize_http_client()
+
+ try:
+ # Run the worker
+ worker = Worker(
+ client,
+ task_queue="expense",
+ workflows=[SampleExpenseWorkflow],
+ activities=[
+ create_expense_activity,
+ register_for_decision_activity,
+ payment_activity,
+ ],
+ )
+
+ print("Worker starting...")
+ await worker.run()
+ finally:
+ # Cleanup HTTP client when worker shuts down
+ await cleanup_http_client()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/expense/workflow.py b/expense/workflow.py
new file mode 100644
index 00000000..e0ba9728
--- /dev/null
+++ b/expense/workflow.py
@@ -0,0 +1,75 @@
+from datetime import timedelta
+
+from temporalio import workflow
+from temporalio.common import RetryPolicy
+
+with workflow.unsafe.imports_passed_through():
+ from expense.activities import (
+ create_expense_activity,
+ payment_activity,
+ register_for_decision_activity,
+ )
+
+
+@workflow.defn
+class SampleExpenseWorkflow:
+ def __init__(self) -> None:
+ self.expense_decision: str = ""
+
+ @workflow.signal
+ async def expense_decision_signal(self, decision: str) -> None:
+ """Signal handler for expense decision."""
+ self.expense_decision = decision
+
+ @workflow.run
+ async def run(self, expense_id: str) -> str:
+ logger = workflow.logger
+
+ # Step 1: Create new expense report
+ try:
+ await workflow.execute_activity(
+ create_expense_activity,
+ expense_id,
+ start_to_close_timeout=timedelta(seconds=10),
+ retry_policy=RetryPolicy(maximum_attempts=3),
+ )
+ except Exception as err:
+ logger.exception(f"Failed to create expense report: {err}")
+ raise
+
+ # Step 2: Register for decision and wait for signal
+ try:
+ await workflow.execute_activity(
+ register_for_decision_activity,
+ expense_id,
+ start_to_close_timeout=timedelta(seconds=10),
+ )
+ except Exception as err:
+ logger.exception(f"Failed to register for decision: {err}")
+ raise
+
+ # Wait for the expense decision signal with a timeout
+ logger.info(f"Waiting for expense decision signal for {expense_id}")
+ await workflow.wait_condition(
+ lambda: self.expense_decision != "", timeout=timedelta(minutes=10)
+ )
+
+ status = self.expense_decision
+ if status != "APPROVED":
+ logger.info(f"Workflow completed. ExpenseStatus: {status}")
+ return ""
+
+ # Step 3: Request payment for the expense
+ try:
+ await workflow.execute_activity(
+ payment_activity,
+ expense_id,
+ start_to_close_timeout=timedelta(seconds=10),
+ retry_policy=RetryPolicy(maximum_attempts=3),
+ )
+ except Exception as err:
+ logger.info(f"Workflow completed with payment failed. Error: {err}")
+ raise
+
+ logger.info("Workflow completed with expense payment completed.")
+ return "COMPLETED"
diff --git a/pyproject.toml b/pyproject.toml
index b82bd912..da7329cc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -64,6 +64,12 @@ cloud-export-to-parquet = [
"boto3>=1.34.89,<2",
"pyarrow>=19.0.1",
]
+expense = [
+ "fastapi>=0.115.12",
+ "httpx>=0.25.0,<1",
+ "uvicorn[standard]>=0.24.0.post1,<0.25",
+ "python-multipart>=0.0.5",
+]
[tool.uv]
default-groups = [
@@ -94,6 +100,7 @@ packages = [
"custom_metric",
"dsl",
"encryption",
+ "expense",
"gevent_async",
"hello",
"langchain",
@@ -126,7 +133,7 @@ build-backend = "hatchling.build"
[tool.poe.tasks]
format = [{cmd = "uv run black ."}, {cmd = "uv run isort ."}]
lint = [{cmd = "uv run black --check ."}, {cmd = "uv run isort --check-only ."}, {ref = "lint-types" }]
-lint-types = "uv run mypy --check-untyped-defs --namespace-packages ."
+lint-types = "uv run mypy --python-version=3.9 --check-untyped-defs --namespace-packages ."
test = "uv run pytest"
[tool.pytest.ini_options]
@@ -142,6 +149,7 @@ skip_gitignore = true
[tool.mypy]
ignore_missing_imports = true
namespace_packages = true
+exclude = [".venv"]
[[tool.mypy.overrides]]
module = "aiohttp.*"
diff --git a/tests/expense/UI_SPECIFICATION.md b/tests/expense/UI_SPECIFICATION.md
new file mode 100644
index 00000000..81538760
--- /dev/null
+++ b/tests/expense/UI_SPECIFICATION.md
@@ -0,0 +1,152 @@
+# Expense System UI Specification
+
+## Overview
+The Expense System UI is a FastAPI-based web application that provides both a web interface and REST API for managing expense requests. It integrates with Temporal workflows through signal mechanisms.
+
+## System Components
+
+### Data Model
+- **ExpenseState Enum**: Defines expense lifecycle states
+ - `CREATED`: Initial state when expense is first created
+ - `APPROVED`: Expense has been approved for payment
+ - `REJECTED`: Expense has been denied
+ - `COMPLETED`: Payment has been processed
+
+### Storage
+- **all_expenses**: In-memory dictionary mapping expense IDs to their current state
+- **workflow_map**: Maps expense IDs to Temporal workflow IDs for signal sending
+
+## API Endpoints
+
+### Parameter Validation
+All endpoints use FastAPI's automatic parameter validation:
+- Missing required parameters return HTTP 422 (Unprocessable Entity)
+- Invalid parameter types return HTTP 422 (Unprocessable Entity)
+- This validation occurs before endpoint-specific business logic
+
+### 1. Home/List View (`GET /` or `GET /list`)
+**Purpose**: Display all expenses in an HTML table format
+
+**Response**: HTML page containing:
+- Page title "SAMPLE EXPENSE SYSTEM"
+- Navigation link to HOME
+- Table with columns: Expense ID, Status, Action
+- Action buttons for CREATED expenses (APPROVE/REJECT)
+- Sorted expense display by ID
+
+**Business Rules**:
+- Only CREATED expenses show action buttons
+- Expenses are displayed in sorted order by ID
+
+### 2. Action Handler (`POST /action`)
+**Purpose**: Process expense state changes (approve/reject/payment)
+
+**Parameters**:
+- `type` (required): Action type - "approve", "reject", or "payment" (form data)
+- `id` (required): Expense ID (form data)
+- `is_api_call` (optional): "true" for API calls, "false" for UI calls (form data)
+
+**Business Rules**:
+- `approve`: Changes CREATED → APPROVED
+- `reject`: Changes CREATED → REJECTED
+- `payment`: Changes APPROVED → COMPLETED
+- Invalid IDs return HTTP 200 with error message in response body
+- Invalid action types return HTTP 200 with error message in response body
+- State changes from CREATED to APPROVED/REJECTED trigger workflow notifications
+- API calls return "SUCCEED" on success
+- UI calls redirect to list view after success (HTTP 303 redirect)
+
+**Error Handling**:
+- API calls return HTTP 200 with "ERROR:INVALID_ID" or "ERROR:INVALID_TYPE" in response body
+- UI calls return HTTP 200 with descriptive messages like "Invalid ID" or "Invalid action type" in response body
+
+### 3. Create Expense (`GET /create`)
+**Purpose**: Create a new expense entry
+
+**Parameters**:
+- `id` (required): Unique expense ID
+- `is_api_call` (optional): "true" for API calls, "false" for UI calls
+
+**Business Rules**:
+- Expense ID must be unique
+- New expenses start in CREATED state
+- Duplicate IDs return HTTP 200 with error message in response body
+
+**Error Handling**:
+- API calls return HTTP 200 with "ERROR:ID_ALREADY_EXISTS" in response body
+- UI calls return HTTP 200 with descriptive message "ID already exists" in response body
+
+### 4. Status Check (`GET /status`)
+**Purpose**: Retrieve current expense state
+
+**Parameters**:
+- `id` (required): Expense ID
+
+**Response**: Current expense state as string
+**Error Handling**: Returns HTTP 200 with "ERROR:INVALID_ID" in response body for unknown IDs
+
+### 5. Workflow Registration (`POST /registerWorkflow`)
+**Purpose**: Register Temporal workflow ID for expense state change signals
+
+**Parameters**:
+- `id` (query): Expense ID
+- `workflow_id` (form): Temporal workflow ID
+
+**Business Rules**:
+- Expense must exist and be in CREATED state
+- Workflow ID is stored for later signal sending
+- Enables workflow signal notification on state changes
+
+**Error Handling**:
+- HTTP 200 with "ERROR:INVALID_ID" in response body for unknown expenses
+- HTTP 200 with "ERROR:INVALID_STATE" in response body for non-CREATED expenses
+
+## Workflow Integration
+
+### Signal Mechanism
+- When expenses transition from CREATED to APPROVED/REJECTED, registered workflows are signaled
+- Uses Temporal's workflow signal mechanism
+- Workflow IDs are stored and used to send signals to workflows
+
+### Error Handling
+- Failed signal sending is logged but doesn't affect UI operations
+- Invalid or non-existent workflow IDs are handled gracefully
+
+## User Interface
+
+### Web Interface Features
+- Clean HTML table display
+- Color-coded action buttons (green for APPROVE, red for REJECT)
+- Real-time state display
+- Navigation between views
+
+### API Interface Features
+- RESTful endpoints for programmatic access
+- Consistent error response format
+- Support for both sync and async operations
+
+## Non-Functional Requirements
+
+### Concurrency
+- Thread-safe in-memory storage operations
+- Handles concurrent API and UI requests
+
+### Error Recovery
+- Graceful handling of workflow signal failures
+- Input validation on all endpoints (422 for missing/invalid parameters, 200 with error messages for business logic errors)
+- Descriptive error messages in response body
+
+### Logging
+- State change operations are logged
+- Workflow registration and signal sending logged
+- Error conditions logged for debugging
+
+## Security Considerations
+- Input validation on all parameters
+- Protection against duplicate ID creation
+- Secure handling of Temporal workflow IDs
+
+## Scalability Notes
+- Current implementation uses in-memory storage
+- Designed for demonstration/development use
+- Production deployment would require persistent storage
\ No newline at end of file
diff --git a/tests/expense/WORKFLOW_SPECIFICATION.md b/tests/expense/WORKFLOW_SPECIFICATION.md
new file mode 100644
index 00000000..248ad478
--- /dev/null
+++ b/tests/expense/WORKFLOW_SPECIFICATION.md
@@ -0,0 +1,251 @@
+# Expense Workflow and Activities Specification
+
+## Overview
+The Expense Processing System demonstrates a human-in-the-loop workflow pattern using Temporal. It processes expense requests through a multi-step approval workflow with signal-based completion.
+
+## Business Process Flow
+
+### Workflow Steps
+1. **Create Expense Report**: Initialize a new expense in the external system
+2. **Register for Decision & Wait for Signal**: Register expense and wait for approval/rejection via external UI (signal-based completion)
+3. **Process Payment** (conditional): Execute payment if approved
+
+### Decision Logic
+- **APPROVED**: Continue to payment processing → Return "COMPLETED"
+- **Any other value**: Skip payment processing → Return empty string ""
+ - This includes: "REJECTED", "DENIED", "PENDING", "CANCELLED", or any unknown value
+- **ERROR**: Propagate failure to workflow caller
+
+## Architecture Components
+
+### Core Entities
+- **Workflow**: `SampleExpenseWorkflow` - Main orchestration logic
+- **Activities**: Three distinct activities for each business step
+- **External System**: HTTP-based expense management UI
+- **Workflow Signals**: Enable workflow completion from external systems
+
+### External Integration
+- **Expense UI Server**: HTTP API at `localhost:8099`
+- **Signal Completion**: UI system sends signals to workflows via Temporal client
+- **Human Interaction**: Web-based approval/rejection interface
+
+## Implementation Specifications
+
+### Workflow Definition
+
+#### `SampleExpenseWorkflow`
+```python
+@workflow.defn
+class SampleExpenseWorkflow:
+ def __init__(self) -> None:
+ self.expense_decision: str = ""
+
+ @workflow.signal
+ async def expense_decision_signal(self, decision: str) -> None:
+ self.expense_decision = decision
+
+ @workflow.run
+ async def run(self, expense_id: str) -> str
+```
+
+**Input Parameters**:
+- `expense_id`: Unique identifier for the expense request
+
+**Return Values**:
+- Success (Approved): `"COMPLETED"`
+- Success (Rejected): `""` (empty string)
+- Failure: Exception/error propagated
+
+**Timeout Configuration**:
+- Step 1 (Create): 10 seconds
+- Step 2 (Wait): 10 minutes (human approval timeout)
+- Step 3 (Payment): 10 seconds
+
+### Activity Definitions
+
+#### 1. Create Expense Activity
+
+**Purpose**: Initialize expense record in external system
+
+**Function Signature**: `create_expense_activity(expense_id: str) -> None`
+
+**Business Rules**:
+- Validate expense_id is not empty
+- HTTP GET to `/create?is_api_call=true&id={expense_id}`
+- Success condition: Response body equals "SUCCEED"
+- Any other response triggers exception
+
+**Error Handling**:
+- Empty expense_id: `ValueError` with message "expense id is empty"
+- Whitespace-only expense_id: `ValueError` (same as empty)
+- HTTP errors: `httpx.HTTPStatusError` propagated to workflow
+- Server error responses: `Exception` with specific error message (e.g., "ERROR:ID_ALREADY_EXISTS")
+- Network failures: Connection timeouts and DNS resolution errors propagated
+
+#### 2. Register for Decision Activity
+
+**Purpose**: Register expense for human decision and return immediately
+
+**Function Signature**: `register_for_decision_activity(expense_id: str) -> None`
+
+**Signal-Based Pattern**:
+The activity demonstrates a signal-based human-in-the-loop pattern. It simply registers the expense for decision and completes immediately. The workflow then waits for a signal from an external system. This pattern enables human-in-the-loop workflows where workflows can wait as long as necessary for external decisions using Temporal's signal mechanism.
+
+**Business Logic**:
+1. Validate expense_id is not empty
+2. Log that the expense has been registered for decision
+3. Return immediately (no HTTP calls or external registration)
+4. The workflow then waits for a signal using `workflow.wait_condition()`
+5. When a human approves or rejects the expense, an external process sends a signal to the workflow using `workflow_handle.signal()`
+
+**Signal Integration**:
+- **Signal Name**: `expense_decision_signal`
+- **Signal Payload**: Decision string ("APPROVED", "REJECTED", etc.)
+- **Workflow Registration**: External system must know the workflow ID to send signals
+
+**Completion Values**:
+- `"APPROVED"`: Expense approved for payment
+- `"REJECTED"`: Expense denied
+- `"DENIED"`, `"PENDING"`, `"CANCELLED"`: Also treated as rejection
+- Any other value: Treated as rejection (workflow returns empty string)
+
+**Error Scenarios**:
+- Empty expense_id: Immediate validation error
+- Signal timeout: Temporal timeout handling (workflow-level timeout)
+- Invalid signal payload: Handled gracefully by workflow
+
+#### 3. Payment Activity
+
+**Purpose**: Process payment for approved expenses
+
+**Function Signature**: `payment_activity(expense_id: str) -> None`
+
+**Business Rules**:
+- Only called for approved expenses
+- Validate expense_id is not empty
+- HTTP POST to `/action` with form data: `is_api_call=true`, `type=payment`, `id={expense_id}`
+- Success condition: Response body equals "SUCCEED"
+
+**Error Handling**:
+- Empty expense_id: `ValueError` with message "expense id is empty"
+- HTTP errors: `httpx.HTTPStatusError` propagated to workflow
+- Payment failure: `Exception` with specific error message (e.g., "ERROR:INSUFFICIENT_FUNDS")
+- Network failures: Connection timeouts and DNS resolution errors propagated
+
+## State Management
+
+### Activity Completion Flow
+1. **Synchronous Activities**: Create, Register, and Payment activities complete immediately
+2. **Signal-Based Waiting**: Workflow waits for external signal after registration
+
+### Signal Lifecycle
+1. Workflow starts and registers expense for decision
+2. External system stores workflow ID to expense ID mapping
+3. Human makes decision via web UI
+4. UI system calls Temporal client to send signal to workflow
+5. Workflow receives signal and continues execution
+
+### External System Integration
+- **Storage**: In-memory expense state management
+- **Workflow Mapping**: Workflow ID to expense ID mapping
+- **Signal Completion**: Temporal client workflow signal sending
+- **Error Recovery**: Graceful handling of signal failures
+
+## Error Handling Patterns
+
+### Validation Errors
+- **Trigger**: Empty or invalid input parameters
+- **Behavior**: Immediate activity/workflow failure
+- **Retry**: Not applicable (validation errors are non-retryable)
+
+### HTTP Communication Errors
+- **Network Failures**: Connection timeouts, DNS resolution
+- **Server Errors**: 5xx responses from expense system
+- **Retry Behavior**: Follows Temporal's default retry policy
+- **Final Failure**: Propagated to workflow after retries exhausted
+
+### External System Errors
+- **Business Logic Errors**: Duplicate expense IDs, invalid states
+- **Response Format**: Error messages in HTTP response body (e.g., "ERROR:ID_ALREADY_EXISTS")
+- **Handling**: Converted to application errors with descriptive messages
+- **Tested Examples**: "ERROR:INVALID_ID", "ERROR:INSUFFICIENT_FUNDS", "ERROR:INVALID_STATE"
+
+### Async Completion Errors
+- **Registration Failure**: Activity fails immediately if callback registration fails
+- **Completion Timeout**: Temporal enforces activity timeout (10 minutes)
+- **Invalid Completion**: External system error handling for malformed completions
+
+## Timeout Configuration
+
+### Activity Timeouts
+- **Create Expense**: 10 seconds (fast operation)
+- **Wait for Decision**: 10 minutes (human approval window)
+- **Payment Processing**: 10 seconds (automated operation)
+
+### Timeout Behavior
+- **Exceeded**: Activity marked as failed by Temporal
+- **Retry**: Follows activity retry policy
+- **Workflow Impact**: Timeout failures propagate to workflow
+
+
+
+## Testing Patterns
+
+### Mock Testing Approach
+The system supports comprehensive testing with mocked activities:
+
+#### Test Patterns
+```python
+@activity.defn(name="create_expense_activity")
+async def create_expense_mock(expense_id: str) -> None:
+ return None # Success mock
+
+@activity.defn(name="register_for_decision_activity")
+async def register_for_decision_mock(expense_id: str) -> None:
+ return None # Registration mock
+
+# Testing signal-based behavior:
+# Activity completes immediately, no special exceptions
+result = await activity_env.run(register_for_decision_activity, "test-expense")
+assert result is None
+```
+
+### Test Scenarios
+1. **Happy Path**: All activities succeed, expense approved
+2. **Rejection Path**: Expense rejected, payment skipped
+3. **Failure Scenarios**: Activity failures at each step
+4. **Mock Server Testing**: HTTP interactions with test server
+5. **Signal Testing**: Simulated workflow signal sending and receiving
+6. **Decision Value Testing**: All possible decision values (APPROVED, REJECTED, DENIED, PENDING, CANCELLED, UNKNOWN)
+7. **Retryable Failures**: Activities that fail temporarily and then succeed on retry
+8. **Parameter Validation**: Empty and whitespace-only expense IDs
+9. **Logging Behavior**: Verify activity logging works correctly
+10. **Server Error Responses**: Specific error formats like "ERROR:ID_ALREADY_EXISTS"
+
+### Mock Server Integration
+- **HTTP Mocking**: Uses test frameworks to mock HTTP server responses
+- **Delayed Completion**: Simulates human approval delays in tests
+
+### Edge Case Testing
+Tests include comprehensive coverage of edge cases and error scenarios:
+
+#### Retry Behavior Testing
+- **Transient Failures**: Activities that fail on first attempts but succeed after retries
+- **Retry Counting**: Verification that activities retry the expected number of times
+- **Mixed Scenarios**: Different activities failing and recovering independently
+
+#### Parameter Validation Testing
+- **Empty Strings**: Expense IDs that are completely empty (`""`)
+- **Whitespace-Only**: Expense IDs containing only spaces (`" "`)
+- **Non-Retryable Errors**: Validation failures that should not be retried
+
+#### Logging Verification
+- **Activity Logging**: Ensures activity.logger.info() calls work correctly
+- **Workflow Logging**: Verification of workflow-level logging behavior
+- **Log Content**: Checking that log messages contain expected information
+
+#### Server Error Response Testing
+- **Specific Error Codes**: Testing responses like "ERROR:ID_ALREADY_EXISTS"
+- **HTTP Status Errors**: Network-level HTTP errors vs application errors
+- **Error Message Propagation**: Ensuring error details reach the workflow caller
+
diff --git a/tests/expense/test_expense_activities.py b/tests/expense/test_expense_activities.py
new file mode 100644
index 00000000..8020122d
--- /dev/null
+++ b/tests/expense/test_expense_activities.py
@@ -0,0 +1,184 @@
+"""
+Tests for individual expense activities.
+Focuses on activity behavior, parameters, error handling, and HTTP interactions.
+"""
+
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import httpx
+import pytest
+from temporalio.testing import ActivityEnvironment
+
+from expense import EXPENSE_SERVER_HOST_PORT
+from expense.activities import (
+ create_expense_activity,
+ payment_activity,
+ register_for_decision_activity,
+)
+
+
+class TestCreateExpenseActivity:
+ """Test create_expense_activity individual behavior"""
+
+ @pytest.fixture
+ def activity_env(self):
+ return ActivityEnvironment()
+
+ async def test_create_expense_activity_success(self, activity_env):
+ """Test successful expense creation"""
+ with patch("expense.activities.get_http_client") as mock_get_client:
+ # Mock successful HTTP response
+ mock_response = AsyncMock()
+ mock_response.text = "SUCCEED"
+ mock_response.raise_for_status = AsyncMock()
+
+ mock_client_instance = AsyncMock()
+ mock_client_instance.get.return_value = mock_response
+ mock_get_client.return_value = mock_client_instance
+
+ # Execute activity
+ result = await activity_env.run(create_expense_activity, "test-expense-123")
+
+ # Verify HTTP call
+ mock_client_instance.get.assert_called_once_with(
+ f"{EXPENSE_SERVER_HOST_PORT}/create",
+ params={"is_api_call": "true", "id": "test-expense-123"},
+ )
+ mock_response.raise_for_status.assert_called_once()
+
+ # Activity should return None on success
+ assert result is None
+
+ async def test_create_expense_activity_empty_id(self, activity_env):
+ """Test create expense activity with empty expense ID"""
+ with pytest.raises(ValueError, match="expense id is empty"):
+ await activity_env.run(create_expense_activity, "")
+
+ async def test_create_expense_activity_http_error(self, activity_env):
+ """Test create expense activity with HTTP error"""
+ with patch("expense.activities.get_http_client") as mock_get_client:
+ # Mock HTTP error with proper response mock
+ mock_response_obj = MagicMock()
+ mock_response_obj.status_code = 500
+ mock_response_obj.text = "Server Error"
+
+ mock_response = MagicMock()
+ mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
+ "Server Error", request=MagicMock(), response=mock_response_obj
+ )
+
+ mock_client_instance = AsyncMock()
+ mock_client_instance.get.return_value = mock_response
+ mock_get_client.return_value = mock_client_instance
+
+ with pytest.raises(httpx.HTTPStatusError):
+ await activity_env.run(create_expense_activity, "test-expense-123")
+
+ async def test_create_expense_activity_server_error_response(self, activity_env):
+ """Test create expense activity with server error response"""
+ with patch("expense.activities.get_http_client") as mock_get_client:
+ # Mock error response
+ mock_response = AsyncMock()
+ mock_response.text = "ERROR:ID_ALREADY_EXISTS"
+ mock_response.raise_for_status = AsyncMock()
+
+ mock_client_instance = AsyncMock()
+ mock_client_instance.get.return_value = mock_response
+ mock_get_client.return_value = mock_client_instance
+
+ with pytest.raises(Exception, match="ERROR:ID_ALREADY_EXISTS"):
+ await activity_env.run(create_expense_activity, "test-expense-123")
+
+
+class TestRegisterForDecisionActivity:
+ """Test register_for_decision_activity individual behavior"""
+
+ @pytest.fixture
+ def activity_env(self):
+ return ActivityEnvironment()
+
+ async def test_register_for_decision_activity_empty_id(self, activity_env):
+ """Test register for decision activity with empty expense ID"""
+ with pytest.raises(ValueError, match="expense id is empty"):
+ await activity_env.run(register_for_decision_activity, "")
+
+ async def test_register_for_decision_activity_success(self, activity_env):
+ """Test successful expense registration behavior"""
+ # Mock the HTTP client and response
+ mock_response = AsyncMock()
+ mock_response.raise_for_status = AsyncMock()
+
+ mock_http_client = AsyncMock()
+ mock_http_client.post.return_value = mock_response
+
+ # Mock the get_http_client function
+ with patch("expense.activities.get_http_client", return_value=mock_http_client):
+ result = await activity_env.run(
+ register_for_decision_activity, "test-expense-123"
+ )
+
+ # Activity should return None on success
+ assert result is None
+
+ # Verify HTTP registration was called
+ mock_http_client.post.assert_called_once()
+ call_args = mock_http_client.post.call_args
+ assert "/registerWorkflow" in call_args[0][0]
+ assert call_args[1]["params"]["id"] == "test-expense-123"
+ assert "workflow_id" in call_args[1]["data"]
+
+
+class TestPaymentActivity:
+ """Test payment_activity individual behavior"""
+
+ @pytest.fixture
+ def activity_env(self):
+ return ActivityEnvironment()
+
+ async def test_payment_activity_success(self, activity_env):
+ """Test successful payment processing"""
+ with patch("expense.activities.get_http_client") as mock_get_client:
+ # Mock successful payment response
+ mock_response = AsyncMock()
+ mock_response.text = "SUCCEED"
+ mock_response.raise_for_status = AsyncMock()
+
+ mock_client_instance = AsyncMock()
+ mock_client_instance.post.return_value = mock_response
+ mock_get_client.return_value = mock_client_instance
+
+ # Execute activity
+ result = await activity_env.run(payment_activity, "test-expense-123")
+
+ # Verify HTTP call
+ mock_client_instance.post.assert_called_once_with(
+ f"{EXPENSE_SERVER_HOST_PORT}/action",
+ data={
+ "is_api_call": "true",
+ "type": "payment",
+ "id": "test-expense-123",
+ },
+ )
+
+ # Activity should return None on success
+ assert result is None
+
+ async def test_payment_activity_empty_id(self, activity_env):
+ """Test payment activity with empty expense ID"""
+ with pytest.raises(ValueError, match="expense id is empty"):
+ await activity_env.run(payment_activity, "")
+
+ async def test_payment_activity_payment_failure(self, activity_env):
+ """Test payment activity with payment failure"""
+ with patch("expense.activities.get_http_client") as mock_get_client:
+ # Mock payment failure response
+ mock_response = AsyncMock()
+ mock_response.text = "ERROR:INSUFFICIENT_FUNDS"
+ mock_response.raise_for_status = AsyncMock()
+
+ mock_client_instance = AsyncMock()
+ mock_client_instance.post.return_value = mock_response
+ mock_get_client.return_value = mock_client_instance
+
+ with pytest.raises(Exception, match="ERROR:INSUFFICIENT_FUNDS"):
+ await activity_env.run(payment_activity, "test-expense-123")
diff --git a/tests/expense/test_expense_edge_cases.py b/tests/expense/test_expense_edge_cases.py
new file mode 100644
index 00000000..0452b7a1
--- /dev/null
+++ b/tests/expense/test_expense_edge_cases.py
@@ -0,0 +1,242 @@
+"""
+Edge case tests for expense workflow and activities.
+Tests parameter validation, retries, error scenarios, and boundary conditions.
+"""
+
+import asyncio
+import uuid
+
+import pytest
+from temporalio import activity
+from temporalio.client import Client, WorkflowFailureError
+from temporalio.exceptions import ApplicationError
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import Worker
+
+from expense.workflow import SampleExpenseWorkflow
+
+
+class MockExpenseUI:
+ """Mock UI that simulates the expense approval system"""
+
+ def __init__(self, client: Client):
+ self.client = client
+ self.workflow_map: dict[str, str] = {}
+ self.scheduled_decisions: dict[str, str] = {}
+
+ def register_workflow(self, expense_id: str, workflow_id: str):
+ """Register a workflow for an expense (simulates UI registration)"""
+ self.workflow_map[expense_id] = workflow_id
+
+ def schedule_decision(self, expense_id: str, decision: str):
+ """Schedule a decision to be made (simulates human decision)"""
+ self.scheduled_decisions[expense_id] = decision
+
+ async def send_decision():
+ try:
+ if expense_id in self.workflow_map:
+ workflow_id = self.workflow_map[expense_id]
+ handle = self.client.get_workflow_handle(workflow_id)
+ await handle.signal("expense_decision_signal", decision)
+ except Exception:
+ # Ignore errors in time-skipping mode where workflows may complete quickly
+ pass
+
+ asyncio.create_task(send_decision())
+
+ def create_register_activity(self):
+ """Create a register activity that works with this mock UI"""
+
+ @activity.defn(name="register_for_decision_activity")
+ async def register_decision_activity(expense_id: str) -> None:
+ # In time-skipping mode, send the decision immediately
+ if expense_id in self.scheduled_decisions:
+ decision = self.scheduled_decisions[expense_id]
+ if expense_id in self.workflow_map:
+ workflow_id = self.workflow_map[expense_id]
+ handle = self.client.get_workflow_handle(workflow_id)
+ try:
+ # Send signal immediately when registering
+ await handle.signal("expense_decision_signal", decision)
+ except Exception:
+ # Ignore errors in time-skipping mode
+ pass
+ return None
+
+ return register_decision_activity
+
+
+class TestWorkflowEdgeCases:
+ """Test edge cases in workflow behavior"""
+
+ async def test_workflow_with_retryable_activity_failures(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test workflow behavior with retryable activity failures"""
+ task_queue = f"test-retryable-failures-{uuid.uuid4()}"
+ workflow_id = f"test-workflow-retryable-{uuid.uuid4()}"
+ expense_id = "test-expense-retryable"
+ create_call_count = 0
+ payment_call_count = 0
+
+ # Set up mock UI with APPROVED decision
+ mock_ui = MockExpenseUI(client)
+ mock_ui.register_workflow(expense_id, workflow_id)
+ mock_ui.schedule_decision(expense_id, "APPROVED")
+
+ @activity.defn(name="create_expense_activity")
+ async def create_expense_retry(expense_id: str) -> None:
+ nonlocal create_call_count
+ create_call_count += 1
+ if create_call_count == 1:
+ # First call fails, but retryable
+ raise Exception("Transient failure in create expense")
+ return None # Second call succeeds
+
+ @activity.defn(name="payment_activity")
+ async def payment_retry(expense_id: str) -> None:
+ nonlocal payment_call_count
+ payment_call_count += 1
+ if payment_call_count == 1:
+ # First call fails, but retryable
+ raise Exception("Transient failure in payment")
+ return None # Second call succeeds
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[
+ create_expense_retry,
+ mock_ui.create_register_activity(),
+ payment_retry,
+ ],
+ ):
+ result = await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ expense_id,
+ id=workflow_id,
+ task_queue=task_queue,
+ )
+
+ # Should succeed after retries
+ assert result == "COMPLETED"
+ # Verify activities were retried
+ assert create_call_count == 2
+ assert payment_call_count == 2
+
+ async def test_workflow_logging_behavior(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test that workflow logging works correctly"""
+ task_queue = f"test-logging-{uuid.uuid4()}"
+ workflow_id = f"test-workflow-logging-{uuid.uuid4()}"
+ expense_id = "test-expense-logging"
+ logged_messages = []
+
+ # Set up mock UI with APPROVED decision
+ mock_ui = MockExpenseUI(client)
+ mock_ui.register_workflow(expense_id, workflow_id)
+ mock_ui.schedule_decision(expense_id, "APPROVED")
+
+ @activity.defn(name="create_expense_activity")
+ async def create_expense_mock(expense_id: str) -> None:
+ # Mock logging by capturing messages
+ logged_messages.append(f"Creating expense: {expense_id}")
+ return None
+
+ @activity.defn(name="payment_activity")
+ async def payment_mock(expense_id: str) -> None:
+ logged_messages.append(f"Processing payment: {expense_id}")
+ return None
+
+ # Create logging register activity
+ def create_logging_register_activity():
+ @activity.defn(name="register_for_decision_activity")
+ async def register_decision_logging(expense_id: str) -> None:
+ logged_messages.append(f"Waiting for decision: {expense_id}")
+ # In time-skipping mode, send the decision immediately
+ if expense_id in mock_ui.scheduled_decisions:
+ decision = mock_ui.scheduled_decisions[expense_id]
+ if expense_id in mock_ui.workflow_map:
+ workflow_id = mock_ui.workflow_map[expense_id]
+ handle = client.get_workflow_handle(workflow_id)
+ try:
+ # Send signal immediately when registering
+ await handle.signal("expense_decision_signal", decision)
+ except Exception:
+ # Ignore errors in time-skipping mode
+ pass
+ return None
+
+ return register_decision_logging
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[
+ create_expense_mock,
+ create_logging_register_activity(),
+ payment_mock,
+ ],
+ ):
+ result = await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ expense_id,
+ id=workflow_id,
+ task_queue=task_queue,
+ )
+
+ assert result == "COMPLETED"
+ # Verify logging occurred
+ assert len(logged_messages) == 3
+ assert f"Creating expense: {expense_id}" in logged_messages
+ assert f"Waiting for decision: {expense_id}" in logged_messages
+ assert f"Processing payment: {expense_id}" in logged_messages
+
+ async def test_workflow_parameter_validation(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test workflow with various parameter validation scenarios"""
+ task_queue = f"test-param-validation-{uuid.uuid4()}"
+
+ @activity.defn(name="create_expense_activity")
+ async def create_expense_validate(expense_id: str) -> None:
+ if not expense_id or expense_id.strip() == "":
+ raise ApplicationError(
+ "expense id is empty or whitespace", non_retryable=True
+ )
+ return None
+
+ @activity.defn(name="register_for_decision_activity")
+ async def wait_for_decision_mock(expense_id: str) -> None:
+ return None
+
+ @activity.defn(name="payment_activity")
+ async def payment_mock(expense_id: str) -> None:
+ return None
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[create_expense_validate, wait_for_decision_mock, payment_mock],
+ ):
+ # Test with empty string - this should fail at create_expense_activity
+ with pytest.raises(WorkflowFailureError):
+ await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ "", # Empty expense ID
+ id=f"test-workflow-empty-id-{uuid.uuid4()}",
+ task_queue=task_queue,
+ )
+
+ # Test with whitespace-only string - this should fail at create_expense_activity
+ with pytest.raises(WorkflowFailureError):
+ await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ " ", # Whitespace-only expense ID
+ id=f"test-workflow-whitespace-id-{uuid.uuid4()}",
+ task_queue=task_queue,
+ )
diff --git a/tests/expense/test_expense_integration.py b/tests/expense/test_expense_integration.py
new file mode 100644
index 00000000..587f7e5b
--- /dev/null
+++ b/tests/expense/test_expense_integration.py
@@ -0,0 +1,212 @@
+"""
+Integration tests for expense workflow with mock HTTP server.
+Tests end-to-end behavior with realistic HTTP interactions.
+"""
+
+import asyncio
+import uuid
+from unittest.mock import AsyncMock, patch
+
+from temporalio import activity
+from temporalio.client import Client
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import Worker
+
+from expense.workflow import SampleExpenseWorkflow
+
+
+class MockExpenseUI:
+ """Mock UI that simulates the expense approval system"""
+
+ def __init__(self, client: Client):
+ self.client = client
+ self.workflow_map: dict[str, str] = {}
+ self.scheduled_decisions: dict[str, str] = {}
+
+ def register_workflow(self, expense_id: str, workflow_id: str):
+ """Register a workflow for an expense (simulates UI registration)"""
+ self.workflow_map[expense_id] = workflow_id
+
+ def schedule_decision(self, expense_id: str, decision: str):
+ """Schedule a decision to be made (simulates human decision)"""
+ self.scheduled_decisions[expense_id] = decision
+
+ async def send_decision():
+ try:
+ if expense_id in self.workflow_map:
+ workflow_id = self.workflow_map[expense_id]
+ handle = self.client.get_workflow_handle(workflow_id)
+ await handle.signal("expense_decision_signal", decision)
+ except Exception:
+ # Ignore errors in time-skipping mode where workflows may complete quickly
+ pass
+
+ asyncio.create_task(send_decision())
+
+ def create_register_activity(self):
+ """Create a register activity that works with this mock UI"""
+
+ @activity.defn(name="register_for_decision_activity")
+ async def register_decision_activity(expense_id: str) -> None:
+ # In time-skipping mode, send the decision immediately
+ if expense_id in self.scheduled_decisions:
+ decision = self.scheduled_decisions[expense_id]
+ if expense_id in self.workflow_map:
+ workflow_id = self.workflow_map[expense_id]
+ handle = self.client.get_workflow_handle(workflow_id)
+ try:
+ # Send signal immediately when registering
+ await handle.signal("expense_decision_signal", decision)
+ except Exception:
+ # Ignore errors in time-skipping mode
+ pass
+ return None
+
+ return register_decision_activity
+
+
+class TestExpenseWorkflowWithMockServer:
+ """Test workflow with mock HTTP server"""
+
+ async def test_workflow_with_mock_server_approved(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test complete workflow with mock HTTP server - approved path"""
+ task_queue = f"test-mock-server-approved-{uuid.uuid4()}"
+ workflow_id = f"test-mock-server-workflow-{uuid.uuid4()}"
+ expense_id = "test-mock-server-expense"
+
+ # Set up mock UI with APPROVED decision
+ mock_ui = MockExpenseUI(client)
+ mock_ui.register_workflow(expense_id, workflow_id)
+ mock_ui.schedule_decision(expense_id, "APPROVED")
+
+ # Mock HTTP responses
+ responses = {
+ "/create": "SUCCEED",
+ "/registerCallback": "SUCCEED",
+ "/action": "SUCCEED",
+ }
+
+ with patch("httpx.AsyncClient") as mock_client:
+
+ async def mock_request_handler(*args, **kwargs):
+ mock_response = AsyncMock()
+ url = args[0] if args else kwargs.get("url", "")
+
+ # Determine response based on URL path
+ for path, response_text in responses.items():
+ if path in url:
+ mock_response.text = response_text
+ break
+ else:
+ mock_response.text = "NOT_FOUND"
+
+ mock_response.raise_for_status = AsyncMock()
+ return mock_response
+
+ mock_client_instance = AsyncMock()
+ mock_client_instance.get.side_effect = mock_request_handler
+ mock_client_instance.post.side_effect = mock_request_handler
+ mock_client.return_value.__aenter__.return_value = mock_client_instance
+
+ # Use completely mocked activities to avoid async completion issues
+ @activity.defn(name="create_expense_activity")
+ async def mock_create_expense(expense_id: str) -> None:
+ # Simulated HTTP call logic
+ return None
+
+ @activity.defn(name="payment_activity")
+ async def mock_payment(expense_id: str) -> None:
+ # Simulated HTTP call logic
+ return None
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[
+ mock_create_expense,
+ mock_ui.create_register_activity(),
+ mock_payment,
+ ],
+ ):
+ result = await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ expense_id,
+ id=workflow_id,
+ task_queue=task_queue,
+ )
+
+ assert result == "COMPLETED"
+
+ async def test_workflow_with_mock_server_rejected(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test complete workflow with mock HTTP server - rejected path"""
+ task_queue = f"test-mock-server-rejected-{uuid.uuid4()}"
+ workflow_id = f"test-mock-server-rejected-workflow-{uuid.uuid4()}"
+ expense_id = "test-mock-server-rejected"
+
+ # Set up mock UI with REJECTED decision
+ mock_ui = MockExpenseUI(client)
+ mock_ui.register_workflow(expense_id, workflow_id)
+ mock_ui.schedule_decision(expense_id, "REJECTED")
+
+ # Mock HTTP responses
+ responses = {
+ "/create": "SUCCEED",
+ "/registerCallback": "SUCCEED",
+ }
+
+ with patch("httpx.AsyncClient") as mock_client:
+
+ async def mock_request_handler(*args, **kwargs):
+ mock_response = AsyncMock()
+ url = args[0] if args else kwargs.get("url", "")
+
+ # Determine response based on URL path
+ for path, response_text in responses.items():
+ if path in url:
+ mock_response.text = response_text
+ break
+ else:
+ mock_response.text = "NOT_FOUND"
+
+ mock_response.raise_for_status = AsyncMock()
+ return mock_response
+
+ mock_client_instance = AsyncMock()
+ mock_client_instance.get.side_effect = mock_request_handler
+ mock_client_instance.post.side_effect = mock_request_handler
+ mock_client.return_value.__aenter__.return_value = mock_client_instance
+
+ # Use completely mocked activities
+ @activity.defn(name="create_expense_activity")
+ async def mock_create_expense(expense_id: str) -> None:
+ # Simulated HTTP call logic
+ return None
+
+ @activity.defn(name="payment_activity")
+ async def mock_payment(expense_id: str) -> None:
+ # Simulated HTTP call logic
+ return None
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[
+ mock_create_expense,
+ mock_ui.create_register_activity(),
+ mock_payment,
+ ],
+ ):
+ result = await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ expense_id,
+ id=workflow_id,
+ task_queue=task_queue,
+ )
+
+ assert result == ""
diff --git a/tests/expense/test_expense_workflow.py b/tests/expense/test_expense_workflow.py
new file mode 100644
index 00000000..8457436a
--- /dev/null
+++ b/tests/expense/test_expense_workflow.py
@@ -0,0 +1,497 @@
+"""
+Tests for the SampleExpenseWorkflow orchestration logic.
+Focuses on workflow behavior, decision paths, and error propagation.
+"""
+
+import asyncio
+import uuid
+from datetime import timedelta
+
+import pytest
+from temporalio import activity
+from temporalio.client import Client, WorkflowFailureError
+from temporalio.exceptions import ApplicationError
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import Worker
+
+from expense.workflow import SampleExpenseWorkflow
+
+
+class MockExpenseUI:
+ """Mock UI that simulates the expense approval system"""
+
+ def __init__(self, client: Client):
+ self.client = client
+ self.workflow_map: dict[str, str] = {}
+ self.scheduled_decisions: dict[str, str] = {}
+
+ def register_workflow(self, expense_id: str, workflow_id: str):
+ """Register a workflow for an expense (simulates UI registration)"""
+ self.workflow_map[expense_id] = workflow_id
+
+ def schedule_decision(self, expense_id: str, decision: str):
+ """Schedule a decision to be made (simulates human decision)"""
+ self.scheduled_decisions[expense_id] = decision
+
+ async def send_decision():
+ try:
+ if expense_id in self.workflow_map:
+ workflow_id = self.workflow_map[expense_id]
+ handle = self.client.get_workflow_handle(workflow_id)
+ await handle.signal("expense_decision_signal", decision)
+ except Exception:
+ # Ignore errors in time-skipping mode where workflows may complete quickly
+ pass
+
+ asyncio.create_task(send_decision())
+
+ def create_register_activity(self):
+ """Create a register activity that works with this mock UI"""
+
+ @activity.defn(name="register_for_decision_activity")
+ async def register_decision_activity(expense_id: str) -> None:
+ # In time-skipping mode, send the decision immediately
+ if expense_id in self.scheduled_decisions:
+ decision = self.scheduled_decisions[expense_id]
+ if expense_id in self.workflow_map:
+ workflow_id = self.workflow_map[expense_id]
+ handle = self.client.get_workflow_handle(workflow_id)
+ try:
+ # Send signal immediately when registering
+ await handle.signal("expense_decision_signal", decision)
+ except Exception:
+ # Ignore errors in time-skipping mode
+ pass
+ return None
+
+ return register_decision_activity
+
+
+class TestWorkflowPaths:
+ """Test main workflow execution paths"""
+
+ async def test_workflow_approved_complete_flow(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test complete approved expense workflow - Happy Path"""
+ task_queue = f"test-expense-approved-{uuid.uuid4()}"
+ workflow_id = f"test-workflow-approved-{uuid.uuid4()}"
+ expense_id = "test-expense-approved"
+
+ # Set up mock UI
+ mock_ui = MockExpenseUI(client)
+ mock_ui.register_workflow(expense_id, workflow_id)
+ mock_ui.schedule_decision(expense_id, "APPROVED")
+
+ @activity.defn(name="create_expense_activity")
+ async def create_expense_mock(expense_id: str) -> None:
+ return None
+
+ @activity.defn(name="payment_activity")
+ async def payment_mock(expense_id: str) -> None:
+ return None
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[
+ create_expense_mock,
+ mock_ui.create_register_activity(),
+ payment_mock,
+ ],
+ ):
+ result = await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ expense_id,
+ id=workflow_id,
+ task_queue=task_queue,
+ )
+
+ assert result == "COMPLETED"
+
+ async def test_workflow_rejected_flow(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test rejected expense workflow - Returns empty string"""
+ task_queue = f"test-expense-rejected-{uuid.uuid4()}"
+ workflow_id = f"test-workflow-rejected-{uuid.uuid4()}"
+ expense_id = "test-expense-rejected"
+
+ # Set up mock UI
+ mock_ui = MockExpenseUI(client)
+ mock_ui.register_workflow(expense_id, workflow_id)
+ mock_ui.schedule_decision(expense_id, "REJECTED")
+
+ @activity.defn(name="create_expense_activity")
+ async def create_expense_mock(expense_id: str) -> None:
+ return None
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[create_expense_mock, mock_ui.create_register_activity()],
+ ):
+ result = await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ expense_id,
+ id=workflow_id,
+ task_queue=task_queue,
+ )
+
+ assert result == ""
+
+ async def test_workflow_other_decision_treated_as_rejected(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test that non-APPROVED decisions are treated as rejection"""
+ task_queue = f"test-expense-other-{uuid.uuid4()}"
+ workflow_id = f"test-workflow-other-{uuid.uuid4()}"
+ expense_id = "test-expense-other"
+
+ # Set up mock UI with PENDING decision
+ mock_ui = MockExpenseUI(client)
+ mock_ui.register_workflow(expense_id, workflow_id)
+ mock_ui.schedule_decision(expense_id, "PENDING") # Any non-APPROVED value
+
+ @activity.defn(name="create_expense_activity")
+ async def create_expense_mock(expense_id: str) -> None:
+ return None
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[create_expense_mock, mock_ui.create_register_activity()],
+ ):
+ result = await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ expense_id,
+ id=workflow_id,
+ task_queue=task_queue,
+ )
+
+ assert result == ""
+
+ async def test_workflow_decision_values(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test that workflow returns correct values for different decisions"""
+ task_queue = f"test-decisions-{uuid.uuid4()}"
+
+ # Test cases: any non-"APPROVED" decision should return empty string
+ test_cases = [
+ ("APPROVED", "COMPLETED"),
+ ("REJECTED", ""),
+ ("DENIED", ""),
+ ("PENDING", ""),
+ ("CANCELLED", ""),
+ ("UNKNOWN", ""),
+ ]
+
+ for decision, expected_result in test_cases:
+ workflow_id = f"test-workflow-decision-{decision.lower()}-{uuid.uuid4()}"
+ expense_id = f"test-expense-{decision.lower()}"
+
+ # Set up mock UI with specific decision
+ mock_ui = MockExpenseUI(client)
+ mock_ui.register_workflow(expense_id, workflow_id)
+ mock_ui.schedule_decision(expense_id, decision)
+
+ @activity.defn(name="create_expense_activity")
+ async def create_expense_mock(expense_id: str) -> None:
+ return None
+
+ @activity.defn(name="payment_activity")
+ async def payment_mock(expense_id: str) -> None:
+ return None
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[
+ create_expense_mock,
+ mock_ui.create_register_activity(),
+ payment_mock,
+ ],
+ ):
+ result = await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ expense_id,
+ id=workflow_id,
+ task_queue=task_queue,
+ )
+
+ assert (
+ result == expected_result
+ ), f"Decision '{decision}' should return '{expected_result}', got '{result}'"
+
+
+class TestWorkflowFailures:
+ """Test workflow behavior when activities fail"""
+
+ async def test_workflow_create_expense_failure(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test workflow when create expense activity fails"""
+ task_queue = f"test-create-failure-{uuid.uuid4()}"
+
+ @activity.defn(name="create_expense_activity")
+ async def failing_create_expense(expense_id: str):
+ raise ApplicationError("Failed to create expense", non_retryable=True)
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[failing_create_expense],
+ ):
+ with pytest.raises(WorkflowFailureError):
+ await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ "test-expense-create-fail",
+ id=f"test-workflow-create-fail-{uuid.uuid4()}",
+ task_queue=task_queue,
+ )
+
+ async def test_workflow_wait_decision_failure(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test workflow when wait for decision activity fails"""
+ task_queue = f"test-wait-failure-{uuid.uuid4()}"
+
+ @activity.defn(name="create_expense_activity")
+ async def create_expense_mock(expense_id: str) -> None:
+ return None
+
+ @activity.defn(name="register_for_decision_activity")
+ async def failing_wait_decision(expense_id: str) -> None:
+ raise ApplicationError("Failed to register callback", non_retryable=True)
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[create_expense_mock, failing_wait_decision],
+ ):
+ with pytest.raises(WorkflowFailureError):
+ await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ "test-expense-wait-fail",
+ id=f"test-workflow-wait-fail-{uuid.uuid4()}",
+ task_queue=task_queue,
+ )
+
+ async def test_workflow_payment_failure(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test workflow when payment activity fails after approval"""
+ task_queue = f"test-payment-failure-{uuid.uuid4()}"
+ workflow_id = f"test-workflow-payment-fail-{uuid.uuid4()}"
+ expense_id = "test-expense-payment-fail"
+
+ # Set up mock UI with APPROVED decision
+ mock_ui = MockExpenseUI(client)
+ mock_ui.register_workflow(expense_id, workflow_id)
+ mock_ui.schedule_decision(expense_id, "APPROVED")
+
+ @activity.defn(name="create_expense_activity")
+ async def create_expense_mock(expense_id: str) -> None:
+ return None
+
+ @activity.defn(name="payment_activity")
+ async def failing_payment(expense_id: str):
+ raise ApplicationError("Payment processing failed", non_retryable=True)
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[
+ create_expense_mock,
+ mock_ui.create_register_activity(),
+ failing_payment,
+ ],
+ ):
+ with pytest.raises(WorkflowFailureError):
+ await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ expense_id,
+ id=workflow_id,
+ task_queue=task_queue,
+ )
+
+
+class TestWorkflowConfiguration:
+ """Test workflow timeout and configuration behavior"""
+
+ async def test_workflow_timeout_configuration(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test that workflow uses correct timeout configurations"""
+ task_queue = f"test-timeouts-{uuid.uuid4()}"
+ workflow_id = f"test-workflow-timeouts-{uuid.uuid4()}"
+ expense_id = "test-expense-timeouts"
+ timeout_calls = []
+
+ # Set up mock UI with APPROVED decision
+ mock_ui = MockExpenseUI(client)
+ mock_ui.register_workflow(expense_id, workflow_id)
+ mock_ui.schedule_decision(expense_id, "APPROVED")
+
+ @activity.defn(name="create_expense_activity")
+ async def create_expense_timeout_check(expense_id: str) -> None:
+ # Check that we're called with 10 second timeout
+ activity_info = activity.info()
+ timeout_calls.append(("create", activity_info.start_to_close_timeout))
+ return None
+
+ @activity.defn(name="payment_activity")
+ async def payment_timeout_check(expense_id: str) -> None:
+ # Check that we're called with 10 second timeout
+ activity_info = activity.info()
+ timeout_calls.append(("payment", activity_info.start_to_close_timeout))
+ return None
+
+ # Create register activity that captures timeout info
+ def create_timeout_checking_register_activity():
+ @activity.defn(name="register_for_decision_activity")
+ async def register_decision_timeout_check(expense_id: str) -> None:
+ # Check that we're called with 10 minute timeout
+ activity_info = activity.info()
+ timeout_calls.append(("wait", activity_info.start_to_close_timeout))
+ # In time-skipping mode, send the decision immediately
+ if expense_id in mock_ui.scheduled_decisions:
+ decision = mock_ui.scheduled_decisions[expense_id]
+ if expense_id in mock_ui.workflow_map:
+ workflow_id = mock_ui.workflow_map[expense_id]
+ handle = client.get_workflow_handle(workflow_id)
+ try:
+ # Send signal immediately when registering
+ await handle.signal("expense_decision_signal", decision)
+ except Exception:
+ # Ignore errors in time-skipping mode
+ pass
+ return None
+
+ return register_decision_timeout_check
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[
+ create_expense_timeout_check,
+ create_timeout_checking_register_activity(),
+ payment_timeout_check,
+ ],
+ ):
+ await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ expense_id,
+ id=workflow_id,
+ task_queue=task_queue,
+ )
+
+ # Verify timeout configurations
+ assert len(timeout_calls) == 3
+ create_timeout = next(
+ call[1] for call in timeout_calls if call[0] == "create"
+ )
+ wait_timeout = next(call[1] for call in timeout_calls if call[0] == "wait")
+ payment_timeout = next(
+ call[1] for call in timeout_calls if call[0] == "payment"
+ )
+
+ assert create_timeout == timedelta(seconds=10)
+ assert wait_timeout == timedelta(
+ seconds=10
+ ) # register activity timeout is 10 seconds
+ assert payment_timeout == timedelta(seconds=10)
+
+
+class TestWorkflowFromSimpleFile:
+ """Tests moved from the original simple test_workflow.py file"""
+
+ async def test_workflow_with_mock_activities(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test workflow with mocked activities"""
+ task_queue = f"test-expense-{uuid.uuid4()}"
+ workflow_id = f"test-expense-workflow-{uuid.uuid4()}"
+ expense_id = "test-expense-id"
+
+ # Set up mock UI with APPROVED decision
+ mock_ui = MockExpenseUI(client)
+ mock_ui.register_workflow(expense_id, workflow_id)
+ mock_ui.schedule_decision(expense_id, "APPROVED")
+
+ # Mock the activities to return expected values
+ @activity.defn(name="create_expense_activity")
+ async def create_expense_mock(expense_id: str) -> None:
+ # Mock succeeds by returning None
+ return None
+
+ @activity.defn(name="payment_activity")
+ async def payment_mock(expense_id: str) -> None:
+ # Mock succeeds by returning None
+ return None
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[
+ create_expense_mock,
+ mock_ui.create_register_activity(),
+ payment_mock,
+ ],
+ ):
+ # Execute workflow
+ result = await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ expense_id,
+ id=workflow_id,
+ task_queue=task_queue,
+ )
+
+ # Verify result
+ assert result == "COMPLETED"
+
+ async def test_workflow_rejected_expense(
+ self, client: Client, env: WorkflowEnvironment
+ ):
+ """Test workflow when expense is rejected"""
+ task_queue = f"test-expense-rejected-{uuid.uuid4()}"
+ workflow_id = f"test-expense-rejected-workflow-{uuid.uuid4()}"
+ expense_id = "test-expense-id"
+
+ # Set up mock UI with REJECTED decision
+ mock_ui = MockExpenseUI(client)
+ mock_ui.register_workflow(expense_id, workflow_id)
+ mock_ui.schedule_decision(expense_id, "REJECTED")
+
+ # Mock the activities
+ @activity.defn(name="create_expense_activity")
+ async def create_expense_mock(expense_id: str) -> None:
+ # Mock succeeds by returning None
+ return None
+
+ async with Worker(
+ client,
+ task_queue=task_queue,
+ workflows=[SampleExpenseWorkflow],
+ activities=[create_expense_mock, mock_ui.create_register_activity()],
+ ):
+ # Execute workflow
+ result = await client.execute_workflow(
+ SampleExpenseWorkflow.run,
+ expense_id,
+ id=workflow_id,
+ task_queue=task_queue,
+ )
+
+ # Verify result is empty string when rejected
+ assert result == ""
diff --git a/tests/expense/test_http_client_lifecycle.py b/tests/expense/test_http_client_lifecycle.py
new file mode 100644
index 00000000..146f9911
--- /dev/null
+++ b/tests/expense/test_http_client_lifecycle.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python3
+"""
+Simple test script to verify HTTP client lifecycle management.
+"""
+
+import asyncio
+import os
+import sys
+
+# Add the project root to Python path
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+
+from expense.activities import (
+ cleanup_http_client,
+ create_expense_activity,
+ get_http_client,
+ initialize_http_client,
+)
+
+
+async def test_http_client_lifecycle():
+ """Test that HTTP client lifecycle management works correctly."""
+ print("Testing HTTP client lifecycle management...")
+
+ # Test 1: Client should not be initialized initially
+ try:
+ get_http_client()
+ print("FAIL: Expected RuntimeError when client not initialized")
+ return False
+ except RuntimeError as e:
+ print(f"PASS: Got expected error when client not initialized: {e}")
+
+ # Test 2: Initialize client
+ await initialize_http_client()
+ print("PASS: HTTP client initialized")
+
+ # Test 3: Client should be available now
+ try:
+ client = get_http_client()
+ print(f"PASS: Got HTTP client: {type(client).__name__}")
+ except Exception as e:
+ print(f"FAIL: Could not get HTTP client after initialization: {e}")
+ return False
+
+ # Test 4: Test multiple initializations (should be safe)
+ await initialize_http_client()
+ client2 = get_http_client()
+ if client is client2:
+ print("PASS: Multiple initializations return same client instance")
+ else:
+ print("FAIL: Multiple initializations created different clients")
+ return False
+
+ # Test 5: Cleanup client
+ await cleanup_http_client()
+ print("PASS: HTTP client cleaned up")
+
+ # Test 6: Client should not be available after cleanup
+ try:
+ get_http_client()
+ print("FAIL: Expected RuntimeError after cleanup")
+ return False
+ except RuntimeError as e:
+ print(f"PASS: Got expected error after cleanup: {e}")
+
+ print("\nAll HTTP client lifecycle tests passed!")
+ return True
+
+
+async def test_activity_integration():
+ """Test that activities can use the HTTP client (mock test)."""
+ print("\nTesting activity integration...")
+
+ # Initialize client for activities
+ await initialize_http_client()
+
+ try:
+ # This will fail because the expense server isn't running,
+ # but it will test that the HTTP client is accessible
+ await create_expense_activity("test-expense-123")
+ print("Unexpected: Activity succeeded (expense server must be running)")
+ except Exception as e:
+ # We expect this to fail since expense server isn't running
+ if "HTTP client not initialized" in str(e):
+ print("FAIL: HTTP client not accessible in activity")
+ return False
+ else:
+ print(
+ f"PASS: Activity accessed HTTP client correctly (failed as expected due to no server): {type(e).__name__}"
+ )
+
+ # Cleanup
+ await cleanup_http_client()
+ print("PASS: Activity integration test completed")
+ return True
+
+
+async def main():
+ """Run all tests."""
+ print("=" * 60)
+ print("HTTP Client Lifecycle Management Tests")
+ print("=" * 60)
+
+ test1_passed = await test_http_client_lifecycle()
+ test2_passed = await test_activity_integration()
+
+ print("\n" + "=" * 60)
+ if test1_passed and test2_passed:
+ print(
+ "ALL TESTS PASSED! HTTP client lifecycle management is working correctly."
+ )
+ return 0
+ else:
+ print("SOME TESTS FAILED! Please check the implementation.")
+ return 1
+
+
+if __name__ == "__main__":
+ exit_code = asyncio.run(main())
+ sys.exit(exit_code)
diff --git a/tests/expense/test_ui.py b/tests/expense/test_ui.py
new file mode 100644
index 00000000..f29075b0
--- /dev/null
+++ b/tests/expense/test_ui.py
@@ -0,0 +1,377 @@
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+from fastapi.testclient import TestClient
+
+from expense.ui import ExpenseState, all_expenses, app, workflow_map
+
+
+class TestExpenseUI:
+ """Test suite for the Expense System UI based on the specification"""
+
+ def setup_method(self):
+ """Reset state before each test"""
+ all_expenses.clear()
+ workflow_map.clear()
+
+ @pytest.fixture
+ def client(self):
+ """FastAPI test client fixture"""
+ return TestClient(app)
+
+ def test_list_view_empty(self, client):
+ """Test list view with no expenses"""
+ response = client.get("/")
+ assert response.status_code == 200
+ assert "SAMPLE EXPENSE SYSTEM" in response.text
+ assert "" in response.text
+ assert "Expense ID " in response.text
+
+ def test_list_view_with_expenses(self, client):
+ """Test list view displaying expenses in sorted order"""
+ # Setup test data
+ all_expenses["EXP-003"] = ExpenseState.CREATED
+ all_expenses["EXP-001"] = ExpenseState.APPROVED
+ all_expenses["EXP-002"] = ExpenseState.REJECTED
+
+ response = client.get("/list")
+ assert response.status_code == 200
+
+ # Check sorted order in HTML
+ html = response.text
+ exp001_pos = html.find("EXP-001")
+ exp002_pos = html.find("EXP-002")
+ exp003_pos = html.find("EXP-003")
+
+ assert exp001_pos < exp002_pos < exp003_pos
+
+ def test_list_view_action_buttons_only_for_created(self, client):
+ """Test that action buttons only appear for CREATED expenses"""
+ all_expenses["created-expense"] = ExpenseState.CREATED
+ all_expenses["approved-expense"] = ExpenseState.APPROVED
+ all_expenses["rejected-expense"] = ExpenseState.REJECTED
+ all_expenses["completed-expense"] = ExpenseState.COMPLETED
+
+ response = client.get("/")
+ html = response.text
+
+ # CREATED expense should have buttons
+ assert "APPROVE" in html
+ assert "REJECT" in html
+ assert "created-expense" in html
+
+ # Count actual button elements - should only be for the CREATED expense
+ approve_count = html.count(
+ 'APPROVE '
+ )
+ reject_count = html.count(
+ 'REJECT '
+ )
+ assert approve_count == 1
+ assert reject_count == 1
+
+ def test_create_expense_success_ui(self, client):
+ """Test successful expense creation via UI"""
+ response = client.get("/create?id=new-expense")
+ assert response.status_code == 200
+ assert all_expenses["new-expense"] == ExpenseState.CREATED
+ assert "SAMPLE EXPENSE SYSTEM" in response.text # Should redirect to list
+
+ def test_create_expense_success_api(self, client):
+ """Test successful expense creation via API"""
+ response = client.get("/create?id=new-expense&is_api_call=true")
+ assert response.status_code == 200
+ assert response.text == "SUCCEED"
+ assert all_expenses["new-expense"] == ExpenseState.CREATED
+
+ def test_create_expense_duplicate_ui(self, client):
+ """Test creating duplicate expense via UI"""
+ all_expenses["existing"] = ExpenseState.CREATED
+
+ response = client.get("/create?id=existing")
+ assert response.status_code == 200
+ assert response.text == "ID already exists"
+
+ def test_create_expense_duplicate_api(self, client):
+ """Test creating duplicate expense via API"""
+ all_expenses["existing"] = ExpenseState.CREATED
+
+ response = client.get("/create?id=existing&is_api_call=true")
+ assert response.status_code == 200
+ assert response.text == "ERROR:ID_ALREADY_EXISTS"
+
+ def test_status_check_valid_id(self, client):
+ """Test status check for valid expense ID"""
+ all_expenses["test-expense"] = ExpenseState.APPROVED
+
+ response = client.get("/status?id=test-expense")
+ assert response.status_code == 200
+ assert response.text == "APPROVED"
+
+ def test_status_check_invalid_id(self, client):
+ """Test status check for invalid expense ID"""
+ response = client.get("/status?id=nonexistent")
+ assert response.status_code == 200
+ assert response.text == "ERROR:INVALID_ID"
+
+ def test_action_approve_ui(self, client):
+ """Test approve action via UI"""
+ all_expenses["test-expense"] = ExpenseState.CREATED
+
+ with patch("expense.ui.notify_expense_state_change") as mock_notify:
+ response = client.post(
+ "/action", data={"type": "approve", "id": "test-expense"}
+ )
+ assert response.status_code == 200
+ assert all_expenses["test-expense"] == ExpenseState.APPROVED
+ # Should redirect to list view
+ assert response.url.path == "/list"
+ mock_notify.assert_called_once_with("test-expense", ExpenseState.APPROVED)
+
+ def test_action_approve_api(self, client):
+ """Test approve action via API"""
+ all_expenses["test-expense"] = ExpenseState.CREATED
+
+ with patch("expense.ui.notify_expense_state_change") as mock_notify:
+ response = client.post(
+ "/action",
+ data={"type": "approve", "id": "test-expense", "is_api_call": "true"},
+ )
+ assert response.status_code == 200
+ assert response.text == "SUCCEED"
+ assert all_expenses["test-expense"] == ExpenseState.APPROVED
+ mock_notify.assert_called_once_with("test-expense", ExpenseState.APPROVED)
+
+ def test_action_reject_ui(self, client):
+ """Test reject action via UI"""
+ all_expenses["test-expense"] = ExpenseState.CREATED
+
+ with patch("expense.ui.notify_expense_state_change") as mock_notify:
+ response = client.post(
+ "/action", data={"type": "reject", "id": "test-expense"}
+ )
+ assert response.status_code == 200
+ assert all_expenses["test-expense"] == ExpenseState.REJECTED
+ # Should redirect to list view
+ assert response.url.path == "/list"
+ mock_notify.assert_called_once_with("test-expense", ExpenseState.REJECTED)
+
+ def test_action_payment(self, client):
+ """Test payment action"""
+ all_expenses["test-expense"] = ExpenseState.APPROVED
+
+ response = client.post(
+ "/action",
+ data={"type": "payment", "id": "test-expense", "is_api_call": "true"},
+ )
+ assert response.status_code == 200
+ assert response.text == "SUCCEED"
+ assert all_expenses["test-expense"] == ExpenseState.COMPLETED
+
+ def test_action_invalid_id_ui(self, client):
+ """Test action with invalid ID via UI"""
+ response = client.post("/action", data={"type": "approve", "id": "nonexistent"})
+ assert response.status_code == 200
+ assert response.text == "Invalid ID"
+
+ def test_action_invalid_id_api(self, client):
+ """Test action with invalid ID via API"""
+ response = client.post(
+ "/action",
+ data={"type": "approve", "id": "nonexistent", "is_api_call": "true"},
+ )
+ assert response.status_code == 200
+ assert response.text == "ERROR:INVALID_ID"
+
+ def test_action_invalid_type_ui(self, client):
+ """Test action with invalid type via UI"""
+ all_expenses["test-expense"] = ExpenseState.CREATED
+
+ response = client.post(
+ "/action", data={"type": "invalid", "id": "test-expense"}
+ )
+ assert response.status_code == 200
+ assert response.text == "Invalid action type"
+
+ def test_action_invalid_type_api(self, client):
+ """Test action with invalid type via API"""
+ all_expenses["test-expense"] = ExpenseState.CREATED
+
+ response = client.post(
+ "/action",
+ data={"type": "invalid", "id": "test-expense", "is_api_call": "true"},
+ )
+ assert response.status_code == 200
+ assert response.text == "ERROR:INVALID_TYPE"
+
+ def test_register_callback_success(self, client):
+ """Test successful callback registration"""
+ all_expenses["test-expense"] = ExpenseState.CREATED
+ test_token = "deadbeef"
+
+ response = client.post(
+ "/registerWorkflow?id=test-expense", data={"workflow_id": test_token}
+ )
+ assert response.status_code == 200
+ assert response.text == "SUCCEED"
+ assert workflow_map["test-expense"] == test_token
+
+ def test_register_workflow_invalid_id(self, client):
+ """Test workflow registration with invalid ID"""
+ response = client.post(
+ "/registerWorkflow?id=nonexistent", data={"workflow_id": "workflow-123"}
+ )
+ assert response.status_code == 200
+ assert response.text == "ERROR:INVALID_ID"
+
+ def test_register_workflow_invalid_state(self, client):
+ """Test workflow registration with non-CREATED expense"""
+ all_expenses["test-expense"] = ExpenseState.APPROVED
+
+ response = client.post(
+ "/registerWorkflow?id=test-expense", data={"workflow_id": "workflow-123"}
+ )
+ assert response.status_code == 200
+ assert response.text == "ERROR:INVALID_STATE"
+
+ @pytest.mark.asyncio
+ async def test_notify_expense_state_change_success(self):
+ """Test successful workflow notification"""
+ # Setup
+ expense_id = "test-expense"
+ test_workflow_id = "workflow-123"
+ workflow_map[expense_id] = test_workflow_id
+
+ # Mock workflow client and workflow handle
+ mock_handle = AsyncMock()
+ mock_client = MagicMock()
+ mock_client.get_workflow_handle.return_value = mock_handle
+
+ with patch("expense.ui.workflow_client", mock_client):
+ from expense.ui import notify_expense_state_change
+
+ await notify_expense_state_change(expense_id, "APPROVED")
+
+ mock_client.get_workflow_handle.assert_called_once_with(test_workflow_id)
+ mock_handle.signal.assert_called_once_with(
+ "expense_decision_signal", "APPROVED"
+ )
+
+ @pytest.mark.asyncio
+ async def test_notify_expense_state_change_invalid_id(self):
+ """Test workflow notification with invalid expense ID"""
+ from expense.ui import notify_expense_state_change
+
+ # Should not raise exception for invalid ID
+ await notify_expense_state_change("nonexistent", "APPROVED")
+
+ @pytest.mark.asyncio
+ async def test_notify_expense_state_change_client_error(self):
+ """Test workflow notification when client fails"""
+ expense_id = "test-expense"
+ test_workflow_id = "workflow-123"
+ workflow_map[expense_id] = test_workflow_id
+
+ mock_client = MagicMock()
+ mock_client.get_workflow_handle.side_effect = Exception("Client error")
+
+ with patch("expense.ui.workflow_client", mock_client):
+ from expense.ui import notify_expense_state_change
+
+ # Should not raise exception even if client fails
+ await notify_expense_state_change(expense_id, "APPROVED")
+
+ def test_state_transitions_complete_workflow(self, client):
+ """Test complete expense workflow state transitions"""
+ expense_id = "workflow-expense"
+
+ # 1. Create expense
+ response = client.get(f"/create?id={expense_id}&is_api_call=true")
+ assert response.text == "SUCCEED"
+ assert all_expenses[expense_id] == ExpenseState.CREATED
+
+ # 2. Register workflow
+ test_workflow_id = "workflow-123"
+ response = client.post(
+ f"/registerWorkflow?id={expense_id}", data={"workflow_id": test_workflow_id}
+ )
+ assert response.text == "SUCCEED"
+
+ # 3. Approve expense
+ with patch("expense.ui.notify_expense_state_change") as mock_notify:
+ response = client.post(
+ "/action",
+ data={"type": "approve", "id": expense_id, "is_api_call": "true"},
+ )
+ assert response.text == "SUCCEED"
+ assert all_expenses[expense_id] == ExpenseState.APPROVED
+ mock_notify.assert_called_once_with(expense_id, ExpenseState.APPROVED)
+
+ # 4. Process payment
+ response = client.post(
+ "/action", data={"type": "payment", "id": expense_id, "is_api_call": "true"}
+ )
+ assert response.text == "SUCCEED"
+ assert all_expenses[expense_id] == ExpenseState.COMPLETED
+
+ def test_html_response_structure(self, client):
+ """Test HTML response contains required elements"""
+ all_expenses["test-expense"] = ExpenseState.CREATED
+
+ response = client.get("/")
+ html = response.text
+
+ # Check required HTML elements
+ assert "SAMPLE EXPENSE SYSTEM " in html
+ assert 'HOME ' in html
+ assert "" in html
+ assert "Expense ID " in html
+ assert "Status " in html
+ assert "Action " in html
+ assert 'style="background-color:#4CAF50;"' in html # Green approve button
+ assert 'style="background-color:#f44336;"' in html # Red reject button
+
+ def test_concurrent_operations(self, client):
+ """Test handling of concurrent operations"""
+ import threading
+
+ results = []
+
+ def create_expense(expense_id):
+ try:
+ response = client.get(f"/create?id={expense_id}&is_api_call=true")
+ results.append((expense_id, response.status_code, response.text))
+ except Exception as e:
+ results.append((expense_id, "error", str(e)))
+
+ # Create multiple expenses concurrently
+ threads = []
+ for i in range(5):
+ thread = threading.Thread(target=create_expense, args=[f"concurrent-{i}"])
+ threads.append(thread)
+ thread.start()
+
+ for thread in threads:
+ thread.join()
+
+ # All should succeed
+ assert len(results) == 5
+ for expense_id, status_code, text in results:
+ assert status_code == 200
+ assert text == "SUCCEED"
+ assert expense_id in all_expenses
+
+ def test_parameter_validation(self, client):
+ """Test parameter validation for all endpoints"""
+ # Missing required parameters
+ response = client.get("/create") # Missing id
+ assert response.status_code == 422 # FastAPI validation error
+
+ response = client.post("/action") # Missing type and id
+ assert response.status_code == 422
+
+ response = client.get("/status") # Missing id
+ assert response.status_code == 422
+
+ response = client.post("/registerWorkflow") # Missing id and workflow_id
+ assert response.status_code == 422
diff --git a/uv.lock b/uv.lock
index 21e6c17e..cfe3ae21 100644
--- a/uv.lock
+++ b/uv.lock
@@ -2499,6 +2499,12 @@ encryption = [
{ name = "aiohttp" },
{ name = "cryptography" },
]
+expense = [
+ { name = "fastapi" },
+ { name = "httpx" },
+ { name = "python-multipart" },
+ { name = "uvicorn", extra = ["standard"] },
+]
gevent = [
{ name = "gevent" },
]
@@ -2559,6 +2565,12 @@ encryption = [
{ name = "aiohttp", specifier = ">=3.8.1,<4" },
{ name = "cryptography", specifier = ">=38.0.1,<39" },
]
+expense = [
+ { name = "fastapi", specifier = ">=0.115.12" },
+ { name = "httpx", specifier = ">=0.25.0,<1" },
+ { name = "python-multipart", specifier = ">=0.0.5" },
+ { name = "uvicorn", extras = ["standard"], specifier = ">=0.24.0.post1,<0.25" },
+]
gevent = [{ name = "gevent", marker = "python_full_version >= '3.8'", specifier = "==25.4.2" }]
langchain = [
{ name = "fastapi", specifier = ">=0.115.12" },