diff --git a/CLAUDE.md b/CLAUDE.md index 874cd3e2..0419c55a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -24,6 +24,7 @@ Any code change must either adhere to our spec files perfectly or you should ask | `src/jarvis/utils/location.spec.md` | GeoIP location detection | Privacy-first; local GeoLite2 DB only | | `src/jarvis/memory/graph.spec.md` | Node graph memory (v2), self-organising tree, UI explorer | Dynamic structure; access-aware; auto-split/merge (future) | | `src/jarvis/memory/summariser.spec.md` | Diary summariser prompt contract and hygiene rules (deflection, attribution, topic separation) | Summariser is the source; corrupted summaries poison every downstream consumer | +| `src/jarvis/memory/recall_gate.spec.md` | Deterministic skip-enrichment heuristic when the hot window covers a follow-up | Fail-open; language-agnostic via `\w{3,}` + `re.UNICODE`; planner intent always wins | The LLM contexts graph at `docs/llm_contexts.md` maps every LLM call in the app (model, gating, inputs, outputs, limits, flow). Keep it up-to-date at all times: any change that adds, removes, or alters an LLM context (model resolution, timeout, cap, prompt source, gating flag, data-flow edge) must update `docs/llm_contexts.md` in the same PR. diff --git a/docs/llm_contexts.md b/docs/llm_contexts.md index ad0031be..ab51c827 100644 --- a/docs/llm_contexts.md +++ b/docs/llm_contexts.md @@ -11,9 +11,9 @@ Every distinct LLM call in Jarvis, what feeds it, what consumes it, and how it i - **Model / gating**: `cfg.ollama_chat_model` (the big model). Not optional. No size branching on the loop itself — size branching affects the digests/evaluator around it. - **Inputs**: - Redacted user query - - Recent dialogue (last 5 minutes) + - Recent dialogue (last 5 minutes), including in-loop tool-call + tool-role messages from prior replies within the active conversation (tool carryover, `DialogueMemory.record_tool_turn` / `get_recent_turns_with_tools` in [src/jarvis/memory/conversation.py](src/jarvis/memory/conversation.py); per-prompt cap via `cfg.tool_carryover_max_turns` / `tool_carryover_per_entry_chars`; storage cap `_tool_turns_max_storage = 16`; cleared on `stop` signal AND on new-conversation entry; UNTRUSTED WEB EXTRACT fence markers preserved on truncation; both `content` and `tool_calls[*].function.arguments` scrubbed on write) - Unified system prompt from [src/jarvis/system_prompt.py](src/jarvis/system_prompt.py) + ASR note + tool-protocol guidance - - **Warm profile block** (query-agnostic User + Directives excerpt from the knowledge graph, composed by `build_warm_profile()` / `format_warm_profile_block()` in [src/jarvis/memory/graph_ops.py](src/jarvis/memory/graph_ops.py) at Step 3.5 of `reply()`; no LLM call, pure SQLite read; injected unconditionally so personalisation is the default) + - **Warm profile block** (query-agnostic User + Directives excerpt from the knowledge graph, composed by `build_warm_profile()` / `format_warm_profile_block()` in [src/jarvis/memory/graph_ops.py](src/jarvis/memory/graph_ops.py) at Step 3.5 of `reply()`; no LLM call, pure SQLite read; injected unconditionally so personalisation is the default; result cached in `DialogueMemory._hot_cache` under `DialogueMemory.WARM_PROFILE_CACHE_KEY` for the lifetime of the active conversation. Invalidated on `stop`, on new-conversation entry, AND on User/Directives graph mutations via the listener registered in [src/jarvis/daemon.py](src/jarvis/daemon.py) against `register_graph_mutation_listener` in [src/jarvis/memory/graph.py](src/jarvis/memory/graph.py); World-branch writes are ignored) - Digested memory enrichment (optional, see #4) - Time + location context (re-injected each turn) - Tool schema: native via `generate_tools_json_schema()` ([src/jarvis/tools/registry.py](src/jarvis/tools/registry.py)) or text fallback via `_text_tool_call_guidance()` ([engine.py:68](src/jarvis/reply/engine.py:68)) @@ -44,6 +44,17 @@ Every distinct LLM call in Jarvis, what feeds it, what consumes it, and how it i - **System prompt**: inline at [enrichment.py:35-63](src/jarvis/reply/enrichment.py:35). - **Output**: `{keywords, from?, to?, questions?}`. Consumed by memory search in the reply engine. - **Limits**: up to 2 retries; timeout from `llm_tools_timeout_sec`. +- **Caching**: result cached in `DialogueMemory._hot_cache` under key `enrichment:{redacted_query[+topic_hint]}` for the lifetime of the active conversation. Identical follow-ups within the same conversation reuse the dict and skip the LLM hop. Cleared by `clear_hot_cache()` on the `stop` signal and on new-conversation entry. + +## 3b. Recall Gate (pre-enrichment short-circuit) + +- **File**: [src/jarvis/memory/recall_gate.py](src/jarvis/memory/recall_gate.py) — `should_recall()`. +- **Trigger**: once per reply, before diary/graph/digest enrichment runs (after the planner has decided memory is potentially needed). +- **Model / gating**: NO LLM — deterministic keyword-coverage heuristic. Cheap. +- **Inputs**: query, recent dialogue (incl. tool carryover rows). +- **Output**: `False` only if hot-window contains a fresh tool result AND ≥50% of the query's content words appear in the hot-window transcript → skips diary, graph, and memory digest for this reply. Else `True`. Fail-open on any exception. Content-word extraction uses `\w{3,}` with `re.UNICODE`, so the gate works for Latin, Cyrillic, CJK, Arabic, Hebrew, etc. (per CLAUDE.md "no hardcoded language patterns"). Overlap words are run through `redact()` before being written to debug logs. +- **Planner precedence**: when the planner explicitly emitted a `searchMemory` step, the gate is bypassed — the planner has more signal than coverage and overriding it would silently drop intent. The gate only short-circuits the fail-open empty-plan path. +- **Rationale**: prevents re-running diary/graph lookups when the hot window already grounds the follow-up (e.g. "his most famous song" after a Bieber webSearch). ## 4. Memory Digest (optional, SMALL models) @@ -84,6 +95,7 @@ Every distinct LLM call in Jarvis, what feeds it, what consumes it, and how it i - **System prompt**: inline (~lines 260-315). Teaches pick up-to-5 tools or `none`. - **Output**: comma-separated tool names or `none`. Capped at `_LLM_MAX_SELECTED` (5). Always-included tools (`stop`, `toolSearchTool`) are unioned in regardless. - **Limits**: `llm_timeout_sec`. On failure → all tools. +- **Caching**: `routed_tools` cached in `DialogueMemory._hot_cache` under key `router:{redacted_query}|{strategy}|{builtin-names}|{mcp-names}` for the lifetime of the active conversation. The catalogue signature lets a mid-conversation MCP refresh invalidate the cache; `context_hint` is intentionally excluded so time/location drift inside one conversation doesn't bust it. Cleared by `clear_hot_cache()` on the `stop` signal and on new-conversation entry. ## 8. Tool Searcher (mid-loop escape hatch) diff --git a/src/desktop_app/settings_window.py b/src/desktop_app/settings_window.py index ae57c482..c1e56501 100644 --- a/src/desktop_app/settings_window.py +++ b/src/desktop_app/settings_window.py @@ -264,6 +264,12 @@ def f(key, label, desc, cat, ftype, **kw): f("memory_enrichment_source", "Enrichment Source", "Which memory system enriches replies: all (diary + graph), diary only, or graph only", "memory", "choice", choices=[("diary", "Diary only"), ("graph", "Graph only"), ("all", "All (diary + graph)")]) + f("tool_carryover_max_turns", "Tool Carryover Turns", + "How many prior replies' tool results to keep visible for follow-up questions", + "memory", "int", min_val=0, max_val=10) + f("tool_carryover_per_entry_chars", "Tool Carryover Length", + "Chars kept per carried-over tool result (UNTRUSTED fence markers preserved)", + "memory", "int", min_val=200, max_val=8000, step=100) f("agentic_max_turns", "Agentic Max Turns", "Maximum turns in agentic tool-use loops", "memory", "int", min_val=1, max_val=30) diff --git a/src/jarvis/config.py b/src/jarvis/config.py index 24117027..98586eed 100644 --- a/src/jarvis/config.py +++ b/src/jarvis/config.py @@ -169,6 +169,13 @@ class Settings: dialogue_memory_timeout: float memory_enrichment_max_results: int memory_enrichment_source: str # "all", "diary", or "graph" + # Tool-call + tool-result messages from prior replies in the hot window + # are re-injected into the next turn so follow-ups can reuse them instead + # of re-fetching. These knobs cap how many prior tool turns survive and + # how much of each tool payload is retained (the fence markers of + # UNTRUSTED WEB EXTRACT blocks are preserved on truncation). + tool_carryover_max_turns: int + tool_carryover_per_entry_chars: int # Distil diary + graph into a short relevance-filtered note via a cheap # LLM pass before injecting into the reply system prompt. When None # (the default), it auto-enables for SMALL models (≤7B) and stays off @@ -470,6 +477,9 @@ def get_default_config() -> Dict[str, Any]: "dialogue_memory_timeout": 300.0, "memory_enrichment_max_results": 3, "memory_enrichment_source": "all", # "all", "diary", or "graph" + # Tool carryover: cap re-injected prior tool turns + chars per entry. + "tool_carryover_max_turns": 2, + "tool_carryover_per_entry_chars": 1200, # None = auto (on for small models ≤7B, off for large). Set true/false to force. "memory_digest_enabled": None, # Distil raw tool results (e.g. webSearch extracts) into a short @@ -658,6 +668,8 @@ def load_settings() -> Settings: memory_enrichment_source = str(merged.get("memory_enrichment_source", "all")).lower() if memory_enrichment_source not in ("all", "diary", "graph"): memory_enrichment_source = "all" + tool_carryover_max_turns = max(0, int(merged.get("tool_carryover_max_turns", 2))) + tool_carryover_per_entry_chars = max(200, int(merged.get("tool_carryover_per_entry_chars", 1200))) _digest_raw = merged.get("memory_digest_enabled", None) memory_digest_enabled: Optional[bool] if _digest_raw is None: @@ -818,6 +830,8 @@ def load_settings() -> Settings: dialogue_memory_timeout=dialogue_memory_timeout, memory_enrichment_max_results=memory_enrichment_max_results, memory_enrichment_source=memory_enrichment_source, + tool_carryover_max_turns=tool_carryover_max_turns, + tool_carryover_per_entry_chars=tool_carryover_per_entry_chars, memory_digest_enabled=memory_digest_enabled, tool_result_digest_enabled=tool_result_digest_enabled, agentic_max_turns=agentic_max_turns, diff --git a/src/jarvis/daemon.py b/src/jarvis/daemon.py index 14806c8c..4bca8b13 100644 --- a/src/jarvis/daemon.py +++ b/src/jarvis/daemon.py @@ -44,6 +44,7 @@ # Global instances for coordination between modules _global_dialogue_memory: Optional[DialogueMemory] = None _global_stop_requested: bool = False +_warm_profile_graph_listener = None # registered callback, kept for shutdown unregister _global_tts_engine = None # TTS engine reference for face animation polling _global_dictation_engine = None # Dictation engine reference for history UI @@ -294,6 +295,7 @@ def on_token_handler(token: str): def main() -> None: """Main daemon entry point.""" global _global_dialogue_memory, _global_stop_requested, _global_tts_engine, _global_dictation_engine + global _warm_profile_graph_listener # Reset stop flag at start (in case of restart) _global_stop_requested = False @@ -348,6 +350,60 @@ def main() -> None: ) print("✓ Dialogue memory initialized", flush=True) + # Wire the conversation-scoped warm-profile cache to graph mutations. + # When the User or Directives branch is mutated mid-conversation, the + # cached warm profile is dropped so the next reply rebuilds it from + # the current graph state. World-branch writes (typical webSearch + # extractions) do not touch warm profile, so they are ignored. + try: + from .memory.graph import ( + BRANCH_DIRECTIVES, + BRANCH_USER, + register_graph_mutation_listener, + ) + + _wp_relevant_branches = {BRANCH_USER, BRANCH_DIRECTIVES} + + # Read the DialogueMemory ref through the module global at fire + # time, not via closure capture, so a future singleton swap (tests + # or hot-reload) routes invalidation to the live instance instead + # of the freed one. + def _invalidate_wp_on_graph_mutation(*, action, node_id, branch): + del action, node_id # Only the branch matters for warm-profile filtering. + if branch not in _wp_relevant_branches: + return + dm = _global_dialogue_memory + if dm is None: + return + try: + dm.invalidate_warm_profile() + debug_log( + f"warm profile invalidated by {branch} graph mutation", + "memory", + ) + except Exception as exc: + debug_log( + f"warm profile invalidation failed (non-fatal): {exc}", + "memory", + ) + + # If a previous run left a listener registered (re-entry without + # full process restart), drop it before installing the new one so + # the registry never accumulates stale closures. + if _warm_profile_graph_listener is not None: + try: + from .memory.graph import unregister_graph_mutation_listener + unregister_graph_mutation_listener(_warm_profile_graph_listener) + except Exception: + pass + register_graph_mutation_listener(_invalidate_wp_on_graph_mutation) + _warm_profile_graph_listener = _invalidate_wp_on_graph_mutation + except Exception as exc: + debug_log( + f"warm profile mutation listener wiring failed (non-fatal): {exc}", + "memory", + ) + # Knowledge graph: wipe + re-seed if the on-disk shape predates the # User/Directives/World taxonomy. Non-destructive to the diary — # users can re-import via the memory viewer. @@ -567,6 +623,19 @@ def stdin_monitor(): if tts is not None: tts.stop() db.close() + + # Drop the warm-profile graph listener so the module registry does + # not retain a closure pointing at this run's DialogueMemory after + # shutdown — relevant for tests and any embedder that re-runs the + # daemon in-process. + if _warm_profile_graph_listener is not None: + try: + from .memory.graph import unregister_graph_mutation_listener + unregister_graph_mutation_listener(_warm_profile_graph_listener) + except Exception: + pass + _warm_profile_graph_listener = None + debug_log("daemon stopped", "jarvis") print("👋 Daemon stopped", flush=True) diff --git a/src/jarvis/memory/conversation.py b/src/jarvis/memory/conversation.py index caf6886c..1adf9e25 100644 --- a/src/jarvis/memory/conversation.py +++ b/src/jarvis/memory/conversation.py @@ -8,6 +8,69 @@ from ..llm import call_llm_direct from .embeddings import get_embedding from ..debug import debug_log +from ..utils.redact import redact, scrub_secrets + + +_UNTRUSTED_FENCE_BEGIN = "<<>>" +_UNTRUSTED_FENCE_END = "<<>>" + + +def _scrub_tool_call(tc: dict) -> dict: + """Return a copy of a tool-call entry with the function arguments + scrubbed of secrets. Handles both dict and string-encoded arguments + (some providers serialise arguments as a JSON string). + """ + if not isinstance(tc, dict): + return tc + out = dict(tc) + fn = out.get("function") + if isinstance(fn, dict): + fn_out = dict(fn) + fn_out["arguments"] = _scrub_args(fn_out.get("arguments")) + out["function"] = fn_out + return out + + +def _scrub_args(args): + """Scrub a tool-call ``arguments`` value of secrets. + + Handles every shape we have seen across providers: JSON-encoded + strings, dict objects, and (rarely) lists/tuples of values. Anything + else passes through untouched — there is no safe way to scrub an + opaque scalar. + """ + if isinstance(args, str) and args: + return scrub_secrets(args) + if isinstance(args, dict): + return {k: _scrub_args(v) for k, v in args.items()} + if isinstance(args, (list, tuple)): + scrubbed = [_scrub_args(v) for v in args] + return type(args)(scrubbed) if isinstance(args, tuple) else scrubbed + return args + + +def is_tool_message(msg: dict) -> bool: + """True if a message is a tool-call request or a tool-result. + + Covers both protocols Jarvis speaks: + - Native: ``role="tool"`` for results, or ``role="assistant"`` carrying + a non-empty ``tool_calls`` list for the outbound call. + - Text-tool fallback (small models): the tool result is appended as a + ``role="user"`` message tagged with ``tool_name``. The tagging is + done by the reply engine in `src/jarvis/reply/engine.py` (see the + text-tool branch where ``"tool_name": tool_name`` is attached to + the synthetic user message). + """ + if not isinstance(msg, dict): + return False + role = msg.get("role") + if role == "tool": + return True + if role == "assistant" and msg.get("tool_calls"): + return True + if role == "user" and msg.get("tool_name"): + return True + return False def _filter_contexts_by_time( @@ -84,6 +147,36 @@ def __init__(self, inactivity_timeout: float = 300.0, max_interactions: int = 20 a diary update (same as the window, since enrichment covers older context) """ self._messages: List[Tuple[float, str, str]] = [] # (timestamp, role, content) + # Tool carryover: in-loop assistant-with-tool_calls + tool-role messages + # from prior replies, so follow-up turns within the hot window can reuse + # the prior tool output instead of re-fetching. Stored as a list of + # (timestamp, [msg_dict, ...]) where each entry is one reply's worth of + # tool-related messages. Excluded from `get_pending_chunks` so raw tool + # payloads never reach the diary summariser. + self._tool_turns: List[Tuple[float, List[dict]]] = [] + # Conversation-scoped scratch cache: per-key (timestamp, value) + # entries that survive for the lifetime of the active conversation. + # The reply engine wipes this on new-conversation entry (when + # ``has_recent_messages`` was False at turn start), and individual + # entries can be invalidated on demand (e.g. ``invalidate_warm_profile`` + # on graph mutations). The timestamp is retained so callers may + # inspect entry age, but reads are NOT bounded by RECENT_WINDOW_SEC + # any more — long active conversations would otherwise see warm + # profile / router caches expire while the session is still going. + self._hot_cache: dict[str, Tuple[float, object]] = {} + # Hard ceiling on stored tool turns. With the default + # ``tool_carryover_max_turns=2`` re-injected per reply, 16 lets a + # session accumulate roughly 8x the visible budget before the + # oldest entries get evicted; well below the prompt-bloat + # threshold, well above any realistic single-conversation need. + self._tool_turns_max_storage = 16 + # Monotonic high-water timestamp. ``time.time()`` has ~16ms + # granularity on Windows, so consecutive inserts can collide and + # break interleave ordering between text and tool messages. We + # bump the stored ts by a tiny epsilon so insertion order is + # always preserved, while keeping wall-clock semantics close + # enough for the RECENT_WINDOW_SEC cutoff. + self._last_ts: float = 0.0 self._last_activity_time: float = time.time() self._inactivity_timeout = inactivity_timeout # Unified window: context retention = forced diary update interval @@ -96,10 +189,28 @@ def __init__(self, inactivity_timeout: float = 300.0, max_interactions: int = 20 # Track the last profile used for follow-up detection self._last_profile: Optional[str] = None + def _next_ts(self) -> float: + """Return a strictly-monotonic timestamp. + + On Windows, ``time.time()`` has ~16ms granularity — consecutive + calls within the same tick return the identical float. That + breaks interleave ordering between text messages and tool turns + when both land in the same tick. We bump by a 1µs epsilon so + insertion order is always preserved while staying close enough + to wall-clock for ``RECENT_WINDOW_SEC`` filtering. + + Caller MUST hold ``_lock`` — ``_last_ts`` is shared mutable state. + """ + now = time.time() + if now <= self._last_ts: + now = self._last_ts + 1e-6 + self._last_ts = now + return now + def add_message(self, role: str, content: str) -> None: """Add a message to recent memory. Thread-safe.""" with self._lock: - timestamp = time.time() + timestamp = self._next_ts() self._messages.append((timestamp, role.strip(), content.strip())) self._last_activity_time = timestamp @@ -125,6 +236,160 @@ def get_recent_messages(self) -> List[dict]: return [{"role": role, "content": content} for _, role, content in recent_messages] + def record_tool_turn(self, tool_msgs: List[dict]) -> None: + """Store in-loop tool-call/tool-role messages from a just-finished reply. + + Called once per reply with the tool-related messages extracted from the + engine's messages array. These interleave with text messages on + subsequent `get_recent_turns_with_tools` calls so follow-ups can see + the prior tool output. + """ + if not tool_msgs: + return + # Scrub outside the lock, pure function over message content. + scrubbed: List[dict] = [] + for m in tool_msgs: + mm = dict(m) + c = mm.get("content") + if isinstance(c, str) and c: + # Tool outputs may contain PII or secrets (email bodies, + # API responses, scraped pages). Scrub before persisting + # so re-injection on the next turn can't leak them. + mm["content"] = scrub_secrets(c) + # Native tool-call arguments can also carry sensitive query + # text (e.g. webSearch(query="my email is alice@example.com")). + # Scrub each argument value so re-injection of the assistant + # tool_calls row on the next turn cannot leak them. + tcalls = mm.get("tool_calls") + if isinstance(tcalls, list): + mm["tool_calls"] = [_scrub_tool_call(tc) for tc in tcalls] + scrubbed.append(mm) + with self._lock: + ts = self._next_ts() + self._tool_turns.append((ts, scrubbed)) + # Bound storage to a hard ceiling. Tool turns are NOT pruned + # by RECENT_WINDOW_SEC age any more; the engine clears them + # on new-conversation entry so an active session keeps its + # carryover regardless of how long ago each tool fired. + if len(self._tool_turns) > self._tool_turns_max_storage: + self._tool_turns = self._tool_turns[-self._tool_turns_max_storage:] + + def clear_tool_carryover(self) -> None: + """Drop all stored tool-turn messages. Text messages are untouched.""" + with self._lock: + self._tool_turns = [] + + # ------------------------------------------------------------------ + # Conversation-scoped scratch cache + # ------------------------------------------------------------------ + # Primitive used by the reply engine to memoise expensive per-turn + # work that's idempotent within a single conversation: warm profile + # (SQLite reads), memory enrichment extractor (LLM call), tool + # router (LLM call). + # + # Lifetime contract: + # - Entries persist for the lifetime of the active conversation; + # they are NOT bounded by RECENT_WINDOW_SEC age. A long active + # chat keeps the warm profile / router cache hot for hours. + # - The reply engine wipes the cache when it detects a new + # conversation (i.e. ``has_recent_messages()`` was False at turn + # entry) and on the ``stop`` signal. + # - Granular invalidation hooks: ``invalidate_warm_profile()`` is + # called from a graph-mutation listener so the User / Directives + # branches stay fresh even mid-conversation. + # + # Callers pick a key that captures the invalidation contract — + # typically the redacted query for query-dependent values, or a + # constant for query-agnostic values. + + # Cache key for the warm-profile block. Centralised so the engine + # and the graph-mutation invalidator agree on it. + WARM_PROFILE_CACHE_KEY = "warm_profile_block" + + def hot_cache_get(self, key: str) -> Optional[object]: + """Return the cached value for ``key`` if present, else ``None``. + + No age-based expiry: callers control invalidation via + ``clear_hot_cache``, ``invalidate_warm_profile``, or new- + conversation reset in the engine. + """ + with self._lock: + entry = self._hot_cache.get(key) + if not entry: + return None + _ts, value = entry + return value + + def hot_cache_put(self, key: str, value: object) -> None: + """Store value under key with current timestamp.""" + with self._lock: + self._hot_cache[key] = (time.time(), value) + + def clear_hot_cache(self) -> None: + """Drop all conversation-scoped cache entries.""" + with self._lock: + self._hot_cache = {} + + def invalidate_warm_profile(self) -> None: + """Drop the cached warm-profile block. Called from the graph + mutation listener so a mid-conversation User/Directives change + is reflected on the very next turn. + """ + with self._lock: + self._hot_cache.pop(self.WARM_PROFILE_CACHE_KEY, None) + + def get_recent_turns_with_tools( + self, + max_tool_turns: int = 2, + per_entry_chars: int = 1200, + ) -> List[dict]: + """Like `get_recent_messages`, but interleaves stored tool turns in + timestamp order. Only the most recent `max_tool_turns` tool groups + survive; older ones are dropped wholesale (avoids orphan + assistant-with-tool_calls without a matching tool result, which would + break native tool calling). + """ + with self._lock: + if not self._messages and not self._tool_turns: + return [] + cutoff = time.time() - self.RECENT_WINDOW_SEC + # Build timeline of (ts, payload) where payload is either a single + # text message dict or a list of tool messages. + timeline: list = [] + for ts, role, content in self._messages: + if ts >= cutoff: + timeline.append((ts, "msg", {"role": role, "content": content})) + # Keep only the last N tool turns. Tool carryover lives for + # the conversation, not for RECENT_WINDOW_SEC: an active session + # past the window still benefits from the prior tool result. + # The engine clears ``_tool_turns`` on new-conversation entry. + for ts, msgs in self._tool_turns[-max_tool_turns:]: + truncated: list[dict] = [] + for m in msgs: + mm = dict(m) + c = mm.get("content") + if isinstance(c, str) and len(c) > per_entry_chars: + cut = c[:per_entry_chars].rstrip() + "…" + # If truncation sliced away the closing marker of an + # UNTRUSTED WEB EXTRACT fence, re-append it so the + # injection-defence fence stays intact downstream. + if ( + _UNTRUSTED_FENCE_BEGIN in cut + and _UNTRUSTED_FENCE_END not in cut + ): + cut = cut + "\n" + _UNTRUSTED_FENCE_END + mm["content"] = cut + truncated.append(mm) + timeline.append((ts, "group", truncated)) + timeline.sort(key=lambda t: t[0]) + flat: List[dict] = [] + for _, kind, payload in timeline: + if kind == "msg": + flat.append(payload) + else: + flat.extend(payload) + return flat + def has_recent_messages(self) -> bool: """Check if there are any messages in the last 5 minutes.""" with self._lock: diff --git a/src/jarvis/memory/graph.py b/src/jarvis/memory/graph.py index 168febff..a2f3c874 100644 --- a/src/jarvis/memory/graph.py +++ b/src/jarvis/memory/graph.py @@ -18,11 +18,51 @@ import uuid from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Optional +from typing import Callable, Optional from ..debug import debug_log +# ── Mutation listeners ───────────────────────────────────────────────────── +# +# Lightweight observer registry so consumers (e.g. DialogueMemory's warm +# profile cache) can invalidate derived state when a node is created, +# updated, or deleted. The listener receives the action name, node id, and +# the FIXED_BRANCH ancestor (e.g. ``"user"``, ``"directives"``, ``"world"``) +# so it can scope its reaction. Failures in listeners are logged and +# swallowed so they cannot break a write. + +_MUTATION_LISTENERS: "list[Callable[..., None]]" = [] + + +def register_graph_mutation_listener(cb: Callable[..., None]) -> None: + """Register a callback invoked after every node mutation. + + The callback is invoked with keyword arguments ``action``, ``node_id``, + and ``branch`` where ``branch`` is the id of the FIXED_BRANCH ancestor + (or the node id itself when the node is a fixed branch), or ``None`` + when the branch cannot be resolved (e.g. root mutations). + """ + if cb not in _MUTATION_LISTENERS: + _MUTATION_LISTENERS.append(cb) + + +def unregister_graph_mutation_listener(cb: Callable[..., None]) -> None: + """Remove a previously registered mutation listener (idempotent).""" + try: + _MUTATION_LISTENERS.remove(cb) + except ValueError: + pass + + +def _notify_graph_mutation(action: str, node_id: str, branch: Optional[str]) -> None: + for cb in list(_MUTATION_LISTENERS): + try: + cb(action=action, node_id=node_id, branch=branch) + except Exception as exc: + debug_log(f"graph mutation listener failed (non-fatal): {exc}", "memory") + + # ── Fact normalisation ───────────────────────────────────────────────────── # # Used for dedupe comparisons. Locale-safe — the user base includes @@ -352,6 +392,33 @@ def get_root(self) -> MemoryNode: ).fetchone() return self._row_to_node(row) + def _resolve_branch(self, node_id: Optional[str]) -> Optional[str]: + """Walk parents from ``node_id`` up to find the FIXED_BRANCH id it + belongs to (or itself, if the node IS a fixed branch). Returns + ``None`` for the root or when the node cannot be located. + + Capped at ``MAX_TRAVERSAL_DEPTH`` so a corrupt parent cycle cannot + spin the loop. SQLite reads only — safe to call from write paths. + """ + if not node_id or node_id == "root": + return None + if node_id in FIXED_BRANCH_IDS: + return node_id + current = node_id + for _ in range(MAX_TRAVERSAL_DEPTH): + row = self.conn.execute( + "SELECT parent_id FROM memory_nodes WHERE id = ?", (current,) + ).fetchone() + if row is None: + return None + parent = row["parent_id"] + if parent is None or parent == "root": + return None + if parent in FIXED_BRANCH_IDS: + return parent + current = parent + return None + def create_node( self, name: str, @@ -388,6 +455,7 @@ def create_node( self.conn.commit() debug_log(f"Created memory node '{name}' ({node_id[:8]})", "memory") + _notify_graph_mutation("create", node_id, self._resolve_branch(parent_id)) return MemoryNode( id=node_id, name=name, @@ -440,6 +508,7 @@ def update_node( ) self.conn.commit() + _notify_graph_mutation("update", node_id, self._resolve_branch(node_id)) return node def delete_node(self, node_id: str) -> bool: @@ -451,12 +520,18 @@ def delete_node(self, node_id: str) -> bool: """ if node_id == "root" or node_id in FIXED_BRANCH_IDS: return False + # Resolve branch BEFORE the delete so listeners get a meaningful + # branch attribution even though the row is about to vanish. + branch = self._resolve_branch(node_id) with self._lock: cur = self.conn.execute( "DELETE FROM memory_nodes WHERE id = ?", (node_id,) ) self.conn.commit() - return cur.rowcount > 0 + deleted = cur.rowcount > 0 + if deleted: + _notify_graph_mutation("delete", node_id, branch) + return deleted def node_contains_fact(self, node_id: str, fact: str) -> bool: """True if ``fact`` matches any line of the node's data after diff --git a/src/jarvis/memory/graph.spec.md b/src/jarvis/memory/graph.spec.md index 6833447b..28d52a80 100644 --- a/src/jarvis/memory/graph.spec.md +++ b/src/jarvis/memory/graph.spec.md @@ -89,6 +89,14 @@ Any node except root can be deleted. Children are orphaned (parent_id set to NUL Increments `access_count` and updates `last_accessed`. Called automatically when a node is viewed in the UI or retrieved during query traversal. +### Mutation Listeners + +The graph module exposes a small observer registry, `register_graph_mutation_listener(cb)` / `unregister_graph_mutation_listener(cb)`, invoked after every successful `create_node`, `update_node`, `delete_node`, and (transitively) `append_to_node`. Callbacks receive `action`, `node_id`, and `branch` (the FIXED_BRANCH ancestor id, or `None` for root-level mutations and unresolvable nodes). Listener exceptions are logged via `debug_log` and swallowed so they cannot break a write. + +Touch is intentionally NOT a mutation event: it changes access metadata only, not the warm-profile-relevant fields, so it does not need to invalidate caches. + +The reply layer uses this hook from `daemon.py` to invalidate `DialogueMemory`'s warm-profile cache when the User or Directives branches change mid-conversation. World-branch writes are filtered out because the warm profile does not include the world branch. + ### Access Decay All ordering by access frequency uses a **time-decayed score** computed at query time: `access_count / (1 + age_days / half_life)`. This is hyperbolic decay — a node's effective score halves every `DECAY_HALF_LIFE_DAYS` (default 14) since its last access. The raw `access_count` is never modified, so changing the half-life retroactively reweights all nodes. This applies to `get_top_nodes`, `get_children`, `get_all_nodes`, and `search_nodes` tie-breaking. diff --git a/src/jarvis/memory/recall_gate.py b/src/jarvis/memory/recall_gate.py new file mode 100644 index 00000000..8413f134 --- /dev/null +++ b/src/jarvis/memory/recall_gate.py @@ -0,0 +1,96 @@ +"""Cheap heuristic for deciding whether long-term memory enrichment (diary +recall, graph recall, memory digest) is worth running for the current query. + +When the hot-window transcript already covers the topic (same content words +*and* a fresh tool result is present), running the diary/graph hops adds cost +and context bloat for no new information. Fail open: if in doubt, recall. + +No LLM hop — keyword Jaccard + tool-row presence is deterministic and cheap. +""" +from __future__ import annotations + +import re +from typing import List + +from ..debug import debug_log +from ..utils.redact import redact + + +_STOPWORDS = { + "a", "an", "the", "and", "or", "but", "if", "then", "is", "are", "was", + "were", "be", "been", "being", "do", "does", "did", "have", "has", "had", + "of", "in", "on", "at", "to", "for", "with", "by", "from", "about", + "what", "who", "where", "when", "why", "how", "which", "whose", + "it", "this", "that", "these", "those", "his", "her", "their", "my", + "your", "our", "me", "you", "i", "we", "they", "he", "she", "them", + "can", "could", "would", "should", "will", "may", "might", "shall", + "tell", "show", "give", "find", "know", "think", "want", "need", "get", + "so", "too", "more", "less", "some", "any", "no", "not", "also", "just", + "as", "than", "up", "out", "over", "under", "again", "further", "here", + "there", "all", "most", "other", "such", "own", "same", "very", "s", + "t", "don", "now", "ll", "m", "re", "ve", "d", +} + + +def _content_words(text: str) -> set[str]: + # \w with UNICODE (default in Py3) matches letters in any script — + # Latin, Cyrillic, CJK, Arabic, Hebrew, etc. Keeps Jarvis language-agnostic + # per CLAUDE.md. Digit-only runs are excluded by the stopword-style filter. + words = re.findall(r"\w{3,}", (text or "").lower(), flags=re.UNICODE) + return {w for w in words if w not in _STOPWORDS and not w.isdigit()} + + +def _has_fresh_tool_result(recent_messages: List[dict]) -> bool: + from .conversation import is_tool_message + return any(is_tool_message(m) for m in recent_messages) + + +def should_recall( + query: str, + recent_messages: List[dict], + *, + min_coverage: float = 0.5, +) -> bool: + """Return True iff diary/graph recall should run for this query. + + False only when: + 1. Hot-window contains at least one fresh tool result, AND + 2. At least `min_coverage` fraction of the query's content words + appear in the combined hot-window text (coverage, not symmetric + Jaccard — the window is always larger than the query). + + Fail-open: any exception or missing data → True. + """ + try: + if not recent_messages: + return True + if not _has_fresh_tool_result(recent_messages): + return True + q_words = _content_words(query) + if not q_words: + # Stopword-only query cannot justify skipping recall. + return True + window_text_parts: list[str] = [] + for m in recent_messages: + c = m.get("content") + if isinstance(c, str) and c: + window_text_parts.append(c) + window_words = _content_words(" ".join(window_text_parts)) + if not window_words: + return True + overlap = q_words & window_words + coverage = len(overlap) / len(q_words) if q_words else 0.0 + if coverage >= min_coverage: + # Overlap words come from the user query and may carry names or + # PII; push them through the structural scrub before logging so + # debug logs don't become a side-channel. + safe_overlap = redact(" ".join(sorted(overlap)[:5])) + debug_log( + f"recall gate: skip (coverage={coverage:.2f}, overlap=[{safe_overlap}])", + "memory", + ) + return False + return True + except Exception as e: + debug_log(f"recall gate failed open: {e}", "memory") + return True diff --git a/src/jarvis/memory/recall_gate.spec.md b/src/jarvis/memory/recall_gate.spec.md new file mode 100644 index 00000000..b0cfe1d4 --- /dev/null +++ b/src/jarvis/memory/recall_gate.spec.md @@ -0,0 +1,48 @@ +# Recall Gate + +A deterministic, no-LLM heuristic that lets the reply engine skip diary, graph and memory-digest enrichment when the hot window already grounds the user's follow-up. + +The gate is a cheap pre-flight check, not a routing decision. It either tells the engine "keep going as planned" (recall) or "the hot window has this covered, you can short-circuit enrichment" (skip). + +## Scope + +- File: `src/jarvis/memory/recall_gate.py`. +- Caller: `run_reply_engine` in `src/jarvis/reply/engine.py`, between the planner's `needs_memory` decision and the diary/graph search. +- Inputs: the redacted user query, the recent dialogue messages (already including tool-carryover rows from prior replies in the hot window). +- Output: `True` to recall, `False` to skip. + +## When the gate runs + +The gate runs only when: + +1. The planner did **not** explicitly emit a `searchMemory` step. An explicit planner intent always wins; the gate does not second-guess it. +2. There is at least one recent message in the hot window. + +When the planner returned an empty plan (fail-open), the gate is allowed to short-circuit. When the planner returned a concrete plan that doesn't include `searchMemory`, the engine is already skipping enrichment, so the gate is a no-op. + +## Heuristic + +The gate returns `False` (skip enrichment) only if both hold: + +1. The hot window contains at least one tool-related message — i.e. an entry for which `is_tool_message()` returns true. This is the freshness signal: a tool was already invoked in this conversation, so grounded data is sitting in the messages array. +2. The query's content words have ≥ 50% overlap with the words in the hot-window transcript. Coverage is asymmetric (`|overlap| / |query_words|`), not Jaccard — long histories shouldn't penalise a short follow-up. + +Anything else returns `True`. On any exception the gate fails open with `True`. + +## Language-agnostic by construction + +Per the project's no-hardcoded-language-patterns rule, content-word extraction uses `re.findall(r"\w{3,}", text, flags=re.UNICODE)`. The unicode flag makes `\w` match Cyrillic, CJK, Arabic, Hebrew, etc. + +A small English stopword list (`is`, `the`, `what`, etc.) filters function words before scoring. Non-English queries simply skip stopword filtering — the worst case is a slightly more conservative (i.e. more recall-prone) decision, which is the safe direction for a fail-open gate. Adding language-specific stopword lists is out of scope; the heuristic is intentionally conservative and the cost of recalling unnecessarily is one extractor LLM call, not user-visible failure. + +## Privacy + +The overlap words can include user-supplied query terms. Before they reach `debug_log`, they are passed through `redact()` so emails, JWTs, and other structurally-detectable secrets in the query don't leak into logs. The gate does not store anything itself. + +## Why not have the planner do this? + +The planner is an LLM call and runs once per turn regardless. Adding "is the hot window enough?" to its prompt would make every planner call slower and more brittle. The gate is a 1 ms pure-Python pass that only fires after the planner has decided memory might be useful, so it's strictly additive and trivially removable. + +## Failure mode + +`should_recall()` returns `True` on every exception path. The gate cannot make a turn worse by failing — at most it stops being an optimisation. diff --git a/src/jarvis/reply/engine.py b/src/jarvis/reply/engine.py index 900a81bb..b4ee317f 100644 --- a/src/jarvis/reply/engine.py +++ b/src/jarvis/reply/engine.py @@ -703,9 +703,31 @@ def run_reply_engine(db: "Database", cfg, tts: Optional[Any], is_new_conversation = True if dialogue_memory and dialogue_memory.has_recent_messages(): - recent_messages = dialogue_memory.get_recent_messages() + if hasattr(dialogue_memory, "get_recent_turns_with_tools"): + recent_messages = dialogue_memory.get_recent_turns_with_tools( + max_tool_turns=getattr(cfg, "tool_carryover_max_turns", 2), + per_entry_chars=getattr(cfg, "tool_carryover_per_entry_chars", 1200), + ) + else: + recent_messages = dialogue_memory.get_recent_messages() is_new_conversation = False + # New conversation reset: when the previous session lapsed past the + # inactivity window, drop the conversation-scoped cache and any + # tool-carryover from the previous session. This is what bounds the + # cache lifetime now that individual entries no longer expire by age. + if is_new_conversation and dialogue_memory is not None: + if hasattr(dialogue_memory, "clear_hot_cache"): + try: + dialogue_memory.clear_hot_cache() + except Exception: + pass + if hasattr(dialogue_memory, "clear_tool_carryover"): + try: + dialogue_memory.clear_tool_carryover() + except Exception: + pass + # Refresh MCP tools on new conversation (memory expired) if is_new_conversation and getattr(cfg, "mcps", {}): try: @@ -770,18 +792,40 @@ def run_reply_engine(db: "Database", cfg, tts: Optional[Any], strategy = ToolSelectionStrategy(getattr(cfg, "tool_selection_strategy", "llm")) except ValueError: strategy = ToolSelectionStrategy.LLM - routed_tools = select_tools( - query=redacted, - builtin_tools=BUILTIN_TOOLS, - mcp_tools=mcp_tools, - strategy=strategy, - llm_base_url=cfg.ollama_base_url, - llm_model=resolve_tool_router_model(cfg), - llm_timeout_sec=float(getattr(cfg, "llm_tools_timeout_sec", 8.0)), - embed_model=getattr(cfg, "ollama_embed_model", "nomic-embed-text"), - embed_timeout_sec=float(getattr(cfg, "llm_embed_timeout_sec", 10.0)), - context_hint=context_hint, + # Hot-window cache: router output for the same redacted query and + # tool catalogue is reused within one conversation. Catalogue + # signature includes builtin + MCP tool names so a mid-window MCP + # refresh invalidates the cache. context_hint is intentionally not + # part of the key — time/location drift inside one hot window + # rarely changes the tool pick. + _router_cache_key = ( + f"router:{redacted}|" + f"{strategy.value}|" + f"{','.join(sorted(BUILTIN_TOOLS.keys()))}|" + f"{','.join(sorted((mcp_tools or {}).keys()))}" ) + _cached_routed = ( + dialogue_memory.hot_cache_get(_router_cache_key) + if dialogue_memory and hasattr(dialogue_memory, "hot_cache_get") else None + ) + if isinstance(_cached_routed, list): + routed_tools = list(_cached_routed) + debug_log("tool router served from hot-window cache", "planning") + else: + routed_tools = select_tools( + query=redacted, + builtin_tools=BUILTIN_TOOLS, + mcp_tools=mcp_tools, + strategy=strategy, + llm_base_url=cfg.ollama_base_url, + llm_model=resolve_tool_router_model(cfg), + llm_timeout_sec=float(getattr(cfg, "llm_tools_timeout_sec", 8.0)), + embed_model=getattr(cfg, "ollama_embed_model", "nomic-embed-text"), + embed_timeout_sec=float(getattr(cfg, "llm_embed_timeout_sec", 10.0)), + context_hint=context_hint, + ) + if dialogue_memory and hasattr(dialogue_memory, "hot_cache_put"): + dialogue_memory.hot_cache_put(_router_cache_key, list(routed_tools or [])) _planner_schema = generate_tools_json_schema(routed_tools, mcp_tools) _planner_tool_catalog: list[tuple[str, str]] = [] for _schema in (_planner_schema or []): @@ -819,7 +863,28 @@ def run_reply_engine(db: "Database", cfg, tts: Optional[Any], # - Plan with `searchMemory` directive → run memory enrichment. # - Plan without it → skip memory work entirely (no keyword LLM, # no diary search, no graph search, no digest LLM). - needs_memory = (not action_plan) or plan_requires_memory(action_plan) + plan_demands_memory = bool(action_plan) and plan_requires_memory(action_plan) + needs_memory = (not action_plan) or plan_demands_memory + + # Recall gate: if the hot-window already carries a fresh tool result + # covering the query topic, skip diary/graph enrichment for this turn. + # Cheap deterministic heuristic, no LLM. Fail-open on any error. + # + # Skip the gate when the planner explicitly emitted `searchMemory` — + # the planner has more signal than coverage heuristics, and overriding + # it would silently drop intent. The gate only short-circuits the + # fail-open empty-plan path. + if needs_memory and not plan_demands_memory and recent_messages: + try: + from ..memory.recall_gate import should_recall + if not should_recall(redacted, recent_messages): + debug_log( + "recall gate: hot-window covers topic, skipping enrichment", + "memory", + ) + needs_memory = False + except Exception as exc: # noqa: BLE001 + debug_log(f"recall gate failed (fail-open): {exc}", "memory") # Topic hint from the directive (if any) — passed to the memory # extractor so keyword selection is anchored on what the planner # actually wanted to look up, instead of re-deriving from the raw @@ -843,26 +908,50 @@ def run_reply_engine(db: "Database", cfg, tts: Optional[Any], # interest me" can be answered directly when the model already sees # the user's interests in its system prompt. warm_profile_block = "" - try: - from ..memory.graph import GraphMemoryStore - from ..memory.graph_ops import build_warm_profile, format_warm_profile_block - _graph_store_warm = GraphMemoryStore(cfg.db_path) - _warm_profile = build_warm_profile(_graph_store_warm) - warm_profile_block = format_warm_profile_block(_warm_profile) - if warm_profile_block: - _user_len = len(_warm_profile.get("user", "")) - _dir_len = len(_warm_profile.get("directives", "")) - print( - f" 🪴 Warm profile: {_user_len} user chars, " - f"{_dir_len} directive chars", - flush=True, - ) - debug_log( - f"warm profile loaded: user={_user_len} directives={_dir_len}", - "memory", - ) - except Exception as e: - debug_log(f"warm profile load failed (non-fatal): {e}", "memory") + # Conversation-scoped cache: warm profile is query-agnostic and the + # User / Directives branches change rarely, so reusing the block for + # the lifetime of the conversation saves the SQLite BFS on every + # follow-up turn. The cache is invalidated on: + # - new conversation entry (cleared above with the full hot cache), + # - the stop signal (also clears the full hot cache), + # - any User/Directives graph mutation (via the listener registered + # in daemon.py, which calls ``invalidate_warm_profile`` on the + # active DialogueMemory). + _wp_cache_key = getattr( + type(dialogue_memory), + "WARM_PROFILE_CACHE_KEY", + "warm_profile_block", + ) if dialogue_memory else "warm_profile_block" + _wp_cached = ( + dialogue_memory.hot_cache_get(_wp_cache_key) + if dialogue_memory and hasattr(dialogue_memory, "hot_cache_get") else None + ) + if isinstance(_wp_cached, str): + warm_profile_block = _wp_cached + debug_log("warm profile served from conversation cache", "memory") + else: + try: + from ..memory.graph import GraphMemoryStore + from ..memory.graph_ops import build_warm_profile, format_warm_profile_block + _graph_store_warm = GraphMemoryStore(cfg.db_path) + _warm_profile = build_warm_profile(_graph_store_warm) + warm_profile_block = format_warm_profile_block(_warm_profile) + if warm_profile_block: + _user_len = len(_warm_profile.get("user", "")) + _dir_len = len(_warm_profile.get("directives", "")) + print( + f" 🪴 Warm profile: {_user_len} user chars, " + f"{_dir_len} directive chars", + flush=True, + ) + debug_log( + f"warm profile loaded: user={_user_len} directives={_dir_len}", + "memory", + ) + if dialogue_memory and hasattr(dialogue_memory, "hot_cache_put"): + dialogue_memory.hot_cache_put(_wp_cache_key, warm_profile_block) + except Exception as e: + debug_log(f"warm profile load failed (non-fatal): {e}", "memory") # Step 4: Memory enrichment — controlled by cfg.memory_enrichment_source # "all" = diary + graph, "diary" = diary only, "graph" = graph only @@ -895,12 +984,27 @@ def run_reply_engine(db: "Database", cfg, tts: Optional[Any], # keyword selection tracks what the planner actually # wanted to look up, not just the surface utterance. _extractor_query = f"{redacted}\n[Memory topic: {_memory_topic_hint}]" - search_params = extract_search_params_for_memory( - _extractor_query, cfg.ollama_base_url, resolve_tool_router_model(cfg), - timeout_sec=float(getattr(cfg, 'llm_tools_timeout_sec', 8.0)), - thinking=getattr(cfg, 'llm_thinking_enabled', False), - context_hint=context_hint, + # Hot-window cache: extractor output is a pure function of + # the (query, topic-hint) pair, so identical follow-ups within + # one conversation reuse the keywords/questions/from/to dict + # and skip the LLM call entirely. + _extractor_cache_key = f"enrichment:{_extractor_query}" + _cached_params = ( + dialogue_memory.hot_cache_get(_extractor_cache_key) + if dialogue_memory and hasattr(dialogue_memory, "hot_cache_get") else None ) + if isinstance(_cached_params, dict): + search_params = _cached_params + debug_log("memory extractor served from hot-window cache", "memory") + else: + search_params = extract_search_params_for_memory( + _extractor_query, cfg.ollama_base_url, resolve_tool_router_model(cfg), + timeout_sec=float(getattr(cfg, 'llm_tools_timeout_sec', 8.0)), + thinking=getattr(cfg, 'llm_thinking_enabled', False), + context_hint=context_hint, + ) + if dialogue_memory and hasattr(dialogue_memory, "hot_cache_put"): + dialogue_memory.hot_cache_put(_extractor_cache_key, search_params) keywords = search_params.get('keywords', []) questions = search_params.get('questions', []) if keywords: @@ -1271,6 +1375,27 @@ def _build_initial_system_message() -> str: user_msg_index = len(messages) messages.append({"role": "user", "content": redacted}) + # Idempotent flag — once carryover capture runs (success, error, or stop), + # don't run it again. Lets us call _maybe_record_tool_carryover from any + # exit path safely. + _carryover_state = {"recorded": False} + + def _maybe_record_tool_carryover() -> None: + if _carryover_state["recorded"]: + return + _carryover_state["recorded"] = True + if not dialogue_memory or not hasattr(dialogue_memory, "record_tool_turn"): + return + try: + from ..memory.conversation import is_tool_message + tool_msgs = [ + m for m in messages[user_msg_index + 1:] if is_tool_message(m) + ] + if tool_msgs: + dialogue_memory.record_tool_turn(tool_msgs) + except Exception as exc: # noqa: BLE001 + debug_log(f"tool-carryover record failed: {exc}", "reply") + def _extract_structured_tool_call(resp: dict): try: if isinstance(resp, dict) and isinstance(resp.get("message"), dict): @@ -1831,6 +1956,22 @@ def _extract_text_from_json_response(content: str) -> Optional[str]: except Exception: pass + # Stop is a dismissal — clear any tool carryover from the + # prior turn so the next wake-word turn starts fresh, and + # mark carryover as "recorded" so we don't re-inject this + # turn's stop call into future turns. + _carryover_state["recorded"] = True + if dialogue_memory and hasattr(dialogue_memory, "clear_tool_carryover"): + try: + dialogue_memory.clear_tool_carryover() + except Exception: + pass + if dialogue_memory and hasattr(dialogue_memory, "clear_hot_cache"): + try: + dialogue_memory.clear_hot_cache() + except Exception: + pass + # Return None to signal no response should be generated # Don't add to dialogue memory - this is a dismissal, not a conversation return None @@ -2076,6 +2217,7 @@ def _extract_text_from_json_response(content: str) -> Optional[str]: if dialogue_memory is not None: try: dialogue_memory.add_message("user", redacted) + _maybe_record_tool_carryover() dialogue_memory.add_message("assistant", reply) debug_log("error interaction added to dialogue memory", "memory") except Exception as e: @@ -2108,6 +2250,10 @@ def _extract_text_from_json_response(content: str) -> Optional[str]: # Add user message dialogue_memory.add_message("user", redacted) + # Capture this turn's tool-call + tool-result messages so the next + # reply within the hot window can reuse them instead of re-fetching. + _maybe_record_tool_carryover() + # Add assistant reply if we have one if reply and reply.strip(): dialogue_memory.add_message("assistant", reply.strip()) diff --git a/src/jarvis/reply/reply.spec.md b/src/jarvis/reply/reply.spec.md index 88eaab73..c2d88387 100644 --- a/src/jarvis/reply/reply.spec.md +++ b/src/jarvis/reply/reply.spec.md @@ -30,6 +30,13 @@ Design principles enforced by the engine: 2. Recent Dialogue Context - Include short-term dialogue memory (last 5 minutes) as prior messages. + - The fetch returns not only user/assistant prose but also **tool-call and tool-result messages** from in-loop work in prior replies within the active conversation (capped per-prompt by `cfg.tool_carryover_max_turns` and `cfg.tool_carryover_per_entry_chars`, fence markers of UNTRUSTED WEB EXTRACT blocks preserved on truncation, payloads scrubbed including `tool_calls[*].function.arguments`). This lets follow-up turns reuse a prior `webSearch` / MCP result instead of re-fetching it. Carryover is captured at the end of each reply (success or error). It survives for the lifetime of the conversation and is cleared on (a) the `stop` tool, and (b) new-conversation entry, when `has_recent_messages()` was False at turn start. + - A **recall gate** (`src/jarvis/memory/recall_gate.py`, deterministic, no LLM) skips diary / graph / memory-digest enrichment when the hot window already covers the topic (≥50% content-word overlap with a fresh tool-result row). Language-agnostic via `\w{3,}` with `re.UNICODE`. Fail-open on any error. The gate is bypassed when the planner explicitly emitted a `searchMemory` step, planner intent always wins over coverage heuristics. See `src/jarvis/memory/recall_gate.spec.md`. + - **Conversation-scoped scratch cache** (`DialogueMemory.hot_cache_get` / `hot_cache_put`): a small primitive used by the engine to memoise three idempotent per-turn computations for the lifetime of the active conversation: + - **Warm profile** (`DialogueMemory.WARM_PROFILE_CACHE_KEY`, query-agnostic): skips the SQLite traversal of the User + Directives branches on every follow-up turn. Invalidated on User/Directives graph mutations via a listener registered in `daemon.py` against `register_graph_mutation_listener` (`src/jarvis/memory/graph.py`); World-branch writes do not affect it. + - **Memory enrichment extractor** (`enrichment:{redacted_query[+topic_hint]}` key): skips the small-model LLM call that derives keywords / questions / time bounds when an identical query repeats. + - **Tool router** (`router:{redacted_query}|{strategy}|{builtin-names}|{mcp-names}` key): skips the router LLM call when the query and tool catalogue match. The catalogue signature lets a mid-conversation MCP refresh invalidate the cache. + - Lifetime: entries persist until (a) the `stop` signal clears the whole cache, (b) the engine detects a new conversation at turn entry (`has_recent_messages()` was False) and clears it before running, or (c) targeted invalidation (warm profile only) on graph mutations. Entries are *not* bounded by `RECENT_WINDOW_SEC` age, so a long active session keeps them warm. 3. Pre-flight Planner - The task-list planner (`plan_query` in `src/jarvis/reply/planner.py`) runs **first**, before any memory lookup or tool routing. It sees the query, a compact dialogue snippet, and the full builtin + MCP tool catalogue (names + one-line descriptions). diff --git a/src/jarvis/utils/redact.py b/src/jarvis/utils/redact.py index 6820ac1b..15d7d088 100644 --- a/src/jarvis/utils/redact.py +++ b/src/jarvis/utils/redact.py @@ -1,13 +1,33 @@ from __future__ import annotations import re -# Deterministic structural scrub patterns +# Deterministic structural scrub patterns. Order matters: specific +# vendor-shaped tokens are matched before generic catches so the more +# informative label wins (e.g. "[REDACTED_AWS_KEY]" beats "[REDACTED_HEX]"). _REDACTION_RULES: list[tuple[re.Pattern[str], str]] = [ (re.compile(r"[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}", re.IGNORECASE), "[REDACTED_EMAIL]"), (re.compile(r"\b(?:\d[ -]*?){13,19}\b"), "[REDACTED_CARD]"), + # Vendor-specific access keys (bare, no surrounding keyword required). + (re.compile(r"\b(?:AKIA|ASIA)[0-9A-Z]{16}\b"), "[REDACTED_AWS_KEY]"), + (re.compile(r"\b(?:sk|pk|rk)_(?:live|test)_[A-Za-z0-9]{16,}\b"), "[REDACTED_STRIPE_KEY]"), + (re.compile(r"\bgh[pousr]_[A-Za-z0-9]{36,}\b"), "[REDACTED_GH_TOKEN]"), + (re.compile(r"\bsk-[A-Za-z0-9]{32,}\b"), "[REDACTED_OPENAI_KEY]"), + (re.compile(r"\bAIza[0-9A-Za-z_\-]{35}\b"), "[REDACTED_GOOG_KEY]"), + # Authorisation headers — Bearer/Basic carry credentials in line. + (re.compile(r"Authorization:\s*Bearer\s+\S+", re.IGNORECASE), "Authorization: Bearer [REDACTED]"), + (re.compile(r"Authorization:\s*Basic\s+[A-Za-z0-9+/=]+", re.IGNORECASE), "Authorization: Basic [REDACTED]"), + # Generic prefix catch — left after the vendor-specific rules so + # newer formats like gh[pousr]_ get a precise label first. (re.compile(r"\b(AWS|GH|GCP|AZURE|xox[abpcr]-)[A-Za-z0-9_\-]{10,}\b", re.IGNORECASE), "[REDACTED_TOKEN]"), (re.compile(r"\b(?:eyJ[0-9A-Za-z._\-]+)\b"), "[REDACTED_JWT]"), - (re.compile(r"\b(pass(word)?|secret|token|apikey|api_key)\s*[:=]\s*\S+\b", re.IGNORECASE), r"\1=[REDACTED]"), + # Keyword-anchored credentials. Covers refresh/access/oauth/session + # variants in addition to the original pass/secret/token/apikey set. + (re.compile( + r"\b(pass(?:word)?|secret|token|apikey|api_key|" + r"(?:refresh|access|id|oauth)_?token|session(?:_?id)?|sid)" + r"\s*[:=]\s*\S+\b", + re.IGNORECASE, + ), r"\1=[REDACTED]"), (re.compile(r"\b[0-9A-Fa-f]{32,}\b"), "[REDACTED_HEX]"), (re.compile(r"\b\d{6}\b(?=.*(otp|2fa|code))", re.IGNORECASE), "[REDACTED_OTP]"), ] @@ -21,3 +41,15 @@ def redact(text: str, max_len: int = 8000) -> str: if len(scrubbed) > max_len: scrubbed = scrubbed[:max_len] return scrubbed + + +def scrub_secrets(text: str) -> str: + """Apply the structural scrub rules without whitespace collapse or length cap. + + Use for structured content (tool output, multi-line payloads) where + preserving newlines matters but tokens/emails/etc. must still be masked. + """ + scrubbed = text + for pattern, repl in _REDACTION_RULES: + scrubbed = pattern.sub(repl, scrubbed) + return scrubbed diff --git a/tests/test_dialogue_memory_hot_cache.py b/tests/test_dialogue_memory_hot_cache.py new file mode 100644 index 00000000..91003bd2 --- /dev/null +++ b/tests/test_dialogue_memory_hot_cache.py @@ -0,0 +1,134 @@ +"""Tests for the DialogueMemory conversation-scoped scratch cache and the +``is_tool_message`` helper. + +The cache is a per-conversation primitive used by the reply engine to +memoise idempotent per-turn work (warm profile, memory extractor, tool +router). Entries persist for the lifetime of the active conversation and +are wiped on ``clear_hot_cache()``; the warm profile entry can also be +invalidated on demand via ``invalidate_warm_profile()``. +""" + +import time + +import pytest + +from src.jarvis.memory.conversation import DialogueMemory, is_tool_message + + +@pytest.mark.unit +class TestHotCachePrimitives: + def test_get_returns_none_for_missing_key(self): + dm = DialogueMemory() + assert dm.hot_cache_get("nope") is None + + def test_put_then_get_roundtrips(self): + dm = DialogueMemory() + dm.hot_cache_put("k", {"v": 1}) + assert dm.hot_cache_get("k") == {"v": 1} + + def test_entries_persist_past_recent_window_age(self): + """Cache entries are conversation-scoped, not bounded by + RECENT_WINDOW_SEC. A long active conversation must keep the + cache hot even when the original write is older than the window. + """ + dm = DialogueMemory(inactivity_timeout=300.0) + dm.hot_cache_put("k", "v") + with dm._lock: + ts, value = dm._hot_cache["k"] + dm._hot_cache["k"] = (ts - (dm.RECENT_WINDOW_SEC + 10), value) + # Age alone must NOT cause the value to disappear; only explicit + # invalidation should drop it. + assert dm.hot_cache_get("k") == "v" + + def test_invalidate_warm_profile_drops_only_that_key(self): + dm = DialogueMemory() + dm.hot_cache_put(dm.WARM_PROFILE_CACHE_KEY, "warm-block") + dm.hot_cache_put("router:abc", ["webSearch"]) + dm.invalidate_warm_profile() + assert dm.hot_cache_get(dm.WARM_PROFILE_CACHE_KEY) is None + assert dm.hot_cache_get("router:abc") == ["webSearch"] + + def test_clear_hot_cache_drops_all_entries(self): + dm = DialogueMemory() + dm.hot_cache_put("a", 1) + dm.hot_cache_put("b", 2) + dm.clear_hot_cache() + assert dm.hot_cache_get("a") is None + assert dm.hot_cache_get("b") is None + + def test_put_overwrites_existing_value(self): + dm = DialogueMemory() + dm.hot_cache_put("k", "old") + dm.hot_cache_put("k", "new") + assert dm.hot_cache_get("k") == "new" + + +@pytest.mark.unit +class TestNextTsMonotonic: + """``_next_ts`` exists because ``time.time()`` has ~16ms granularity + on Windows and consecutive calls can return identical values. Without + the epsilon bump, text/tool messages recorded in the same tick would + collide and break interleave ordering downstream. + """ + + def test_consecutive_calls_strictly_increase(self): + dm = DialogueMemory() + with dm._lock: + t1 = dm._next_ts() + t2 = dm._next_ts() + t3 = dm._next_ts() + assert t1 < t2 < t3 + + def test_advances_past_artificially_high_last_ts(self): + """Even if ``_last_ts`` is ahead of the wall clock (clock skew, + manual seed), the next call must still advance. + """ + dm = DialogueMemory() + future = time.time() + 100.0 + with dm._lock: + dm._last_ts = future + nxt = dm._next_ts() + assert nxt > future + assert nxt - future < 0.01 # only an epsilon bump, not a wall jump + + +@pytest.mark.unit +class TestToolTurnsStorageCap: + def test_tool_turns_capped_to_max_storage(self): + dm = DialogueMemory() + # Push more entries than the cap; each call appends one turn. + for i in range(dm._tool_turns_max_storage + 5): + dm.record_tool_turn([ + {"role": "tool", "tool_call_id": f"c{i}", "content": f"r{i}"}, + ]) + assert len(dm._tool_turns) == dm._tool_turns_max_storage + # The oldest entries are dropped — last one survives. + last_msg = dm._tool_turns[-1][1][0]["content"] + assert last_msg.endswith(str(dm._tool_turns_max_storage + 4)) + + +@pytest.mark.unit +class TestIsToolMessage: + def test_native_tool_role(self): + assert is_tool_message({"role": "tool", "content": "x"}) is True + + def test_assistant_with_tool_calls(self): + assert is_tool_message({ + "role": "assistant", "content": "", + "tool_calls": [{"id": "c1"}], + }) is True + + def test_assistant_without_tool_calls(self): + assert is_tool_message({"role": "assistant", "content": "hi"}) is False + + def test_text_tool_user_with_tool_name(self): + assert is_tool_message({ + "role": "user", "content": "result", "tool_name": "webSearch", + }) is True + + def test_plain_user_message(self): + assert is_tool_message({"role": "user", "content": "hi"}) is False + + def test_non_dict_returns_false(self): + assert is_tool_message("tool") is False + assert is_tool_message(None) is False diff --git a/tests/test_dialogue_memory_tool_carryover.py b/tests/test_dialogue_memory_tool_carryover.py new file mode 100644 index 00000000..59dc11b8 --- /dev/null +++ b/tests/test_dialogue_memory_tool_carryover.py @@ -0,0 +1,282 @@ +"""Tests for DialogueMemory tool-message carryover across turns. + +Behaviour under test: within the hot-window (RECENT_WINDOW_SEC), tool-call +and tool-result messages generated during one reply must be retrievable as +part of the next reply's initial messages, so follow-up turns can reuse the +prior tool output instead of re-fetching. +""" + +import time +import pytest + +from src.jarvis.memory.conversation import DialogueMemory + + +@pytest.mark.unit +class TestToolCarryover: + def test_record_tool_turn_stores_messages(self): + dm = DialogueMemory() + dm.add_message("user", "who is justin bieber") + dm.record_tool_turn([ + { + "role": "assistant", + "content": "", + "tool_calls": [ + {"id": "call_1", "type": "function", + "function": {"name": "webSearch", + "arguments": {"query": "justin bieber"}}} + ], + }, + {"role": "tool", "tool_call_id": "call_1", + "content": "Justin Bieber is a Canadian singer..."}, + ]) + dm.add_message("assistant", "He is a Canadian singer.") + + out = dm.get_recent_turns_with_tools() + roles = [m.get("role") for m in out] + # Order: user, assistant-with-tool_calls, tool, assistant + assert roles == ["user", "assistant", "tool", "assistant"] + assert out[1].get("tool_calls") + assert out[2].get("tool_call_id") == "call_1" + assert "Canadian singer" in out[2]["content"] + + def test_carryover_survives_second_add_message(self): + """Tool rows must interleave at the correct timestamps between text messages.""" + dm = DialogueMemory() + dm.add_message("user", "q1") + dm.record_tool_turn([ + {"role": "assistant", "content": "", + "tool_calls": [{"id": "c1", "type": "function", + "function": {"name": "webSearch", + "arguments": {"query": "q1"}}}]}, + {"role": "tool", "tool_call_id": "c1", "content": "r1"}, + ]) + dm.add_message("assistant", "a1") + time.sleep(0.005) + dm.add_message("user", "q2") + + out = dm.get_recent_turns_with_tools() + roles = [m.get("role") for m in out] + assert roles == ["user", "assistant", "tool", "assistant", "user"] + + def test_truncates_large_tool_content(self): + dm = DialogueMemory() + huge = "x" * 5000 + dm.add_message("user", "q") + dm.record_tool_turn([ + {"role": "assistant", "content": "", + "tool_calls": [{"id": "c1", "type": "function", + "function": {"name": "webSearch", + "arguments": {"query": "q"}}}]}, + {"role": "tool", "tool_call_id": "c1", "content": huge}, + ]) + out = dm.get_recent_turns_with_tools(per_entry_chars=1200) + tool_msg = next(m for m in out if m.get("role") == "tool") + assert len(tool_msg["content"]) <= 1201 # 1200 + ellipsis char + + def test_caps_to_max_tool_turns(self): + dm = DialogueMemory() + for i in range(4): + dm.add_message("user", f"q{i}") + dm.record_tool_turn([ + {"role": "assistant", "content": "", + "tool_calls": [{"id": f"c{i}", "type": "function", + "function": {"name": "webSearch", + "arguments": {"q": f"q{i}"}}}]}, + {"role": "tool", "tool_call_id": f"c{i}", "content": f"r{i}"}, + ]) + dm.add_message("assistant", f"a{i}") + + out = dm.get_recent_turns_with_tools(max_tool_turns=2) + tool_contents = [m["content"] for m in out if m.get("role") == "tool"] + # Only the most recent 2 tool turns survive + assert tool_contents == ["r2", "r3"] + + def test_clear_tool_carryover_drops_tool_msgs_only(self): + dm = DialogueMemory() + dm.add_message("user", "q") + dm.record_tool_turn([ + {"role": "assistant", "content": "", + "tool_calls": [{"id": "c1", "type": "function", + "function": {"name": "webSearch", + "arguments": {"q": "x"}}}]}, + {"role": "tool", "tool_call_id": "c1", "content": "r"}, + ]) + dm.add_message("assistant", "a") + + dm.clear_tool_carryover() + + out = dm.get_recent_turns_with_tools() + roles = [m.get("role") for m in out] + # Tool rows gone, but user/assistant prose preserved + assert roles == ["user", "assistant"] + + def test_tool_turns_survive_past_recent_window_age(self): + """Tool carryover is conversation-scoped, not RECENT_WINDOW_SEC- + bounded. An ongoing conversation must keep prior tool results + visible regardless of how long ago each tool fired; the engine + clears them on new-conversation entry and on ``stop``. + """ + dm = DialogueMemory(inactivity_timeout=300.0) + dm.add_message("user", "q") + dm.record_tool_turn([ + {"role": "assistant", "content": "", + "tool_calls": [{"id": "c1", "type": "function", + "function": {"name": "webSearch", + "arguments": {"q": "x"}}}]}, + {"role": "tool", "tool_call_id": "c1", "content": "r"}, + ]) + # Even when we backdate the tool-turn timestamp past the window, + # the carryover survives until explicitly cleared. + with dm._lock: + dm._tool_turns = [ + (ts - (dm.RECENT_WINDOW_SEC + 10), msgs) + for ts, msgs in dm._tool_turns + ] + + out = dm.get_recent_turns_with_tools() + assert any(m.get("role") == "tool" for m in out), ( + "tool carryover must persist beyond RECENT_WINDOW_SEC age" + ) + + dm.clear_tool_carryover() + out_after_clear = dm.get_recent_turns_with_tools() + assert not any(m.get("role") == "tool" for m in out_after_clear) + + def test_tool_call_arguments_are_scrubbed(self): + """Native tool-call arguments can carry secrets too (e.g. an + email or token in the search query). They must be scrubbed + on record so re-injection on the next turn cannot leak them. + """ + dm = DialogueMemory() + dm.record_tool_turn([ + { + "role": "assistant", + "content": "", + "tool_calls": [{ + "id": "c1", + "type": "function", + "function": { + "name": "webSearch", + "arguments": { + "query": "look up alice@example.com please", + }, + }, + }], + }, + {"role": "tool", "tool_call_id": "c1", "content": "ok"}, + ]) + stored_call = dm._tool_turns[0][1][0]["tool_calls"][0] + stored_args = stored_call["function"]["arguments"] + assert "alice@example.com" not in stored_args["query"] + assert "[REDACTED_EMAIL]" in stored_args["query"] + + def test_tool_call_arguments_list_form_is_scrubbed(self): + """Some providers / custom tools pass arguments as a list of + scalars or dicts. Each element must be scrubbed too — otherwise + a positional secret slips through. + """ + dm = DialogueMemory() + dm.record_tool_turn([ + { + "role": "assistant", + "content": "", + "tool_calls": [{ + "id": "c1", + "type": "function", + "function": { + "name": "lookup", + "arguments": [ + "alice@example.com", + {"note": "ping bob@example.com"}, + ], + }, + }], + }, + {"role": "tool", "tool_call_id": "c1", "content": "ok"}, + ]) + stored = dm._tool_turns[0][1][0]["tool_calls"][0]["function"]["arguments"] + flat = repr(stored) + assert "alice@example.com" not in flat + assert "bob@example.com" not in flat + assert flat.count("[REDACTED_EMAIL]") >= 2 + + def test_tool_call_arguments_string_form_is_scrubbed(self): + """Some providers serialise arguments as a JSON string, not a dict.""" + dm = DialogueMemory() + dm.record_tool_turn([ + { + "role": "assistant", + "content": "", + "tool_calls": [{ + "id": "c1", + "type": "function", + "function": { + "name": "webSearch", + "arguments": '{"query": "alice@example.com"}', + }, + }], + }, + {"role": "tool", "tool_call_id": "c1", "content": "ok"}, + ]) + stored_args = dm._tool_turns[0][1][0]["tool_calls"][0]["function"]["arguments"] + assert "alice@example.com" not in stored_args + assert "[REDACTED_EMAIL]" in stored_args + + def test_tool_payloads_are_scrubbed_of_secrets(self): + """Tool results may contain emails, API tokens, JWTs. record_tool_turn + must scrub those before persisting so follow-up injection can't leak. + """ + dm = DialogueMemory() + dm.add_message("user", "look up the api") + dirty = ( + "Contact: alice@example.com\n" + "Bearer token: eyJhbGciOiJIUzI1NiJ9.abc.def\n" + "Fine content stays." + ) + dm.record_tool_turn([ + {"role": "tool", "tool_call_id": "c1", "content": dirty}, + ]) + stored = dm._tool_turns[0][1][0]["content"] + assert "alice@example.com" not in stored + assert "[REDACTED_EMAIL]" in stored + assert "eyJhbGciOiJIUzI1NiJ9" not in stored + assert "Fine content stays." in stored + + def test_truncation_preserves_untrusted_fence_end_marker(self): + """When a tool result carrying an UNTRUSTED WEB EXTRACT fence is + truncated, the closing marker must be re-appended so the downstream + prompt-injection defence fence stays intact. + """ + dm = DialogueMemory() + dm.add_message("user", "q") + begin = "<<>>" + end = "<<>>" + payload = ( + "Search result:\n" + begin + "\n" + ("x" * 5000) + "\n" + end + ) + dm.record_tool_turn([ + {"role": "tool", "tool_call_id": "c1", "content": payload}, + ]) + out = dm.get_recent_turns_with_tools(per_entry_chars=500) + tool_msg = next(m for m in out if m.get("role") == "tool") + assert begin in tool_msg["content"] + assert end in tool_msg["content"], ( + "closing fence marker must survive truncation" + ) + + def test_get_pending_chunks_excludes_tool_rows(self): + """Tool messages must not pollute the diary summariser input.""" + dm = DialogueMemory() + dm.add_message("user", "q") + dm.record_tool_turn([ + {"role": "tool", "tool_call_id": "c1", + "content": "raw web extract with secrets"}, + ]) + dm.add_message("assistant", "a") + + chunks = dm.get_pending_chunks() + joined = " | ".join(chunks) + assert "raw web extract" not in joined + assert "User: q" in joined + assert "Assistant: a" in joined diff --git a/tests/test_engine_hot_window_caches.py b/tests/test_engine_hot_window_caches.py new file mode 100644 index 00000000..5a316b14 --- /dev/null +++ b/tests/test_engine_hot_window_caches.py @@ -0,0 +1,256 @@ +"""End-to-end coverage for the hot-window scratch caches in run_reply_engine. + +Three caches share one primitive (DialogueMemory.hot_cache_*): + +1. Warm profile block — query-agnostic, keyed on a constant. +2. Memory enrichment extractor — keyed on the redacted query (+topic hint). +3. Tool router output — keyed on redacted query + strategy + catalogue. + +All three should fire on the second matching turn within the hot window so +follow-up queries don't pay for SQLite reads or LLM hops they already did. + +Also covers the C1 fix: when the planner explicitly emits a `searchMemory` +step, the recall gate must NOT short-circuit memory enrichment even when +hot-window coverage is high. +""" + +from unittest.mock import Mock, patch + +import pytest + +from src.jarvis.memory.conversation import DialogueMemory +from src.jarvis.reply.engine import run_reply_engine + + +def _mock_cfg(): + cfg = Mock() + cfg.ollama_base_url = "http://localhost:11434" + cfg.ollama_chat_model = "test-large" + cfg.voice_debug = False + cfg.llm_tools_timeout_sec = 8.0 + cfg.llm_embed_timeout_sec = 10.0 + cfg.llm_chat_timeout_sec = 45.0 + cfg.llm_digest_timeout_sec = 8.0 + cfg.memory_enrichment_max_results = 5 + cfg.memory_enrichment_source = "diary" + cfg.memory_digest_enabled = False + cfg.tool_result_digest_enabled = False + cfg.location_ip_address = None + cfg.location_auto_detect = False + cfg.location_enabled = False + cfg.agentic_max_turns = 8 + cfg.tool_search_max_calls = 3 + cfg.tool_selection_strategy = "all" + cfg.tool_carryover_max_turns = 2 + cfg.tool_carryover_per_entry_chars = 1200 + cfg.mcps = {} + cfg.llm_thinking_enabled = False + cfg.tts_engine = "none" + cfg.ollama_embed_model = "test-embed" + cfg.db_path = ":memory:" + return cfg + + +@pytest.mark.unit +@patch("src.jarvis.memory.graph_ops.format_warm_profile_block", return_value="") +@patch("src.jarvis.memory.graph_ops.build_warm_profile", return_value={"user": "", "directives": ""}) +@patch("src.jarvis.memory.graph.GraphMemoryStore") +@patch("src.jarvis.reply.engine.select_tools", return_value=[]) +@patch("src.jarvis.reply.engine.plan_query", return_value=[]) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", return_value={}) +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_tool_router_cached_across_turns( + mock_chat, mock_extract, mock_extractor, mock_plan, mock_select, + _mock_graph, _mock_warm, _mock_fmt, +): + """Two identical queries within the same DialogueMemory should call the + tool router exactly once — the second turn must hit the hot-window cache. + """ + mock_chat.side_effect = [ + {"message": {"content": "hello"}}, + {"message": {"content": "hello again"}}, + ] + mock_extract.side_effect = ["hello", "hello again"] + + db = Mock() + cfg = _mock_cfg() + dm = DialogueMemory() + + run_reply_engine(db=db, cfg=cfg, tts=None, text="say hi", dialogue_memory=dm) + run_reply_engine(db=db, cfg=cfg, tts=None, text="say hi", dialogue_memory=dm) + + assert mock_select.call_count == 1, ( + f"router should be cached on identical query; called {mock_select.call_count} times" + ) + + +@pytest.mark.unit +@patch("src.jarvis.memory.graph_ops.format_warm_profile_block", return_value="") +@patch("src.jarvis.memory.graph_ops.build_warm_profile", return_value={"user": "", "directives": ""}) +@patch("src.jarvis.memory.graph.GraphMemoryStore") +@patch("src.jarvis.reply.engine.select_tools", return_value=[]) +@patch("src.jarvis.reply.engine.plan_query", return_value=[]) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", return_value={"keywords": ["x"], "questions": []}) +@patch("src.jarvis.memory.conversation.search_conversation_memory_by_keywords", return_value=[]) +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_memory_extractor_cached_across_turns( + mock_chat, mock_extract, _mock_search, mock_extractor, + _mock_plan, _mock_select, _mock_graph, _mock_warm, _mock_fmt, +): + """Empty plan → fail-open path runs the extractor. The second identical + follow-up must skip the extractor LLM call. + + The recall gate would also fire on a tool-grounded follow-up, so we + keep the dialogue free of tool messages here to exercise the extractor + path on both turns. + """ + mock_chat.side_effect = [ + {"message": {"content": "first"}}, + {"message": {"content": "second"}}, + ] + mock_extract.side_effect = ["first", "second"] + + db = Mock() + cfg = _mock_cfg() + dm = DialogueMemory() + + run_reply_engine(db=db, cfg=cfg, tts=None, + text="tell me about pushkin", dialogue_memory=dm) + run_reply_engine(db=db, cfg=cfg, tts=None, + text="tell me about pushkin", dialogue_memory=dm) + + assert mock_extractor.call_count == 1, ( + f"extractor should be cached; called {mock_extractor.call_count} times" + ) + + +@pytest.mark.unit +@patch("src.jarvis.memory.graph_ops.format_warm_profile_block", return_value="warm-block") +@patch("src.jarvis.memory.graph_ops.build_warm_profile", return_value={"user": "u", "directives": "d"}) +@patch("src.jarvis.memory.graph.GraphMemoryStore") +@patch("src.jarvis.reply.engine.select_tools", return_value=[]) +@patch("src.jarvis.reply.engine.plan_query", return_value=[]) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", return_value={}) +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_warm_profile_cached_across_turns( + mock_chat, mock_extract, _mock_extractor, _mock_plan, + _mock_select, _mock_graph, mock_build, _mock_fmt, +): + """Warm profile is query-agnostic; second turn must reuse the cached + block instead of re-traversing the graph store. + """ + mock_chat.side_effect = [ + {"message": {"content": "a"}}, + {"message": {"content": "b"}}, + ] + mock_extract.side_effect = ["a", "b"] + + db = Mock() + cfg = _mock_cfg() + dm = DialogueMemory() + + run_reply_engine(db=db, cfg=cfg, tts=None, text="hi", dialogue_memory=dm) + run_reply_engine(db=db, cfg=cfg, tts=None, text="anything else", dialogue_memory=dm) + + assert mock_build.call_count == 1, ( + f"warm profile should be built once and cached; got {mock_build.call_count} calls" + ) + + +@pytest.mark.unit +@patch("src.jarvis.memory.graph_ops.format_warm_profile_block", return_value="") +@patch("src.jarvis.memory.graph_ops.build_warm_profile", return_value={"user": "", "directives": ""}) +@patch("src.jarvis.memory.graph.GraphMemoryStore") +@patch("src.jarvis.reply.engine.select_tools", return_value=[]) +@patch( + "src.jarvis.reply.engine.plan_query", + return_value=["searchMemory topic='justin bieber'", "reply"], +) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", + return_value={"keywords": ["bieber"], "questions": []}) +@patch("src.jarvis.memory.conversation.search_conversation_memory_by_keywords", return_value=[]) +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_planner_search_memory_overrides_recall_gate( + mock_chat, mock_extract, _mock_search, mock_extractor, + _mock_plan, _mock_select, _mock_graph, _mock_warm, _mock_fmt, +): + """C1 fix: when the planner emits `searchMemory`, the recall gate must + NOT short-circuit memory enrichment even though the hot window contains + a fresh tool result that overlaps the query. + """ + mock_chat.side_effect = [ + {"message": {"content": "Canadian singer."}}, + ] + mock_extract.side_effect = ["Canadian singer."] + + db = Mock() + cfg = _mock_cfg() + dm = DialogueMemory() + # Plant a fresh tool result that would otherwise satisfy the recall gate. + dm.add_message("user", "who is justin bieber") + dm.record_tool_turn([ + {"role": "tool", "tool_call_id": "c1", + "content": "Justin Bieber is a Canadian singer with the song Baby."}, + ]) + dm.add_message("assistant", "Canadian singer.") + + run_reply_engine(db=db, cfg=cfg, tts=None, + text="bieber more about justin", dialogue_memory=dm) + + # Planner explicitly demanded memory → extractor must run. + assert mock_extractor.call_count == 1, ( + "extractor must run when planner emits searchMemory, " + "regardless of recall-gate coverage" + ) + + +@pytest.mark.unit +@patch("src.jarvis.memory.graph_ops.format_warm_profile_block", return_value="") +@patch("src.jarvis.memory.graph_ops.build_warm_profile", return_value={"user": "", "directives": ""}) +@patch("src.jarvis.memory.graph.GraphMemoryStore") +@patch("src.jarvis.reply.engine.select_tools", return_value=[]) +@patch("src.jarvis.reply.engine.plan_query", return_value=[]) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", return_value={}) +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_new_conversation_clears_cache_and_carryover( + mock_chat, mock_extract, _mock_extractor, _mock_plan, mock_select, + _mock_graph, _mock_warm, _mock_fmt, +): + """When the previous conversation has lapsed past the inactivity + window, the engine must wipe the conversation-scoped cache and any + leftover tool carryover before running the new turn. Otherwise stale + state from a previous session would leak into a fresh one. + """ + mock_chat.side_effect = [ + {"message": {"content": "fresh"}}, + ] + mock_extract.side_effect = ["fresh"] + + db = Mock() + cfg = _mock_cfg() + dm = DialogueMemory() + + # Plant cache + carryover from a prior (now-lapsed) session. + dm.hot_cache_put(dm.WARM_PROFILE_CACHE_KEY, "old-block") + dm.hot_cache_put("router:old", ["webSearch"]) + dm.record_tool_turn([ + {"role": "tool", "tool_call_id": "c1", "content": "ancient result"}, + ]) + assert dm._tool_turns + assert dm.hot_cache_get(dm.WARM_PROFILE_CACHE_KEY) == "old-block" + + # No recent messages → engine treats this turn as a new conversation. + run_reply_engine(db=db, cfg=cfg, tts=None, text="hello", dialogue_memory=dm) + + # Stale router entry must be gone (full hot-cache wipe), and the old + # tool carryover must not be visible to the new conversation. + assert dm.hot_cache_get("router:old") is None + # The tool carryover from before must have been cleared on entry; any + # tool turns recorded later in this turn would only come from THIS + # reply (mock chat returns a final reply with no tool calls). + assert dm._tool_turns == [] diff --git a/tests/test_engine_tool_carryover.py b/tests/test_engine_tool_carryover.py new file mode 100644 index 00000000..0a7343e4 --- /dev/null +++ b/tests/test_engine_tool_carryover.py @@ -0,0 +1,227 @@ +"""End-to-end: tool-call + tool-result messages from one reply must be +visible to the LLM on the next reply within the hot window, so the model +can synthesise from prior results rather than re-fetching. +""" + +from unittest.mock import Mock, patch + +import pytest + +from src.jarvis.memory.conversation import DialogueMemory +from src.jarvis.reply.engine import run_reply_engine + + +def _mock_cfg(): + cfg = Mock() + cfg.ollama_base_url = "http://localhost:11434" + cfg.ollama_chat_model = "test-large" # avoid SMALL-model text-tool path + cfg.voice_debug = False + cfg.llm_tools_timeout_sec = 8.0 + cfg.llm_embed_timeout_sec = 10.0 + cfg.llm_chat_timeout_sec = 45.0 + cfg.llm_digest_timeout_sec = 8.0 + cfg.memory_enrichment_max_results = 5 + cfg.memory_enrichment_source = "diary" + cfg.memory_digest_enabled = False + cfg.tool_result_digest_enabled = False + cfg.location_ip_address = None + cfg.location_auto_detect = False + cfg.location_enabled = False + cfg.agentic_max_turns = 8 + cfg.tool_search_max_calls = 3 + cfg.tool_selection_strategy = "all" + cfg.tool_carryover_max_turns = 2 + cfg.tool_carryover_per_entry_chars = 1200 + cfg.mcps = {} + cfg.llm_thinking_enabled = False + cfg.tts_engine = "none" + cfg.ollama_embed_model = "test-embed" + return cfg + + +@pytest.mark.unit +@patch("src.jarvis.reply.engine.plan_query", return_value=[]) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", return_value={}) +@patch("src.jarvis.reply.engine.run_tool_with_retries") +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_tool_carryover_makes_prior_result_visible_to_next_turn( + mock_chat, mock_extract, mock_tool, _mock_extract, _mock_plan +): + # Turn 1: model emits webSearch call, then final text. + mock_tool.return_value = Mock( + reply_text="Justin Bieber is a Canadian singer.", + error_message=None, + ) + mock_chat.side_effect = [ + # Turn 1a: tool call + {"message": {"content": "", "tool_calls": [{ + "id": "c1", "type": "function", + "function": {"name": "webSearch", + "arguments": {"query": "justin bieber"}}, + }]}}, + # Turn 1b: final reply + {"message": {"content": "He is a Canadian singer."}}, + # Turn 2a: final reply directly — reuse from prior context + {"message": {"content": "His breakout song was Baby."}}, + ] + mock_extract.side_effect = [ + "", + "He is a Canadian singer.", + "His breakout song was Baby.", + ] + + db = Mock() + cfg = _mock_cfg() + dm = DialogueMemory() + + run_reply_engine(db=db, cfg=cfg, tts=None, + text="who is justin bieber", + dialogue_memory=dm) + + # Confirm carryover was recorded + assert len(dm._tool_turns) == 1 + stored = dm._tool_turns[0][1] + stored_roles = [m.get("role") for m in stored] + assert "tool" in stored_roles + assert any(m.get("tool_calls") for m in stored) + + # Turn 2: query on the same topic — the turn-2 LLM call should receive + # the turn-1 tool messages in its `messages` argument. + run_reply_engine(db=db, cfg=cfg, tts=None, + text="what is his most famous song", + dialogue_memory=dm) + + # The third chat_with_messages call is turn-2's only turn (single text). + turn2_kwargs = mock_chat.call_args_list[-1].kwargs + turn2_messages = turn2_kwargs.get("messages") + roles_in_turn2 = [m.get("role") for m in turn2_messages] + assert "tool" in roles_in_turn2, ( + f"Expected prior tool-role message to be injected on turn 2; " + f"got roles={roles_in_turn2}" + ) + # The tool message content must be the prior webSearch result + tool_contents = [ + m.get("content") for m in turn2_messages if m.get("role") == "tool" + ] + assert any("Canadian singer" in (c or "") for c in tool_contents) + + +@pytest.mark.unit +@patch("src.jarvis.reply.engine.plan_query", return_value=[]) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", return_value={}) +@patch("src.jarvis.reply.engine.run_tool_with_retries") +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_stop_signal_clears_tool_carryover( + mock_chat, mock_extract, mock_tool, _mock_extract, _mock_plan +): + """Turn 1 runs a tool; turn 2 receives the stop signal. After turn 2, + carryover must be empty so the next wake-word turn starts fresh. + """ + from src.jarvis.tools.builtin.stop import STOP_SIGNAL + + mock_tool.side_effect = [ + Mock(reply_text="Justin Bieber is a Canadian singer.", error_message=None), + Mock(reply_text=STOP_SIGNAL, error_message=None), + ] + mock_chat.side_effect = [ + # Turn 1a: tool call + {"message": {"content": "", "tool_calls": [{ + "id": "c1", "type": "function", + "function": {"name": "webSearch", "arguments": {"query": "bieber"}}, + }]}}, + # Turn 1b: final reply + {"message": {"content": "He is a Canadian singer."}}, + # Turn 2: stop tool + {"message": {"content": "", "tool_calls": [{ + "id": "c2", "type": "function", + "function": {"name": "stop", "arguments": {}}, + }]}}, + ] + mock_extract.side_effect = ["", "He is a Canadian singer.", ""] + + db = Mock() + cfg = _mock_cfg() + dm = DialogueMemory() + + run_reply_engine(db=db, cfg=cfg, tts=None, + text="who is justin bieber", dialogue_memory=dm) + assert len(dm._tool_turns) == 1, "turn-1 tool carryover should be recorded" + + reply = run_reply_engine(db=db, cfg=cfg, tts=None, + text="stop", dialogue_memory=dm) + assert reply is None, "stop signal returns None" + assert dm._tool_turns == [], ( + "stop signal must clear carryover so the next wake-word turn is clean" + ) + + +@pytest.mark.unit +@patch("src.jarvis.reply.engine.plan_query", return_value=[]) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", return_value={}) +@patch("src.jarvis.reply.engine.run_tool_with_retries") +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_tool_carryover_text_tool_mode( + mock_chat, mock_extract, mock_tool, _mock_extract, _mock_plan +): + """Small-model path: tool results come back as role=user with a + ``tool_name`` tag. Carryover must pick those up too. + """ + cfg = _mock_cfg() + cfg.ollama_chat_model = "gemma4:e2b" # triggers SMALL/text-tool path + + mock_tool.return_value = Mock( + reply_text="Paris is the capital of France.", error_message=None, + ) + fence_call = ( + "```tool_call\n" + '{"name": "webSearch", "arguments": {"query": "paris"}}\n' + "```" + ) + mock_chat.side_effect = [ + # Turn 1a: text-tool call emitted inside a markdown fence + {"message": {"content": fence_call}}, + # Turn 1b: final reply + {"message": {"content": "Paris is in France."}}, + # Turn 2: follow-up reply + {"message": {"content": "Its population is about 2.1 million."}}, + ] + mock_extract.side_effect = [ + fence_call, + "Paris is in France.", + "Its population is about 2.1 million.", + ] + + db = Mock() + dm = DialogueMemory() + + run_reply_engine(db=db, cfg=cfg, tts=None, + text="what about paris", dialogue_memory=dm) + + assert len(dm._tool_turns) == 1 + stored = dm._tool_turns[0][1] + roles = [m.get("role") for m in stored] + # Text-tool fallback stores tool results as role=user with tool_name. + assert "user" in roles + assert any(m.get("tool_name") == "webSearch" for m in stored) + + run_reply_engine(db=db, cfg=cfg, tts=None, + text="tell me more", dialogue_memory=dm) + + turn2_messages = mock_chat.call_args_list[-1].kwargs.get("messages") or [] + # The prior tool payload should appear in the turn-2 messages list — + # either as role=tool (native) or role=user with tool_name (text-tool). + tool_like = [ + m for m in turn2_messages + if m.get("role") == "tool" + or (m.get("role") == "user" and m.get("tool_name")) + ] + assert tool_like, ( + f"expected prior text-tool result to be carried over; got roles=" + f"{[m.get('role') for m in turn2_messages]}" + ) + assert any( + "Paris" in (m.get("content") or "") for m in tool_like + ) diff --git a/tests/test_graph_mutation_listener.py b/tests/test_graph_mutation_listener.py new file mode 100644 index 00000000..89b8d09d --- /dev/null +++ b/tests/test_graph_mutation_listener.py @@ -0,0 +1,247 @@ +"""Tests for the graph mutation listener registry and the warm-profile +invalidation hook it powers. + +The registry lets consumers (notably ``DialogueMemory``'s warm-profile +cache) react to writes against the User / Directives branches mid- +conversation. World-branch writes must NOT invalidate the warm profile, +since the warm profile does not include world facts. +""" + +from __future__ import annotations + +import os +import tempfile + +import pytest + +from src.jarvis.memory.conversation import DialogueMemory +from src.jarvis.memory.graph import ( + BRANCH_DIRECTIVES, + BRANCH_USER, + BRANCH_WORLD, + GraphMemoryStore, + register_graph_mutation_listener, + unregister_graph_mutation_listener, +) + + +@pytest.fixture +def graph_store(): + fd, path = tempfile.mkstemp(suffix=".db") + os.close(fd) + store = GraphMemoryStore(path) + yield store + try: + os.unlink(path) + except OSError: + pass + + +@pytest.mark.unit +class TestMutationListenerRegistry: + def test_create_under_user_notifies_with_user_branch(self, graph_store): + events: list[dict] = [] + + def cb(*, action, node_id, branch): + events.append({"action": action, "node_id": node_id, "branch": branch}) + + register_graph_mutation_listener(cb) + try: + graph_store.create_node("Alice", "user fact", parent_id=BRANCH_USER) + finally: + unregister_graph_mutation_listener(cb) + + actions = [e["action"] for e in events] + branches = [e["branch"] for e in events] + assert "create" in actions + assert BRANCH_USER in branches + + def test_update_under_directives_notifies_with_directives_branch(self, graph_store): + node = graph_store.create_node( + "be brief", "rule", parent_id=BRANCH_DIRECTIVES, + ) + events: list[dict] = [] + + def cb(*, action, node_id, branch): + events.append({"action": action, "node_id": node_id, "branch": branch}) + + register_graph_mutation_listener(cb) + try: + graph_store.update_node(node.id, data="updated") + finally: + unregister_graph_mutation_listener(cb) + + update_events = [e for e in events if e["action"] == "update"] + assert update_events + assert update_events[-1]["branch"] == BRANCH_DIRECTIVES + + def test_delete_under_world_notifies_with_world_branch(self, graph_store): + node = graph_store.create_node( + "Paris", "city", parent_id=BRANCH_WORLD, + ) + events: list[dict] = [] + + def cb(*, action, node_id, branch): + events.append({"action": action, "node_id": node_id, "branch": branch}) + + register_graph_mutation_listener(cb) + try: + graph_store.delete_node(node.id) + finally: + unregister_graph_mutation_listener(cb) + + delete_events = [e for e in events if e["action"] == "delete"] + assert delete_events + assert delete_events[-1]["branch"] == BRANCH_WORLD + + def test_listener_exception_does_not_break_write(self, graph_store): + def boom(*, action, node_id, branch): + raise RuntimeError("listener should not break writes") + + register_graph_mutation_listener(boom) + try: + # Must complete despite the listener raising. + node = graph_store.create_node( + "Bob", "another user fact", parent_id=BRANCH_USER, + ) + assert graph_store.get_node(node.id) is not None + finally: + unregister_graph_mutation_listener(boom) + + def test_unregister_is_idempotent(self): + def cb(**_): + pass + + register_graph_mutation_listener(cb) + unregister_graph_mutation_listener(cb) + unregister_graph_mutation_listener(cb) # second remove must not raise + + def test_resolve_branch_returns_none_past_depth_cap(self, graph_store): + """A chain longer than ``MAX_TRAVERSAL_DEPTH`` must terminate + rather than spin. Returns ``None`` — listener treats that as + "unknown branch" and skips invalidation. + """ + from src.jarvis.memory.graph import MAX_TRAVERSAL_DEPTH + + # Build a chain of MAX_TRAVERSAL_DEPTH + 2 nodes under user; the + # tail node should still resolve because the walk can finish + # before the cap. Then create one MORE level past the cap and + # confirm it returns None. + parent_id = BRANCH_USER + chain: list = [] + for i in range(MAX_TRAVERSAL_DEPTH + 2): + n = graph_store.create_node(f"n{i}", "deep", parent_id=parent_id) + chain.append(n) + parent_id = n.id + # The deepest node is past the cap from BRANCH_USER. + assert graph_store._resolve_branch(chain[-1].id) is None + + def test_resolve_branch_handles_unknown_node_id(self, graph_store): + """A node id that does not exist returns ``None`` rather than + raising — write paths must never crash on stale ids. + """ + assert graph_store._resolve_branch("does-not-exist") is None + + def test_listener_not_called_when_create_fails(self, graph_store): + """If ``create_node`` raises (e.g. unknown parent_id), no + mutation event should fire because no row was written. + """ + events: list = [] + + def cb(*, action, node_id, branch): + events.append({"action": action, "node_id": node_id, "branch": branch}) + + register_graph_mutation_listener(cb) + try: + with pytest.raises(ValueError): + graph_store.create_node( + "Orphan", "no parent", parent_id="missing-parent", + ) + finally: + unregister_graph_mutation_listener(cb) + + assert events == [], "no mutation should be reported for failed write" + + def test_deep_descendant_resolves_to_branch(self, graph_store): + """A grandchild several levels deep under user must resolve to the + ``user`` branch so the listener can scope correctly even for nested + nodes. + """ + parent = graph_store.create_node("Profile", "child", parent_id=BRANCH_USER) + child = graph_store.create_node("Tastes", "grandchild", parent_id=parent.id) + events: list[dict] = [] + + def cb(*, action, node_id, branch): + events.append({"action": action, "node_id": node_id, "branch": branch}) + + register_graph_mutation_listener(cb) + try: + graph_store.append_to_node(child.id, "loves jazz") + finally: + unregister_graph_mutation_listener(cb) + + # append_to_node calls update_node internally → at least one update. + update_events = [e for e in events if e["action"] == "update"] + assert update_events + assert update_events[-1]["branch"] == BRANCH_USER + + +@pytest.mark.unit +class TestWarmProfileInvalidationHook: + """End-to-end: the wiring done in ``daemon.py`` invalidates the warm + profile entry on User / Directives writes but ignores World writes. + Re-create that wiring here so the test does not depend on daemon + start-up. + """ + + def _wire(self, dm: DialogueMemory): + relevant = {BRANCH_USER, BRANCH_DIRECTIVES} + + def cb(*, action, node_id, branch): + del action, node_id + if branch in relevant: + dm.invalidate_warm_profile() + + register_graph_mutation_listener(cb) + return cb + + def test_user_write_invalidates_warm_profile(self, graph_store): + dm = DialogueMemory() + dm.hot_cache_put(dm.WARM_PROFILE_CACHE_KEY, "stale-block") + dm.hot_cache_put("router:abc", ["webSearch"]) + cb = self._wire(dm) + try: + graph_store.create_node("Eve", "user fact", parent_id=BRANCH_USER) + finally: + unregister_graph_mutation_listener(cb) + + assert dm.hot_cache_get(dm.WARM_PROFILE_CACHE_KEY) is None + # Other cache entries are untouched. + assert dm.hot_cache_get("router:abc") == ["webSearch"] + + def test_directives_write_invalidates_warm_profile(self, graph_store): + dm = DialogueMemory() + dm.hot_cache_put(dm.WARM_PROFILE_CACHE_KEY, "stale-block") + cb = self._wire(dm) + try: + graph_store.create_node( + "be concise", "rule", parent_id=BRANCH_DIRECTIVES, + ) + finally: + unregister_graph_mutation_listener(cb) + + assert dm.hot_cache_get(dm.WARM_PROFILE_CACHE_KEY) is None + + def test_world_write_does_not_invalidate_warm_profile(self, graph_store): + dm = DialogueMemory() + dm.hot_cache_put(dm.WARM_PROFILE_CACHE_KEY, "fresh-block") + cb = self._wire(dm) + try: + graph_store.create_node( + "Paris", "world fact", parent_id=BRANCH_WORLD, + ) + finally: + unregister_graph_mutation_listener(cb) + + # World-branch writes are noise for the warm profile. + assert dm.hot_cache_get(dm.WARM_PROFILE_CACHE_KEY) == "fresh-block" diff --git a/tests/test_recall_gate.py b/tests/test_recall_gate.py new file mode 100644 index 00000000..76241512 --- /dev/null +++ b/tests/test_recall_gate.py @@ -0,0 +1,75 @@ +"""Tests for recall_gate.should_recall — cheap heuristic for skipping +long-term memory enrichment when the hot-window already covers the topic. +""" + +import pytest + +from src.jarvis.memory.recall_gate import should_recall + + +@pytest.mark.unit +class TestShouldRecall: + def test_empty_hot_window_always_recalls(self): + assert should_recall("who is justin bieber", []) is True + + def test_no_tool_result_in_history_always_recalls(self): + recent = [ + {"role": "user", "content": "who is justin bieber"}, + {"role": "assistant", "content": "He is a Canadian singer."}, + ] + # No tool row → no fresh grounded data → recall (diary may know more) + assert should_recall("what is his most famous song", recent) is True + + def test_tool_covered_topic_skips_recall(self): + recent = [ + {"role": "user", "content": "who is justin bieber"}, + {"role": "assistant", "content": "", "tool_calls": [ + {"id": "c1", "type": "function", + "function": {"name": "webSearch", + "arguments": {"query": "justin bieber"}}} + ]}, + {"role": "tool", "tool_call_id": "c1", + "content": "Justin Bieber is a Canadian singer with hits like Baby."}, + {"role": "assistant", "content": "Canadian singer."}, + ] + # Follow-up on same entity, fresh tool row present → skip + assert should_recall("what is his most famous song bieber hits", + recent) is False + + def test_topic_change_still_recalls(self): + recent = [ + {"role": "user", "content": "who is justin bieber"}, + {"role": "tool", "tool_call_id": "c1", + "content": "Justin Bieber is a Canadian singer."}, + {"role": "assistant", "content": "Canadian singer."}, + ] + # Completely different topic → no overlap → recall runs + assert should_recall("what's the weather in hackney", recent) is True + + def test_non_latin_script_query_matches_hot_window(self): + """Per CLAUDE.md the gate must be language-agnostic. A Cyrillic query + covered by a Cyrillic tool result should skip recall just like English. + """ + recent = [ + {"role": "user", "content": "кто такой пушкин"}, + {"role": "tool", "tool_call_id": "c1", + "content": "Пушкин это русский поэт девятнадцатого века."}, + {"role": "assistant", "content": "Русский поэт."}, + ] + assert should_recall("пушкин русский поэт стихи", recent) is False + + def test_non_latin_topic_change_still_recalls(self): + recent = [ + {"role": "user", "content": "кто такой пушкин"}, + {"role": "tool", "tool_call_id": "c1", + "content": "Пушкин это русский поэт."}, + ] + # Different topic in the same script → no overlap → recall + assert should_recall("какая сегодня погода", recent) is True + + def test_stopword_only_query_does_not_skip(self): + recent = [ + {"role": "tool", "tool_call_id": "c1", "content": "foo bar"}, + ] + # "what is it" has no content words → cannot justify skipping + assert should_recall("what is it", recent) is True diff --git a/tests/test_redact_extended.py b/tests/test_redact_extended.py new file mode 100644 index 00000000..1bdbaf92 --- /dev/null +++ b/tests/test_redact_extended.py @@ -0,0 +1,86 @@ +"""Tests for the extended structural-redaction rules added so tool-output +carryover and recall-gate debug logs cannot leak credentials. +""" + +import pytest + +from src.jarvis.utils.redact import redact, scrub_secrets + + +@pytest.mark.unit +class TestVendorAccessKeys: + def test_aws_akia_key_redacted(self): + out = redact("key=AKIAIOSFODNN7EXAMPLE rest") + assert "AKIAIOSFODNN7EXAMPLE" not in out + assert "[REDACTED_AWS_KEY]" in out + + def test_aws_asia_key_redacted(self): + out = redact("ASIAIOSFODNN7EXAMPLE") + assert "ASIAIOSFODNN7EXAMPLE" not in out + assert "[REDACTED_AWS_KEY]" in out + + def test_stripe_live_secret_redacted(self): + token = "sk_live_" + "a" * 24 + out = redact(f"see {token} please") + assert token not in out + assert "[REDACTED_STRIPE_KEY]" in out + + def test_stripe_test_publishable_redacted(self): + token = "pk_test_" + "Z" * 24 + out = redact(token) + assert token not in out + assert "[REDACTED_STRIPE_KEY]" in out + + def test_github_pat_redacted(self): + token = "ghp_" + "A" * 36 + out = redact(token) + assert token not in out + assert "[REDACTED_GH_TOKEN]" in out + + def test_openai_key_redacted(self): + token = "sk-" + "A" * 40 + out = redact(token) + assert token not in out + assert "[REDACTED_OPENAI_KEY]" in out + + def test_google_api_key_redacted(self): + token = "AIza" + "B" * 35 + out = redact(token) + assert token not in out + assert "[REDACTED_GOOG_KEY]" in out + + +@pytest.mark.unit +class TestAuthorizationHeaders: + def test_bearer_header_redacted(self): + out = scrub_secrets("Authorization: Bearer abc.def.ghi") + assert "abc.def.ghi" not in out + assert "Authorization: Bearer [REDACTED]" in out + + def test_basic_header_redacted(self): + out = scrub_secrets("Authorization: Basic dXNlcjpwYXNz") + assert "dXNlcjpwYXNz" not in out + assert "Authorization: Basic [REDACTED]" in out + + +@pytest.mark.unit +class TestKeywordAnchoredCredentials: + def test_refresh_token_keyword_redacted(self): + out = redact("refresh_token=abcdef123456") + assert "abcdef123456" not in out + assert "refresh_token=[REDACTED]" in out + + def test_access_token_keyword_redacted(self): + out = redact("access_token: zzz999") + assert "zzz999" not in out + assert "access_token=[REDACTED]" in out + + def test_session_id_redacted(self): + out = redact("session_id=deadbeefcafe") + assert "deadbeefcafe" not in out + assert "session_id=[REDACTED]" in out + + def test_oauth_token_redacted(self): + out = redact("oauth_token=qwertyuiop") + assert "qwertyuiop" not in out + assert "oauth_token=[REDACTED]" in out