Skip to content

Feat/context windows #247

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
"fastagent.secrets.yaml"
]
},
"editor.fontFamily": "BlexMono Nerd Font",
"python.testing.pytestArgs": ["tests"],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.analysis.typeCheckingMode": "standard"
"python.analysis.typeCheckingMode": "standard",
"python.analysis.nodeExecutable": "auto"
}
13 changes: 13 additions & 0 deletions src/mcp_agent/agents/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
HUMAN_INPUT_TOOL_NAME = "__human_input__"
if TYPE_CHECKING:
from mcp_agent.context import Context
from mcp_agent.llm.usage_tracking import UsageAccumulator


DEFAULT_CAPABILITIES = AgentCapabilities(
Expand Down Expand Up @@ -698,3 +699,15 @@ def message_history(self) -> List[PromptMessageMultipart]:
if self._llm:
return self._llm.message_history
return []

@property
def usage_accumulator(self) -> Optional["UsageAccumulator"]:
    """Expose the attached LLM's usage accumulator.

    Token usage is tracked per-turn by the LLM, not by the agent, so
    this simply forwards to the underlying LLM when one is present.

    Returns:
        The LLM's UsageAccumulator, or None when no LLM is attached.
    """
    return self._llm.usage_accumulator if self._llm else None
42 changes: 41 additions & 1 deletion src/mcp_agent/core/agent_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

from deprecated import deprecated
from mcp.types import PromptMessage
from rich import print as rich_print

from mcp_agent.agents.agent import Agent
from mcp_agent.core.interactive_prompt import InteractivePrompt
from mcp_agent.mcp.prompt_message_multipart import PromptMessageMultipart
from mcp_agent.progress_display import progress_display


class AgentApp:
Expand Down Expand Up @@ -272,7 +274,12 @@ async def interactive(self, agent_name: str | None = None, default_prompt: str =

# Define the wrapper for send function
async def send_wrapper(message, agent_name):
return await self.send(message, agent_name)
result = await self.send(message, agent_name)

# Show usage info after each turn if progress display is enabled
self._show_turn_usage(agent_name)

return result

# Start the prompt loop with the agent name (not the agent object)
return await prompt.prompt_loop(
Expand All @@ -282,3 +289,36 @@ async def send_wrapper(message, agent_name):
prompt_provider=self, # Pass self as the prompt provider
default=default_prompt,
)

def _show_turn_usage(self, agent_name: str) -> None:
    """Show subtle usage information after each turn."""
    agent = self._agents.get(agent_name)
    if not agent or not agent.usage_accumulator:
        return

    # Nothing to report until at least one turn has been recorded.
    turns = agent.usage_accumulator.turns
    if not turns:
        return

    last_turn = turns[-1]
    cache = last_turn.cache_usage

    # Bright cache markers: ^ for a cache write, * for a cache read/hit.
    markers = []
    if cache.cache_write_tokens > 0:
        markers.append("[bright_yellow]^[/bright_yellow]")
    if cache.cache_read_tokens > 0 or cache.cache_hit_tokens > 0:
        markers.append("[bright_green]*[/bright_green]")
    cache_indicators = "".join(markers)

    # Context-window percentage comes from the accumulator (cumulative
    # across the conversation), not from the individual turn.
    context_percentage = agent.usage_accumulator.context_usage_percentage
    context_info = f" ({context_percentage:.1f}%)" if context_percentage is not None else ""

    cache_suffix = f" {cache_indicators}" if cache_indicators else ""
    # Pause the progress display so the usage line is not overwritten.
    with progress_display.paused():
        rich_print(
            f"[dim]Last turn: {last_turn.input_tokens:,} Input, "
            f"{last_turn.output_tokens:,} Output{context_info}[/dim]{cache_suffix}"
        )
9 changes: 9 additions & 0 deletions src/mcp_agent/core/enhanced_prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(
"prompts": "List and select MCP prompts", # Changed description
"prompt": "Apply a specific prompt by name (/prompt <name>)", # New command
"agents": "List available agents",
"usage": "Show current usage statistics",
"clear": "Clear the screen",
"STOP": "Stop this prompting session and move to next workflow step",
"EXIT": "Exit fast-agent, terminating any running workflows",
Expand All @@ -67,6 +68,7 @@ def __init__(
self.commands.pop("agents")
self.commands.pop("prompts") # Remove prompts command in human input mode
self.commands.pop("prompt", None) # Remove prompt command in human input mode
self.commands.pop("usage", None) # Remove usage command in human input mode
self.agent_types = agent_types or {}

def get_completions(self, document, complete_event):
Expand Down Expand Up @@ -390,6 +392,8 @@ def pre_process_input(text):
return "CLEAR"
elif cmd == "agents":
return "LIST_AGENTS"
elif cmd == "usage":
return "SHOW_USAGE"
elif cmd == "prompts":
# Return a dictionary with select_prompt action instead of a string
# This way it will match what the command handler expects
Expand Down Expand Up @@ -566,6 +570,7 @@ async def handle_special_commands(command, agent_app=None):
rich_print(" /agents - List available agents")
rich_print(" /prompts - List and select MCP prompts")
rich_print(" /prompt <name> - Apply a specific prompt by name")
rich_print(" /usage - Show current usage statistics")
rich_print(" @agent_name - Switch to agent")
rich_print(" STOP - Return control back to the workflow")
rich_print(" EXIT - Exit fast-agent, terminating any running workflows")
Expand Down Expand Up @@ -594,6 +599,10 @@ async def handle_special_commands(command, agent_app=None):
rich_print("[yellow]No agents available[/yellow]")
return True

elif command == "SHOW_USAGE":
# Return a dictionary to signal that usage should be shown
return {"show_usage": True}

elif command == "SELECT_PROMPT" or (
isinstance(command, str) and command.startswith("SELECT_PROMPT:")
):
Expand Down
16 changes: 14 additions & 2 deletions src/mcp_agent/core/fastagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
ServerConfigError,
ServerInitializationError,
)
from mcp_agent.core.usage_display import display_usage_report
from mcp_agent.core.validation import (
validate_server_references,
validate_workflow_references,
Expand Down Expand Up @@ -392,22 +393,29 @@ def model_factory_func(model=None, request_params=None):

yield wrapper

except PromptExitError as e:
# User requested exit - not an error, show usage report
self._handle_error(e)
raise SystemExit(0)
except (
ServerConfigError,
ProviderKeyError,
AgentConfigError,
ServerInitializationError,
ModelConfigError,
CircularDependencyError,
PromptExitError,
) as e:
had_error = True
self._handle_error(e)
raise SystemExit(1)

finally:
# Clean up any active agents
# Print usage report before cleanup (show for user exits too)
if active_agents and not had_error:
self._print_usage_report(active_agents)

# Clean up any active agents (always cleanup, even on errors)
if active_agents:
for agent in active_agents.values():
try:
await agent.shutdown()
Expand Down Expand Up @@ -472,6 +480,10 @@ def _handle_error(self, e: Exception, error_type: Optional[str] = None) -> None:
else:
handle_error(e, error_type or "Error", "An unexpected error occurred.")

def _print_usage_report(self, active_agents: dict) -> None:
    """Print a formatted table of token usage for all agents.

    Delegates to the shared usage-display helper, requesting subdued
    colors and passing show_if_progress_disabled=False.
    """
    display_usage_report(
        active_agents,
        show_if_progress_disabled=False,
        subdued_colors=True,
    )

async def start_server(
self,
transport: str = "sse",
Expand Down
72 changes: 59 additions & 13 deletions src/mcp_agent/core/interactive_prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,34 @@
get_selection_input,
handle_special_commands,
)
from mcp_agent.core.usage_display import collect_agents_from_provider, display_usage_report
from mcp_agent.mcp.mcp_aggregator import SEP # Import SEP once at the top
from mcp_agent.mcp.prompt_message_multipart import PromptMessageMultipart
from mcp_agent.progress_display import progress_display

# Type alias for the send function
SendFunc = Callable[[Union[str, PromptMessage, PromptMessageMultipart], str], Awaitable[str]]

# Type alias for the agent getter function
AgentGetter = Callable[[str], Optional[object]]


class PromptProvider(Protocol):
"""Protocol for objects that can provide prompt functionality."""

async def list_prompts(self, server_name: Optional[str] = None, agent_name: Optional[str] = None) -> Mapping[str, List[Prompt]]:

async def list_prompts(
self, server_name: Optional[str] = None, agent_name: Optional[str] = None
) -> Mapping[str, List[Prompt]]:
"""List available prompts."""
...

async def apply_prompt(self, prompt_name: str, arguments: Optional[Dict[str, str]] = None, agent_name: Optional[str] = None, **kwargs) -> str:

async def apply_prompt(
self,
prompt_name: str,
arguments: Optional[Dict[str, str]] = None,
agent_name: Optional[str] = None,
**kwargs,
) -> str:
"""Apply a prompt."""
...

Expand Down Expand Up @@ -160,17 +172,23 @@ async def prompt_loop(
await self._list_prompts(prompt_provider, agent)
else:
# Use the name-based selection
await self._select_prompt(
prompt_provider, agent, prompt_name
)
await self._select_prompt(prompt_provider, agent, prompt_name)
continue
elif "show_usage" in command_result:
# Handle usage display
await self._show_usage(prompt_provider, agent)
continue

# Skip further processing if:
# 1. The command was handled (command_result is truthy)
# 2. The original input was a dictionary (special command like /prompt)
# 3. The command result itself is a dictionary (special command handling result)
# This fixes the issue where /prompt without arguments gets sent to the LLM
if command_result or isinstance(user_input, dict) or isinstance(command_result, dict):
if (
command_result
or isinstance(user_input, dict)
or isinstance(command_result, dict)
):
continue

if user_input.upper() == "STOP":
Expand All @@ -183,7 +201,9 @@ async def prompt_loop(

return result

async def _get_all_prompts(self, prompt_provider: PromptProvider, agent_name: Optional[str] = None):
async def _get_all_prompts(
self, prompt_provider: PromptProvider, agent_name: Optional[str] = None
):
"""
Get a list of all available prompts.

Expand All @@ -196,8 +216,10 @@ async def _get_all_prompts(self, prompt_provider: PromptProvider, agent_name: Op
"""
try:
# Call list_prompts on the provider
prompt_servers = await prompt_provider.list_prompts(server_name=None, agent_name=agent_name)

prompt_servers = await prompt_provider.list_prompts(
server_name=None, agent_name=agent_name
)

all_prompts = []

# Process the returned prompt servers
Expand Down Expand Up @@ -326,9 +348,11 @@ async def _select_prompt(
try:
# Get all available prompts directly from the prompt provider
rich_print(f"\n[bold]Fetching prompts for agent [cyan]{agent_name}[/cyan]...[/bold]")

# Call list_prompts on the provider
prompt_servers = await prompt_provider.list_prompts(server_name=None, agent_name=agent_name)
prompt_servers = await prompt_provider.list_prompts(
server_name=None, agent_name=agent_name
)

if not prompt_servers:
rich_print("[yellow]No prompts available for this agent[/yellow]")
Expand Down Expand Up @@ -557,3 +581,25 @@ async def _select_prompt(

rich_print(f"[red]Error selecting or applying prompt: {e}[/red]")
rich_print(f"[dim]{traceback.format_exc()}[/dim]")

async def _show_usage(self, prompt_provider: PromptProvider, agent_name: str) -> None:
    """
    Show usage statistics for the current agent(s) in a colorful table format.

    Args:
        prompt_provider: Provider that has access to agents
        agent_name: Name of the current agent
    """
    try:
        # Gather every agent the provider exposes so the report covers
        # the whole session, not only the current agent.
        agents = collect_agents_from_provider(prompt_provider, agent_name)
        if agents:
            # Shared display utility renders the usage table.
            display_usage_report(agents, show_if_progress_disabled=True)
        else:
            rich_print("[yellow]No usage data available[/yellow]")
    except Exception as e:
        # Usage display is best-effort; never let it break the prompt loop.
        rich_print(f"[red]Error showing usage: {e}[/red]")
Loading
Loading