diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..b332f39 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,22 @@ +// For format details, see https://aka.ms/devcontainer.json. For config options, see the +// README at: https://github.com/devcontainers/templates/tree/main/src/ubuntu +{ + "name": "Ubuntu", + // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile + "image": "mcr.microsoft.com/devcontainers/base:jammy" + + // Features to add to the dev container. More info: https://containers.dev/features. + // "features": {}, + + // Use 'forwardPorts' to make a list of ports inside the container available locally. + // "forwardPorts": [], + + // Use 'postCreateCommand' to run commands after the container is created. + // "postCreateCommand": "uname -a", + + // Configure tool-specific properties. + // "customizations": {}, + + // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root. + // "remoteUser": "root" +} diff --git a/README.md b/README.md index f9885b2..ab0c217 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ SimpleAgent is designed with the belief that AI agents don't need to be complex - **Easy to Extend**: Add new capabilities by creating new command modules - **Change Summarization**: Automatically summarizes changes made using a cheaper GPT model - **Modular Architecture**: Core components are separated into their own modules +- **Benchmarking System**: Comprehensive testing framework to verify command functionality ## Project Structure @@ -34,8 +35,14 @@ SimpleAgent/ │ │ ├── write_file/ │ │ └── ... │ └── ... # Other command categories + ├── benchmark/ # Benchmark tests + │ ├── __init__.py # Benchmark package initialization + │ ├── test_framework.py # Test discovery and execution framework + │ ├── test_file_ops.py # Tests for file operations + │ └── ... 
# Tests for other command categories ├── output/ # Generated files and input files directory ├── SimpleAgent.py # Main entry point + ├── status.md # Command status report ├── requirements.txt # Dependencies └── .env # Environment variables (create from .env.example) ``` @@ -133,6 +140,25 @@ python SimpleAgent.py -a 10 "research and look into https://github.com/PyGithub/ python SimpleAgent.py -a 10 "please research the latest in stock and look at the top 10 stock prices and write them to a file called 'stock_prices.txt'" ``` +## Running Benchmarks + +SimpleAgent includes a comprehensive benchmark system to verify that all commands are working correctly. + +To run the benchmark tests: + +``` +python SimpleAgent.py --benchmark +``` + +This will test all available commands and generate a status report in `status.md`. + +To view the status report without running the tests: + +``` +python SimpleAgent.py --status +``` + +For more information about benchmarks, see the [benchmark README](SimpleAgent/benchmark/README.md). 
# ===== file: SimpleAgent/SimpleAgent.py (reconstructed from mangled diff) =====
# NOTE(review): this span was collapsed onto a single physical line; the code
# below restores the formatting shown in the patch.  `os`, `sys`, `argparse`
# and `commands` are used here and assumed to be imported above the first
# visible hunk — TODO confirm against the full file.
from core.agent import SimpleAgent
from core.config import OPENAI_API_KEY, MAX_STEPS

# The benchmark package is optional: degrade gracefully when it is absent so
# the agent itself still runs.
try:
    from benchmark.test_framework import (
        discover_and_run_tests,
        generate_status_markdown,
        save_status_file,
    )
    BENCHMARK_AVAILABLE = True
except ImportError:
    BENCHMARK_AVAILABLE = False

# Initialize commands
commands.init()


def main():
    """Parse CLI arguments, then run benchmarks, print status, or run the agent.

    Returns:
        int: process exit code (0 on success, 1 on usage/environment errors).
    """
    # NOTE(review): the parser construction and the '-a' add_argument call sit
    # between the visible diff hunks; reproduced here from surrounding context
    # (help text implies nargs='?' with const=10) — TODO confirm.
    parser = argparse.ArgumentParser(description='SimpleAgent AI agent')
    parser.add_argument('-a', '--auto', nargs='?', const=10, type=int,
                        help='Auto-continue for N steps (default: 10 if no number provided)')
    parser.add_argument('-m', '--max-steps', type=int, default=10,
                        help='Maximum number of steps to run (default: 10)')
    parser.add_argument('-b', '--benchmark', action='store_true',
                        help='Run benchmark tests for all commands')
    parser.add_argument('-s', '--status', action='store_true',
                        help='Generate a status report for all commands without running tests')
    parser.add_argument('-o', '--output', default=None,
                        help='Output file path for status.md (default: SimpleAgent/status.md)')
    # nargs='*' (was '+') so --benchmark / --status may run with no instruction.
    parser.add_argument('instruction', nargs='*', help='The instruction for the AI agent')

    # Parse arguments
    args = parser.parse_args()

    # Run benchmarks if requested.
    if args.benchmark:
        if not BENCHMARK_AVAILABLE:
            print("Error: Benchmark module not available. Please install it first.")
            return 1

        print("Running benchmark tests for all commands...")
        results = discover_and_run_tests()
        status_md = generate_status_markdown(results)
        output_path = save_status_file(status_md, args.output)
        print(f"Benchmark tests completed! Status file saved to: {output_path}")
        return 0

    # Show the previously generated status report if requested.
    if args.status:
        if not BENCHMARK_AVAILABLE:
            print("Error: Benchmark module not available. Please install it first.")
            return 1

        status_path = args.output or os.path.join(os.path.dirname(__file__), 'status.md')
        if os.path.exists(status_path):
            print(f"Status report is available at: {status_path}")
            with open(status_path, 'r', encoding='utf-8') as f:
                # Print only the summary section: everything up to and
                # including the per-category breakdown header.
                for line in f:
                    print(line.strip())
                    if line.strip() == "## Command Status by Category":
                        break
        else:
            # Fix: plain string — the original used an f-string with no placeholders.
            print("Status report not found. Run with --benchmark to generate it.")
        return 0

    # An instruction is required when neither --benchmark nor --status is given.
    if not args.instruction:
        parser.print_help()
        return 1

    # Join the instruction parts back together
    instruction = ' '.join(args.instruction)

    # NOTE(review): the lines between this hunk and the next (where
    # `max_steps` is derived from args) are not visible in this patch
    # fragment and are omitted here — TODO restore from the full file.

    # Initialize and run the agent
    agent = SimpleAgent()
    agent.run(instruction, max_steps=max_steps, auto_continue=args.auto)

    return 0


if __name__ == "__main__":
    sys.exit(main())
