Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions packages/leann-core/src/leann/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@

logger = logging.getLogger(__name__)

# Passage ID schemes recorded in <index>.meta.json["passage_id_scheme"].
# - "sequential": today's default; IDs are str(insertion_index) (api.py:add_text).
# - "content-hash": planned in #329; IDs are sha256(text)[:16], stable across
# file moves and reorderings.
# Older indexes have no passage_id_scheme field — readers must default to
# "sequential" when the key is absent. See #329 for the rollout plan.
PASSAGE_ID_SCHEME_SEQUENTIAL = "sequential"
PASSAGE_ID_SCHEME_CONTENT_HASH = "content-hash"


def get_registered_backends() -> list[str]:
"""Get list of registered backend names."""
Expand Down Expand Up @@ -361,8 +370,19 @@ def __init__(
dimensions: Optional[int] = None,
embedding_mode: str = "sentence-transformers",
embedding_options: Optional[dict[str, Any]] = None,
passage_id_scheme: str = PASSAGE_ID_SCHEME_SEQUENTIAL,
**backend_kwargs,
):
if passage_id_scheme not in (
PASSAGE_ID_SCHEME_SEQUENTIAL,
PASSAGE_ID_SCHEME_CONTENT_HASH,
):
raise ValueError(
f"Unknown passage_id_scheme: {passage_id_scheme!r}. "
f"Expected one of: {PASSAGE_ID_SCHEME_SEQUENTIAL!r}, "
f"{PASSAGE_ID_SCHEME_CONTENT_HASH!r}."
)
self.passage_id_scheme = passage_id_scheme
self.backend_name = backend_name
# Normalize incompatible combinations early (for consistent metadata)
if backend_name == "hnsw":
Expand Down Expand Up @@ -457,10 +477,23 @@ def __init__(
self.backend_kwargs = backend_kwargs
self.chunks: list[dict[str, Any]] = []

def _generate_passage_id(self, text: str) -> str:
"""Generate a passage ID per the configured scheme.

sequential: str(insertion index) — fast, position-dependent, current default.
content-hash: sha256(text)[:16] — content-stable, dedup-friendly across
file moves and reorderings. See #329 for the design.
"""
if self.passage_id_scheme == PASSAGE_ID_SCHEME_CONTENT_HASH:
import hashlib

return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
return str(len(self.chunks))

def add_text(self, text: str, metadata: Optional[dict[str, Any]] = None):
if metadata is None:
metadata = {}
passage_id = metadata.get("id", str(len(self.chunks)))
passage_id = metadata.get("id") or self._generate_passage_id(text)
chunk_data = {"id": passage_id, "text": text, "metadata": metadata}
self.chunks.append(chunk_data)

Expand Down Expand Up @@ -550,12 +583,13 @@ def build_index(self, index_path: str):
builder_instance.build(embeddings, string_ids, index_path, **current_backend_kwargs)
leann_meta_path = index_dir / f"{index_name}.meta.json"
meta_data = {
"version": "1.0",
"version": "1.1",
"backend_name": self.backend_name,
"embedding_model": self.embedding_model,
"dimensions": self.dimensions,
"backend_kwargs": self.backend_kwargs,
"embedding_mode": self.embedding_mode,
"passage_id_scheme": self.passage_id_scheme,
"passage_sources": [
{
"type": "jsonl",
Expand Down Expand Up @@ -675,12 +709,13 @@ def build_index_from_arrays(self, index_path: str, ids: list, embeddings: np.nda
# Create metadata file
leann_meta_path = index_dir / f"{index_name}.meta.json"
meta_data = {
"version": "1.0",
"version": "1.1",
"backend_name": self.backend_name,
"embedding_model": self.embedding_model,
"dimensions": self.dimensions,
"backend_kwargs": self.backend_kwargs,
"embedding_mode": self.embedding_mode,
"passage_id_scheme": self.passage_id_scheme,
"passage_sources": [
{
"type": "jsonl",
Expand Down Expand Up @@ -1138,6 +1173,12 @@ def __init__(
)
self.bm25_scorer: Optional[BM25Scorer] = None

# Surface the index's passage ID scheme so callers can introspect.
# Older indexes (pre-#330) don't record this field — they're sequential.
self.passage_id_scheme: str = self.meta_data.get(
"passage_id_scheme", PASSAGE_ID_SCHEME_SEQUENTIAL
)

# Optional one-shot warmup at construction time to hide cold-start latency.
if self._warmup:
self.warmup()
Expand Down
180 changes: 180 additions & 0 deletions packages/leann-core/src/leann/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,16 @@ def create_parser(self) -> argparse.ArgumentParser:
default=True,
help="Fall back to traditional chunking if AST chunking fails (default: True)",
)
build_parser.add_argument(
"--id-scheme",
choices=["sequential", "content-hash"],
default="sequential",
help=(
"How passage IDs are assigned. 'sequential' (default) keys by insertion "
"order; 'content-hash' uses sha256(text)[:16], stable across file moves "
"and reorderings. See #329."
),
)

# Watch command
watch_parser = subparsers.add_parser(
Expand All @@ -365,6 +375,26 @@ def create_parser(self) -> argparse.ArgumentParser:
help="Report changes without rebuilding (original watch behavior)",
)

migrate_parser = subparsers.add_parser(
"migrate-ids",
help=(
"Rewrite an existing index's passage IDs to content-hash form. "
"Irreversible; back up the index first."
),
)
migrate_parser.add_argument("index_name", help="Index name")
migrate_parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would change without writing anything.",
)
migrate_parser.add_argument(
"-y",
"--yes",
action="store_true",
help="Skip the interactive confirmation prompt.",
)

# Search command
search_parser = subparsers.add_parser("search", help="Search documents")
search_parser.add_argument("index_name", help="Index name")
Expand Down Expand Up @@ -1874,7 +1904,32 @@ def _chunks_for_paths(self, all_texts: list[dict], paths: set[str]) -> list[dict
in paths
]

def _existing_index_id_scheme(self, index_path: str) -> Optional[str]:
"""Return the passage_id_scheme recorded in an existing index's meta.json.

Returns None when the index doesn't exist yet or the field isn't
recorded (older indexes pre-#330). Callers should treat None as
"fall back to whatever the args say or the default".
"""
meta_path = Path(index_path).with_suffix(".leann.meta.json")
if not meta_path.exists():
return None
try:
with open(meta_path, encoding="utf-8") as f:
return json.load(f).get("passage_id_scheme")
except Exception:
return None

def _make_incremental_builder(self, args) -> "LeannBuilder":
# For incremental updates, the existing index's scheme wins. Otherwise
# IDs would mix schemes within one index, which breaks lookups.
existing_scheme = self._existing_index_id_scheme(self.get_index_path(args.index_name))
scheme = existing_scheme or getattr(args, "id_scheme", "sequential")
if existing_scheme and getattr(args, "id_scheme", existing_scheme) != existing_scheme:
print(
f"Note: --id-scheme={args.id_scheme} ignored — index '{args.index_name}' "
f"was built with passage_id_scheme={existing_scheme!r}, keeping that."
)
return LeannBuilder(
backend_name=args.backend_name,
embedding_model=args.embedding_model,
Expand All @@ -1885,6 +1940,7 @@ def _make_incremental_builder(self, args) -> "LeannBuilder":
is_compact=args.compact,
is_recompute=args.recompute,
num_threads=args.num_threads,
passage_id_scheme=scheme,
)

def _incremental_add_only(
Expand Down Expand Up @@ -2378,6 +2434,7 @@ async def build_index(self, args):
is_compact=args.compact,
is_recompute=args.recompute,
num_threads=args.num_threads,
passage_id_scheme=getattr(args, "id_scheme", "sequential"),
)

for chunk in all_texts:
Expand Down Expand Up @@ -2498,6 +2555,127 @@ async def _watch_trigger_build(self, index_name: str) -> None:
build_args = parser.parse_args(build_args_list)
await self.build_index(build_args)

def migrate_ids(self, args) -> None:
"""Rewrite an existing index's passage IDs to content-hash form.

Migration is purely a Python-side rewrite — the vector graph isn't
touched, so FAISS labels stay valid. What gets rewritten:
- <index>.passages.jsonl : new IDs in each line's "id" field
- <index>.passages.idx : new offset map keyed by new IDs
- <index>.ids.txt : new label → ID mapping for FAISS
- <index>.meta.json : passage_id_scheme = "content-hash"

Identical-text chunks collide on the same sha256 prefix; the later
occurrence wins in the offset map (dedup). A `--preserve-duplicates`
knob to suffix collisions can land separately.

Irreversible. Prompts for confirmation unless --yes is passed.
"""
import hashlib
import shutil

index_name = args.index_name
index_path = self._resolve_index_path(index_name, purpose="migrate")
if not index_path:
return
meta_path = Path(index_path).with_suffix(".leann.meta.json")
if not meta_path.exists():
print(f"Cannot migrate '{index_name}': metadata missing at {meta_path}.")
return
with open(meta_path, encoding="utf-8") as f:
meta = json.load(f)
current_scheme = meta.get("passage_id_scheme", "sequential")
if current_scheme == "content-hash":
print(f"Index '{index_name}' already uses content-hash IDs. Nothing to do.")
return

# Locate the sibling artifacts using the same conventions as build_index.
index_dir = Path(index_path).parent
index_base = Path(index_path).name
passages_file = index_dir / f"{index_base}.passages.jsonl"
offset_file = index_dir / f"{index_base}.passages.idx"
base_no_leann = (
index_base[: -len(".leann")] if index_base.endswith(".leann") else index_base
)
idmap_file = index_dir / f"{base_no_leann}.ids.txt"

for p in (passages_file, offset_file):
if not p.exists():
print(f"Cannot migrate: required artifact missing at {p}.")
return

# Stream the passages to plan the rewrite and surface collision count
# before committing to anything irreversible.
old_ids: list[str] = []
new_ids: list[str] = []
with open(passages_file, encoding="utf-8") as f:
for line in f:
if not line.strip():
continue
data = json.loads(line)
old_ids.append(data["id"])
new_ids.append(hashlib.sha256(data["text"].encode("utf-8")).hexdigest()[:16])

unique_new = len(set(new_ids))
collisions = len(new_ids) - unique_new
print(
f"Migrating '{index_name}': {len(new_ids)} passages → {unique_new} unique "
f"content-hash IDs ({collisions} collision(s) will dedup)."
)

if args.dry_run:
print("(dry-run; not writing anything)")
return
if not args.yes:
confirm = input(
"Proceed? This rewrites passages.jsonl, .idx, .ids.txt, .meta.json. [y/N] "
)
if confirm.strip().lower() not in ("y", "yes"):
print("Aborted.")
return

# Stage writes into siblings, then atomically rename.
new_passages = passages_file.with_suffix(passages_file.suffix + ".migrate")
new_offsets: dict[str, int] = {}
with (
open(passages_file, encoding="utf-8") as src,
open(new_passages, "w", encoding="utf-8") as dst,
):
idx = 0
for line in src:
if not line.strip():
continue
data = json.loads(line)
data["id"] = new_ids[idx]
offset = dst.tell()
json.dump(data, dst, ensure_ascii=False)
dst.write("\n")
new_offsets[new_ids[idx]] = offset
idx += 1

new_idx = offset_file.with_suffix(offset_file.suffix + ".migrate")
with open(new_idx, "wb") as f:
pickle.dump(new_offsets, f)

if idmap_file.exists():
new_idmap = idmap_file.with_suffix(idmap_file.suffix + ".migrate")
with open(new_idmap, "w", encoding="utf-8") as f:
for nid in new_ids:
f.write(nid + "\n")
shutil.move(str(new_idmap), str(idmap_file))

shutil.move(str(new_passages), str(passages_file))
shutil.move(str(new_idx), str(offset_file))

meta["passage_id_scheme"] = "content-hash"
meta["version"] = "1.1"
with open(meta_path, "w", encoding="utf-8") as f:
json.dump(meta, f, indent=2)

print(
f"✓ Migrated '{index_name}' to content-hash IDs. {collisions} collisions were deduped."
)

async def watch_index(self, args):
index_name = args.index_name
resolved = self._resolve_index_for_watch(index_name)
Expand Down Expand Up @@ -3218,6 +3396,8 @@ async def run(self, args=None):
await self.build_index(args)
elif args.command == "watch":
await self.watch_index(args)
elif args.command == "migrate-ids":
self.migrate_ids(args)
elif args.command == "search":
with suppress_cpp_output(suppress):
await self.search_documents(args)
Expand Down
Loading