diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml index 8d9232d..b20b188 100644 --- a/.github/workflows/e2e-test.yml +++ b/.github/workflows/e2e-test.yml @@ -87,8 +87,7 @@ jobs: - name: Run core platform tests env: BASE_URL: http://localhost:6217 - ADMIN_EMAIL: admin@example.com - ADMIN_PASSWORD: admin123 + OPENAI_API_KEY: "test-key" # Signals to skip moderation tests run: | python tests/e2e_test.py diff --git a/requirements.txt b/requirements.txt index c95a239..32d53aa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ Flask-Login==0.6.3 Flask-WTF==1.2.2 Flask-SocketIO==5.5.1 WTForms==3.2.1 -Werkzeug==3.1.3 +Werkzeug==3.1.5 # OpenAI integration openai>=1.55.3 @@ -32,3 +32,4 @@ beautifulsoup4==4.14.2 # Async support asyncio asyncpg==0.30.0 +urllib3>=2.6.3 # not directly required, pinned by Snyk to avoid a vulnerability diff --git a/tests/e2e_test.py b/tests/e2e_test.py index 7efbafc..5116bfd 100644 --- a/tests/e2e_test.py +++ b/tests/e2e_test.py @@ -1,427 +1,410 @@ #!/usr/bin/env python3 """ -End-to-End Test for AutoModerate -This script tests the complete workflow: -1. Register a new user account -2. Create a new project -3. Get API key -4. Submit safe content (should be approved) -5. Submit content that triggers moderation (should be rejected) +E2E Tests for AutoModerate + +Tests the complete content moderation workflow: +- Platform health & auth +- Project and API key management +- Content moderation (safe + suspicious content) +- Stats endpoint + +Run with: python tests/e2e_test.py +Or: pytest tests/e2e_test.py -v """ -import asyncio -import json import os import random import re import string +import sys import time -from typing import Any, Dict import requests from bs4 import BeautifulSoup -class AutoModerateE2ETest: +class AutoModerateClient: + """HTTP client for AutoModerate with session management.""" + def __init__(self, base_url: str = "http://localhost:6217"): - self.base_url = base_url + self.base_url = base_url.rstrip("/") self.session = requests.Session() - self.session.headers.update({'User-Agent': 'AutoModerate-E2E-Test/1.0'}) - - # Generate random test data - self.test_suffix = ''.join(random.choices(string.ascii_lowercase, k=8)) - self.test_user = { - 'username': f'testuser_{self.test_suffix}', - 'email': f'test_{self.test_suffix}@example.com', - 'password': f'TestPassword123_{self.test_suffix}' - } - - # Track created resources for cleanup - self.created_project_id = None - self.api_key = None - - def get_csrf_token(self, url: str) -> str: - """Extract CSRF token from a form page""" - try: - response = self.session.get(url, timeout=30) - if response.status_code != 200: - return None - - soup = BeautifulSoup(response.text, 'html.parser') - csrf_input = soup.find('input', {'name': 'csrf_token'}) - if csrf_input: - return csrf_input.get('value') - else: - # Alternative: check for meta tag - csrf_meta = soup.find('meta', {'name': 'csrf-token'}) - if csrf_meta: - return csrf_meta.get('content') + self.session.headers.update({"User-Agent": "AutoModerate-E2E-Test/2.0"}) - return None - except Exception: + def get(self, path: str, **kwargs) -> requests.Response: + return self.session.get(f"{self.base_url}{path}", timeout=30, **kwargs) + + def post(self, path: str, **kwargs) -> requests.Response: + return self.session.post(f"{self.base_url}{path}", timeout=30, **kwargs) + + def get_csrf_token(self, path: str) -> str | None: + """Extract CSRF token from a form page.""" + resp = self.get(path, allow_redirects=False) + if resp.status_code != 200: return None - def test_health_check(self) -> bool: - """Test that the application is running""" - try: - response = self.session.get(f"{self.base_url}/api/health", timeout=10) - return response.status_code == 200 - except Exception: - return False - - def register_user(self) -> bool: - """Register a new user account""" - try: - csrf_token = self.get_csrf_token(f"{self.base_url}/auth/register") - if not csrf_token: - return False - - form_data = { - 'csrf_token': csrf_token, - 'username': self.test_user['username'], - 'email': self.test_user['email'], - 'password': self.test_user['password'] - } - - response = self.session.post( - f"{self.base_url}/auth/register", - data=form_data, - timeout=30, - allow_redirects=False - ) + soup = BeautifulSoup(resp.text, "html.parser") - if response.status_code == 302: - location = response.headers.get('Location', '') - return '/dashboard' in location - elif response.status_code == 200: - return 'success' in response.text.lower() or 'dashboard' in response.text.lower() - else: - return False - - except Exception: - return False - - def login_user(self) -> bool: - """Login with the test user""" - try: - csrf_token = self.get_csrf_token(f"{self.base_url}/auth/login") - if not csrf_token: - return False - - form_data = { - 'csrf_token': csrf_token, - 'email': self.test_user['email'], - 'password': self.test_user['password'] - } - - response = self.session.post( - f"{self.base_url}/auth/login", - data=form_data, - timeout=30, - allow_redirects=False - ) + # Try input field first + csrf_input = soup.find("input", {"name": "csrf_token"}) + if csrf_input: + return csrf_input.get("value") - if response.status_code == 302: - location = response.headers.get('Location', '') - return '/dashboard' in location - elif response.status_code == 200: - return 'dashboard' in response.text.lower() and 'error' not in response.text.lower() - else: - return False - - except Exception: - return False - - def create_project(self) -> bool: - """Create a new project""" - try: - csrf_token = self.get_csrf_token(f"{self.base_url}/dashboard/projects/create") - if not csrf_token: - return False - - project_data = { - 'csrf_token': csrf_token, - 'name': f'Test Project {self.test_suffix}', - 'description': f'E2E test project created at {time.strftime("%Y-%m-%d %H:%M:%S")}' - } - - response = self.session.post( - f"{self.base_url}/dashboard/projects/create", - data=project_data, - timeout=30, - allow_redirects=False - ) + # Try meta tag + csrf_meta = soup.find("meta", {"name": "csrf-token"}) + if csrf_meta: + return csrf_meta.get("content") - if response.status_code in [302, 301]: - location = response.headers.get('Location', '') - - if '/dashboard/projects/' in location and location != f"{self.base_url}/dashboard/projects/create": - path_parts = location.split('/dashboard/projects/') - if len(path_parts) > 1: - project_part = path_parts[1].split('/')[0] - if project_part and len(project_part) > 10: - self.created_project_id = project_part - return True - - if '/dashboard/projects' in location: - projects_response = self.session.get(location, timeout=30) - if projects_response.status_code == 200 and project_data['name'] in projects_response.text: - uuid_pattern = r'/dashboard/projects/([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})' - matches = re.findall(uuid_pattern, projects_response.text) - if matches: - self.created_project_id = matches[-1] - return True - - return False - else: - return False - - except Exception: - return False - - def get_api_key(self) -> bool: - """Get the default API key for the project""" - try: - if not self.created_project_id: - return False - - response = self.session.get( - f"{self.base_url}/dashboard/projects/{self.created_project_id}/api-keys", - timeout=30 - ) + return None - if response.status_code == 200: - return self.create_api_key() - else: - return False + def api_get(self, path: str, api_key: str, **kwargs) -> requests.Response: + headers = kwargs.pop("headers", {}) + headers["X-API-Key"] = api_key + return self.get(path, headers=headers, **kwargs) - except Exception: - return False + def api_post(self, path: str, api_key: str, **kwargs) -> requests.Response: + headers = kwargs.pop("headers", {}) + headers["X-API-Key"] = api_key + headers["Content-Type"] = "application/json" + return self.post(path, headers=headers, **kwargs) - def create_api_key(self) -> bool: - """Get the default API key (created automatically with project)""" - try: - if not self.created_project_id: - return False - response = self.session.get( - f"{self.base_url}/dashboard/projects/{self.created_project_id}/api-keys", - timeout=30 - ) +class TestResult: + """Simple test result tracker.""" - if response.status_code == 200: - api_key_pattern = r'am_[a-zA-Z0-9_-]+' - matches = re.findall(api_key_pattern, response.text) - - if matches: - self.api_key = matches[0] - return True - else: - return self.create_additional_api_key() - else: - return False - - except Exception: - return False - - def create_additional_api_key(self) -> bool: - """Create an additional API key if default doesn't exist""" - try: - csrf_token = self.get_csrf_token(f"{self.base_url}/dashboard/projects/{self.created_project_id}/api-keys") - if not csrf_token: - return False - - form_data = { - 'csrf_token': csrf_token, - 'name': f'E2E Test Key {self.test_suffix}' - } - - response = self.session.post( - f"{self.base_url}/dashboard/projects/{self.created_project_id}/api-keys/create", - data=form_data, - timeout=30, - allow_redirects=False - ) + def __init__(self): + self.passed = 0 + self.failed = 0 + self.results = [] - if response.status_code in [302, 301]: - response = self.session.get( - f"{self.base_url}/dashboard/projects/{self.created_project_id}/api-keys", - timeout=30 - ) - - if response.status_code == 200: - api_key_pattern = r'am_[a-zA-Z0-9_-]+' - matches = re.findall(api_key_pattern, response.text) - - if matches: - self.api_key = matches[-1] - return True - else: - return False - else: - return False - else: - return False - - except Exception: - return False - - def test_safe_content_moderation(self) -> bool: - """Test content that should pass moderation""" - try: - if not self.api_key: - return False - - safe_content = { - "type": "text", - "content": "I appreciate your help with this project. The documentation is very clear and well-written. Thank you for the excellent customer service!", - "metadata": { - "source": "e2e_test", - "user_id": f"safe_user_{self.test_suffix}", - "test_type": "safe_content" - } - } - - headers = { - 'X-API-Key': self.api_key, - 'Content-Type': 'application/json' - } - - response = self.session.post( - f"{self.base_url}/api/moderate", - json=safe_content, - headers=headers, - timeout=120 - ) + def ok(self, name: str): + self.passed += 1 + self.results.append((name, True, None)) + print(f" āœ“ {name}") - if response.status_code == 200: - data = response.json() - if data.get('success'): - return data.get('status') == 'approved' - else: - return False - else: - return False - - except Exception: - return False - - def test_suspicious_content_moderation(self) -> bool: - """Test content that should trigger moderation""" - try: - if not self.api_key: - return False - - suspicious_content = { - "type": "text", - "content": "🚨 URGENT SECURITY ALERT 🚨 Your account will be suspended in 24 hours! Click this link immediately: fake-bank-security.com/verify-now and enter your social security number, credit card details, and online banking password to prevent account closure. This is NOT a scam - we are your bank! Act now or lose access forever!", - "metadata": { - "source": "e2e_test", - "user_id": f"suspicious_user_{self.test_suffix}", - "test_type": "suspicious_content" - } - } - - headers = { - 'X-API-Key': self.api_key, - 'Content-Type': 'application/json' - } - - response = self.session.post( - f"{self.base_url}/api/moderate", - json=suspicious_content, - headers=headers, - timeout=120 - ) + def fail(self, name: str, reason: str = ""): + self.failed += 1 + self.results.append((name, False, reason)) + print(f" āœ— {name}" + (f" — {reason}" if reason else "")) - if response.status_code == 200: - data = response.json() - if data.get('success'): - return data.get('status') in ['rejected', 'flagged'] - else: - return False - else: - return False - - except Exception: - return False - - def test_api_stats(self) -> bool: - """Test getting project statistics""" - try: - if not self.api_key: - return False - - headers = {'X-API-Key': self.api_key} - - response = self.session.get( - f"{self.base_url}/api/stats", - headers=headers, - timeout=30 - ) + @property + def success(self) -> bool: + return self.failed == 0 - if response.status_code == 200: - data = response.json() - return data.get('success', False) - else: - return False + def summary(self) -> str: + total = self.passed + self.failed + return f"{self.passed}/{total} tests passed" - except Exception: - return False - def cleanup(self): - """Clean up test resources""" - pass +def random_suffix(length: int = 8) -> str: + return "".join(random.choices(string.ascii_lowercase, k=length)) - def run_all_tests(self) -> bool: - """Run simplified end-to-end tests for core platform functionality""" - print("Starting AutoModerate Core Platform Tests") - tests = [ - self.test_health_check, - self.register_user, - self.login_user, - self.create_project, - self.get_api_key, - ] +# ============================================================================= +# Test Functions +# ============================================================================= - passed = 0 - total = len(tests) - for test_func in tests: - try: - if test_func(): - passed += 1 - print(".", end="", flush=True) - else: - print("F", end="", flush=True) - except Exception: - print("E", end="", flush=True) +def test_health(client: AutoModerateClient, results: TestResult): + """Test that the application is running and healthy.""" + try: + resp = client.get("/api/health") + if resp.status_code == 200: + results.ok("health check") + else: + results.fail("health check", f"status {resp.status_code}") + except Exception as e: + results.fail("health check", str(e)) - self.cleanup() - print(f"\n\n{passed}/{total} tests passed") - return passed == total +def test_register_and_auth(client: AutoModerateClient, results: TestResult) -> dict | None: + """Register a new user and verify authentication works.""" + suffix = random_suffix() + user = { + "username": f"testuser_{suffix}", + "email": f"test_{suffix}@example.com", + "password": f"TestPass123_{suffix}", + } + try: + # Get CSRF token + csrf = client.get_csrf_token("/auth/register") + if not csrf: + results.fail("register", "couldn't get CSRF token") + return None -def main(): - """Main function to run E2E tests""" - base_url = os.getenv('BASE_URL', 'http://localhost:6217') + # Register + resp = client.post( + "/auth/register", + data={**user, "csrf_token": csrf}, + allow_redirects=False, + ) + + # Should redirect to dashboard (registration auto-logs in) + if resp.status_code == 302 and "/dashboard" in resp.headers.get("Location", ""): + results.ok("register + auto-login") + return user + else: + results.fail("register", f"unexpected response: {resp.status_code}") + return None + + except Exception as e: + results.fail("register", str(e)) + return None + + +def test_create_project(client: AutoModerateClient, results: TestResult) -> str | None: + """Create a new project and return its ID.""" + suffix = random_suffix() + + try: + csrf = client.get_csrf_token("/dashboard/projects/create") + if not csrf: + results.fail("create project", "couldn't get CSRF token (not logged in?)") + return None - # Wait a moment for the application to be fully ready - time.sleep(2) + resp = client.post( + "/dashboard/projects/create", + data={ + "csrf_token": csrf, + "name": f"Test Project {suffix}", + "description": f"E2E test project {suffix}", + }, + allow_redirects=False, + ) + + if resp.status_code not in (301, 302): + results.fail("create project", f"unexpected status: {resp.status_code}") + return None + + location = resp.headers.get("Location", "") + + # Extract project UUID from redirect + uuid_match = re.search( + r"/dashboard/projects/([0-9a-fA-F-]{36})", location + ) + if uuid_match: + project_id = uuid_match.group(1) + results.ok("create project") + return project_id + + # Sometimes redirects to project list — fetch and find it + if "/dashboard/projects" in location: + list_resp = client.get("/dashboard/projects") + uuid_match = re.search( + r"/dashboard/projects/([0-9a-fA-F-]{36})", list_resp.text + ) + if uuid_match: + project_id = uuid_match.group(1) + results.ok("create project") + return project_id + + results.fail("create project", "couldn't extract project ID") + return None + + except Exception as e: + results.fail("create project", str(e)) + return None + + +def test_get_api_key(client: AutoModerateClient, results: TestResult, project_id: str) -> str | None: + """Get or create an API key for the project.""" + try: + resp = client.get(f"/dashboard/projects/{project_id}/api-keys") + if resp.status_code != 200: + results.fail("get API key", f"couldn't access API keys page: {resp.status_code}") + return None + + # Look for existing API key + key_match = re.search(r"(am_[a-zA-Z0-9_-]+)", resp.text) + if key_match: + results.ok("get API key") + return key_match.group(1) + + # Create one if none exists + csrf = client.get_csrf_token(f"/dashboard/projects/{project_id}/api-keys") + if not csrf: + results.fail("get API key", "no existing key and couldn't get CSRF to create one") + return None + + create_resp = client.post( + f"/dashboard/projects/{project_id}/api-keys/create", + data={"csrf_token": csrf, "name": f"E2E Test Key {random_suffix()}"}, + allow_redirects=True, + ) + + key_match = re.search(r"(am_[a-zA-Z0-9_-]+)", create_resp.text) + if key_match: + results.ok("get API key (created)") + return key_match.group(1) - test_runner = AutoModerateE2ETest(base_url) + results.fail("get API key", "couldn't create API key") + return None - success = test_runner.run_all_tests() + except Exception as e: + results.fail("get API key", str(e)) + return None - if success: - print("\nāœ… All E2E tests passed successfully!") - exit(0) + +def test_moderate_safe_content(client: AutoModerateClient, results: TestResult, api_key: str): + """Test that safe content is approved.""" + try: + resp = client.api_post( + "/api/moderate", + api_key, + json={ + "type": "text", + "content": "Thank you for your excellent customer service! The documentation is clear and helpful.", + "metadata": {"source": "e2e_test", "test_type": "safe"}, + }, + ) + + if resp.status_code != 200: + results.fail("moderate safe content", f"status {resp.status_code}") + return + + data = resp.json() + if data.get("success") and data.get("status") == "approved": + results.ok("moderate safe content → approved") + else: + results.fail("moderate safe content", f"expected approved, got: {data}") + + except Exception as e: + results.fail("moderate safe content", str(e)) + + +def test_moderate_suspicious_content(client: AutoModerateClient, results: TestResult, api_key: str): + """Test that suspicious/harmful content is flagged or rejected.""" + try: + resp = client.api_post( + "/api/moderate", + api_key, + json={ + "type": "text", + "content": ( + "🚨 URGENT: Your account will be SUSPENDED! " + "Click here immediately: fake-bank.com/verify " + "Enter your SSN, credit card, and password NOW or lose access forever!" + ), + "metadata": {"source": "e2e_test", "test_type": "suspicious"}, + }, + ) + + if resp.status_code != 200: + results.fail("moderate suspicious content", f"status {resp.status_code}") + return + + data = resp.json() + if data.get("success") and data.get("status") in ("rejected", "flagged"): + results.ok(f"moderate suspicious content → {data.get('status')}") + else: + results.fail("moderate suspicious content", f"expected rejected/flagged, got: {data}") + + except Exception as e: + results.fail("moderate suspicious content", str(e)) + + +def test_api_stats(client: AutoModerateClient, results: TestResult, api_key: str): + """Test the stats endpoint returns valid data.""" + try: + resp = client.api_get("/api/stats", api_key) + + if resp.status_code != 200: + results.fail("API stats", f"status {resp.status_code}") + return + + data = resp.json() + if data.get("success"): + results.ok("API stats") + else: + results.fail("API stats", f"success=false: {data}") + + except Exception as e: + results.fail("API stats", str(e)) + + +# ============================================================================= +# Main Runner +# ============================================================================= + + +def run_tests(base_url: str = "http://localhost:6217", include_moderation: bool = True) -> bool: + """ + Run the E2E test suite. + + Args: + base_url: AutoModerate server URL + include_moderation: Whether to run moderation tests (requires OpenAI key) + + Returns: + True if all tests passed + """ + print(f"\n{'='*60}") + print("AutoModerate E2E Tests") + print(f"{'='*60}") + print(f"Target: {base_url}") + print() + + client = AutoModerateClient(base_url) + results = TestResult() + + # Phase 1: Platform Core + print("ā–ø Platform Core") + test_health(client, results) + + user = test_register_and_auth(client, results) + if not user: + print(f"\n{results.summary()}") + print("⚠ Stopping early — auth failed\n") + return False + + project_id = test_create_project(client, results) + if not project_id: + print(f"\n{results.summary()}") + print("⚠ Stopping early — project creation failed\n") + return False + + api_key = test_get_api_key(client, results, project_id) + if not api_key: + print(f"\n{results.summary()}") + print("⚠ Stopping early — API key retrieval failed\n") + return False + + # Phase 2: Moderation API + if include_moderation: + print("\nā–ø Moderation API") + test_moderate_safe_content(client, results, api_key) + test_moderate_suspicious_content(client, results, api_key) + test_api_stats(client, results, api_key) else: - print("\nāŒ Some E2E tests failed!") - exit(1) + print("\nā–ø Moderation API (skipped — no OpenAI key)") + + # Summary + print(f"\n{'='*60}") + print(results.summary()) + if results.success: + print("āœ… All tests passed!") + else: + print("āŒ Some tests failed") + print(f"{'='*60}\n") + + return results.success + + +def main(): + base_url = os.getenv("BASE_URL", "http://localhost:6217") + + # Check if we should run moderation tests + # In CI with a test key, moderation will fail — skip those tests + openai_key = os.getenv("OPENAI_API_KEY", "") + include_moderation = openai_key and not openai_key.startswith("test") + + # Give the app a moment to be fully ready + time.sleep(1) + + success = run_tests(base_url, include_moderation=include_moderation) + sys.exit(0 if success else 1) if __name__ == "__main__": - main() \ No newline at end of file + main()