Skip to content
Open
344 changes: 344 additions & 0 deletions agents/connectors/cogdx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,344 @@
"""
CogDx Connector
Cognitive Diagnostics for Prediction Market Agents

Optional reasoning verification before trade execution.
Detects logical fallacies, calibration issues, and cognitive biases.

API: https://api.cerebratech.ai
"""

import os
import requests
from typing import Dict, Any, Optional, List

class CogDxClient:
"""
Client for Cerebratech's Cognitive Diagnostics API.

Verifies agent reasoning quality before high-stakes decisions.
Detects logical fallacies, calibration issues, and cognitive biases.
"""

BASE_URL = "https://api.cerebratech.ai"

def __init__(self, coupon: Optional[str] = None, wallet: Optional[str] = None):
"""
Initialize CogDx client.

Args:
coupon: Optional coupon code for credits
wallet: Ethereum wallet address for credit-based payments
"""
self.coupon = coupon or os.getenv("COGDX_COUPON")
self.wallet = wallet or os.getenv("COGDX_WALLET")

def _headers(self) -> Dict[str, str]:
headers = {"Content-Type": "application/json"}
if self.coupon:
headers["X-COUPON"] = self.coupon
if self.wallet:
headers["X-WALLET"] = self.wallet
return headers

def analyze_reasoning(self, reasoning_trace: str) -> Dict[str, Any]:
"""
Analyze a reasoning trace for logical fallacies and validity issues.

Args:
reasoning_trace: The agent's reasoning text to analyze

Returns:
dict with:
- logical_validity: float 0-1
- status: 'valid' | 'flawed'
- flaws_detected: list of detected fallacies
- recommendations: suggested improvements
"""
try:
response = requests.post(
f"{self.BASE_URL}/reasoning_trace_analysis",
headers=self._headers(),
json={"trace": reasoning_trace},
timeout=30
)

if response.status_code == 402:
return {
"error": "payment_required",
"message": "Add COGDX_COUPON or COGDX_WALLET to env",
"logical_validity": None
}

# Handle other HTTP errors (500, 403, 429, etc.)
if not response.ok:
return {
"error": f"http_{response.status_code}",
"message": f"API returned status {response.status_code}",
"logical_validity": None
}

return response.json()

except Exception as e:
return {"error": str(e), "logical_validity": None}

