diff --git a/self_improve/runner.py b/self_improve/runner.py index ae458716..59b873cf 100644 --- a/self_improve/runner.py +++ b/self_improve/runner.py @@ -152,7 +152,7 @@ def _run_deduplicate(plan_dir: Path, plan_output_dir: Path, llm_executor: LLMExe name=plan_name, status="ok", duration_seconds=0, # filled by caller - calls_succeeded=len(result.response), + calls_succeeded=1, # single batch call ) diff --git a/worker_plan/worker_plan_internal/lever/deduplicate_levers.py b/worker_plan/worker_plan_internal/lever/deduplicate_levers.py index 614c86e9..3db89921 100644 --- a/worker_plan/worker_plan_internal/lever/deduplicate_levers.py +++ b/worker_plan/worker_plan_internal/lever/deduplicate_levers.py @@ -1,11 +1,19 @@ """ The identify_potential_levers.py script creates a list of levers, some of which are duplicates. -This script deduplicates the list. +This script deduplicates the list using a single-call Likert scoring approach. + +Each lever is scored on a 5-point scale: + 2 = primary (essential strategic decision) + 1 = secondary (useful but supporting) + 0 = borderline + -1 = overlapping (absorbed by another lever) + -2 = irrelevant (fully redundant) + +Levers scoring >= 1 are kept; levers scoring <= 0 are removed. PROMPT> python -m worker_plan_internal.lever.deduplicate_levers """ -from enum import Enum import json import logging import os @@ -19,23 +27,80 @@ logger = logging.getLogger(__name__) -class LeverClassification(str, Enum): - keep = "keep" - absorb = "absorb" - remove = "remove" +OPTIMIZE_INSTRUCTIONS = """\ +Goal: consolidate a brainstormed list of levers into a deduplicated, +prioritized set by scoring each lever on a Likert scale (-2 to +2). +The surviving levers (score >= 1) should be distinct, grounded, and +actionable — ready for enrichment and scenario generation downstream. + +Pipeline context +---------------- +This step (DeduplicateLevers) is part of a 6-step solution-space +exploration pipeline inside run_plan_pipeline.py: + + 1. 
IdentifyPotentialLevers — brainstorms 15-20 raw levers + 2. DeduplicateLevers ← you are here + 3. EnrichLevers — adds description, synergy, and conflict text + 4. FocusOnVitalFewLevers — filters down to 4-6 high-impact levers + 5. ScenarioGeneration — builds 3 scenarios (aggressive, medium, safe) + 6. ScenarioSelection — picks the best-fitting scenario + +Step 1 intentionally over-generates. This step's job is to remove +near-duplicates and tag each surviving lever as primary (strategic) or +secondary (operational). Over-removal is worse than over-inclusion — +step 4 handles further filtering. The classification field (primary/ +secondary) is consumed by downstream steps for prioritization. + +Known problems to guard against +-------------------------------- +- Blanket-primary. Weak models score nearly every lever as 2, + performing zero removals. Watch for runs where all scores are >= 1. +- Over-inclusion. Mid-tier models keep 10-12 of 15 levers instead of + the expected 5-8. Check the score distribution. +- Hierarchy-direction errors. Models score -1 on the general lever + and keep the narrow one — reversed from correct behavior. The more + general lever should survive; the specific one should be removed. +- Chain absorption. When lever A overlaps B and B overlaps C, both + A and B end up removed and only C survives. Check that C is + the most general of the three. +- Calibration capping. Narrow calibration ranges act as stopping + signals — models stop scoring negatively once they hit a threshold. +- Definition mirroring. Weak models copy the score definition verbatim + into every justification (e.g. "addresses a real concern but does + not gate the core outcome"), producing content-free boilerplate. + The model loses the ability to distinguish levers from each other, + which also suppresses negative scores. Fix: the prompt should use a + conditional question test ("If this lever were handled wrong, would + the project fail?") rather than a reusable dictionary definition. 
+""" -class LeverClassificationDecision(BaseModel): - """Minimal per-lever schema. lever_id is assigned by code, not the LLM.""" - classification: Literal["keep", "absorb", "remove"] = Field( - description="What should happen to this lever: keep (distinct), absorb (overlaps another), or remove (fully redundant)." +# --- Pydantic Models --- + +class LeverScoreDecision(BaseModel): + """Score decision for a single lever.""" + lever_id: str = Field(description="The lever_id being scored.") + score: Literal[-2, -1, 0, 1, 2] = Field( + description=( + "How relevant is this lever to this specific project plan? " + "2 = highly relevant, 1 = somewhat relevant, 0 = borderline, " + "-1 = low relevance or overlaps a better lever, " + "-2 = irrelevant or fully redundant." + ) ) justification: str = Field( - description="A concise justification for the classification (~80 words). If absorbing, state which lever id it merges into." + description="Concise justification for the score (~40-80 words)." + ) + +class BatchDeduplicationResult(BaseModel): + """Complete deduplication result for all levers in a single call.""" + decisions: List[LeverScoreDecision] = Field( + description="One score decision per input lever. Must cover every lever_id from the input." 
) class LeverDecision(BaseModel): + """Stored decision for each lever (used in response output).""" lever_id: str - classification: Literal["keep", "absorb", "remove"] + score: Literal[-2, -1, 0, 1, 2] justification: str class InputLever(BaseModel): @@ -47,55 +112,56 @@ class InputLever(BaseModel): review: str class OutputLever(InputLever): - """The InputLever and the deduplication justification.""" + """A lever that survived deduplication, with its classification and justification.""" + classification: Literal["primary", "secondary"] deduplication_justification: str -def _build_compact_history( - system_message_with_context: str, - prior_decisions: List[LeverDecision], -) -> List[ChatMessage]: - """Option C: replace full conversation history with a compact summary in the system message.""" - summary = "\n".join( - f"- [{d.lever_id}] {d.classification}: {d.justification[:80]}..." - for d in prior_decisions - ) - return [ - ChatMessage(role=MessageRole.SYSTEM, content=( - f"{system_message_with_context}\n\n" - f"**Prior decisions (compacted):**\n{summary}" - )), - ] - - -def _call_llm(chat_message_list: List[ChatMessage], llm: LLM) -> dict: - """Execute a structured LLM call for a single lever classification.""" - sllm = llm.as_structured_llm(LeverClassificationDecision) - chat_response = sllm.chat(chat_message_list) - return {"chat_response": chat_response, "metadata": dict(llm.metadata)} +def _score_to_classification(score: int) -> Literal["primary", "secondary", "remove"]: + """Map a Likert score to a classification label.""" + if score >= 2: + return "primary" + elif score >= 1: + return "secondary" + else: + return "remove" DEDUPLICATE_SYSTEM_PROMPT = """ -Evaluate each of the provided strategic levers individually. Classify every lever explicitly into one of: +You are evaluating a set of strategic levers for a project plan. Your task is +to score how relevant each lever is to this specific plan. -- keep: Lever is distinct, unique, and essential. 
-- absorb: Lever overlaps significantly with another lever. Explicitly state the lever ID it should be merged into. -- remove: Lever is fully redundant. Removing it loses no meaningful detail. Use this sparingly. +**Scoring scale:** -Provide concise, explicit justifications mentioning lever IDs clearly. Always prefer "absorb" over "remove" to retain important details. +- **2** (highly relevant): This lever directly addresses a core challenge or + opportunity in the plan. The plan would be significantly weaker without it. -Always provide a justification for the classification. Explain why the lever is distinct from others. Don't use the same uninformative boilerplate. +- **1** (somewhat relevant): This lever addresses a real concern in the plan + but is not central to the project's success. -Respect Hierarchy: When absorbing, merge the more specific lever into the more general one. -Don't take the more general lever and absorb it into a narrower one. -Also compare a lever against the group of already-merged levers. +- **0** (borderline): Marginal relevance. Could be included or excluded + without significant impact on the plan. -Use "keep" if you lack understanding of what the lever is doing. This way a potential important lever is not getting removed. -Describe what the issue is in the justification. +- **-1** (low relevance): This lever adds little value — either because it + overlaps substantially with a more relevant lever, or because it addresses + a concern that is peripheral to this plan. -Don't play it too safe, so you fail to perform the core task: consolidate the levers and get rid of the duplicates. +- **-2** (irrelevant): This lever does not meaningfully contribute to the plan. + It is redundant, off-topic, or its concern is already fully covered by + other levers. -You must classify and justify **every lever** provided in the input. +**Rules:** + +- Score every lever in the input. Do not skip any. +- Read the project context carefully. 
A lever that sounds important in general + may be irrelevant to this specific plan, and vice versa. +- Each justification must explain your reasoning in terms of the plan. +- When two levers cover similar ground, score the more general one higher and + the more specific one lower. +- Expect 25-50% of levers to score 0 or below. If you score everything 1 or + 2, reconsider — the input almost always contains overlap and redundancy. +- You see the full list at once. Compare all levers against each other before + assigning scores. """ @dataclass @@ -110,14 +176,11 @@ class DeduplicateLevers: @classmethod def execute(cls, llm_executor: LLMExecutor, project_context: str, raw_levers_list: List[dict]) -> 'DeduplicateLevers': """ - Executes the deduplication process. - - Args: - llm_executor: The configured LLMExecutor instance. - raw_levers_list: A list of dictionaries, each representing a lever. + Executes the deduplication process using a single batch LLM call. - Returns: - An instance of DeduplicateLevers containing the results. + All levers are scored simultaneously on a Likert scale (-2 to +2). + Levers scoring >= 1 are kept as primary (2) or secondary (1). + Levers scoring <= 0 are removed. """ try: input_levers = [InputLever(**lever) for lever in raw_levers_list] @@ -127,136 +190,97 @@ def execute(cls, llm_executor: LLMExecutor, project_context: str, raw_levers_lis if not input_levers: raise ValueError("No input levers to deduplicate.") - logger.info(f"Starting deduplication for {len(input_levers)} levers.") + logger.info(f"Starting deduplication for {len(input_levers)} levers (single-call scoring).") levers_json = json.dumps([lever.model_dump() for lever in input_levers], indent=2) system_prompt = DEDUPLICATE_SYSTEM_PROMPT.strip() - # Build a summary of all levers for comparison context (shared across all per-lever calls). - all_levers_summary = "\n".join( - f"- [{lever.lever_id}] {lever.name}: {lever.consequences[:120]}..." 
- for lever in input_levers - ) - - decisions: List[LeverDecision] = [] - metadata_list: List[dict] = [] - - # Initialise conversation with full context in the system message (option A). - # System message carries project context + lever summary so the first USER - # message is the first lever — no dangling USER→USER before the first ASSISTANT. - system_message_with_context = ( - f"{system_prompt}\n\n" + # Build the single prompt with all levers. + user_prompt = ( f"**Project Context:**\n{project_context}\n\n" - f"**All levers under review:**\n{all_levers_summary}" + f"**Levers to score ({len(input_levers)} total):**\n{levers_json}\n\n" + f"Score every lever on the Likert scale (-2 to +2) with a justification." ) - chat_message_list: List[ChatMessage] = [ - ChatMessage(role=MessageRole.SYSTEM, content=system_message_with_context), + + chat_message_list = [ + ChatMessage(role=MessageRole.SYSTEM, content=system_prompt), + ChatMessage(role=MessageRole.USER, content=user_prompt), ] - # Closure captures chat_message_list by variable reference, so rebinding - # after compaction is visible on the next call without redefining the function. def execute_function(llm: LLM) -> dict: - return _call_llm(chat_message_list, llm) + sllm = llm.as_structured_llm(BatchDeduplicationResult) + chat_response = sllm.chat(chat_message_list) + return {"chat_response": chat_response, "metadata": dict(llm.metadata)} - for lever in input_levers: - lever_json = json.dumps(lever.model_dump(), indent=2) - lever_prompt = ( - f"Classify this lever (keep / absorb / remove) with a justification:\n{lever_json}" - ) - chat_message_list.append(ChatMessage(role=MessageRole.USER, content=lever_prompt)) - - decision: LeverClassificationDecision | None = None - result = None - - # First attempt with full conversation history. 
- try: - result = llm_executor.run(execute_function) - metadata_list.append(result.get("metadata", {})) - except PipelineStopRequested: - raise - except Exception as e: - # Option C: compact history and retry once. - logger.warning(f"Lever {lever.lever_id}: call failed ({e}). Compacting history and retrying.") - chat_message_list = _build_compact_history(system_message_with_context, decisions) - chat_message_list.append(ChatMessage(role=MessageRole.USER, content=lever_prompt)) - - # Second attempt with compacted history (only reached if first attempt failed). - if result is None: - try: - result = llm_executor.run(execute_function) - metadata_list.append(result.get("metadata", {})) - except PipelineStopRequested: - raise - except Exception as e2: - logger.warning(f"Lever {lever.lever_id}: failed after compaction ({e2}). Skipping lever.") - - # Process whichever attempt succeeded. - if result is not None: - raw = result["chat_response"].raw - if raw is not None: - decision = raw - chat_message_list.append(ChatMessage( - role=MessageRole.ASSISTANT, - content=json.dumps({"classification": decision.classification, "justification": decision.justification}), - )) - else: - logger.warning(f"Lever {lever.lever_id}: returned None raw.") - - if decision is None: - logger.warning(f"Lever {lever.lever_id}: classification failed. Defaulting to keep.") - decision = LeverClassificationDecision( - classification=LeverClassification.keep, - justification="Classification failed after retries. Keeping this lever to avoid data loss." - ) - chat_message_list.append(ChatMessage( - role=MessageRole.ASSISTANT, - content=json.dumps({"classification": decision.classification, "justification": decision.justification}), + # Single LLM call. 
+ batch_result: BatchDeduplicationResult | None = None + metadata_list: List[dict] = [] + try: + result = llm_executor.run(execute_function) + batch_result = result["chat_response"].raw + metadata_list.append(result.get("metadata", {})) + except PipelineStopRequested: + raise + except Exception as e: + logger.error(f"Batch deduplication call failed: {e}") + + # Build decisions from the batch result. + decisions: List[LeverDecision] = [] + input_lever_ids = {lever.lever_id for lever in input_levers} + + if batch_result is not None: + for score_decision in batch_result.decisions: + if score_decision.lever_id not in input_lever_ids: + logger.warning(f"LLM returned score for unknown lever_id: '{score_decision.lever_id}'. Skipping.") + continue + decisions.append(LeverDecision( + lever_id=score_decision.lever_id, + score=score_decision.score, + justification=score_decision.justification, )) - decisions.append(LeverDecision( - lever_id=lever.lever_id, - classification=decision.classification, - justification=decision.justification, - )) + # Handle missing decisions — any lever not scored defaults to primary. + scored_ids = {d.lever_id for d in decisions} + for lever in input_levers: + if lever.lever_id not in scored_ids: + logger.warning(f"Lever {lever.lever_id}: not scored by LLM. Defaulting to primary (score 2).") + decisions.append(LeverDecision( + lever_id=lever.lever_id, + score=2, + justification="Not scored by LLM. Keeping as primary to avoid data loss.", + )) - # Perform the deduplication. + # Build output levers (keep score >= 1). decisions_by_id = {d.lever_id: d for d in decisions} output_levers = [] for lever in input_levers: - lever_decision = decisions_by_id.get(lever.lever_id) - if not lever_decision: - # Missing decision for this lever. Keep it. - deduplication_justification = "Missing deduplication justification. Keeping this lever." 
- output_lever = OutputLever( - **lever.model_dump(), - deduplication_justification=deduplication_justification - ) - output_levers.append(output_lever) + lever_decision = decisions_by_id[lever.lever_id] + if lever_decision.score < 1: continue - # Check if this is a keeper - if lever_decision.classification != LeverClassification.keep: - # This is not a keeper - continue - - # This is a keeper deduplication_justification = lever_decision.justification.strip() if len(deduplication_justification) == 0: deduplication_justification = "Empty explanation. Keeping this lever." output_lever = OutputLever( **lever.model_dump(), - deduplication_justification=deduplication_justification + classification=_score_to_classification(lever_decision.score), + deduplication_justification=deduplication_justification, ) output_levers.append(output_lever) + logger.info( + f"Deduplication complete: {len(output_levers)} kept, " + f"{len(input_levers) - len(output_levers)} removed." + ) + return cls( - user_prompt=levers_json, + user_prompt=project_context, system_prompt=system_prompt, response=decisions, deduplicated_levers=output_levers, - metadata=metadata_list + metadata=metadata_list, ) def to_dict(self, include_response=True, include_deduplicated_levers=True, include_metadata=True, include_system_prompt=True, include_user_prompt=True) -> dict: diff --git a/worker_plan/worker_plan_internal/lever/enrich_potential_levers.py b/worker_plan/worker_plan_internal/lever/enrich_potential_levers.py index 308c51d6..70cc2022 100644 --- a/worker_plan/worker_plan_internal/lever/enrich_potential_levers.py +++ b/worker_plan/worker_plan_internal/lever/enrich_potential_levers.py @@ -14,7 +14,7 @@ import logging import os from dataclasses import dataclass -from typing import List, Dict, Any +from typing import List, Dict, Any, Optional from llama_index.core.llms import ChatMessage, MessageRole from llama_index.core.llms.llm import LLM @@ -30,12 +30,13 @@ # --- Pydantic Models for Data Structuring --- 
class InputLever(BaseModel): - """Represents a single lever loaded from the initial brainstormed file.""" + """Represents a single lever loaded from the deduplicated file.""" lever_id: str name: str consequences: str options: List[str] review: str + classification: Optional[str] = None deduplication_justification: str class LeverCharacterization(BaseModel):