Using Individual Test Modules + +You can run individual test modules using Python: + +```bash +python -m benchmark.test_file_ops +python -m benchmark.test_web_ops +python -m benchmark.test_data_ops +python -m benchmark.test_github_ops +python -m benchmark.test_agent +``` + +## Viewing Test Results + +After running the tests, you can view the results in the generated status.md file. This file contains a summary of all tests and their status. + +You can also display a summary of the results using the `--status` flag: + +```bash +python SimpleAgent.py --status +``` + +## Adding New Tests + +To add tests for a new command, create a test function in the appropriate test module. The test function should: + +1. Be named `test_commandname` +2. Return a tuple of (success, message) +3. Handle exceptions gracefully + +Example: + +```python +def test_new_command() -> Tuple[bool, str]: + """Test the new_command command.""" + try: + # Test code here + result = new_command(param1, param2) + + # Verify the result + if not result: + return False, "Command failed" + + return True, "Command successful" + except Exception as e: + return False, f"Exception: {str(e)}" +``` + +## Test Environment + +Tests run in a dedicated test environment with: + +- A clean test directory (`TEST_OUTPUT_DIR`) +- Mocked network calls to avoid actual web requests +- Mocked GitHub API calls to avoid actual GitHub operations + +This ensures tests can run without external dependencies or side effects. + +## Failed Tests + +If a test fails, it will be marked as "Failed" in the status.md file. You can fix the issue and run the tests again to update the status. \ No newline at end of file diff --git a/SimpleAgent/benchmark/__init__.py b/SimpleAgent/benchmark/__init__.py new file mode 100644 index 0000000..d495223 --- /dev/null +++ b/SimpleAgent/benchmark/__init__.py @@ -0,0 +1,6 @@ +""" +Benchmark package for SimpleAgent. 
#!/usr/bin/env python3
"""
Run all SimpleAgent benchmark tests.

This script is a simple shortcut to run benchmark tests for all SimpleAgent
commands and generate a status.md file with the results.

(file: SimpleAgent/benchmark/run_all.py — reconstructed from mangled diff)
"""

import os
import sys

# Make the SimpleAgent package root importable before importing the framework.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from benchmark.test_framework import (
    discover_and_run_tests,
    generate_status_markdown,
    save_status_file,
)

if __name__ == "__main__":
    print("Running SimpleAgent benchmark tests...")

    # Run all tests, render the report, and persist it.
    results = discover_and_run_tests()
    status_md = generate_status_markdown(results)
    output_path = save_status_file(status_md)

    # Fix: plain strings — originals used f-strings with no placeholders.
    print("\nBenchmark tests completed!")
    print(f"Status file saved to: {output_path}")

    # Tally results for the console summary.
    total = 0
    working = 0
    failed = 0
    not_tested = 0
    for category, cmds in results.items():
        for cmd, result in cmds.items():
            total += 1
            if result['status'] == 'Working':
                working += 1
            elif result['status'] == 'Failed':
                failed += 1
            else:
                not_tested += 1

    print("\nSummary:")
    print(f"- Total Commands: {total}")
    # Guard against an empty command set to avoid ZeroDivisionError.
    if total > 0:
        print(f"- Working: {working} ({working/total*100:.1f}%)")
        print(f"- Failed: {failed} ({failed/total*100:.1f}%)")
        print(f"- Not Tested: {not_tested} ({not_tested/total*100:.1f}%)")
    else:
        print("- No commands found to test")


# ===== file: SimpleAgent/benchmark/run_benchmarks.py (reconstructed) =====
"""
Run benchmarks for SimpleAgent commands.

This script runs tests for all SimpleAgent commands and
generates a status.md file with the results.
"""

import argparse

# FIX: the original inserted the package root into sys.path inside main(),
# *after* the module-level `from benchmark.test_framework import ...` below
# had already executed — so running this script directly from another
# directory failed with ImportError before main() ever ran.  Do the path
# setup before the import, exactly as run_all.py does.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from benchmark.test_framework import (
    discover_and_run_tests,
    generate_status_markdown,
    save_status_file,
)


