From 859c076cb689badaf266e7fb9a11beb96677326f Mon Sep 17 00:00:00 2001 From: Barac9492 Date: Mon, 16 Mar 2026 16:54:41 +0900 Subject: [PATCH 01/22] feat: add prediction market sentiment engine Build a pipeline that fetches Polymarket markets, converts questions into balanced simulation scenarios via LLM, runs multi-agent Reddit simulations, analyzes sentiment/consensus, and surfaces trading signals by comparing simulated probability vs market odds. Backend: polymarket_client, scenario_generator, sentiment_analyzer, prediction_manager (pipeline orchestrator), prediction API blueprint. Frontend: PredictionView with market browser + signal dashboard. Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/__init__.py | 3 +- backend/app/api/__init__.py | 2 + backend/app/api/prediction.py | 238 ++++++++++++ backend/app/config.py | 8 + backend/app/models/prediction.py | 270 +++++++++++++ backend/app/services/polymarket_client.py | 122 ++++++ backend/app/services/prediction_manager.py | 278 +++++++++++++ backend/app/services/scenario_generator.py | 132 +++++++ backend/app/services/sentiment_analyzer.py | 253 ++++++++++++ frontend/src/api/prediction.js | 35 ++ frontend/src/router/index.js | 6 + frontend/src/views/Home.vue | 4 +- frontend/src/views/PredictionView.vue | 430 +++++++++++++++++++++ tasks/todo.md | 35 ++ 14 files changed, 1814 insertions(+), 2 deletions(-) create mode 100644 backend/app/api/prediction.py create mode 100644 backend/app/models/prediction.py create mode 100644 backend/app/services/polymarket_client.py create mode 100644 backend/app/services/prediction_manager.py create mode 100644 backend/app/services/scenario_generator.py create mode 100644 backend/app/services/sentiment_analyzer.py create mode 100644 frontend/src/api/prediction.js create mode 100644 frontend/src/views/PredictionView.vue create mode 100644 tasks/todo.md diff --git a/backend/app/__init__.py b/backend/app/__init__.py index e874cea..05dc080 100644 --- a/backend/app/__init__.py 
+++ b/backend/app/__init__.py @@ -75,10 +75,11 @@ def log_response(response): return response # 注册蓝图 - from .api import graph_bp, simulation_bp, report_bp + from .api import graph_bp, simulation_bp, report_bp, prediction_bp app.register_blueprint(graph_bp, url_prefix='/api/graph') app.register_blueprint(simulation_bp, url_prefix='/api/simulation') app.register_blueprint(report_bp, url_prefix='/api/report') + app.register_blueprint(prediction_bp, url_prefix='/api/prediction') # 健康检查 @app.route('/health') diff --git a/backend/app/api/__init__.py b/backend/app/api/__init__.py index ffda743..de57787 100644 --- a/backend/app/api/__init__.py +++ b/backend/app/api/__init__.py @@ -7,8 +7,10 @@ graph_bp = Blueprint('graph', __name__) simulation_bp = Blueprint('simulation', __name__) report_bp = Blueprint('report', __name__) +prediction_bp = Blueprint('prediction', __name__) from . import graph # noqa: E402, F401 from . import simulation # noqa: E402, F401 from . import report # noqa: E402, F401 +from . import prediction # noqa: E402, F401 diff --git a/backend/app/api/prediction.py b/backend/app/api/prediction.py new file mode 100644 index 0000000..8b0c544 --- /dev/null +++ b/backend/app/api/prediction.py @@ -0,0 +1,238 @@ +""" +Prediction Market API routes +""" + +import traceback +import threading +from flask import request, jsonify, current_app + +from . import prediction_bp +from ..config import Config +from ..models.prediction import PredictionMarket, PredictionRunManager, PredictionRunStatus +from ..services.polymarket_client import PolymarketClient +from ..services.prediction_manager import PredictionManager +from ..models.task import TaskManager, TaskStatus +from ..utils.logger import get_logger + +logger = get_logger('mirofish.api.prediction') + + +# ============== Market Browsing ============== + +@prediction_bp.route('/markets', methods=['GET']) +def get_markets(): + """ + Fetch active markets from Polymarket. 
+ + Query params: + min_volume: Minimum volume filter (default 10000) + limit: Max results (default 50) + search: Search query (optional) + """ + try: + min_volume = request.args.get('min_volume', 10000, type=float) + limit = request.args.get('limit', 50, type=int) + search = request.args.get('search', None) + + client = PolymarketClient() + markets = client.fetch_active_markets( + min_volume=min_volume, + limit=limit, + search=search, + ) + + return jsonify({ + "success": True, + "data": [m.to_dict() for m in markets], + "count": len(markets), + }) + + except Exception as e: + logger.error(f"Failed to fetch markets: {e}") + return jsonify({ + "success": False, + "error": str(e), + }), 500 + + +# ============== Prediction Runs ============== + +@prediction_bp.route('/run', methods=['POST']) +def start_prediction_run(): + """ + Start a prediction run for a market. + + Request JSON: + { + "market": { ... PredictionMarket dict ... } + } + + Returns run_id + task_id for polling. + """ + try: + data = request.get_json() or {} + market_data = data.get('market') + + if not market_data: + return jsonify({"success": False, "error": "market data required"}), 400 + + market = PredictionMarket.from_dict(market_data) + + if not market.title: + return jsonify({"success": False, "error": "market must have a title"}), 400 + + # Create run + run = PredictionRunManager.create_run() + + # Create async task + task_manager = TaskManager() + task_id = task_manager.create_task( + task_type="prediction_run", + metadata={"run_id": run.run_id, "market_title": market.title}, + ) + + # Get storage from app context + storage = current_app.extensions.get('neo4j_storage') + + def run_pipeline(): + try: + task_manager.update_task( + task_id, + status=TaskStatus.PROCESSING, + progress=0, + message="Starting prediction pipeline...", + ) + + def progress_callback(stage, message): + # Map stages to progress percentages + stage_progress = { + "fetching_market": 5, + "generating_scenario": 15, + 
"creating_project": 20, + "building_graph": 35, + "preparing_simulation": 50, + "running_simulation": 70, + "analyzing": 90, + "completed": 100, + } + progress = stage_progress.get(stage, 50) + task_manager.update_task( + task_id, + progress=progress, + message=message, + ) + + manager = PredictionManager(storage=storage) + result = manager.run_prediction( + market=market, + run=run, + progress_callback=progress_callback, + ) + + if result.status == PredictionRunStatus.COMPLETED: + task_manager.complete_task(task_id, result={ + "run_id": result.run_id, + "status": "completed", + "signal": result.signal, + }) + else: + task_manager.fail_task(task_id, result.error or "Pipeline failed") + + except Exception as e: + logger.error(f"Prediction pipeline failed: {e}", exc_info=True) + task_manager.fail_task(task_id, str(e)) + + thread = threading.Thread(target=run_pipeline, daemon=True) + thread.start() + + return jsonify({ + "success": True, + "data": { + "run_id": run.run_id, + "task_id": task_id, + "status": "started", + "message": "Prediction pipeline started", + }, + }) + + except Exception as e: + logger.error(f"Failed to start prediction run: {e}") + return jsonify({ + "success": False, + "error": str(e), + "traceback": traceback.format_exc(), + }), 500 + + +@prediction_bp.route('/run//status', methods=['GET']) +def get_run_status(run_id: str): + """Get prediction run status""" + try: + run = PredictionRunManager.get_run(run_id) + if not run: + return jsonify({"success": False, "error": f"Run not found: {run_id}"}), 404 + + return jsonify({ + "success": True, + "data": { + "run_id": run.run_id, + "status": run.status.value, + "progress_message": run.progress_message, + "error": run.error, + }, + }) + + except Exception as e: + logger.error(f"Failed to get run status: {e}") + return jsonify({"success": False, "error": str(e)}), 500 + + +@prediction_bp.route('/run/', methods=['GET']) +def get_run(run_id: str): + """Get full prediction run details""" + try: + run = 
PredictionRunManager.get_run(run_id) + if not run: + return jsonify({"success": False, "error": f"Run not found: {run_id}"}), 404 + + return jsonify({ + "success": True, + "data": run.to_dict(), + }) + + except Exception as e: + logger.error(f"Failed to get run: {e}") + return jsonify({"success": False, "error": str(e)}), 500 + + +@prediction_bp.route('/runs', methods=['GET']) +def list_runs(): + """List all prediction runs""" + try: + limit = request.args.get('limit', 50, type=int) + runs = PredictionRunManager.list_runs(limit=limit) + + return jsonify({ + "success": True, + "data": [r.to_dict() for r in runs], + "count": len(runs), + }) + + except Exception as e: + logger.error(f"Failed to list runs: {e}") + return jsonify({"success": False, "error": str(e)}), 500 + + +@prediction_bp.route('/run/<run_id>', methods=['DELETE']) +def delete_run(run_id: str): + """Delete a prediction run""" + try: + success = PredictionRunManager.delete_run(run_id) + if not success: + return jsonify({"success": False, "error": f"Run not found: {run_id}"}), 404 + + return jsonify({"success": True, "message": f"Run deleted: {run_id}"}) + + except Exception as e: + logger.error(f"Failed to delete run: {e}") + return jsonify({"success": False, "error": str(e)}), 500 diff --git a/backend/app/config.py b/backend/app/config.py index 6b8eb75..b2e2a3f 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -69,6 +69,14 @@ class Config: REPORT_AGENT_MAX_REFLECTION_ROUNDS = int(os.environ.get('REPORT_AGENT_MAX_REFLECTION_ROUNDS', '2')) REPORT_AGENT_TEMPERATURE = float(os.environ.get('REPORT_AGENT_TEMPERATURE', '0.5')) + # Prediction Market配置 + POLYMARKET_GAMMA_URL = os.environ.get('POLYMARKET_GAMMA_URL', 'https://gamma-api.polymarket.com') + PREDICTION_DEFAULT_AGENTS = int(os.environ.get('PREDICTION_DEFAULT_AGENTS', '50')) + PREDICTION_DEFAULT_ROUNDS = int(os.environ.get('PREDICTION_DEFAULT_ROUNDS', '5')) + PREDICTION_SIGNAL_THRESHOLD = float(os.environ.get('PREDICTION_SIGNAL_THRESHOLD', 
'0.10')) + PREDICTION_TRADE_ENABLED = os.environ.get('PREDICTION_TRADE_ENABLED', 'false').lower() == 'true' + PREDICTION_DATA_DIR = os.path.join(os.path.dirname(__file__), '../uploads/predictions') + @classmethod def validate(cls): """验证必要配置""" diff --git a/backend/app/models/prediction.py b/backend/app/models/prediction.py new file mode 100644 index 0000000..284fef8 --- /dev/null +++ b/backend/app/models/prediction.py @@ -0,0 +1,270 @@ +""" +Prediction Market data models and persistence +""" + +import os +import json +import uuid +from datetime import datetime +from typing import Dict, Any, List, Optional +from enum import Enum +from dataclasses import dataclass, field + +from ..config import Config + + +class PredictionRunStatus(str, Enum): + FETCHING_MARKET = "fetching_market" + GENERATING_SCENARIO = "generating_scenario" + CREATING_PROJECT = "creating_project" + BUILDING_GRAPH = "building_graph" + PREPARING_SIMULATION = "preparing_simulation" + RUNNING_SIMULATION = "running_simulation" + ANALYZING = "analyzing" + COMPLETED = "completed" + FAILED = "failed" + + +@dataclass +class PredictionMarket: + """Polymarket market data""" + condition_id: str + title: str + slug: str + description: str + outcomes: List[str] + prices: List[float] + volume: float + liquidity: float + end_date: str + active: bool = True + + def to_dict(self) -> Dict[str, Any]: + return { + "condition_id": self.condition_id, + "title": self.title, + "slug": self.slug, + "description": self.description, + "outcomes": self.outcomes, + "prices": self.prices, + "volume": self.volume, + "liquidity": self.liquidity, + "end_date": self.end_date, + "active": self.active, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'PredictionMarket': + return cls( + condition_id=data.get('condition_id', ''), + title=data.get('title', ''), + slug=data.get('slug', ''), + description=data.get('description', ''), + outcomes=data.get('outcomes', []), + prices=data.get('prices', []), + 
volume=data.get('volume', 0), + liquidity=data.get('liquidity', 0), + end_date=data.get('end_date', ''), + active=data.get('active', True), + ) + + +@dataclass +class TradingSignal: + """Trading signal from prediction analysis""" + direction: str # BUY_YES, BUY_NO, HOLD + edge: float # simulated_prob - market_prob (signed) + confidence: float # 0-1 + reasoning: str + simulated_probability: float + market_probability: float + + def to_dict(self) -> Dict[str, Any]: + return { + "direction": self.direction, + "edge": round(self.edge, 4), + "confidence": round(self.confidence, 4), + "reasoning": self.reasoning, + "simulated_probability": round(self.simulated_probability, 4), + "market_probability": round(self.market_probability, 4), + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'TradingSignal': + return cls( + direction=data['direction'], + edge=data['edge'], + confidence=data['confidence'], + reasoning=data['reasoning'], + simulated_probability=data['simulated_probability'], + market_probability=data['market_probability'], + ) + + +@dataclass +class SentimentResult: + """Result from sentiment analysis of simulation""" + simulated_probability: float + confidence: float + stance_counts: Dict[str, int] # {for: N, against: N, neutral: N} + key_arguments_for: List[str] + key_arguments_against: List[str] + total_posts_analyzed: int + + def to_dict(self) -> Dict[str, Any]: + return { + "simulated_probability": round(self.simulated_probability, 4), + "confidence": round(self.confidence, 4), + "stance_counts": self.stance_counts, + "key_arguments_for": self.key_arguments_for, + "key_arguments_against": self.key_arguments_against, + "total_posts_analyzed": self.total_posts_analyzed, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'SentimentResult': + return cls( + simulated_probability=data['simulated_probability'], + confidence=data['confidence'], + stance_counts=data['stance_counts'], + key_arguments_for=data['key_arguments_for'], + 
key_arguments_against=data['key_arguments_against'], + total_posts_analyzed=data['total_posts_analyzed'], + ) + + +@dataclass +class PredictionRun: + """Full prediction run state""" + run_id: str + status: PredictionRunStatus + created_at: str + updated_at: str + + # Market info + market: Optional[Dict[str, Any]] = None + + # Pipeline IDs + project_id: Optional[str] = None + graph_id: Optional[str] = None + simulation_id: Optional[str] = None + + # Scenario + scenario: Optional[Dict[str, Any]] = None + + # Results + sentiment: Optional[Dict[str, Any]] = None + signal: Optional[Dict[str, Any]] = None + + # Error + error: Optional[str] = None + progress_message: str = "" + + def to_dict(self) -> Dict[str, Any]: + return { + "run_id": self.run_id, + "status": self.status.value if isinstance(self.status, PredictionRunStatus) else self.status, + "created_at": self.created_at, + "updated_at": self.updated_at, + "market": self.market, + "project_id": self.project_id, + "graph_id": self.graph_id, + "simulation_id": self.simulation_id, + "scenario": self.scenario, + "sentiment": self.sentiment, + "signal": self.signal, + "error": self.error, + "progress_message": self.progress_message, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'PredictionRun': + status = data.get('status', 'fetching_market') + if isinstance(status, str): + status = PredictionRunStatus(status) + return cls( + run_id=data['run_id'], + status=status, + created_at=data.get('created_at', ''), + updated_at=data.get('updated_at', ''), + market=data.get('market'), + project_id=data.get('project_id'), + graph_id=data.get('graph_id'), + simulation_id=data.get('simulation_id'), + scenario=data.get('scenario'), + sentiment=data.get('sentiment'), + signal=data.get('signal'), + error=data.get('error'), + progress_message=data.get('progress_message', ''), + ) + + +class PredictionRunManager: + """Manages prediction run persistence — follows ProjectManager pattern""" + + PREDICTIONS_DIR = 
Config.PREDICTION_DATA_DIR + + @classmethod + def _ensure_dir(cls): + os.makedirs(cls.PREDICTIONS_DIR, exist_ok=True) + + @classmethod + def _get_run_dir(cls, run_id: str) -> str: + return os.path.join(cls.PREDICTIONS_DIR, run_id) + + @classmethod + def _get_run_path(cls, run_id: str) -> str: + return os.path.join(cls._get_run_dir(run_id), 'run.json') + + @classmethod + def create_run(cls) -> PredictionRun: + cls._ensure_dir() + run_id = f"pred_{uuid.uuid4().hex[:12]}" + now = datetime.now().isoformat() + run = PredictionRun( + run_id=run_id, + status=PredictionRunStatus.FETCHING_MARKET, + created_at=now, + updated_at=now, + ) + run_dir = cls._get_run_dir(run_id) + os.makedirs(run_dir, exist_ok=True) + cls.save_run(run) + return run + + @classmethod + def save_run(cls, run: PredictionRun) -> None: + run.updated_at = datetime.now().isoformat() + run_path = cls._get_run_path(run.run_id) + os.makedirs(os.path.dirname(run_path), exist_ok=True) + with open(run_path, 'w', encoding='utf-8') as f: + json.dump(run.to_dict(), f, ensure_ascii=False, indent=2) + + @classmethod + def get_run(cls, run_id: str) -> Optional[PredictionRun]: + run_path = cls._get_run_path(run_id) + if not os.path.exists(run_path): + return None + with open(run_path, 'r', encoding='utf-8') as f: + data = json.load(f) + return PredictionRun.from_dict(data) + + @classmethod + def list_runs(cls, limit: int = 50) -> List[PredictionRun]: + cls._ensure_dir() + runs = [] + for name in os.listdir(cls.PREDICTIONS_DIR): + run = cls.get_run(name) + if run: + runs.append(run) + runs.sort(key=lambda r: r.created_at, reverse=True) + return runs[:limit] + + @classmethod + def delete_run(cls, run_id: str) -> bool: + import shutil + run_dir = cls._get_run_dir(run_id) + if not os.path.exists(run_dir): + return False + shutil.rmtree(run_dir) + return True diff --git a/backend/app/services/polymarket_client.py b/backend/app/services/polymarket_client.py new file mode 100644 index 0000000..61593ba --- /dev/null +++ 
b/backend/app/services/polymarket_client.py @@ -0,0 +1,122 @@ +""" +Polymarket client — fetches active markets from the Gamma API +""" + +import requests +from typing import List, Optional, Dict, Any + +from ..config import Config +from ..models.prediction import PredictionMarket +from ..utils.logger import get_logger + +logger = get_logger('mirofish.polymarket') + + +class PolymarketClient: + """Fetches prediction market data from Polymarket's Gamma API""" + + def __init__(self, base_url: Optional[str] = None): + self.base_url = base_url or Config.POLYMARKET_GAMMA_URL + + def fetch_active_markets( + self, + min_volume: float = 10000, + limit: int = 50, + search: Optional[str] = None, + ) -> List[PredictionMarket]: + """ + Fetch active binary markets from Polymarket. + + Args: + min_volume: Minimum trading volume filter + limit: Max markets to return + search: Optional search query + + Returns: + List of PredictionMarket objects + """ + try: + params: Dict[str, Any] = { + "limit": min(limit, 100), + "active": True, + "closed": False, + "order": "volume", + "ascending": False, + } + + url = f"{self.base_url}/markets" + logger.info(f"Fetching markets from {url}") + + resp = requests.get(url, params=params, timeout=30) + resp.raise_for_status() + raw_markets = resp.json() + + if not isinstance(raw_markets, list): + logger.warning(f"Unexpected response format: {type(raw_markets)}") + return [] + + markets = [] + for item in raw_markets: + market = self._parse_market(item) + if market is None: + continue + if market.volume < min_volume: + continue + if search and search.lower() not in market.title.lower(): + continue + markets.append(market) + if len(markets) >= limit: + break + + logger.info(f"Fetched {len(markets)} markets (filtered from {len(raw_markets)})") + return markets + + except requests.RequestException as e: + logger.error(f"Failed to fetch markets: {e}") + raise + + def get_market(self, condition_id: str) -> Optional[PredictionMarket]: + """Fetch a single 
market by condition_id""" + try: + url = f"{self.base_url}/markets/{condition_id}" + resp = requests.get(url, timeout=30) + resp.raise_for_status() + data = resp.json() + return self._parse_market(data) + except requests.RequestException as e: + logger.error(f"Failed to fetch market {condition_id}: {e}") + return None + + def _parse_market(self, data: Dict[str, Any]) -> Optional[PredictionMarket]: + """Parse raw Gamma API response into PredictionMarket""" + try: + # Gamma API returns tokens with prices for each outcome + tokens = data.get('tokens', []) + outcomes = [] + prices = [] + + if tokens: + for token in tokens: + outcomes.append(token.get('outcome', 'Unknown')) + prices.append(float(token.get('price', 0))) + else: + # Fallback: try outcomes/outcomePrices fields + outcomes = data.get('outcomes', ['Yes', 'No']) + raw_prices = data.get('outcomePrices', ['0.5', '0.5']) + prices = [float(p) for p in raw_prices] if raw_prices else [0.5, 0.5] + + return PredictionMarket( + condition_id=data.get('conditionId', data.get('condition_id', '')), + title=data.get('question', data.get('title', 'Unknown')), + slug=data.get('slug', ''), + description=data.get('description', ''), + outcomes=outcomes, + prices=prices, + volume=float(data.get('volume', 0) or 0), + liquidity=float(data.get('liquidity', 0) or 0), + end_date=data.get('endDate', data.get('end_date', '')), + active=data.get('active', True), + ) + except (KeyError, ValueError, TypeError) as e: + logger.warning(f"Failed to parse market: {e}") + return None diff --git a/backend/app/services/prediction_manager.py b/backend/app/services/prediction_manager.py new file mode 100644 index 0000000..b20fca7 --- /dev/null +++ b/backend/app/services/prediction_manager.py @@ -0,0 +1,278 @@ +""" +Prediction Manager — orchestrates the full prediction pipeline: +market → scenario → project → graph → simulation → analysis → signal +""" + +import time +from typing import Optional, Callable + +from flask import current_app + +from 
..config import Config +from ..models.prediction import ( + PredictionMarket, PredictionRun, PredictionRunStatus, + PredictionRunManager, TradingSignal, SentimentResult, +) +from ..models.project import ProjectManager +from ..services.polymarket_client import PolymarketClient +from ..services.scenario_generator import ScenarioGenerator +from ..services.sentiment_analyzer import SentimentAnalyzer +from ..services.ontology_generator import OntologyGenerator +from ..services.graph_builder import GraphBuilderService +from ..services.simulation_manager import SimulationManager, SimulationStatus +from ..services.simulation_runner import SimulationRunner, RunnerStatus +from ..models.task import TaskManager, TaskStatus +from ..utils.llm_client import LLMClient +from ..utils.logger import get_logger + +logger = get_logger('mirofish.prediction_manager') + + +class PredictionManager: + """Orchestrates the full prediction pipeline""" + + def __init__(self, storage=None): + """ + Args: + storage: Neo4jStorage instance (from app.extensions) + """ + self.storage = storage + self.llm_client = LLMClient() + self.polymarket = PolymarketClient() + self.scenario_gen = ScenarioGenerator(self.llm_client) + self.sentiment_analyzer = SentimentAnalyzer(self.llm_client) + self.ontology_gen = OntologyGenerator(self.llm_client) + self.sim_manager = SimulationManager() + + def run_prediction( + self, + market: PredictionMarket, + run: PredictionRun, + progress_callback: Optional[Callable] = None, + ) -> PredictionRun: + """ + Execute the full prediction pipeline. + + This runs synchronously (called from a background thread). 
+ + Args: + market: The market to predict + run: PredictionRun to update with progress + progress_callback: Optional (stage, progress, message) callback + """ + try: + run.market = market.to_dict() + self._update(run, PredictionRunStatus.FETCHING_MARKET, "Market data loaded", progress_callback) + + # Step 1: Generate scenario + self._update(run, PredictionRunStatus.GENERATING_SCENARIO, "Generating simulation scenario...", progress_callback) + scenario = self.scenario_gen.generate_scenario(market) + run.scenario = scenario.to_dict() + PredictionRunManager.save_run(run) + + # Step 2: Create project with synthetic document + self._update(run, PredictionRunStatus.CREATING_PROJECT, "Creating project...", progress_callback) + project = ProjectManager.create_project(name=f"Prediction: {market.title[:80]}") + run.project_id = project.project_id + + # Save context document as extracted text + ProjectManager.save_extracted_text(project.project_id, scenario.context_document) + project.total_text_length = len(scenario.context_document) + project.simulation_requirement = scenario.simulation_requirement + ProjectManager.save_project(project) + PredictionRunManager.save_run(run) + + # Step 3: Generate ontology + self._update(run, PredictionRunStatus.BUILDING_GRAPH, "Generating ontology...", progress_callback) + ontology = self.ontology_gen.generate( + document_texts=[scenario.context_document], + simulation_requirement=scenario.simulation_requirement, + ) + project.ontology = ontology + project.analysis_summary = ontology.get('analysis_summary', '') + ProjectManager.save_project(project) + + # Step 4: Build graph (synchronous — wait for completion) + self._update(run, PredictionRunStatus.BUILDING_GRAPH, "Building knowledge graph...", progress_callback) + graph_builder = GraphBuilderService(self.storage) + task_id = graph_builder.build_graph_async( + text=scenario.context_document, + ontology=ontology, + graph_name=f"pred_{run.run_id}", + chunk_size=Config.DEFAULT_CHUNK_SIZE, + 
chunk_overlap=Config.DEFAULT_CHUNK_OVERLAP, + ) + + # Poll for graph build completion + task_manager = TaskManager() + graph_id = self._wait_for_task(task_manager, task_id, "graph build", progress_callback, run) + + if not graph_id: + raise RuntimeError("Graph build failed or timed out") + + run.graph_id = graph_id + project.graph_id = graph_id + ProjectManager.save_project(project) + PredictionRunManager.save_run(run) + + # Step 5: Create and prepare simulation + self._update(run, PredictionRunStatus.PREPARING_SIMULATION, "Preparing simulation...", progress_callback) + sim_state = self.sim_manager.create_simulation( + project_id=project.project_id, + graph_id=graph_id, + enable_twitter=False, # Reddit-only for richer discourse + enable_reddit=True, + ) + run.simulation_id = sim_state.simulation_id + PredictionRunManager.save_run(run) + + # Get entity types from ontology + entity_types = [et['name'] for et in ontology.get('entity_types', [])] + + self.sim_manager.prepare_simulation( + simulation_id=sim_state.simulation_id, + simulation_requirement=scenario.simulation_requirement, + document_text=scenario.context_document, + defined_entity_types=entity_types, + use_llm_for_profiles=True, + parallel_profile_count=3, + storage=self.storage, + ) + + # Step 6: Run simulation + self._update(run, PredictionRunStatus.RUNNING_SIMULATION, "Running simulation...", progress_callback) + max_rounds = Config.PREDICTION_DEFAULT_ROUNDS + SimulationRunner.start_simulation( + simulation_id=sim_state.simulation_id, + platform="reddit", + max_rounds=max_rounds, + enable_graph_memory_update=False, + ) + + # Poll for simulation completion + self._wait_for_simulation(sim_state.simulation_id, progress_callback, run) + + # Step 7: Analyze sentiment + self._update(run, PredictionRunStatus.ANALYZING, "Analyzing simulation output...", progress_callback) + sentiment = self.sentiment_analyzer.analyze( + simulation_id=sim_state.simulation_id, + market_question=market.title, + platform="reddit", + 
) + run.sentiment = sentiment.to_dict() + PredictionRunManager.save_run(run) + + # Step 8: Generate trading signal + signal = self._generate_signal(market, sentiment) + run.signal = signal.to_dict() + + self._update(run, PredictionRunStatus.COMPLETED, "Prediction complete", progress_callback) + return run + + except Exception as e: + logger.error(f"Prediction pipeline failed: {e}", exc_info=True) + run.status = PredictionRunStatus.FAILED + run.error = str(e) + run.progress_message = f"Failed: {str(e)}" + PredictionRunManager.save_run(run) + return run + + def _update(self, run: PredictionRun, status: PredictionRunStatus, message: str, callback=None): + """Update run status and notify""" + run.status = status + run.progress_message = message + PredictionRunManager.save_run(run) + if callback: + callback(status.value, message) + logger.info(f"[{run.run_id}] {status.value}: {message}") + + def _wait_for_task(self, task_manager, task_id, task_name, callback, run, timeout=600): + """Poll TaskManager until task completes. 
Returns result graph_id or None.""" + start = time.time() + while time.time() - start < timeout: + task = task_manager.get_task(task_id) + if not task: + time.sleep(2) + continue + + if task.status == TaskStatus.COMPLETED: + result = task.result or {} + return result.get('graph_id') + + if task.status == TaskStatus.FAILED: + raise RuntimeError(f"{task_name} failed: {task.error}") + + # Update progress message + if task.message: + run.progress_message = f"Building graph: {task.message}" + PredictionRunManager.save_run(run) + + time.sleep(3) + + raise RuntimeError(f"{task_name} timed out after {timeout}s") + + def _wait_for_simulation(self, simulation_id, callback, run, timeout=3600): + """Poll simulation runner until it completes""" + start = time.time() + while time.time() - start < timeout: + run_state = SimulationRunner.get_run_state(simulation_id) + + if run_state is None: + time.sleep(3) + continue + + status = run_state.runner_status + + if status in (RunnerStatus.COMPLETED, RunnerStatus.STOPPED): + logger.info(f"Simulation {simulation_id} completed") + return + + if status == RunnerStatus.FAILED: + raise RuntimeError(f"Simulation failed: {run_state.error}") + + # Update progress + if run_state.current_round > 0: + msg = f"Simulation round {run_state.current_round}/{run_state.total_rounds}" + run.progress_message = msg + PredictionRunManager.save_run(run) + + time.sleep(5) + + raise RuntimeError(f"Simulation timed out after {timeout}s") + + def _generate_signal(self, market: PredictionMarket, sentiment: SentimentResult) -> TradingSignal: + """Compare simulated probability vs market price to generate trading signal""" + # Market YES price + market_prob = market.prices[0] if market.prices else 0.5 + sim_prob = sentiment.simulated_probability + + edge = sim_prob - market_prob + threshold = Config.PREDICTION_SIGNAL_THRESHOLD + + if edge > threshold: + direction = "BUY_YES" + reasoning = ( + f"Simulated probability ({sim_prob:.1%}) is {edge:.1%} higher than " + 
f"market price ({market_prob:.1%}). Agents lean toward YES." + ) + elif edge < -threshold: + direction = "BUY_NO" + reasoning = ( + f"Simulated probability ({sim_prob:.1%}) is {abs(edge):.1%} lower than " + f"market price ({market_prob:.1%}). Agents lean toward NO." + ) + else: + direction = "HOLD" + reasoning = ( + f"Simulated probability ({sim_prob:.1%}) is within threshold of " + f"market price ({market_prob:.1%}). No clear edge." + ) + + return TradingSignal( + direction=direction, + edge=edge, + confidence=sentiment.confidence, + reasoning=reasoning, + simulated_probability=sim_prob, + market_probability=market_prob, + ) diff --git a/backend/app/services/scenario_generator.py b/backend/app/services/scenario_generator.py new file mode 100644 index 0000000..8410f90 --- /dev/null +++ b/backend/app/services/scenario_generator.py @@ -0,0 +1,132 @@ +""" +Scenario Generator — converts a prediction market question into a simulation scenario +""" + +from typing import Optional, Dict, Any +from dataclasses import dataclass + +from ..models.prediction import PredictionMarket +from ..utils.llm_client import LLMClient +from ..utils.logger import get_logger + +logger = get_logger('mirofish.scenario_generator') + + +@dataclass +class ScenarioConfig: + """Generated simulation scenario from a market question""" + simulation_requirement: str + context_document: str + suggested_agent_count: int + stance_distribution: Dict[str, float] # {supportive: 0.4, opposing: 0.4, neutral: 0.2} + + def to_dict(self) -> Dict[str, Any]: + return { + "simulation_requirement": self.simulation_requirement, + "context_document": self.context_document, + "suggested_agent_count": self.suggested_agent_count, + "stance_distribution": self.stance_distribution, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'ScenarioConfig': + return cls( + simulation_requirement=data['simulation_requirement'], + context_document=data['context_document'], + 
suggested_agent_count=data.get('suggested_agent_count', 50), + stance_distribution=data.get('stance_distribution', { + "supportive": 0.4, "opposing": 0.4, "neutral": 0.2 + }), + ) + + +SCENARIO_SYSTEM_PROMPT = """You are a simulation scenario designer for prediction market analysis. + +Given a prediction market question, create a balanced multi-agent social simulation scenario. + +CRITICAL RULES: +1. The scenario must NOT bias toward YES or NO — it must be balanced +2. Agents should represent diverse viewpoints (supporters, opponents, and neutral observers) +3. The simulation requirement should frame the debate, not predetermine the outcome +4. The context document should provide factual background that both sides can use +5. Include relevant stakeholders, experts, and general public perspectives + +Output JSON with these fields: +{ + "simulation_requirement": "A clear description of what the simulation should model. Frame it as: 'Simulate a social media discussion about [topic] where diverse participants debate [the question]. Include experts, stakeholders, and general public with varying opinions.'", + "context_document": "A 500-1000 word factual background document covering: the current situation, key arguments for and against, relevant data points, stakeholder positions, and recent developments. This becomes the 'world' the agents inhabit.", + "suggested_agent_count": 50, + "stance_distribution": { + "supportive": 0.35, + "opposing": 0.35, + "neutral": 0.30 + } +}""" + + +class ScenarioGenerator: + """Converts a prediction market question into a simulation scenario""" + + def __init__(self, llm_client: Optional[LLMClient] = None): + self.llm_client = llm_client or LLMClient() + + def generate_scenario(self, market: PredictionMarket) -> ScenarioConfig: + """ + Generate a balanced simulation scenario from a market question. 
+ + Args: + market: PredictionMarket with question and context + + Returns: + ScenarioConfig ready for the simulation pipeline + """ + user_message = self._build_prompt(market) + + messages = [ + {"role": "system", "content": SCENARIO_SYSTEM_PROMPT}, + {"role": "user", "content": user_message}, + ] + + logger.info(f"Generating scenario for market: {market.title}") + + result = self.llm_client.chat_json( + messages=messages, + temperature=0.4, + max_tokens=4096, + ) + + scenario = ScenarioConfig( + simulation_requirement=result.get('simulation_requirement', ''), + context_document=result.get('context_document', ''), + suggested_agent_count=result.get('suggested_agent_count', 50), + stance_distribution=result.get('stance_distribution', { + "supportive": 0.35, "opposing": 0.35, "neutral": 0.30 + }), + ) + + logger.info(f"Scenario generated: {len(scenario.context_document)} chars context") + return scenario + + def _build_prompt(self, market: PredictionMarket) -> str: + """Build the user prompt from market data""" + parts = [ + f"# Prediction Market Question", + f"**Question:** {market.title}", + f"**Outcomes:** {', '.join(market.outcomes)}", + f"**Current Prices:** {', '.join(f'{o}: ${p:.2f}' for o, p in zip(market.outcomes, market.prices))}", + f"**Trading Volume:** ${market.volume:,.0f}", + f"**End Date:** {market.end_date}", + ] + + if market.description: + # Truncate very long descriptions + desc = market.description[:3000] + parts.append(f"\n**Market Description:**\n{desc}") + + parts.append( + "\nCreate a balanced simulation scenario for this market. " + "The simulation should produce organic discourse that reveals " + "the collective intelligence of diverse agents debating this question." 
+ ) + + return '\n'.join(parts) diff --git a/backend/app/services/sentiment_analyzer.py b/backend/app/services/sentiment_analyzer.py new file mode 100644 index 0000000..c9300e7 --- /dev/null +++ b/backend/app/services/sentiment_analyzer.py @@ -0,0 +1,253 @@ +""" +Sentiment Analyzer — parses simulation actions and classifies stance toward market question +""" + +import os +import json +from typing import List, Dict, Any, Optional + +from ..config import Config +from ..models.prediction import SentimentResult +from ..utils.llm_client import LLMClient +from ..utils.logger import get_logger + +logger = get_logger('mirofish.sentiment_analyzer') + +CLASSIFY_SYSTEM_PROMPT = """You are analyzing social media posts from a simulation about a prediction market question. + +For each post, classify the author's stance: +- "for": supports the YES outcome +- "against": supports the NO outcome +- "neutral": no clear position or purely informational + +Also rate confidence (0.0-1.0) in your classification. + +Return JSON array: +[ + {"post_index": 0, "stance": "for", "confidence": 0.8, "key_argument": "brief summary"}, + ... +] + +Be precise. Only classify as "for" or "against" if the post clearly takes a side.""" + + +class SentimentAnalyzer: + """Analyzes simulation output to estimate probability""" + + def __init__(self, llm_client: Optional[LLMClient] = None): + self.llm_client = llm_client or LLMClient() + + def analyze( + self, + simulation_id: str, + market_question: str, + platform: str = "reddit", + ) -> SentimentResult: + """ + Analyze simulation actions to compute simulated probability. 
+ + Args: + simulation_id: ID of completed simulation + market_question: The original prediction market question + platform: Which platform's actions to analyze + + Returns: + SentimentResult with probability and breakdown + """ + # Load posts from actions.jsonl + posts = self._load_posts(simulation_id, platform) + + if not posts: + logger.warning(f"No posts found for simulation {simulation_id}") + return SentimentResult( + simulated_probability=0.5, + confidence=0.0, + stance_counts={"for": 0, "against": 0, "neutral": 0}, + key_arguments_for=[], + key_arguments_against=[], + total_posts_analyzed=0, + ) + + logger.info(f"Analyzing {len(posts)} posts for simulation {simulation_id}") + + # Batch-classify posts via LLM + all_classifications = [] + batch_size = 15 + + for i in range(0, len(posts), batch_size): + batch = posts[i:i + batch_size] + classifications = self._classify_batch(batch, market_question, start_index=i) + all_classifications.extend(classifications) + + # Compute weighted probability + return self._compute_result(all_classifications, len(posts)) + + def _load_posts(self, simulation_id: str, platform: str) -> List[Dict[str, Any]]: + """Load CREATE_POST and CREATE_COMMENT actions from actions.jsonl""" + actions_path = os.path.join( + Config.OASIS_SIMULATION_DATA_DIR, + simulation_id, + platform, + 'actions.jsonl' + ) + + if not os.path.exists(actions_path): + logger.warning(f"Actions file not found: {actions_path}") + return [] + + posts = [] + with open(actions_path, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if not line: + continue + try: + action = json.loads(line) + except json.JSONDecodeError: + continue + + # Skip event records + if 'event_type' in action: + continue + + action_type = action.get('action_type', '') + if action_type not in ('CREATE_POST', 'CREATE_COMMENT'): + continue + + content = '' + args = action.get('action_args', {}) + if isinstance(args, dict): + content = args.get('content', '') + elif 
isinstance(args, str): + content = args + + if not content or len(content) < 10: + continue + + posts.append({ + "agent_name": action.get('agent_name', 'Unknown'), + "action_type": action_type, + "content": content[:500], # Truncate long posts + "round": action.get('round', 0), + }) + + return posts + + def _classify_batch( + self, + posts: List[Dict[str, Any]], + market_question: str, + start_index: int = 0, + ) -> List[Dict[str, Any]]: + """Classify a batch of posts via LLM""" + posts_text = [] + for i, post in enumerate(posts): + posts_text.append( + f"[Post {start_index + i}] ({post['agent_name']}, {post['action_type']}):\n" + f"{post['content']}" + ) + + user_message = ( + f"# Prediction Market Question\n{market_question}\n\n" + f"# Posts to Classify\n" + "\n\n".join(posts_text) + ) + + messages = [ + {"role": "system", "content": CLASSIFY_SYSTEM_PROMPT}, + {"role": "user", "content": user_message}, + ] + + try: + result = self.llm_client.chat_json( + messages=messages, + temperature=0.2, + max_tokens=4096, + ) + + if isinstance(result, list): + return result + if isinstance(result, dict) and 'classifications' in result: + return result['classifications'] + return [] + + except Exception as e: + logger.error(f"Failed to classify batch: {e}") + return [] + + def _compute_result( + self, + classifications: List[Dict[str, Any]], + total_posts: int, + ) -> SentimentResult: + """Compute probability from classifications""" + stance_counts = {"for": 0, "against": 0, "neutral": 0} + weighted_for = 0.0 + weighted_against = 0.0 + weighted_total = 0.0 + args_for = [] + args_against = [] + + for c in classifications: + stance = c.get('stance', 'neutral') + confidence = float(c.get('confidence', 0.5)) + key_arg = c.get('key_argument', '') + + if stance in stance_counts: + stance_counts[stance] += 1 + else: + stance_counts['neutral'] += 1 + stance = 'neutral' + + if stance == 'for': + weighted_for += confidence + weighted_total += confidence + if key_arg: + 
args_for.append(key_arg) + elif stance == 'against': + weighted_against += confidence + weighted_total += confidence + if key_arg: + args_against.append(key_arg) + else: + weighted_total += confidence * 0.5 # Neutral contributes less + + # P(Yes) = weighted_for / weighted_total + if weighted_total > 0: + simulated_prob = weighted_for / (weighted_for + weighted_against) if (weighted_for + weighted_against) > 0 else 0.5 + else: + simulated_prob = 0.5 + + # Confidence based on sample size and agreement + total_classified = stance_counts['for'] + stance_counts['against'] + if total_classified > 0: + agreement = max(stance_counts['for'], stance_counts['against']) / total_classified + sample_factor = min(total_classified / 20, 1.0) # Full confidence at 20+ opinionated posts + result_confidence = agreement * sample_factor + else: + result_confidence = 0.0 + + # Deduplicate arguments (keep top 5) + seen_for = set() + unique_for = [] + for arg in args_for: + key = arg.lower()[:50] + if key not in seen_for: + seen_for.add(key) + unique_for.append(arg) + + seen_against = set() + unique_against = [] + for arg in args_against: + key = arg.lower()[:50] + if key not in seen_against: + seen_against.add(key) + unique_against.append(arg) + + return SentimentResult( + simulated_probability=simulated_prob, + confidence=result_confidence, + stance_counts=stance_counts, + key_arguments_for=unique_for[:5], + key_arguments_against=unique_against[:5], + total_posts_analyzed=total_posts, + ) diff --git a/frontend/src/api/prediction.js b/frontend/src/api/prediction.js new file mode 100644 index 0000000..b7e5588 --- /dev/null +++ b/frontend/src/api/prediction.js @@ -0,0 +1,35 @@ +import service, { requestWithRetry } from './index' + +// Fetch active markets from Polymarket +export const fetchMarkets = (params = {}) => { + return service.get('/api/prediction/markets', { params }) +} + +// Start a prediction run +export const startPredictionRun = (market) => { + return requestWithRetry( + () => 
service.post('/api/prediction/run', { market }), + 3, + 1000 + ) +} + +// Get prediction run status +export const getRunStatus = (runId) => { + return service.get(`/api/prediction/run/${runId}/status`) +} + +// Get full prediction run details +export const getRun = (runId) => { + return service.get(`/api/prediction/run/${runId}`) +} + +// List all prediction runs +export const listRuns = (limit = 50) => { + return service.get('/api/prediction/runs', { params: { limit } }) +} + +// Delete a prediction run +export const deleteRun = (runId) => { + return service.delete(`/api/prediction/run/${runId}`) +} diff --git a/frontend/src/router/index.js b/frontend/src/router/index.js index 62d2320..71d9509 100644 --- a/frontend/src/router/index.js +++ b/frontend/src/router/index.js @@ -5,6 +5,7 @@ import SimulationView from '../views/SimulationView.vue' import SimulationRunView from '../views/SimulationRunView.vue' import ReportView from '../views/ReportView.vue' import InteractionView from '../views/InteractionView.vue' +import PredictionView from '../views/PredictionView.vue' const routes = [ { @@ -12,6 +13,11 @@ const routes = [ name: 'Home', component: Home }, + { + path: '/prediction', + name: 'Prediction', + component: PredictionView + }, { path: '/process/:projectId', name: 'Process', diff --git a/frontend/src/views/Home.vue b/frontend/src/views/Home.vue index 36bb714..acc6f62 100644 --- a/frontend/src/views/Home.vue +++ b/frontend/src/views/Home.vue @@ -4,6 +4,7 @@