Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ This framework supports **Claude Code**. Support for **Cursor** is in beta.
Rule files can be forgotten or ignored completely by LLMs. Policies are unavoidable:

```python
if re.match(r'^terraform\s+apply(?:\s|$)', command):
yield PolicyDecision(action=PolicyAction.DENY, reason="terraform apply is not allowed. Use `terraform plan` instead.")
command = input_data.command.strip()

if re.match(r'^terraform\s+(fmt|plan)(?:\s|$)', command):
if command == "terraform apply":
yield PolicyDecision(
action=PolicyAction.DENY,
reason="terraform apply is not allowed. Use `terraform plan` instead."
)

if command == "terraform plan":
yield PolicyDecision(action=PolicyAction.ALLOW)
```

Expand All @@ -43,10 +48,9 @@ In the image below, an agent makes a change which includes a comment that is det

## Usage

At DevLeaps we developed an internal policy set for AI Agents. To create your own, refer to the [example server](https://github.com/Devleaps/agent-policies/blob/main/devleaps/policies/example/main.py) as a starting point The example server contains:
- A basic server setup demonstrating the use of policies and middleware.
- Rudimentary policies showcasing how to automatically deny, allow and provide guidance.
- Rudimentary middleware showcasing how multi-command tool use could be handled.
At DevLeaps we developed an internal policy set for AI Agents. To create your own, refer to the [example server](https://github.com/Devleaps/agent-policies/blob/main/devleaps/policies/example/main.py) as a starting point. The example server contains:
- A basic server setup demonstrating the use of policies
- Simple policies using exact matching (`command == "terraform apply"`)

**To run the example server:**
```bash
Expand All @@ -55,6 +59,8 @@ devleaps-policy-example-server

This starts a minimal server running just our example policies.

**Note:** The example server uses simple string matching for demonstration purposes. For production use cases requiring sophisticated command parsing (analyzing arguments, flags, options) or declarative policy languages (OPA/Rego), you'll need to implement your own parsing logic. Alternatively, visit [DevLeaps](https://devleaps.nl) for production-ready policies built on shell language parsing and OPA-compatible policy evaluation.

## Architecture

```mermaid
Expand Down
65 changes: 16 additions & 49 deletions src/devleaps/policies/example/main.py
Original file line number Diff line number Diff line change
@@ -1,80 +1,47 @@
#!/usr/bin/env python3
import re
from dataclasses import replace
import uvicorn

from devleaps.policies.server.common.models import (
PolicyAction,
PolicyDecision,
PolicyGuidance,
PostToolUseEvent,
ToolUseEvent,
)
from devleaps.policies.server.server import app, get_registry


def bash_split_middleware(input_data: ToolUseEvent):
if not input_data.tool_is_bash or not input_data.command:
yield input_data
return

if ' && ' in input_data.command:
for cmd in input_data.command.split(' && '):
if cmd.strip():
yield replace(input_data, command=cmd.strip())
else:
yield input_data

def terraform_rule(input_data: ToolUseEvent):
"""Example policy: Block terraform apply, allow terraform plan."""
if not input_data.tool_is_bash:
return

command = input_data.command.strip()

if re.match(r'^terraform\s+apply(?:\s|$)', command):
yield PolicyDecision(action=PolicyAction.DENY, reason="terraform apply is not allowed. Use `terraform plan` instead.")
if command == "terraform apply":
yield PolicyDecision(
action=PolicyAction.DENY,
reason="terraform apply is not allowed. Use `terraform plan` instead."
)

if re.match(r'^terraform\s+(fmt|plan)(?:\s|$)', command):
if command == "terraform plan":
yield PolicyDecision(action=PolicyAction.ALLOW)


def python_test_file_post_guidance_rule(input_data: PostToolUseEvent):
"""Provide guidance after running tests with python directly."""
if not input_data.tool_is_bash or not input_data.command:
return

if re.match(r'python3?\s+.*test_.*\.py', input_data.command.strip()):
yield PolicyGuidance(content="Consider using pytest instead of running test files directly. Use `pytest` for better test discovery and reporting.")


def uv_bundle_rule(input_data: ToolUseEvent):
def pip_rule(input_data: ToolUseEvent):
"""Example policy: Block pip install."""
if not input_data.tool_is_bash:
return

command = input_data.command.strip()

# Block pip commands, suggest uv
if re.match(r'^pip(?:\s|$)', command):
yield PolicyDecision(action=PolicyAction.DENY, reason="Use `uv` instead of pip for package management.")
yield PolicyGuidance(content="Replace `pip install` with `uv pip install` or `uv add` for dependency management.")
return

# Block python -m pip, suggest uv
if re.match(r'^python3?\s+-m\s+pip(?:\s|$)', command):
yield PolicyDecision(action=PolicyAction.DENY, reason="Use `uv` instead of python -m pip.")
return

# Block direct python usage for scripts, suggest uv run
if re.match(r'^python3?\s+[^-]', command) and not re.match(r'^python3?\s+.*test', command):
yield PolicyDecision(action=PolicyAction.DENY, reason="Use `uv run python` to execute scripts.")
return
if command == "pip install":
yield PolicyDecision(
action=PolicyAction.DENY,
reason="pip install is not allowed."
)


if __name__ == "__main__":
import uvicorn
registry = get_registry()
registry.register_middleware(ToolUseEvent, bash_split_middleware)
registry.register_handler(ToolUseEvent, terraform_rule)
registry.register_handler(PostToolUseEvent, python_test_file_post_guidance_rule)
# Register the uv bundle rule
registry.register_handler(ToolUseEvent, uv_bundle_rule, bundle="uv")
registry.register_handler(ToolUseEvent, pip_rule)
uvicorn.run(app, host="0.0.0.0", port=8338, log_level="info")
95 changes: 25 additions & 70 deletions src/devleaps/policies/server/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,86 +9,41 @@

def execute_handlers_generic(input_data, enabled_bundles: List[str] = []) -> List[Union[PolicyDecision, PolicyGuidance]]:
"""
Execute middleware and handlers with generic event input and return generic results.
Execute handlers with generic event input and return generic results.

This is the core policy execution pipeline:
1. Process input through middleware (can transform/split inputs)
2. Execute all registered handlers
3. Return raw policy results (decisions and guidance)
1. Execute all registered handlers
2. Return raw policy results (decisions and guidance)

Args:
input_data: The input event data
enabled_bundles: List of enabled bundle names, or None for all bundles

Aggregation of results is done by the mapper layer for each editor.
"""
processed_inputs = _process_middleware_pipeline(input_data, enabled_bundles)
all_results = _execute_all_handlers(processed_inputs, enabled_bundles)
return all_results


def _process_middleware_pipeline(input_data, enabled_bundles: List[str] = []):
"""Process input through middleware pipeline, filtered by enabled bundles."""
current_inputs = [input_data]
middleware_functions = registry.get_middleware(type(input_data), enabled_bundles)

for middleware in middleware_functions:
next_inputs = []
for input_item in current_inputs:
try:
yielded_inputs = list(middleware(input_item))
next_inputs.extend(yielded_inputs)
logger.debug(
f"Middleware {middleware.__name__} yielded {len(yielded_inputs)} inputs",
extra={
"middleware": middleware.__name__,
"input_count": len(yielded_inputs),
}
)
except Exception as e:
logger.error(
f"Error in middleware {middleware.__name__}: {e}",
extra={
"middleware": middleware.__name__,
"error": str(e),
},
exc_info=True
)
next_inputs.append(input_item)
current_inputs = next_inputs

return current_inputs


def _execute_all_handlers(processed_inputs, enabled_bundles: List[str] = []):
"""Execute all handlers on processed inputs and collect results, filtered by enabled bundles."""
if not processed_inputs:
return []

handlers = registry.get_handlers(type(processed_inputs[0]), enabled_bundles)
handlers = registry.get_handlers(type(input_data), enabled_bundles)
all_results = []

for processed_input in processed_inputs:
for handler in handlers:
try:
yielded_results = list(handler(processed_input))
all_results.extend(yielded_results)
logger.debug(
f"Handler {handler.__name__} yielded {len(yielded_results)} results",
extra={
"handler": handler.__name__,
"result_count": len(yielded_results),
}
)
except Exception as e:
logger.error(
f"Error in handler {handler.__name__}: {e}",
extra={
"handler": handler.__name__,
"error": str(e),
},
exc_info=True
)
continue
for handler in handlers:
try:
yielded_results = list(handler(input_data))
all_results.extend(yielded_results)
logger.debug(
f"Handler {handler.__name__} yielded {len(yielded_results)} results",
extra={
"handler": handler.__name__,
"result_count": len(yielded_results),
}
)
except Exception as e:
logger.error(
f"Error in handler {handler.__name__}: {e}",
extra={
"handler": handler.__name__,
"error": str(e),
},
exc_info=True
)
continue

return all_results
34 changes: 1 addition & 33 deletions src/devleaps/policies/server/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@
InputType = TypeVar('InputType')
OutputType = TypeVar('OutputType', PolicyDecision, PolicyGuidance)

MiddlewareFunction = Callable[[InputType], Generator[InputType, None, None]]
HandlerFunction = Callable[[InputType], Generator[Union[PolicyDecision, PolicyGuidance], None, None]]


class HookRegistry:
"""Generic registry for hook handlers and middleware using class types as keys."""
"""Generic registry for hook handlers using class types as keys."""

def __init__(self):
# Store handlers with their bundle association: {input_class: [(handler, bundle_name_or_none), ...]}
self.handlers: Dict[Type[InputType], List[Tuple[HandlerFunction, Optional[str]]]] = {}
self.middleware: Dict[Type[InputType], List[Tuple[MiddlewareFunction, Optional[str]]]] = {}

def register_handler(self, input_class: Type[InputType], handler: HandlerFunction, bundle: Optional[str] = None):
"""Register a handler for a specific input class type with optional bundle association."""
Expand All @@ -35,21 +33,6 @@ def register_handler(self, input_class: Type[InputType], handler: HandlerFunctio
}
)

def register_middleware(self, input_class: Type[InputType], middleware: MiddlewareFunction, bundle: Optional[str] = None):
"""Register middleware for a specific input class type with optional bundle association."""
if input_class not in self.middleware:
self.middleware[input_class] = []

self.middleware[input_class].append((middleware, bundle))
logger.debug(
f"Registered middleware: {middleware.__name__}" + (f" (bundle: {bundle})" if bundle else ""),
extra={
"input_class": input_class.__name__,
"middleware": middleware.__name__,
"bundle": bundle,
}
)

def get_handlers(self, input_class: Type[InputType], enabled_bundles: List[str] = []) -> List[HandlerFunction]:
"""Get handlers for input class, filtered by enabled bundles. Universal policies always included."""
all_handlers = self.handlers.get(input_class, [])
Expand All @@ -60,21 +43,6 @@ def get_handlers(self, input_class: Type[InputType], enabled_bundles: List[str]
if bundle is None or bundle in enabled_set
]

def get_middleware(self, input_class: Type[InputType], enabled_bundles: List[str] = []) -> List[MiddlewareFunction]:
"""Get middleware for input class, filtered by enabled bundles. Universal middleware always included."""
all_middleware = self.middleware.get(input_class, [])
enabled_set = set(enabled_bundles)

return [
mw for mw, bundle in all_middleware
if bundle is None or bundle in enabled_set
]

def register_all_middleware(self, input_class: Type[InputType], middleware_list: List[MiddlewareFunction], bundle: Optional[str] = None):
"""Register multiple middleware functions at once with optional bundle association."""
for middleware in middleware_list:
self.register_middleware(input_class, middleware, bundle)

def register_all_handlers(self, input_class: Type[InputType], handler_list: List[HandlerFunction], bundle: Optional[str] = None):
"""Register multiple handlers at once with optional bundle association."""
for handler in handler_list:
Expand Down
33 changes: 5 additions & 28 deletions tests/test_claude_code_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import pytest

from devleaps.policies.example.main import pip_rule, terraform_rule
from devleaps.policies.server.common.enums import SourceClient
from devleaps.policies.server.common.models import (
PolicyAction,
Expand All @@ -21,16 +22,9 @@
@pytest.fixture(scope="session", autouse=True)
def setup_example_policies():
"""Setup example policies before running tests."""
from devleaps.policies.example.main import (
bash_split_middleware,
terraform_rule,
python_test_file_post_guidance_rule,
)

registry = get_registry()
registry.register_middleware(ToolUseEvent, bash_split_middleware)
registry.register_handler(ToolUseEvent, terraform_rule)
registry.register_handler(PostToolUseEvent, python_test_file_post_guidance_rule)
registry.register_handler(ToolUseEvent, pip_rule)


def create_tool_use_event(command: str, tool_name: str = "Bash") -> ToolUseEvent:
Expand Down Expand Up @@ -71,10 +65,6 @@ def test_terraform_plan_allowed(self):
"""Test terraform plan is allowed."""
check_policy("terraform plan", PolicyAction.ALLOW)

def test_terraform_fmt_allowed(self):
"""Test terraform fmt is allowed."""
check_policy("terraform fmt", PolicyAction.ALLOW)


class TestClaudeCodeBlockedCommands:
"""Test commands that should be blocked."""
Expand All @@ -83,22 +73,9 @@ def test_terraform_apply_denied(self):
"""Test terraform apply is denied."""
check_policy("terraform apply", PolicyAction.DENY)

def test_terraform_apply_with_args_denied(self):
"""Test terraform apply with arguments is denied."""
check_policy("terraform apply -auto-approve", PolicyAction.DENY)


class TestClaudeCodeCommandSplitting:
"""Test bash command splitting middleware."""

def test_chained_commands_all_allowed(self):
"""Test chained commands where all are allowed."""
check_policy("terraform plan && terraform fmt", PolicyAction.ALLOW)

def test_chained_commands_one_denied(self):
"""Test chained commands where one is denied."""
check_policy("terraform plan && terraform apply", PolicyAction.DENY)
check_policy("terraform apply && terraform plan", PolicyAction.DENY)
def test_pip_install_denied(self):
"""Test pip install is denied."""
check_policy("pip install", PolicyAction.DENY)


class TestClaudeCodeNonBashTools:
Expand Down
Loading
Loading