def main():
    """Run all benchmark tests and write the status report.

    Returns:
        int: 0 on success, 1 if the test run raised.
    """
    parser = argparse.ArgumentParser(description='Run benchmark tests for SimpleAgent commands.')
    parser.add_argument('--output', '-o', default=None,
                        help='Output file path for status.md (default: SimpleAgent/status.md)')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Enable verbose output')
    args = parser.parse_args()

    print("Starting SimpleAgent benchmark tests...")
    try:
        results = discover_and_run_tests()
        status_md = generate_status_markdown(results)
        output_path = save_status_file(status_md, args.output)

        # Fix: plain string — original used an f-string with no placeholders.
        print("\nBenchmark tests completed!")
        print(f"Status file saved to: {output_path}")
        return 0
    except Exception as e:
        # Best-effort reporting: surface the failure but exit cleanly.
        print(f"Error running benchmark tests: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())
+""" + +import os +from typing import Tuple, Dict, Any +import unittest.mock as mock + +from benchmark.test_framework import TEST_OUTPUT_DIR +from core.agent import SimpleAgent +from core.config import DEFAULT_MODEL + +def test_agent_initialization() -> Tuple[bool, str]: + """Test the SimpleAgent initialization.""" + try: + # Create agent with default model + agent = SimpleAgent() + + # Verify that components are initialized + if not agent.model: + return False, "Agent model not initialized" + + if not agent.output_dir: + return False, "Agent output_dir not initialized" + + if not agent.conversation_manager: + return False, "Agent conversation_manager not initialized" + + if not agent.execution_manager: + return False, "Agent execution_manager not initialized" + + if not agent.memory_manager: + return False, "Agent memory_manager not initialized" + + # Verify model is set correctly + if agent.model != DEFAULT_MODEL: + return False, f"Agent model mismatch: {agent.model} != {DEFAULT_MODEL}" + + return True, "Agent initialized successfully" + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_agent_run() -> Tuple[bool, str]: + """Test the SimpleAgent run method with mocked execution.""" + try: + # Create agent + agent = SimpleAgent() + + # Mock run_manager.run method to avoid actual execution + agent.run_manager.run = mock.Mock() + + # Call run method + agent.run("Test instruction", max_steps=5, auto_continue=2) + + # Verify run was called with correct arguments + agent.run_manager.run.assert_called_once_with( + user_instruction="Test instruction", + max_steps=5, + auto_continue=2 + ) + + return True, "Agent run method called successfully" + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_agent_memory() -> Tuple[bool, str]: + """Test the SimpleAgent memory operations.""" + try: + # Create agent + agent = SimpleAgent() + + # Mock memory manager methods to avoid file operations + agent.memory_manager._load_memory = 
mock.Mock(return_value={"test_key": "test_value"}) + agent.memory_manager.save_memory = mock.Mock() + + # Test loading memory + memory = agent.load_memory() + + # Verify memory was loaded + if not memory or not isinstance(memory, dict): + return False, f"Expected dict memory, got: {type(memory)}" + + if memory.get("test_key") != "test_value": + return False, f"Memory content mismatch: {memory}" + + # Verify memory manager method was called + agent.memory_manager._load_memory.assert_called_once() + + # Test saving memory + agent.save_memory() + + # Verify memory manager method was called + agent.memory_manager.save_memory.assert_called_once() + + return True, "Agent memory operations successful" + except Exception as e: + return False, f"Exception: {str(e)}" \ No newline at end of file diff --git a/SimpleAgent/benchmark/test_data_ops.py b/SimpleAgent/benchmark/test_data_ops.py new file mode 100644 index 0000000..55ac80f --- /dev/null +++ b/SimpleAgent/benchmark/test_data_ops.py @@ -0,0 +1,65 @@ +""" +Tests for data operation commands. + +This module contains benchmark tests for all data operation commands. +""" + +import os +from typing import Tuple, Dict, Any + +from benchmark.test_framework import TEST_OUTPUT_DIR + +# Import data operation functions +try: + from commands.data_ops.text_analysis import text_analysis + + # Flag to track if imports succeeded + DATA_OPS_AVAILABLE = True +except ImportError as e: + print(f"Warning: Could not import data_ops modules: {e}") + DATA_OPS_AVAILABLE = False + +# Test data +TEST_TEXT = """ +SimpleAgent is a minimalist AI agent framework. It can help you perform various +tasks through function calling. The agent uses the OpenAI API to generate responses +and execute functions based on user instructions. + +Key features include: +1. File operations for reading, writing, and editing files +2. Web operations for searching, scraping, and API requests +3. GitHub operations for repo and issue management +4. 
Data analysis for text processing +""" + +def test_text_analysis() -> Tuple[bool, str]: + """Test the text_analysis command.""" + if not DATA_OPS_AVAILABLE: + return False, "data_ops modules not available" + + try: + # Call the text_analysis function + result = text_analysis(TEST_TEXT) + + # Verify result structure + if not isinstance(result, dict): + return False, f"Expected dictionary result, got: {type(result)}" + + # Check for expected keys + expected_keys = ["summary", "entities", "keywords", "sentiment"] + for key in expected_keys: + if key not in result: + return False, f"Missing expected key '{key}' in result: {result.keys()}" + + # Check that summary is not empty + if not result["summary"] or len(result["summary"]) < 10: + return False, f"Expected non-empty summary, got: {result['summary']}" + + # Check that keywords are a list + if not isinstance(result["keywords"], list) or not result["keywords"]: + return False, f"Expected non-empty keywords list, got: {result['keywords']}" + + return True, "Text analysis test passed" + + except Exception as e: + return False, f"Exception: {str(e)}" \ No newline at end of file diff --git a/SimpleAgent/benchmark/test_file_ops.py b/SimpleAgent/benchmark/test_file_ops.py new file mode 100644 index 0000000..184c211 --- /dev/null +++ b/SimpleAgent/benchmark/test_file_ops.py @@ -0,0 +1,289 @@ +""" +Tests for file operation commands. + +This module contains benchmark tests for all file operation commands. 
+""" + +import os +import json +import tempfile +from typing import Tuple, Dict, Any + +from benchmark.test_framework import TEST_OUTPUT_DIR +from commands.file_ops.read_file import read_file +from commands.file_ops.write_file import write_file +from commands.file_ops.edit_file import edit_file +from commands.file_ops.append_file import append_file +from commands.file_ops.delete_file import delete_file +from commands.file_ops.create_directory import create_directory +from commands.file_ops.list_directory import list_directory +from commands.file_ops.file_exists import file_exists +from commands.file_ops.save_json import save_json +from commands.file_ops.load_json import load_json +from commands.file_ops.advanced_edit_file import advanced_edit_file + +# Test files +TEST_FILE = os.path.join(TEST_OUTPUT_DIR, 'test_file.txt') +TEST_JSON = os.path.join(TEST_OUTPUT_DIR, 'test_data.json') +TEST_DIR = os.path.join(TEST_OUTPUT_DIR, 'test_dir') +TEST_EDIT_FILE = os.path.join(TEST_OUTPUT_DIR, 'test_edit.txt') +TEST_APPEND_FILE = os.path.join(TEST_OUTPUT_DIR, 'test_append.txt') + +def test_write_file() -> Tuple[bool, str]: + """Test the write_file command.""" + try: + content = "This is a test file.\nIt has multiple lines.\nCreated for testing." + result = write_file(TEST_FILE, content) + + # Verify the file was created + if not os.path.exists(TEST_FILE): + return False, "File was not created" + + # Verify the content was written correctly + with open(TEST_FILE, 'r', encoding='utf-8') as f: + file_content = f.read() + + if file_content != content: + return False, f"Content mismatch: {file_content} != {content}" + + return True, "File written successfully" + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_read_file() -> Tuple[bool, str]: + """Test the read_file command.""" + try: + # First ensure there's a file to read + content = "Test content for read_file test." 
+ with open(TEST_FILE, 'w', encoding='utf-8') as f: + f.write(content) + + # Read the file + result = read_file(TEST_FILE) + + # Verify + if result != content: + return False, f"Content mismatch: {result} != {content}" + + return True, "File read successfully" + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_edit_file() -> Tuple[bool, str]: + """Test the edit_file command.""" + try: + # Create a file to edit + original_content = "Line 1\nLine 2\nLine 3\n" + with open(TEST_EDIT_FILE, 'w', encoding='utf-8') as f: + f.write(original_content) + + # Edit the file + new_content = "Line 1\nEdited Line 2\nLine 3\n" + result = edit_file(TEST_EDIT_FILE, new_content) + + # Verify + with open(TEST_EDIT_FILE, 'r', encoding='utf-8') as f: + file_content = f.read() + + if file_content != new_content: + return False, f"Content mismatch after edit: {file_content} != {new_content}" + + return True, "File edited successfully" + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_append_file() -> Tuple[bool, str]: + """Test the append_file command.""" + try: + # Create a file to append to + original_content = "First line.\n" + with open(TEST_APPEND_FILE, 'w', encoding='utf-8') as f: + f.write(original_content) + + # Append to the file + append_content = "Appended line.\n" + result = append_file(TEST_APPEND_FILE, append_content) + + # Verify + expected_content = original_content + append_content + with open(TEST_APPEND_FILE, 'r', encoding='utf-8') as f: + file_content = f.read() + + if file_content != expected_content: + return False, f"Content mismatch after append: {file_content} != {expected_content}" + + return True, "Content appended successfully" + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_delete_file() -> Tuple[bool, str]: + """Test the delete_file command.""" + try: + # Create a file to delete + with open(TEST_FILE, 'w', encoding='utf-8') as f: + f.write("This file will be deleted.") + + # Verify 
it exists + if not os.path.exists(TEST_FILE): + return False, "Could not create file for deletion test" + + # Delete the file + result = delete_file(TEST_FILE) + + # Verify deletion + if os.path.exists(TEST_FILE): + return False, "File was not deleted" + + return True, "File deleted successfully" + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_create_directory() -> Tuple[bool, str]: + """Test the create_directory command.""" + try: + # Delete the directory if it already exists + if os.path.exists(TEST_DIR): + os.rmdir(TEST_DIR) + + # Create directory + result = create_directory(TEST_DIR) + + # Verify + if not os.path.exists(TEST_DIR) or not os.path.isdir(TEST_DIR): + return False, "Directory was not created" + + return True, "Directory created successfully" + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_list_directory() -> Tuple[bool, str]: + """Test the list_directory command.""" + try: + # Create a directory with known content + if not os.path.exists(TEST_DIR): + os.makedirs(TEST_DIR) + + # Create some files + test_files = ['file1.txt', 'file2.txt', 'file3.txt'] + for file in test_files: + with open(os.path.join(TEST_DIR, file), 'w') as f: + f.write(f"Content of {file}") + + # List the directory + result = list_directory(TEST_DIR) + + # Verify all test files are listed + for file in test_files: + if file not in result: + return False, f"File {file} was not listed in result: {result}" + + return True, "Directory listed successfully" + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_file_exists() -> Tuple[bool, str]: + """Test the file_exists command.""" + try: + # Ensure a file exists + with open(TEST_FILE, 'w') as f: + f.write("Test file") + + # Check existing file + result1 = file_exists(TEST_FILE) + if not result1: + return False, f"file_exists reported False for existing file" + + # Check non-existing file + result2 = file_exists(TEST_FILE + ".nonexistent") + if result2: + 
return False, f"file_exists reported True for non-existent file" + + return True, "file_exists checked successfully" + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_save_json() -> Tuple[bool, str]: + """Test the save_json command.""" + try: + # Data to save + test_data = { + "name": "Test Data", + "values": [1, 2, 3, 4, 5], + "nested": { + "key": "value", + "flag": True + } + } + + # Save the data + result = save_json(TEST_JSON, test_data) + + # Verify the file was created + if not os.path.exists(TEST_JSON): + return False, "JSON file was not created" + + # Verify content + with open(TEST_JSON, 'r') as f: + loaded_data = json.load(f) + + if loaded_data != test_data: + return False, f"JSON data mismatch: {loaded_data} != {test_data}" + + return True, "JSON saved successfully" + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_load_json() -> Tuple[bool, str]: + """Test the load_json command.""" + try: + # Data to save and load + test_data = { + "name": "Test Data", + "values": [1, 2, 3, 4, 5], + "nested": { + "key": "value", + "flag": True + } + } + + # Create the file + with open(TEST_JSON, 'w') as f: + json.dump(test_data, f) + + # Load the data + result = load_json(TEST_JSON) + + # Verify + if result != test_data: + return False, f"Loaded JSON data mismatch: {result} != {test_data}" + + return True, "JSON loaded successfully" + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_advanced_edit_file() -> Tuple[bool, str]: + """Test the advanced_edit_file command.""" + try: + # Create a file with multiple lines + original_content = "Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n" + with open(TEST_EDIT_FILE, 'w', encoding='utf-8') as f: + f.write(original_content) + + # Test advanced edit: replace line 3 + edits = [ + {"operation": "replace", "line_number": 3, "content": "New Line 3"}, + ] + + result = advanced_edit_file(TEST_EDIT_FILE, edits) + + # Verify + expected_content = "Line 1\nLine 2\nNew 
Line 3\nLine 4\nLine 5\n" + with open(TEST_EDIT_FILE, 'r', encoding='utf-8') as f: + file_content = f.read() + + if file_content != expected_content: + return False, f"Content mismatch after advanced edit: {file_content} != {expected_content}" + + return True, "Advanced edit successful" + except Exception as e: + return False, f"Exception: {str(e)}" \ No newline at end of file diff --git a/SimpleAgent/benchmark/test_framework.py b/SimpleAgent/benchmark/test_framework.py new file mode 100644 index 0000000..39808d0 --- /dev/null +++ b/SimpleAgent/benchmark/test_framework.py @@ -0,0 +1,242 @@ +""" +Test framework for SimpleAgent commands. + +This module provides a framework for testing all SimpleAgent commands +and generating a status report in Markdown format. +""" + +import os +import importlib +import inspect +import pkgutil +import time +import json +import shutil +from typing import Dict, List, Callable, Any, Tuple +from datetime import datetime + +# Import SimpleAgent modules +import commands +from commands import REGISTERED_COMMANDS, COMMANDS_BY_CATEGORY +from core.agent import SimpleAgent + +# Initialize test results dictionary +test_results = {} + +# Setup test environment +TEST_OUTPUT_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'test_output') + +def setup_test_environment(): + """Setup the test environment including test directories.""" + # Create test directory if it doesn't exist + if not os.path.exists(TEST_OUTPUT_DIR): + os.makedirs(TEST_OUTPUT_DIR) + else: + # Clean up any previous test files + for filename in os.listdir(TEST_OUTPUT_DIR): + file_path = os.path.join(TEST_OUTPUT_DIR, filename) + try: + if os.path.isfile(file_path) or os.path.islink(file_path): + os.unlink(file_path) + elif os.path.isdir(file_path): + shutil.rmtree(file_path) + except Exception as e: + print(f'Failed to delete {file_path}. 
def discover_test_modules() -> List[str]:
    """
    Discover all test modules in the benchmark package.

    Returns:
        List of module names matching ``test_*`` (excluding this framework
        module itself).
    """
    test_modules = []
    package_dir = os.path.dirname(__file__)
    for _, module_name, is_pkg in pkgutil.iter_modules([package_dir]):
        if not is_pkg and module_name.startswith('test_') and module_name != 'test_framework':
            test_modules.append(module_name)
    return test_modules


def run_test(test_func: Callable, test_name: str) -> Tuple[bool, str]:
    """
    Run a single test function and return the results.

    Args:
        test_func: The test function to run.
        test_name: The name of the test (currently unused; kept for callers).

    Returns:
        Tuple of (success, message); the message always carries elapsed time.
    """
    start_time = time.time()
    try:
        result = test_func()
        elapsed_time = time.time() - start_time

        # Tests may return (bool, message), a bare bool, None (assume
        # success), or anything else (treated as a success message).
        if isinstance(result, tuple) and len(result) == 2 and isinstance(result[0], bool):
            success, message = result
            return success, f"{message} (Completed in {elapsed_time:.2f}s)"
        elif isinstance(result, bool):
            return result, f"Completed in {elapsed_time:.2f}s"
        elif result is None:
            return True, f"Completed in {elapsed_time:.2f}s"
        else:
            return True, f"{result} (Completed in {elapsed_time:.2f}s)"
    except Exception as e:
        elapsed_time = time.time() - start_time
        return False, f"Error: {str(e)} (Failed after {elapsed_time:.2f}s)"


def discover_and_run_tests() -> Dict[str, Dict[str, Any]]:
    """
    Discover and run all tests for SimpleAgent commands.

    Returns:
        Dictionary of test results by category and command; each leaf holds
        'status', 'message' and 'timestamp'.
    """
    # Initialize commands and a clean test sandbox.
    commands.init()
    setup_test_environment()

    # Discover and import test modules.
    test_modules = discover_test_modules()
    print(f"Discovered {len(test_modules)} test modules: {test_modules}")
    for module_name in test_modules:
        try:
            importlib.import_module(f"benchmark.{module_name}")
            print(f"Imported test module: {module_name}")
        except ImportError as e:
            print(f"Error importing test module {module_name}: {e}")

    # Seed every registered command as "Not Tested" so untested commands
    # still appear in the report.
    results = {}
    for category, cmd_list in COMMANDS_BY_CATEGORY.items():
        results[category] = {}
        for cmd in cmd_list:
            results[category][cmd] = {
                "status": "Not Tested",
                "message": "No test available",
                "timestamp": datetime.now().isoformat()
            }

    # Find and run the test function for each command.
    for test_module_name in test_modules:
        module = importlib.import_module(f"benchmark.{test_module_name}")
        test_functions = inspect.getmembers(
            module,
            lambda member: inspect.isfunction(member) and member.__name__.startswith('test_'))

        for func_name, test_func in test_functions:
            # Convention: test_<command> maps to command <command>.
            cmd_name = func_name[5:]

            # Find the category that registered this command.
            category = None
            for cat, cmds in COMMANDS_BY_CATEGORY.items():
                if cmd_name in cmds:
                    category = cat
                    break

            if category:
                print(f"Running test for {category}.{cmd_name}...")
                success, message = run_test(test_func, func_name)
                results[category][cmd_name] = {
                    "status": "Working" if success else "Failed",
                    "message": message,
                    "timestamp": datetime.now().isoformat()
                }
            else:
                print(f"Warning: Could not find category for command '{cmd_name}'")

    return results
def generate_status_markdown(results: Dict[str, Dict[str, Any]]) -> str:
    """
    Generate a Markdown report with the status of all commands.

    Args:
        results: Test results keyed by category, then command name; each leaf
            is a dict with at least 'status' and 'message'.

    Returns:
        Markdown string with the command status report.
    """
    md_content = "# SimpleAgent Command Status\n\n"
    md_content += f"Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"

    # Count totals.  (Renamed the inner loop variable: the original shadowed
    # the imported `commands` module.)
    total_commands = 0
    working_commands = 0
    failed_commands = 0
    untested_commands = 0
    for category_results in results.values():
        for result in category_results.values():
            total_commands += 1
            if result['status'] == 'Working':
                working_commands += 1
            elif result['status'] == 'Failed':
                failed_commands += 1
            else:
                untested_commands += 1

    # FIX: guard against an empty result set — the original divided by
    # total_commands unconditionally and raised ZeroDivisionError when no
    # commands were registered (run_all.py guards the same computation).
    def pct(count: int) -> float:
        return count / total_commands * 100 if total_commands else 0.0

    # Add summary.
    md_content += "## Summary\n\n"
    md_content += f"- Total Commands: {total_commands}\n"
    md_content += f"- Working: {working_commands} ({pct(working_commands):.1f}%)\n"
    md_content += f"- Failed: {failed_commands} ({pct(failed_commands):.1f}%)\n"
    md_content += f"- Not Tested: {untested_commands} ({pct(untested_commands):.1f}%)\n\n"

    # Add details by category.
    md_content += "## Command Status by Category\n\n"
    for category in sorted(results.keys()):
        category_display = category.replace('_', ' ').title()
        md_content += f"### {category_display} Commands\n\n"
        md_content += "| Command | Status | Message |\n"
        md_content += "|---------|--------|--------|\n"

        category_commands = results[category]
        for cmd_name in sorted(category_commands.keys()):
            result = category_commands[cmd_name]
            status_emoji = "✅" if result['status'] == 'Working' else "❌" if result['status'] == 'Failed' else "⚠️"
            md_content += f"| `{cmd_name}` | {status_emoji} {result['status']} | {result['message']} |\n"

        md_content += "\n"

    return md_content


def save_status_file(markdown_content: str, filepath: str = None) -> str:
    """
    Save the status markdown to a file.

    Args:
        markdown_content: The markdown content to save.
        filepath: The file path to save to (default: SimpleAgent/status.md).

    Returns:
        The path to the saved file.
    """
    if filepath is None:
        filepath = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'status.md')
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(markdown_content)
    return filepath
+TEST_REPO_DATA = { + "name": "repo", + "full_name": "example/repo", + "description": "Test repository", + "html_url": "https://github.com/example/repo", + "owner": { + "login": "example" + }, + "default_branch": "main" +} +TEST_ISSUE_DATA = { + "number": TEST_ISSUE_NUMBER, + "title": "Test Issue", + "body": "This is a test issue body.", + "html_url": f"https://github.com/example/repo/issues/{TEST_ISSUE_NUMBER}", + "user": { + "login": "testuser" + }, + "state": "open", + "created_at": "2023-01-01T00:00:00Z", + "comments": 5 +} +TEST_PR_DATA = { + "number": TEST_PR_NUMBER, + "title": "Test PR", + "body": "This is a test PR body.", + "html_url": f"https://github.com/example/repo/pull/{TEST_PR_NUMBER}", + "user": { + "login": "testuser" + }, + "state": "open", + "created_at": "2023-01-01T00:00:00Z", + "comments": 3, + "merged": False +} +TEST_ISSUES_LIST = [TEST_ISSUE_DATA] +TEST_PR_LIST = [TEST_PR_DATA] +TEST_FILE_CONTENT = "This is the content of a mock GitHub file." + +# Mock GitHub methods +def mock_github_repo(): + """Create a mock GitHub Repository object.""" + mock_repo = mock.Mock() + mock_repo.name = TEST_REPO_DATA["name"] + mock_repo.full_name = TEST_REPO_DATA["full_name"] + mock_repo.description = TEST_REPO_DATA["description"] + mock_repo.html_url = TEST_REPO_DATA["html_url"] + mock_repo.owner.login = TEST_REPO_DATA["owner"]["login"] + mock_repo.default_branch = TEST_REPO_DATA["default_branch"] + + # Mock get_issues method + mock_issues = [] + for issue_data in TEST_ISSUES_LIST: + mock_issue = mock.Mock() + for key, value in issue_data.items(): + setattr(mock_issue, key, value) + mock_issues.append(mock_issue) + mock_repo.get_issues.return_value = mock_issues + + # Mock get_pulls method + mock_prs = [] + for pr_data in TEST_PR_LIST: + mock_pr = mock.Mock() + for key, value in pr_data.items(): + setattr(mock_pr, key, value) + mock_prs.append(mock_pr) + mock_repo.get_pulls.return_value = mock_prs + + # Mock get_issue method + mock_issue = mock.Mock() + for 
key, value in TEST_ISSUE_DATA.items(): + setattr(mock_issue, key, value) + mock_repo.get_issue.return_value = mock_issue + + # Mock create_issue method + mock_repo.create_issue.return_value = mock_issue + + # Mock get_pull method + mock_pr = mock.Mock() + for key, value in TEST_PR_DATA.items(): + setattr(mock_pr, key, value) + mock_repo.get_pull.return_value = mock_pr + + # Mock get_contents method + mock_content = mock.Mock() + mock_content.decoded_content.decode.return_value = TEST_FILE_CONTENT + mock_repo.get_contents.return_value = mock_content + + return mock_repo + +def test_repo_reader() -> Tuple[bool, str]: + """Test the repo_reader command.""" + if not GITHUB_OPS_AVAILABLE: + return False, "github_ops modules not available" + + try: + # Mock the GitHub client + with mock.patch('github.Github') as mock_github: + # Set up the mock repository + mock_github.return_value.get_repo.return_value = mock_github_repo() + + # Call the function + result = repo_reader(TEST_REPO) + + # Verify mocks were called + mock_github.return_value.get_repo.assert_called_once_with(TEST_REPO) + + # Check that the result contains expected info + if not result or not isinstance(result, dict): + return False, f"Expected dict result, got: {result}" + + if "repo" not in result or "issues" not in result or "pulls" not in result: + return False, f"Missing expected keys in result: {result.keys()}" + + return True, "Repo reader test passed" + + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_issue_reader() -> Tuple[bool, str]: + """Test the issue_reader command.""" + if not GITHUB_OPS_AVAILABLE: + return False, "github_ops modules not available" + + try: + # Mock the GitHub client + with mock.patch('github.Github') as mock_github: + # Set up the mock repository + mock_github.return_value.get_repo.return_value = mock_github_repo() + + # Call the function + result = issue_reader(TEST_REPO, TEST_ISSUE_NUMBER) + + # Verify mocks were called + 
mock_github.return_value.get_repo.assert_called_once_with(TEST_REPO) + + # Check that the result contains expected info + if not result or not isinstance(result, dict): + return False, f"Expected dict result, got: {result}" + + if "title" not in result or "body" not in result: + return False, f"Missing expected keys in result: {result.keys()}" + + return True, "Issue reader test passed" + + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_pr_reader() -> Tuple[bool, str]: + """Test the pr_reader command.""" + if not GITHUB_OPS_AVAILABLE: + return False, "github_ops modules not available" + + try: + # Mock the GitHub client + with mock.patch('github.Github') as mock_github: + # Set up the mock repository + mock_github.return_value.get_repo.return_value = mock_github_repo() + + # Call the function + result = pr_reader(TEST_REPO, TEST_PR_NUMBER) + + # Verify mocks were called + mock_github.return_value.get_repo.assert_called_once_with(TEST_REPO) + + # Check that the result contains expected info + if not result or not isinstance(result, dict): + return False, f"Expected dict result, got: {result}" + + if "title" not in result or "body" not in result: + return False, f"Missing expected keys in result: {result.keys()}" + + return True, "PR reader test passed" + + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_github_create_issue() -> Tuple[bool, str]: + """Test the github_create_issue command.""" + if not GITHUB_OPS_AVAILABLE: + return False, "github_ops modules not available" + + try: + # Mock the GitHub client + with mock.patch('github.Github') as mock_github: + # Set up the mock repository + mock_repo = mock_github_repo() + mock_github.return_value.get_repo.return_value = mock_repo + + # Call the function + result = github_create_issue(TEST_REPO, "Test Issue Title", "Test issue body") + + # Verify mocks were called + mock_github.return_value.get_repo.assert_called_once_with(TEST_REPO) + 
mock_repo.create_issue.assert_called_once_with( + title="Test Issue Title", + body="Test issue body" + ) + + # Check that the result contains expected info + if not result or not isinstance(result, dict): + return False, f"Expected dict result, got: {result}" + + if "url" not in result or "number" not in result: + return False, f"Missing expected keys in result: {result.keys()}" + + return True, "GitHub create issue test passed" + + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_github_comment() -> Tuple[bool, str]: + """Test the github_comment command.""" + if not GITHUB_OPS_AVAILABLE: + return False, "github_ops modules not available" + + try: + # Mock the GitHub client + with mock.patch('github.Github') as mock_github: + # Set up the mock repository and issue + mock_repo = mock_github_repo() + mock_github.return_value.get_repo.return_value = mock_repo + mock_issue = mock_repo.get_issue.return_value + mock_issue.create_comment.return_value = mock.Mock( + html_url=f"https://github.com/example/repo/issues/{TEST_ISSUE_NUMBER}#comment-1" + ) + + # Call the function + result = github_comment(TEST_REPO, TEST_ISSUE_NUMBER, TEST_COMMENT) + + # Verify mocks were called + mock_github.return_value.get_repo.assert_called_once_with(TEST_REPO) + mock_repo.get_issue.assert_called_once_with(TEST_ISSUE_NUMBER) + mock_issue.create_comment.assert_called_once_with(TEST_COMMENT) + + # Check that the result contains expected info + if not result or not isinstance(result, dict): + return False, f"Expected dict result, got: {result}" + + if "url" not in result: + return False, f"Missing expected key 'url' in result: {result.keys()}" + + return True, "GitHub comment test passed" + + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_github_read_files() -> Tuple[bool, str]: + """Test the github_read_files command.""" + if not GITHUB_OPS_AVAILABLE: + return False, "github_ops modules not available" + + try: + # Mock the GitHub client + with 
mock.patch('github.Github') as mock_github: + # Set up the mock repository + mock_repo = mock_github_repo() + mock_github.return_value.get_repo.return_value = mock_repo + + # Call the function + result = github_read_files(TEST_REPO, "path/to/file.txt") + + # Verify mocks were called + mock_github.return_value.get_repo.assert_called_once_with(TEST_REPO) + mock_repo.get_contents.assert_called_once_with("path/to/file.txt") + + # Check that the result contains expected content + if not result or not isinstance(result, str): + return False, f"Expected string result, got: {type(result)}" + + if TEST_FILE_CONTENT not in result: + return False, f"Expected file content not in result: {result}" + + return True, "GitHub read files test passed" + + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_github_create_repo() -> Tuple[bool, str]: + """Test the github_create_repo command.""" + if not GITHUB_OPS_AVAILABLE: + return False, "github_ops modules not available" + + try: + # Mock the GitHub client + with mock.patch('github.Github') as mock_github: + # Set up the mock user + mock_user = mock.Mock() + mock_user.create_repo.return_value = mock_github_repo() + mock_github.return_value.get_user.return_value = mock_user + + # Call the function + result = github_create_repo("test-repo", "Test repository description", private=True) + + # Verify mocks were called + mock_github.return_value.get_user.assert_called_once() + mock_user.create_repo.assert_called_once_with( + "test-repo", + description="Test repository description", + private=True + ) + + # Check that the result contains expected info + if not result or not isinstance(result, dict): + return False, f"Expected dict result, got: {result}" + + if "url" not in result or "full_name" not in result: + return False, f"Missing expected keys in result: {result.keys()}" + + return True, "GitHub create repo test passed" + + except Exception as e: + return False, f"Exception: {str(e)}" \ No newline at end of file 
diff --git a/SimpleAgent/benchmark/test_web_ops.py b/SimpleAgent/benchmark/test_web_ops.py new file mode 100644 index 0000000..3f7818e --- /dev/null +++ b/SimpleAgent/benchmark/test_web_ops.py @@ -0,0 +1,236 @@ +""" +Tests for web operation commands. + +This module contains benchmark tests for all web operation commands. +Note that these tests use mocked HTTP responses to avoid real network requests. +""" + +import os +import json +import unittest.mock as mock +from typing import Tuple, Dict, Any + +from benchmark.test_framework import TEST_OUTPUT_DIR +import requests + +# Import web operation functions +try: + from commands.web_ops.web_search import web_search + from commands.web_ops.web_scrape import web_scrape + from commands.web_ops.fetch_json_api import fetch_json_api + from commands.web_ops.raw_web_read import raw_web_read + from commands.web_ops.extract_links import extract_links + + # Flag to track if imports succeeded + WEB_OPS_AVAILABLE = True +except ImportError as e: + print(f"Warning: Could not import web_ops modules: {e}") + WEB_OPS_AVAILABLE = False + +# Test URLs +TEST_URL = "https://example.com" +TEST_API_URL = "https://api.example.com/data" +TEST_SEARCH_QUERY = "test query" +TEST_HTML = """ + + +
+<!DOCTYPE html>
+<html>
+<head>
+    <title>Test Page</title>
+</head>
+<body>
+    <h1>Test Page Header</h1>
+    <p>This is a test paragraph.</p>
+    <a href="https://example.com/link1">Link 1</a>
+    <a href="https://example.com/link2">Link 2</a>
+    <a href="https://example.com/link3">Link 3</a>
+</body>
+</html>
+"""
+
+TEST_JSON_RESPONSE = {
+    "status": "success",
+    "data": {
+        "items": [1, 2, 3, 4, 5],
+        "meta": {
+            "total": 5,
+            "page": 1
+        }
+    }
+}
+
+def mock_response(content=None, json_data=None, status_code=200, url=TEST_URL):
+    """Create a mock requests.Response object."""
+    mock_resp = mock.Mock()
+    mock_resp.status_code = status_code
+    mock_resp.url = url
+
+    # Set content or json response
+    if content is not None:
+        mock_resp.text = content
+        mock_resp.content = content.encode('utf-8')
+
+    if json_data is not None:
+        mock_resp.json = mock.Mock(return_value=json_data)
+
+    return mock_resp
+
+def test_web_search() -> Tuple[bool, str]:
+    """Test the web_search command."""
+    if not WEB_OPS_AVAILABLE:
+        return False, "web_ops modules not available"
+
+    try:
+        # Mock the requests.get function to avoid actual web requests
+        with mock.patch('requests.get') as mock_get:
+            # Set up the mock response
+            mock_resp = mock_response(
+                content='{"results": [{"title": "Test Result", "url": "https://example.com"}]}',
+                json_data={"results": [{"title": "Test Result", "url": "https://example.com"}]}
+            )
+            mock_get.return_value = mock_resp
+
+            # Call the function with the test query
+            result = web_search(TEST_SEARCH_QUERY)
+
+            # Verify mock was called with the correct URL
+            mock_get.assert_called_once()
+
+            # Check that the result is not empty
+            if not result or not isinstance(result, list):
+                return False, f"Expected list of results, got: {result}"
+
+            return True, "Web search test passed"
+
+    except Exception as e:
+        return False, f"Exception: {str(e)}"
+
+def test_web_scrape() -> Tuple[bool, str]:
+    """Test the web_scrape command."""
+    if not WEB_OPS_AVAILABLE:
+        return False, "web_ops modules not available"
+
+    try:
+        # Mock the requests.get function
+        with mock.patch('requests.get') as mock_get:
+            # Set up the mock response
+            mock_resp = mock_response(content=TEST_HTML)
+            mock_get.return_value = mock_resp
+
+            # Call the function
+            result = 
web_scrape(TEST_URL)
+
+            # Verify mock was called with the correct URL
+            mock_get.assert_called_once_with(
+                TEST_URL,
+                headers=mock.ANY,
+                timeout=mock.ANY
+            )
+
+            # Check that the result contains expected content
+            if not result or "Test Page Header" not in result:
+                return False, f"Expected HTML content with 'Test Page Header', got: {result[:100]}..."
+
+            return True, "Web scrape test passed"
+
+    except Exception as e:
+        return False, f"Exception: {str(e)}"
+
+def test_fetch_json_api() -> Tuple[bool, str]:
+    """Test the fetch_json_api command."""
+    if not WEB_OPS_AVAILABLE:
+        return False, "web_ops modules not available"
+
+    try:
+        # Mock the requests.get function
+        with mock.patch('requests.get') as mock_get:
+            # Set up the mock response
+            mock_resp = mock_response(json_data=TEST_JSON_RESPONSE)
+            mock_get.return_value = mock_resp
+
+            # Call the function
+            result = fetch_json_api(TEST_API_URL)
+
+            # Verify mock was called with the correct URL
+            mock_get.assert_called_once_with(
+                TEST_API_URL,
+                headers=mock.ANY,
+                timeout=mock.ANY
+            )
+
+            # Check that the result contains expected JSON
+            if not result or result.get('status') != 'success':
+                return False, f"Expected JSON with status 'success', got: {result}"
+
+            return True, "JSON API fetch test passed"
+
+    except Exception as e:
+        return False, f"Exception: {str(e)}"
+
+def test_raw_web_read() -> Tuple[bool, str]:
+    """Test the raw_web_read command."""
+    if not WEB_OPS_AVAILABLE:
+        return False, "web_ops modules not available"
+
+    try:
+        # Mock the requests.get function
+        with mock.patch('requests.get') as mock_get:
+            # Set up the mock response
+            mock_resp = mock_response(content=TEST_HTML)
+            mock_get.return_value = mock_resp
+
+            # Call the function
+            result = raw_web_read(TEST_URL)
+
+            # Verify mock was called with the correct URL
+            mock_get.assert_called_once_with(
+                TEST_URL,
+                headers=mock.ANY,
+                timeout=mock.ANY
+            )
+
+            # Check that the result contains the raw HTML
+            if not result or "<!DOCTYPE html>" not in result:
+                
return False, f"Expected raw HTML with DOCTYPE, got: {result[:100]}..." + + return True, "Raw web read test passed" + + except Exception as e: + return False, f"Exception: {str(e)}" + +def test_extract_links() -> Tuple[bool, str]: + """Test the extract_links command.""" + if not WEB_OPS_AVAILABLE: + return False, "web_ops modules not available" + + try: + # Mock the requests.get function + with mock.patch('requests.get') as mock_get: + # Set up the mock response + mock_resp = mock_response(content=TEST_HTML) + mock_get.return_value = mock_resp + + # Call the function + result = extract_links(TEST_URL) + + # Verify mock was called + mock_get.assert_called_once() + + # Check that the result contains the expected links + expected_links = [ + "https://example.com/link1", + "https://example.com/link2", + "https://example.com/link3" + ] + + if not result or not isinstance(result, list): + return False, f"Expected list of links, got: {result}" + + # Check if expected links are in the result + missing_links = [link for link in expected_links if link not in result] + if missing_links: + return False, f"Missing expected links: {missing_links}, got: {result}" + + return True, "Link extraction test passed" + + except Exception as e: + return False, f"Exception: {str(e)}" \ No newline at end of file diff --git a/SimpleAgent/commands/web_ops/__init__.py b/SimpleAgent/commands/web_ops/__init__.py index 7620591..2ff5472 100644 --- a/SimpleAgent/commands/web_ops/__init__.py +++ b/SimpleAgent/commands/web_ops/__init__.py @@ -7,4 +7,5 @@ from . import web_scrape # Import the web scrape command from . import raw_web_read # Import the raw web read command from . import extract_links # Import the extract links command -from . import fetch_json_api # Import the fetch JSON API command \ No newline at end of file +from . import fetch_json_api # Import the fetch JSON API command +from . 
import browse_web # Import the browse web command \ No newline at end of file diff --git a/SimpleAgent/commands/web_ops/browse_web/__init__.py b/SimpleAgent/commands/web_ops/browse_web/__init__.py new file mode 100644 index 0000000..c5c83c1 --- /dev/null +++ b/SimpleAgent/commands/web_ops/browse_web/__init__.py @@ -0,0 +1,79 @@ +""" +Browse web command for SimpleAgent. + +This module provides the browse_web command for interactive web browsing using lynx. +""" + +import subprocess +import shlex +from typing import Dict, Any +from commands import register_command + + +def browse_web(url: str) -> Dict[str, Any]: + """ + Launch an interactive lynx browser session to browse a webpage. + + Args: + url: The URL to browse + + Returns: + Dictionary indicating success or failure + """ + try: + # Check if lynx is available + which_process = subprocess.run(["which", "lynx"], capture_output=True, text=True) + if which_process.returncode != 0: + return { + "success": False, + "error": "Lynx browser is not installed. Please install lynx using 'sudo apt-get install lynx'." + } + + # Print instructions for the user + print("\n=== Lynx Browser Instructions ===") + print("Arrow keys: Navigate") + print("Enter: Follow link") + print("q: Quit browser") + print("g: Go to URL") + print("h: Help") + print("================================\n") + + # Launch lynx in interactive mode + print(f"Opening {url} in lynx browser...") + + # We use subprocess.call to allow direct terminal interaction + subprocess.call(["lynx", url]) + + return { + "success": True, + "message": "Lynx browser session completed." 
+ } + + except Exception as e: + return { + "success": False, + "error": f"Failed to launch lynx browser: {str(e)}" + } + + +# Define the schema for the browse_web command +BROWSE_WEB_SCHEMA = { + "type": "function", + "function": { + "name": "browse_web", + "description": "Launch an interactive lynx browser session to browse a webpage", + "parameters": { + "type": "object", + "properties": { + "url": { + "type": "string", + "description": "The URL to browse" + } + }, + "required": ["url"] + } + } +} + +# Register the command +register_command("browse_web", browse_web, BROWSE_WEB_SCHEMA) \ No newline at end of file diff --git a/SimpleAgent/commands/web_ops/web_search/__init__.py b/SimpleAgent/commands/web_ops/web_search/__init__.py index 6fe6b4a..b239c0f 100644 --- a/SimpleAgent/commands/web_ops/web_search/__init__.py +++ b/SimpleAgent/commands/web_ops/web_search/__init__.py @@ -5,6 +5,8 @@ """ import json +import subprocess +import shlex from typing import List, Dict, Any from googlesearch import search import requests @@ -22,6 +24,116 @@ def web_search(query: str, num_results: int = 5, include_snippets: bool = True) num_results: Number of results to return (default: 5) include_snippets: Whether to include text snippets from the pages (default: True) + Returns: + Dictionary containing search results and snippets + """ + # Try to use lynx first if available + try: + # Check if lynx is available + which_process = subprocess.run(["which", "lynx"], capture_output=True, text=True) + lynx_available = which_process.returncode == 0 + + if lynx_available: + return _lynx_web_search(query, num_results, include_snippets) + else: + return _default_web_search(query, num_results, include_snippets) + except Exception as e: + # If any error occurs with lynx, fall back to default implementation + print(f"Lynx search failed, falling back to default search: {str(e)}") + return _default_web_search(query, num_results, include_snippets) + + +def _lynx_web_search(query: str, num_results: 
int = 5, include_snippets: bool = True) -> Dict[str, Any]: + """ + Use lynx to search the web for information. + + Args: + query: The search query + num_results: Number of results to return + include_snippets: Whether to include text snippets from the pages + + Returns: + Dictionary containing search results and snippets + """ + try: + # Format query for a search engine URL (using DuckDuckGo which is more friendly to text browsers) + search_url = f"https://lite.duckduckgo.com/lite/?q={query.replace(' ', '+')}" + + # Use lynx in dump mode to get the text content of the search results + # -dump: output the rendered page content to stdout + # -nolist: don't show the link list at the end + # -width=800: wider output to avoid line wrapping + cmd = ["lynx", "-dump", "-nolist", "-width=800", search_url] + process = subprocess.run(cmd, capture_output=True, text=True) + + if process.returncode != 0: + raise Exception(f"Lynx command failed with code {process.returncode}: {process.stderr}") + + # Process the output to extract results + output = process.stdout + + # Split output into lines and look for results + lines = output.split('\n') + results = [] + current_result = None + url = None + + # Simple parsing of the DDG Lite output + for line in lines: + line = line.strip() + if not line: + continue + + # Extract links - they appear as numbers in brackets + if line.startswith('[') and ']' in line and 'http' in line: + url_part = line.split(']', 1)[1].strip() + if 'http' in url_part: + url = url_part.split(' ', 1)[0].strip() + + # Look for potential result titles (non-empty lines that aren't navigation) + elif line and not line.startswith('[') and not line.startswith('DuckDuckGo'): + # This might be a result title or description + if url and len(line) > 10: # Ensure it's substantial content + # Create a new result + current_result = { + "url": url, + "title": line, + } + if include_snippets: + # For snippet, we'll try to get the next non-empty line + current_result["snippet"] 
= line + + results.append(current_result) + current_result = None + url = None + + # Limit to requested number of results + if len(results) >= num_results: + break + + return { + "query": query, + "results": results, + "total_results": len(results) + } + + except Exception as e: + return { + "error": f"Lynx search failed: {str(e)}", + "query": query, + "results": [] + } + + +def _default_web_search(query: str, num_results: int = 5, include_snippets: bool = True) -> Dict[str, Any]: + """ + Default implementation of web search using googlesearch-python and requests. + + Args: + query: The search query + num_results: Number of results to return + include_snippets: Whether to include text snippets from the pages + Returns: Dictionary containing search results and snippets """ @@ -88,6 +200,7 @@ def web_search(query: str, num_results: int = 5, include_snippets: bool = True) "results": [] } + # Define the schema for the web_search command WEB_SEARCH_SCHEMA = { "type": "function", diff --git a/SimpleAgent/status.md b/SimpleAgent/status.md new file mode 100644 index 0000000..e7cf567 --- /dev/null +++ b/SimpleAgent/status.md @@ -0,0 +1,26 @@ +# SimpleAgent Command Status + +Last updated: Initial template (Run benchmark tests to update) + +## Summary + +- Total Commands: 0 +- Working: 0 (0.0%) +- Failed: 0 (0.0%) +- Not Tested: 0 (0.0%) + +## Command Status by Category + +This status file will be automatically updated when you run benchmark tests using: + +``` +python SimpleAgent.py --benchmark +``` + +You can view the latest status using: + +``` +python SimpleAgent.py --status +``` + +The benchmark tests will verify that all commands are working correctly and update this file accordingly. \ No newline at end of file diff --git a/SimpleAgent/test_lynx_search.py b/SimpleAgent/test_lynx_search.py new file mode 100755 index 0000000..5094229 --- /dev/null +++ b/SimpleAgent/test_lynx_search.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +""" +Test script for lynx-based web search. 
+ +This script demonstrates the web search functionality using lynx. +""" + +import sys +import json +from commands.web_ops.web_search import web_search +from commands.web_ops.browse_web import browse_web + +def main(): + """Run a test of the lynx web search.""" + # Check if query is provided as argument + if len(sys.argv) > 1: + query = " ".join(sys.argv[1:]) + else: + query = input("Enter search query: ") + + print(f"\nSearching for: {query}\n") + + # Perform search + results = web_search(query, num_results=5, include_snippets=True) + + # Print results + print(f"Found {results.get('total_results', 0)} results:\n") + + for i, result in enumerate(results.get('results', []), 1): + print(f"{i}. {result.get('title', 'No title')}") + print(f" URL: {result.get('url', 'No URL')}") + if 'snippet' in result: + print(f" Snippet: {result.get('snippet')[:150]}...") + print() + + # Ask if user wants to browse one of the results + if results.get('results'): + try: + choice = input("Enter result number to browse (or press Enter to exit): ") + if choice and choice.isdigit(): + choice_idx = int(choice) - 1 + if 0 <= choice_idx < len(results.get('results', [])): + url = results['results'][choice_idx].get('url') + if url: + print(f"\nOpening {url} in lynx browser...") + browse_web(url) + except KeyboardInterrupt: + print("\nBrowsing cancelled.") + + print("Test completed.") + +if __name__ == "__main__": + main() \ No newline at end of file