From 9151f08b00cb276cd900008e066b845253c7f22e Mon Sep 17 00:00:00 2001 From: Baris Sencan Date: Sun, 26 Apr 2026 16:06:49 +0100 Subject: [PATCH 1/5] feat(memory): tool carryover + recall gate to skip redundant lookups When a follow-up turn lands within the hot window and the prior reply already pulled fresh data via a tool, re-running webSearch / diary / graph wastes calls. Two cheap, deterministic mechanisms address this: - DialogueMemory.record_tool_turn / get_recent_turns_with_tools persist in-loop tool-call + tool-result messages so the next turn sees them as prior context. Capped by tool_carryover_max_turns and tool_carryover_per_entry_chars; secrets scrubbed; UNTRUSTED WEB EXTRACT fence markers preserved on truncation; cleared on stop. - recall_gate.should_recall: language-agnostic content-word coverage heuristic (\\w{3,} with re.UNICODE). Skips diary/graph enrichment only when the hot window already contains a fresh tool-result row covering >=50% of the query. Fail-open on any error. Wires into engine.run_reply_engine via a hasattr-guarded fetch, an idempotent _maybe_record_tool_carryover helper called on success and error paths, and a clear_tool_carryover on the stop-signal path. Adds cfg fields, settings UI entries, spec + llm_contexts updates, and 19 new tests covering carryover, scrubbing, fence preservation, stop-clear, text-tool fallback, and Cyrillic recall coverage. --- docs/llm_contexts.md | 11 +- src/desktop_app/settings_window.py | 6 + src/jarvis/config.py | 14 ++ src/jarvis/memory/conversation.py | 91 ++++++++ src/jarvis/memory/recall_gate.py | 104 +++++++++ src/jarvis/reply/engine.py | 64 +++++- src/jarvis/reply/reply.spec.md | 2 + src/jarvis/utils/redact.py | 12 + tests/test_dialogue_memory_tool_carryover.py | 191 ++++++++++++++++ tests/test_engine_tool_carryover.py | 227 +++++++++++++++++++ tests/test_recall_gate.py | 75 ++++++ 11 files changed, 795 insertions(+), 2 deletions(-) create mode 100644 src/jarvis/memory/recall_gate.py create mode 100644 tests/test_dialogue_memory_tool_carryover.py create mode 100644 tests/test_engine_tool_carryover.py create mode 100644 tests/test_recall_gate.py diff --git a/docs/llm_contexts.md b/docs/llm_contexts.md index ad0031be..10a41afb 100644 --- a/docs/llm_contexts.md +++ b/docs/llm_contexts.md @@ -11,7 +11,7 @@ Every distinct LLM call in Jarvis, what feeds it, what consumes it, and how it i - **Model / gating**: `cfg.ollama_chat_model` (the big model). Not optional. No size branching on the loop itself — size branching affects the digests/evaluator around it. - **Inputs**: - Redacted user query - - Recent dialogue (last 5 minutes) + - Recent dialogue (last 5 minutes), including in-loop tool-call + tool-role messages from prior replies within the hot window (tool carryover — `DialogueMemory.record_tool_turn` / `get_recent_turns_with_tools` in [src/jarvis/memory/conversation.py](src/jarvis/memory/conversation.py); capped by `cfg.tool_carryover_max_turns` / `tool_carryover_per_entry_chars`; cleared on `stop` signal; UNTRUSTED WEB EXTRACT fence markers preserved on truncation) - Unified system prompt from [src/jarvis/system_prompt.py](src/jarvis/system_prompt.py) + ASR note + tool-protocol guidance - **Warm profile block** (query-agnostic User + Directives excerpt from the knowledge graph, composed by `build_warm_profile()` / `format_warm_profile_block()` in [src/jarvis/memory/graph_ops.py](src/jarvis/memory/graph_ops.py) at Step 3.5 of `reply()`; no LLM call, pure SQLite read; injected unconditionally so personalisation is the default) - Digested memory enrichment (optional, see #4) @@ -45,6 +45,15 @@ Every distinct LLM call in Jarvis, what feeds it, what consumes it, and how it i - **Output**: `{keywords, from?, to?, questions?}`. Consumed by memory search in the reply engine. - **Limits**: up to 2 retries; timeout from `llm_tools_timeout_sec`. +## 3b. Recall Gate (pre-enrichment short-circuit) + +- **File**: [src/jarvis/memory/recall_gate.py](src/jarvis/memory/recall_gate.py) — `should_recall()`. +- **Trigger**: once per reply, before diary/graph/digest enrichment runs (after the planner has decided memory is potentially needed). +- **Model / gating**: NO LLM — deterministic keyword-coverage heuristic. Cheap. +- **Inputs**: query, recent dialogue (incl. tool carryover rows). +- **Output**: `False` only if hot-window contains a fresh tool result AND ≥50% of the query's content words appear in the hot-window transcript → skips diary, graph, and memory digest for this reply. Else `True`. Fail-open on any exception. Content-word extraction uses `\w{3,}` with `re.UNICODE`, so the gate works for Latin, Cyrillic, CJK, Arabic, Hebrew, etc. (per CLAUDE.md "no hardcoded language patterns"). Overlap words are run through `redact()` before being written to debug logs. +- **Rationale**: prevents re-running diary/graph lookups when the hot window already grounds the follow-up (e.g. "his most famous song" after a Bieber webSearch). + ## 4. Memory Digest (optional, SMALL models) - **File**: [src/jarvis/reply/enrichment.py](src/jarvis/reply/enrichment.py) — `digest_memory_for_query()` + `_distil_batch()`. diff --git a/src/desktop_app/settings_window.py b/src/desktop_app/settings_window.py index ae57c482..c1e56501 100644 --- a/src/desktop_app/settings_window.py +++ b/src/desktop_app/settings_window.py @@ -264,6 +264,12 @@ def f(key, label, desc, cat, ftype, **kw): f("memory_enrichment_source", "Enrichment Source", "Which memory system enriches replies: all (diary + graph), diary only, or graph only", "memory", "choice", choices=[("diary", "Diary only"), ("graph", "Graph only"), ("all", "All (diary + graph)")]) + f("tool_carryover_max_turns", "Tool Carryover Turns", + "How many prior replies' tool results to keep visible for follow-up questions", + "memory", "int", min_val=0, max_val=10) + f("tool_carryover_per_entry_chars", "Tool Carryover Length", + "Chars kept per carried-over tool result (UNTRUSTED fence markers preserved)", + "memory", "int", min_val=200, max_val=8000, step=100) f("agentic_max_turns", "Agentic Max Turns", "Maximum turns in agentic tool-use loops", "memory", "int", min_val=1, max_val=30) diff --git a/src/jarvis/config.py b/src/jarvis/config.py index 24117027..98586eed 100644 --- a/src/jarvis/config.py +++ b/src/jarvis/config.py @@ -169,6 +169,13 @@ class Settings: dialogue_memory_timeout: float memory_enrichment_max_results: int memory_enrichment_source: str # "all", "diary", or "graph" + # Tool-call + tool-result messages from prior replies in the hot window + # are re-injected into the next turn so follow-ups can reuse them instead + # of re-fetching. These knobs cap how many prior tool turns survive and + # how much of each tool payload is retained (the fence markers of + # UNTRUSTED WEB EXTRACT blocks are preserved on truncation). + tool_carryover_max_turns: int + tool_carryover_per_entry_chars: int # Distil diary + graph into a short relevance-filtered note via a cheap # LLM pass before injecting into the reply system prompt. When None # (the default), it auto-enables for SMALL models (≤7B) and stays off @@ -470,6 +477,9 @@ def get_default_config() -> Dict[str, Any]: "dialogue_memory_timeout": 300.0, "memory_enrichment_max_results": 3, "memory_enrichment_source": "all", # "all", "diary", or "graph" + # Tool carryover: cap re-injected prior tool turns + chars per entry. + "tool_carryover_max_turns": 2, + "tool_carryover_per_entry_chars": 1200, # None = auto (on for small models ≤7B, off for large). Set true/false to force. "memory_digest_enabled": None, # Distil raw tool results (e.g. webSearch extracts) into a short @@ -658,6 +668,8 @@ def load_settings() -> Settings: memory_enrichment_source = str(merged.get("memory_enrichment_source", "all")).lower() if memory_enrichment_source not in ("all", "diary", "graph"): memory_enrichment_source = "all" + tool_carryover_max_turns = max(0, int(merged.get("tool_carryover_max_turns", 2))) + tool_carryover_per_entry_chars = max(200, int(merged.get("tool_carryover_per_entry_chars", 1200))) _digest_raw = merged.get("memory_digest_enabled", None) memory_digest_enabled: Optional[bool] if _digest_raw is None: @@ -818,6 +830,8 @@ def load_settings() -> Settings: dialogue_memory_timeout=dialogue_memory_timeout, memory_enrichment_max_results=memory_enrichment_max_results, memory_enrichment_source=memory_enrichment_source, + tool_carryover_max_turns=tool_carryover_max_turns, + tool_carryover_per_entry_chars=tool_carryover_per_entry_chars, memory_digest_enabled=memory_digest_enabled, tool_result_digest_enabled=tool_result_digest_enabled, agentic_max_turns=agentic_max_turns, diff --git a/src/jarvis/memory/conversation.py b/src/jarvis/memory/conversation.py index caf6886c..473479d4 100644 --- a/src/jarvis/memory/conversation.py +++ b/src/jarvis/memory/conversation.py @@ -8,6 +8,11 @@ from ..llm import call_llm_direct from .embeddings import get_embedding from ..debug import debug_log +from ..utils.redact import redact, scrub_secrets + + +_UNTRUSTED_FENCE_BEGIN = "<<>>" +_UNTRUSTED_FENCE_END = "<<>>" def _filter_contexts_by_time( @@ -84,6 +89,13 @@ def __init__(self, inactivity_timeout: float = 300.0, max_interactions: int = 20 a diary update (same as the window, since enrichment covers older context) """ self._messages: List[Tuple[float, str, str]] = [] # (timestamp, role, content) + # Tool carryover: in-loop assistant-with-tool_calls + tool-role messages + # from prior replies, so follow-up turns within the hot window can reuse + # the prior tool output instead of re-fetching. Stored as a list of + # (timestamp, [msg_dict, ...]) where each entry is one reply's worth of + # tool-related messages. Excluded from `get_pending_chunks` so raw tool + # payloads never reach the diary summariser. + self._tool_turns: List[Tuple[float, List[dict]]] = [] self._last_activity_time: float = time.time() self._inactivity_timeout = inactivity_timeout # Unified window: context retention = forced diary update interval @@ -125,6 +137,85 @@ def get_recent_messages(self) -> List[dict]: return [{"role": role, "content": content} for _, role, content in recent_messages] + def record_tool_turn(self, tool_msgs: List[dict]) -> None: + """Store in-loop tool-call/tool-role messages from a just-finished reply. + + Called once per reply with the tool-related messages extracted from the + engine's messages array. These interleave with text messages on + subsequent `get_recent_turns_with_tools` calls so follow-ups can see + the prior tool output. + """ + if not tool_msgs: + return + with self._lock: + ts = time.time() + scrubbed: List[dict] = [] + for m in tool_msgs: + mm = dict(m) + c = mm.get("content") + if isinstance(c, str) and c: + # Tool outputs may contain PII or secrets (email bodies, + # API responses, scraped pages). Scrub before persisting + # so re-injection on the next turn can't leak them. + mm["content"] = scrub_secrets(c) + scrubbed.append(mm) + self._tool_turns.append((ts, scrubbed)) + + def clear_tool_carryover(self) -> None: + """Drop all stored tool-turn messages. Text messages are untouched.""" + with self._lock: + self._tool_turns = [] + + def get_recent_turns_with_tools( + self, + max_tool_turns: int = 2, + per_entry_chars: int = 1200, + ) -> List[dict]: + """Like `get_recent_messages`, but interleaves stored tool turns in + timestamp order. Only the most recent `max_tool_turns` tool groups + survive; older ones are dropped wholesale (avoids orphan + assistant-with-tool_calls without a matching tool result, which would + break native tool calling). + """ + with self._lock: + if not self._messages and not self._tool_turns: + return [] + cutoff = time.time() - self.RECENT_WINDOW_SEC + # Build timeline of (ts, payload) where payload is either a single + # text message dict or a list of tool messages. + timeline: list = [] + for ts, role, content in self._messages: + if ts >= cutoff: + timeline.append((ts, "msg", {"role": role, "content": content})) + # Keep only the last N tool turns within the window. + live_tool_turns = [(ts, msgs) for ts, msgs in self._tool_turns if ts >= cutoff] + for ts, msgs in live_tool_turns[-max_tool_turns:]: + truncated: list[dict] = [] + for m in msgs: + mm = dict(m) + c = mm.get("content") + if isinstance(c, str) and len(c) > per_entry_chars: + cut = c[:per_entry_chars].rstrip() + "…" + # If truncation sliced away the closing marker of an + # UNTRUSTED WEB EXTRACT fence, re-append it so the + # injection-defence fence stays intact downstream. + if ( + _UNTRUSTED_FENCE_BEGIN in cut + and _UNTRUSTED_FENCE_END not in cut + ): + cut = cut + "\n" + _UNTRUSTED_FENCE_END + mm["content"] = cut + truncated.append(mm) + timeline.append((ts, "group", truncated)) + timeline.sort(key=lambda t: t[0]) + flat: List[dict] = [] + for _, kind, payload in timeline: + if kind == "msg": + flat.append(payload) + else: + flat.extend(payload) + return flat + def has_recent_messages(self) -> bool: """Check if there are any messages in the last 5 minutes.""" with self._lock: diff --git a/src/jarvis/memory/recall_gate.py b/src/jarvis/memory/recall_gate.py new file mode 100644 index 00000000..6aaa6a60 --- /dev/null +++ b/src/jarvis/memory/recall_gate.py @@ -0,0 +1,104 @@ +"""Cheap heuristic for deciding whether long-term memory enrichment (diary +recall, graph recall, memory digest) is worth running for the current query. + +When the hot-window transcript already covers the topic (same content words +*and* a fresh tool result is present), running the diary/graph hops adds cost +and context bloat for no new information. Fail open: if in doubt, recall. + +No LLM hop — keyword Jaccard + tool-row presence is deterministic and cheap. +""" +from __future__ import annotations + +import re +from typing import List + +from ..debug import debug_log +from ..utils.redact import redact + + +_STOPWORDS = { + "a", "an", "the", "and", "or", "but", "if", "then", "is", "are", "was", + "were", "be", "been", "being", "do", "does", "did", "have", "has", "had", + "of", "in", "on", "at", "to", "for", "with", "by", "from", "about", + "what", "who", "where", "when", "why", "how", "which", "whose", + "it", "this", "that", "these", "those", "his", "her", "their", "my", + "your", "our", "me", "you", "i", "we", "they", "he", "she", "them", + "can", "could", "would", "should", "will", "may", "might", "shall", + "tell", "show", "give", "find", "know", "think", "want", "need", "get", + "so", "too", "more", "less", "some", "any", "no", "not", "also", "just", + "as", "than", "up", "out", "over", "under", "again", "further", "here", + "there", "all", "most", "other", "such", "own", "same", "very", "s", + "t", "don", "now", "ll", "m", "re", "ve", "d", +} + + +def _content_words(text: str) -> set[str]: + # \w with UNICODE (default in Py3) matches letters in any script — + # Latin, Cyrillic, CJK, Arabic, Hebrew, etc. Keeps Jarvis language-agnostic + # per CLAUDE.md. Digit-only runs are excluded by the stopword-style filter. + words = re.findall(r"\w{3,}", (text or "").lower(), flags=re.UNICODE) + return {w for w in words if w not in _STOPWORDS and not w.isdigit()} + + +def _has_fresh_tool_result(recent_messages: List[dict]) -> bool: + for m in recent_messages: + role = m.get("role") + if role == "tool": + return True + if role == "assistant" and m.get("tool_calls"): + return True + # Text-tool fallback format: role=user carrying a tool_name tag. + if role == "user" and m.get("tool_name"): + return True + return False + + +def should_recall( + query: str, + recent_messages: List[dict], + *, + min_coverage: float = 0.5, +) -> bool: + """Return True iff diary/graph recall should run for this query. + + False only when: + 1. Hot-window contains at least one fresh tool result, AND + 2. At least `min_coverage` fraction of the query's content words + appear in the combined hot-window text (coverage, not symmetric + Jaccard — the window is always larger than the query). + + Fail-open: any exception or missing data → True. + """ + try: + if not recent_messages: + return True + if not _has_fresh_tool_result(recent_messages): + return True + q_words = _content_words(query) + if not q_words: + # Stopword-only query cannot justify skipping recall. + return True + window_text_parts: list[str] = [] + for m in recent_messages: + c = m.get("content") + if isinstance(c, str) and c: + window_text_parts.append(c) + window_words = _content_words(" ".join(window_text_parts)) + if not window_words: + return True + overlap = q_words & window_words + coverage = len(overlap) / len(q_words) if q_words else 0.0 + if coverage >= min_coverage: + # Overlap words come from the user query and may carry names or + # PII; push them through the structural scrub before logging so + # debug logs don't become a side-channel. + safe_overlap = redact(" ".join(sorted(overlap)[:5])) + debug_log( + f"recall gate: skip (coverage={coverage:.2f}, overlap=[{safe_overlap}])", + "memory", + ) + return False + return True + except Exception as e: + debug_log(f"recall gate failed open: {e}", "memory") + return True diff --git a/src/jarvis/reply/engine.py b/src/jarvis/reply/engine.py index 900a81bb..d089dbe9 100644 --- a/src/jarvis/reply/engine.py +++ b/src/jarvis/reply/engine.py @@ -703,7 +703,13 @@ def run_reply_engine(db: "Database", cfg, tts: Optional[Any], is_new_conversation = True if dialogue_memory and dialogue_memory.has_recent_messages(): - recent_messages = dialogue_memory.get_recent_messages() + if hasattr(dialogue_memory, "get_recent_turns_with_tools"): + recent_messages = dialogue_memory.get_recent_turns_with_tools( + max_tool_turns=getattr(cfg, "tool_carryover_max_turns", 2), + per_entry_chars=getattr(cfg, "tool_carryover_per_entry_chars", 1200), + ) + else: + recent_messages = dialogue_memory.get_recent_messages() is_new_conversation = False # Refresh MCP tools on new conversation (memory expired) @@ -820,6 +826,21 @@ def run_reply_engine(db: "Database", cfg, tts: Optional[Any], # - Plan without it → skip memory work entirely (no keyword LLM, # no diary search, no graph search, no digest LLM). needs_memory = (not action_plan) or plan_requires_memory(action_plan) + + # Recall gate: if the hot-window already carries a fresh tool result + # covering the query topic, skip diary/graph enrichment for this turn. + # Cheap deterministic heuristic, no LLM. Fail-open on any error. + if needs_memory and recent_messages: + try: + from ..memory.recall_gate import should_recall + if not should_recall(redacted, recent_messages): + debug_log( + "recall gate: hot-window covers topic, skipping enrichment", + "memory", + ) + needs_memory = False + except Exception as exc: # noqa: BLE001 + debug_log(f"recall gate failed (fail-open): {exc}", "memory") # Topic hint from the directive (if any) — passed to the memory # extractor so keyword selection is anchored on what the planner # actually wanted to look up, instead of re-deriving from the raw @@ -1271,6 +1292,31 @@ def _build_initial_system_message() -> str: user_msg_index = len(messages) messages.append({"role": "user", "content": redacted}) + # Idempotent flag — once carryover capture runs (success, error, or stop), + # don't run it again. Lets us call _maybe_record_tool_carryover from any + # exit path safely. + _carryover_state = {"recorded": False} + + def _maybe_record_tool_carryover() -> None: + if _carryover_state["recorded"]: + return + _carryover_state["recorded"] = True + if not dialogue_memory or not hasattr(dialogue_memory, "record_tool_turn"): + return + try: + tool_msgs = [ + m for m in messages[user_msg_index + 1:] + if ( + m.get("role") == "tool" + or (m.get("role") == "assistant" and m.get("tool_calls")) + or (m.get("role") == "user" and m.get("tool_name")) + ) + ] + if tool_msgs: + dialogue_memory.record_tool_turn(tool_msgs) + except Exception as exc: # noqa: BLE001 + debug_log(f"tool-carryover record failed: {exc}", "reply") + def _extract_structured_tool_call(resp: dict): try: if isinstance(resp, dict) and isinstance(resp.get("message"), dict): @@ -1831,6 +1877,17 @@ def _extract_text_from_json_response(content: str) -> Optional[str]: except Exception: pass + # Stop is a dismissal — clear any tool carryover from the + # prior turn so the next wake-word turn starts fresh, and + # mark carryover as "recorded" so we don't re-inject this + # turn's stop call into future turns. + _carryover_state["recorded"] = True + if dialogue_memory and hasattr(dialogue_memory, "clear_tool_carryover"): + try: + dialogue_memory.clear_tool_carryover() + except Exception: + pass + # Return None to signal no response should be generated # Don't add to dialogue memory - this is a dismissal, not a conversation return None @@ -2076,6 +2133,7 @@ def _extract_text_from_json_response(content: str) -> Optional[str]: if dialogue_memory is not None: try: dialogue_memory.add_message("user", redacted) + _maybe_record_tool_carryover() dialogue_memory.add_message("assistant", reply) debug_log("error interaction added to dialogue memory", "memory") except Exception as e: @@ -2108,6 +2166,10 @@ def _extract_text_from_json_response(content: str) -> Optional[str]: # Add user message dialogue_memory.add_message("user", redacted) + # Capture this turn's tool-call + tool-result messages so the next + # reply within the hot window can reuse them instead of re-fetching. + _maybe_record_tool_carryover() + # Add assistant reply if we have one if reply and reply.strip(): dialogue_memory.add_message("assistant", reply.strip()) diff --git a/src/jarvis/reply/reply.spec.md b/src/jarvis/reply/reply.spec.md index 88eaab73..86ef6930 100644 --- a/src/jarvis/reply/reply.spec.md +++ b/src/jarvis/reply/reply.spec.md @@ -30,6 +30,8 @@ Design principles enforced by the engine: 2. Recent Dialogue Context - Include short-term dialogue memory (last 5 minutes) as prior messages. + - The fetch returns not only user/assistant prose but also **tool-call and tool-result messages** from in-loop work in prior replies within the hot window (capped by `cfg.tool_carryover_max_turns` and `cfg.tool_carryover_per_entry_chars`, fence markers of UNTRUSTED WEB EXTRACT blocks preserved on truncation, secrets scrubbed). This lets follow-up turns reuse a prior `webSearch` / MCP result instead of re-fetching it. The carryover is captured at the end of each reply (success or error), and is cleared when the user dismisses with the `stop` tool so the next wake-word turn starts clean. + - A **recall gate** (`src/jarvis/memory/recall_gate.py`, deterministic, no LLM) skips diary / graph / memory-digest enrichment when the hot window already covers the topic (≥50% content-word overlap with a fresh tool-result row). Language-agnostic via `\w{3,}` with `re.UNICODE`. Fail-open on any error. 3. Pre-flight Planner - The task-list planner (`plan_query` in `src/jarvis/reply/planner.py`) runs **first**, before any memory lookup or tool routing. It sees the query, a compact dialogue snippet, and the full builtin + MCP tool catalogue (names + one-line descriptions). diff --git a/src/jarvis/utils/redact.py b/src/jarvis/utils/redact.py index 6820ac1b..1a23f589 100644 --- a/src/jarvis/utils/redact.py +++ b/src/jarvis/utils/redact.py @@ -21,3 +21,15 @@ def redact(text: str, max_len: int = 8000) -> str: if len(scrubbed) > max_len: scrubbed = scrubbed[:max_len] return scrubbed + + +def scrub_secrets(text: str) -> str: + """Apply the structural scrub rules without whitespace collapse or length cap. + + Use for structured content (tool output, multi-line payloads) where + preserving newlines matters but tokens/emails/etc. must still be masked. + """ + scrubbed = text + for pattern, repl in _REDACTION_RULES: + scrubbed = pattern.sub(repl, scrubbed) + return scrubbed diff --git a/tests/test_dialogue_memory_tool_carryover.py b/tests/test_dialogue_memory_tool_carryover.py new file mode 100644 index 00000000..123fc89c --- /dev/null +++ b/tests/test_dialogue_memory_tool_carryover.py @@ -0,0 +1,191 @@ +"""Tests for DialogueMemory tool-message carryover across turns. + +Behaviour under test: within the hot-window (RECENT_WINDOW_SEC), tool-call +and tool-result messages generated during one reply must be retrievable as +part of the next reply's initial messages, so follow-up turns can reuse the +prior tool output instead of re-fetching. +""" + +import time +import pytest + +from src.jarvis.memory.conversation import DialogueMemory + + +@pytest.mark.unit +class TestToolCarryover: + def test_record_tool_turn_stores_messages(self): + dm = DialogueMemory() + dm.add_message("user", "who is justin bieber") + dm.record_tool_turn([ + { + "role": "assistant", + "content": "", + "tool_calls": [ + {"id": "call_1", "type": "function", + "function": {"name": "webSearch", + "arguments": {"query": "justin bieber"}}} + ], + }, + {"role": "tool", "tool_call_id": "call_1", + "content": "Justin Bieber is a Canadian singer..."}, + ]) + dm.add_message("assistant", "He is a Canadian singer.") + + out = dm.get_recent_turns_with_tools() + roles = [m.get("role") for m in out] + # Order: user, assistant-with-tool_calls, tool, assistant + assert roles == ["user", "assistant", "tool", "assistant"] + assert out[1].get("tool_calls") + assert out[2].get("tool_call_id") == "call_1" + assert "Canadian singer" in out[2]["content"] + + def test_carryover_survives_second_add_message(self): + """Tool rows must interleave at the correct timestamps between text messages.""" + dm = DialogueMemory() + dm.add_message("user", "q1") + dm.record_tool_turn([ + {"role": "assistant", "content": "", + "tool_calls": [{"id": "c1", "type": "function", + "function": {"name": "webSearch", + "arguments": {"query": "q1"}}}]}, + {"role": "tool", "tool_call_id": "c1", "content": "r1"}, + ]) + dm.add_message("assistant", "a1") + time.sleep(0.005) + dm.add_message("user", "q2") + + out = dm.get_recent_turns_with_tools() + roles = [m.get("role") for m in out] + assert roles == ["user", "assistant", "tool", "assistant", "user"] + + def test_truncates_large_tool_content(self): + dm = DialogueMemory() + huge = "x" * 5000 + dm.add_message("user", "q") + dm.record_tool_turn([ + {"role": "assistant", "content": "", + "tool_calls": [{"id": "c1", "type": "function", + "function": {"name": "webSearch", + "arguments": {"query": "q"}}}]}, + {"role": "tool", "tool_call_id": "c1", "content": huge}, + ]) + out = dm.get_recent_turns_with_tools(per_entry_chars=1200) + tool_msg = next(m for m in out if m.get("role") == "tool") + assert len(tool_msg["content"]) <= 1201 # 1200 + ellipsis char + + def test_caps_to_max_tool_turns(self): + dm = DialogueMemory() + for i in range(4): + dm.add_message("user", f"q{i}") + dm.record_tool_turn([ + {"role": "assistant", "content": "", + "tool_calls": [{"id": f"c{i}", "type": "function", + "function": {"name": "webSearch", + "arguments": {"q": f"q{i}"}}}]}, + {"role": "tool", "tool_call_id": f"c{i}", "content": f"r{i}"}, + ]) + dm.add_message("assistant", f"a{i}") + + out = dm.get_recent_turns_with_tools(max_tool_turns=2) + tool_contents = [m["content"] for m in out if m.get("role") == "tool"] + # Only the most recent 2 tool turns survive + assert tool_contents == ["r2", "r3"] + + def test_clear_tool_carryover_drops_tool_msgs_only(self): + dm = DialogueMemory() + dm.add_message("user", "q") + dm.record_tool_turn([ + {"role": "assistant", "content": "", + "tool_calls": [{"id": "c1", "type": "function", + "function": {"name": "webSearch", + "arguments": {"q": "x"}}}]}, + {"role": "tool", "tool_call_id": "c1", "content": "r"}, + ]) + dm.add_message("assistant", "a") + + dm.clear_tool_carryover() + + out = dm.get_recent_turns_with_tools() + roles = [m.get("role") for m in out] + # Tool rows gone, but user/assistant prose preserved + assert roles == ["user", "assistant"] + + def test_expired_tool_turns_are_pruned(self): + dm = DialogueMemory(inactivity_timeout=300.0) + dm.add_message("user", "q") + dm.record_tool_turn([ + {"role": "assistant", "content": "", + "tool_calls": [{"id": "c1", "type": "function", + "function": {"name": "webSearch", + "arguments": {"q": "x"}}}]}, + {"role": "tool", "tool_call_id": "c1", "content": "r"}, + ]) + # Force all stored tool-turn timestamps beyond RECENT_WINDOW_SEC + with dm._lock: + dm._tool_turns = [ + (ts - (dm.RECENT_WINDOW_SEC + 10), msgs) + for ts, msgs in dm._tool_turns + ] + + out = dm.get_recent_turns_with_tools() + # No tool rows because they expired + assert not any(m.get("role") == "tool" for m in out) + + def test_tool_payloads_are_scrubbed_of_secrets(self): + """Tool results may contain emails, API tokens, JWTs. record_tool_turn + must scrub those before persisting so follow-up injection can't leak. + """ + dm = DialogueMemory() + dm.add_message("user", "look up the api") + dirty = ( + "Contact: alice@example.com\n" + "Bearer token: eyJhbGciOiJIUzI1NiJ9.abc.def\n" + "Fine content stays." + ) + dm.record_tool_turn([ + {"role": "tool", "tool_call_id": "c1", "content": dirty}, + ]) + stored = dm._tool_turns[0][1][0]["content"] + assert "alice@example.com" not in stored + assert "[REDACTED_EMAIL]" in stored + assert "eyJhbGciOiJIUzI1NiJ9" not in stored + assert "Fine content stays." in stored + + def test_truncation_preserves_untrusted_fence_end_marker(self): + """When a tool result carrying an UNTRUSTED WEB EXTRACT fence is + truncated, the closing marker must be re-appended so the downstream + prompt-injection defence fence stays intact. + """ + dm = DialogueMemory() + dm.add_message("user", "q") + begin = "<<>>" + end = "<<>>" + payload = ( + "Search result:\n" + begin + "\n" + ("x" * 5000) + "\n" + end + ) + dm.record_tool_turn([ + {"role": "tool", "tool_call_id": "c1", "content": payload}, + ]) + out = dm.get_recent_turns_with_tools(per_entry_chars=500) + tool_msg = next(m for m in out if m.get("role") == "tool") + assert begin in tool_msg["content"] + assert end in tool_msg["content"], ( + "closing fence marker must survive truncation" + ) + + def test_get_pending_chunks_excludes_tool_rows(self): + """Tool messages must not pollute the diary summariser input.""" + dm = DialogueMemory() + dm.add_message("user", "q") + dm.record_tool_turn([ + {"role": "tool", "tool_call_id": "c1", + "content": "raw web extract with secrets"}, + ]) + dm.add_message("assistant", "a") + + chunks = dm.get_pending_chunks() + joined = " | ".join(chunks) + assert "raw web extract" not in joined + assert "User: q" in joined + assert "Assistant: a" in joined diff --git a/tests/test_engine_tool_carryover.py b/tests/test_engine_tool_carryover.py new file mode 100644 index 00000000..0a7343e4 --- /dev/null +++ b/tests/test_engine_tool_carryover.py @@ -0,0 +1,227 @@ +"""End-to-end: tool-call + tool-result messages from one reply must be +visible to the LLM on the next reply within the hot window, so the model +can synthesise from prior results rather than re-fetching. +""" + +from unittest.mock import Mock, patch + +import pytest + +from src.jarvis.memory.conversation import DialogueMemory +from src.jarvis.reply.engine import run_reply_engine + + +def _mock_cfg(): + cfg = Mock() + cfg.ollama_base_url = "http://localhost:11434" + cfg.ollama_chat_model = "test-large" # avoid SMALL-model text-tool path + cfg.voice_debug = False + cfg.llm_tools_timeout_sec = 8.0 + cfg.llm_embed_timeout_sec = 10.0 + cfg.llm_chat_timeout_sec = 45.0 + cfg.llm_digest_timeout_sec = 8.0 + cfg.memory_enrichment_max_results = 5 + cfg.memory_enrichment_source = "diary" + cfg.memory_digest_enabled = False + cfg.tool_result_digest_enabled = False + cfg.location_ip_address = None + cfg.location_auto_detect = False + cfg.location_enabled = False + cfg.agentic_max_turns = 8 + cfg.tool_search_max_calls = 3 + cfg.tool_selection_strategy = "all" + cfg.tool_carryover_max_turns = 2 + cfg.tool_carryover_per_entry_chars = 1200 + cfg.mcps = {} + cfg.llm_thinking_enabled = False + cfg.tts_engine = "none" + cfg.ollama_embed_model = "test-embed" + return cfg + + +@pytest.mark.unit +@patch("src.jarvis.reply.engine.plan_query", return_value=[]) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", return_value={}) +@patch("src.jarvis.reply.engine.run_tool_with_retries") +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_tool_carryover_makes_prior_result_visible_to_next_turn( + mock_chat, mock_extract, mock_tool, _mock_extract, _mock_plan +): + # Turn 1: model emits webSearch call, then final text. + mock_tool.return_value = Mock( + reply_text="Justin Bieber is a Canadian singer.", + error_message=None, + ) + mock_chat.side_effect = [ + # Turn 1a: tool call + {"message": {"content": "", "tool_calls": [{ + "id": "c1", "type": "function", + "function": {"name": "webSearch", + "arguments": {"query": "justin bieber"}}, + }]}}, + # Turn 1b: final reply + {"message": {"content": "He is a Canadian singer."}}, + # Turn 2a: final reply directly — reuse from prior context + {"message": {"content": "His breakout song was Baby."}}, + ] + mock_extract.side_effect = [ + "", + "He is a Canadian singer.", + "His breakout song was Baby.", + ] + + db = Mock() + cfg = _mock_cfg() + dm = DialogueMemory() + + run_reply_engine(db=db, cfg=cfg, tts=None, + text="who is justin bieber", + dialogue_memory=dm) + + # Confirm carryover was recorded + assert len(dm._tool_turns) == 1 + stored = dm._tool_turns[0][1] + stored_roles = [m.get("role") for m in stored] + assert "tool" in stored_roles + assert any(m.get("tool_calls") for m in stored) + + # Turn 2: query on the same topic — the turn-2 LLM call should receive + # the turn-1 tool messages in its `messages` argument. + run_reply_engine(db=db, cfg=cfg, tts=None, + text="what is his most famous song", + dialogue_memory=dm) + + # The third chat_with_messages call is turn-2's only turn (single text). + turn2_kwargs = mock_chat.call_args_list[-1].kwargs + turn2_messages = turn2_kwargs.get("messages") + roles_in_turn2 = [m.get("role") for m in turn2_messages] + assert "tool" in roles_in_turn2, ( + f"Expected prior tool-role message to be injected on turn 2; " + f"got roles={roles_in_turn2}" + ) + # The tool message content must be the prior webSearch result + tool_contents = [ + m.get("content") for m in turn2_messages if m.get("role") == "tool" + ] + assert any("Canadian singer" in (c or "") for c in tool_contents) + + +@pytest.mark.unit +@patch("src.jarvis.reply.engine.plan_query", return_value=[]) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", return_value={}) +@patch("src.jarvis.reply.engine.run_tool_with_retries") +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_stop_signal_clears_tool_carryover( + mock_chat, mock_extract, mock_tool, _mock_extract, _mock_plan +): + """Turn 1 runs a tool; turn 2 receives the stop signal. After turn 2, + carryover must be empty so the next wake-word turn starts fresh. + """ + from src.jarvis.tools.builtin.stop import STOP_SIGNAL + + mock_tool.side_effect = [ + Mock(reply_text="Justin Bieber is a Canadian singer.", error_message=None), + Mock(reply_text=STOP_SIGNAL, error_message=None), + ] + mock_chat.side_effect = [ + # Turn 1a: tool call + {"message": {"content": "", "tool_calls": [{ + "id": "c1", "type": "function", + "function": {"name": "webSearch", "arguments": {"query": "bieber"}}, + }]}}, + # Turn 1b: final reply + {"message": {"content": "He is a Canadian singer."}}, + # Turn 2: stop tool + {"message": {"content": "", "tool_calls": [{ + "id": "c2", "type": "function", + "function": {"name": "stop", "arguments": {}}, + }]}}, + ] + mock_extract.side_effect = ["", "He is a Canadian singer.", ""] + + db = Mock() + cfg = _mock_cfg() + dm = DialogueMemory() + + run_reply_engine(db=db, cfg=cfg, tts=None, + text="who is justin bieber", dialogue_memory=dm) + assert len(dm._tool_turns) == 1, "turn-1 tool carryover should be recorded" + + reply = run_reply_engine(db=db, cfg=cfg, tts=None, + text="stop", dialogue_memory=dm) + assert reply is None, "stop signal returns None" + assert dm._tool_turns == [], ( + "stop signal must clear carryover so the next wake-word turn is clean" + ) + + +@pytest.mark.unit +@patch("src.jarvis.reply.engine.plan_query", return_value=[]) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", return_value={}) +@patch("src.jarvis.reply.engine.run_tool_with_retries") +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_tool_carryover_text_tool_mode( + mock_chat, mock_extract, mock_tool, _mock_extract, _mock_plan +): + """Small-model path: tool results come back as role=user with a + ``tool_name`` tag. Carryover must pick those up too. + """ + cfg = _mock_cfg() + cfg.ollama_chat_model = "gemma4:e2b" # triggers SMALL/text-tool path + + mock_tool.return_value = Mock( + reply_text="Paris is the capital of France.", error_message=None, + ) + fence_call = ( + "```tool_call\n" + '{"name": "webSearch", "arguments": {"query": "paris"}}\n' + "```" + ) + mock_chat.side_effect = [ + # Turn 1a: text-tool call emitted inside a markdown fence + {"message": {"content": fence_call}}, + # Turn 1b: final reply + {"message": {"content": "Paris is in France."}}, + # Turn 2: follow-up reply + {"message": {"content": "Its population is about 2.1 million."}}, + ] + mock_extract.side_effect = [ + fence_call, + "Paris is in France.", + "Its population is about 2.1 million.", + ] + + db = Mock() + dm = DialogueMemory() + + run_reply_engine(db=db, cfg=cfg, tts=None, + text="what about paris", dialogue_memory=dm) + + assert len(dm._tool_turns) == 1 + stored = dm._tool_turns[0][1] + roles = [m.get("role") for m in stored] + # Text-tool fallback stores tool results as role=user with tool_name. + assert "user" in roles + assert any(m.get("tool_name") == "webSearch" for m in stored) + + run_reply_engine(db=db, cfg=cfg, tts=None, + text="tell me more", dialogue_memory=dm) + + turn2_messages = mock_chat.call_args_list[-1].kwargs.get("messages") or [] + # The prior tool payload should appear in the turn-2 messages list — + # either as role=tool (native) or role=user with tool_name (text-tool). + tool_like = [ + m for m in turn2_messages + if m.get("role") == "tool" + or (m.get("role") == "user" and m.get("tool_name")) + ] + assert tool_like, ( + f"expected prior text-tool result to be carried over; got roles=" + f"{[m.get('role') for m in turn2_messages]}" + ) + assert any( + "Paris" in (m.get("content") or "") for m in tool_like + ) diff --git a/tests/test_recall_gate.py b/tests/test_recall_gate.py new file mode 100644 index 00000000..76241512 --- /dev/null +++ b/tests/test_recall_gate.py @@ -0,0 +1,75 @@ +"""Tests for recall_gate.should_recall — cheap heuristic for skipping +long-term memory enrichment when the hot-window already covers the topic. +""" + +import pytest + +from src.jarvis.memory.recall_gate import should_recall + + +@pytest.mark.unit +class TestShouldRecall: + def test_empty_hot_window_always_recalls(self): + assert should_recall("who is justin bieber", []) is True + + def test_no_tool_result_in_history_always_recalls(self): + recent = [ + {"role": "user", "content": "who is justin bieber"}, + {"role": "assistant", "content": "He is a Canadian singer."}, + ] + # No tool row → no fresh grounded data → recall (diary may know more) + assert should_recall("what is his most famous song", recent) is True + + def test_tool_covered_topic_skips_recall(self): + recent = [ + {"role": "user", "content": "who is justin bieber"}, + {"role": "assistant", "content": "", "tool_calls": [ + {"id": "c1", "type": "function", + "function": {"name": "webSearch", + "arguments": {"query": "justin bieber"}}} + ]}, + {"role": "tool", "tool_call_id": "c1", + "content": "Justin Bieber is a Canadian singer with hits like Baby."}, + {"role": "assistant", "content": "Canadian singer."}, + ] + # Follow-up on same entity, fresh tool row present → skip + assert should_recall("what is his most famous song bieber hits", + recent) is False + + def test_topic_change_still_recalls(self): + recent = [ + {"role": "user", "content": "who is justin bieber"}, + {"role": "tool", "tool_call_id": "c1", + "content": "Justin Bieber is a Canadian singer."}, + {"role": "assistant", "content": "Canadian singer."}, + ] + # Completely different topic → no overlap → recall runs + assert should_recall("what's the weather in hackney", recent) is True + + def test_non_latin_script_query_matches_hot_window(self): + """Per CLAUDE.md the gate must be language-agnostic. A Cyrillic query + covered by a Cyrillic tool result should skip recall just like English. + """ + recent = [ + {"role": "user", "content": "кто такой пушкин"}, + {"role": "tool", "tool_call_id": "c1", + "content": "Пушкин это русский поэт девятнадцатого века."}, + {"role": "assistant", "content": "Русский поэт."}, + ] + assert should_recall("пушкин русский поэт стихи", recent) is False + + def test_non_latin_topic_change_still_recalls(self): + recent = [ + {"role": "user", "content": "кто такой пушкин"}, + {"role": "tool", "tool_call_id": "c1", + "content": "Пушкин это русский поэт."}, + ] + # Different topic in the same script → no overlap → recall + assert should_recall("какая сегодня погода", recent) is True + + def test_stopword_only_query_does_not_skip(self): + recent = [ + {"role": "tool", "tool_call_id": "c1", "content": "foo bar"}, + ] + # "what is it" has no content words → cannot justify skipping + assert should_recall("what is it", recent) is True From 0bea96b0a3a528af93da818b4099980d055380b4 Mon Sep 17 00:00:00 2001 From: Baris Sencan Date: Sun, 26 Apr 2026 18:12:47 +0100 Subject: [PATCH 2/5] perf(reply): hot-window scratch cache + bound carryover + scrub keys Layer three optimisations on top of the carryover + recall gate so the warm-profile, extractor, and router paths each pay at most once per hot window, and tighten privacy + bounds along the way. - DialogueMemory.hot_cache_get/put/clear: per-conversation scratch cache bounded by RECENT_WINDOW_SEC, used by the engine to memoise: - warm profile block (query-agnostic SQLite traversal) - memory enrichment extractor LLM call - tool router LLM call (catalogue signature in key invalidates on mid-window MCP refresh) Cleared on the stop signal alongside tool carryover; auto-expires with the hot window. - Recall gate is now bypassed when the planner explicitly emits a searchMemory step. Planner intent always wins over coverage heuristics (C1 fix, was silently dropping memory work the planner asked for). - record_tool_turn scrubs outside the lock and bounds storage to _tool_turns_max_storage = 16 as a backstop against unbounded growth in pathological sessions. - Extended redact() vendor and keyword patterns: AWS AKIA/ASIA, Stripe sk/pk/rk_(live|test)_, GitHub PATs, OpenAI sk-, Google AIza, Authorization Bearer/Basic, plus refresh/access/id/oauth tokens and session ids. Closes the gap where carryover payloads or recall-gate debug logs could leak structurally-detectable credentials. - Adds recall_gate.spec.md (scope, heuristic, language-agnostic design, privacy, fail-open) and registers it in CLAUDE.md. - Tests: tests/test_dialogue_memory_hot_cache.py (cache primitives, is_tool_message, _tool_turns cap), tests/test_redact_extended.py (vendor + Authorization + keyword patterns), tests/test_engine_hot_window_caches.py (warm profile / extractor / router cache hits across two turns; planner overrides recall gate). - Updates docs/llm_contexts.md and reply.spec.md to document the cache primitive, planner precedence over the gate, and new redact coverage. Co-Authored-By: Claude Opus 4.7 --- CLAUDE.md | 1 + docs/llm_contexts.md | 5 +- src/jarvis/memory/conversation.py | 98 +++++++++-- src/jarvis/memory/recall_gate.py | 12 +- src/jarvis/memory/recall_gate.spec.md | 48 ++++++ src/jarvis/reply/engine.py | 146 ++++++++++++----- src/jarvis/reply/reply.spec.md | 7 +- src/jarvis/utils/redact.py | 24 ++- tests/test_dialogue_memory_hot_cache.py | 93 +++++++++++ tests/test_engine_hot_window_caches.py | 208 ++++++++++++++++++++++++ tests/test_redact_extended.py | 86 ++++++++++ 11 files changed, 659 insertions(+), 69 deletions(-) create mode 100644 src/jarvis/memory/recall_gate.spec.md create mode 100644 tests/test_dialogue_memory_hot_cache.py create mode 100644 tests/test_engine_hot_window_caches.py create mode 100644 tests/test_redact_extended.py diff --git a/CLAUDE.md b/CLAUDE.md index 874cd3e2..0419c55a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -24,6 +24,7 @@ Any code change must either adhere to our spec files perfectly or you should ask | `src/jarvis/utils/location.spec.md` | GeoIP location detection | Privacy-first; local GeoLite2 DB only | | `src/jarvis/memory/graph.spec.md` | Node graph memory (v2), self-organising tree, UI explorer | Dynamic structure; access-aware; auto-split/merge (future) | | `src/jarvis/memory/summariser.spec.md` | Diary summariser prompt contract and hygiene rules (deflection, attribution, topic separation) | Summariser is the source; corrupted summaries poison every downstream consumer | +| `src/jarvis/memory/recall_gate.spec.md` | Deterministic skip-enrichment heuristic when the hot window covers a follow-up | Fail-open; language-agnostic via `\w{3,}` + `re.UNICODE`; planner intent always wins | The LLM contexts graph at `docs/llm_contexts.md` maps every LLM call in the app (model, gating, inputs, outputs, limits, flow). Keep it up-to-date at all times: any change that adds, removes, or alters an LLM context (model resolution, timeout, cap, prompt source, gating flag, data-flow edge) must update `docs/llm_contexts.md` in the same PR. diff --git a/docs/llm_contexts.md b/docs/llm_contexts.md index 10a41afb..fa71ef30 100644 --- a/docs/llm_contexts.md +++ b/docs/llm_contexts.md @@ -13,7 +13,7 @@ Every distinct LLM call in Jarvis, what feeds it, what consumes it, and how it i - Redacted user query - Recent dialogue (last 5 minutes), including in-loop tool-call + tool-role messages from prior replies within the hot window (tool carryover — `DialogueMemory.record_tool_turn` / `get_recent_turns_with_tools` in [src/jarvis/memory/conversation.py](src/jarvis/memory/conversation.py); capped by `cfg.tool_carryover_max_turns` / `tool_carryover_per_entry_chars`; cleared on `stop` signal; UNTRUSTED WEB EXTRACT fence markers preserved on truncation) - Unified system prompt from [src/jarvis/system_prompt.py](src/jarvis/system_prompt.py) + ASR note + tool-protocol guidance - - **Warm profile block** (query-agnostic User + Directives excerpt from the knowledge graph, composed by `build_warm_profile()` / `format_warm_profile_block()` in [src/jarvis/memory/graph_ops.py](src/jarvis/memory/graph_ops.py) at Step 3.5 of `reply()`; no LLM call, pure SQLite read; injected unconditionally so personalisation is the default) + - **Warm profile block** (query-agnostic User + Directives excerpt from the knowledge graph, composed by `build_warm_profile()` / `format_warm_profile_block()` in [src/jarvis/memory/graph_ops.py](src/jarvis/memory/graph_ops.py) at Step 3.5 of `reply()`; no LLM call, pure SQLite read; injected unconditionally so personalisation is the default; result cached in `DialogueMemory._hot_cache` under key `warm_profile_block` so follow-up turns within one hot window skip the BFS — cleared on `stop` and hot-window expiry) - Digested memory enrichment (optional, see #4) - Time + location context (re-injected each turn) - Tool schema: native via `generate_tools_json_schema()` ([src/jarvis/tools/registry.py](src/jarvis/tools/registry.py)) or text fallback via `_text_tool_call_guidance()` ([engine.py:68](src/jarvis/reply/engine.py:68)) @@ -44,6 +44,7 @@ Every distinct LLM call in Jarvis, what feeds it, what consumes it, and how it i - **System prompt**: inline at [enrichment.py:35-63](src/jarvis/reply/enrichment.py:35). - **Output**: `{keywords, from?, to?, questions?}`. Consumed by memory search in the reply engine. - **Limits**: up to 2 retries; timeout from `llm_tools_timeout_sec`. +- **Caching**: result cached in `DialogueMemory._hot_cache` under key `enrichment:{redacted_query[+topic_hint]}`. Identical follow-ups within one hot window reuse the dict and skip the LLM hop. Cleared on `clear_hot_cache()` (e.g. `stop` signal) and on hot-window expiry. ## 3b. Recall Gate (pre-enrichment short-circuit) @@ -52,6 +53,7 @@ Every distinct LLM call in Jarvis, what feeds it, what consumes it, and how it i - **Model / gating**: NO LLM — deterministic keyword-coverage heuristic. Cheap. - **Inputs**: query, recent dialogue (incl. tool carryover rows). - **Output**: `False` only if hot-window contains a fresh tool result AND ≥50% of the query's content words appear in the hot-window transcript → skips diary, graph, and memory digest for this reply. Else `True`. Fail-open on any exception. Content-word extraction uses `\w{3,}` with `re.UNICODE`, so the gate works for Latin, Cyrillic, CJK, Arabic, Hebrew, etc. (per CLAUDE.md "no hardcoded language patterns"). Overlap words are run through `redact()` before being written to debug logs. +- **Planner precedence**: when the planner explicitly emitted a `searchMemory` step, the gate is bypassed — the planner has more signal than coverage and overriding it would silently drop intent. The gate only short-circuits the fail-open empty-plan path. - **Rationale**: prevents re-running diary/graph lookups when the hot window already grounds the follow-up (e.g. "his most famous song" after a Bieber webSearch). ## 4. Memory Digest (optional, SMALL models) @@ -93,6 +95,7 @@ Every distinct LLM call in Jarvis, what feeds it, what consumes it, and how it i - **System prompt**: inline (~lines 260-315). Teaches pick up-to-5 tools or `none`. - **Output**: comma-separated tool names or `none`. Capped at `_LLM_MAX_SELECTED` (5). Always-included tools (`stop`, `toolSearchTool`) are unioned in regardless. - **Limits**: `llm_timeout_sec`. On failure → all tools. +- **Caching**: `routed_tools` cached in `DialogueMemory._hot_cache` under key `router:{redacted_query}|{strategy}|{builtin-names}|{mcp-names}`. The catalogue signature lets a mid-window MCP refresh invalidate the cache; `context_hint` is intentionally excluded so time/location drift inside one hot window doesn't bust it. Cleared on `stop` signal and hot-window expiry. ## 8. Tool Searcher (mid-loop escape hatch) diff --git a/src/jarvis/memory/conversation.py b/src/jarvis/memory/conversation.py index 473479d4..c97eae9a 100644 --- a/src/jarvis/memory/conversation.py +++ b/src/jarvis/memory/conversation.py @@ -15,6 +15,27 @@ _UNTRUSTED_FENCE_END = "<<>>" +def is_tool_message(msg: dict) -> bool: + """True if a message is a tool-call request or a tool-result. + + Covers both protocols Jarvis speaks: + - Native: ``role="tool"`` for results, or ``role="assistant"`` carrying + a non-empty ``tool_calls`` list for the outbound call. + - Text-tool fallback (small models): the tool result is appended as a + ``role="user"`` message tagged with ``tool_name``. + """ + if not isinstance(msg, dict): + return False + role = msg.get("role") + if role == "tool": + return True + if role == "assistant" and msg.get("tool_calls"): + return True + if role == "user" and msg.get("tool_name"): + return True + return False + + def _filter_contexts_by_time( contexts: List[str], from_time: Optional[str], @@ -96,6 +117,16 @@ def __init__(self, inactivity_timeout: float = 300.0, max_interactions: int = 20 # tool-related messages. Excluded from `get_pending_chunks` so raw tool # payloads never reach the diary summariser. self._tool_turns: List[Tuple[float, List[dict]]] = [] + # Hot-window scratch cache: per-key (timestamp, value) entries that + # auto-expire with the conversation. Lets the reply engine memoise + # idempotent per-turn work (warm profile, memory enrichment params, + # tool router output) within a single hot window without leaking + # state across conversations. + self._hot_cache: dict[str, Tuple[float, object]] = {} + # Hard ceiling on stored tool turns to bound memory in pathological + # sessions. The read path further filters by max_tool_turns and + # window expiry; this cap is a backstop against unbounded growth. + self._tool_turns_max_storage = 16 self._last_activity_time: float = time.time() self._inactivity_timeout = inactivity_timeout # Unified window: context retention = forced diary update interval @@ -147,25 +178,70 @@ def record_tool_turn(self, tool_msgs: List[dict]) -> None: """ if not tool_msgs: return + # Scrub outside the lock — pure function over message content. + scrubbed: List[dict] = [] + for m in tool_msgs: + mm = dict(m) + c = mm.get("content") + if isinstance(c, str) and c: + # Tool outputs may contain PII or secrets (email bodies, + # API responses, scraped pages). Scrub before persisting + # so re-injection on the next turn can't leak them. + mm["content"] = scrub_secrets(c) + scrubbed.append(mm) + ts = time.time() with self._lock: - ts = time.time() - scrubbed: List[dict] = [] - for m in tool_msgs: - mm = dict(m) - c = mm.get("content") - if isinstance(c, str) and c: - # Tool outputs may contain PII or secrets (email bodies, - # API responses, scraped pages). Scrub before persisting - # so re-injection on the next turn can't leak them. - mm["content"] = scrub_secrets(c) - scrubbed.append(mm) self._tool_turns.append((ts, scrubbed)) + # Bound storage: drop entries older than the window, then cap + # to the most recent `_tool_turns_max_storage`. Backstop against + # unbounded growth in long sessions. + cutoff = ts - self.RECENT_WINDOW_SEC + self._tool_turns = [ + (t, m) for t, m in self._tool_turns if t >= cutoff + ] + if len(self._tool_turns) > self._tool_turns_max_storage: + self._tool_turns = self._tool_turns[-self._tool_turns_max_storage:] def clear_tool_carryover(self) -> None: """Drop all stored tool-turn messages. Text messages are untouched.""" with self._lock: self._tool_turns = [] + # ------------------------------------------------------------------ + # Hot-window cache + # ------------------------------------------------------------------ + # Small primitive used by the reply engine to memoise expensive + # per-turn work that's idempotent within a single conversation: warm + # profile (SQLite reads), memory enrichment extractor (LLM call), + # tool router (LLM call). Entries auto-expire with RECENT_WINDOW_SEC + # and are wiped on `clear_hot_cache()` (e.g. on the stop signal). + # + # Callers pick a key that captures the invalidation contract — + # typically the redacted query for query-dependent values, or a + # constant for query-agnostic values. + + def hot_cache_get(self, key: str) -> Optional[object]: + """Return cached value if present and within the hot window.""" + with self._lock: + entry = self._hot_cache.get(key) + if not entry: + return None + ts, value = entry + if ts < time.time() - self.RECENT_WINDOW_SEC: + self._hot_cache.pop(key, None) + return None + return value + + def hot_cache_put(self, key: str, value: object) -> None: + """Store value under key with current timestamp.""" + with self._lock: + self._hot_cache[key] = (time.time(), value) + + def clear_hot_cache(self) -> None: + """Drop all hot-window cache entries.""" + with self._lock: + self._hot_cache = {} + def get_recent_turns_with_tools( self, max_tool_turns: int = 2, diff --git a/src/jarvis/memory/recall_gate.py b/src/jarvis/memory/recall_gate.py index 6aaa6a60..8413f134 100644 --- a/src/jarvis/memory/recall_gate.py +++ b/src/jarvis/memory/recall_gate.py @@ -41,16 +41,8 @@ def _content_words(text: str) -> set[str]: def _has_fresh_tool_result(recent_messages: List[dict]) -> bool: - for m in recent_messages: - role = m.get("role") - if role == "tool": - return True - if role == "assistant" and m.get("tool_calls"): - return True - # Text-tool fallback format: role=user carrying a tool_name tag. - if role == "user" and m.get("tool_name"): - return True - return False + from .conversation import is_tool_message + return any(is_tool_message(m) for m in recent_messages) def should_recall( diff --git a/src/jarvis/memory/recall_gate.spec.md b/src/jarvis/memory/recall_gate.spec.md new file mode 100644 index 00000000..b0cfe1d4 --- /dev/null +++ b/src/jarvis/memory/recall_gate.spec.md @@ -0,0 +1,48 @@ +# Recall Gate + +A deterministic, no-LLM heuristic that lets the reply engine skip diary, graph and memory-digest enrichment when the hot window already grounds the user's follow-up. + +The gate is a cheap pre-flight check, not a routing decision. It either tells the engine "keep going as planned" (recall) or "the hot window has this covered, you can short-circuit enrichment" (skip). + +## Scope + +- File: `src/jarvis/memory/recall_gate.py`. +- Caller: `run_reply_engine` in `src/jarvis/reply/engine.py`, between the planner's `needs_memory` decision and the diary/graph search. +- Inputs: the redacted user query, the recent dialogue messages (already including tool-carryover rows from prior replies in the hot window). +- Output: `True` to recall, `False` to skip. + +## When the gate runs + +The gate runs only when: + +1. The planner did **not** explicitly emit a `searchMemory` step. An explicit planner intent always wins; the gate does not second-guess it. +2. There is at least one recent message in the hot window. + +When the planner returned an empty plan (fail-open), the gate is allowed to short-circuit. When the planner returned a concrete plan that doesn't include `searchMemory`, the engine is already skipping enrichment, so the gate is a no-op. + +## Heuristic + +The gate returns `False` (skip enrichment) only if both hold: + +1. The hot window contains at least one tool-related message — i.e. an entry for which `is_tool_message()` returns true. This is the freshness signal: a tool was already invoked in this conversation, so grounded data is sitting in the messages array. +2. The query's content words have ≥ 50% overlap with the words in the hot-window transcript. Coverage is asymmetric (`|overlap| / |query_words|`), not Jaccard — long histories shouldn't penalise a short follow-up. + +Anything else returns `True`. On any exception the gate fails open with `True`. + +## Language-agnostic by construction + +Per the project's no-hardcoded-language-patterns rule, content-word extraction uses `re.findall(r"\w{3,}", text, flags=re.UNICODE)`. The unicode flag makes `\w` match Cyrillic, CJK, Arabic, Hebrew, etc. + +A small English stopword list (`is`, `the`, `what`, etc.) filters function words before scoring. Non-English queries simply skip stopword filtering — the worst case is a slightly more conservative (i.e. more recall-prone) decision, which is the safe direction for a fail-open gate. Adding language-specific stopword lists is out of scope; the heuristic is intentionally conservative and the cost of recalling unnecessarily is one extractor LLM call, not user-visible failure. + +## Privacy + +The overlap words can include user-supplied query terms. Before they reach `debug_log`, they are passed through `redact()` so emails, JWTs, and other structurally-detectable secrets in the query don't leak into logs. The gate does not store anything itself. + +## Why not have the planner do this? + +The planner is an LLM call and runs once per turn regardless. Adding "is the hot window enough?" to its prompt would make every planner call slower and more brittle. The gate is a 1 ms pure-Python pass that only fires after the planner has decided memory might be useful, so it's strictly additive and trivially removable. + +## Failure mode + +`should_recall()` returns `True` on every exception path. The gate cannot make a turn worse by failing — at most it stops being an optimisation. diff --git a/src/jarvis/reply/engine.py b/src/jarvis/reply/engine.py index d089dbe9..35da6432 100644 --- a/src/jarvis/reply/engine.py +++ b/src/jarvis/reply/engine.py @@ -776,18 +776,40 @@ def run_reply_engine(db: "Database", cfg, tts: Optional[Any], strategy = ToolSelectionStrategy(getattr(cfg, "tool_selection_strategy", "llm")) except ValueError: strategy = ToolSelectionStrategy.LLM - routed_tools = select_tools( - query=redacted, - builtin_tools=BUILTIN_TOOLS, - mcp_tools=mcp_tools, - strategy=strategy, - llm_base_url=cfg.ollama_base_url, - llm_model=resolve_tool_router_model(cfg), - llm_timeout_sec=float(getattr(cfg, "llm_tools_timeout_sec", 8.0)), - embed_model=getattr(cfg, "ollama_embed_model", "nomic-embed-text"), - embed_timeout_sec=float(getattr(cfg, "llm_embed_timeout_sec", 10.0)), - context_hint=context_hint, + # Hot-window cache: router output for the same redacted query and + # tool catalogue is reused within one conversation. Catalogue + # signature includes builtin + MCP tool names so a mid-window MCP + # refresh invalidates the cache. context_hint is intentionally not + # part of the key — time/location drift inside one hot window + # rarely changes the tool pick. + _router_cache_key = ( + f"router:{redacted}|" + f"{strategy.value}|" + f"{','.join(sorted(BUILTIN_TOOLS.keys()))}|" + f"{','.join(sorted((mcp_tools or {}).keys()))}" ) + _cached_routed = ( + dialogue_memory.hot_cache_get(_router_cache_key) + if dialogue_memory and hasattr(dialogue_memory, "hot_cache_get") else None + ) + if isinstance(_cached_routed, list): + routed_tools = list(_cached_routed) + debug_log("tool router served from hot-window cache", "planning") + else: + routed_tools = select_tools( + query=redacted, + builtin_tools=BUILTIN_TOOLS, + mcp_tools=mcp_tools, + strategy=strategy, + llm_base_url=cfg.ollama_base_url, + llm_model=resolve_tool_router_model(cfg), + llm_timeout_sec=float(getattr(cfg, "llm_tools_timeout_sec", 8.0)), + embed_model=getattr(cfg, "ollama_embed_model", "nomic-embed-text"), + embed_timeout_sec=float(getattr(cfg, "llm_embed_timeout_sec", 10.0)), + context_hint=context_hint, + ) + if dialogue_memory and hasattr(dialogue_memory, "hot_cache_put"): + dialogue_memory.hot_cache_put(_router_cache_key, list(routed_tools or [])) _planner_schema = generate_tools_json_schema(routed_tools, mcp_tools) _planner_tool_catalog: list[tuple[str, str]] = [] for _schema in (_planner_schema or []): @@ -825,12 +847,18 @@ def run_reply_engine(db: "Database", cfg, tts: Optional[Any], # - Plan with `searchMemory` directive → run memory enrichment. # - Plan without it → skip memory work entirely (no keyword LLM, # no diary search, no graph search, no digest LLM). - needs_memory = (not action_plan) or plan_requires_memory(action_plan) + plan_demands_memory = bool(action_plan) and plan_requires_memory(action_plan) + needs_memory = (not action_plan) or plan_demands_memory # Recall gate: if the hot-window already carries a fresh tool result # covering the query topic, skip diary/graph enrichment for this turn. # Cheap deterministic heuristic, no LLM. Fail-open on any error. - if needs_memory and recent_messages: + # + # Skip the gate when the planner explicitly emitted `searchMemory` — + # the planner has more signal than coverage heuristics, and overriding + # it would silently drop intent. The gate only short-circuits the + # fail-open empty-plan path. + if needs_memory and not plan_demands_memory and recent_messages: try: from ..memory.recall_gate import should_recall if not should_recall(redacted, recent_messages): @@ -864,26 +892,40 @@ def run_reply_engine(db: "Database", cfg, tts: Optional[Any], # interest me" can be answered directly when the model already sees # the user's interests in its system prompt. warm_profile_block = "" - try: - from ..memory.graph import GraphMemoryStore - from ..memory.graph_ops import build_warm_profile, format_warm_profile_block - _graph_store_warm = GraphMemoryStore(cfg.db_path) - _warm_profile = build_warm_profile(_graph_store_warm) - warm_profile_block = format_warm_profile_block(_warm_profile) - if warm_profile_block: - _user_len = len(_warm_profile.get("user", "")) - _dir_len = len(_warm_profile.get("directives", "")) - print( - f" 🪴 Warm profile: {_user_len} user chars, " - f"{_dir_len} directive chars", - flush=True, - ) - debug_log( - f"warm profile loaded: user={_user_len} directives={_dir_len}", - "memory", - ) - except Exception as e: - debug_log(f"warm profile load failed (non-fatal): {e}", "memory") + # Hot-window cache: warm profile is query-agnostic and the User / + # Directives branches change rarely, so reusing the block within a + # single conversation saves the SQLite BFS on every follow-up turn. + # Cache is wiped on stop signal and on hot-window expiry. + _wp_cached = ( + dialogue_memory.hot_cache_get("warm_profile_block") + if dialogue_memory and hasattr(dialogue_memory, "hot_cache_get") else None + ) + if isinstance(_wp_cached, str): + warm_profile_block = _wp_cached + debug_log("warm profile served from hot-window cache", "memory") + else: + try: + from ..memory.graph import GraphMemoryStore + from ..memory.graph_ops import build_warm_profile, format_warm_profile_block + _graph_store_warm = GraphMemoryStore(cfg.db_path) + _warm_profile = build_warm_profile(_graph_store_warm) + warm_profile_block = format_warm_profile_block(_warm_profile) + if warm_profile_block: + _user_len = len(_warm_profile.get("user", "")) + _dir_len = len(_warm_profile.get("directives", "")) + print( + f" 🪴 Warm profile: {_user_len} user chars, " + f"{_dir_len} directive chars", + flush=True, + ) + debug_log( + f"warm profile loaded: user={_user_len} directives={_dir_len}", + "memory", + ) + if dialogue_memory and hasattr(dialogue_memory, "hot_cache_put"): + dialogue_memory.hot_cache_put("warm_profile_block", warm_profile_block) + except Exception as e: + debug_log(f"warm profile load failed (non-fatal): {e}", "memory") # Step 4: Memory enrichment — controlled by cfg.memory_enrichment_source # "all" = diary + graph, "diary" = diary only, "graph" = graph only @@ -916,12 +958,27 @@ def run_reply_engine(db: "Database", cfg, tts: Optional[Any], # keyword selection tracks what the planner actually # wanted to look up, not just the surface utterance. _extractor_query = f"{redacted}\n[Memory topic: {_memory_topic_hint}]" - search_params = extract_search_params_for_memory( - _extractor_query, cfg.ollama_base_url, resolve_tool_router_model(cfg), - timeout_sec=float(getattr(cfg, 'llm_tools_timeout_sec', 8.0)), - thinking=getattr(cfg, 'llm_thinking_enabled', False), - context_hint=context_hint, + # Hot-window cache: extractor output is a pure function of + # the (query, topic-hint) pair, so identical follow-ups within + # one conversation reuse the keywords/questions/from/to dict + # and skip the LLM call entirely. + _extractor_cache_key = f"enrichment:{_extractor_query}" + _cached_params = ( + dialogue_memory.hot_cache_get(_extractor_cache_key) + if dialogue_memory and hasattr(dialogue_memory, "hot_cache_get") else None ) + if isinstance(_cached_params, dict): + search_params = _cached_params + debug_log("memory extractor served from hot-window cache", "memory") + else: + search_params = extract_search_params_for_memory( + _extractor_query, cfg.ollama_base_url, resolve_tool_router_model(cfg), + timeout_sec=float(getattr(cfg, 'llm_tools_timeout_sec', 8.0)), + thinking=getattr(cfg, 'llm_thinking_enabled', False), + context_hint=context_hint, + ) + if dialogue_memory and hasattr(dialogue_memory, "hot_cache_put"): + dialogue_memory.hot_cache_put(_extractor_cache_key, search_params) keywords = search_params.get('keywords', []) questions = search_params.get('questions', []) if keywords: @@ -1304,13 +1361,9 @@ def _maybe_record_tool_carryover() -> None: if not dialogue_memory or not hasattr(dialogue_memory, "record_tool_turn"): return try: + from ..memory.conversation import is_tool_message tool_msgs = [ - m for m in messages[user_msg_index + 1:] - if ( - m.get("role") == "tool" - or (m.get("role") == "assistant" and m.get("tool_calls")) - or (m.get("role") == "user" and m.get("tool_name")) - ) + m for m in messages[user_msg_index + 1:] if is_tool_message(m) ] if tool_msgs: dialogue_memory.record_tool_turn(tool_msgs) @@ -1887,6 +1940,11 @@ def _extract_text_from_json_response(content: str) -> Optional[str]: dialogue_memory.clear_tool_carryover() except Exception: pass + if dialogue_memory and hasattr(dialogue_memory, "clear_hot_cache"): + try: + dialogue_memory.clear_hot_cache() + except Exception: + pass # Return None to signal no response should be generated # Don't add to dialogue memory - this is a dismissal, not a conversation diff --git a/src/jarvis/reply/reply.spec.md b/src/jarvis/reply/reply.spec.md index 86ef6930..e6e0d5d4 100644 --- a/src/jarvis/reply/reply.spec.md +++ b/src/jarvis/reply/reply.spec.md @@ -31,7 +31,12 @@ Design principles enforced by the engine: 2. Recent Dialogue Context - Include short-term dialogue memory (last 5 minutes) as prior messages. - The fetch returns not only user/assistant prose but also **tool-call and tool-result messages** from in-loop work in prior replies within the hot window (capped by `cfg.tool_carryover_max_turns` and `cfg.tool_carryover_per_entry_chars`, fence markers of UNTRUSTED WEB EXTRACT blocks preserved on truncation, secrets scrubbed). This lets follow-up turns reuse a prior `webSearch` / MCP result instead of re-fetching it. The carryover is captured at the end of each reply (success or error), and is cleared when the user dismisses with the `stop` tool so the next wake-word turn starts clean. - - A **recall gate** (`src/jarvis/memory/recall_gate.py`, deterministic, no LLM) skips diary / graph / memory-digest enrichment when the hot window already covers the topic (≥50% content-word overlap with a fresh tool-result row). Language-agnostic via `\w{3,}` with `re.UNICODE`. Fail-open on any error. + - A **recall gate** (`src/jarvis/memory/recall_gate.py`, deterministic, no LLM) skips diary / graph / memory-digest enrichment when the hot window already covers the topic (≥50% content-word overlap with a fresh tool-result row). Language-agnostic via `\w{3,}` with `re.UNICODE`. Fail-open on any error. The gate is bypassed when the planner explicitly emitted a `searchMemory` step — planner intent always wins over coverage heuristics. See `src/jarvis/memory/recall_gate.spec.md`. + - **Hot-window scratch cache** (`DialogueMemory.hot_cache_get` / `hot_cache_put`): per-conversation, time-bounded (`RECENT_WINDOW_SEC`) cache used by the engine to memoise three idempotent per-turn computations within a single hot window: + - **Warm profile** (`warm_profile_block` key, query-agnostic): skips the SQLite traversal of the User + Directives branches on every follow-up turn. + - **Memory enrichment extractor** (`enrichment:{redacted_query[+topic_hint]}` key): skips the small-model LLM call that derives keywords / questions / time bounds when an identical query repeats. + - **Tool router** (`router:{redacted_query}|{strategy}|{builtin-names}|{mcp-names}` key): skips the router LLM call when the query and tool catalogue match. The catalogue signature lets a mid-window MCP refresh invalidate the cache. + - Cleared on the `stop` signal alongside tool carryover, and entries auto-expire with the hot window. 3. Pre-flight Planner - The task-list planner (`plan_query` in `src/jarvis/reply/planner.py`) runs **first**, before any memory lookup or tool routing. It sees the query, a compact dialogue snippet, and the full builtin + MCP tool catalogue (names + one-line descriptions). diff --git a/src/jarvis/utils/redact.py b/src/jarvis/utils/redact.py index 1a23f589..15d7d088 100644 --- a/src/jarvis/utils/redact.py +++ b/src/jarvis/utils/redact.py @@ -1,13 +1,33 @@ from __future__ import annotations import re -# Deterministic structural scrub patterns +# Deterministic structural scrub patterns. Order matters: specific +# vendor-shaped tokens are matched before generic catches so the more +# informative label wins (e.g. "[REDACTED_AWS_KEY]" beats "[REDACTED_HEX]"). _REDACTION_RULES: list[tuple[re.Pattern[str], str]] = [ (re.compile(r"[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}", re.IGNORECASE), "[REDACTED_EMAIL]"), (re.compile(r"\b(?:\d[ -]*?){13,19}\b"), "[REDACTED_CARD]"), + # Vendor-specific access keys (bare, no surrounding keyword required). + (re.compile(r"\b(?:AKIA|ASIA)[0-9A-Z]{16}\b"), "[REDACTED_AWS_KEY]"), + (re.compile(r"\b(?:sk|pk|rk)_(?:live|test)_[A-Za-z0-9]{16,}\b"), "[REDACTED_STRIPE_KEY]"), + (re.compile(r"\bgh[pousr]_[A-Za-z0-9]{36,}\b"), "[REDACTED_GH_TOKEN]"), + (re.compile(r"\bsk-[A-Za-z0-9]{32,}\b"), "[REDACTED_OPENAI_KEY]"), + (re.compile(r"\bAIza[0-9A-Za-z_\-]{35}\b"), "[REDACTED_GOOG_KEY]"), + # Authorisation headers — Bearer/Basic carry credentials in line. + (re.compile(r"Authorization:\s*Bearer\s+\S+", re.IGNORECASE), "Authorization: Bearer [REDACTED]"), + (re.compile(r"Authorization:\s*Basic\s+[A-Za-z0-9+/=]+", re.IGNORECASE), "Authorization: Basic [REDACTED]"), + # Generic prefix catch — left after the vendor-specific rules so + # newer formats like gh[pousr]_ get a precise label first. (re.compile(r"\b(AWS|GH|GCP|AZURE|xox[abpcr]-)[A-Za-z0-9_\-]{10,}\b", re.IGNORECASE), "[REDACTED_TOKEN]"), (re.compile(r"\b(?:eyJ[0-9A-Za-z._\-]+)\b"), "[REDACTED_JWT]"), - (re.compile(r"\b(pass(word)?|secret|token|apikey|api_key)\s*[:=]\s*\S+\b", re.IGNORECASE), r"\1=[REDACTED]"), + # Keyword-anchored credentials. Covers refresh/access/oauth/session + # variants in addition to the original pass/secret/token/apikey set. + (re.compile( + r"\b(pass(?:word)?|secret|token|apikey|api_key|" + r"(?:refresh|access|id|oauth)_?token|session(?:_?id)?|sid)" + r"\s*[:=]\s*\S+\b", + re.IGNORECASE, + ), r"\1=[REDACTED]"), (re.compile(r"\b[0-9A-Fa-f]{32,}\b"), "[REDACTED_HEX]"), (re.compile(r"\b\d{6}\b(?=.*(otp|2fa|code))", re.IGNORECASE), "[REDACTED_OTP]"), ] diff --git a/tests/test_dialogue_memory_hot_cache.py b/tests/test_dialogue_memory_hot_cache.py new file mode 100644 index 00000000..82b83ee5 --- /dev/null +++ b/tests/test_dialogue_memory_hot_cache.py @@ -0,0 +1,93 @@ +"""Tests for the DialogueMemory hot-window scratch cache and the +``is_tool_message`` helper. + +The cache is a thin per-conversation primitive used by the reply engine to +memoise idempotent per-turn work (warm profile, memory extractor, tool +router) within a single hot window. Entries auto-expire with +``RECENT_WINDOW_SEC`` and are wiped on ``clear_hot_cache()``. +""" + +import time + +import pytest + +from src.jarvis.memory.conversation import DialogueMemory, is_tool_message + + +@pytest.mark.unit +class TestHotCachePrimitives: + def test_get_returns_none_for_missing_key(self): + dm = DialogueMemory() + assert dm.hot_cache_get("nope") is None + + def test_put_then_get_roundtrips(self): + dm = DialogueMemory() + dm.hot_cache_put("k", {"v": 1}) + assert dm.hot_cache_get("k") == {"v": 1} + + def test_entries_expire_with_hot_window(self): + dm = DialogueMemory(inactivity_timeout=300.0) + dm.hot_cache_put("k", "v") + # Force the entry's timestamp past the window. + with dm._lock: + ts, value = dm._hot_cache["k"] + dm._hot_cache["k"] = (ts - (dm.RECENT_WINDOW_SEC + 10), value) + assert dm.hot_cache_get("k") is None + # Expired entry must also be removed from storage. + assert "k" not in dm._hot_cache + + def test_clear_hot_cache_drops_all_entries(self): + dm = DialogueMemory() + dm.hot_cache_put("a", 1) + dm.hot_cache_put("b", 2) + dm.clear_hot_cache() + assert dm.hot_cache_get("a") is None + assert dm.hot_cache_get("b") is None + + def test_put_overwrites_existing_value(self): + dm = DialogueMemory() + dm.hot_cache_put("k", "old") + dm.hot_cache_put("k", "new") + assert dm.hot_cache_get("k") == "new" + + +@pytest.mark.unit +class TestToolTurnsStorageCap: + def test_tool_turns_capped_to_max_storage(self): + dm = DialogueMemory() + # Push more entries than the cap; each call appends one turn. + for i in range(dm._tool_turns_max_storage + 5): + dm.record_tool_turn([ + {"role": "tool", "tool_call_id": f"c{i}", "content": f"r{i}"}, + ]) + assert len(dm._tool_turns) == dm._tool_turns_max_storage + # The oldest entries are dropped — last one survives. + last_msg = dm._tool_turns[-1][1][0]["content"] + assert last_msg.endswith(str(dm._tool_turns_max_storage + 4)) + + +@pytest.mark.unit +class TestIsToolMessage: + def test_native_tool_role(self): + assert is_tool_message({"role": "tool", "content": "x"}) is True + + def test_assistant_with_tool_calls(self): + assert is_tool_message({ + "role": "assistant", "content": "", + "tool_calls": [{"id": "c1"}], + }) is True + + def test_assistant_without_tool_calls(self): + assert is_tool_message({"role": "assistant", "content": "hi"}) is False + + def test_text_tool_user_with_tool_name(self): + assert is_tool_message({ + "role": "user", "content": "result", "tool_name": "webSearch", + }) is True + + def test_plain_user_message(self): + assert is_tool_message({"role": "user", "content": "hi"}) is False + + def test_non_dict_returns_false(self): + assert is_tool_message("tool") is False + assert is_tool_message(None) is False diff --git a/tests/test_engine_hot_window_caches.py b/tests/test_engine_hot_window_caches.py new file mode 100644 index 00000000..781af19f --- /dev/null +++ b/tests/test_engine_hot_window_caches.py @@ -0,0 +1,208 @@ +"""End-to-end coverage for the hot-window scratch caches in run_reply_engine. + +Three caches share one primitive (DialogueMemory.hot_cache_*): + +1. Warm profile block — query-agnostic, keyed on a constant. +2. Memory enrichment extractor — keyed on the redacted query (+topic hint). +3. Tool router output — keyed on redacted query + strategy + catalogue. + +All three should fire on the second matching turn within the hot window so +follow-up queries don't pay for SQLite reads or LLM hops they already did. + +Also covers the C1 fix: when the planner explicitly emits a `searchMemory` +step, the recall gate must NOT short-circuit memory enrichment even when +hot-window coverage is high. +""" + +from unittest.mock import Mock, patch + +import pytest + +from src.jarvis.memory.conversation import DialogueMemory +from src.jarvis.reply.engine import run_reply_engine + + +def _mock_cfg(): + cfg = Mock() + cfg.ollama_base_url = "http://localhost:11434" + cfg.ollama_chat_model = "test-large" + cfg.voice_debug = False + cfg.llm_tools_timeout_sec = 8.0 + cfg.llm_embed_timeout_sec = 10.0 + cfg.llm_chat_timeout_sec = 45.0 + cfg.llm_digest_timeout_sec = 8.0 + cfg.memory_enrichment_max_results = 5 + cfg.memory_enrichment_source = "diary" + cfg.memory_digest_enabled = False + cfg.tool_result_digest_enabled = False + cfg.location_ip_address = None + cfg.location_auto_detect = False + cfg.location_enabled = False + cfg.agentic_max_turns = 8 + cfg.tool_search_max_calls = 3 + cfg.tool_selection_strategy = "all" + cfg.tool_carryover_max_turns = 2 + cfg.tool_carryover_per_entry_chars = 1200 + cfg.mcps = {} + cfg.llm_thinking_enabled = False + cfg.tts_engine = "none" + cfg.ollama_embed_model = "test-embed" + cfg.db_path = ":memory:" + return cfg + + +@pytest.mark.unit +@patch("src.jarvis.memory.graph_ops.format_warm_profile_block", return_value="") +@patch("src.jarvis.memory.graph_ops.build_warm_profile", return_value={"user": "", "directives": ""}) +@patch("src.jarvis.memory.graph.GraphMemoryStore") +@patch("src.jarvis.reply.engine.select_tools", return_value=[]) +@patch("src.jarvis.reply.engine.plan_query", return_value=[]) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", return_value={}) +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_tool_router_cached_across_turns( + mock_chat, mock_extract, mock_extractor, mock_plan, mock_select, + _mock_graph, _mock_warm, _mock_fmt, +): + """Two identical queries within the same DialogueMemory should call the + tool router exactly once — the second turn must hit the hot-window cache. + """ + mock_chat.side_effect = [ + {"message": {"content": "hello"}}, + {"message": {"content": "hello again"}}, + ] + mock_extract.side_effect = ["hello", "hello again"] + + db = Mock() + cfg = _mock_cfg() + dm = DialogueMemory() + + run_reply_engine(db=db, cfg=cfg, tts=None, text="say hi", dialogue_memory=dm) + run_reply_engine(db=db, cfg=cfg, tts=None, text="say hi", dialogue_memory=dm) + + assert mock_select.call_count == 1, ( + f"router should be cached on identical query; called {mock_select.call_count} times" + ) + + +@pytest.mark.unit +@patch("src.jarvis.memory.graph_ops.format_warm_profile_block", return_value="") +@patch("src.jarvis.memory.graph_ops.build_warm_profile", return_value={"user": "", "directives": ""}) +@patch("src.jarvis.memory.graph.GraphMemoryStore") +@patch("src.jarvis.reply.engine.select_tools", return_value=[]) +@patch("src.jarvis.reply.engine.plan_query", return_value=[]) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", return_value={"keywords": ["x"], "questions": []}) +@patch("src.jarvis.memory.conversation.search_conversation_memory_by_keywords", return_value=[]) +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_memory_extractor_cached_across_turns( + mock_chat, mock_extract, _mock_search, mock_extractor, + _mock_plan, _mock_select, _mock_graph, _mock_warm, _mock_fmt, +): + """Empty plan → fail-open path runs the extractor. The second identical + follow-up must skip the extractor LLM call. + + The recall gate would also fire on a tool-grounded follow-up, so we + keep the dialogue free of tool messages here to exercise the extractor + path on both turns. + """ + mock_chat.side_effect = [ + {"message": {"content": "first"}}, + {"message": {"content": "second"}}, + ] + mock_extract.side_effect = ["first", "second"] + + db = Mock() + cfg = _mock_cfg() + dm = DialogueMemory() + + run_reply_engine(db=db, cfg=cfg, tts=None, + text="tell me about pushkin", dialogue_memory=dm) + run_reply_engine(db=db, cfg=cfg, tts=None, + text="tell me about pushkin", dialogue_memory=dm) + + assert mock_extractor.call_count == 1, ( + f"extractor should be cached; called {mock_extractor.call_count} times" + ) + + +@pytest.mark.unit +@patch("src.jarvis.memory.graph_ops.format_warm_profile_block", return_value="warm-block") +@patch("src.jarvis.memory.graph_ops.build_warm_profile", return_value={"user": "u", "directives": "d"}) +@patch("src.jarvis.memory.graph.GraphMemoryStore") +@patch("src.jarvis.reply.engine.select_tools", return_value=[]) +@patch("src.jarvis.reply.engine.plan_query", return_value=[]) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", return_value={}) +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_warm_profile_cached_across_turns( + mock_chat, mock_extract, _mock_extractor, _mock_plan, + _mock_select, _mock_graph, mock_build, _mock_fmt, +): + """Warm profile is query-agnostic; second turn must reuse the cached + block instead of re-traversing the graph store. + """ + mock_chat.side_effect = [ + {"message": {"content": "a"}}, + {"message": {"content": "b"}}, + ] + mock_extract.side_effect = ["a", "b"] + + db = Mock() + cfg = _mock_cfg() + dm = DialogueMemory() + + run_reply_engine(db=db, cfg=cfg, tts=None, text="hi", dialogue_memory=dm) + run_reply_engine(db=db, cfg=cfg, tts=None, text="anything else", dialogue_memory=dm) + + assert mock_build.call_count == 1, ( + f"warm profile should be built once and cached; got {mock_build.call_count} calls" + ) + + +@pytest.mark.unit +@patch("src.jarvis.memory.graph_ops.format_warm_profile_block", return_value="") +@patch("src.jarvis.memory.graph_ops.build_warm_profile", return_value={"user": "", "directives": ""}) +@patch("src.jarvis.memory.graph.GraphMemoryStore") +@patch("src.jarvis.reply.engine.select_tools", return_value=[]) +@patch( + "src.jarvis.reply.engine.plan_query", + return_value=["searchMemory topic='justin bieber'", "reply"], +) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", + return_value={"keywords": ["bieber"], "questions": []}) +@patch("src.jarvis.memory.conversation.search_conversation_memory_by_keywords", return_value=[]) +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_planner_search_memory_overrides_recall_gate( + mock_chat, mock_extract, _mock_search, mock_extractor, + _mock_plan, _mock_select, _mock_graph, _mock_warm, _mock_fmt, +): + """C1 fix: when the planner emits `searchMemory`, the recall gate must + NOT short-circuit memory enrichment even though the hot window contains + a fresh tool result that overlaps the query. + """ + mock_chat.side_effect = [ + {"message": {"content": "Canadian singer."}}, + ] + mock_extract.side_effect = ["Canadian singer."] + + db = Mock() + cfg = _mock_cfg() + dm = DialogueMemory() + # Plant a fresh tool result that would otherwise satisfy the recall gate. + dm.add_message("user", "who is justin bieber") + dm.record_tool_turn([ + {"role": "tool", "tool_call_id": "c1", + "content": "Justin Bieber is a Canadian singer with the song Baby."}, + ]) + dm.add_message("assistant", "Canadian singer.") + + run_reply_engine(db=db, cfg=cfg, tts=None, + text="bieber more about justin", dialogue_memory=dm) + + # Planner explicitly demanded memory → extractor must run. + assert mock_extractor.call_count == 1, ( + "extractor must run when planner emits searchMemory, " + "regardless of recall-gate coverage" + ) diff --git a/tests/test_redact_extended.py b/tests/test_redact_extended.py new file mode 100644 index 00000000..1bdbaf92 --- /dev/null +++ b/tests/test_redact_extended.py @@ -0,0 +1,86 @@ +"""Tests for the extended structural-redaction rules added so tool-output +carryover and recall-gate debug logs cannot leak credentials. +""" + +import pytest + +from src.jarvis.utils.redact import redact, scrub_secrets + + +@pytest.mark.unit +class TestVendorAccessKeys: + def test_aws_akia_key_redacted(self): + out = redact("key=AKIAIOSFODNN7EXAMPLE rest") + assert "AKIAIOSFODNN7EXAMPLE" not in out + assert "[REDACTED_AWS_KEY]" in out + + def test_aws_asia_key_redacted(self): + out = redact("ASIAIOSFODNN7EXAMPLE") + assert "ASIAIOSFODNN7EXAMPLE" not in out + assert "[REDACTED_AWS_KEY]" in out + + def test_stripe_live_secret_redacted(self): + token = "sk_live_" + "a" * 24 + out = redact(f"see {token} please") + assert token not in out + assert "[REDACTED_STRIPE_KEY]" in out + + def test_stripe_test_publishable_redacted(self): + token = "pk_test_" + "Z" * 24 + out = redact(token) + assert token not in out + assert "[REDACTED_STRIPE_KEY]" in out + + def test_github_pat_redacted(self): + token = "ghp_" + "A" * 36 + out = redact(token) + assert token not in out + assert "[REDACTED_GH_TOKEN]" in out + + def test_openai_key_redacted(self): + token = "sk-" + "A" * 40 + out = redact(token) + assert token not in out + assert "[REDACTED_OPENAI_KEY]" in out + + def test_google_api_key_redacted(self): + token = "AIza" + "B" * 35 + out = redact(token) + assert token not in out + assert "[REDACTED_GOOG_KEY]" in out + + +@pytest.mark.unit +class TestAuthorizationHeaders: + def test_bearer_header_redacted(self): + out = scrub_secrets("Authorization: Bearer abc.def.ghi") + assert "abc.def.ghi" not in out + assert "Authorization: Bearer [REDACTED]" in out + + def test_basic_header_redacted(self): + out = scrub_secrets("Authorization: Basic dXNlcjpwYXNz") + assert "dXNlcjpwYXNz" not in out + assert "Authorization: Basic [REDACTED]" in out + + +@pytest.mark.unit +class TestKeywordAnchoredCredentials: + def test_refresh_token_keyword_redacted(self): + out = redact("refresh_token=abcdef123456") + assert "abcdef123456" not in out + assert "refresh_token=[REDACTED]" in out + + def test_access_token_keyword_redacted(self): + out = redact("access_token: zzz999") + assert "zzz999" not in out + assert "access_token=[REDACTED]" in out + + def test_session_id_redacted(self): + out = redact("session_id=deadbeefcafe") + assert "deadbeefcafe" not in out + assert "session_id=[REDACTED]" in out + + def test_oauth_token_redacted(self): + out = redact("oauth_token=qwertyuiop") + assert "qwertyuiop" not in out + assert "oauth_token=[REDACTED]" in out From a48be9e8b793862d2fa2b378e7b0304254ca233f Mon Sep 17 00:00:00 2001 From: Baris Sencan Date: Mon, 27 Apr 2026 21:17:28 +0100 Subject: [PATCH 3/5] perf(reply): conversation-scoped cache + graph-driven warm-profile invalidation Extends the reply-engine scratch cache lifetime from 5-minute hot-window to the full active conversation, with proper invalidation hooks so the cached warm profile and tool router output stay correct as the conversation grows. DialogueMemory: - hot_cache_get/put no longer expire entries by RECENT_WINDOW_SEC age. Entries persist until clear_hot_cache(), invalidate_warm_profile(), or new-conversation reset in the engine. - _tool_turns no longer pruned by age either; bounded by hard storage cap (16 turns) and cleared on new-conversation entry / stop. - Adds WARM_PROFILE_CACHE_KEY constant so the engine and the graph-mutation invalidator share a single key. - Adds invalidate_warm_profile() that drops only that one entry. - Scrubs tool_calls.arguments (dict and JSON-string forms) so re- injection of the assistant tool_calls row on the next turn cannot leak PII captured in query args. - _next_ts() helper guarantees strictly-monotonic timestamps for _messages and _tool_turns, fixing interleave ordering on Windows where time.time() has ~16ms granularity. graph.py: - Adds register_graph_mutation_listener / unregister registry plus _resolve_branch helper (parent-walk to FIXED_BRANCH ancestor with depth cap). create_node / update_node / delete_node now notify listeners with action + node_id + branch attribution. delete resolves branch BEFORE the row is gone. reply/engine.py: - New-conversation entry (has_recent_messages was False) now wipes hot_cache and tool carryover before the turn runs, so stale state from a lapsed session can't leak into a fresh one. - Warm-profile cache lookup uses the shared cache key constant. daemon.py: - After DialogueMemory init, registers a graph mutation listener filtering on {BRANCH_USER, BRANCH_DIRECTIVES} that calls invalidate_warm_profile() on the global DialogueMemory. World- branch writes are noise for the warm profile and are ignored. Specs (reply, graph, llm_contexts) updated to reflect conversation- scoped cache lifetime and the new invalidation hooks. Tests: new tests/test_graph_mutation_listener.py covers the registry + branch attribution + warm-profile invalidation hook end-to-end. test_dialogue_memory_hot_cache, test_dialogue_memory_tool_carryover, and test_engine_hot_window_caches updated for new lifetime contract, tool_call arg scrubbing, and new-conversation cache wipe. Co-Authored-By: Claude Opus 4.7 --- docs/llm_contexts.md | 8 +- src/jarvis/daemon.py | 36 ++++ src/jarvis/memory/conversation.py | 145 +++++++++---- src/jarvis/memory/graph.py | 79 +++++++- src/jarvis/memory/graph.spec.md | 8 + src/jarvis/reply/engine.py | 40 +++- src/jarvis/reply/reply.spec.md | 12 +- tests/test_dialogue_memory_hot_cache.py | 32 ++- tests/test_dialogue_memory_tool_carryover.py | 69 ++++++- tests/test_engine_hot_window_caches.py | 48 +++++ tests/test_graph_mutation_listener.py | 201 +++++++++++++++++++ 11 files changed, 610 insertions(+), 68 deletions(-) create mode 100644 tests/test_graph_mutation_listener.py diff --git a/docs/llm_contexts.md b/docs/llm_contexts.md index fa71ef30..ab51c827 100644 --- a/docs/llm_contexts.md +++ b/docs/llm_contexts.md @@ -11,9 +11,9 @@ Every distinct LLM call in Jarvis, what feeds it, what consumes it, and how it i - **Model / gating**: `cfg.ollama_chat_model` (the big model). Not optional. No size branching on the loop itself — size branching affects the digests/evaluator around it. - **Inputs**: - Redacted user query - - Recent dialogue (last 5 minutes), including in-loop tool-call + tool-role messages from prior replies within the hot window (tool carryover — `DialogueMemory.record_tool_turn` / `get_recent_turns_with_tools` in [src/jarvis/memory/conversation.py](src/jarvis/memory/conversation.py); capped by `cfg.tool_carryover_max_turns` / `tool_carryover_per_entry_chars`; cleared on `stop` signal; UNTRUSTED WEB EXTRACT fence markers preserved on truncation) + - Recent dialogue (last 5 minutes), including in-loop tool-call + tool-role messages from prior replies within the active conversation (tool carryover, `DialogueMemory.record_tool_turn` / `get_recent_turns_with_tools` in [src/jarvis/memory/conversation.py](src/jarvis/memory/conversation.py); per-prompt cap via `cfg.tool_carryover_max_turns` / `tool_carryover_per_entry_chars`; storage cap `_tool_turns_max_storage = 16`; cleared on `stop` signal AND on new-conversation entry; UNTRUSTED WEB EXTRACT fence markers preserved on truncation; both `content` and `tool_calls[*].function.arguments` scrubbed on write) - Unified system prompt from [src/jarvis/system_prompt.py](src/jarvis/system_prompt.py) + ASR note + tool-protocol guidance - - **Warm profile block** (query-agnostic User + Directives excerpt from the knowledge graph, composed by `build_warm_profile()` / `format_warm_profile_block()` in [src/jarvis/memory/graph_ops.py](src/jarvis/memory/graph_ops.py) at Step 3.5 of `reply()`; no LLM call, pure SQLite read; injected unconditionally so personalisation is the default; result cached in `DialogueMemory._hot_cache` under key `warm_profile_block` so follow-up turns within one hot window skip the BFS — cleared on `stop` and hot-window expiry) + - **Warm profile block** (query-agnostic User + Directives excerpt from the knowledge graph, composed by `build_warm_profile()` / `format_warm_profile_block()` in [src/jarvis/memory/graph_ops.py](src/jarvis/memory/graph_ops.py) at Step 3.5 of `reply()`; no LLM call, pure SQLite read; injected unconditionally so personalisation is the default; result cached in `DialogueMemory._hot_cache` under `DialogueMemory.WARM_PROFILE_CACHE_KEY` for the lifetime of the active conversation. Invalidated on `stop`, on new-conversation entry, AND on User/Directives graph mutations via the listener registered in [src/jarvis/daemon.py](src/jarvis/daemon.py) against `register_graph_mutation_listener` in [src/jarvis/memory/graph.py](src/jarvis/memory/graph.py); World-branch writes are ignored) - Digested memory enrichment (optional, see #4) - Time + location context (re-injected each turn) - Tool schema: native via `generate_tools_json_schema()` ([src/jarvis/tools/registry.py](src/jarvis/tools/registry.py)) or text fallback via `_text_tool_call_guidance()` ([engine.py:68](src/jarvis/reply/engine.py:68)) @@ -44,7 +44,7 @@ Every distinct LLM call in Jarvis, what feeds it, what consumes it, and how it i - **System prompt**: inline at [enrichment.py:35-63](src/jarvis/reply/enrichment.py:35). - **Output**: `{keywords, from?, to?, questions?}`. Consumed by memory search in the reply engine. - **Limits**: up to 2 retries; timeout from `llm_tools_timeout_sec`. -- **Caching**: result cached in `DialogueMemory._hot_cache` under key `enrichment:{redacted_query[+topic_hint]}`. Identical follow-ups within one hot window reuse the dict and skip the LLM hop. Cleared on `clear_hot_cache()` (e.g. `stop` signal) and on hot-window expiry. +- **Caching**: result cached in `DialogueMemory._hot_cache` under key `enrichment:{redacted_query[+topic_hint]}` for the lifetime of the active conversation. Identical follow-ups within the same conversation reuse the dict and skip the LLM hop. Cleared by `clear_hot_cache()` on the `stop` signal and on new-conversation entry. ## 3b. Recall Gate (pre-enrichment short-circuit) @@ -95,7 +95,7 @@ Every distinct LLM call in Jarvis, what feeds it, what consumes it, and how it i - **System prompt**: inline (~lines 260-315). Teaches pick up-to-5 tools or `none`. - **Output**: comma-separated tool names or `none`. Capped at `_LLM_MAX_SELECTED` (5). Always-included tools (`stop`, `toolSearchTool`) are unioned in regardless. - **Limits**: `llm_timeout_sec`. On failure → all tools. -- **Caching**: `routed_tools` cached in `DialogueMemory._hot_cache` under key `router:{redacted_query}|{strategy}|{builtin-names}|{mcp-names}`. The catalogue signature lets a mid-window MCP refresh invalidate the cache; `context_hint` is intentionally excluded so time/location drift inside one hot window doesn't bust it. Cleared on `stop` signal and hot-window expiry. +- **Caching**: `routed_tools` cached in `DialogueMemory._hot_cache` under key `router:{redacted_query}|{strategy}|{builtin-names}|{mcp-names}` for the lifetime of the active conversation. The catalogue signature lets a mid-conversation MCP refresh invalidate the cache; `context_hint` is intentionally excluded so time/location drift inside one conversation doesn't bust it. Cleared by `clear_hot_cache()` on the `stop` signal and on new-conversation entry. ## 8. Tool Searcher (mid-loop escape hatch) diff --git a/src/jarvis/daemon.py b/src/jarvis/daemon.py index 14806c8c..ad488929 100644 --- a/src/jarvis/daemon.py +++ b/src/jarvis/daemon.py @@ -348,6 +348,42 @@ def main() -> None: ) print("✓ Dialogue memory initialized", flush=True) + # Wire the conversation-scoped warm-profile cache to graph mutations. + # When the User or Directives branch is mutated mid-conversation, the + # cached warm profile is dropped so the next reply rebuilds it from + # the current graph state. World-branch writes (typical webSearch + # extractions) do not touch warm profile, so they are ignored. + try: + from .memory.graph import ( + BRANCH_DIRECTIVES, + BRANCH_USER, + register_graph_mutation_listener, + ) + + _wp_relevant_branches = {BRANCH_USER, BRANCH_DIRECTIVES} + + def _invalidate_wp_on_graph_mutation(*, action, node_id, branch): + del action, node_id # Unused; only branch matters here. + if branch in _wp_relevant_branches and _global_dialogue_memory is not None: + try: + _global_dialogue_memory.invalidate_warm_profile() + debug_log( + f"warm profile invalidated by {branch} graph mutation", + "memory", + ) + except Exception as exc: + debug_log( + f"warm profile invalidation failed (non-fatal): {exc}", + "memory", + ) + + register_graph_mutation_listener(_invalidate_wp_on_graph_mutation) + except Exception as exc: + debug_log( + f"warm profile mutation listener wiring failed (non-fatal): {exc}", + "memory", + ) + # Knowledge graph: wipe + re-seed if the on-disk shape predates the # User/Directives/World taxonomy. Non-destructive to the diary — # users can re-import via the memory viewer. diff --git a/src/jarvis/memory/conversation.py b/src/jarvis/memory/conversation.py index c97eae9a..aa985bc3 100644 --- a/src/jarvis/memory/conversation.py +++ b/src/jarvis/memory/conversation.py @@ -15,6 +15,29 @@ _UNTRUSTED_FENCE_END = "<<>>" +def _scrub_tool_call(tc: dict) -> dict: + """Return a copy of a tool-call entry with the function arguments + scrubbed of secrets. Handles both dict and string-encoded arguments + (some providers serialise arguments as a JSON string). + """ + if not isinstance(tc, dict): + return tc + out = dict(tc) + fn = out.get("function") + if isinstance(fn, dict): + fn_out = dict(fn) + args = fn_out.get("arguments") + if isinstance(args, str) and args: + fn_out["arguments"] = scrub_secrets(args) + elif isinstance(args, dict): + fn_out["arguments"] = { + k: (scrub_secrets(v) if isinstance(v, str) else v) + for k, v in args.items() + } + out["function"] = fn_out + return out + + def is_tool_message(msg: dict) -> bool: """True if a message is a tool-call request or a tool-result. @@ -22,7 +45,10 @@ def is_tool_message(msg: dict) -> bool: - Native: ``role="tool"`` for results, or ``role="assistant"`` carrying a non-empty ``tool_calls`` list for the outbound call. - Text-tool fallback (small models): the tool result is appended as a - ``role="user"`` message tagged with ``tool_name``. + ``role="user"`` message tagged with ``tool_name``. The tagging is + done by the reply engine in `src/jarvis/reply/engine.py` (see the + text-tool branch where ``"tool_name": tool_name`` is attached to + the synthetic user message). """ if not isinstance(msg, dict): return False @@ -117,16 +143,29 @@ def __init__(self, inactivity_timeout: float = 300.0, max_interactions: int = 20 # tool-related messages. Excluded from `get_pending_chunks` so raw tool # payloads never reach the diary summariser. self._tool_turns: List[Tuple[float, List[dict]]] = [] - # Hot-window scratch cache: per-key (timestamp, value) entries that - # auto-expire with the conversation. Lets the reply engine memoise - # idempotent per-turn work (warm profile, memory enrichment params, - # tool router output) within a single hot window without leaking - # state across conversations. + # Conversation-scoped scratch cache: per-key (timestamp, value) + # entries that survive for the lifetime of the active conversation. + # The reply engine wipes this on new-conversation entry (when + # ``has_recent_messages`` was False at turn start), and individual + # entries can be invalidated on demand (e.g. ``invalidate_warm_profile`` + # on graph mutations). The timestamp is retained so callers may + # inspect entry age, but reads are NOT bounded by RECENT_WINDOW_SEC + # any more — long active conversations would otherwise see warm + # profile / router caches expire while the session is still going. self._hot_cache: dict[str, Tuple[float, object]] = {} - # Hard ceiling on stored tool turns to bound memory in pathological - # sessions. The read path further filters by max_tool_turns and - # window expiry; this cap is a backstop against unbounded growth. + # Hard ceiling on stored tool turns. With the default + # ``tool_carryover_max_turns=2`` re-injected per reply, 16 lets a + # session accumulate roughly 8x the visible budget before the + # oldest entries get evicted; well below the prompt-bloat + # threshold, well above any realistic single-conversation need. self._tool_turns_max_storage = 16 + # Monotonic high-water timestamp. ``time.time()`` has ~16ms + # granularity on Windows, so consecutive inserts can collide and + # break interleave ordering between text and tool messages. We + # bump the stored ts by a tiny epsilon so insertion order is + # always preserved, while keeping wall-clock semantics close + # enough for the RECENT_WINDOW_SEC cutoff. + self._last_ts: float = 0.0 self._last_activity_time: float = time.time() self._inactivity_timeout = inactivity_timeout # Unified window: context retention = forced diary update interval @@ -139,10 +178,18 @@ def __init__(self, inactivity_timeout: float = 300.0, max_interactions: int = 20 # Track the last profile used for follow-up detection self._last_profile: Optional[str] = None + def _next_ts(self) -> float: + """Return a strictly-monotonic timestamp. Caller must hold ``_lock``.""" + now = time.time() + if now <= self._last_ts: + now = self._last_ts + 1e-6 + self._last_ts = now + return now + def add_message(self, role: str, content: str) -> None: """Add a message to recent memory. Thread-safe.""" with self._lock: - timestamp = time.time() + timestamp = self._next_ts() self._messages.append((timestamp, role.strip(), content.strip())) self._last_activity_time = timestamp @@ -178,7 +225,7 @@ def record_tool_turn(self, tool_msgs: List[dict]) -> None: """ if not tool_msgs: return - # Scrub outside the lock — pure function over message content. + # Scrub outside the lock, pure function over message content. scrubbed: List[dict] = [] for m in tool_msgs: mm = dict(m) @@ -188,17 +235,21 @@ def record_tool_turn(self, tool_msgs: List[dict]) -> None: # API responses, scraped pages). Scrub before persisting # so re-injection on the next turn can't leak them. mm["content"] = scrub_secrets(c) + # Native tool-call arguments can also carry sensitive query + # text (e.g. webSearch(query="my email is alice@example.com")). + # Scrub each argument value so re-injection of the assistant + # tool_calls row on the next turn cannot leak them. + tcalls = mm.get("tool_calls") + if isinstance(tcalls, list): + mm["tool_calls"] = [_scrub_tool_call(tc) for tc in tcalls] scrubbed.append(mm) - ts = time.time() with self._lock: + ts = self._next_ts() self._tool_turns.append((ts, scrubbed)) - # Bound storage: drop entries older than the window, then cap - # to the most recent `_tool_turns_max_storage`. Backstop against - # unbounded growth in long sessions. - cutoff = ts - self.RECENT_WINDOW_SEC - self._tool_turns = [ - (t, m) for t, m in self._tool_turns if t >= cutoff - ] + # Bound storage to a hard ceiling. Tool turns are NOT pruned + # by RECENT_WINDOW_SEC age any more; the engine clears them + # on new-conversation entry so an active session keeps its + # carryover regardless of how long ago each tool fired. if len(self._tool_turns) > self._tool_turns_max_storage: self._tool_turns = self._tool_turns[-self._tool_turns_max_storage:] @@ -208,28 +259,44 @@ def clear_tool_carryover(self) -> None: self._tool_turns = [] # ------------------------------------------------------------------ - # Hot-window cache + # Conversation-scoped scratch cache # ------------------------------------------------------------------ - # Small primitive used by the reply engine to memoise expensive - # per-turn work that's idempotent within a single conversation: warm - # profile (SQLite reads), memory enrichment extractor (LLM call), - # tool router (LLM call). Entries auto-expire with RECENT_WINDOW_SEC - # and are wiped on `clear_hot_cache()` (e.g. on the stop signal). + # Primitive used by the reply engine to memoise expensive per-turn + # work that's idempotent within a single conversation: warm profile + # (SQLite reads), memory enrichment extractor (LLM call), tool + # router (LLM call). + # + # Lifetime contract: + # - Entries persist for the lifetime of the active conversation; + # they are NOT bounded by RECENT_WINDOW_SEC age. A long active + # chat keeps the warm profile / router cache hot for hours. + # - The reply engine wipes the cache when it detects a new + # conversation (i.e. ``has_recent_messages()`` was False at turn + # entry) and on the ``stop`` signal. + # - Granular invalidation hooks: ``invalidate_warm_profile()`` is + # called from a graph-mutation listener so the User / Directives + # branches stay fresh even mid-conversation. # # Callers pick a key that captures the invalidation contract — # typically the redacted query for query-dependent values, or a # constant for query-agnostic values. + # Cache key for the warm-profile block. Centralised so the engine + # and the graph-mutation invalidator agree on it. + WARM_PROFILE_CACHE_KEY = "warm_profile_block" + def hot_cache_get(self, key: str) -> Optional[object]: - """Return cached value if present and within the hot window.""" + """Return the cached value for ``key`` if present, else ``None``. + + No age-based expiry: callers control invalidation via + ``clear_hot_cache``, ``invalidate_warm_profile``, or new- + conversation reset in the engine. + """ with self._lock: entry = self._hot_cache.get(key) if not entry: return None - ts, value = entry - if ts < time.time() - self.RECENT_WINDOW_SEC: - self._hot_cache.pop(key, None) - return None + _ts, value = entry return value def hot_cache_put(self, key: str, value: object) -> None: @@ -238,10 +305,18 @@ def hot_cache_put(self, key: str, value: object) -> None: self._hot_cache[key] = (time.time(), value) def clear_hot_cache(self) -> None: - """Drop all hot-window cache entries.""" + """Drop all conversation-scoped cache entries.""" with self._lock: self._hot_cache = {} + def invalidate_warm_profile(self) -> None: + """Drop the cached warm-profile block. Called from the graph + mutation listener so a mid-conversation User/Directives change + is reflected on the very next turn. + """ + with self._lock: + self._hot_cache.pop(self.WARM_PROFILE_CACHE_KEY, None) + def get_recent_turns_with_tools( self, max_tool_turns: int = 2, @@ -263,9 +338,11 @@ def get_recent_turns_with_tools( for ts, role, content in self._messages: if ts >= cutoff: timeline.append((ts, "msg", {"role": role, "content": content})) - # Keep only the last N tool turns within the window. - live_tool_turns = [(ts, msgs) for ts, msgs in self._tool_turns if ts >= cutoff] - for ts, msgs in live_tool_turns[-max_tool_turns:]: + # Keep only the last N tool turns. Tool carryover lives for + # the conversation, not for RECENT_WINDOW_SEC: an active session + # past the window still benefits from the prior tool result. + # The engine clears ``_tool_turns`` on new-conversation entry. + for ts, msgs in self._tool_turns[-max_tool_turns:]: truncated: list[dict] = [] for m in msgs: mm = dict(m) diff --git a/src/jarvis/memory/graph.py b/src/jarvis/memory/graph.py index 168febff..a2f3c874 100644 --- a/src/jarvis/memory/graph.py +++ b/src/jarvis/memory/graph.py @@ -18,11 +18,51 @@ import uuid from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Optional +from typing import Callable, Optional from ..debug import debug_log +# ── Mutation listeners ───────────────────────────────────────────────────── +# +# Lightweight observer registry so consumers (e.g. DialogueMemory's warm +# profile cache) can invalidate derived state when a node is created, +# updated, or deleted. The listener receives the action name, node id, and +# the FIXED_BRANCH ancestor (e.g. ``"user"``, ``"directives"``, ``"world"``) +# so it can scope its reaction. Failures in listeners are logged and +# swallowed so they cannot break a write. + +_MUTATION_LISTENERS: "list[Callable[..., None]]" = [] + + +def register_graph_mutation_listener(cb: Callable[..., None]) -> None: + """Register a callback invoked after every node mutation. + + The callback is invoked with keyword arguments ``action``, ``node_id``, + and ``branch`` where ``branch`` is the id of the FIXED_BRANCH ancestor + (or the node id itself when the node is a fixed branch), or ``None`` + when the branch cannot be resolved (e.g. root mutations). + """ + if cb not in _MUTATION_LISTENERS: + _MUTATION_LISTENERS.append(cb) + + +def unregister_graph_mutation_listener(cb: Callable[..., None]) -> None: + """Remove a previously registered mutation listener (idempotent).""" + try: + _MUTATION_LISTENERS.remove(cb) + except ValueError: + pass + + +def _notify_graph_mutation(action: str, node_id: str, branch: Optional[str]) -> None: + for cb in list(_MUTATION_LISTENERS): + try: + cb(action=action, node_id=node_id, branch=branch) + except Exception as exc: + debug_log(f"graph mutation listener failed (non-fatal): {exc}", "memory") + + # ── Fact normalisation ───────────────────────────────────────────────────── # # Used for dedupe comparisons. Locale-safe — the user base includes @@ -352,6 +392,33 @@ def get_root(self) -> MemoryNode: ).fetchone() return self._row_to_node(row) + def _resolve_branch(self, node_id: Optional[str]) -> Optional[str]: + """Walk parents from ``node_id`` up to find the FIXED_BRANCH id it + belongs to (or itself, if the node IS a fixed branch). Returns + ``None`` for the root or when the node cannot be located. + + Capped at ``MAX_TRAVERSAL_DEPTH`` so a corrupt parent cycle cannot + spin the loop. SQLite reads only — safe to call from write paths. + """ + if not node_id or node_id == "root": + return None + if node_id in FIXED_BRANCH_IDS: + return node_id + current = node_id + for _ in range(MAX_TRAVERSAL_DEPTH): + row = self.conn.execute( + "SELECT parent_id FROM memory_nodes WHERE id = ?", (current,) + ).fetchone() + if row is None: + return None + parent = row["parent_id"] + if parent is None or parent == "root": + return None + if parent in FIXED_BRANCH_IDS: + return parent + current = parent + return None + def create_node( self, name: str, @@ -388,6 +455,7 @@ def create_node( self.conn.commit() debug_log(f"Created memory node '{name}' ({node_id[:8]})", "memory") + _notify_graph_mutation("create", node_id, self._resolve_branch(parent_id)) return MemoryNode( id=node_id, name=name, @@ -440,6 +508,7 @@ def update_node( ) self.conn.commit() + _notify_graph_mutation("update", node_id, self._resolve_branch(node_id)) return node def delete_node(self, node_id: str) -> bool: @@ -451,12 +520,18 @@ def delete_node(self, node_id: str) -> bool: """ if node_id == "root" or node_id in FIXED_BRANCH_IDS: return False + # Resolve branch BEFORE the delete so listeners get a meaningful + # branch attribution even though the row is about to vanish. + branch = self._resolve_branch(node_id) with self._lock: cur = self.conn.execute( "DELETE FROM memory_nodes WHERE id = ?", (node_id,) ) self.conn.commit() - return cur.rowcount > 0 + deleted = cur.rowcount > 0 + if deleted: + _notify_graph_mutation("delete", node_id, branch) + return deleted def node_contains_fact(self, node_id: str, fact: str) -> bool: """True if ``fact`` matches any line of the node's data after diff --git a/src/jarvis/memory/graph.spec.md b/src/jarvis/memory/graph.spec.md index 6833447b..28d52a80 100644 --- a/src/jarvis/memory/graph.spec.md +++ b/src/jarvis/memory/graph.spec.md @@ -89,6 +89,14 @@ Any node except root can be deleted. Children are orphaned (parent_id set to NUL Increments `access_count` and updates `last_accessed`. Called automatically when a node is viewed in the UI or retrieved during query traversal. +### Mutation Listeners + +The graph module exposes a small observer registry, `register_graph_mutation_listener(cb)` / `unregister_graph_mutation_listener(cb)`, invoked after every successful `create_node`, `update_node`, `delete_node`, and (transitively) `append_to_node`. Callbacks receive `action`, `node_id`, and `branch` (the FIXED_BRANCH ancestor id, or `None` for root-level mutations and unresolvable nodes). Listener exceptions are logged via `debug_log` and swallowed so they cannot break a write. + +Touch is intentionally NOT a mutation event: it changes access metadata only, not the warm-profile-relevant fields, so it does not need to invalidate caches. + +The reply layer uses this hook from `daemon.py` to invalidate `DialogueMemory`'s warm-profile cache when the User or Directives branches change mid-conversation. World-branch writes are filtered out because the warm profile does not include the world branch. + ### Access Decay All ordering by access frequency uses a **time-decayed score** computed at query time: `access_count / (1 + age_days / half_life)`. This is hyperbolic decay — a node's effective score halves every `DECAY_HALF_LIFE_DAYS` (default 14) since its last access. The raw `access_count` is never modified, so changing the half-life retroactively reweights all nodes. This applies to `get_top_nodes`, `get_children`, `get_all_nodes`, and `search_nodes` tie-breaking. diff --git a/src/jarvis/reply/engine.py b/src/jarvis/reply/engine.py index 35da6432..b4ee317f 100644 --- a/src/jarvis/reply/engine.py +++ b/src/jarvis/reply/engine.py @@ -712,6 +712,22 @@ def run_reply_engine(db: "Database", cfg, tts: Optional[Any], recent_messages = dialogue_memory.get_recent_messages() is_new_conversation = False + # New conversation reset: when the previous session lapsed past the + # inactivity window, drop the conversation-scoped cache and any + # tool-carryover from the previous session. This is what bounds the + # cache lifetime now that individual entries no longer expire by age. + if is_new_conversation and dialogue_memory is not None: + if hasattr(dialogue_memory, "clear_hot_cache"): + try: + dialogue_memory.clear_hot_cache() + except Exception: + pass + if hasattr(dialogue_memory, "clear_tool_carryover"): + try: + dialogue_memory.clear_tool_carryover() + except Exception: + pass + # Refresh MCP tools on new conversation (memory expired) if is_new_conversation and getattr(cfg, "mcps", {}): try: @@ -892,17 +908,27 @@ def run_reply_engine(db: "Database", cfg, tts: Optional[Any], # interest me" can be answered directly when the model already sees # the user's interests in its system prompt. warm_profile_block = "" - # Hot-window cache: warm profile is query-agnostic and the User / - # Directives branches change rarely, so reusing the block within a - # single conversation saves the SQLite BFS on every follow-up turn. - # Cache is wiped on stop signal and on hot-window expiry. + # Conversation-scoped cache: warm profile is query-agnostic and the + # User / Directives branches change rarely, so reusing the block for + # the lifetime of the conversation saves the SQLite BFS on every + # follow-up turn. The cache is invalidated on: + # - new conversation entry (cleared above with the full hot cache), + # - the stop signal (also clears the full hot cache), + # - any User/Directives graph mutation (via the listener registered + # in daemon.py, which calls ``invalidate_warm_profile`` on the + # active DialogueMemory). + _wp_cache_key = getattr( + type(dialogue_memory), + "WARM_PROFILE_CACHE_KEY", + "warm_profile_block", + ) if dialogue_memory else "warm_profile_block" _wp_cached = ( - dialogue_memory.hot_cache_get("warm_profile_block") + dialogue_memory.hot_cache_get(_wp_cache_key) if dialogue_memory and hasattr(dialogue_memory, "hot_cache_get") else None ) if isinstance(_wp_cached, str): warm_profile_block = _wp_cached - debug_log("warm profile served from hot-window cache", "memory") + debug_log("warm profile served from conversation cache", "memory") else: try: from ..memory.graph import GraphMemoryStore @@ -923,7 +949,7 @@ def run_reply_engine(db: "Database", cfg, tts: Optional[Any], "memory", ) if dialogue_memory and hasattr(dialogue_memory, "hot_cache_put"): - dialogue_memory.hot_cache_put("warm_profile_block", warm_profile_block) + dialogue_memory.hot_cache_put(_wp_cache_key, warm_profile_block) except Exception as e: debug_log(f"warm profile load failed (non-fatal): {e}", "memory") diff --git a/src/jarvis/reply/reply.spec.md b/src/jarvis/reply/reply.spec.md index e6e0d5d4..c2d88387 100644 --- a/src/jarvis/reply/reply.spec.md +++ b/src/jarvis/reply/reply.spec.md @@ -30,13 +30,13 @@ Design principles enforced by the engine: 2. Recent Dialogue Context - Include short-term dialogue memory (last 5 minutes) as prior messages. - - The fetch returns not only user/assistant prose but also **tool-call and tool-result messages** from in-loop work in prior replies within the hot window (capped by `cfg.tool_carryover_max_turns` and `cfg.tool_carryover_per_entry_chars`, fence markers of UNTRUSTED WEB EXTRACT blocks preserved on truncation, secrets scrubbed). This lets follow-up turns reuse a prior `webSearch` / MCP result instead of re-fetching it. The carryover is captured at the end of each reply (success or error), and is cleared when the user dismisses with the `stop` tool so the next wake-word turn starts clean. - - A **recall gate** (`src/jarvis/memory/recall_gate.py`, deterministic, no LLM) skips diary / graph / memory-digest enrichment when the hot window already covers the topic (≥50% content-word overlap with a fresh tool-result row). Language-agnostic via `\w{3,}` with `re.UNICODE`. Fail-open on any error. The gate is bypassed when the planner explicitly emitted a `searchMemory` step — planner intent always wins over coverage heuristics. See `src/jarvis/memory/recall_gate.spec.md`. - - **Hot-window scratch cache** (`DialogueMemory.hot_cache_get` / `hot_cache_put`): per-conversation, time-bounded (`RECENT_WINDOW_SEC`) cache used by the engine to memoise three idempotent per-turn computations within a single hot window: - - **Warm profile** (`warm_profile_block` key, query-agnostic): skips the SQLite traversal of the User + Directives branches on every follow-up turn. + - The fetch returns not only user/assistant prose but also **tool-call and tool-result messages** from in-loop work in prior replies within the active conversation (capped per-prompt by `cfg.tool_carryover_max_turns` and `cfg.tool_carryover_per_entry_chars`, fence markers of UNTRUSTED WEB EXTRACT blocks preserved on truncation, payloads scrubbed including `tool_calls[*].function.arguments`). This lets follow-up turns reuse a prior `webSearch` / MCP result instead of re-fetching it. Carryover is captured at the end of each reply (success or error). It survives for the lifetime of the conversation and is cleared on (a) the `stop` tool, and (b) new-conversation entry, when `has_recent_messages()` was False at turn start. + - A **recall gate** (`src/jarvis/memory/recall_gate.py`, deterministic, no LLM) skips diary / graph / memory-digest enrichment when the hot window already covers the topic (≥50% content-word overlap with a fresh tool-result row). Language-agnostic via `\w{3,}` with `re.UNICODE`. Fail-open on any error. The gate is bypassed when the planner explicitly emitted a `searchMemory` step, planner intent always wins over coverage heuristics. See `src/jarvis/memory/recall_gate.spec.md`. + - **Conversation-scoped scratch cache** (`DialogueMemory.hot_cache_get` / `hot_cache_put`): a small primitive used by the engine to memoise three idempotent per-turn computations for the lifetime of the active conversation: + - **Warm profile** (`DialogueMemory.WARM_PROFILE_CACHE_KEY`, query-agnostic): skips the SQLite traversal of the User + Directives branches on every follow-up turn. Invalidated on User/Directives graph mutations via a listener registered in `daemon.py` against `register_graph_mutation_listener` (`src/jarvis/memory/graph.py`); World-branch writes do not affect it. - **Memory enrichment extractor** (`enrichment:{redacted_query[+topic_hint]}` key): skips the small-model LLM call that derives keywords / questions / time bounds when an identical query repeats. - - **Tool router** (`router:{redacted_query}|{strategy}|{builtin-names}|{mcp-names}` key): skips the router LLM call when the query and tool catalogue match. The catalogue signature lets a mid-window MCP refresh invalidate the cache. - - Cleared on the `stop` signal alongside tool carryover, and entries auto-expire with the hot window. + - **Tool router** (`router:{redacted_query}|{strategy}|{builtin-names}|{mcp-names}` key): skips the router LLM call when the query and tool catalogue match. The catalogue signature lets a mid-conversation MCP refresh invalidate the cache. + - Lifetime: entries persist until (a) the `stop` signal clears the whole cache, (b) the engine detects a new conversation at turn entry (`has_recent_messages()` was False) and clears it before running, or (c) targeted invalidation (warm profile only) on graph mutations. Entries are *not* bounded by `RECENT_WINDOW_SEC` age, so a long active session keeps them warm. 3. Pre-flight Planner - The task-list planner (`plan_query` in `src/jarvis/reply/planner.py`) runs **first**, before any memory lookup or tool routing. It sees the query, a compact dialogue snippet, and the full builtin + MCP tool catalogue (names + one-line descriptions). diff --git a/tests/test_dialogue_memory_hot_cache.py b/tests/test_dialogue_memory_hot_cache.py index 82b83ee5..b2a4a33d 100644 --- a/tests/test_dialogue_memory_hot_cache.py +++ b/tests/test_dialogue_memory_hot_cache.py @@ -1,14 +1,13 @@ -"""Tests for the DialogueMemory hot-window scratch cache and the +"""Tests for the DialogueMemory conversation-scoped scratch cache and the ``is_tool_message`` helper. -The cache is a thin per-conversation primitive used by the reply engine to +The cache is a per-conversation primitive used by the reply engine to memoise idempotent per-turn work (warm profile, memory extractor, tool -router) within a single hot window. Entries auto-expire with -``RECENT_WINDOW_SEC`` and are wiped on ``clear_hot_cache()``. +router). Entries persist for the lifetime of the active conversation and +are wiped on ``clear_hot_cache()``; the warm profile entry can also be +invalidated on demand via ``invalidate_warm_profile()``. """ -import time - import pytest from src.jarvis.memory.conversation import DialogueMemory, is_tool_message @@ -25,16 +24,27 @@ def test_put_then_get_roundtrips(self): dm.hot_cache_put("k", {"v": 1}) assert dm.hot_cache_get("k") == {"v": 1} - def test_entries_expire_with_hot_window(self): + def test_entries_persist_past_recent_window_age(self): + """Cache entries are conversation-scoped, not bounded by + RECENT_WINDOW_SEC. A long active conversation must keep the + cache hot even when the original write is older than the window. + """ dm = DialogueMemory(inactivity_timeout=300.0) dm.hot_cache_put("k", "v") - # Force the entry's timestamp past the window. with dm._lock: ts, value = dm._hot_cache["k"] dm._hot_cache["k"] = (ts - (dm.RECENT_WINDOW_SEC + 10), value) - assert dm.hot_cache_get("k") is None - # Expired entry must also be removed from storage. - assert "k" not in dm._hot_cache + # Age alone must NOT cause the value to disappear; only explicit + # invalidation should drop it. + assert dm.hot_cache_get("k") == "v" + + def test_invalidate_warm_profile_drops_only_that_key(self): + dm = DialogueMemory() + dm.hot_cache_put(dm.WARM_PROFILE_CACHE_KEY, "warm-block") + dm.hot_cache_put("router:abc", ["webSearch"]) + dm.invalidate_warm_profile() + assert dm.hot_cache_get(dm.WARM_PROFILE_CACHE_KEY) is None + assert dm.hot_cache_get("router:abc") == ["webSearch"] def test_clear_hot_cache_drops_all_entries(self): dm = DialogueMemory() diff --git a/tests/test_dialogue_memory_tool_carryover.py b/tests/test_dialogue_memory_tool_carryover.py index 123fc89c..abe533bc 100644 --- a/tests/test_dialogue_memory_tool_carryover.py +++ b/tests/test_dialogue_memory_tool_carryover.py @@ -111,7 +111,12 @@ def test_clear_tool_carryover_drops_tool_msgs_only(self): # Tool rows gone, but user/assistant prose preserved assert roles == ["user", "assistant"] - def test_expired_tool_turns_are_pruned(self): + def test_tool_turns_survive_past_recent_window_age(self): + """Tool carryover is conversation-scoped, not RECENT_WINDOW_SEC- + bounded. An ongoing conversation must keep prior tool results + visible regardless of how long ago each tool fired; the engine + clears them on new-conversation entry and on ``stop``. + """ dm = DialogueMemory(inactivity_timeout=300.0) dm.add_message("user", "q") dm.record_tool_turn([ @@ -121,7 +126,8 @@ def test_expired_tool_turns_are_pruned(self): "arguments": {"q": "x"}}}]}, {"role": "tool", "tool_call_id": "c1", "content": "r"}, ]) - # Force all stored tool-turn timestamps beyond RECENT_WINDOW_SEC + # Even when we backdate the tool-turn timestamp past the window, + # the carryover survives until explicitly cleared. with dm._lock: dm._tool_turns = [ (ts - (dm.RECENT_WINDOW_SEC + 10), msgs) @@ -129,8 +135,63 @@ def test_expired_tool_turns_are_pruned(self): ] out = dm.get_recent_turns_with_tools() - # No tool rows because they expired - assert not any(m.get("role") == "tool" for m in out) + assert any(m.get("role") == "tool" for m in out), ( + "tool carryover must persist beyond RECENT_WINDOW_SEC age" + ) + + dm.clear_tool_carryover() + out_after_clear = dm.get_recent_turns_with_tools() + assert not any(m.get("role") == "tool" for m in out_after_clear) + + def test_tool_call_arguments_are_scrubbed(self): + """Native tool-call arguments can carry secrets too (e.g. an + email or token in the search query). They must be scrubbed + on record so re-injection on the next turn cannot leak them. + """ + dm = DialogueMemory() + dm.record_tool_turn([ + { + "role": "assistant", + "content": "", + "tool_calls": [{ + "id": "c1", + "type": "function", + "function": { + "name": "webSearch", + "arguments": { + "query": "look up alice@example.com please", + }, + }, + }], + }, + {"role": "tool", "tool_call_id": "c1", "content": "ok"}, + ]) + stored_call = dm._tool_turns[0][1][0]["tool_calls"][0] + stored_args = stored_call["function"]["arguments"] + assert "alice@example.com" not in stored_args["query"] + assert "[REDACTED_EMAIL]" in stored_args["query"] + + def test_tool_call_arguments_string_form_is_scrubbed(self): + """Some providers serialise arguments as a JSON string, not a dict.""" + dm = DialogueMemory() + dm.record_tool_turn([ + { + "role": "assistant", + "content": "", + "tool_calls": [{ + "id": "c1", + "type": "function", + "function": { + "name": "webSearch", + "arguments": '{"query": "alice@example.com"}', + }, + }], + }, + {"role": "tool", "tool_call_id": "c1", "content": "ok"}, + ]) + stored_args = dm._tool_turns[0][1][0]["tool_calls"][0]["function"]["arguments"] + assert "alice@example.com" not in stored_args + assert "[REDACTED_EMAIL]" in stored_args def test_tool_payloads_are_scrubbed_of_secrets(self): """Tool results may contain emails, API tokens, JWTs. record_tool_turn diff --git a/tests/test_engine_hot_window_caches.py b/tests/test_engine_hot_window_caches.py index 781af19f..5a316b14 100644 --- a/tests/test_engine_hot_window_caches.py +++ b/tests/test_engine_hot_window_caches.py @@ -206,3 +206,51 @@ def test_planner_search_memory_overrides_recall_gate( "extractor must run when planner emits searchMemory, " "regardless of recall-gate coverage" ) + + +@pytest.mark.unit +@patch("src.jarvis.memory.graph_ops.format_warm_profile_block", return_value="") +@patch("src.jarvis.memory.graph_ops.build_warm_profile", return_value={"user": "", "directives": ""}) +@patch("src.jarvis.memory.graph.GraphMemoryStore") +@patch("src.jarvis.reply.engine.select_tools", return_value=[]) +@patch("src.jarvis.reply.engine.plan_query", return_value=[]) +@patch("src.jarvis.reply.engine.extract_search_params_for_memory", return_value={}) +@patch("src.jarvis.reply.engine.extract_text_from_response") +@patch("src.jarvis.reply.engine.chat_with_messages") +def test_new_conversation_clears_cache_and_carryover( + mock_chat, mock_extract, _mock_extractor, _mock_plan, mock_select, + _mock_graph, _mock_warm, _mock_fmt, +): + """When the previous conversation has lapsed past the inactivity + window, the engine must wipe the conversation-scoped cache and any + leftover tool carryover before running the new turn. Otherwise stale + state from a previous session would leak into a fresh one. + """ + mock_chat.side_effect = [ + {"message": {"content": "fresh"}}, + ] + mock_extract.side_effect = ["fresh"] + + db = Mock() + cfg = _mock_cfg() + dm = DialogueMemory() + + # Plant cache + carryover from a prior (now-lapsed) session. + dm.hot_cache_put(dm.WARM_PROFILE_CACHE_KEY, "old-block") + dm.hot_cache_put("router:old", ["webSearch"]) + dm.record_tool_turn([ + {"role": "tool", "tool_call_id": "c1", "content": "ancient result"}, + ]) + assert dm._tool_turns + assert dm.hot_cache_get(dm.WARM_PROFILE_CACHE_KEY) == "old-block" + + # No recent messages → engine treats this turn as a new conversation. + run_reply_engine(db=db, cfg=cfg, tts=None, text="hello", dialogue_memory=dm) + + # Stale router entry must be gone (full hot-cache wipe), and the old + # tool carryover must not be visible to the new conversation. + assert dm.hot_cache_get("router:old") is None + # The tool carryover from before must have been cleared on entry; any + # tool turns recorded later in this turn would only come from THIS + # reply (mock chat returns a final reply with no tool calls). + assert dm._tool_turns == [] diff --git a/tests/test_graph_mutation_listener.py b/tests/test_graph_mutation_listener.py new file mode 100644 index 00000000..6457e369 --- /dev/null +++ b/tests/test_graph_mutation_listener.py @@ -0,0 +1,201 @@ +"""Tests for the graph mutation listener registry and the warm-profile +invalidation hook it powers. + +The registry lets consumers (notably ``DialogueMemory``'s warm-profile +cache) react to writes against the User / Directives branches mid- +conversation. World-branch writes must NOT invalidate the warm profile, +since the warm profile does not include world facts. +""" + +from __future__ import annotations + +import os +import tempfile + +import pytest + +from src.jarvis.memory.conversation import DialogueMemory +from src.jarvis.memory.graph import ( + BRANCH_DIRECTIVES, + BRANCH_USER, + BRANCH_WORLD, + GraphMemoryStore, + register_graph_mutation_listener, + unregister_graph_mutation_listener, +) + + +@pytest.fixture +def graph_store(): + fd, path = tempfile.mkstemp(suffix=".db") + os.close(fd) + store = GraphMemoryStore(path) + yield store + try: + os.unlink(path) + except OSError: + pass + + +@pytest.mark.unit +class TestMutationListenerRegistry: + def test_create_under_user_notifies_with_user_branch(self, graph_store): + events: list[dict] = [] + + def cb(*, action, node_id, branch): + events.append({"action": action, "node_id": node_id, "branch": branch}) + + register_graph_mutation_listener(cb) + try: + graph_store.create_node("Alice", "user fact", parent_id=BRANCH_USER) + finally: + unregister_graph_mutation_listener(cb) + + actions = [e["action"] for e in events] + branches = [e["branch"] for e in events] + assert "create" in actions + assert BRANCH_USER in branches + + def test_update_under_directives_notifies_with_directives_branch(self, graph_store): + node = graph_store.create_node( + "be brief", "rule", parent_id=BRANCH_DIRECTIVES, + ) + events: list[dict] = [] + + def cb(*, action, node_id, branch): + events.append({"action": action, "node_id": node_id, "branch": branch}) + + register_graph_mutation_listener(cb) + try: + graph_store.update_node(node.id, data="updated") + finally: + unregister_graph_mutation_listener(cb) + + update_events = [e for e in events if e["action"] == "update"] + assert update_events + assert update_events[-1]["branch"] == BRANCH_DIRECTIVES + + def test_delete_under_world_notifies_with_world_branch(self, graph_store): + node = graph_store.create_node( + "Paris", "city", parent_id=BRANCH_WORLD, + ) + events: list[dict] = [] + + def cb(*, action, node_id, branch): + events.append({"action": action, "node_id": node_id, "branch": branch}) + + register_graph_mutation_listener(cb) + try: + graph_store.delete_node(node.id) + finally: + unregister_graph_mutation_listener(cb) + + delete_events = [e for e in events if e["action"] == "delete"] + assert delete_events + assert delete_events[-1]["branch"] == BRANCH_WORLD + + def test_listener_exception_does_not_break_write(self, graph_store): + def boom(*, action, node_id, branch): + raise RuntimeError("listener should not break writes") + + register_graph_mutation_listener(boom) + try: + # Must complete despite the listener raising. + node = graph_store.create_node( + "Bob", "another user fact", parent_id=BRANCH_USER, + ) + assert graph_store.get_node(node.id) is not None + finally: + unregister_graph_mutation_listener(boom) + + def test_unregister_is_idempotent(self): + def cb(**_): + pass + + register_graph_mutation_listener(cb) + unregister_graph_mutation_listener(cb) + unregister_graph_mutation_listener(cb) # second remove must not raise + + def test_deep_descendant_resolves_to_branch(self, graph_store): + """A grandchild several levels deep under user must resolve to the + ``user`` branch so the listener can scope correctly even for nested + nodes. + """ + parent = graph_store.create_node("Profile", "child", parent_id=BRANCH_USER) + child = graph_store.create_node("Tastes", "grandchild", parent_id=parent.id) + events: list[dict] = [] + + def cb(*, action, node_id, branch): + events.append({"action": action, "node_id": node_id, "branch": branch}) + + register_graph_mutation_listener(cb) + try: + graph_store.append_to_node(child.id, "loves jazz") + finally: + unregister_graph_mutation_listener(cb) + + # append_to_node calls update_node internally → at least one update. + update_events = [e for e in events if e["action"] == "update"] + assert update_events + assert update_events[-1]["branch"] == BRANCH_USER + + +@pytest.mark.unit +class TestWarmProfileInvalidationHook: + """End-to-end: the wiring done in ``daemon.py`` invalidates the warm + profile entry on User / Directives writes but ignores World writes. + Re-create that wiring here so the test does not depend on daemon + start-up. + """ + + def _wire(self, dm: DialogueMemory): + relevant = {BRANCH_USER, BRANCH_DIRECTIVES} + + def cb(*, action, node_id, branch): + del action, node_id + if branch in relevant: + dm.invalidate_warm_profile() + + register_graph_mutation_listener(cb) + return cb + + def test_user_write_invalidates_warm_profile(self, graph_store): + dm = DialogueMemory() + dm.hot_cache_put(dm.WARM_PROFILE_CACHE_KEY, "stale-block") + dm.hot_cache_put("router:abc", ["webSearch"]) + cb = self._wire(dm) + try: + graph_store.create_node("Eve", "user fact", parent_id=BRANCH_USER) + finally: + unregister_graph_mutation_listener(cb) + + assert dm.hot_cache_get(dm.WARM_PROFILE_CACHE_KEY) is None + # Other cache entries are untouched. + assert dm.hot_cache_get("router:abc") == ["webSearch"] + + def test_directives_write_invalidates_warm_profile(self, graph_store): + dm = DialogueMemory() + dm.hot_cache_put(dm.WARM_PROFILE_CACHE_KEY, "stale-block") + cb = self._wire(dm) + try: + graph_store.create_node( + "be concise", "rule", parent_id=BRANCH_DIRECTIVES, + ) + finally: + unregister_graph_mutation_listener(cb) + + assert dm.hot_cache_get(dm.WARM_PROFILE_CACHE_KEY) is None + + def test_world_write_does_not_invalidate_warm_profile(self, graph_store): + dm = DialogueMemory() + dm.hot_cache_put(dm.WARM_PROFILE_CACHE_KEY, "fresh-block") + cb = self._wire(dm) + try: + graph_store.create_node( + "Paris", "world fact", parent_id=BRANCH_WORLD, + ) + finally: + unregister_graph_mutation_listener(cb) + + # World-branch writes are noise for the warm profile. + assert dm.hot_cache_get(dm.WARM_PROFILE_CACHE_KEY) == "fresh-block" From 7ecfff14f30db75b01100da0daefc1b43090d39c Mon Sep 17 00:00:00 2001 From: Baris Sencan Date: Mon, 27 Apr 2026 21:53:56 +0100 Subject: [PATCH 4/5] fix(memory): close review gaps on conversation cache + listener Addresses issues raised in review of a48be9e: - _scrub_tool_call now also scrubs list/tuple-form arguments via a recursive _scrub_args helper. Closes the privacy gap where a non-standard provider emitting positional args could leak secrets on tool re-injection. - daemon.py reads _global_dialogue_memory through the module global at fire time instead of capturing it by closure, so a future singleton swap (tests, hot reload) routes invalidation to the current instance. Tracks the registered listener so it can be unregistered on shutdown and on re-entry, preventing stale closures from accumulating in the module-level registry. - _next_ts() docstring now explains the Windows ~16ms granularity motivation and the lock requirement. - Tests: * direct unit tests for _next_ts() (consecutive monotonic, advance past artificially high _last_ts). * _resolve_branch returns None past MAX_TRAVERSAL_DEPTH and on unknown node ids. * mutation listener is NOT called when create_node raises (no spurious events on failed writes). * tool_call arguments scrubbed when passed as list of mixed scalars and dicts. Co-Authored-By: Claude Opus 4.7 --- src/jarvis/daemon.py | 62 +++++++++++++++----- src/jarvis/memory/conversation.py | 39 +++++++++--- tests/test_dialogue_memory_hot_cache.py | 31 ++++++++++ tests/test_dialogue_memory_tool_carryover.py | 30 ++++++++++ tests/test_graph_mutation_listener.py | 46 +++++++++++++++ 5 files changed, 185 insertions(+), 23 deletions(-) diff --git a/src/jarvis/daemon.py b/src/jarvis/daemon.py index ad488929..c2b39588 100644 --- a/src/jarvis/daemon.py +++ b/src/jarvis/daemon.py @@ -44,6 +44,7 @@ # Global instances for coordination between modules _global_dialogue_memory: Optional[DialogueMemory] = None _global_stop_requested: bool = False +_warm_profile_graph_listener = None # registered callback, kept for shutdown unregister _global_tts_engine = None # TTS engine reference for face animation polling _global_dictation_engine = None # Dictation engine reference for history UI @@ -362,22 +363,41 @@ def main() -> None: _wp_relevant_branches = {BRANCH_USER, BRANCH_DIRECTIVES} + # Read the DialogueMemory ref through the module global at fire + # time, not via closure capture, so a future singleton swap (tests + # or hot-reload) routes invalidation to the live instance instead + # of the freed one. def _invalidate_wp_on_graph_mutation(*, action, node_id, branch): - del action, node_id # Unused; only branch matters here. - if branch in _wp_relevant_branches and _global_dialogue_memory is not None: - try: - _global_dialogue_memory.invalidate_warm_profile() - debug_log( - f"warm profile invalidated by {branch} graph mutation", - "memory", - ) - except Exception as exc: - debug_log( - f"warm profile invalidation failed (non-fatal): {exc}", - "memory", - ) - + del action, node_id # Only the branch matters for warm-profile filtering. + if branch not in _wp_relevant_branches: + return + dm = _global_dialogue_memory + if dm is None: + return + try: + dm.invalidate_warm_profile() + debug_log( + f"warm profile invalidated by {branch} graph mutation", + "memory", + ) + except Exception as exc: + debug_log( + f"warm profile invalidation failed (non-fatal): {exc}", + "memory", + ) + + global _warm_profile_graph_listener + # If a previous run left a listener registered (re-entry without + # full process restart), drop it before installing the new one so + # the registry never accumulates stale closures. + if _warm_profile_graph_listener is not None: + try: + from .memory.graph import unregister_graph_mutation_listener + unregister_graph_mutation_listener(_warm_profile_graph_listener) + except Exception: + pass register_graph_mutation_listener(_invalidate_wp_on_graph_mutation) + _warm_profile_graph_listener = _invalidate_wp_on_graph_mutation except Exception as exc: debug_log( f"warm profile mutation listener wiring failed (non-fatal): {exc}", @@ -603,6 +623,20 @@ def stdin_monitor(): if tts is not None: tts.stop() db.close() + + # Drop the warm-profile graph listener so the module registry does + # not retain a closure pointing at this run's DialogueMemory after + # shutdown — relevant for tests and any embedder that re-runs the + # daemon in-process. + global _warm_profile_graph_listener + if _warm_profile_graph_listener is not None: + try: + from .memory.graph import unregister_graph_mutation_listener + unregister_graph_mutation_listener(_warm_profile_graph_listener) + except Exception: + pass + _warm_profile_graph_listener = None + debug_log("daemon stopped", "jarvis") print("👋 Daemon stopped", flush=True) diff --git a/src/jarvis/memory/conversation.py b/src/jarvis/memory/conversation.py index aa985bc3..1adf9e25 100644 --- a/src/jarvis/memory/conversation.py +++ b/src/jarvis/memory/conversation.py @@ -26,18 +26,29 @@ def _scrub_tool_call(tc: dict) -> dict: fn = out.get("function") if isinstance(fn, dict): fn_out = dict(fn) - args = fn_out.get("arguments") - if isinstance(args, str) and args: - fn_out["arguments"] = scrub_secrets(args) - elif isinstance(args, dict): - fn_out["arguments"] = { - k: (scrub_secrets(v) if isinstance(v, str) else v) - for k, v in args.items() - } + fn_out["arguments"] = _scrub_args(fn_out.get("arguments")) out["function"] = fn_out return out +def _scrub_args(args): + """Scrub a tool-call ``arguments`` value of secrets. + + Handles every shape we have seen across providers: JSON-encoded + strings, dict objects, and (rarely) lists/tuples of values. Anything + else passes through untouched — there is no safe way to scrub an + opaque scalar. + """ + if isinstance(args, str) and args: + return scrub_secrets(args) + if isinstance(args, dict): + return {k: _scrub_args(v) for k, v in args.items()} + if isinstance(args, (list, tuple)): + scrubbed = [_scrub_args(v) for v in args] + return type(args)(scrubbed) if isinstance(args, tuple) else scrubbed + return args + + def is_tool_message(msg: dict) -> bool: """True if a message is a tool-call request or a tool-result. @@ -179,7 +190,17 @@ def __init__(self, inactivity_timeout: float = 300.0, max_interactions: int = 20 self._last_profile: Optional[str] = None def _next_ts(self) -> float: - """Return a strictly-monotonic timestamp. Caller must hold ``_lock``.""" + """Return a strictly-monotonic timestamp. + + On Windows, ``time.time()`` has ~16ms granularity — consecutive + calls within the same tick return the identical float. That + breaks interleave ordering between text messages and tool turns + when both land in the same tick. We bump by a 1µs epsilon so + insertion order is always preserved while staying close enough + to wall-clock for ``RECENT_WINDOW_SEC`` filtering. + + Caller MUST hold ``_lock`` — ``_last_ts`` is shared mutable state. + """ now = time.time() if now <= self._last_ts: now = self._last_ts + 1e-6 diff --git a/tests/test_dialogue_memory_hot_cache.py b/tests/test_dialogue_memory_hot_cache.py index b2a4a33d..91003bd2 100644 --- a/tests/test_dialogue_memory_hot_cache.py +++ b/tests/test_dialogue_memory_hot_cache.py @@ -8,6 +8,8 @@ invalidated on demand via ``invalidate_warm_profile()``. """ +import time + import pytest from src.jarvis.memory.conversation import DialogueMemory, is_tool_message @@ -61,6 +63,35 @@ def test_put_overwrites_existing_value(self): assert dm.hot_cache_get("k") == "new" +@pytest.mark.unit +class TestNextTsMonotonic: + """``_next_ts`` exists because ``time.time()`` has ~16ms granularity + on Windows and consecutive calls can return identical values. Without + the epsilon bump, text/tool messages recorded in the same tick would + collide and break interleave ordering downstream. + """ + + def test_consecutive_calls_strictly_increase(self): + dm = DialogueMemory() + with dm._lock: + t1 = dm._next_ts() + t2 = dm._next_ts() + t3 = dm._next_ts() + assert t1 < t2 < t3 + + def test_advances_past_artificially_high_last_ts(self): + """Even if ``_last_ts`` is ahead of the wall clock (clock skew, + manual seed), the next call must still advance. + """ + dm = DialogueMemory() + future = time.time() + 100.0 + with dm._lock: + dm._last_ts = future + nxt = dm._next_ts() + assert nxt > future + assert nxt - future < 0.01 # only an epsilon bump, not a wall jump + + @pytest.mark.unit class TestToolTurnsStorageCap: def test_tool_turns_capped_to_max_storage(self): diff --git a/tests/test_dialogue_memory_tool_carryover.py b/tests/test_dialogue_memory_tool_carryover.py index abe533bc..59dc11b8 100644 --- a/tests/test_dialogue_memory_tool_carryover.py +++ b/tests/test_dialogue_memory_tool_carryover.py @@ -171,6 +171,36 @@ def test_tool_call_arguments_are_scrubbed(self): assert "alice@example.com" not in stored_args["query"] assert "[REDACTED_EMAIL]" in stored_args["query"] + def test_tool_call_arguments_list_form_is_scrubbed(self): + """Some providers / custom tools pass arguments as a list of + scalars or dicts. Each element must be scrubbed too — otherwise + a positional secret slips through. + """ + dm = DialogueMemory() + dm.record_tool_turn([ + { + "role": "assistant", + "content": "", + "tool_calls": [{ + "id": "c1", + "type": "function", + "function": { + "name": "lookup", + "arguments": [ + "alice@example.com", + {"note": "ping bob@example.com"}, + ], + }, + }], + }, + {"role": "tool", "tool_call_id": "c1", "content": "ok"}, + ]) + stored = dm._tool_turns[0][1][0]["tool_calls"][0]["function"]["arguments"] + flat = repr(stored) + assert "alice@example.com" not in flat + assert "bob@example.com" not in flat + assert flat.count("[REDACTED_EMAIL]") >= 2 + def test_tool_call_arguments_string_form_is_scrubbed(self): """Some providers serialise arguments as a JSON string, not a dict.""" dm = DialogueMemory() diff --git a/tests/test_graph_mutation_listener.py b/tests/test_graph_mutation_listener.py index 6457e369..89b8d09d 100644 --- a/tests/test_graph_mutation_listener.py +++ b/tests/test_graph_mutation_listener.py @@ -116,6 +116,52 @@ def cb(**_): unregister_graph_mutation_listener(cb) unregister_graph_mutation_listener(cb) # second remove must not raise + def test_resolve_branch_returns_none_past_depth_cap(self, graph_store): + """A chain longer than ``MAX_TRAVERSAL_DEPTH`` must terminate + rather than spin. Returns ``None`` — listener treats that as + "unknown branch" and skips invalidation. + """ + from src.jarvis.memory.graph import MAX_TRAVERSAL_DEPTH + + # Build a chain of MAX_TRAVERSAL_DEPTH + 2 nodes under user; the + # tail node should still resolve because the walk can finish + # before the cap. Then create one MORE level past the cap and + # confirm it returns None. + parent_id = BRANCH_USER + chain: list = [] + for i in range(MAX_TRAVERSAL_DEPTH + 2): + n = graph_store.create_node(f"n{i}", "deep", parent_id=parent_id) + chain.append(n) + parent_id = n.id + # The deepest node is past the cap from BRANCH_USER. + assert graph_store._resolve_branch(chain[-1].id) is None + + def test_resolve_branch_handles_unknown_node_id(self, graph_store): + """A node id that does not exist returns ``None`` rather than + raising — write paths must never crash on stale ids. + """ + assert graph_store._resolve_branch("does-not-exist") is None + + def test_listener_not_called_when_create_fails(self, graph_store): + """If ``create_node`` raises (e.g. unknown parent_id), no + mutation event should fire because no row was written. + """ + events: list = [] + + def cb(*, action, node_id, branch): + events.append({"action": action, "node_id": node_id, "branch": branch}) + + register_graph_mutation_listener(cb) + try: + with pytest.raises(ValueError): + graph_store.create_node( + "Orphan", "no parent", parent_id="missing-parent", + ) + finally: + unregister_graph_mutation_listener(cb) + + assert events == [], "no mutation should be reported for failed write" + def test_deep_descendant_resolves_to_branch(self, graph_store): """A grandchild several levels deep under user must resolve to the ``user`` branch so the listener can scope correctly even for nested From c23244c756a25db614e12b10180c528930c75066 Mon Sep 17 00:00:00 2001 From: Baris Sencan Date: Fri, 1 May 2026 18:57:31 +0100 Subject: [PATCH 5/5] fix(daemon): move global declaration to function top to fix SyntaxError Python requires all global statements to precede any use of the variable in the same function scope. The two inner global _warm_profile_graph_listener declarations inside try/finally blocks were triggering a SyntaxError on startup. Consolidate the declaration at the top of main() alongside the other module globals. Co-Authored-By: Claude Sonnet 4.6 --- src/jarvis/daemon.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/jarvis/daemon.py b/src/jarvis/daemon.py index c2b39588..4bca8b13 100644 --- a/src/jarvis/daemon.py +++ b/src/jarvis/daemon.py @@ -295,6 +295,7 @@ def on_token_handler(token: str): def main() -> None: """Main daemon entry point.""" global _global_dialogue_memory, _global_stop_requested, _global_tts_engine, _global_dictation_engine + global _warm_profile_graph_listener # Reset stop flag at start (in case of restart) _global_stop_requested = False @@ -386,7 +387,6 @@ def _invalidate_wp_on_graph_mutation(*, action, node_id, branch): "memory", ) - global _warm_profile_graph_listener # If a previous run left a listener registered (re-entry without # full process restart), drop it before installing the new one so # the registry never accumulates stale closures. @@ -628,7 +628,6 @@ def stdin_monitor(): # not retain a closure pointing at this run's DialogueMemory after # shutdown — relevant for tests and any embedder that re-runs the # daemon in-process. - global _warm_profile_graph_listener if _warm_profile_graph_listener is not None: try: from .memory.graph import unregister_graph_mutation_listener