Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 85 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1515,18 +1515,22 @@ import temporalio.service
class AuthenticationPlugin(Plugin):
def __init__(self, api_key: str):
self.api_key = api_key

def init_client_plugin(self, next: Plugin) -> Plugin:
self.next_client_plugin = next
return self

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Modify client configuration
config["namespace"] = "my-secure-namespace"
return super().configure_client(config)
return self.next_client_plugin.configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await super().connect_service_client(config)
return await self.next_client_plugin.connect_service_client(config)

# Use the plugin when connecting
client = await Client.connect(
Expand All @@ -1538,31 +1542,56 @@ client = await Client.connect(
#### Worker Plugins

Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
custom lifecycle management, or modifying worker settings.
custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay.
They should do this in the case that they modified the worker in a way which would also need to be present
for replay to function. For instance, changing the data converter or adding workflows.

Here's an example of a worker plugin that adds custom monitoring:

```python
from temporalio.worker import Plugin, WorkerConfig, Worker
import temporalio
from contextlib import asynccontextmanager
from typing import AsyncIterator
from temporalio.worker import Plugin, WorkerConfig, Worker, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
import logging

class MonitoringPlugin(Plugin):
def __init__(self):
self.logger = logging.getLogger(__name__)

def init_worker_plugin(self, next: Plugin) -> Plugin:
self.next_worker_plugin = next
return self

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Modify worker configuration
original_task_queue = config["task_queue"]
config["task_queue"] = f"monitored-{original_task_queue}"
self.logger.info(f"Worker created for task queue: {config['task_queue']}")
return super().configure_worker(config)
return self.next_worker_plugin.configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
self.logger.info("Starting worker execution")
try:
await super().run_worker(worker)
await self.next_worker_plugin.run_worker(worker)
finally:
self.logger.info("Worker execution completed")

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
return self.next_worker_plugin.configure_replayer(config)

@asynccontextmanager
async def workflow_replay(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
self.logger.info("Starting replay execution")
try:
async with self.next_worker_plugin.workflow_replay(replayer, histories) as results:
yield results
finally:
self.logger.info("Replay execution completed")

# Use the plugin when creating a worker
worker = Worker(
Expand All @@ -1577,38 +1606,65 @@ worker = Worker(
For plugins that need to work with both clients and workers, you can implement both interfaces in a single class:

```python
import temporalio
from contextlib import AbstractAsyncContextManager
from typing import AsyncIterator
from temporalio.client import Plugin as ClientPlugin, ClientConfig
from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig
from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult


class UnifiedPlugin(ClientPlugin, WorkerPlugin):
def configure_client(self, config: ClientConfig) -> ClientConfig:
# Client-side customization
config["namespace"] = "unified-namespace"
return super().configure_client(config)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Worker-side customization
config["max_cached_workflows"] = 500
return super().configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
print("Starting unified worker")
await super().run_worker(worker)


def init_client_plugin(self, next: ClientPlugin) -> ClientPlugin:
self.next_client_plugin = next
return self

def init_worker_plugin(self, next: WorkerPlugin) -> WorkerPlugin:
self.next_worker_plugin = next
return self

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Client-side customization
config["data_converter"] = pydantic_data_converter
return self.next_client_plugin.configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await self.next_client_plugin.connect_service_client(config)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Worker-side customization
return self.next_worker_plugin.configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
print("Starting unified worker")
await self.next_worker_plugin.run_worker(worker)

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
config["data_converter"] = pydantic_data_converter
return config
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tconley1428 This should call self.next_worker_plugin.configure_replayer right?


async def workflow_replay(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
return self.next_worker_plugin.workflow_replay(replayer, histories)

# Create client with the unified plugin
client = await Client.connect(
"localhost:7233",
plugins=[UnifiedPlugin()]
"localhost:7233",
plugins=[UnifiedPlugin()]
)

# Worker will automatically inherit the plugin from the client
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity]
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity]
)
```

Expand All @@ -1617,7 +1673,7 @@ worker = Worker(
- Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility
- Client plugins that also implement worker plugin interfaces are automatically propagated to workers
- Avoid providing the same plugin to both client and worker to prevent double execution
- Plugin methods should call `super()` to maintain the plugin chain
- Plugin methods should call the plugin provided during initialization to maintain the plugin chain
- Each plugin's `name()` method returns a unique identifier for debugging purposes


Expand Down
10 changes: 6 additions & 4 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7398,6 +7398,7 @@ def name(self) -> str:
"""
return type(self).__module__ + "." + type(self).__qualname__

@abstractmethod
def init_client_plugin(self, next: Plugin) -> Plugin:
"""Initialize this plugin in the plugin chain.

Expand All @@ -7411,9 +7412,8 @@ def init_client_plugin(self, next: Plugin) -> Plugin:
Returns:
This plugin instance for method chaining.
"""
self.next_client_plugin = next
return self

@abstractmethod
def configure_client(self, config: ClientConfig) -> ClientConfig:
"""Hook called when creating a client to allow modification of configuration.

Expand All @@ -7427,8 +7427,8 @@ def configure_client(self, config: ClientConfig) -> ClientConfig:
Returns:
The modified client configuration.
"""
return self.next_client_plugin.configure_client(config)

@abstractmethod
async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
Expand All @@ -7444,10 +7444,12 @@ async def connect_service_client(
Returns:
The connected service client.
"""
return await self.next_client_plugin.connect_service_client(config)


class _RootPlugin(Plugin):
def init_client_plugin(self, next: Plugin) -> Plugin:
raise NotImplementedError()

def configure_client(self, config: ClientConfig) -> ClientConfig:
return config

Expand Down
61 changes: 55 additions & 6 deletions temporalio/contrib/openai_agents/_temporal_openai_agents.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Initialize Temporal OpenAI Agents overrides."""

from contextlib import contextmanager
from contextlib import asynccontextmanager, contextmanager
from datetime import timedelta
from typing import AsyncIterator, Callable, Optional, Union

Expand All @@ -24,7 +24,7 @@

import temporalio.client
import temporalio.worker
from temporalio.client import ClientConfig
from temporalio.client import ClientConfig, Plugin
from temporalio.contrib.openai_agents._invoke_model_activity import ModelActivity
from temporalio.contrib.openai_agents._model_parameters import ModelActivityParameters
from temporalio.contrib.openai_agents._openai_runner import TemporalOpenAIRunner
Expand All @@ -41,7 +41,13 @@
from temporalio.converter import (
DataConverter,
)
from temporalio.worker import Worker, WorkerConfig
from temporalio.worker import (
Replayer,
ReplayerConfig,
Worker,
WorkerConfig,
WorkflowReplayResult,
)


@contextmanager
Expand Down Expand Up @@ -231,6 +237,26 @@ def __init__(
self._model_params = model_params
self._model_provider = model_provider

def init_client_plugin(
self, next: temporalio.client.Plugin
) -> temporalio.client.Plugin:
"""Set the next client plugin"""
self.next_client_plugin = next
return self

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
"""No modifications to service client"""
return await self.next_client_plugin.connect_service_client(config)

def init_worker_plugin(
self, next: temporalio.worker.Plugin
) -> temporalio.worker.Plugin:
"""Set the next worker plugin"""
self.next_worker_plugin = next
return self

def configure_client(self, config: ClientConfig) -> ClientConfig:
"""Configure the Temporal client for OpenAI agents integration.

Expand All @@ -246,7 +272,7 @@ def configure_client(self, config: ClientConfig) -> ClientConfig:
config["data_converter"] = DataConverter(
payload_converter_class=_OpenAIPayloadConverter
)
return super().configure_client(config)
return self.next_client_plugin.configure_client(config)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
"""Configure the Temporal worker for OpenAI agents integration.
Expand All @@ -268,7 +294,7 @@ def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
config["activities"] = list(config.get("activities") or []) + [
ModelActivity(self._model_provider).invoke_model_activity
]
return super().configure_worker(config)
return self.next_worker_plugin.configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
"""Run the worker with OpenAI agents temporal overrides.
Expand All @@ -281,4 +307,27 @@ async def run_worker(self, worker: Worker) -> None:
worker: The worker instance to run.
"""
with set_open_ai_agent_temporal_overrides(self._model_params):
await super().run_worker(worker)
await self.next_worker_plugin.run_worker(worker)

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
"""Configure the replayer for OpenAI Agents."""
config["interceptors"] = list(config.get("interceptors") or []) + [

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a very surprising footgun and introduces coupling, and requires that users understand a concept (Replayer) that few are familiar with. It feels like a very large chance that users will mess this up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is why we chose to require that plugin authors configure replayers and therefore become familiar with them. Regular users don't need to become familiar with replayers, but plugin authors do to ensure their plugin works right in replayer scenarios and not just client and worker scenarios.

OpenAIAgentsTracingInterceptor()
]
config["data_converter"] = DataConverter(
payload_converter_class=_OpenAIPayloadConverter
)
return config

@asynccontextmanager
async def workflow_replay(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
"""Set the OpenAI Overrides during replay"""
with set_open_ai_agent_temporal_overrides(self._model_params):
async with self.next_worker_plugin.workflow_replay(
replayer, histories
) as results:
yield results
2 changes: 1 addition & 1 deletion temporalio/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
WorkflowInterceptorClassInput,
WorkflowOutboundInterceptor,
)
from ._plugin import Plugin
from ._replayer import (
Replayer,
ReplayerConfig,
Expand All @@ -44,7 +45,6 @@
WorkflowSlotInfo,
)
from ._worker import (
Plugin,
PollerBehavior,
PollerBehaviorAutoscaling,
PollerBehaviorSimpleMaximum,
Expand Down
Loading
Loading