def calibration_audit(
self,
agent_id: str,
predictions: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Audit prediction calibration - do confidence levels match accuracy?

Args:
agent_id: Identifier for the agent
predictions: List of {prompt, response, confidence} dicts

Returns:
dict with:
- calibration_score: float 0-1 (1 = perfectly calibrated)
- overconfidence_rate: float
- underconfidence_rate: float
- recommendations: list of strings
"""
try:
response = requests.post(
f"{self.BASE_URL}/calibration_audit",
headers=self._headers(),
json={
"agent_id": agent_id,
"sample_outputs": predictions
},
timeout=30
)

if response.status_code == 402:
return {"error": "payment_required", "calibration_score": None}
if not response.ok:
return {"error": f"http_{response.status_code}", "calibration_score": None}

return response.json()
except Exception as e:
return {"error": str(e), "calibration_score": None}

def bias_scan(
self,
agent_id: str,
outputs: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Scan for cognitive biases in agent outputs.

Detects: anchoring, confirmation bias, availability heuristic,
representativeness, sunk cost, and more.

Args:
agent_id: Identifier for the agent
outputs: List of {prompt, response, confidence} dicts

Returns:
dict with:
- biases_detected: list of bias findings
- severity: 'low' | 'medium' | 'high'
- recommendations: list of strings
"""
try:
response = requests.post(
f"{self.BASE_URL}/bias_scan",
headers=self._headers(),
json={
"agent_id": agent_id,
"sample_outputs": outputs
},
timeout=30
)

if response.status_code == 402:
return {"error": "payment_required", "biases_detected": None}
if not response.ok:
return {"error": f"http_{response.status_code}", "biases_detected": None}

return response.json()
except Exception as e:
return {"error": str(e), "biases_detected": None}

def verify_before_trade(
self,
reasoning: str,
min_validity: float = 0.7
) -> Dict[str, Any]:
"""
Pre-trade verification gate.

Use this before executing trades to catch reasoning flaws.

Args:
reasoning: The reasoning trace that led to the trade decision
min_validity: Minimum logical validity score to pass (default 0.7)

Returns:
dict with:
- approved: bool
- validity_score: float
- issues: list of detected problems
- recommendation: 'proceed' | 'review' | 'reject' | 'skip' (on error)
"""
result = self.analyze_reasoning(reasoning)

if result.get("error"):
# On error, fail closed (don't approve unverified trades)
return {
"approved": False,
"validity_score": None,
"issues": [f"CogDx unavailable: {result.get('error')}"],
"recommendation": "skip"
}

# Handle null values explicitly (dict.get returns None for null, not default)
validity = result.get("logical_validity")
if validity is None:
validity = 0
flaws = result.get("flaws_detected") or []

approved = validity >= min_validity and len(flaws) == 0

# Use approved variable to avoid duplicated logic
if approved:
recommendation = "proceed"
elif validity >= 0.5:
recommendation = "review"
else:
recommendation = "reject"

# Handle flaws as either dicts or strings
issues = []
for f in flaws:
if isinstance(f, dict):
issues.append(f.get("name", str(f)))
else:
issues.append(str(f))

return {
"approved": approved,
"validity_score": validity,
"issues": issues,
"recommendation": recommendation
}

def submit_feedback(
self,
endpoint: str,
agent_id: str,
accurate: bool,
confidence: Optional[float] = None,
severity: Optional[int] = None,
accuracy_score: Optional[float] = None,
outcome: Optional[str] = None,
reasoning: Optional[str] = None,
diagnosis_id: Optional[str] = None,
wallet: Optional[str] = None
) -> Dict[str, Any]:
"""
Submit feedback on a diagnosis to improve detection and earn credits.

Feedback builds shared reality across agents. Network effects improve
consensus and detection accuracy for everyone.

Binary core (required):
- endpoint: Which endpoint was diagnosed
- agent_id: Your agent identifier
- accurate: Was the detection correct? (True/False)

Numerical enrichment (optional - increases signal + credits):
- confidence: 0.0-1.0 - How sure are you about this feedback?
- severity: 1-5 - If bias was real, how impactful?
- accuracy_score: 0.0-1.0 - Partial credit (mostly right, slightly off)

Structured context (optional):
- outcome: "win" | "loss" | "neutral" | "unknown"
- reasoning: Why was detection right/wrong?
- diagnosis_id: ID from original diagnosis
- wallet: Earn credits to this wallet (0x...)

Returns:
dict with:
- received: bool
- feedback_id: str
- signal_strength: float (1.0-2.0x learning value)
- credits: {awarded, new_balance, wallet} if wallet provided
- network_contribution: impact on shared reality
"""
try:
payload = {
"endpoint": endpoint,
"agent_id": agent_id,
"accurate": accurate,
}

# Optional numerical enrichment
if confidence is not None:
payload["confidence"] = confidence
if severity is not None:
payload["severity"] = severity
if accuracy_score is not None:
payload["accuracy_score"] = accuracy_score

# Optional structured context
if outcome:
payload["outcome"] = outcome
if reasoning:
payload["reasoning"] = reasoning
if diagnosis_id:
payload["diagnosis_id"] = diagnosis_id
if wallet:
payload["wallet"] = wallet
elif self.wallet:
payload["wallet"] = self.wallet

response = requests.post(
f"{self.BASE_URL}/feedback",
headers=self._headers(),
json=payload,
timeout=30
)

if response.status_code == 402:
return {"error": "payment_required", "received": False}
if not response.ok:
return {"error": f"http_{response.status_code}", "received": False}

return response.json()

except Exception as e:
return {"error": str(e), "received": False}


def verify_trade_reasoning(
reasoning: str,
coupon: str = None,
wallet: str = None
) -> bool:
"""
Convenience function for quick trade verification.

Usage:
from agents.connectors.cogdx import verify_trade_reasoning

if verify_trade_reasoning(my_reasoning):
execute_trade()
else:
print("Reasoning flagged for review")

Args:
reasoning: The reasoning trace to verify
coupon: Optional coupon code for credits
wallet: Optional wallet address for credits

Returns:
True if reasoning passes verification, False otherwise.
Note: Returns False if API is unavailable (fails closed).
"""
client = CogDxClient(coupon=coupon, wallet=wallet)
result = client.verify_before_trade(reasoning)
return result.get("approved", False)
Loading