diff --git a/README.md b/README.md index a4befa0..1b9072c 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,8 @@ Strands Agents Tools provides a powerful set of tools for your agents to use. It - 🧠 **Advanced Reasoning** - Tools for complex thinking and reasoning capabilities - 🐝 **Swarm Intelligence** - Coordinate multiple AI agents for parallel problem solving with shared memory - 🔄 **Multiple tools in Parallel** - Call multiple other tools at the same time in parallel with Batch Tool - +- 🔍 **Browser Tool** - Tool giving an agent access to perform automated actions on a browser (chromium) + ## 📦 Installation ### Quick Install @@ -121,6 +122,7 @@ Below is a comprehensive table of all available tools, how to use them with an a | use_llm | `agent.tool.use_llm(prompt="Analyze this data", system_prompt="You are a data analyst")` | Create nested AI loops with customized system prompts for specialized tasks | | workflow | `agent.tool.workflow(action="create", name="data_pipeline", steps=[{"tool": "file_read"}, {"tool": "python_repl"}])` | Define, execute, and manage multi-step automated workflows | | batch| `agent.tool.batch(invocations=[{"name": "current_time", "arguments": {"timezone": "Europe/London"}}, {"name": "stop", "arguments": {}}])` | Call multiple other tools in parallel. 
| +| use_browser | `agent.tool.use_browser(action="navigate", url="https://www.example.com") ` | Web scraping, automated testing, form filling, web automation tasks | \* *These tools do not work on windows* @@ -301,6 +303,32 @@ result = agent.tool.batch( ) ``` +### Use Browser +```python +from strands import Agent +from strands_tools import use_browser + +agent = Agent(tools=[use_browser]) + +# Simple navigation +result = agent.tool.use_browser(action="navigate", url="https://example.com") + +# Sequential actions for form filling +result = agent.tool.use_browser(actions=[ + {"action": "navigate", "args": {"url": "https://example.com/login"}}, + {"action": "type", "args": {"selector": "#username", "text": "user@example.com"}}, + {"action": "click", "args": {"selector": "#submit"}} +]) + +# Web scraping with content extraction +result = agent.tool.use_browser(actions=[ + {"action": "navigate", "args": {"url": "https://example.com/data"}}, + {"action": "get_text", "args": {"selector": ".content"}}, + {"action": "click", "args": {"selector": ".next-page"}}, + {"action": "get_html", "args": {"selector": "main"}} +]) +``` + ## 🌍 Environment Variables Configuration Agents Tools provides extensive customization through environment variables. This allows you to configure tool behavior without modifying code, making it ideal for different environments (development, testing, production). 
@@ -443,6 +471,20 @@ The Mem0 Memory Tool supports three different backend configurations: | FILE_READ_USE_GIT_DEFAULT | Default setting for using git in time machine mode | true | | FILE_READ_NUM_REVISIONS_DEFAULT | Default number of revisions to show in time machine mode | 5 | +#### Use Browser Tool + +| Environment Variable | Description | Default | +|----------------------|-------------|---------| +| DEFAULT_WAIT_TIME | Default time in seconds to wait after performing an action | 1 | +| BROWSER_MAX_RETRIES | Maximum number of attempts for an action before it is reported as failed | 3 | +| STRANDS_BROWSER_SCREENSHOTS_DIR | Default directory where screenshots will be saved | screenshots | +| STRANDS_BROWSER_USER_DATA_DIR | Default directory where data for reloading a browser instance is stored | ~/.browser_automation | +| STRANDS_BROWSER_HEADLESS | Whether browsers are launched in headless mode by default | false | +| STRANDS_BROWSER_WIDTH | Default width of the browser window in pixels | 1280 | +| STRANDS_BROWSER_HEIGHT | Default height of the browser window in pixels | 800 | +| ENABLE_DEBUG_BROWSER_LOGS | Whether the browser's debug logs are enabled by default | false | + + ## Contributing ❤️ We welcome contributions! 
See our [Contributing Guide](CONTRIBUTING.md) for details on: diff --git a/pyproject.toml b/pyproject.toml index 3042e36..11e7932 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,8 @@ dependencies = [ "tenacity>=9.1.2,<10.0.0", "watchdog>=6.0.0,<7.0.0", "slack_bolt>=1.23.0,<2.0.0", + "nest-asyncio>=1.5.0,<2.0.0", + "playwright>=1.42.0,<2.0.0", # Note: Always want the latest tzdata "tzdata ; platform_system == 'Windows'", ] @@ -112,7 +114,8 @@ extra-dependencies = [ "pytest>=8.0.0,<9.0.0", "pytest-cov>=4.1.0,<5.0.0", "pytest-xdist>=3.0.0,<4.0.0", - "responses>=0.6.1,<1.0.0" + "responses>=0.6.1,<1.0.0", + "pytest_asyncio>=0.23.0,<1.0.0" ] extra-args = [ "-n", @@ -134,7 +137,6 @@ run-cov = [ cov-combine = [] cov-report = [] - [tool.hatch.envs.default.scripts] list = [ "echo 'Scripts commands available for default env:'; hatch env show --json | jq --raw-output '.default.scripts | keys[]'" diff --git a/src/strands_tools/mem0_memory.py b/src/strands_tools/mem0_memory.py index d984981..5840dea 100644 --- a/src/strands_tools/mem0_memory.py +++ b/src/strands_tools/mem0_memory.py @@ -140,7 +140,7 @@ "description": "Optional metadata to store with the memory", }, }, - "required": ["action"] + "required": ["action"], } }, } diff --git a/src/strands_tools/use_browser.py b/src/strands_tools/use_browser.py new file mode 100644 index 0000000..47b980c --- /dev/null +++ b/src/strands_tools/use_browser.py @@ -0,0 +1,771 @@ +import asyncio +import inspect +import json + +# Configure logging +import logging +import os +import time # Added for timestamp in screenshot filenames +from typing import Callable, Dict, List, Optional + +import nest_asyncio +from playwright.async_api import ( + Browser, + BrowserContext, + Page, + Playwright, + async_playwright, +) +from playwright.async_api import ( + TimeoutError as PlaywrightTimeoutError, +) +from rich.console import Console +from rich.panel import Panel +from rich.text import Text +from strands import tool + +from 
strands_tools.utils.user_input import get_user_input + +logger = logging.getLogger(__name__) + +console = Console() + +# Global browser manager instance +_playwright_manager = None + + +class BrowserApiMethods: + # Api Method Calls + async def navigate(page: Page, url: str): + try: + await page.goto(url) + await page.wait_for_load_state("networkidle") + return f"Navigated to {url}" + except Exception as e: + error_str = str(e) + if "ERR_NAME_NOT_RESOLVED" in error_str: + raise ValueError( + f"Could not resolve domain '{url}'. The website might not exist or a network connectivity issue." + ) from e + elif "ERR_CONNECTION_REFUSED" in error_str: + raise ValueError( + f"Connection refused for '{url}'. The server might be down or blocking requests." + ) from e + elif "ERR_CONNECTION_TIMED_OUT" in error_str: + raise ValueError(f"Connection timed out for '{url}'. The server might be slow or unreachable.") from e + elif "ERR_SSL_PROTOCOL_ERROR" in error_str: + raise ValueError( + f"SSL/TLS error when connecting to '{url}'. The site might have an invalid or expired certificate." + ) from e + elif "ERR_CERT_" in error_str: + raise ValueError( + f"Certificate error when connecting to '{url}'. The site's security certificate might be invalid." 
+ ) from e + else: + raise + + async def click(page: Page, selector: str): + await page.click(selector) + return f"Clicked element: {selector}" + + async def type(page: Page, selector: str, text: str): + await page.fill(selector, text) + return f"Typed '{text}' into {selector}" + + async def evaluate(page: Page, script: str): + result = await page.evaluate(script) + return f"Evaluation result: {result}" + + async def press_key(page: Page, key: str): + await page.keyboard.press(key) + return f"Pressed key: {key}" + + async def get_text(page: Page, selector: str): + text = await page.text_content(selector) + return f"Text content: {text}" + + async def get_html(page: Page, selector: str = None): + if not selector: + result = await page.content() + else: + try: + await page.wait_for_selector(selector, timeout=5000) + result = await page.inner_html(selector) + except PlaywrightTimeoutError as e: + raise ValueError( + f"Element with selector '{selector}' not found on the page. Please verify the selector is correct." + ) from e + return (result[:1000] + "..." 
if len(result) > 1000 else result) + + async def screenshot(page: Page, path: str = None): + """Take a screenshot with configurable path from environment variable""" + screenshots_dir = os.getenv("STRANDS_BROWSER_SCREENSHOTS_DIR", "screenshots") + os.makedirs(screenshots_dir, exist_ok=True) # Ensure directory exists + + if not path: + # Generate default filename with timestamp if no path provided + filename = f"screenshot_{int(time.time())}.png" + path = os.path.join(screenshots_dir, filename) + elif not os.path.isabs(path): + # If relative path provided, make it relative to screenshots directory + path = os.path.join(screenshots_dir, path) + + await page.screenshot(path=path) + return f"Screenshot saved as {path}" + + async def refresh(page: Page): + await page.reload() + await page.wait_for_load_state("networkidle") + return "Page refreshed" + + async def back(page: Page): + await page.go_back() + await page.wait_for_load_state("networkidle") + return "Navigated back" + + async def forward(page: Page): + await page.go_forward() + await page.wait_for_load_state("networkidle") + return "Navigated forward" + + async def new_tab(page: Page, browser_manager, tab_id: str = None): + if tab_id is None: + tab_id = f"tab_{len(browser_manager._tabs) + 1}" + + if tab_id in browser_manager._tabs: + return f"Error: Tab with ID {tab_id} already exists" + + new_page = await browser_manager._context.new_page() + browser_manager._tabs[tab_id] = new_page + + # Switch to the new tab + await BrowserApiMethods.switch_tab(new_page, browser_manager, tab_id) + + return f"Created new tab with ID: {tab_id}" + + async def switch_tab(page: Page, browser_manager, tab_id: str): + if not tab_id: + tab_info = await BrowserApiMethods._get_tab_info_for_logs(browser_manager) + error_msg = f"tab_id is required for switch_tab action. 
{tab_info}" + logger.error(error_msg) + raise ValueError(error_msg) + + if tab_id not in browser_manager._tabs: + tab_info = await BrowserApiMethods._get_tab_info_for_logs(browser_manager) + error_msg = f"Tab with ID '{tab_id}' not found. {tab_info}" + logger.error(error_msg) + raise ValueError(error_msg) + + browser_manager._page = browser_manager._tabs[tab_id] + browser_manager._cdp_client = await browser_manager._page.context.new_cdp_session(browser_manager._page) + browser_manager._active_tab_id = tab_id + + # Use CDP to bring the tab to the foreground + try: + await browser_manager._cdp_client.send("Page.bringToFront") + logger.info(f"Successfully switched to tab '{tab_id}' and brought it to the foreground") + except Exception as e: + logger.warning(f"Failed to bring tab '{tab_id}' to foreground: {str(e)}") + + return f"Switched to tab: {tab_id}" + + async def close_tab(page: Page, browser_manager, tab_id: str = None): + if not tab_id: + tab_id = browser_manager._active_tab_id + + if tab_id not in browser_manager._tabs: + raise ValueError(f"Tab with ID '{tab_id}' not found. 
Available tabs: {list(browser_manager._tabs.keys())}") + + # Close the tab + await browser_manager._tabs[tab_id].close() + + # Remove from tracking + del browser_manager._tabs[tab_id] + + # If we closed the active tab, switch to another tab if available + if tab_id == browser_manager._active_tab_id: + if browser_manager._tabs: + next_tab_id = next(iter(browser_manager._tabs.keys())) + await BrowserApiMethods.switch_tab(page, browser_manager, next_tab_id) + else: + browser_manager._page = None + browser_manager._cdp_client = None + browser_manager._active_tab_id = None + + logger.info(f"Successfully closed tab '{tab_id}'") + return f"Closed tab: {tab_id}" + + async def list_tabs(page: Page, browser_manager): + tabs = await BrowserApiMethods._get_tab_info_for_logs(browser_manager) + return json.dumps(tabs, indent=2) + + async def get_cookies(page: Page): + cookies = await page.context.cookies() + return json.dumps(cookies, indent=2) + + async def set_cookies(page: Page, cookies: List[Dict]): + await page.context.add_cookies(cookies) + return "Cookies set successfully" + + async def network_intercept(page: Page, pattern: str): + await page.route(pattern, lambda route: route.continue_()) + return f"Network interception set for {pattern}" + + async def execute_cdp(page: Page, method: str, params: Dict = None): + cdp_client = await page.context.new_cdp_session(page) + result = await cdp_client.send(method, params or {}) + return json.dumps(result, indent=2) + + async def close(page: Page, browser_manager): + await browser_manager.cleanup() + return "Browser closed" + + # Api Helper Functions + async def _get_tab_info_for_logs(self): + """Get a summary of current tabs for error messages""" + tabs = {} + for tab_id, page in self._tabs.items(): + try: + is_active = tab_id == self._active_tab_id + tabs[tab_id] = {"url": page.url, "active": is_active} + except (AttributeError, ConnectionError, Exception) as e: + tabs[tab_id] = {"error": f"Could not retrieve tab info: 
{str(e)}"} + return tabs + + +# Browser manager class for handling browser interactions +class BrowserManager: + def __init__(self): + self._playwright: Optional[Playwright] = None + self._browser: Optional[Browser] = None + self._context: Optional[BrowserContext] = None + self._page: Optional[Page] = None + self._cdp_client = None + self._user_data_dir = None + self._profile_name = None + self._tabs = {} # Dictionary to track tabs by ID + self._active_tab_id = None # Currently active tab ID + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + self._actions = self._load_actions() + self._nest_asyncio_applied = False # Flag to track if nest_asyncio has been applied + + def _load_actions(self) -> Dict[str, Callable]: + actions = {} + for name, method in inspect.getmembers(BrowserApiMethods, predicate=inspect.isfunction): + if not name.startswith("_"): # Exclude private methods + actions[name] = method + return actions + + async def ensure_browser(self, launch_options=None, context_options=None): + """Initialize browser if not already running.""" + logger.debug("Ensuring browser is running...") + + # Apply nest_asyncio lazily, only when browser is actually needed and only once + if not self._nest_asyncio_applied: + nest_asyncio.apply() + self._nest_asyncio_applied = True + logger.debug("Applied nest_asyncio for nested event loop support") + + # Ensure required directories exist + user_data_dir = os.getenv( + "STRANDS_BROWSER_USER_DATA_DIR", os.path.join(os.path.expanduser("~"), ".browser_automation") + ) + headless = os.getenv("STRANDS_BROWSER_HEADLESS", "false").lower() == "true" + width = int(os.getenv("STRANDS_BROWSER_WIDTH", "1280")) + height = int(os.getenv("STRANDS_BROWSER_HEIGHT", "800")) + os.makedirs(user_data_dir, exist_ok=True) + + try: + if self._playwright is None: + self._playwright = await async_playwright().start() + + default_launch_options = {"headless": headless, "args": [f"--window-size={width},{height}"]} + + if 
launch_options: + default_launch_options.update(launch_options) + + # Handle persistent context + if launch_options and launch_options.get("persistent_context"): + if launch_options and launch_options.get("persistent_context"): + # Use the environment variable by default, but allow override from launch_options + persistent_user_data_dir = launch_options.get("user_data_dir", user_data_dir) + self._context = await self._playwright.chromium.launch_persistent_context( + user_data_dir=persistent_user_data_dir, + **{ + k: v + for k, v in default_launch_options.items() + if k not in ["persistent_context", "user_data_dir"] + }, + ) + self._browser = None + else: + raise ValueError("user_data_dir is required for persistent context") + else: + # Regular browser launch + logger.debug("Launching browser with options: %s", default_launch_options) + self._browser = await self._playwright.chromium.launch(**default_launch_options) + + # Create context + context_options = context_options or {} + default_context_options = {"viewport": {"width": width, "height": height}} + default_context_options.update(context_options) + + self._context = await self._browser.new_context(**default_context_options) + + self._page = await self._context.new_page() + self._cdp_client = await self._page.context.new_cdp_session(self._page) + + # Initialize tab tracking with the first tab + first_tab_id = "main" + self._tabs[first_tab_id] = self._page + self._active_tab_id = first_tab_id + + if not self._page: + raise ValueError("Browser initialized but page is not available") + + return self._page, self._cdp_client + + except Exception as e: + logger.error(f"Failed to initialize browser: {str(e)}") + # Clean up any partial initialization + await self.cleanup() + # Re-raise the exception so it's caught by the error handling in handle_action + raise + + async def cleanup(self): + cleanup_errors = [] + + for resource in ["_page", "_context", "_browser", "_playwright"]: + attr = getattr(self, resource) + if 
attr: + try: + if resource == "_playwright": + await attr.stop() + else: + await attr.close() + except Exception as e: + cleanup_errors.append(f"Error closing {resource}: {str(e)}") + + self._page = None + self._context = None + self._browser = None + self._playwright = None + self._cdp_client = None + self._tabs = {} # Clear tab dictionary + self._active_tab_id = None + + if cleanup_errors: + for error in cleanup_errors: + logger.error(error) + else: + logger.info("Cleanup completed successfully") + + async def _fix_javascript_syntax(self, script, error_msg): + """ + Attempts to fix common JavaScript syntax errors based on error messages. + + Args: + script: The original JavaScript code with syntax errors + error_msg: The error message from the JavaScript engine + + Returns: + Fixed JavaScript code if a fix was found, otherwise None + """ + if not script or not error_msg: + return None + + fixed_script = None + # Handle illegal return statements + if "Illegal return statement" in error_msg: + # Wrap in IIFE (Immediately Invoked Function Expression) + fixed_script = f"(function() {{ {script} }})()" + logger.info("Fixing 'Illegal return statement' by wrapping in function") + + # Handle unexpected token errors + elif "Unexpected token" in error_msg: + if "`" in script: # Fix template literals + fixed_script = script.replace("`", "'").replace("${", "' + ").replace("}", " + '") + logger.info("Fixing template literals in script") + elif "=>" in script: # Fix arrow functions in old browsers + fixed_script = script.replace("=>", "function() { return ") + if not fixed_script.strip().endswith("}"): + fixed_script += " }" + logger.info("Fixing arrow functions in script") + + # Handle missing braces/parentheses + elif "Unexpected end of input" in error_msg: + # Count opening and closing braces/parentheses to see if they're balanced + open_chars = script.count("{") + script.count("(") + script.count("[") + close_chars = script.count("}") + script.count(")") + script.count("]") 
+ + if open_chars > close_chars: + # Add missing closing characters + missing = open_chars - close_chars + fixed_script = script + ("}" * missing) + logger.info(f"Added {missing} missing closing braces") + + # Handle uncaught reference errors + elif "is not defined" in error_msg: + var_name = error_msg.split("'")[1] if "'" in error_msg else "" + if var_name: + fixed_script = f"var {var_name} = undefined;\n{script}" + logger.info(f"Adding undefined variable declaration for '{var_name}'") + + # Return the fixed script or None if no fix was applied + return fixed_script + + async def handle_action(self, action: str, **kwargs) -> List[Dict[str, str]]: + max_retries = int(os.getenv("BROWSER_MAX_RETRIES", 3)) + retry_delay = int(os.getenv("BROWSER_RETRY_DELAY", 1)) + + async def execute_action(): + if action not in self._actions: + return [{"text": f"Error: Unknown action {action}"}] + + action_method = self._actions[action] + + # Validate parameters + sig = inspect.signature(action_method) + required_params = [p for p in sig.parameters if sig.parameters[p].default == inspect.Parameter.empty] + for param in required_params: + if param not in args and param not in ["page", "browser_manager"]: + return [{"text": f"Error: Missing required parameter: {param}"}] + + # Execute action + page, _ = await self.ensure_browser(args.get("launchOptions")) + + # Include self (BrowserManager instance) in the arguments + action_args = {k: v for k, v in args.items() if k in sig.parameters} + action_args["page"] = page + if "browser_manager" in sig.parameters: + action_args["browser_manager"] = self + + result = await action_method(**action_args) + + return [{"text": str(result)}] + + args = kwargs.get("args", {}) + + for attempt in range(max_retries): + try: + return await execute_action() + except Exception as e: + if attempt == max_retries - 1: # Last attempt + logger.error(f"Action '{action}' failed after {max_retries} attempts: {str(e)}") + return [{"text": f"Error: {str(e)}"}] + + 
logger.warning(f"Action '{action}' attempt {attempt + 1} failed: {str(e)}") + + # Check for non-retryable errors + if any( + err in str(e).lower() + for err in [ + "could not resolve domain", + "connection refused", + "ssl/tls error", + "certificate error", + "protocol error (page.navigate): cannot navigate to invalid url", + ] + ): + logger.error(f"Non-retryable error encountered: {str(e)}") + return [{"text": f"Error: {str(e)}"}] + + # If it's the evaluate action and there's a JavaScript error, try to fix it + if action == "evaluate" and "script" in args: + error_types = [ + "SyntaxError", + "ReferenceError", + "TypeError", + "Illegal return", + "Unexpected token", + "Unexpected end", + "is not defined", + ] + if any(err_type in str(e) for err_type in error_types): + fixed_script = await self._fix_javascript_syntax(args["script"], str(e)) + if fixed_script: + args["script"] = fixed_script + logger.warning(f"Attempting retry with fixed JavaScript: {fixed_script}") + continue + + # Exponential backoff + await asyncio.sleep(retry_delay * (2**attempt)) + + +# Initialize global browser manager +_playwright_manager = BrowserManager() + + +@tool +def use_browser( + url: str = None, + wait_time: int = int(os.getenv("DEFAULT_WAIT_TIME", 1)), + action: str = None, + selector: str = None, + input_text: str = None, + script: str = None, + cdp_method: str = None, + cdp_params: dict = None, + launch_options: dict = None, + actions: list = None, + args: dict = None, + key: str = None, +) -> str: + """ + Interactive browser automation tool powered by Playwright. + + Important Usage Guidelines: + - Never guess selectors or locators! Always find them first using these steps: + 1. Use get_html to examine the page structure: + {"action": "get_html"} # Get full page HTML + or + {"action": "get_html", "args": {"selector": "body"}} # Get body HTML + + 2. 
Use evaluate with JavaScript to find specific elements: + {"action": "evaluate", "args": {"script": ` + return Array.from(document.querySelectorAll('input, button')) + .map(el => ({ + tag: el.tagName, + type: el.type, + id: el.id, + name: el.name, + class: el.className, + placeholder: el.placeholder, + value: el.value + })) + `}} + + 3. Only after finding the correct selector, use it for actions like click or type + + - For complex operations requiring multiple steps, use the 'actions' parameter + - For web searches: + 1. Start with Google (https://www.google.com) + 2. First find the search box: + {"action": "evaluate", "args": {"script": ` + return Array.from(document.querySelectorAll('input')) + .map(el => ({ + type: el.type, + name: el.name, + placeholder: el.placeholder + })) + `}} + 3. If CAPTCHA appears, fallback to DuckDuckGo (https://duckduckgo.com) + + Tab Management: + - Create a new tab with an ID: + {"action": "new_tab", "args": {"tab_id": "search_tab"}} + + - Switch between tabs (MUST provide tab_id in args): + use_browser(action="switch_tab", actions=[{"action": "switch_tab", "args": {"tab_id": "main"}}]) + + # CORRECT EXAMPLES: + # Method 1 (recommended): Using the actions parameter + use_browser(actions=[{"action": "switch_tab", "args": {"tab_id": "main"}}]) + + # Method 2: Using single action with args parameter + use_browser(action="switch_tab", args={"tab_id": "search_tab"}) + + # INCORRECT (will fail): + use_browser(action="switch_tab") # Missing tab_id + + - Close a specific tab: + {"action": "close_tab", "args": {"tab_id": "search_tab"}} + + - List all tabs and their status: + {"action": "list_tabs"} + + - Actions are performed only on the active tab + + Common Multi-Action Patterns: + 1. 
Form filling (with selector discovery): + actions=[ + {"action": "navigate", "args": {"url": "form_url"}}, + {"action": "get_html"}, # First get page HTML + {"action": "evaluate", "args": {"script": ` + return Array.from(document.querySelectorAll('input')) + .map(el => ({ + id: el.id, + name: el.name, + type: el.type + })) + `}}, # Find input selectors + {"action": "type", "args": {"selector": "#found-input-id", "text": "value"}} + ] + + 2. Web scraping (with content discovery): + actions=[ + {"action": "navigate", "args": {"url": "target_url"}}, + {"action": "evaluate", "args": {"script": ` + return { + content: document.querySelector('main')?.innerHTML, + nextButton: Array.from(document.querySelectorAll('a')) + .find(a => a.textContent.includes('Next'))?.outerHTML + } + `}}, + {"action": "click", "args": {"selector": "discovered-next-button-selector"}} + ] + + 3. Working with multiple tabs: + actions=[ + {"action": "navigate", "args": {"url": "https://example.com"}}, + {"action": "new_tab", "args": {"tab_id": "second_tab"}}, + {"action": "navigate", "args": {"url": "https://example.org"}}, + {"action": "switch_tab", "args": {"tab_id": "main"}}, + {"action": "get_html", "args": {"selector": "h1"}} + ] + + Args: + url (str, optional): URL to navigate to. Used with 'navigate' action. + wait_time (int, optional): Time to wait in seconds after performing an action. + Default is set by DEFAULT_WAIT_TIME env var or 1 second. + action (str, optional): Single action to perform. Common actions include: + - navigate: Go to a URL + - click: Click on an element + - type: Input text into a field + - evaluate: Run JavaScript + - get_text: Get text from an element + - get_html: Get HTML content + - screenshot: Take a screenshot + - new_tab: Create a new browser tab + - switch_tab: Switch to a different tab (REQUIRES tab_id in args) + - close_tab: Close a tab + - list_tabs: List all open tabs + selector (str, optional): CSS selector to identify page elements. 
Required for + actions like click, type, and get_text. + input_text (str, optional): Text to input into a field. Required for 'type' action. + script (str, optional): JavaScript code to execute. Required for 'evaluate' action. + cdp_method (str, optional): Chrome DevTools Protocol method name for 'execute_cdp' action. + cdp_params (dict, optional): Parameters for CDP method. + launch_options (dict, optional): Browser launch options. Common options include: + - headless: Boolean to run browser in headless mode + - args: List of command-line arguments for the browser + - persistent_context: Boolean to use persistent browser context + - user_data_dir: Path to user data directory for persistent context + actions (list, optional): List of action objects to perform in sequence. + Each action is a dict with 'action', 'args', and optional 'wait_for' keys. + Example: [{"action": "switch_tab", "args": {"tab_id": "main"}}] + args (dict, optional): Dictionary of arguments for the action. Used when specific + parameters are needed for an action, especially for tab operations. + Example: {"tab_id": "main"} for switch_tab action. + key (str, optional): Keyboard key to press for 'press_key' action. + + Returns: + str: Text description of the action results. For single actions, returns the result text. + For multiple actions, returns all results concatenated with newlines. + On error, returns an error message starting with "Error: ". 
+ """ + strands_dev = os.environ.get("BYPASS_TOOL_CONSENT", "").lower() == "true" + + if not strands_dev: + if actions: + action_description = "multiple actions" + action_list = [a.get("action") for a in actions if isinstance(a, dict) and "action" in a] + message = Text("User requested multiple actions: ", style="yellow") + message.append(Text(", ".join(action_list), style="bold cyan")) + else: + action_description = action or "unknown" + message = Text("User requested action: ", style="yellow") + message.append(Text(action_description, style="bold cyan")) + + console.print(Panel(message, title="[bold green]BrowserManager", border_style="green")) + + user_input = get_user_input(f"Do you want to proceed with {action_description}? (y/n)") + if user_input.lower().strip() != "y": + cancellation_reason = ( + user_input if user_input.strip() != "n" else get_user_input("Please provide a reason for cancellation:") + ) + error_message = f"Python code execution cancelled by the user. Reason: {cancellation_reason}" + return { + "status": "error", + "content": [{"text": error_message}], + } + + logger.debug(f"Tool parameters: {locals()}") + try: + # Convert single action to actions list format if not using actions parameter + if not actions and action: + # Prepare args dictionary + action_args = args or {} + + # Add specific parameters to args if provided + if url: + action_args["url"] = url + if input_text: + action_args["text"] = input_text + if script: + action_args["script"] = script + if selector: + action_args["selector"] = selector + if cdp_method: + action_args["method"] = cdp_method + if cdp_params: + action_args["params"] = cdp_params + if key: + action_args["key"] = key + if launch_options: + action_args["launchOptions"] = launch_options + + # Special handling for tab_id parameter + if action == "switch_tab" and "tab_id" not in action_args: + try: + # Only try to get tabs if browser is already initialized + if _playwright_manager._page is not None: + tabs_list = 
_playwright_manager._tabs # BrowserManager has no _list_tabs(); read the tab registry directly + tab_ids = list(tabs_list.keys()) + return f"Error: tab_id is required for switch_tab action. Available tabs: {tab_ids}" + else: + return "Error: tab_id is required for switch_tab action. Browser not yet initialized." + except Exception: + return "Error: tab_id is required for switch_tab action. Could not retrieve available tabs." + + # For close_tab action, default to active tab if none specified + if action == "close_tab" and "tab_id" not in action_args: + active_tab = _playwright_manager._active_tab_id + if active_tab: + action_args["tab_id"] = active_tab + + actions = [ + { + "action": action, + "args": action_args, + "selector": selector, + "wait_for": wait_time * 1000 if wait_time else None, + } + ] + + # Create a coroutine that runs all actions sequentially + async def run_all_actions(): + results = [] + logger.debug(f"Processing {len(actions)} actions: {actions}") # Debug the actions + for action_item in actions: + action_name = action_item.get("action") + action_args = action_item.get("args", {}) + action_selector = action_item.get("selector") + action_wait_for = action_item.get("wait_for", wait_time * 1000 if wait_time else None) + + if launch_options: + action_args["launchOptions"] = launch_options + + logger.info(f"Executing action: {action_name}") + logger.debug(f"Action args: {action_args}") # Debug the args + + # Execute the action and collect results + content = await _playwright_manager.handle_action( + action=action_name, + args=action_args, + selector=action_selector, + wait_for=action_wait_for, + ) + results.extend(content) + return results + + # Run all actions in a single event loop call + all_content = _playwright_manager._loop.run_until_complete(run_all_actions()) + return "\n".join([item["text"] for item in all_content]) + + except Exception as e: + logger.error(f"Error in use_browser: {str(e)}") + logger.error("Cleaning up browser due to explicit request or 
@pytest.fixture(autouse=True)
def setup_test_environment():
    """Run every test with tool consent bypassed and the consent prompt stubbed.

    For the duration of each test ``os.environ`` is replaced (``clear=True``)
    by a mapping containing only ``BYPASS_TOOL_CONSENT=true``, and
    ``get_user_input`` in the ``use_browser`` module is patched to answer
    ``"y"`` so a test that re-enables the consent prompt never blocks on
    real input.

    Yields:
        dict: The mapping that was applied to ``os.environ``.
    """
    # The variable must be in the dict *before* patch.dict is entered:
    # patch.dict copies the values into os.environ on entry, so a key added
    # to the source dict afterwards never reaches the environment.
    mock_env = {"BYPASS_TOOL_CONSENT": "true"}
    with patch.dict(os.environ, mock_env, clear=True):
        with patch("src.strands_tools.use_browser.get_user_input") as mock_input:
            mock_input.return_value = "y"
            yield mock_env
@pytest.fixture
def browser_manager(mock_browser_chain):
    """Build a ``BrowserManager`` wired entirely to mocks.

    The manager's playwright/browser/context/page/CDP attributes are taken
    from ``mock_browser_chain``, ``ensure_browser`` is replaced by a coroutine
    that hands back the mocked page and CDP client, and the page methods used
    by the tests are stubbed with ``AsyncMock`` objects returning canned values.
    """
    manager = BrowserManager()

    # Manager attribute -> key in the mock chain fixture.
    wiring = (
        ("_playwright", "playwright"),
        ("_browser", "browser"),
        ("_context", "context"),
        ("_page", "page"),
        ("_cdp_client", "cdp"),
    )
    for attribute, chain_key in wiring:
        setattr(manager, attribute, mock_browser_chain[chain_key])

    async def _fake_ensure_browser(*_args, **_kwargs):
        # Accept and ignore any arguments; always return the mocked pair.
        return manager._page, manager._cdp_client

    manager.ensure_browser = _fake_ensure_browser

    # Page method name -> value its AsyncMock stub resolves to.
    canned_page_results = {
        "goto": None,
        "click": None,
        "fill": None,
        "evaluate": "Test Title",
        "text_content": "Test Content",
        "content": "",
        "reload": None,
        "go_back": None,
        "go_forward": None,
        "screenshot": None,
    }
    page = manager._page
    for method_name, canned in canned_page_results.items():
        setattr(page, method_name, AsyncMock(return_value=canned))
    # Nested attribute, so it cannot go through the flat table above.
    page.keyboard.press = AsyncMock(return_value=None)

    return manager
@pytest.mark.asyncio
async def test_fix_javascript_syntax_edge_cases():
    """``_fix_javascript_syntax`` yields ``None`` when script or error is empty/None."""
    manager = BrowserManager()

    # (script, error message) pairs for which no fix is expected.
    degenerate_inputs = (
        ("", "any error"),
        (None, "error"),
        ("script", None),
        ("script", ""),
    )
    for script, error_message in degenerate_inputs:
        fixed = await manager._fix_javascript_syntax(script, error_message)
        assert fixed is None
def test_use_browser_with_invalid_action():
    """A failing action is reported as an ``"Error: ..."`` string and triggers cleanup.

    ``use_browser`` wraps the action run in ``try/except Exception``; on failure
    it runs ``_playwright_manager.cleanup()`` through the same loop and returns
    ``f"Error: {e}"``. The mocked loop therefore has to fail only on the first
    ``run_until_complete`` call (the action batch) and succeed on the second
    (the cleanup); otherwise the exception observed by the test comes from the
    cleanup call inside the ``except`` handler, not from the action itself.
    """
    with patch.dict(os.environ, {"BYPASS_TOOL_CONSENT": "true"}):
        with patch("src.strands_tools.use_browser._playwright_manager") as mock_manager:
            mock_manager._loop = MagicMock()
            # 1st call: run the actions -> raises. 2nd call: cleanup -> succeeds.
            mock_manager._loop.run_until_complete.side_effect = [Exception("Invalid action"), None]

            result = use_browser(action="invalid")

            # The mocked loop never ran the action coroutine; close it so the
            # test does not emit a "coroutine was never awaited" warning.
            pending_actions = mock_manager._loop.run_until_complete.call_args_list[0][0][0]
            pending_actions.close()

            assert result == "Error: Invalid action"
            assert mock_manager._loop.run_until_complete.call_count == 2
            mock_manager.cleanup.assert_called_once_with()
@pytest.mark.asyncio
async def test_run_all_actions_coroutine():
    """Check the coroutine ``use_browser`` hands to the loop for a batch of actions.

    The event loop is mocked, so ``use_browser`` only *creates* the coroutine
    and returns the loop's canned result. The test then pulls that coroutine
    out of the mock, awaits it for real, and verifies that ``handle_action``
    was called once per action with ``launchOptions`` merged into the args and
    the expected ``wait_for`` value.
    """
    messages = [
        "Navigated to https://example.com",
        "Clicked #button",
        "Typed 'Hello, World!' into #input",
    ]
    browser_options = {"headless": True}
    # NOTE(review): presumes use_browser's default wait_time is 1 second; the
    # signature is outside this chunk - confirm if the default ever changes.
    fallback_wait_seconds = 1

    batch = [
        {"action": "navigate", "args": {"url": "https://example.com"}, "wait_for": 2000},
        {"action": "click", "args": {"selector": "#button"}, "wait_for": 1000},
        # No explicit wait_for: use_browser falls back to wait_time * 1000.
        {"action": "type", "args": {"selector": "#input", "text": "Hello, World!"}},
    ]

    with patch("src.strands_tools.use_browser._playwright_manager") as mock_manager:
        mock_manager._loop = MagicMock()
        # One single-item content list per action, in call order.
        mock_manager.handle_action = AsyncMock(side_effect=[[{"text": message}] for message in messages])
        # What the (mocked) loop reports as the combined result of the batch.
        mock_manager._loop.run_until_complete = MagicMock(return_value=[{"text": message} for message in messages])

        with patch.dict("os.environ", {"BYPASS_TOOL_CONSENT": "true"}):
            result = use_browser(actions=batch, launch_options=browser_options)

        pending = mock_manager._loop.run_until_complete.call_args[0][0]
        assert asyncio.iscoroutine(pending)

        # Actually drive the coroutine so handle_action gets invoked.
        await pending

        assert mock_manager.handle_action.call_args_list == [
            call(
                action="navigate",
                args={"url": "https://example.com", "launchOptions": browser_options},
                selector=None,
                wait_for=2000,
            ),
            call(
                action="click",
                args={"selector": "#button", "launchOptions": browser_options},
                selector=None,
                wait_for=1000,
            ),
            call(
                action="type",
                args={"selector": "#input", "text": "Hello, World!", "launchOptions": browser_options},
                selector=None,
                wait_for=fallback_wait_seconds * 1000,
            ),
        ]

        # use_browser joins the "text" of every content item with newlines.
        assert result == "\n".join(messages)
@pytest.mark.asyncio
async def test_browser_manager_cleanup(browser_manager):
    """``cleanup()`` closes the page, context and browser and stops playwright.

    The mocks are captured *before* calling ``cleanup()``: if the manager's
    attributes are read afterwards and have been reset to ``None`` (as
    ``test_cleanup_error_handling`` in this module asserts), a guard such as
    ``if browser_manager._page:`` is falsy and no assertion ever runs.
    """
    page = browser_manager._page
    context = browser_manager._context
    browser = browser_manager._browser
    playwright = browser_manager._playwright

    await browser_manager.cleanup()

    page.close.assert_called_once()
    context.close.assert_called_once()
    browser.close.assert_called_once()
    playwright.stop.assert_called_once()
successfully") + + mock_logger.warning.assert_not_called() + + assert browser_manager._page is None + assert browser_manager._context is None + assert browser_manager._browser is None + assert browser_manager._playwright is None + assert browser_manager._cdp_client is None