From 2a6a6ea04ce17bc5a0ea19123aff8a946fdd149d Mon Sep 17 00:00:00 2001 From: Abi Date: Wed, 20 May 2026 11:07:11 -0700 Subject: [PATCH 1/9] refactor: record passage_id_scheme in meta.json (default "sequential") MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sub-PR 1 of 5 from the plan in #329. Purely additive — no behavior change for any caller, existing index loaders ignore the field. Writes a new `passage_id_scheme: "sequential"` field into the .meta.json produced by both build_index and build_index_from_arrays. Bumps version to "1.1" for human-inspectable schema tracking (no code reads version today, so the bump is safe). Module-level constants PASSAGE_ID_SCHEME_SEQUENTIAL / _CONTENT_HASH document the value space; the content-hash scheme itself ships in sub-PR 2. --- packages/leann-core/src/leann/api.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/packages/leann-core/src/leann/api.py b/packages/leann-core/src/leann/api.py index e22633ad..6ac629a3 100644 --- a/packages/leann-core/src/leann/api.py +++ b/packages/leann-core/src/leann/api.py @@ -30,6 +30,15 @@ logger = logging.getLogger(__name__) +# Passage ID schemes recorded in .meta.json["passage_id_scheme"]. +# - "sequential": today's default; IDs are str(insertion_index) (api.py:add_text). +# - "content-hash": planned in #329; IDs are sha256(text)[:16], stable across +# file moves and reorderings. +# Older indexes have no passage_id_scheme field — readers must default to +# "sequential" when the key is absent. See #329 for the rollout plan. +PASSAGE_ID_SCHEME_SEQUENTIAL = "sequential" +PASSAGE_ID_SCHEME_CONTENT_HASH = "content-hash" + def get_registered_backends() -> list[str]: """Get list of registered backend names.""" @@ -570,12 +579,13 @@ def build_index(self, index_path: str): builder_instance.build(embeddings, string_ids, index_path, **current_backend_kwargs) leann_meta_path = index_dir / f"{index_name}.meta.json" meta_data = { - "version": "1.0", + "version": "1.1", "backend_name": self.backend_name, "embedding_model": self.embedding_model, "dimensions": self.dimensions, "backend_kwargs": self.backend_kwargs, "embedding_mode": self.embedding_mode, + "passage_id_scheme": PASSAGE_ID_SCHEME_SEQUENTIAL, "passage_sources": [ { "type": "jsonl", @@ -714,12 +724,13 @@ def build_index_from_arrays(self, index_path: str, ids: list, embeddings: np.nda # Create metadata file leann_meta_path = index_dir / f"{index_name}.meta.json" meta_data = { - "version": "1.0", + "version": "1.1", "backend_name": self.backend_name, "embedding_model": self.embedding_model, "dimensions": self.dimensions, "backend_kwargs": self.backend_kwargs, "embedding_mode": self.embedding_mode, + "passage_id_scheme": PASSAGE_ID_SCHEME_SEQUENTIAL, "passage_sources": [ { "type": "jsonl", From e0a8525d3efd6ea9742b3ba4e0a3fe7d64cacd06 Mon Sep 17 00:00:00 2001 From: Abi Date: Wed, 20 May 2026 11:17:29 -0700 Subject: [PATCH 2/9] feat: content-hash passage IDs via --id-scheme content-hash MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sub-PR 2 of 5 from #329. Builds on #330 (which added the meta.json field). New behavior: - `LeannBuilder(..., passage_id_scheme="content-hash")` makes add_text() key passages by sha256(text)[:16] instead of insertion index. Stable across file moves, reorderings, and re-runs of the same corpus. - `leann build --id-scheme content-hash` exposes it at the CLI. - Default unchanged ("sequential"). Existing indexes continue to work identically; no migration triggered. Identical-text chunks collide (same hash). For this sub-PR the second occurrence overwrites the first in the offset map — that's the dedup behavior I'd want by default. A `--preserve-duplicates` escape hatch can land later if needed (see the open question in #329). --- packages/leann-core/src/leann/api.py | 30 +++++++++++++++++++++++++--- packages/leann-core/src/leann/cli.py | 12 +++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/packages/leann-core/src/leann/api.py b/packages/leann-core/src/leann/api.py index 6ac629a3..517e4e9b 100644 --- a/packages/leann-core/src/leann/api.py +++ b/packages/leann-core/src/leann/api.py @@ -385,6 +385,7 @@ def __init__( embedding_options: Optional[dict[str, Any]] = None, prebuild_bm25: bool = False, bm25_backend: str = "fts5", + passage_id_scheme: str = PASSAGE_ID_SCHEME_SEQUENTIAL, **backend_kwargs, ): if bm25_backend != "fts5": @@ -392,6 +393,16 @@ def __init__( bm25_backend = "fts5" self.bm25_backend = bm25_backend self.prebuild_bm25 = prebuild_bm25 or bm25_backend == "fts5" + if passage_id_scheme not in ( + PASSAGE_ID_SCHEME_SEQUENTIAL, + PASSAGE_ID_SCHEME_CONTENT_HASH, + ): + raise ValueError( + f"Unknown passage_id_scheme: {passage_id_scheme!r}. " + f"Expected one of: {PASSAGE_ID_SCHEME_SEQUENTIAL!r}, " + f"{PASSAGE_ID_SCHEME_CONTENT_HASH!r}." + ) + self.passage_id_scheme = passage_id_scheme self.backend_name = backend_name # Normalize incompatible combinations early (for consistent metadata) if backend_name == "hnsw": @@ -486,10 +497,23 @@ def __init__( self.backend_kwargs = backend_kwargs self.chunks: list[dict[str, Any]] = [] + def _generate_passage_id(self, text: str) -> str: + """Generate a passage ID per the configured scheme. + + sequential: str(insertion index) — fast, position-dependent, current default. + content-hash: sha256(text)[:16] — content-stable, dedup-friendly across + file moves and reorderings. See #329 for the design. + """ + if self.passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH: + import hashlib + + return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16] + return str(len(self.chunks)) + def add_text(self, text: str, metadata: Optional[dict[str, Any]] = None): if metadata is None: metadata = {} - passage_id = metadata.get("id", str(len(self.chunks))) + passage_id = metadata.get("id") or self._generate_passage_id(text) chunk_data = {"id": passage_id, "text": text, "metadata": metadata} self.chunks.append(chunk_data) @@ -585,7 +609,7 @@ def build_index(self, index_path: str): "dimensions": self.dimensions, "backend_kwargs": self.backend_kwargs, "embedding_mode": self.embedding_mode, - "passage_id_scheme": PASSAGE_ID_SCHEME_SEQUENTIAL, + "passage_id_scheme": self.passage_id_scheme, "passage_sources": [ { "type": "jsonl", @@ -730,7 +754,7 @@ def build_index_from_arrays(self, index_path: str, ids: list, embeddings: np.nda "dimensions": self.dimensions, "backend_kwargs": self.backend_kwargs, "embedding_mode": self.embedding_mode, - "passage_id_scheme": PASSAGE_ID_SCHEME_SEQUENTIAL, + "passage_id_scheme": self.passage_id_scheme, "passage_sources": [ { "type": "jsonl", diff --git a/packages/leann-core/src/leann/cli.py b/packages/leann-core/src/leann/cli.py index 0d50cf34..c07b9510 100644 --- a/packages/leann-core/src/leann/cli.py +++ b/packages/leann-core/src/leann/cli.py @@ -347,6 +347,16 @@ def create_parser(self) -> argparse.ArgumentParser: default=True, help="Fall back to traditional chunking if AST chunking fails (default: True)", ) + build_parser.add_argument( + "--id-scheme", + choices=["sequential", "content-hash"], + default="sequential", + help=( + "How passage IDs are assigned. 'sequential' (default) keys by insertion " + "order; 'content-hash' uses sha256(text)[:16], stable across file moves " + "and reorderings. See #329." + ), + ) # Watch command watch_parser = subparsers.add_parser( @@ -1891,6 +1901,7 @@ def _make_incremental_builder(self, args) -> "LeannBuilder": is_compact=args.compact, is_recompute=args.recompute, num_threads=args.num_threads, + passage_id_scheme=getattr(args, "id_scheme", "sequential"), ) def _incremental_add_only( @@ -2384,6 +2395,7 @@ async def build_index(self, args): is_compact=args.compact, is_recompute=args.recompute, num_threads=args.num_threads, + passage_id_scheme=getattr(args, "id_scheme", "sequential"), ) for chunk in all_texts: From e1fa6fa7c06931a251885010dbed20e22f662dc6 Mon Sep 17 00:00:00 2001 From: Abi Date: Wed, 20 May 2026 11:54:15 -0700 Subject: [PATCH 3/9] feat: incremental updates honor existing index's passage_id_scheme MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sub-PR 3 of 5 from #329. Builds on #330 / #331. Two changes: 1. `LeannCLI._make_incremental_builder` now reads the existing index's `passage_id_scheme` from meta.json and uses that, ignoring any conflicting `--id-scheme` on the args (with a note printed). Otherwise an update command on a content-hash index would mix sequential IDs into a hash-keyed passages.jsonl and break lookups. 2. `LeannSearcher` exposes `self.passage_id_scheme` so consumers can introspect; defaults to "sequential" for older indexes that don't record it (pre-#330). No behavior change for fresh builds — the CLI's --id-scheme still controls which scheme a brand-new index gets. --- packages/leann-core/src/leann/api.py | 6 ++++++ packages/leann-core/src/leann/cli.py | 27 ++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/packages/leann-core/src/leann/api.py b/packages/leann-core/src/leann/api.py index 517e4e9b..b5ee65de 100644 --- a/packages/leann-core/src/leann/api.py +++ b/packages/leann-core/src/leann/api.py @@ -1212,6 +1212,12 @@ def __init__( ) self.bm25_scorer: Optional[BM25Index] = None + # Surface the index's passage ID scheme so callers can introspect. + # Older indexes (pre-#330) don't record this field — they're sequential. + self.passage_id_scheme: str = self.meta_data.get( + "passage_id_scheme", PASSAGE_ID_SCHEME_SEQUENTIAL + ) + # Optional one-shot warmup at construction time to hide cold-start latency. if self._warmup: self.warmup() diff --git a/packages/leann-core/src/leann/cli.py b/packages/leann-core/src/leann/cli.py index c07b9510..ce10be49 100644 --- a/packages/leann-core/src/leann/cli.py +++ b/packages/leann-core/src/leann/cli.py @@ -1890,7 +1890,32 @@ def _chunks_for_paths(self, all_texts: list[dict], paths: set[str]) -> list[dict in paths ] + def _existing_index_id_scheme(self, index_path: str) -> Optional[str]: + """Return the passage_id_scheme recorded in an existing index's meta.json. + + Returns None when the index doesn't exist yet or the field isn't + recorded (older indexes pre-#330). Callers should treat None as + "fall back to whatever the args say or the default". + """ + meta_path = Path(index_path).with_suffix(".leann.meta.json") + if not meta_path.exists(): + return None + try: + with open(meta_path, encoding="utf-8") as f: + return json.load(f).get("passage_id_scheme") + except Exception: + return None + def _make_incremental_builder(self, args) -> "LeannBuilder": + # For incremental updates, the existing index's scheme wins. Otherwise + # IDs would mix schemes within one index, which breaks lookups. + existing_scheme = self._existing_index_id_scheme(self.get_index_path(args.index_name)) + scheme = existing_scheme or getattr(args, "id_scheme", "sequential") + if existing_scheme and getattr(args, "id_scheme", existing_scheme) != existing_scheme: + print( + f"Note: --id-scheme={args.id_scheme} ignored — index '{args.index_name}' " + f"was built with passage_id_scheme={existing_scheme!r}, keeping that." + ) return LeannBuilder( backend_name=args.backend_name, embedding_model=args.embedding_model, @@ -1901,7 +1926,7 @@ def _make_incremental_builder(self, args) -> "LeannBuilder": is_compact=args.compact, is_recompute=args.recompute, num_threads=args.num_threads, - passage_id_scheme=getattr(args, "id_scheme", "sequential"), + passage_id_scheme=scheme, ) def _incremental_add_only( From fcdfe1c11a73d73395d050a31dbd55489af3dcc9 Mon Sep 17 00:00:00 2001 From: Abi Date: Wed, 20 May 2026 11:57:04 -0700 Subject: [PATCH 4/9] feat(cli): add `leann migrate-ids` to convert existing index to content-hash MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sub-PR 4 of 5 from #329. Builds on the reader-side support in sub-PR 3. `leann migrate-ids ` rewrites an existing sequential-ID index to use sha256(text)[:16] keys. Purely a Python-side rewrite — FAISS labels stay valid, only the string IDs they map to change. Four artifacts get rewritten atomically (via .migrate sibling files + shutil.move): - .passages.jsonl : new "id" field per line - .passages.idx : new offset map keyed by new IDs (deduplicates collisions) - .ids.txt : new label → ID mapping (preserves FAISS label count) - .meta.json : passage_id_scheme = "content-hash", version = "1.1" Collisions (identical-text chunks) get deduped — the later occurrence wins in the offset map. ids.txt still has one line per FAISS label so the graph keeps working; duplicate labels just point to the same passage. Interactive confirmation required unless --yes is passed. --dry-run reports the collision count without writing anything. --- packages/leann-core/src/leann/cli.py | 143 +++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) diff --git a/packages/leann-core/src/leann/cli.py b/packages/leann-core/src/leann/cli.py index ce10be49..15abd093 100644 --- a/packages/leann-core/src/leann/cli.py +++ b/packages/leann-core/src/leann/cli.py @@ -381,6 +381,26 @@ def create_parser(self) -> argparse.ArgumentParser: help="Report changes without rebuilding (original watch behavior)", ) + migrate_parser = subparsers.add_parser( + "migrate-ids", + help=( + "Rewrite an existing index's passage IDs to content-hash form. " + "Irreversible; back up the index first." + ), + ) + migrate_parser.add_argument("index_name", help="Index name") + migrate_parser.add_argument( + "--dry-run", + action="store_true", + help="Show what would change without writing anything.", + ) + migrate_parser.add_argument( + "-y", + "--yes", + action="store_true", + help="Skip the interactive confirmation prompt.", + ) + # Search command search_parser = subparsers.add_parser("search", help="Search documents") search_parser.add_argument("index_name", help="Index name") @@ -2541,6 +2561,127 @@ async def _watch_trigger_build(self, index_name: str) -> None: build_args = parser.parse_args(build_args_list) await self.build_index(build_args) + def migrate_ids(self, args) -> None: + """Rewrite an existing index's passage IDs to content-hash form. + + Migration is purely a Python-side rewrite — the vector graph isn't + touched, so FAISS labels stay valid. What gets rewritten: + - .passages.jsonl : new IDs in each line's "id" field + - .passages.idx : new offset map keyed by new IDs + - .ids.txt : new label → ID mapping for FAISS + - .meta.json : passage_id_scheme = "content-hash" + + Identical-text chunks collide on the same sha256 prefix; the later + occurrence wins in the offset map (dedup). A `--preserve-duplicates` + knob to suffix collisions can land separately. + + Irreversible. Prompts for confirmation unless --yes is passed. + """ + import hashlib + import shutil + + index_name = args.index_name + index_path = self._resolve_index_path(index_name, purpose="migrate") + if not index_path: + return + meta_path = Path(index_path).with_suffix(".leann.meta.json") + if not meta_path.exists(): + print(f"Cannot migrate '{index_name}': metadata missing at {meta_path}.") + return + with open(meta_path, encoding="utf-8") as f: + meta = json.load(f) + current_scheme = meta.get("passage_id_scheme", "sequential") + if current_scheme == "content-hash": + print(f"Index '{index_name}' already uses content-hash IDs. Nothing to do.") + return + + # Locate the sibling artifacts using the same conventions as build_index. + index_dir = Path(index_path).parent + index_base = Path(index_path).name + passages_file = index_dir / f"{index_base}.passages.jsonl" + offset_file = index_dir / f"{index_base}.passages.idx" + base_no_leann = ( + index_base[: -len(".leann")] if index_base.endswith(".leann") else index_base + ) + idmap_file = index_dir / f"{base_no_leann}.ids.txt" + + for p in (passages_file, offset_file): + if not p.exists(): + print(f"Cannot migrate: required artifact missing at {p}.") + return + + # Stream the passages to plan the rewrite and surface collision count + # before committing to anything irreversible. + old_ids: list[str] = [] + new_ids: list[str] = [] + with open(passages_file, encoding="utf-8") as f: + for line in f: + if not line.strip(): + continue + data = json.loads(line) + old_ids.append(data["id"]) + new_ids.append(hashlib.sha256(data["text"].encode("utf-8")).hexdigest()[:16]) + + unique_new = len(set(new_ids)) + collisions = len(new_ids) - unique_new + print( + f"Migrating '{index_name}': {len(new_ids)} passages → {unique_new} unique " + f"content-hash IDs ({collisions} collision(s) will dedup)." + ) + + if args.dry_run: + print("(dry-run; not writing anything)") + return + if not args.yes: + confirm = input( + "Proceed? This rewrites passages.jsonl, .idx, .ids.txt, .meta.json. [y/N] " + ) + if confirm.strip().lower() not in ("y", "yes"): + print("Aborted.") + return + + # Stage writes into siblings, then atomically rename. + new_passages = passages_file.with_suffix(passages_file.suffix + ".migrate") + new_offsets: dict[str, int] = {} + with ( + open(passages_file, encoding="utf-8") as src, + open(new_passages, "w", encoding="utf-8") as dst, + ): + idx = 0 + for line in src: + if not line.strip(): + continue + data = json.loads(line) + data["id"] = new_ids[idx] + offset = dst.tell() + json.dump(data, dst, ensure_ascii=False) + dst.write("\n") + new_offsets[new_ids[idx]] = offset + idx += 1 + + new_idx = offset_file.with_suffix(offset_file.suffix + ".migrate") + with open(new_idx, "wb") as f: + pickle.dump(new_offsets, f) + + if idmap_file.exists(): + new_idmap = idmap_file.with_suffix(idmap_file.suffix + ".migrate") + with open(new_idmap, "w", encoding="utf-8") as f: + for nid in new_ids: + f.write(nid + "\n") + shutil.move(str(new_idmap), str(idmap_file)) + + shutil.move(str(new_passages), str(passages_file)) + shutil.move(str(new_idx), str(offset_file)) + + meta["passage_id_scheme"] = "content-hash" + meta["version"] = "1.1" + with open(meta_path, "w", encoding="utf-8") as f: + json.dump(meta, f, indent=2) + + print( + f"✓ Migrated '{index_name}' to content-hash IDs. {collisions} collisions were deduped." + ) + async def watch_index(self, args): index_name = args.index_name resolved = self._resolve_index_for_watch(index_name) @@ -3265,6 +3406,8 @@ async def run(self, args=None): await self.build_index(args) elif args.command == "watch": await self.watch_index(args) + elif args.command == "migrate-ids": + self.migrate_ids(args) elif args.command == "search": with suppress_cpp_output(suppress): await self.search_documents(args) From 1ec660c33b37eb4ee1548e49d7e52aedec033b7b Mon Sep 17 00:00:00 2001 From: Abi Date: Tue, 2 Jun 2026 19:19:14 -0700 Subject: [PATCH 5/9] fix: harden migrate id publishing --- packages/leann-core/src/leann/cli.py | 135 ++++++++++++++++++++++++--- tests/test_migrate_ids.py | 70 ++++++++++++++ tests/test_rebuild_cli.py | 95 +++++++++++++++++++ 3 files changed, 285 insertions(+), 15 deletions(-) create mode 100644 tests/test_migrate_ids.py create mode 100644 tests/test_rebuild_cli.py diff --git a/packages/leann-core/src/leann/cli.py b/packages/leann-core/src/leann/cli.py index 15abd093..50bf941b 100644 --- a/packages/leann-core/src/leann/cli.py +++ b/packages/leann-core/src/leann/cli.py @@ -38,6 +38,83 @@ def _normalize_path(path: str) -> str: return str(Path(path).resolve()) +def _replace_files_with_rollback(replacements: list[tuple[Path, Path]]) -> None: + """Replace target files with staged files, restoring originals on failure.""" + token = uuid.uuid4().hex + backups: list[tuple[Path, Optional[Path]]] = [] + placed_targets: list[Path] = [] + try: + for staged_path, target_path in replacements: + backup_path: Optional[Path] = None + if target_path.exists(): + backup_path = target_path.with_name(f".{target_path.name}.backup-{token}") + os.replace(target_path, backup_path) + backups.append((target_path, backup_path)) + try: + os.replace(staged_path, target_path) + except Exception: + if backup_path is not None and backup_path.exists(): + os.replace(backup_path, target_path) + backups.pop() + raise + placed_targets.append(target_path) + except Exception: + for target_path in reversed(placed_targets): + if target_path.exists(): + target_path.unlink() + for target_path, backup_path in reversed(backups): + if backup_path is not None and backup_path.exists(): + os.replace(backup_path, target_path) + raise + else: + for _target_path, backup_path in backups: + if backup_path is not None and backup_path.exists(): + backup_path.unlink() + finally: + for staged_path, _target_path in replacements: + if staged_path.exists(): + staged_path.unlink() + + +def _cleanup_path(path: Path) -> None: + if path.is_dir(): + import shutil + + shutil.rmtree(path) + elif path.exists(): + path.unlink() + + +def _existing_index_artifacts(index_dir: Path) -> bool: + return ( + (index_dir / "documents.leann.meta.json").exists() + and (index_dir / "documents.leann.passages.jsonl").exists() + and (index_dir / "documents.leann.passages.idx").exists() + ) + + +def _publish_rebuilt_index(staging_dir: Path, index_dir: Path) -> None: + """Publish a staged full rebuild, restoring the previous directory if publish fails.""" + backup_dir = index_dir.with_name(f".{index_dir.name}.backup-{uuid.uuid4().hex}") + live_moved = False + published = False + try: + if index_dir.exists(): + os.replace(index_dir, backup_dir) + live_moved = True + os.replace(staging_dir, index_dir) + published = True + except Exception: + if published: + _cleanup_path(index_dir) + if live_moved and backup_dir.exists(): + os.replace(backup_dir, index_dir) + raise + else: + if backup_dir.exists(): + _cleanup_path(backup_dir) + + @contextlib.contextmanager def suppress_cpp_output(suppress: bool = True): """Context manager to suppress C++ stdout/stderr output from FAISS/HNSW @@ -2429,6 +2506,12 @@ async def build_index(self, args): return print(f"Building index '{index_name}' with {args.backend_name} backend...") + publish_from_staging = _existing_index_artifacts(index_dir) + target_index_dir = index_dir + target_index_path = index_path + if publish_from_staging: + target_index_dir = index_dir.with_name(f".{index_dir.name}.rebuild-{uuid.uuid4().hex}") + target_index_path = str(target_index_dir / "documents.leann") builder = LeannBuilder( backend_name=args.backend_name, @@ -2446,15 +2529,34 @@ async def build_index(self, args): for chunk in all_texts: builder.add_text(chunk["text"], metadata=chunk["metadata"]) - builder.build_index(index_path) - for fs in synchronizers: - fs.create_snapshot() - self._write_sync_config( - index_dir, - self._resolve_sync_roots(docs_paths), - self._parse_file_types(args.file_types), - self._sync_ignore_patterns(args.include_hidden), - ) + try: + builder.build_index(target_index_path) + target_synchronizers = ( + self._build_synchronizers( + docs_paths, + target_index_dir, + file_types=args.file_types, + include_hidden=args.include_hidden, + ) + if publish_from_staging + else synchronizers + ) + for fs in target_synchronizers: + fs.create_snapshot() + self._write_sync_config( + target_index_dir, + self._resolve_sync_roots(docs_paths), + self._parse_file_types(args.file_types), + self._sync_ignore_patterns(args.include_hidden), + ) + if publish_from_staging: + _publish_rebuilt_index(target_index_dir, index_dir) + except Exception: + if publish_from_staging and target_index_dir.exists(): + import shutil + + shutil.rmtree(target_index_dir) + raise print(f"Index built at {index_path}") self.register_project_dir() @@ -2578,7 +2680,6 @@ def migrate_ids(self, args) -> None: Irreversible. Prompts for confirmation unless --yes is passed. """ import hashlib - import shutil index_name = args.index_name index_path = self._resolve_index_path(index_name, purpose="migrate") @@ -2663,20 +2764,24 @@ def migrate_ids(self, args) -> None: with open(new_idx, "wb") as f: pickle.dump(new_offsets, f) + replacements = [ + (new_passages, passages_file), + (new_idx, offset_file), + ] if idmap_file.exists(): new_idmap = idmap_file.with_suffix(idmap_file.suffix + ".migrate") with open(new_idmap, "w", encoding="utf-8") as f: for nid in new_ids: f.write(nid + "\n") - shutil.move(str(new_idmap), str(idmap_file)) - - shutil.move(str(new_passages), str(passages_file)) - shutil.move(str(new_idx), str(offset_file)) + replacements.append((new_idmap, idmap_file)) meta["passage_id_scheme"] = "content-hash" meta["version"] = "1.1" - with open(meta_path, "w", encoding="utf-8") as f: + new_meta = meta_path.with_suffix(meta_path.suffix + ".migrate") + with open(new_meta, "w", encoding="utf-8") as f: json.dump(meta, f, indent=2) + replacements.append((new_meta, meta_path)) + _replace_files_with_rollback(replacements) print( f"✓ Migrated '{index_name}' to content-hash IDs. {collisions} collisions were deduped." diff --git a/tests/test_migrate_ids.py b/tests/test_migrate_ids.py new file mode 100644 index 00000000..a58d2b8b --- /dev/null +++ b/tests/test_migrate_ids.py @@ -0,0 +1,70 @@ +import json +import os +import pickle +from pathlib import Path +from types import SimpleNamespace + +import pytest +from leann.cli import LeannCLI + + +def test_migrate_ids_rolls_back_when_publish_fails(tmp_path, monkeypatch): + index_dir = tmp_path / ".leann" / "indexes" / "sample" + index_dir.mkdir(parents=True) + index_path = index_dir / "documents.leann" + + passages = [ + {"id": "0", "text": "alpha beta", "metadata": {"source": "a.txt"}}, + {"id": "1", "text": "gamma delta", "metadata": {"source": "b.txt"}}, + ] + passages_file = index_dir / "documents.leann.passages.jsonl" + offsets = {} + with open(passages_file, "w", encoding="utf-8") as f: + for passage in passages: + offsets[passage["id"]] = f.tell() + json.dump(passage, f) + f.write("\n") + + offset_file = index_dir / "documents.leann.passages.idx" + with open(offset_file, "wb") as f: + pickle.dump(offsets, f) + + idmap_file = index_dir / "documents.ids.txt" + idmap_file.write_text("0\n1\n", encoding="utf-8") + meta_path = index_dir / "documents.leann.meta.json" + meta_path.write_text( + json.dumps( + { + "version": "1.0", + "backend_name": "hnsw", + "embedding_model": "dummy", + "dimensions": 3, + "backend_kwargs": {}, + "embedding_mode": "sentence-transformers", + "passage_id_scheme": "sequential", + } + ), + encoding="utf-8", + ) + original_bytes = { + path: path.read_bytes() for path in (passages_file, offset_file, idmap_file, meta_path) + } + real_replace = os.replace + + def fail_on_meta_publish(src, dst): + if Path(dst) == meta_path and str(src).endswith(".migrate"): + raise OSError("simulated publish failure") + real_replace(src, dst) + + cli = LeannCLI() + monkeypatch.setattr(cli, "_resolve_index_path", lambda *_args, **_kwargs: str(index_path)) + monkeypatch.setattr("leann.cli.os.replace", fail_on_meta_publish) + + with pytest.raises(OSError, match="simulated publish failure"): + cli.migrate_ids(SimpleNamespace(index_name="sample", dry_run=False, yes=True)) + + assert { + path: path.read_bytes() for path in (passages_file, offset_file, idmap_file, meta_path) + } == original_bytes + assert not list(index_dir.glob("*.migrate")) + assert not list(index_dir.glob(".*.backup-*")) diff --git a/tests/test_rebuild_cli.py b/tests/test_rebuild_cli.py new file mode 100644 index 00000000..38e50211 --- /dev/null +++ b/tests/test_rebuild_cli.py @@ -0,0 +1,95 @@ +import asyncio +import json +import pickle +from pathlib import Path + +import pytest +from leann.cli import LeannCLI + + +def test_full_rebuild_failure_preserves_existing_index(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + docs_root = tmp_path / "docs" + docs_root.mkdir() + docs_file = docs_root / "only.md" + docs_file.write_text("hello", encoding="utf-8") + + cli = LeannCLI() + index_dir = tmp_path / ".leann" / "indexes" / "sample" + index_dir.mkdir(parents=True) + meta_path = index_dir / "documents.leann.meta.json" + meta_path.write_text( + json.dumps( + { + "backend_name": "hnsw", + "embedding_model": "facebook/contriever", + "embedding_mode": "sentence-transformers", + "backend_kwargs": {"is_compact": False, "is_recompute": True}, + "passage_id_scheme": "sequential", + } + ), + encoding="utf-8", + ) + passages_file = index_dir / "documents.leann.passages.jsonl" + passages_file.write_text( + json.dumps({"id": "old", "text": "old text", "metadata": {}}) + "\n", + encoding="utf-8", + ) + offset_file = index_dir / "documents.leann.passages.idx" + with open(offset_file, "wb") as f: + pickle.dump({"old": 0}, f) + original_artifacts = { + path: path.read_bytes() for path in (meta_path, passages_file, offset_file) + } + args = cli.create_parser().parse_args( + [ + "build", + "sample", + "--docs", + str(docs_file), + "--backend-name", + "hnsw", + "--force", + ] + ) + built_paths: list[str] = [] + + class FakeSynchronizer: + def create_snapshot(self): + pass + + class FailingBuilder: + def __init__(self, **_kwargs): + pass + + def add_text(self, _text, metadata=None): + pass + + def build_index(self, index_path): + built_paths.append(index_path) + target_dir = Path(index_path).parent + target_dir.mkdir(parents=True, exist_ok=True) + (target_dir / "documents.leann.passages.jsonl").write_text( + json.dumps({"id": "new", "text": "new text", "metadata": {}}) + "\n", + encoding="utf-8", + ) + raise RuntimeError("simulated build failure") + + monkeypatch.setattr(cli, "_build_synchronizers", lambda *_args, **_kwargs: [FakeSynchronizer()]) + monkeypatch.setattr( + cli, + "load_documents", + lambda *_args, **_kwargs: [{"text": "rebuilt", "metadata": {"file_path": str(docs_file)}}], + ) + monkeypatch.setattr(cli, "register_project_dir", lambda: None) + monkeypatch.setattr("leann.cli.LeannBuilder", FailingBuilder) + + with pytest.raises(RuntimeError, match="simulated build failure"): + asyncio.run(cli.build_index(args)) + + assert built_paths + assert built_paths[0] != str(index_dir / "documents.leann") + assert { + path: path.read_bytes() for path in (meta_path, passages_file, offset_file) + } == original_artifacts + assert not list(index_dir.parent.glob(".sample.rebuild-*")) From 600e066a86eb9ba9a19c1be7f9d1471f4fd4dafb Mon Sep 17 00:00:00 2001 From: Abi Date: Tue, 2 Jun 2026 19:32:42 -0700 Subject: [PATCH 6/9] fix: harden migrate ids conversion --- packages/leann-core/src/leann/api.py | 24 ++- packages/leann-core/src/leann/cli.py | 126 ++++++++++--- packages/leann-core/src/leann/index_lock.py | 86 +++++++++ tests/test_migrate_ids.py | 190 ++++++++++++++++++++ 4 files changed, 393 insertions(+), 33 deletions(-) create mode 100644 packages/leann-core/src/leann/index_lock.py diff --git a/packages/leann-core/src/leann/api.py b/packages/leann-core/src/leann/api.py index b5ee65de..c3c7d225 100644 --- a/packages/leann-core/src/leann/api.py +++ b/packages/leann-core/src/leann/api.py @@ -497,23 +497,37 @@ def __init__( self.backend_kwargs = backend_kwargs self.chunks: list[dict[str, Any]] = [] - def _generate_passage_id(self, text: str) -> str: + @staticmethod + def _make_unique_passage_id(base_id: str, reserved_ids: set[str]) -> str: + if base_id not in reserved_ids: + return base_id + suffix = 1 + while f"{base_id}-{suffix}" in reserved_ids: + suffix += 1 + return f"{base_id}-{suffix}" + + def _generate_passage_id(self, text: str, reserved_ids: Optional[set[str]] = None) -> str: """Generate a passage ID per the configured scheme. sequential: str(insertion index) — fast, position-dependent, current default. - content-hash: sha256(text)[:16] — content-stable, dedup-friendly across - file moves and reorderings. See #329 for the design. + content-hash: sha256(text)[:16] — content-stable across file moves and + reorderings when text is unique. Duplicate text receives a numeric suffix + so the passage store, ID maps, and vector labels stay one-to-one. """ if self.passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH: import hashlib - return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16] + base_id = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16] + return self._make_unique_passage_id(base_id, reserved_ids or set()) return str(len(self.chunks)) def add_text(self, text: str, metadata: Optional[dict[str, Any]] = None): if metadata is None: metadata = {} - passage_id = metadata.get("id") or self._generate_passage_id(text) + if "id" in metadata and metadata["id"] is not None: + passage_id = str(metadata["id"]) + else: + passage_id = self._generate_passage_id(text, {chunk["id"] for chunk in self.chunks}) chunk_data = {"id": passage_id, "text": text, "metadata": metadata} self.chunks.append(chunk_data) diff --git a/packages/leann-core/src/leann/cli.py b/packages/leann-core/src/leann/cli.py index 50bf941b..03a54b2e 100644 --- a/packages/leann-core/src/leann/cli.py +++ b/packages/leann-core/src/leann/cli.py @@ -18,6 +18,7 @@ from .api import LeannBuilder, LeannChat, LeannSearcher from .embedding_server_manager import EmbeddingServerManager +from .index_lock import index_write_lock from .interactive_utils import create_cli_session from .registry import register_project_directory from .settings import ( @@ -85,6 +86,28 @@ def _cleanup_path(path: Path) -> None: path.unlink() +def _build_fts5_bm25_db(db_path: Path, passages: list[dict[str, Any]]) -> None: + """Build the persisted FTS5 BM25 sidecar used by newer LEANN indexes.""" + import sqlite3 + + if db_path.exists(): + db_path.unlink() + conn = sqlite3.connect(db_path) + try: + conn.execute( + "CREATE VIRTUAL TABLE bm25_passages USING fts5(" + "id UNINDEXED, text, tokenize='unicode61 remove_diacritics 2'" + ")" + ) + conn.executemany( + "INSERT INTO bm25_passages(id, text) VALUES (?, ?)", + ((str(passage["id"]), passage.get("text", "")) for passage in passages), + ) + conn.commit() + finally: + conn.close() + + def _existing_index_artifacts(index_dir: Path) -> bool: return ( (index_dir / "documents.leann.meta.json").exists() @@ -2664,6 +2687,14 @@ async def _watch_trigger_build(self, index_name: str) -> None: await self.build_index(build_args) def migrate_ids(self, args) -> None: + index_name = args.index_name + index_path = self._resolve_index_path(index_name, purpose="migrate") + if not index_path: + return + with index_write_lock(Path(index_path).parent): + self._migrate_ids_unlocked(args, index_path=index_path) + + def _migrate_ids_unlocked(self, args, index_path: str) -> None: """Rewrite an existing index's passage IDs to content-hash form. Migration is purely a Python-side rewrite — the vector graph isn't @@ -2673,24 +2704,24 @@ def migrate_ids(self, args) -> None: - .ids.txt : new label → ID mapping for FAISS - .meta.json : passage_id_scheme = "content-hash" - Identical-text chunks collide on the same sha256 prefix; the later - occurrence wins in the offset map (dedup). A `--preserve-duplicates` - knob to suffix collisions can land separately. + Identical-text chunks collide on the same sha256 prefix; later + occurrences receive numeric suffixes so all passage IDs remain unique. Irreversible. Prompts for confirmation unless --yes is passed. """ - import hashlib - index_name = args.index_name - index_path = self._resolve_index_path(index_name, purpose="migrate") - if not index_path: - return meta_path = Path(index_path).with_suffix(".leann.meta.json") if not meta_path.exists(): print(f"Cannot migrate '{index_name}': metadata missing at {meta_path}.") return with open(meta_path, encoding="utf-8") as f: meta = json.load(f) + if meta.get("backend_name") == "diskann": + print( + f"Cannot migrate '{index_name}': DiskANN does not yet support content-hash " + "passage IDs because searches return numeric labels without a persisted ID map." + ) + return current_scheme = meta.get("passage_id_scheme", "sequential") if current_scheme == "content-hash": print(f"Index '{index_name}' already uses content-hash IDs. Nothing to do.") @@ -2711,23 +2742,41 @@ def migrate_ids(self, args) -> None: print(f"Cannot migrate: required artifact missing at {p}.") return - # Stream the passages to plan the rewrite and surface collision count - # before committing to anything irreversible. - old_ids: list[str] = [] - new_ids: list[str] = [] + with open(offset_file, "rb") as f: + live_offset_map: dict[str, int] = pickle.load(f) + + live_passages: list[dict[str, Any]] = [] with open(passages_file, encoding="utf-8") as f: - for line in f: + for old_id, offset in sorted(live_offset_map.items(), key=lambda item: item[1]): + f.seek(offset) + line = f.readline() if not line.strip(): continue data = json.loads(line) - old_ids.append(data["id"]) - new_ids.append(hashlib.sha256(data["text"].encode("utf-8")).hexdigest()[:16]) + data["id"] = str(old_id) + live_passages.append(data) + + # Plan the live-passage rewrite and surface duplicate suffix count + # before committing to anything irreversible. Stale append-only JSONL + # rows absent from passages.idx stay excluded. + old_ids: list[str] = [] + new_ids: list[str] = [] + reserved_ids: set[str] = set() + duplicate_suffixes = 0 + for data in live_passages: + base_id = hashlib.sha256(data["text"].encode("utf-8")).hexdigest()[:16] + new_id = LeannBuilder._make_unique_passage_id(base_id, reserved_ids) + if new_id != base_id: + duplicate_suffixes += 1 + reserved_ids.add(new_id) + old_ids.append(str(data["id"])) + new_ids.append(new_id) + old_to_new = dict(zip(old_ids, new_ids)) unique_new = len(set(new_ids)) - collisions = len(new_ids) - unique_new print( f"Migrating '{index_name}': {len(new_ids)} passages → {unique_new} unique " - f"content-hash IDs ({collisions} collision(s) will dedup)." + f"content-hash IDs ({duplicate_suffixes} duplicate-text suffix(es))." ) if args.dry_run: @@ -2744,21 +2793,16 @@ def migrate_ids(self, args) -> None: # Stage writes into siblings, then atomically rename. new_passages = passages_file.with_suffix(passages_file.suffix + ".migrate") new_offsets: dict[str, int] = {} - with ( - open(passages_file, encoding="utf-8") as src, - open(new_passages, "w", encoding="utf-8") as dst, - ): - idx = 0 - for line in src: - if not line.strip(): - continue - data = json.loads(line) + rewritten_passages: list[dict[str, Any]] = [] + with open(new_passages, "w", encoding="utf-8") as dst: + for idx, data in enumerate(live_passages): + data = dict(data) data["id"] = new_ids[idx] offset = dst.tell() json.dump(data, dst, ensure_ascii=False) dst.write("\n") new_offsets[new_ids[idx]] = offset - idx += 1 + rewritten_passages.append(data) new_idx = offset_file.with_suffix(offset_file.suffix + ".migrate") with open(new_idx, "wb") as f: @@ -2775,6 +2819,31 @@ def migrate_ids(self, args) -> None: f.write(nid + "\n") replacements.append((new_idmap, idmap_file)) + ivf_map_file = index_dir / f"{base_no_leann}.ivf_id_map.json" + if meta.get("backend_name") == "ivf" and ivf_map_file.exists(): + new_ivf_map = ivf_map_file.with_suffix(ivf_map_file.suffix + ".migrate") + with open(ivf_map_file, encoding="utf-8") as f: + ivf_map = json.load(f) + migrated_id_to_passage = { + str(label): old_to_new.get(str(old_pid), str(old_pid)) + for label, old_pid in (ivf_map.get("id_to_passage") or {}).items() + } + ivf_map["id_to_passage"] = migrated_id_to_passage + ivf_map["passage_to_id"] = { + passage_id: int(label) for label, passage_id in migrated_id_to_passage.items() + } + with open(new_ivf_map, "w", encoding="utf-8") as f: + json.dump(ivf_map, f, indent=2) + replacements.append((new_ivf_map, ivf_map_file)) + + bm25_db_name = meta.get("bm25_db") or f"{index_base}.bm25.sqlite" + bm25_file = index_dir / bm25_db_name + if meta.get("bm25_backend") == "fts5" and bm25_file.exists(): + new_bm25 = bm25_file.with_suffix(bm25_file.suffix + ".migrate") + _build_fts5_bm25_db(new_bm25, rewritten_passages) + replacements.append((new_bm25, bm25_file)) + meta["bm25_db"] = bm25_db_name + meta["passage_id_scheme"] = "content-hash" meta["version"] = "1.1" new_meta = meta_path.with_suffix(meta_path.suffix + ".migrate") @@ -2784,7 +2853,8 @@ def migrate_ids(self, args) -> None: _replace_files_with_rollback(replacements) print( - f"✓ Migrated '{index_name}' to content-hash IDs. {collisions} collisions were deduped." + f"✓ Migrated '{index_name}' to content-hash IDs. " + f"{duplicate_suffixes} duplicate-text suffix(es) were added." ) async def watch_index(self, args): diff --git a/packages/leann-core/src/leann/index_lock.py b/packages/leann-core/src/leann/index_lock.py new file mode 100644 index 00000000..15012e8f --- /dev/null +++ b/packages/leann-core/src/leann/index_lock.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +import contextlib +import hashlib +import sys +import threading +import time +from collections.abc import Iterator +from pathlib import Path +from typing import TextIO + +_INDEX_LOCKS_GUARD = threading.Lock() +_INDEX_LOCKS: dict[str, threading.RLock] = {} +_WINDOWS_LOCK_TIMEOUT_SECONDS = 300 + + +def _lock_key(index_dir: Path) -> str: + try: + resolved = index_dir.resolve() + except OSError: + resolved = index_dir + return hashlib.sha256(str(resolved).encode("utf-8")).hexdigest()[:24] + + +def _lock_path(index_dir: Path, key: str) -> Path: + lock_root = index_dir.parent / ".leann-locks" + return lock_root / f"{key}.write.lock" + + +def _flock_acquire(lock_file: TextIO) -> None: + if sys.platform == "win32": + import msvcrt + + lock_file.seek(0, 2) + if lock_file.tell() == 0: + lock_file.write("\n") + lock_file.flush() + lock_file.seek(0) + deadline = time.monotonic() + _WINDOWS_LOCK_TIMEOUT_SECONDS + while True: + try: + msvcrt.locking(lock_file.fileno(), msvcrt.LK_NBLCK, 1) + return + except OSError: + if time.monotonic() >= deadline: + raise TimeoutError( + f"Timed out acquiring index write lock after " + f"{_WINDOWS_LOCK_TIMEOUT_SECONDS}s" + ) + time.sleep(0.25) + + import fcntl + + fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) + + +def _flock_release(lock_file: TextIO) -> None: + if sys.platform == "win32": + import msvcrt + + lock_file.seek(0) + msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1) + return + + import fcntl + + fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) + + +@contextlib.contextmanager +def index_write_lock(index_dir: Path) -> Iterator[None]: + """Serialize writes to one LEANN index across threads and processes.""" + key = _lock_key(index_dir) + with _INDEX_LOCKS_GUARD: + thread_lock = _INDEX_LOCKS.setdefault(key, threading.RLock()) + + with thread_lock: + lock_path = _lock_path(index_dir, key) + lock_path.parent.mkdir(parents=True, exist_ok=True) + with lock_path.open("a+", encoding="utf-8") as lock_file: + _flock_acquire(lock_file) + try: + index_dir.mkdir(parents=True, exist_ok=True) + yield + finally: + _flock_release(lock_file) diff --git a/tests/test_migrate_ids.py b/tests/test_migrate_ids.py index a58d2b8b..ce4002b7 100644 --- a/tests/test_migrate_ids.py +++ b/tests/test_migrate_ids.py @@ -1,6 +1,8 @@ +import hashlib import json import os import pickle +import sqlite3 from pathlib import Path from types import SimpleNamespace @@ -8,6 +10,194 @@ from leann.cli import LeannCLI +def _content_id(text: str) -> str: + return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16] + + +def _write_basic_meta(index_dir: Path, **overrides): + meta = { + "version": "1.0", + "backend_name": "hnsw", + "embedding_model": "dummy", + "dimensions": 3, + "backend_kwargs": {}, + "embedding_mode": "sentence-transformers", + "passage_id_scheme": "sequential", + } + meta.update(overrides) + (index_dir / "documents.leann.meta.json").write_text( + json.dumps(meta), + encoding="utf-8", + ) + + +def _write_passages(index_dir: Path, passages: list[dict]) -> dict[str, int]: + offsets: dict[str, int] = {} + with open(index_dir / "documents.leann.passages.jsonl", "w", encoding="utf-8") as f: + for passage in passages: + offsets[passage["id"]] = f.tell() + json.dump(passage, f) + f.write("\n") + with open(index_dir / "documents.leann.passages.idx", "wb") as f: + pickle.dump(offsets, f) + return offsets + + +def _build_fts5_db(db_path: Path, passages: list[dict]) -> None: + conn = sqlite3.connect(db_path) + try: + conn.execute( + "CREATE VIRTUAL TABLE bm25_passages USING fts5(" + "id UNINDEXED, text, tokenize='unicode61 remove_diacritics 2'" + ")" + ) + conn.executemany( + "INSERT INTO bm25_passages(id, text) VALUES (?, ?)", + ((passage["id"], passage["text"]) for passage in passages), + ) + conn.commit() + finally: + conn.close() + + +def test_migrate_ids_rewrites_live_offsets_idmaps_meta_and_fts5(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + index_dir = tmp_path / ".leann" / "indexes" / "sample" + index_dir.mkdir(parents=True) + + passages = [ + {"id": "0", "text": "alpha beta", "metadata": {"source": "a.txt"}}, + {"id": "1", "text": "gamma delta", "metadata": {"source": "b.txt"}}, + ] + _write_passages(index_dir, passages) + (index_dir / "documents.ids.txt").write_text("0\n1\n", encoding="utf-8") + (index_dir / "documents.ivf_id_map.json").write_text( + json.dumps( + { + "id_to_passage": {"0": "0", "1": "1"}, + "passage_to_id": {"0": 0, "1": 1}, + "next_id": 2, + } + ), + encoding="utf-8", + ) + bm25_db = index_dir / "documents.leann.bm25.sqlite" + _build_fts5_db(bm25_db, passages) + _write_basic_meta( + index_dir, + backend_name="ivf", + bm25_backend="fts5", + bm25_db="documents.leann.bm25.sqlite", + ) + + LeannCLI().migrate_ids(SimpleNamespace(index_name="sample", dry_run=False, yes=True)) + + expected_ids = [_content_id("alpha beta"), _content_id("gamma delta")] + with open(index_dir / "documents.leann.passages.jsonl", encoding="utf-8") as f: + migrated_passages = [json.loads(line) for line in f if line.strip()] + assert [passage["id"] for passage in migrated_passages] == expected_ids + + with open(index_dir / "documents.leann.passages.idx", "rb") as f: + assert set(pickle.load(f)) == set(expected_ids) + assert (index_dir / "documents.ids.txt").read_text( + encoding="utf-8" + ).splitlines() == expected_ids + + ivf_map = json.loads((index_dir / "documents.ivf_id_map.json").read_text(encoding="utf-8")) + assert ivf_map == { + "id_to_passage": {"0": expected_ids[0], "1": expected_ids[1]}, + "passage_to_id": {expected_ids[0]: 0, expected_ids[1]: 1}, + "next_id": 2, + } + + meta = json.loads((index_dir / "documents.leann.meta.json").read_text(encoding="utf-8")) + assert meta["version"] == "1.1" + assert meta["passage_id_scheme"] == "content-hash" + assert meta["bm25_backend"] == "fts5" + assert meta["bm25_db"] == "documents.leann.bm25.sqlite" + + conn = sqlite3.connect(bm25_db) + try: + rows = conn.execute( + "SELECT id FROM bm25_passages WHERE bm25_passages MATCH ?", + ("alpha",), + ).fetchall() + finally: + conn.close() + assert rows == [(expected_ids[0],)] + + +def test_migrate_ids_rejects_diskann_indexes(tmp_path, monkeypatch, capsys): + monkeypatch.chdir(tmp_path) + index_dir = tmp_path / ".leann" / "indexes" / "sample" + index_dir.mkdir(parents=True) + _write_basic_meta(index_dir, backend_name="diskann") + + LeannCLI().migrate_ids(SimpleNamespace(index_name="sample", dry_run=False, yes=True)) + + assert "Cannot migrate" in capsys.readouterr().out + meta = json.loads((index_dir / "documents.leann.meta.json").read_text(encoding="utf-8")) + assert meta["passage_id_scheme"] == "sequential" + + +def test_migrate_ids_ignores_stale_passages_absent_from_offset_map(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + index_dir = tmp_path / ".leann" / "indexes" / "sample" + index_dir.mkdir(parents=True) + + live = {"id": "0", "text": "live text", "metadata": {"source": "live.txt"}} + stale = {"id": "1", "text": "stale text", "metadata": {"source": "stale.txt"}} + passages_file = index_dir / "documents.leann.passages.jsonl" + with open(passages_file, "w", encoding="utf-8") as f: + live_offset = f.tell() + json.dump(live, f) + f.write("\n") + json.dump(stale, f) + f.write("\n") + with open(index_dir / "documents.leann.passages.idx", "wb") as f: + pickle.dump({"0": live_offset}, f) + (index_dir / "documents.ids.txt").write_text("0\n1\n", encoding="utf-8") + _write_basic_meta(index_dir) + + LeannCLI().migrate_ids(SimpleNamespace(index_name="sample", dry_run=False, yes=True)) + + expected_id = _content_id("live text") + with open(passages_file, encoding="utf-8") as f: + migrated_passages = [json.loads(line) for line in f if line.strip()] + assert migrated_passages == [ + {"id": expected_id, "text": "live text", "metadata": {"source": "live.txt"}} + ] + with open(index_dir / "documents.leann.passages.idx", "rb") as f: + assert set(pickle.load(f)) == {expected_id} + assert (index_dir / "documents.ids.txt").read_text(encoding="utf-8").splitlines() == [ + expected_id + ] + + +def test_migrate_ids_suffixes_duplicate_text_ids(tmp_path, monkeypatch): + monkeypatch.chdir(tmp_path) + index_dir = tmp_path / ".leann" / "indexes" / "duplicates" + index_dir.mkdir(parents=True) + + passages = [ + {"id": "0", "text": "repeat me", "metadata": {"source": "a.txt"}}, + {"id": "1", "text": "repeat me", "metadata": {"source": "b.txt"}}, + ] + _write_passages(index_dir, passages) + (index_dir / "documents.ids.txt").write_text("0\n1\n", encoding="utf-8") + _write_basic_meta(index_dir) + + LeannCLI().migrate_ids(SimpleNamespace(index_name="duplicates", dry_run=False, yes=True)) + + base_id = _content_id("repeat me") + with open(index_dir / "documents.leann.passages.jsonl", encoding="utf-8") as f: + migrated_ids = [json.loads(line)["id"] for line in f if line.strip()] + assert migrated_ids == [base_id, f"{base_id}-1"] + assert (index_dir / "documents.ids.txt").read_text( + encoding="utf-8" + ).splitlines() == migrated_ids + + def test_migrate_ids_rolls_back_when_publish_fails(tmp_path, monkeypatch): index_dir = tmp_path / ".leann" / "indexes" / "sample" index_dir.mkdir(parents=True) From f1047e7e0e6212526d2b51d8f594ae67178185a9 Mon Sep 17 00:00:00 2001 From: Abi Date: Tue, 2 Jun 2026 19:49:02 -0700 Subject: [PATCH 7/9] fix: preserve legacy id scheme during migration --- packages/leann-core/src/leann/cli.py | 9 ++++----- tests/test_migrate_ids.py | 9 +++++++++ 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/packages/leann-core/src/leann/cli.py b/packages/leann-core/src/leann/cli.py index 03a54b2e..91f2f5bc 100644 --- a/packages/leann-core/src/leann/cli.py +++ b/packages/leann-core/src/leann/cli.py @@ -16,7 +16,7 @@ from llama_index.core.node_parser import SentenceSplitter from tqdm import tqdm -from .api import LeannBuilder, LeannChat, LeannSearcher +from .api import PASSAGE_ID_SCHEME_SEQUENTIAL, LeannBuilder, LeannChat, LeannSearcher from .embedding_server_manager import EmbeddingServerManager from .index_lock import index_write_lock from .interactive_utils import create_cli_session @@ -2013,16 +2013,15 @@ def _chunks_for_paths(self, all_texts: list[dict], paths: set[str]) -> list[dict def _existing_index_id_scheme(self, index_path: str) -> Optional[str]: """Return the passage_id_scheme recorded in an existing index's meta.json. - Returns None when the index doesn't exist yet or the field isn't - recorded (older indexes pre-#330). Callers should treat None as - "fall back to whatever the args say or the default". + Returns None when the index doesn't exist yet. Older indexes pre-#330 + have no field and are sequential by definition. """ meta_path = Path(index_path).with_suffix(".leann.meta.json") if not meta_path.exists(): return None try: with open(meta_path, encoding="utf-8") as f: - return json.load(f).get("passage_id_scheme") + return json.load(f).get("passage_id_scheme", PASSAGE_ID_SCHEME_SEQUENTIAL) except Exception: return None diff --git a/tests/test_migrate_ids.py b/tests/test_migrate_ids.py index ce4002b7..a3908084 100644 --- a/tests/test_migrate_ids.py +++ b/tests/test_migrate_ids.py @@ -60,6 +60,15 @@ def _build_fts5_db(db_path: Path, passages: list[dict]) -> None: conn.close() +def test_legacy_missing_id_scheme_is_sequential(tmp_path): + meta_path = tmp_path / "documents.leann.meta.json" + meta_path.write_text(json.dumps({"version": "1.0"}), encoding="utf-8") + + cli = LeannCLI() + + assert cli._existing_index_id_scheme(str(tmp_path / "documents.leann")) == "sequential" + + def test_migrate_ids_rewrites_live_offsets_idmaps_meta_and_fts5(tmp_path, monkeypatch): monkeypatch.chdir(tmp_path) index_dir = tmp_path / ".leann" / "indexes" / "sample" From 3441061c462eb12d3d75d51407fb8a4be83857a8 Mon Sep 17 00:00:00 2001 From: Abi Date: Tue, 2 Jun 2026 20:39:01 -0700 Subject: [PATCH 8/9] fix: preserve passage ids during incremental updates --- packages/leann-core/src/leann/api.py | 43 ++++--- packages/leann-core/src/leann/cli.py | 19 ++- tests/test_passage_id_scheme.py | 172 +++++++++++++++++++++++++++ 3 files changed, 215 insertions(+), 19 deletions(-) create mode 100644 tests/test_passage_id_scheme.py diff --git a/packages/leann-core/src/leann/api.py b/packages/leann-core/src/leann/api.py index c3c7d225..691e0f1f 100644 --- a/packages/leann-core/src/leann/api.py +++ b/packages/leann-core/src/leann/api.py @@ -930,14 +930,28 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]] ) valid_chunks: list[dict[str, Any]] = [] + reserved_ids = set(existing_ids) + next_sequential_id = len(offset_map) for chunk in self.chunks: text = chunk.get("text", "") if not isinstance(text, str) or not text.strip(): continue metadata = chunk.setdefault("metadata", {}) - passage_id = chunk.get("id") or metadata.get("id") - if passage_id and passage_id in existing_ids: + if "id" in metadata and metadata["id"] is not None: + passage_id = str(metadata["id"]) + elif self.passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH: + passage_id = self._generate_passage_id(text, reserved_ids) + else: + while str(next_sequential_id) in reserved_ids: + next_sequential_id += 1 + passage_id = str(next_sequential_id) + next_sequential_id += 1 + + if passage_id in reserved_ids: raise ValueError(f"Passage ID '{passage_id}' already exists in the index.") + chunk["id"] = passage_id + metadata["id"] = passage_id + reserved_ids.add(passage_id) valid_chunks.append(chunk) if not valid_chunks: @@ -973,12 +987,6 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]] # IVF: add_vectors then append passages/offset (no ZMQ/server) if backend_name == "ivf": - for i, chunk in enumerate(valid_chunks): - pid = chunk.get("id") or chunk.get("metadata", {}).get("id") - if not pid: - pid = str(len(offset_map) + i) - chunk.setdefault("metadata", {})["id"] = pid - chunk["id"] = pid passage_ids = [c["id"] for c in valid_chunks] try: from leann_backend_ivf import add_vectors as ivf_add_vectors @@ -1061,17 +1069,16 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]] passage_meta_mode = meta.get("embedding_mode", self.embedding_mode) passage_provider_options = meta.get("embedding_options", self.embedding_options) - base_id = index.ntotal - for offset, chunk in enumerate(valid_chunks): - new_id = str(base_id + offset) - chunk.setdefault("metadata", {})["id"] = new_id - chunk["id"] = new_id - # Append passages/offsets before we attempt index.add so the ZMQ server # can resolve newly assigned IDs during recompute. Keep rollback hooks # so we can restore files if the update fails mid-way. rollback_passages_size = passages_file.stat().st_size if passages_file.exists() else 0 offset_map_backup = offset_map.copy() + idmap_file = ( + path.parent + / f"{path.name[: -len('.leann')] if path.name.endswith('.leann') else path.name}.ids.txt" + ) + rollback_idmap_size = idmap_file.stat().st_size if idmap_file.exists() else None try: with open(passages_file, "a", encoding="utf-8") as f: @@ -1091,6 +1098,9 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]] with open(offset_file, "wb") as f: pickle.dump(offset_map, f) + with open(idmap_file, "a", encoding="utf-8") as f: + for chunk in valid_chunks: + f.write(str(chunk["id"]) + "\n") server_manager: Optional[EmbeddingServerManager] = None server_started = False @@ -1146,6 +1156,11 @@ def update_index(self, index_path: str, remove_passage_ids: Optional[list[str]] offset_map = offset_map_backup with open(offset_file, "wb") as f: pickle.dump(offset_map, f) + if rollback_idmap_size is None: + idmap_file.unlink(missing_ok=True) + elif idmap_file.exists(): + with open(idmap_file, "rb+") as f: + f.truncate(rollback_idmap_size) raise meta["total_passages"] = len(offset_map) diff --git a/packages/leann-core/src/leann/cli.py b/packages/leann-core/src/leann/cli.py index 91f2f5bc..f736fa4d 100644 --- a/packages/leann-core/src/leann/cli.py +++ b/packages/leann-core/src/leann/cli.py @@ -16,7 +16,13 @@ from llama_index.core.node_parser import SentenceSplitter from tqdm import tqdm -from .api import PASSAGE_ID_SCHEME_SEQUENTIAL, LeannBuilder, LeannChat, LeannSearcher +from .api import ( + PASSAGE_ID_SCHEME_CONTENT_HASH, + PASSAGE_ID_SCHEME_SEQUENTIAL, + LeannBuilder, + LeannChat, + LeannSearcher, +) from .embedding_server_manager import EmbeddingServerManager from .index_lock import index_write_lock from .interactive_utils import create_cli_session @@ -2059,8 +2065,9 @@ def _incremental_add_only( new_chunks = self._chunks_for_paths(all_texts, new_paths) if not new_chunks: return False - self._assign_chunk_ids(new_chunks) builder = self._make_incremental_builder(args) + if builder.passage_id_scheme != PASSAGE_ID_SCHEME_CONTENT_HASH: + self._assign_chunk_ids(new_chunks) for chunk in new_chunks: builder.add_text(chunk["text"], metadata=chunk["metadata"]) print( @@ -2153,13 +2160,15 @@ def _incremental_ivf_update( for p in changed_paths: path_set.update(self._path_lookup_keys(p, sync_roots)) new_chunks = self._chunks_for_paths(all_texts, path_set) - # Use unique IDs: passages can have mixed path formats so we may miss some ids_to_remove - self._assign_unique_chunk_ids(new_chunks) + # Use unique IDs for sequential indexes: passages can have mixed path formats so we may + # miss some ids_to_remove. Content-hash indexes let LeannBuilder derive IDs from text. + builder = self._make_incremental_builder(args) + if builder.passage_id_scheme != PASSAGE_ID_SCHEME_CONTENT_HASH: + self._assign_unique_chunk_ids(new_chunks) if not ids_to_remove and not new_chunks: return False - builder = self._make_incremental_builder(args) for chunk in new_chunks: builder.add_text(chunk["text"], metadata=chunk["metadata"]) diff --git a/tests/test_passage_id_scheme.py b/tests/test_passage_id_scheme.py new file mode 100644 index 00000000..ac1d3398 --- /dev/null +++ b/tests/test_passage_id_scheme.py @@ -0,0 +1,172 @@ +import hashlib +import json +import pickle +import sys +from types import ModuleType, SimpleNamespace +from typing import Any, cast + +import leann.api as leann_api +import numpy as np +from leann.api import LeannBuilder +from leann.cli import LeannCLI + + +def _content_id(text: str) -> str: + return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16] + + +def _write_ivf_index(tmp_path, *, passage_id_scheme: str = "content-hash") -> str: + index_path = tmp_path / "documents.leann" + existing_id = _content_id("existing text") + passages_file = tmp_path / "documents.leann.passages.jsonl" + offset_file = tmp_path / "documents.leann.passages.idx" + with passages_file.open("w", encoding="utf-8") as f: + offset = f.tell() + f.write( + json.dumps( + {"id": existing_id, "text": "existing text", "metadata": {"id": existing_id}} + ) + + "\n" + ) + with offset_file.open("wb") as f: + pickle.dump({existing_id: offset}, f) + (tmp_path / "documents.index").write_bytes(b"fake-index") + (tmp_path / "documents.leann.meta.json").write_text( + json.dumps( + { + "version": "1.1", + "backend_name": "ivf", + "embedding_model": "test-model", + "embedding_mode": "sentence-transformers", + "dimensions": 2, + "backend_kwargs": {}, + "passage_id_scheme": passage_id_scheme, + "passage_sources": [ + { + "type": "jsonl", + "path": passages_file.name, + "index_path": offset_file.name, + } + ], + } + ), + encoding="utf-8", + ) + return str(index_path) + + +def _patch_ivf_update(monkeypatch): + calls = [] + fake_ivf = ModuleType("leann_backend_ivf") + + def add_vectors(index_path, embeddings, passage_ids): + calls.append((index_path, embeddings.copy(), list(passage_ids))) + + cast(Any, fake_ivf).add_vectors = add_vectors + monkeypatch.setitem(sys.modules, "leann_backend_ivf", fake_ivf) + monkeypatch.setitem(leann_api.BACKEND_REGISTRY, "ivf", SimpleNamespace()) + monkeypatch.setattr( + leann_api, + "compute_embeddings", + lambda texts, *args, **kwargs: np.ones((len(texts), 2), dtype=np.float32), + ) + return calls + + +def test_builder_content_hash_passage_ids_suffix_duplicate_text(): + builder = LeannBuilder(backend_name="hnsw", passage_id_scheme="content-hash") + + builder.add_text("same text", metadata={"source": "a.txt"}) + builder.add_text("same text", metadata={"source": "b.txt"}) + builder.add_text("different text", metadata={"source": "c.txt"}) + + same_id = _content_id("same text") + assert builder.chunks[0]["id"] == same_id + assert builder.chunks[1]["id"] == f"{same_id}-1" + assert builder.chunks[2]["id"] == _content_id("different text") + + +def test_builder_preserves_falsy_explicit_metadata_id(): + builder = LeannBuilder(backend_name="hnsw", passage_id_scheme="content-hash") + + builder.add_text("zero id", metadata={"id": 0, "source": "a.txt"}) + builder.add_text("empty id", metadata={"id": "", "source": "b.txt"}) + + assert builder.chunks[0]["id"] == "0" + assert builder.chunks[1]["id"] == "" + + +def test_legacy_missing_id_scheme_is_sequential(tmp_path): + meta_path = tmp_path / "documents.leann.meta.json" + meta_path.write_text(json.dumps({"version": "1.0"}), encoding="utf-8") + + cli = LeannCLI() + + assert cli._existing_index_id_scheme(str(tmp_path / "documents.leann")) == "sequential" + + +def test_incremental_content_hash_add_only_does_not_preassign_path_ids(tmp_path, monkeypatch): + cli = LeannCLI() + added_chunks = [] + + class FakeBuilder: + passage_id_scheme = "content-hash" + + def add_text(self, text, metadata=None): + added_chunks.append((text, dict(metadata or {}))) + + def update_index(self, index_path): + assert index_path == str(tmp_path / "documents.leann") + + monkeypatch.setattr(cli, "_make_incremental_builder", lambda _args: FakeBuilder()) + all_texts = [ + { + "text": "stable content", + "metadata": {"file_path": str(tmp_path / "doc.txt")}, + } + ] + + assert cli._incremental_add_only( + str(tmp_path / "documents.leann"), + all_texts, + SimpleNamespace(index_name="demo"), + {str(tmp_path / "doc.txt")}, + ) + + assert added_chunks == [("stable content", {"file_path": str(tmp_path / "doc.txt")})] + + +def test_update_content_hash_suffixes_existing_id_collision(tmp_path, monkeypatch): + index_path = _write_ivf_index(tmp_path) + add_vectors_calls = _patch_ivf_update(monkeypatch) + builder = LeannBuilder( + backend_name="ivf", + dimensions=2, + passage_id_scheme="content-hash", + ) + + builder.add_text("existing text", metadata={"source": "duplicate.txt"}) + builder.update_index(index_path) + + assert add_vectors_calls[0][2] == [f"{_content_id('existing text')}-1"] + offset_map = pickle.loads((tmp_path / "documents.leann.passages.idx").read_bytes()) + assert f"{_content_id('existing text')}-1" in offset_map + + +def test_update_preserves_falsy_explicit_metadata_ids(tmp_path, monkeypatch): + index_path = _write_ivf_index(tmp_path) + add_vectors_calls = _patch_ivf_update(monkeypatch) + builder = LeannBuilder( + backend_name="ivf", + dimensions=2, + passage_id_scheme="content-hash", + ) + + builder.add_text("zero id", metadata={"id": 0}) + builder.add_text("empty id", metadata={"id": ""}) + builder.update_index(index_path) + + assert add_vectors_calls[0][2] == ["0", ""] + offset_map = pickle.loads((tmp_path / "documents.leann.passages.idx").read_bytes()) + assert "0" in offset_map + assert "" in offset_map From c106dc488e217c39a13d83ec9dbb630a8df12a8e Mon Sep 17 00:00:00 2001 From: Abi Date: Tue, 2 Jun 2026 21:37:20 -0700 Subject: [PATCH 9/9] fix: avoid model downloads in readme ci tests --- tests/test_readme_examples.py | 145 ++++++++++++++++++++++++++-------- 1 file changed, 114 insertions(+), 31 deletions(-) diff --git a/tests/test_readme_examples.py b/tests/test_readme_examples.py index fb9b1f3f..21202429 100644 --- a/tests/test_readme_examples.py +++ b/tests/test_readme_examples.py @@ -9,15 +9,80 @@ import pytest +CI_EMBEDDING_DIMENSIONS = 4 + + +def _is_ci() -> bool: + return os.environ.get("CI") == "true" + + +def _install_ci_embeddings(monkeypatch): + """Use deterministic embeddings in CI so docs tests do not depend on model downloads.""" + if not _is_ci(): + return + + import leann.api as leann_api + import leann.embedding_compute as embedding_compute + import numpy as np + + def fake_compute_embeddings( + chunks, + model_name, + mode="sentence-transformers", + use_server=True, + port=None, + is_build=False, + provider_options=None, + ): + del model_name, mode, use_server, port, is_build, provider_options + embeddings = [] + for chunk in chunks: + text = str(chunk).lower() + if ( + "fantastical" in text + or "ai-generated" in text + or "banana" in text + or "crocodile" in text + ): + embeddings.append([1.0, 0.0, 0.0, 0.0]) + elif "storage" in text or "97%" in text or "saves" in text: + embeddings.append([0.0, 1.0, 0.0, 0.0]) + elif "llm" in text or "testing" in text: + embeddings.append([0.0, 0.0, 1.0, 0.0]) + else: + embeddings.append([0.0, 0.0, 0.0, 1.0]) + return np.asarray(embeddings, dtype=np.float32) + + monkeypatch.setattr(leann_api, "compute_embeddings", fake_compute_embeddings) + monkeypatch.setattr(embedding_compute, "compute_embeddings", fake_compute_embeddings) + + +def _ci_builder_kwargs() -> dict: + if not _is_ci(): + return {} + return { + "embedding_model": "ci/deterministic-test-embedding", + "dimensions": CI_EMBEDDING_DIMENSIONS, + "is_compact": False, + "is_recompute": False, + } + + +def _ci_searcher_kwargs() -> dict: + if not _is_ci(): + return {} + return {"enable_warmup": False, "recompute_embeddings": False} + @pytest.mark.parametrize("backend_name", ["hnsw", "diskann"]) -def test_readme_basic_example(backend_name): +def test_readme_basic_example(backend_name, monkeypatch): """Test the basic example from README.md with both backends.""" + _install_ci_embeddings(monkeypatch) # Skip on macOS CI due to MPS environment issues with all-MiniLM-L6-v2 - if os.environ.get("CI") == "true" and platform.system() == "Darwin": + if _is_ci() and platform.system() == "Darwin": pytest.skip("Skipping on macOS CI due to MPS environment issues with all-MiniLM-L6-v2") # Skip DiskANN on CI (Linux runners) due to C++ extension memory/hardware constraints - if os.environ.get("CI") == "true" and backend_name == "diskann": + if _is_ci() and backend_name == "diskann": pytest.skip("Skip DiskANN tests in CI due to resource constraints and instability") # This is the exact code from README (with smaller model for CI) @@ -27,11 +92,10 @@ def test_readme_basic_example(backend_name): with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir: INDEX_PATH = str(Path(temp_dir) / f"demo_{backend_name}.leann") - if os.environ.get("CI") == "true": + if _is_ci(): builder = LeannBuilder( backend_name=backend_name, - embedding_model="sentence-transformers/all-MiniLM-L6-v2", - dimensions=384, + **_ci_builder_kwargs(), ) else: builder = LeannBuilder(backend_name=backend_name) @@ -44,8 +108,12 @@ def test_readme_basic_example(backend_name): index_files = list(index_dir.glob(f"{Path(INDEX_PATH).stem}.*")) assert len(index_files) > 0 - with LeannSearcher(INDEX_PATH) as searcher: - results = searcher.search("fantastical AI-generated creatures", top_k=1) + with LeannSearcher(INDEX_PATH, **_ci_searcher_kwargs()) as searcher: + results = searcher.search( + "fantastical AI-generated creatures", + top_k=1, + recompute_embeddings=not _is_ci(), + ) assert len(results) > 0 assert isinstance(results[0], SearchResult) @@ -54,8 +122,16 @@ def test_readme_basic_example(backend_name): ) assert "banana" in results[0].text or "crocodile" in results[0].text - chat = LeannChat(INDEX_PATH, llm_config={"type": "simulated"}) - response = chat.ask("How much storage does LEANN save?", top_k=1) + chat = LeannChat( + INDEX_PATH, + llm_config={"type": "simulated"}, + **_ci_searcher_kwargs(), + ) + response = chat.ask( + "How much storage does LEANN save?", + top_k=1, + recompute_embeddings=not _is_ci(), + ) # Verify chat works assert isinstance(response, str) @@ -75,31 +151,33 @@ def test_readme_imports(): assert callable(LeannChat) -def test_backend_options(): +def test_backend_options(monkeypatch): """Test different backend options mentioned in documentation.""" + _install_ci_embeddings(monkeypatch) # Skip on macOS CI due to MPS environment issues with all-MiniLM-L6-v2 - if os.environ.get("CI") == "true" and platform.system() == "Darwin": + if _is_ci() and platform.system() == "Darwin": pytest.skip("Skipping on macOS CI due to MPS environment issues with all-MiniLM-L6-v2") from leann import LeannBuilder with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir: - is_ci = os.environ.get("CI") == "true" - embedding_model = ( - "sentence-transformers/all-MiniLM-L6-v2" if is_ci else "facebook/contriever" - ) - dimensions = 384 if is_ci else None - hnsw_path = str(Path(temp_dir) / "test_hnsw.leann") - builder_hnsw = LeannBuilder( - backend_name="hnsw", embedding_model=embedding_model, dimensions=dimensions - ) + if _is_ci(): + builder_hnsw = LeannBuilder( + backend_name="hnsw", + **_ci_builder_kwargs(), + ) + else: + builder_hnsw = LeannBuilder( + backend_name="hnsw", + embedding_model="facebook/contriever", + ) builder_hnsw.add_text("Test document for HNSW backend") builder_hnsw.build_index(hnsw_path) assert Path(hnsw_path).parent.exists() assert len(list(Path(hnsw_path).parent.glob(f"{Path(hnsw_path).stem}.*"))) > 0 - if is_ci: + if _is_ci(): pytest.skip( "Skip DiskANN portion in CI - small datasets trigger MKL parameter " "errors and pytest-timeout thread kills cause segfaults on Windows" @@ -107,7 +185,8 @@ def test_backend_options(): diskann_path = str(Path(temp_dir) / "test_diskann.leann") builder_diskann = LeannBuilder( - backend_name="diskann", embedding_model=embedding_model, dimensions=dimensions + backend_name="diskann", + embedding_model="facebook/contriever", ) builder_diskann.add_text("Test document for DiskANN backend") builder_diskann.build_index(diskann_path) @@ -116,25 +195,25 @@ def test_backend_options(): @pytest.mark.parametrize("backend_name", ["hnsw", "diskann"]) -def test_llm_config_simulated(backend_name): +def test_llm_config_simulated(backend_name, monkeypatch): """Test simulated LLM configuration option with both backends.""" + _install_ci_embeddings(monkeypatch) # Skip on macOS CI due to MPS environment issues with all-MiniLM-L6-v2 - if os.environ.get("CI") == "true" and platform.system() == "Darwin": + if _is_ci() and platform.system() == "Darwin": pytest.skip("Skipping on macOS CI due to MPS environment issues with all-MiniLM-L6-v2") # Skip DiskANN tests in CI due to hardware requirements - if os.environ.get("CI") == "true" and backend_name == "diskann": + if _is_ci() and backend_name == "diskann": pytest.skip("Skip DiskANN tests in CI - requires specific hardware and large memory") from leann import LeannBuilder, LeannChat with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as temp_dir: index_path = str(Path(temp_dir) / f"test_{backend_name}.leann") - if os.environ.get("CI") == "true": + if _is_ci(): builder = LeannBuilder( backend_name=backend_name, - embedding_model="sentence-transformers/all-MiniLM-L6-v2", - dimensions=384, + **_ci_builder_kwargs(), ) else: builder = LeannBuilder(backend_name=backend_name) @@ -142,8 +221,12 @@ def test_llm_config_simulated(backend_name): builder.build_index(index_path) llm_config = {"type": "simulated"} - chat = LeannChat(index_path, llm_config=llm_config) - response = chat.ask("What is this document about?", top_k=1) + chat = LeannChat(index_path, llm_config=llm_config, **_ci_searcher_kwargs()) + response = chat.ask( + "What is this document about?", + top_k=1, + recompute_embeddings=not _is_ci(), + ) assert isinstance(response, str) assert len(response) > 0