diff --git a/core/api.py b/core/api.py index 4cd3173a..ef562552 100644 --- a/core/api.py +++ b/core/api.py @@ -52,6 +52,7 @@ from core.routes.logs import router as logs_router # noqa: E402 – import after FastAPI app from core.routes.models import router as models_router from core.routes.usage import router as usage_router +from core.routes.v2 import router as v2_router from core.services.telemetry import TelemetryService from core.services_init import document_service, ingestion_service from core.utils.folder_utils import normalize_folder_selector @@ -326,6 +327,9 @@ def _extract_provider(model_name: str) -> str: # Register models router app.include_router(models_router) +# Register v2 router +app.include_router(v2_router) + # Register logs router app.include_router(logs_router) diff --git a/core/app_factory.py b/core/app_factory.py index d3b20e2b..5ece02d4 100644 --- a/core/app_factory.py +++ b/core/app_factory.py @@ -27,7 +27,7 @@ async def lifespan(app_instance: FastAPI): # ------------------------------------------------------------------ # Import services directly from services_init instead of through api_module # ------------------------------------------------------------------ - from core.services_init import database, settings, vector_store + from core.services_init import database, settings, v2_chunk_store, vector_store # --- BEGIN MOVED STARTUP LOGIC --- logger.info("Lifespan: Initializing Database…") @@ -57,6 +57,18 @@ async def lifespan(app_instance: FastAPI): exc_info=True, ) + logger.info("Lifespan: Initializing V2 Chunk Store…") + try: + if hasattr(v2_chunk_store, "initialize"): + await v2_chunk_store.initialize() + logger.info("Lifespan: V2 Chunk Store initialization successful (or not applicable).") + except Exception as exc: # noqa: BLE001 + logger.error( + "Lifespan: CRITICAL - Failed to initialize V2 Chunk Store: %s", + exc, + exc_info=True, + ) + # Initialize ColPali vector store if it exists # Note: max_sim function creation happens in MultiVectorStore.initialize() logger.info("Lifespan: Initializing ColPali Vector Store…") diff --git a/core/database/metadata_filters.py b/core/database/metadata_filters.py index 4c10cd1a..8dcefc83 100644 --- a/core/database/metadata_filters.py +++ b/core/database/metadata_filters.py @@ -29,9 +29,16 @@ class InvalidMetadataFilterError(ValueError): class MetadataFilterBuilder: """Translate JSON-style metadata filters into SQL, covering arrays, regex, and substring operators.""" - _COLUMN_FIELDS = { - "filename": "filename", - } + def __init__( + self, + *, + metadata_column: str = "doc_metadata", + metadata_types_column: Optional[str] = "metadata_types", + column_fields: Optional[Dict[str, str]] = None, + ) -> None: + self.metadata_column = metadata_column + self.metadata_types_column = metadata_types_column + self._column_fields = column_fields or {"filename": "filename"} def build(self, filters: Optional[Dict[str, Any]]) -> str: """Construct a SQL WHERE clause from a metadata filter dictionary.""" @@ -113,7 +120,7 @@ def _combine_clauses(self, clauses: List[str], operator: str, context: str) -> s def _build_field_metadata_clause(self, field: str, value: Any) -> str: """Build SQL clause for a single metadata field.""" - if field in self._COLUMN_FIELDS: + if field in self._column_fields: return self._build_column_field_clause(field, value) if isinstance(value, dict) and not any(key.startswith("$") for key in value): @@ -192,7 +199,7 @@ def _build_single_value_clause(self, field: str, value: Any) -> str: def 
_build_column_field_clause(self, field: str, value: Any) -> str: """Build SQL clause for a reserved column field (e.g., filename).""" - column = self._COLUMN_FIELDS[field] + column = self._column_fields[field] builder = TextColumnFilterBuilder(column) if isinstance(value, dict): @@ -220,7 +227,7 @@ def _build_exists_clause(self, field: str, operand: Any) -> str: raise InvalidMetadataFilterError(f"$exists operator for field '{field}' expects a boolean value.") field_key = self._escape_single_quotes(field) - clause = f"(doc_metadata ? '{field_key}')" + clause = f"({self.metadata_column} ? '{field_key}')" return clause if expected else f"(NOT {clause})" def _build_comparison_clause(self, field: str, operator: str, operand: Any) -> str: @@ -270,9 +277,9 @@ def _build_numeric_comparison_clause(self, field: str, sql_operator: str, operan field_key = self._escape_single_quotes(field) type_expr = self._metadata_type_expr(field_key) - # Use CASE to ensure casting only happens when type is correct value_expr = ( - f"(CASE WHEN {type_expr} = 'number' THEN (doc_metadata ->> '{field_key}')::double precision ELSE NULL END)" + f"(CASE WHEN {type_expr} = 'number' THEN ({self.metadata_column} ->> '{field_key}')::double precision " + "ELSE NULL END)" ) return f"({value_expr} {sql_operator} {literal})" @@ -285,8 +292,10 @@ def _build_decimal_comparison_clause(self, field: str, sql_operator: str, operan field_key = self._escape_single_quotes(field) type_expr = self._metadata_type_expr(field_key) - # Use CASE to ensure casting only happens when type is correct - value_expr = f"(CASE WHEN {type_expr} = 'decimal' THEN (doc_metadata ->> '{field_key}')::numeric ELSE NULL END)" + value_expr = ( + f"(CASE WHEN {type_expr} = 'decimal' THEN ({self.metadata_column} ->> '{field_key}')::numeric " + "ELSE NULL END)" + ) return f"({value_expr} {sql_operator} {literal}::numeric)" def _build_datetime_comparison_clause(self, field: str, sql_operator: str, operand: Any) -> str: @@ -298,9 +307,9 @@ def _build_datetime_comparison_clause(self, field: str, sql_operator: str, opera field_key = self._escape_single_quotes(field) type_expr = self._metadata_type_expr(field_key) - # Use CASE to ensure casting only happens when type is correct value_expr = ( - f"(CASE WHEN {type_expr} = 'datetime' THEN (doc_metadata ->> '{field_key}')::timestamptz ELSE NULL END)" + f"(CASE WHEN {type_expr} = 'datetime' THEN ({self.metadata_column} ->> '{field_key}')::timestamptz " + "ELSE NULL END)" ) return f"({value_expr} {sql_operator} {literal})" @@ -313,8 +322,9 @@ def _build_date_comparison_clause(self, field: str, sql_operator: str, operand: field_key = self._escape_single_quotes(field) type_expr = self._metadata_type_expr(field_key) - # Use CASE to ensure casting only happens when type is correct - value_expr = f"(CASE WHEN {type_expr} = 'date' THEN (doc_metadata ->> '{field_key}')::date ELSE NULL END)" + value_expr = ( + f"(CASE WHEN {type_expr} = 'date' THEN ({self.metadata_column} ->> '{field_key}')::date " "ELSE NULL END)" + ) return f"({value_expr} {sql_operator} {literal})" def _build_string_comparison_clause(self, field: str, sql_operator: str, operand: str) -> str: @@ -322,7 +332,7 @@ def _build_string_comparison_clause(self, field: str, sql_operator: str, operand field_key = self._escape_single_quotes(field) escaped_value = self._escape_single_quotes(operand) type_expr = self._metadata_type_expr(field_key) - value_expr = f"(doc_metadata ->> '{field_key}')" + value_expr = f"({self.metadata_column} ->> '{field_key}')" # For strings without 
explicit type, assume string type (COALESCE handles missing metadata_types) return f"((COALESCE({type_expr}, 'string') = 'string') AND {value_expr} {sql_operator} '{escaped_value}')" @@ -345,8 +355,23 @@ def _build_type_clause(self, field: str, operand: Any) -> str: raise InvalidMetadataFilterError(str(exc)) from exc field_key = self._escape_single_quotes(field) - type_expr = f"COALESCE(metadata_types ->> '{field_key}', 'string')" - clauses = [f"({type_expr} = '{type_name}')" for type_name in canonical_types] + if not self.metadata_types_column: + jsonb_type_expr = f"jsonb_typeof({self.metadata_column} -> '{field_key}')" + type_map = { + "string": "string", + "number": "number", + "decimal": "number", + "boolean": "boolean", + "object": "object", + "array": "array", + "null": "null", + "datetime": "string", + "date": "string", + } + clauses = [f"({jsonb_type_expr} = '{type_map.get(type_name, type_name)}')" for type_name in canonical_types] + else: + type_expr = f"COALESCE({self.metadata_types_column} ->> '{field_key}', 'string')" + clauses = [f"({type_expr} = '{type_name}')" for type_name in canonical_types] if len(clauses) == 1: return clauses[0] return "(" + " OR ".join(clauses) + ")" @@ -367,7 +392,7 @@ def _jsonb_contains_clause(self, field: str, value: Any) -> str: ) from exc escaped_payload = json_payload.replace("'", "''") - base_clause = f"(doc_metadata @> '{escaped_payload}'::jsonb)" + base_clause = f"({self.metadata_column} @> '{escaped_payload}'::jsonb)" array_clause = self._build_array_membership_clause(field, value) if array_clause: @@ -391,8 +416,8 @@ def _build_array_membership_clause(self, field: str, value: Any) -> str: field_key = self._escape_single_quotes(field) return ( - f"((jsonb_typeof(doc_metadata -> '{field_key}') = 'array') " - f"AND ((doc_metadata -> '{field_key}') @> '{escaped_array_payload}'::jsonb))" + f"((jsonb_typeof({self.metadata_column} -> '{field_key}') = 'array') " + f"AND (({self.metadata_column} -> '{field_key}') @> '{escaped_array_payload}'::jsonb))" ) def _build_regex_clause(self, field: str, operand: Any) -> str: @@ -403,7 +428,7 @@ def _build_regex_clause(self, field: str, operand: Any) -> str: escaped_pattern = pattern.replace("\\", "\\\\").replace("'", "''") field_key = self._escape_single_quotes(field) - base_clause = f"((doc_metadata ->> '{field_key}') {regex_operator} '{escaped_pattern}')" + base_clause = f"(({self.metadata_column} ->> '{field_key}') {regex_operator} '{escaped_pattern}')" array_clause = self._build_array_regex_clause(field, regex_operator, escaped_pattern) if array_clause: return f"({base_clause} OR {array_clause})" @@ -440,8 +465,8 @@ def _build_array_regex_clause(self, field: str, regex_operator: str, escaped_pat field_key = self._escape_single_quotes(field) array_value_expr = "trim('\"' FROM arr.value::text)" return ( - f"((jsonb_typeof(doc_metadata -> '{field_key}') = 'array') AND EXISTS (" - f"SELECT 1 FROM jsonb_array_elements(doc_metadata -> '{field_key}') AS arr(value) " + f"((jsonb_typeof({self.metadata_column} -> '{field_key}') = 'array') AND EXISTS (" + f"SELECT 1 FROM jsonb_array_elements({self.metadata_column} -> '{field_key}') AS arr(value) " f"WHERE jsonb_typeof(arr.value) = 'string' AND {array_value_expr} {regex_operator} '{escaped_pattern}'))" ) @@ -453,7 +478,7 @@ def _build_contains_clause(self, field: str, operand: Any) -> str: escaped_pattern = self._escape_like_pattern(value) field_key = self._escape_single_quotes(field) - base_clause = f"((doc_metadata ->> '{field_key}') {like_operator} 
'%{escaped_pattern}%')" + base_clause = f"(({self.metadata_column} ->> '{field_key}') {like_operator} '%{escaped_pattern}%')" array_clause = self._build_array_like_clause(field, like_operator, escaped_pattern) if array_clause: return f"({base_clause} OR {array_clause})" @@ -492,15 +517,17 @@ def _build_array_like_clause(self, field: str, like_operator: str, escaped_patte field_key = self._escape_single_quotes(field) array_value_expr = "trim('\"' FROM arr.value::text)" return ( - f"((jsonb_typeof(doc_metadata -> '{field_key}') = 'array') AND EXISTS (" - f"SELECT 1 FROM jsonb_array_elements(doc_metadata -> '{field_key}') AS arr(value) " + f"((jsonb_typeof({self.metadata_column} -> '{field_key}') = 'array') AND EXISTS (" + f"SELECT 1 FROM jsonb_array_elements({self.metadata_column} -> '{field_key}') AS arr(value) " f"WHERE jsonb_typeof(arr.value) = 'string' AND " f"{array_value_expr} {like_operator} '%{escaped_pattern}%'))" ) def _metadata_type_expr(self, field_key: str) -> str: """Return SQL expression fetching the stored metadata type for a field.""" - return f"(metadata_types ->> '{field_key}')" + if not self.metadata_types_column: + return "NULL" + return f"({self.metadata_types_column} ->> '{field_key}')" def _map_comparison_operator(self, operator: str) -> str: """Map comparison operators to SQL symbols.""" diff --git a/core/models/v2.py b/core/models/v2.py new file mode 100644 index 00000000..c8eb41da --- /dev/null +++ b/core/models/v2.py @@ -0,0 +1,43 @@ +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel, Field, model_validator + + +class V2IngestResponse(BaseModel): + document_id: str + filename: str + chunk_count: int + status: Optional[str] = None + + +class V2RetrieveFilters(BaseModel): + document_ids: Optional[List[str]] = Field(default=None, description="Limit to specific document IDs") + folder_paths: Optional[List[str]] = Field(default=None, description="Limit to specific folder paths") + metadata: Optional[Dict[str, Any]] = Field(default=None, description="Chunk-level metadata filters") + + +class V2RetrieveRequest(BaseModel): + query: str + filters: Optional[V2RetrieveFilters] = None + top_k: int = Field(default=5, ge=1, le=100) + end_user_id: Optional[str] = Field(default=None, description="Optional end-user scope") + + @model_validator(mode="after") + def validate_query(self): + if not self.query or not self.query.strip(): + raise ValueError("query must be a non-empty string") + return self + + +class V2ChunkResult(BaseModel): + chunk_id: str + document_id: str + page_number: Optional[int] = None + chunk_number: Optional[int] = None + score: float + content: str + + +class V2RetrieveResponse(BaseModel): + query: str + chunks: List[V2ChunkResult] diff --git a/core/parser/docling_v2.py b/core/parser/docling_v2.py new file mode 100644 index 00000000..851cf891 --- /dev/null +++ b/core/parser/docling_v2.py @@ -0,0 +1,218 @@ +import logging +import os +import tempfile +from html import escape as html_escape +from typing import Dict, List, Optional, Tuple + +from docling.datamodel.base_models import InputFormat +from docling.datamodel.pipeline_options import EasyOcrOptions, PdfPipelineOptions, TableStructureOptions +from docling.document_converter import DocumentConverter, PdfFormatOption +from docling_core.types.doc.document import ContentLayer +from docling_core.types.doc.labels import DocItemLabel + +logger = logging.getLogger(__name__) + + +class DoclingV2Parser: + """Docling parser that returns page-wise XML chunks with bbox metadata.""" + + 
_docling_converter: Optional[DocumentConverter] = None + + @classmethod + def _get_converter(cls) -> DocumentConverter: + if cls._docling_converter is None: + pipeline_options = PdfPipelineOptions() + pipeline_options.do_ocr = True + try: + import easyocr # noqa: F401 + + pipeline_options.ocr_options = EasyOcrOptions(lang=["en"]) + except ImportError: + logger.info("EasyOCR not installed; disabling OCR for Docling v2 parser.") + pipeline_options.do_ocr = False + + pipeline_options.do_table_structure = True + pipeline_options.table_structure_options = TableStructureOptions(mode="accurate") + pipeline_options.images_scale = 2.0 + pipeline_options.generate_picture_images = True + + cls._docling_converter = DocumentConverter( + format_options={ + InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options), + } + ) + return cls._docling_converter + + @classmethod + def convert_bytes(cls, file_bytes: bytes, filename: str): + """Convert a file (bytes) to a Docling document.""" + suffix = os.path.splitext(filename)[1] or ".pdf" + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file: + temp_file.write(file_bytes) + temp_path = temp_file.name + + try: + converter = cls._get_converter() + result = converter.convert(temp_path) + return result.document + finally: + try: + os.unlink(temp_path) + except OSError: + pass + + @staticmethod + def _label_key(label: object) -> str: + if hasattr(label, "name"): + return str(label.name).upper() + if hasattr(label, "value"): + return str(label.value).upper() + return str(label).upper() + + @staticmethod + def _bbox_to_loc(bbox, page_width: float, page_height: float) -> Optional[str]: + if bbox is None or page_width <= 0 or page_height <= 0: + return None + + def _norm(value: float, max_value: float) -> int: + scaled = (value / max_value) * 500 + return max(0, min(500, int(round(scaled)))) + + x1 = _norm(bbox.l, page_width) + y1 = _norm(bbox.t, page_height) + x2 = _norm(bbox.r, page_width) + y2 = _norm(bbox.b, page_height) + return f"{x1},{y1},{x2},{y2}" + + @classmethod + def build_page_xml_chunks( + cls, + doc, + document_id: str, + filename: str, + ) -> List[Tuple[str, int]]: + """Build one XML chunk per page with bbox metadata.""" + label_to_tag: Dict[DocItemLabel, str] = {} + + def _add(label_name: str, tag: str) -> None: + label = getattr(DocItemLabel, label_name, None) + if label is not None: + label_to_tag[label] = tag + + _add("TEXT", "t") + _add("PARAGRAPH", "t") + _add("SECTION_HEADER", "h") + _add("TITLE", "title") + _add("PAGE_HEADER", "r") + _add("PAGE_FOOTER", "f") + _add("TABLE", "tbl") + _add("PICTURE", "img") + _add("CHART", "chart") + _add("LIST_ITEM", "li") + _add("CAPTION", "cap") + _add("FOOTNOTE", "fn") + _add("FORMULA", "math") + _add("CODE", "code") + _add("CHECKBOX_SELECTED", "cb") + _add("CHECKBOX_UNSELECTED", "cb") + _add("FORM", "form") + _add("KEY_VALUE_REGION", "kv") + _add("REFERENCE", "ref") + _add("DOCUMENT_INDEX", "idx") + _add("HANDWRITTEN_TEXT", "hw") + + label_by_name = {cls._label_key(k): v for k, v in label_to_tag.items()} + table_label = getattr(DocItemLabel, "TABLE", None) + checkbox_selected_label = getattr(DocItemLabel, "CHECKBOX_SELECTED", None) + + pages: Dict[int, List[str]] = {} + + for item, _level in doc.iterate_items(included_content_layers={ContentLayer.BODY, ContentLayer.FURNITURE}): + if not hasattr(item, "prov") or not item.prov: + continue + + prov = item.prov[0] + page_no = getattr(prov, "page_no", None) + if page_no is None: + continue + + page = None + pages_obj = getattr(doc, 
"pages", None) + if isinstance(pages_obj, dict): + page = pages_obj.get(page_no) + elif isinstance(pages_obj, list): + idx = page_no - 1 + if 0 <= idx < len(pages_obj): + page = pages_obj[idx] + else: + try: + page = pages_obj[page_no] + except Exception: # noqa: BLE001 + page = None + if not page or not getattr(page, "size", None): + continue + + bbox = getattr(prov, "bbox", None) + loc = cls._bbox_to_loc(bbox, page.size.width, page.size.height) + + label = getattr(item, "label", None) + tag = None + if label in label_to_tag: + tag = label_to_tag[label] + elif label is not None: + tag = label_by_name.get(cls._label_key(label)) + if not tag: + tag = "t" + + text = "" + if table_label is not None and label == table_label and hasattr(item, "export_to_markdown"): + try: + text = item.export_to_markdown(doc=doc) + except TypeError: + text = item.export_to_markdown() + except Exception: # noqa: BLE001 + text = "" + elif hasattr(item, "text") and item.text: + text = item.text + elif hasattr(item, "export_to_markdown"): + try: + text = item.export_to_markdown(doc=doc) + except TypeError: + text = item.export_to_markdown() + except Exception: # noqa: BLE001 + text = "" + + text = text or "" + text = text.strip() + if not text and tag not in {"img", "chart"}: + continue + + attr_parts = [] + if loc: + attr_parts.append(f'loc="{loc}"') + + if tag == "cb": + checked = "true" if label == checkbox_selected_label else "false" + attr_parts.append(f'checked="{checked}"') + + attr_str = (" " + " ".join(attr_parts)) if attr_parts else "" + + if tag in {"img", "chart"}: + # Always self-closing for images/charts - no base64 data + element = f"<{tag}{attr_str}/>" + else: + escaped_text = html_escape(text, quote=False) + element = f"<{tag}{attr_str}>{escaped_text}" + + pages.setdefault(page_no, []).append(element) + + file_attr = html_escape(filename, quote=True) + doc_attr = html_escape(document_id, quote=True) + xml_chunks: List[Tuple[str, int]] = [] + + for page_no in sorted(pages.keys()): + elements = "\n".join(pages[page_no]) + xml = f'' f'

<page filename="{file_attr}" document_id="{doc_attr}" page_number="{page_no}">\n{elements}\n' "</page>
" + xml_chunks.append((xml, page_no)) + + return xml_chunks diff --git a/core/routes/v2.py b/core/routes/v2.py new file mode 100644 index 00000000..81865204 --- /dev/null +++ b/core/routes/v2.py @@ -0,0 +1,135 @@ +import logging +from typing import Optional + +import arq +from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile + +from core.auth_utils import verify_token +from core.database.metadata_filters import InvalidMetadataFilterError +from core.dependencies import get_redis_pool +from core.models.auth import AuthContext +from core.models.responses import DocumentDeleteResponse +from core.models.v2 import V2ChunkResult, V2IngestResponse, V2RetrieveRequest, V2RetrieveResponse +from core.routes.utils import parse_json_dict +from core.services_init import document_service, v2_document_service + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/v2", tags=["V2"]) + + +def _require_app_id(auth: AuthContext) -> None: + if not auth.app_id: + raise HTTPException(status_code=403, detail="app_id is required for v2 endpoints") + + +@router.post("/documents", response_model=V2IngestResponse) +async def ingest_document_v2( + file: Optional[UploadFile] = File(None), + content: Optional[str] = Form(None), + filename: Optional[str] = Form(None), + metadata: str = Form("{}"), + metadata_types: str = Form("{}"), + folder_path: Optional[str] = Form(None), + end_user_id: Optional[str] = Form(None), + auth: AuthContext = Depends(verify_token), + redis: arq.ArqRedis = Depends(get_redis_pool), +) -> V2IngestResponse: + try: + _require_app_id(auth) + metadata_dict = parse_json_dict(metadata, "metadata", default={}) + metadata_types_dict = parse_json_dict(metadata_types, "metadata_types", default={}) + + file_bytes = None + resolved_filename = filename + content_type = None + if file is not None: + file_bytes = await file.read() + resolved_filename = resolved_filename or file.filename or "uploaded_file" + content_type = file.content_type + + result = await v2_document_service.ingest_document( + file_bytes=file_bytes, + filename=resolved_filename, + content=content, + content_type=content_type, + metadata=metadata_dict, + metadata_types=metadata_types_dict, + folder_path=folder_path, + end_user_id=end_user_id, + auth=auth, + redis=redis, + ) + + return V2IngestResponse( + document_id=result["document_id"], + filename=result["filename"], + chunk_count=result["chunk_count"], + status=result.get("status"), + ) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) + except HTTPException: + raise + except Exception as exc: # noqa: BLE001 + logger.error("Error during v2 ingestion: %s", exc) + raise HTTPException(status_code=500, detail=f"Error during v2 ingestion: {exc}") + + +@router.post("/retrieve/chunks", response_model=V2RetrieveResponse) +async def retrieve_chunks_v2( + request: V2RetrieveRequest, + auth: AuthContext = Depends(verify_token), +) -> V2RetrieveResponse: + _require_app_id(auth) + filters = request.filters + document_ids = filters.document_ids if filters else None + folder_paths = filters.folder_paths if filters else None + metadata_filters = filters.metadata if filters else None + + try: + results = await v2_document_service.retrieve_chunks( + query=request.query, + top_k=request.top_k, + auth=auth, + document_ids=document_ids, + folder_paths=folder_paths, + metadata_filters=metadata_filters, + end_user_id=request.end_user_id, + ) + except InvalidMetadataFilterError as exc: + raise HTTPException(status_code=400, detail=str(exc)) 
from exc + + chunks = [ + V2ChunkResult( + chunk_id=item["id"], + document_id=item["document_id"], + page_number=item.get("page_number"), + chunk_number=item.get("chunk_number"), + score=item["score"], + content=item["content"], + ) + for item in results + ] + + return V2RetrieveResponse(query=request.query, chunks=chunks) + + +@router.delete("/documents/{document_id}", response_model=DocumentDeleteResponse) +async def delete_document_v2( + document_id: str, + auth: AuthContext = Depends(verify_token), +) -> DocumentDeleteResponse: + try: + _require_app_id(auth) + success = await document_service.delete_document(document_id, auth) + if not success: + raise HTTPException(status_code=404, detail="Document not found or delete failed") + return {"status": "success", "message": f"Document {document_id} deleted successfully"} + except PermissionError as exc: + raise HTTPException(status_code=403, detail=str(exc)) from exc + except HTTPException: + raise + except Exception as exc: # noqa: BLE001 + logger.error("Error deleting v2 document %s: %s", document_id, exc) + raise HTTPException(status_code=500, detail=f"Error deleting document: {exc}") diff --git a/core/services/document_service.py b/core/services/document_service.py index a614dd83..2feff977 100644 --- a/core/services/document_service.py +++ b/core/services/document_service.py @@ -28,6 +28,7 @@ from core.reranker.base_reranker import BaseReranker from core.storage.base_storage import BaseStorage from core.vector_store.base_vector_store import BaseVectorStore +from core.vector_store.chunk_v2_store import ChunkV2Store from core.vector_store.utils import derive_repaired_image_key, is_storage_key, normalize_storage_key from ..models.auth import AuthContext @@ -66,6 +67,7 @@ def __init__( enable_colpali: bool = False, colpali_embedding_model: Optional[ColpaliEmbeddingModel] = None, colpali_vector_store: Optional[BaseVectorStore] = None, + v2_chunk_store: Optional[ChunkV2Store] = None, ): self.db = database self.vector_store = vector_store @@ -76,6 +78,7 @@ def __init__( self.reranker = reranker self.colpali_embedding_model = colpali_embedding_model self.colpali_vector_store = colpali_vector_store + self.v2_chunk_store = v2_chunk_store # MultiVectorStore initialization is now handled in the FastAPI startup event # so we don't need to initialize it here again @@ -1771,6 +1774,13 @@ async def delete_document(self, document_id: str, auth: AuthContext) -> bool: logger.error(f"User {auth.user_id} doesn't have write access to document {document_id}") raise PermissionError(f"User doesn't have write access to document {document_id}") + # Delete v2 chunks first to satisfy FK constraints when present. 
+ if self.v2_chunk_store and auth.app_id: + v2_deleted = await self.v2_chunk_store.delete_chunks_by_document_id(document_id, auth) + if not v2_deleted: + logger.error("Failed to delete v2 chunks for document %s", document_id) + return False + # Delete document from database db_success = await self.db.delete_document(document_id, auth) if not db_success: diff --git a/core/services/v2_document_service.py b/core/services/v2_document_service.py new file mode 100644 index 00000000..0ed96cae --- /dev/null +++ b/core/services/v2_document_service.py @@ -0,0 +1,648 @@ +import logging +import os +import re +import shutil +import subprocess +import tempfile +import uuid +from datetime import UTC, datetime, timedelta +from html import escape as html_escape +from pathlib import Path +from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple + +import arq +from fastapi import HTTPException + +from core.config import get_settings +from core.database.postgres_database import PostgresDatabase +from core.embedding.base_embedding_model import BaseEmbeddingModel +from core.limits_utils import check_and_increment_limits, estimate_pages_by_chars +from core.models.auth import AuthContext +from core.models.chunk import Chunk +from core.models.documents import Document +from core.parser.docling_v2 import DoclingV2Parser +from core.parser.morphik_parser import MorphikParser +from core.storage.base_storage import BaseStorage +from core.storage.utils_file_extensions import detect_content_type +from core.utils.folder_utils import normalize_ingest_folder_inputs +from core.utils.typed_metadata import MetadataBundle, normalize_metadata +from core.vector_store.chunk_v2_store import ChunkV2Store + +logger = logging.getLogger(__name__) +settings = get_settings() + + +class V2DocumentService: + """Service for v2 ingestion + retrieval (chunk_v2 store).""" + + _TEXT_EXTENSIONS = { + ".txt", + ".md", + ".markdown", + ".json", + ".csv", + ".tsv", + ".log", + ".rst", + ".yaml", + ".yml", + } + + _USER_IMMUTABLE_FIELDS = { + "folder_name", + "folder_id", + "folder_path", + "external_id", + "filename", + "app_id", + "owner_id", + "end_user_id", + } + + def __init__( + self, + database: PostgresDatabase, + storage: BaseStorage, + parser: MorphikParser, + embedding_model: BaseEmbeddingModel, + chunk_store: ChunkV2Store, + ): + self.db = database + self.storage = storage + self.parser = parser + self.embedding_model = embedding_model + self.chunk_store = chunk_store + self.docling_parser = DoclingV2Parser() + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + def _enforce_no_user_mutable_fields( + self, + metadata: Optional[Dict[str, Any]], + metadata_types: Optional[Dict[str, Any]] = None, + context: str = "ingest", + ) -> None: + invalid_fields = set() + if isinstance(metadata, dict): + invalid_fields.update({key for key in metadata.keys() if key in self._USER_IMMUTABLE_FIELDS}) + if isinstance(metadata_types, dict): + invalid_fields.update({key for key in metadata_types.keys() if key in self._USER_IMMUTABLE_FIELDS}) + if invalid_fields: + fields_str = ", ".join(sorted(invalid_fields)) + raise ValueError( + f"The following fields are managed by Morphik and cannot be set during {context}: {fields_str}. " + "Remove them from the request." 
+ ) + + @staticmethod + def _normalize_text_filename(filename: Optional[str], content: str) -> str: + def _needs_html_ext(text: str) -> bool: + head = text.lstrip().lower() + return head.startswith(" Tuple[str, str]: + from core.storage.utils_file_extensions import detect_file_type + + safe_filename = Path(filename or "").name or "uploaded_file" + storage_key = f"ingest_uploads/{uuid.uuid4()}/{safe_filename}" + if not Path(storage_key).suffix: + detected_ext = detect_file_type(content_bytes) + if detected_ext: + storage_key = f"{storage_key}{detected_ext}" + if not Path(safe_filename).suffix: + safe_filename = f"{safe_filename}{detected_ext}" + return storage_key, safe_filename + + async def _upload_content_bytes( + self, + *, + content_bytes: bytes, + filename: Optional[str], + content_type: Optional[str], + ) -> Tuple[str, str, str]: + storage_key, safe_filename = self._build_storage_key(filename, content_bytes) + bucket_name, full_storage_path = await self.storage.upload_file( + file=content_bytes, + key=storage_key, + content_type=content_type, + ) + return bucket_name, full_storage_path, safe_filename + + async def _verify_ingest_and_storage_limits( + self, + auth: AuthContext, + content_length: int, + document_id: str, + ) -> None: + if settings.MODE != "cloud" or not auth.user_id: + return + num_pages = estimate_pages_by_chars(content_length) + await check_and_increment_limits(auth, "ingest", num_pages, document_id, verify_only=True) + await check_and_increment_limits(auth, "storage_file", 1, verify_only=True) + await check_and_increment_limits(auth, "storage_size", content_length, verify_only=True) + + async def _record_storage_usage(self, auth: AuthContext, content_length: int, document_id: str) -> None: + if settings.MODE != "cloud" or not auth.user_id: + return + try: + await check_and_increment_limits(auth, "storage_file", 1) + await check_and_increment_limits(auth, "storage_size", content_length) + except Exception as rec_err: # noqa: BLE001 + logger.error("Failed recording storage usage for doc %s: %s", document_id, rec_err) + + async def _record_raw_storage_bytes( + self, auth: Optional[AuthContext], document_id: str, content_length: int + ) -> None: + if not auth or not document_id: + return + try: + await self.db.set_document_raw_bytes(document_id, auth.app_id, content_length) + except Exception as rec_err: # noqa: BLE001 + logger.error("Failed recording raw storage bytes for doc %s: %s", document_id, rec_err) + + async def _get_storage_object_size(self, bucket: str, key: str) -> Optional[int]: + if not key or not hasattr(self.storage, "get_object_size"): + return None + try: + return await self.storage.get_object_size(bucket, key) + except Exception as size_err: # noqa: BLE001 + logger.warning("Failed reading stored size for %s/%s: %s", bucket, key, size_err) + return None + + @staticmethod + def _strip_xml_tags(text: str) -> str: + import html + + without_tags = re.sub(r"<[^>]+>", " ", text) + collapsed = re.sub(r"\s+", " ", without_tags).strip() + return html.unescape(collapsed) + + @staticmethod + def _build_auth_dict(auth: AuthContext) -> Dict[str, Any]: + user_id = getattr(auth, "user_id", None) + return { + "user_id": user_id, + "entity_id": user_id, + "app_id": auth.app_id, + } + + @staticmethod + def _resolve_content_type(content_bytes: bytes, filename: Optional[str], content_type_hint: Optional[str]) -> str: + return detect_content_type(content=content_bytes, filename=filename, content_type_hint=content_type_hint) + + @staticmethod + def _build_storage_info( + 
bucket: str, key: str, filename: Optional[str], content_type: Optional[str] + ) -> Dict[str, str]: + return { + "bucket": bucket, + "key": key, + "filename": filename or "", + "content_type": content_type or "", + } + + @staticmethod + def _reset_processing_metadata(system_metadata: Optional[Dict[str, Any]]) -> Dict[str, Any]: + cleaned_metadata = dict(system_metadata or {}) + cleaned_metadata.pop("progress", None) + cleaned_metadata.pop("error", None) + cleaned_metadata["status"] = "processing" + cleaned_metadata["updated_at"] = datetime.now(UTC) + return cleaned_metadata + + @classmethod + def _build_ingestion_job_payload( + cls, + *, + document_id: str, + file_key: str, + bucket: str, + original_filename: Optional[str], + content_type: Optional[str], + auth: AuthContext, + folder_path: Optional[str] = None, + end_user_id: Optional[str] = None, + force_plain_text: bool = False, + ) -> Dict[str, Any]: + return { + "_job_id": f"v2_ingest:{document_id}", + "_expires": timedelta(days=7), + "document_id": document_id, + "file_key": file_key, + "bucket": bucket, + "original_filename": original_filename, + "content_type": content_type, + "auth_dict": cls._build_auth_dict(auth), + "folder_path": folder_path, + "end_user_id": end_user_id, + "force_plain_text": bool(force_plain_text), + } + + async def _mark_document_failed(self, doc: Document, auth: AuthContext, error: str) -> None: + failure_metadata = dict(doc.system_metadata or {}) + failure_metadata["status"] = "failed" + failure_metadata["error"] = error + failure_metadata["updated_at"] = datetime.now(UTC) + doc.system_metadata = failure_metadata + try: + await self.db.update_document(doc.external_id, {"system_metadata": failure_metadata}, auth=auth) + except Exception as db_update_err: # noqa: BLE001 + logger.error("Failed to mark doc %s as failed: %s", doc.external_id, db_update_err) + + @staticmethod + def _is_rich_doc(filename: str) -> bool: + """Check if filename indicates a rich document (has bbox/pages).""" + ext = Path(filename).suffix.lower() + return ext in {".pdf", ".pptx", ".docx"} + + @staticmethod + def _strip_metadata_scope(metadata: Dict[str, Any]) -> Dict[str, Any]: + cleaned = dict(metadata) + for key in {"folder_name", "folder_id", "end_user_id", "app_id"}: + cleaned.pop(key, None) + return cleaned + + @staticmethod + def _convert_office_to_pdf_bytes(file_bytes: bytes, suffix: str, doc_type: str) -> bytes: + if not shutil.which("soffice"): + raise HTTPException( + status_code=500, + detail=f"LibreOffice is required to convert {doc_type} to PDF. 
Install 'soffice' and retry.", + ) + + with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as temp_input: + temp_input.write(file_bytes) + temp_input_path = temp_input.name + + with tempfile.TemporaryDirectory() as output_dir: + try: + result = subprocess.run( + [ + "soffice", + "--headless", + "--convert-to", + "pdf", + "--outdir", + output_dir, + temp_input_path, + ], + capture_output=True, + text=True, + timeout=30, + ) + if result.returncode != 0: + raise HTTPException( + status_code=500, + detail=f"LibreOffice conversion failed: {result.stderr.strip() or result.stdout.strip()}", + ) + + base_filename = Path(temp_input_path).stem + pdf_path = Path(output_dir) / f"{base_filename}.pdf" + if not pdf_path.exists() or pdf_path.stat().st_size == 0: + raise HTTPException(status_code=500, detail="LibreOffice produced an empty PDF.") + + return pdf_path.read_bytes() + finally: + try: + os.unlink(temp_input_path) + except OSError: + pass + + async def _build_xml_chunks( + self, + *, + file_bytes: bytes, + filename: str, + document_id: str, + force_plain_text: bool = False, + ) -> Tuple[List[Tuple[str, int]], bool]: + xml_chunks: List[Tuple[str, int]] = [] + ext = Path(filename).suffix.lower() + is_rich = self._is_rich_doc(filename) + + if force_plain_text or ext in self._TEXT_EXTENSIONS: + is_rich = False + text_content = file_bytes.decode("utf-8", errors="ignore") + chunks = await self.parser.split_text(text_content) + if not chunks: + raise ValueError("No text chunks produced for plain text document") + + file_attr = html_escape(filename, quote=True) + doc_attr = html_escape(document_id, quote=True) + for idx, chunk in enumerate(chunks, start=1): + escaped_text = html_escape(chunk.content, quote=False) + xml = f'<chunk filename="{file_attr}" document_id="{doc_attr}" chunk_number="{idx}">' f"{escaped_text}" "</chunk>" + xml_chunks.append((xml, idx)) + return xml_chunks, is_rich + + if ext in {".doc"}: + raise HTTPException(status_code=400, detail=".doc files are not supported in v2") + if ext in {".ppt"}: + raise HTTPException(status_code=400, detail=".ppt files are not supported in v2") + if ext in {".xls", ".xlsx"}: + raise HTTPException(status_code=400, detail="Excel files are not supported in v2") + if ext in {".html", ".htm"}: + raise HTTPException(status_code=400, detail="HTML files are not supported in v2") + if ext in {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".tif", ".webp"}: + raise HTTPException(status_code=400, detail="Images are not supported in v2") + + parse_bytes = file_bytes + parse_filename = filename + if ext == ".docx": + parse_bytes = self._convert_office_to_pdf_bytes(file_bytes, ".docx", "Word document") + parse_filename = f"{Path(filename).stem}.pdf" + + docling_doc = None + if ext == ".pptx": + try: + docling_doc = self.docling_parser.convert_bytes(parse_bytes, parse_filename) + except Exception: # noqa: BLE001 + parse_bytes = self._convert_office_to_pdf_bytes( + file_bytes, + ".pptx", + "PowerPoint presentation", + ) + parse_filename = f"{Path(filename).stem}.pdf" + docling_doc = self.docling_parser.convert_bytes(parse_bytes, parse_filename) + else: + docling_doc = self.docling_parser.convert_bytes(parse_bytes, parse_filename) + xml_chunks = self.docling_parser.build_page_xml_chunks( + docling_doc, + document_id, + filename, + ) + + if not xml_chunks: + raise ValueError("No page chunks extracted from document") + + return xml_chunks, is_rich + + async def process_document_bytes( + self, + *, + doc: Document, + file_bytes: bytes, + auth: AuthContext, + force_plain_text: bool = False, + progress_cb: Optional[Callable[[str],
Awaitable[None]]] = None, + ) -> Dict[str, Any]: + filename = doc.filename or "uploaded_file" + + try: + if progress_cb: + await progress_cb("Parsing file") + xml_chunks, is_rich = await self._build_xml_chunks( + file_bytes=file_bytes, + filename=filename, + document_id=doc.external_id, + force_plain_text=force_plain_text, + ) + except HTTPException: + raise + except Exception as exc: # noqa: BLE001 + await self._mark_document_failed(doc, auth, f"Parsing failed: {exc}") + raise HTTPException(status_code=500, detail=f"Failed to parse document: {exc}") from exc + + stripped_texts = [self._strip_xml_tags(chunk_xml) for chunk_xml, _ in xml_chunks] + embedding_chunks = [Chunk(content=text, metadata={}) for text in stripped_texts] + + try: + if progress_cb: + await progress_cb("Generating embeddings") + embeddings = await self.embedding_model.embed_for_ingestion(embedding_chunks) + if len(embeddings) != len(xml_chunks): + raise ValueError("Embedding count mismatch") + except Exception as exc: # noqa: BLE001 + await self._mark_document_failed(doc, auth, f"Embedding failed: {exc}") + raise HTTPException(status_code=500, detail=f"Failed to generate embeddings: {exc}") from exc + + metadata_values = doc.metadata or {} + metadata_types = doc.metadata_types or {} + + chunk_records = [] + for idx, (xml, number) in enumerate(xml_chunks): + chunk_records.append( + { + "id": uuid.uuid4(), + "document_id": doc.external_id, + "content": xml, + "embedding": embeddings[idx], + "page_number": number if is_rich else None, + "chunk_number": number if not is_rich else None, + "app_id": auth.app_id, + "end_user_id": doc.end_user_id, + "folder_path": doc.folder_path, + "doc_metadata": metadata_values, + "metadata_types": metadata_types, + "filename": filename, + } + ) + + stored_ids: List[str] = [] + store_metrics: Dict[str, Any] = {} + try: + if progress_cb: + await progress_cb("Storing chunks") + _success, stored_ids, store_metrics = await self.chunk_store.store_chunks(chunk_records) + except Exception as exc: # noqa: BLE001 + await self._mark_document_failed(doc, auth, f"Chunk storage failed: {exc}") + raise HTTPException(status_code=500, detail=f"Failed to store chunks: {exc}") from exc + + page_count = len(xml_chunks) + doc.system_metadata = dict(doc.system_metadata or {}) + doc.system_metadata["page_count"] = max(1, page_count) + doc.system_metadata["status"] = "completed" + doc.system_metadata["updated_at"] = datetime.now(UTC) + + try: + await self.db.update_document(doc.external_id, {"system_metadata": doc.system_metadata}, auth=auth) + except Exception as exc: # noqa: BLE001 + logger.error("Failed to update v2 document status: %s", exc) + + if settings.MODE == "cloud" and auth.user_id: + try: + await check_and_increment_limits(auth, "ingest", doc.system_metadata["page_count"], doc.external_id) + except Exception as rec_exc: # noqa: BLE001 + logger.error("Failed to record v2 ingest usage: %s", rec_exc) + + if store_metrics: + try: + await self.db.record_document_storage_deltas( + doc.external_id, + auth.app_id, + chunk_bytes_delta=int(store_metrics.get("chunk_payload_bytes") or 0), + multivector_bytes_delta=0, + ) + except Exception as rec_exc: # noqa: BLE001 + logger.error("Failed to record v2 chunk storage bytes: %s", rec_exc) + + return { + "document_id": doc.external_id, + "filename": filename, + "chunk_count": len(xml_chunks), + "chunk_ids": stored_ids, + } + + async def ingest_document( + self, + *, + file_bytes: Optional[bytes], + filename: Optional[str], + content: Optional[str], + content_type: 
Optional[str], + metadata: Optional[Dict[str, Any]], + metadata_types: Optional[Dict[str, str]], + folder_path: Optional[str], + end_user_id: Optional[str], + auth: AuthContext, + redis: Optional[arq.ArqRedis] = None, + queue: Optional[bool] = None, + ) -> Dict[str, Any]: + if not auth.app_id: + raise HTTPException(status_code=403, detail="app_id is required for v2 ingestion") + if bool(file_bytes) == bool(content): + raise ValueError("Provide either file or content, not both.") + + self._enforce_no_user_mutable_fields(metadata, metadata_types=metadata_types, context="ingest") + + force_plain_text = content is not None + if content is not None: + if not filename: + raise ValueError("filename is required when using content") + filename = self._normalize_text_filename(filename, content) + file_bytes = content.encode("utf-8") + resolved_content_type = self._resolve_content_type(file_bytes, filename, content_type) + else: + if not file_bytes: + raise ValueError("file bytes are required when ingesting a file") + filename = filename or "uploaded_file" + resolved_content_type = self._resolve_content_type(file_bytes, filename, content_type) + + normalized_folder = normalize_ingest_folder_inputs(folder_path=folder_path) + folder_path_value, folder_leaf = normalized_folder.path, normalized_folder.leaf + + doc = Document( + filename=filename, + content_type=resolved_content_type, + metadata=metadata or {}, + app_id=auth.app_id, + end_user_id=end_user_id, + folder_name=folder_leaf, + folder_path=folder_path_value, + ) + doc.system_metadata = self._reset_processing_metadata(doc.system_metadata) + + await self._verify_ingest_and_storage_limits(auth, len(file_bytes), doc.external_id) + + metadata_payload = dict(metadata or {}) + metadata_payload.setdefault("external_id", doc.external_id) + if normalized_folder.metadata_value is not None: + metadata_payload["folder_name"] = normalized_folder.metadata_value + + metadata_bundle: MetadataBundle = normalize_metadata(metadata_payload, metadata_types) + doc.metadata = metadata_bundle.values + doc.metadata_types = metadata_bundle.types + + await self.db.store_document(doc, auth, metadata_bundle=metadata_bundle) + logger.info("v2 document record created for %s (doc_id=%s)", filename, doc.external_id) + + try: + bucket_name, storage_key, safe_filename = await self._upload_content_bytes( + content_bytes=file_bytes, + filename=filename, + content_type=resolved_content_type, + ) + doc.storage_info = self._build_storage_info(bucket_name, storage_key, safe_filename, resolved_content_type) + doc.system_metadata = self._strip_metadata_scope(doc.system_metadata) + await self.db.update_document( + document_id=doc.external_id, + updates={"storage_info": doc.storage_info, "system_metadata": doc.system_metadata}, + auth=auth, + ) + await self._record_storage_usage(auth, len(file_bytes), doc.external_id) + stored_size = await self._get_storage_object_size(bucket_name, storage_key) + if stored_size is not None: + await self._record_raw_storage_bytes(auth, doc.external_id, stored_size) + except Exception as exc: # noqa: BLE001 + await self._mark_document_failed(doc, auth, f"Storage upload failed: {exc}") + raise HTTPException(status_code=500, detail=f"Failed to upload file to storage: {exc}") from exc + + if queue is None: + queue = redis is not None + + if queue: + if redis is None: + raise ValueError("redis pool is required to queue v2 ingestion") + try: + job_payload = self._build_ingestion_job_payload( + document_id=doc.external_id, + file_key=storage_key, + 
bucket=bucket_name, + original_filename=filename, + content_type=resolved_content_type, + auth=auth, + folder_path=folder_path_value, + end_user_id=end_user_id, + force_plain_text=force_plain_text, + ) + job = await redis.enqueue_job("process_v2_ingestion_job", **job_payload) + if job is None: + logger.info("V2 ingestion job already queued (doc_id=%s)", doc.external_id) + else: + logger.info("V2 ingestion job queued (job_id=%s, doc=%s)", job.job_id, doc.external_id) + except Exception as exc: # noqa: BLE001 + await self._mark_document_failed(doc, auth, f"Failed to enqueue processing job: {exc}") + raise HTTPException(status_code=500, detail=f"Failed to enqueue v2 processing job: {exc}") from exc + + return { + "document_id": doc.external_id, + "filename": filename, + "chunk_count": 0, + "status": "queued", + } + + result = await self.process_document_bytes( + doc=doc, + file_bytes=file_bytes, + auth=auth, + force_plain_text=force_plain_text, + ) + result["status"] = "completed" + return result + + async def retrieve_chunks( + self, + *, + query: str, + top_k: int, + auth: AuthContext, + document_ids: Optional[List[str]] = None, + folder_paths: Optional[List[str]] = None, + metadata_filters: Optional[Dict[str, Any]] = None, + end_user_id: Optional[str] = None, + ) -> List[Dict[str, Any]]: + if not auth.app_id: + raise HTTPException(status_code=403, detail="app_id is required for v2 retrieval") + embedding = await self.embedding_model.embed_query(query) + return await self.chunk_store.query_similar( + embedding, + k=top_k, + auth=auth, + document_ids=document_ids, + folder_paths=folder_paths, + metadata_filters=metadata_filters, + end_user_id=end_user_id, + ) diff --git a/core/services_init.py b/core/services_init.py index 50cf4a6c..66bd2d98 100644 --- a/core/services_init.py +++ b/core/services_init.py @@ -24,8 +24,10 @@ from core.reranker.flag_reranker import FlagReranker from core.services.document_service import DocumentService from core.services.ingestion_service import IngestionService +from core.services.v2_document_service import V2DocumentService from core.storage.local_storage import LocalStorage from core.storage.s3_storage import S3Storage +from core.vector_store.chunk_v2_store import ChunkV2Store from core.vector_store.dual_multivector_store import DualMultiVectorStore from core.vector_store.fast_multivector_store import FastMultiVectorStore from core.vector_store.multi_vector_store import MultiVectorStore @@ -53,6 +55,9 @@ vector_store = PGVectorStore(uri=settings.POSTGRES_URI) logger.debug("Created PGVectorStore singleton") +v2_chunk_store = ChunkV2Store(uri=settings.POSTGRES_URI) +logger.debug("Created ChunkV2Store singleton") + # --------------------------------------------------------------------------- # Object storage # --------------------------------------------------------------------------- @@ -205,6 +210,7 @@ enable_colpali=settings.ENABLE_COLPALI, colpali_embedding_model=colpali_embedding_model, colpali_vector_store=colpali_vector_store, + v2_chunk_store=v2_chunk_store, ) logger.info("Document service initialised") @@ -223,6 +229,19 @@ ) logger.info("Ingestion service initialised") +# --------------------------------------------------------------------------- +# V2 document service (chunk_v2 store) +# --------------------------------------------------------------------------- + +v2_document_service = V2DocumentService( + database=database, + storage=storage, + parser=parser, + embedding_model=embedding_model, + chunk_store=v2_chunk_store, +) +logger.info("V2 
document service initialised") + __all__ = [ "settings", "database", @@ -232,4 +251,6 @@ "completion_model", "document_service", "ingestion_service", + "v2_chunk_store", + "v2_document_service", ] diff --git a/core/vector_store/chunk_v2_store.py b/core/vector_store/chunk_v2_store.py new file mode 100644 index 00000000..f763de6c --- /dev/null +++ b/core/vector_store/chunk_v2_store.py @@ -0,0 +1,443 @@ +import asyncio +import logging +import time +from contextlib import asynccontextmanager +from typing import Any, AsyncContextManager, Dict, List, Optional + +from sqlalchemy import Column, Index, Integer, String, Text, select, text +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.dialects.postgresql import UUID as PGUUID +from sqlalchemy.exc import OperationalError +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine +from sqlalchemy.orm import declarative_base, sessionmaker +from sqlalchemy.sql import func +from sqlalchemy.types import DateTime + +from core.database.metadata_filters import InvalidMetadataFilterError, MetadataFilterBuilder +from core.models.auth import AuthContext +from core.vector_store.pgvector_store import Vector +from core.vector_store.utils import build_store_metrics + +logger = logging.getLogger(__name__) +Base = declarative_base() +PGVECTOR_MAX_DIMENSIONS = 2000 + + +class ChunkV2Model(Base): + """SQLAlchemy model for v2 chunks.""" + + __tablename__ = "chunk_v2" + + id = Column(PGUUID(as_uuid=True), primary_key=True) + document_id = Column(String, nullable=False) + content = Column(Text, nullable=False) + embedding = Column(Vector, nullable=False) + page_number = Column(Integer) + chunk_number = Column(Integer) + app_id = Column(String) + end_user_id = Column(String) + folder_path = Column(String) + doc_metadata = Column("doc_metadata", JSONB) + metadata_types = Column(JSONB) + filename = Column(String) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + + __table_args__ = ( + Index( + "chunk_v2_embedding_idx", + "embedding", + postgresql_using="ivfflat", + postgresql_with={"lists": 100}, + ), + Index("chunk_v2_document_id_idx", "document_id"), + ) + + +class ChunkV2Store: + """PostgreSQL + pgvector store for v2 chunks.""" + + def __init__( + self, + uri: str, + max_retries: int = 3, + retry_delay: float = 1.0, + ): + from core.config import get_settings + + settings = get_settings() + + pool_size = getattr(settings, "DB_POOL_SIZE", 20) + max_overflow = getattr(settings, "DB_MAX_OVERFLOW", 30) + pool_recycle = getattr(settings, "DB_POOL_RECYCLE", 3600) + pool_timeout = getattr(settings, "DB_POOL_TIMEOUT", 10) + pool_pre_ping = getattr(settings, "DB_POOL_PRE_PING", True) + + from urllib.parse import parse_qs, urlencode, urlparse, urlunparse + + parsed = urlparse(uri) + query_params = parse_qs(parsed.query) + incompatible_params = ["sslmode", "channel_binding"] + removed_params = [] + for param in incompatible_params: + if param in query_params: + query_params.pop(param, None) + removed_params.append(param) + + if removed_params: + logger.debug("Removing parameters from PostgreSQL URI (not compatible with asyncpg): %s", removed_params) + parsed = parsed._replace(query=urlencode(query_params, doseq=True)) + uri = urlunparse(parsed) + + logger.info( + "Initializing v2 chunk store database engine with pool size=%s, max_overflow=%s", + pool_size, + max_overflow, + ) + + self.engine = create_async_engine( + uri, + pool_pre_ping=pool_pre_ping, + pool_size=pool_size, + max_overflow=max_overflow, + 
pool_recycle=pool_recycle, + pool_timeout=pool_timeout, + echo=False, + ) + self.async_session = sessionmaker(self.engine, class_=AsyncSession, expire_on_commit=False) + self.max_retries = max_retries + self.retry_delay = retry_delay + self._last_store_metrics: Dict[str, Any] = {} + self._metadata_filter_builder = MetadataFilterBuilder( + metadata_column="doc_metadata", + metadata_types_column="metadata_types", + ) + + @asynccontextmanager + async def get_session_with_retry(self) -> AsyncContextManager[AsyncSession]: + attempt = 0 + last_error = None + while attempt < self.max_retries: + try: + async with self.async_session() as session: + await session.execute(text("SELECT 1")) + yield session + return + except OperationalError as exc: + last_error = exc + attempt += 1 + if attempt < self.max_retries: + logger.warning( + "Chunk v2 DB connection attempt %s failed: %s. Retrying in %ss...", + attempt, + exc, + self.retry_delay, + ) + await asyncio.sleep(self.retry_delay) + + logger.error("All chunk v2 DB connection attempts failed: %s", last_error) + raise last_error + + async def initialize(self) -> bool: + from core.config import get_settings + + settings = get_settings() + dimensions = min(settings.VECTOR_DIMENSIONS, PGVECTOR_MAX_DIMENSIONS) + + try: + attempt = 0 + last_error = None + while attempt < self.max_retries: + try: + async with self.engine.begin() as conn: + await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + break + except OperationalError as exc: + last_error = exc + attempt += 1 + if attempt < self.max_retries: + logger.warning( + "Chunk v2 DB init attempt %s failed: %s. Retrying in %ss...", + attempt, + exc, + self.retry_delay, + ) + await asyncio.sleep(self.retry_delay) + else: + logger.error("Chunk v2 DB init failed after retries: %s", last_error) + raise + + async with self.engine.begin() as conn: + check_table_sql = """ + SELECT EXISTS ( + SELECT FROM information_schema.tables + WHERE table_name = 'chunk_v2' + ); + """ + result = await conn.execute(text(check_table_sql)) + table_exists = result.scalar() + + if table_exists: + column_check_sql = """ + SELECT column_name + FROM information_schema.columns + WHERE table_name = 'chunk_v2'; + """ + result = await conn.execute(text(column_check_sql)) + columns = {row[0] for row in result.fetchall()} + if "doc_metadata" not in columns: + if "metadata" in columns: + await conn.execute(text("ALTER TABLE chunk_v2 RENAME COLUMN metadata TO doc_metadata;")) + else: + await conn.execute(text("ALTER TABLE chunk_v2 ADD COLUMN doc_metadata JSONB;")) + + check_dim_sql = """ + SELECT atttypmod - 4 AS dimensions + FROM pg_attribute a + JOIN pg_class c ON a.attrelid = c.oid + JOIN pg_type t ON a.atttypid = t.oid + WHERE c.relname = 'chunk_v2' + AND a.attname = 'embedding' + AND t.typname = 'vector'; + """ + result = await conn.execute(text(check_dim_sql)) + current_dim = result.scalar() + + if current_dim is not None and (current_dim + 4) != dimensions: + logger.warning( + "chunk_v2 vector dimensions changed from %s to %s. Table recreation required.", + current_dim, + dimensions, + ) + user_input = input( + f"WARNING: chunk_v2 embedding dimensions changed from {current_dim} to {dimensions}. " + "This will DELETE ALL existing v2 chunk data. Type 'yes' to continue: " + ) + if user_input.lower() != "yes": + raise ValueError( + "Operation aborted by user. chunk_v2 dimension change requires recreation." 
+ ) + + await conn.execute(text("DROP INDEX IF EXISTS chunk_v2_embedding_idx;")) + await conn.execute(text("DROP INDEX IF EXISTS chunk_v2_document_id_idx;")) + await conn.execute(text("DROP TABLE IF EXISTS chunk_v2;")) + + create_table_sql = f""" + CREATE TABLE chunk_v2 ( + id UUID PRIMARY KEY, + document_id VARCHAR NOT NULL REFERENCES documents(external_id), + content TEXT NOT NULL, + embedding vector({dimensions}) NOT NULL, + page_number INT, + chunk_number INT, + app_id VARCHAR, + end_user_id VARCHAR, + folder_path VARCHAR, + doc_metadata JSONB, + metadata_types JSONB, + filename VARCHAR, + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP + ); + """ + await conn.execute(text(create_table_sql)) + await conn.execute( + text( + """ + CREATE INDEX chunk_v2_embedding_idx + ON chunk_v2 + USING ivfflat (embedding vector_cosine_ops) + WITH (lists = 100); + """ + ) + ) + await conn.execute(text("CREATE INDEX chunk_v2_document_id_idx ON chunk_v2 (document_id);")) + else: + logger.info("chunk_v2 table exists with matching vector dimensions (%s)", dimensions) + else: + create_table_sql = f""" + CREATE TABLE chunk_v2 ( + id UUID PRIMARY KEY, + document_id VARCHAR NOT NULL REFERENCES documents(external_id), + content TEXT NOT NULL, + embedding vector({dimensions}) NOT NULL, + page_number INT, + chunk_number INT, + app_id VARCHAR, + end_user_id VARCHAR, + folder_path VARCHAR, + doc_metadata JSONB, + metadata_types JSONB, + filename VARCHAR, + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP + ); + """ + await conn.execute(text(create_table_sql)) + await conn.execute( + text( + """ + CREATE INDEX chunk_v2_embedding_idx + ON chunk_v2 + USING ivfflat (embedding vector_cosine_ops) + WITH (lists = 100); + """ + ) + ) + await conn.execute(text("CREATE INDEX chunk_v2_document_id_idx ON chunk_v2 (document_id);")) + + logger.info("chunk_v2 store initialized") + return True + except Exception as exc: # noqa: BLE001 + logger.error("Error initializing chunk_v2 store: %s", exc) + return False + + async def store_chunks(self, chunks: List[Dict[str, Any]]) -> tuple[bool, List[str], Dict[str, Any]]: + if not chunks: + self._last_store_metrics = build_store_metrics( + chunk_payload_backend="none", + multivector_backend="none", + vector_store_backend="pgvector_v2", + ) + return True, [], self._last_store_metrics + + rows = [] + for chunk in chunks: + embedding = chunk.get("embedding") + if not embedding: + continue + rows.append( + { + "id": chunk["id"], + "document_id": chunk["document_id"], + "content": chunk["content"], + "embedding": embedding, + "page_number": chunk.get("page_number"), + "chunk_number": chunk.get("chunk_number"), + "app_id": chunk.get("app_id"), + "end_user_id": chunk.get("end_user_id"), + "folder_path": chunk.get("folder_path"), + "doc_metadata": chunk.get("doc_metadata"), + "metadata_types": chunk.get("metadata_types"), + "filename": chunk.get("filename"), + } + ) + + if not rows: + self._last_store_metrics = build_store_metrics( + chunk_payload_backend="none", + multivector_backend="none", + vector_store_backend="pgvector_v2", + ) + return True, [], self._last_store_metrics + + chunk_payload_bytes = sum(len(row["content"].encode("utf-8")) for row in rows if row.get("content")) + + write_start = time.perf_counter() + async with self.get_session_with_retry() as session: + await session.execute(ChunkV2Model.__table__.insert().values(rows)) + await session.commit() + write_duration = time.perf_counter() - write_start + + self._last_store_metrics = build_store_metrics( + 
chunk_payload_backend="none", + multivector_backend="none", + vector_store_backend="pgvector_v2", + chunk_payload_bytes=chunk_payload_bytes, + vector_store_write_s=write_duration, + vector_store_rows=len(rows), + ) + + stored_ids = [str(row["id"]) for row in rows] + return True, stored_ids, self._last_store_metrics + + def latest_store_metrics(self) -> Dict[str, Any]: + return dict(self._last_store_metrics) if self._last_store_metrics else {} + + async def query_similar( + self, + query_embedding: List[float], + k: int, + auth: AuthContext, + document_ids: Optional[List[str]] = None, + folder_paths: Optional[List[str]] = None, + metadata_filters: Optional[Dict[str, Any]] = None, + end_user_id: Optional[str] = None, + ) -> List[Dict[str, Any]]: + try: + conditions = [] + if not auth.app_id: + raise PermissionError("app_id is required for v2 chunk queries") + conditions.append(ChunkV2Model.app_id == auth.app_id) + + if end_user_id is not None: + conditions.append(ChunkV2Model.end_user_id == end_user_id) + + if document_ids: + conditions.append(ChunkV2Model.document_id.in_(document_ids)) + + if folder_paths: + conditions.append(ChunkV2Model.folder_path.in_(folder_paths)) + + if metadata_filters: + clause = self._metadata_filter_builder.build(metadata_filters) + if clause: + conditions.append(text(clause)) + + distance = ChunkV2Model.embedding.op("<=>")(query_embedding) + query = ( + select( + ChunkV2Model.id, + ChunkV2Model.document_id, + ChunkV2Model.content, + ChunkV2Model.page_number, + ChunkV2Model.chunk_number, + ChunkV2Model.filename, + distance.label("distance"), + ) + .where(*conditions) + .order_by(distance) + .limit(k) + ) + + async with self.get_session_with_retry() as session: + result = await session.execute(query) + rows = result.all() + + chunks: List[Dict[str, Any]] = [] + for row in rows: + distance_val = row.distance + if isinstance(distance_val, (list, tuple)): + distance_val = distance_val[0] if distance_val else 0.0 + score = 1.0 - float(distance_val) / 2.0 + chunks.append( + { + "id": str(row.id), + "document_id": row.document_id, + "content": row.content, + "page_number": row.page_number, + "chunk_number": row.chunk_number, + "filename": row.filename, + "score": score, + } + ) + + return chunks + except InvalidMetadataFilterError: + raise + except Exception as exc: # noqa: BLE001 + logger.error("Error querying chunk_v2 store: %s", exc) + return [] + + async def delete_chunks_by_document_id(self, document_id: str, auth: AuthContext) -> bool: + """Delete chunks for a document, scoped by app_id to prevent cross-tenant deletion.""" + try: + async with self.get_session_with_retry() as session: + if not auth.app_id: + raise PermissionError("app_id is required for v2 chunk deletion") + await session.execute( + text("DELETE FROM chunk_v2 WHERE document_id = :doc_id AND app_id = :app_id"), + {"doc_id": document_id, "app_id": auth.app_id}, + ) + await session.commit() + return True + except Exception as exc: # noqa: BLE001 + logger.error("Error deleting chunk_v2 rows for %s: %s", document_id, exc) + return False diff --git a/core/workers/ingestion_worker.py b/core/workers/ingestion_worker.py index 759275bd..204f4e3e 100644 --- a/core/workers/ingestion_worker.py +++ b/core/workers/ingestion_worker.py @@ -26,6 +26,7 @@ from core.parser.morphik_parser import MorphikParser from core.services.ingestion_service import IngestionService, PdfConversionError from core.services.telemetry import TelemetryService +from core.services.v2_document_service import V2DocumentService from 
core.storage.local_storage import LocalStorage from core.storage.s3_storage import S3Storage from core.storage.utils_file_extensions import detect_content_type, is_colpali_native_format @@ -33,6 +34,7 @@ from core.utils.storage_usage import extract_storage_bytes from core.utils.typed_metadata import MetadataBundle from core.vector_store.base_vector_store import BaseVectorStore +from core.vector_store.chunk_v2_store import ChunkV2Store from core.vector_store.dual_multivector_store import DualMultiVectorStore from core.vector_store.fast_multivector_store import FastMultiVectorStore from core.vector_store.multi_vector_store import MultiVectorStore @@ -183,6 +185,29 @@ async def update_document_progress(ingestion_service, document_id, auth, current # Don't fail the ingestion if progress update fails +async def update_document_progress_v2(database, document_id, auth, current_step, total_steps, step_name): + """ + Update progress metadata for v2 ingestion without requiring IngestionService. + """ + try: + updates = { + "system_metadata": { + "status": "processing", + "progress": { + "current_step": current_step, + "total_steps": total_steps, + "step_name": step_name, + "percentage": round((current_step / total_steps) * 100), + }, + "updated_at": datetime.now(UTC), + } + } + await database.update_document(document_id, updates, auth) + logger.debug("Updated v2 progress: %s (%s/%s)", step_name, current_step, total_steps) + except Exception as e: + logger.warning("Failed to update v2 progress for document %s: %s", document_id, e) + + _STORE_TIME_KEYS = ("chunk_payload_upload_s", "multivector_upload_s", "vector_store_write_s", "cache_write_s") _STORE_COUNT_KEYS = ( "chunk_payload_objects", @@ -1324,6 +1349,138 @@ def _meta_resolver(): # noqa: D401 } +async def process_v2_ingestion_job( + ctx: Dict[str, Any], + document_id: str, + file_key: str, + bucket: str, + original_filename: str, + content_type: str, + auth_dict: Dict[str, Any], + folder_path: Optional[str] = None, + end_user_id: Optional[str] = None, + force_plain_text: bool = False, +) -> Dict[str, Any]: + """ + Background worker task that processes v2 ingestion jobs (chunk_v2 store). 
+ """ + telemetry = TelemetryService() + + def _meta_resolver(): # noqa: D401 + return { + "filename": original_filename, + "content_type": content_type, + "folder_path": folder_path, + "end_user_id": end_user_id, + "force_plain_text": force_plain_text, + } + + try: + async with telemetry.track_operation( + operation_type="v2_ingest_worker", + user_id=auth_dict.get("user_id") or auth_dict.get("entity_id", "unknown"), + app_id=auth_dict.get("app_id"), + metadata=_meta_resolver(), + ): + auth = AuthContext( + user_id=auth_dict.get("user_id") or auth_dict.get("entity_id", ""), + app_id=auth_dict.get("app_id"), + ) + + database: PostgresDatabase = ctx["database"] + storage = ctx["storage"] + chunk_store: ChunkV2Store = ctx["chunk_v2_store"] + + doc = await database.get_document(document_id, auth) + if not doc: + raise ValueError(f"Document {document_id} not found for v2 ingestion") + + if not doc.filename and original_filename: + doc.filename = original_filename + await database.update_document(document_id, {"filename": doc.filename}, auth=auth) + + if not doc.content_type and content_type: + doc.content_type = content_type + await database.update_document(document_id, {"content_type": doc.content_type}, auth=auth) + + total_steps = 4 + await update_document_progress_v2(database, document_id, auth, 1, total_steps, "Downloading file") + + file_content = await storage.download_file(bucket, file_key) + if hasattr(file_content, "read"): + file_content = file_content.read() + + v2_service = V2DocumentService( + database=database, + storage=storage, + parser=ctx["parser"], + embedding_model=ctx["embedding_model"], + chunk_store=chunk_store, + ) + + step_iter = iter(range(2, total_steps + 1)) + + async def _progress_cb(step_name: str) -> None: + step = next(step_iter, total_steps) + await update_document_progress_v2(database, document_id, auth, step, total_steps, step_name) + + result = await v2_service.process_document_bytes( + doc=doc, + file_bytes=file_content, + auth=auth, + force_plain_text=force_plain_text, + progress_cb=_progress_cb, + ) + + return { + "document_id": document_id, + "status": "completed", + "filename": result.get("filename") or original_filename, + "chunk_count": result.get("chunk_count", 0), + "timestamp": datetime.now(UTC).isoformat(), + } + except Exception as e: + logger.error("Error processing v2 ingestion job for file %s: %s", original_filename, e) + logger.error(traceback.format_exc()) + progress_logger.error("v2 ingest failed doc_id=%s file=%s error=%s", document_id, original_filename, e) + + try: + auth + except NameError: + auth = AuthContext( + user_id=auth_dict.get("user_id") or auth_dict.get("entity_id", ""), + app_id=auth_dict.get("app_id"), + ) + + try: + database = ctx.get("database") + if database: + doc = await database.get_document(document_id, auth) + if doc: + await database.update_document( + document_id=document_id, + updates={ + "system_metadata": { + **(doc.system_metadata or {}), + "status": "failed", + "error": str(e), + "updated_at": datetime.now(UTC), + "progress": None, + } + }, + auth=auth, + ) + except Exception as inner_e: # noqa: BLE001 + logger.error("Failed to update v2 document status: %s", inner_e) + + return { + "status": "failed", + "filename": original_filename, + "error": str(e), + "timestamp": datetime.now(UTC).isoformat(), + } + + async def startup(ctx): """ Worker startup: Initialize all necessary services that will be reused across jobs. 
@@ -1356,6 +1513,16 @@ async def startup(ctx): logger.error("Primary vector store initialization failed") ctx["vector_store"] = vector_store + # Initialize v2 chunk store + logger.info("Initializing v2 chunk store...") + chunk_v2_store = ChunkV2Store(uri=settings.POSTGRES_URI) + success = await chunk_v2_store.initialize() + if success: + logger.info("V2 chunk store initialization successful") + else: + logger.error("V2 chunk store initialization failed") + ctx["chunk_v2_store"] = chunk_v2_store + # Initialize storage if settings.STORAGE_PROVIDER == "local": storage = LocalStorage(storage_path=settings.STORAGE_PATH) @@ -1482,6 +1649,7 @@ async def _shutdown_store(store_key: str) -> None: await engine.dispose() await _shutdown_store("vector_store") + await _shutdown_store("chunk_v2_store") await _shutdown_store("colpali_vector_store") # Close any other open connections or resources that need cleanup @@ -1518,7 +1686,7 @@ class WorkerSettings: and any specific Redis settings. """ - functions = [process_ingestion_job] + functions = [process_ingestion_job, process_v2_ingestion_job] on_startup = startup on_shutdown = shutdown diff --git a/scripts/sanity_test.sh b/scripts/sanity_test.sh index cb14b239..a3535a18 100755 --- a/scripts/sanity_test.sh +++ b/scripts/sanity_test.sh @@ -2424,6 +2424,18 @@ main() { test_folder_move_integrity cleanup_folder_test + # V2 API sanity tests + if [[ "${SKIP_V2_SANITY:-0}" != "1" ]]; then + log_section "V2 API Sanity" + if AUTH_TOKEN="$AUTH_TOKEN" MORPHIK_URL="$BASE_URL" ./scripts/v2_api_sanity.sh; then + log_success "V2 API sanity checks passed" + else + log_error "V2 API sanity checks failed" + fi + else + log_warn "Skipping V2 API sanity tests (SKIP_V2_SANITY=1)" + fi + cleanup_test_files "${1:-}" cleanup_auth_app diff --git a/scripts/v2_api_sanity.sh b/scripts/v2_api_sanity.sh new file mode 100755 index 00000000..75fb0ae0 --- /dev/null +++ b/scripts/v2_api_sanity.sh @@ -0,0 +1,727 @@ +#!/bin/bash +# V2 API Sanity Tests +# Tests v2 ingestion/retrieval for all supported formats + tenant isolation +# +# Usage: JWT_SECRET_KEY=... 
./scripts/v2_api_sanity.sh
+# Or with bypass_auth_mode=true, no env var needed
+
+set -euo pipefail
+
+BASE_URL="${MORPHIK_URL:-http://localhost:8000}"
+JWT_SECRET_KEY="${JWT_SECRET_KEY:-}"
+TEST_RUN_ID="v2test_$(date +%s)"
+WAIT_TIMEOUT="${WAIT_TIMEOUT:-180}"
+POLL_INTERVAL="${POLL_INTERVAL:-3}"
+
+# Colors
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+BLUE='\033[0;34m'
+NC='\033[0m'
+
+TESTS_PASSED=0
+TESTS_FAILED=0
+
+log_info() { echo -e "${BLUE}[INFO]${NC} $1"; }
+log_success() { echo -e "${GREEN}[PASS]${NC} $1"; TESTS_PASSED=$((TESTS_PASSED + 1)); }
+log_error() { echo -e "${RED}[FAIL]${NC} $1"; TESTS_FAILED=$((TESTS_FAILED + 1)); }
+log_section() { echo -e "\n${YELLOW}═══════════════════════════════════════════════════════════════${NC}"; echo -e "${YELLOW} $1${NC}"; echo -e "${YELLOW}═══════════════════════════════════════════════════════════════${NC}"; }
+
+TMP_DIR=$(mktemp -d)
+trap 'rm -rf "$TMP_DIR"' EXIT
+
+# Auth state for two apps
+APP_A_TOKEN=""
+APP_A_ID=""
+APP_A_NAME=""
+APP_B_TOKEN=""
+APP_B_ID=""
+APP_B_NAME=""
+BYPASS_MODE=false
+
+# ============================================================================
+# Auth Setup
+# ============================================================================
+
+check_bypass_mode() {
+    local status_code
+    status_code=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE_URL/documents" \
+        -H "Content-Type: application/json" \
+        -d '{"skip":0,"limit":1}' 2>/dev/null || echo "000")
+
+    if [[ "$status_code" != "401" ]]; then
+        BYPASS_MODE=true
+        return 0
+    fi
+    return 1
+}
+
+create_app_token() {
+    local app_name="$1"
+    local user_id="$2"
+    local org_id="$3"
+
+    # Generate bootstrap token with matching user_id
+    local bootstrap_token
+    bootstrap_token=$(python3 - <<PYEOF 2>/dev/null
+import jwt
+import time
+payload = {
+    "sub": "$user_id",
+    "user_id": "$user_id",
+    "entity_id": "$user_id",
+    "type": "developer",
+    "exp": int(time.time()) + 3600,
+}
+print(jwt.encode(payload, "$JWT_SECRET_KEY", algorithm="HS256"))
+PYEOF
+    )
+
+    if [[ -z "$bootstrap_token" ]]; then
+        log_error "Failed to generate bootstrap token for $app_name"
+        return 1
+    fi
+
+    # Create app via /cloud/generate_uri - requires org_id for fresh tokens
+    local response
+    response=$(curl -sf -X POST "$BASE_URL/cloud/generate_uri" \
+        -H "Content-Type: application/json" \
+        -H "Authorization: Bearer $bootstrap_token" \
+        -d "{\"name\":\"$app_name\",\"org_id\":\"$org_id\"}" 2>&1) || {
+        log_error "Failed to create app $app_name: $response"
+        return 1
+    }
+
+    local app_id uri token
+    app_id=$(echo "$response" | python3 -c "import sys,json; print(json.load(sys.stdin).get('app_id',''))" 2>/dev/null)
+    uri=$(echo "$response" | python3 -c "import sys,json; print(json.load(sys.stdin).get('uri',''))" 2>/dev/null)
+    token=$(echo "$uri" | sed -n 's/morphik:\/\/[^:]*:\([^@]*\)@.*/\1/p')
+
+    if [[ -z "$app_id" || -z "$token" ]]; then
+        log_error "Failed to extract app_id/token for $app_name"
+        return 1
+    fi
+
+    echo "$app_id|$token"
+}
+
+setup_auth() {
+    log_section "Setting Up Authentication"
+
+    if check_bypass_mode; then
+        log_info "Auth bypass mode detected - no tokens needed"
+        log_info "Tenant isolation tests will be skipped in bypass mode"
+        return 0
+    fi
+
+    if [[ -z "$JWT_SECRET_KEY" ]]; then
+        log_error "Auth required but JWT_SECRET_KEY not set"
+        log_info "Set JWT_SECRET_KEY environment variable or enable bypass_auth_mode in morphik.toml"
+        exit 1
+    fi
+
+    # Create App A (with its own org)
+    APP_A_NAME="${TEST_RUN_ID}_app_a"
+    local user_a_id org_a_id
user_a_id=$(python3 -c "import uuid; print(uuid.uuid4())") + org_a_id=$(python3 -c "import uuid; print(uuid.uuid4())") + local result_a + result_a=$(create_app_token "$APP_A_NAME" "$user_a_id" "$org_a_id") || exit 1 + APP_A_ID=$(echo "$result_a" | cut -d'|' -f1) + APP_A_TOKEN=$(echo "$result_a" | cut -d'|' -f2) + log_success "Created App A: $APP_A_ID" + + # Create App B (with its own org - ensures complete isolation) + APP_B_NAME="${TEST_RUN_ID}_app_b" + local user_b_id org_b_id + user_b_id=$(python3 -c "import uuid; print(uuid.uuid4())") + org_b_id=$(python3 -c "import uuid; print(uuid.uuid4())") + local result_b + result_b=$(create_app_token "$APP_B_NAME" "$user_b_id" "$org_b_id") || exit 1 + APP_B_ID=$(echo "$result_b" | cut -d'|' -f1) + APP_B_TOKEN=$(echo "$result_b" | cut -d'|' -f2) + log_success "Created App B: $APP_B_ID" +} + +cleanup_apps() { + if [[ -n "$APP_A_TOKEN" && -n "$APP_A_NAME" ]]; then + curl -sf -X DELETE "$BASE_URL/apps?app_name=${APP_A_NAME}" \ + -H "Authorization: Bearer $APP_A_TOKEN" > /dev/null 2>&1 || true + fi + if [[ -n "$APP_B_TOKEN" && -n "$APP_B_NAME" ]]; then + curl -sf -X DELETE "$BASE_URL/apps?app_name=${APP_B_NAME}" \ + -H "Authorization: Bearer $APP_B_TOKEN" > /dev/null 2>&1 || true + fi +} + +get_auth_opts() { + local token="$1" + if [[ -n "$token" ]]; then + echo "-H" "Authorization: Bearer $token" + fi +} + +# ============================================================================ +# Test File Creation +# ============================================================================ + +create_test_files() { + log_section "Creating Test Files" + + # TXT file + cat > "$TMP_DIR/test.txt" << 'EOF' +V2 Sanity Test - Plain Text Document + +This is a plain text file for testing v2 ingestion. +Keywords: v2_txt_test alpha bravo charlie +EOF + log_info "Created test.txt" + + # MD file + cat > "$TMP_DIR/test.md" << 'EOF' +# V2 Sanity Test - Markdown Document + +This is a **markdown** file for testing v2 ingestion. 
+ +## Section One +Keywords: v2_md_test delta echo foxtrot + +## Section Two +- List item one +- List item two +EOF + log_info "Created test.md" + + # Simple DOCX (minimal valid docx structure) + # We'll use a real test file if available, otherwise skip + if [[ -f "core/tests/integration/test_data/test.docx" ]]; then + cp "core/tests/integration/test_data/test.docx" "$TMP_DIR/test.docx" + log_info "Copied test.docx" + else + log_info "No test.docx available - will skip DOCX test" + fi + + # PPTX + if [[ -f "core/tests/integration/test_data/test.pptx" ]]; then + cp "core/tests/integration/test_data/test.pptx" "$TMP_DIR/test.pptx" + log_info "Copied test.pptx" + else + log_info "No test.pptx available - will skip PPTX test" + fi + + # PDF + if [[ -f "core/tests/integration/test_data/test.pdf" ]]; then + cp "core/tests/integration/test_data/test.pdf" "$TMP_DIR/test.pdf" + log_info "Copied test.pdf" + else + log_info "No test.pdf available - will skip PDF test" + fi +} + +# ============================================================================ +# Ingestion Helper +# ============================================================================ + +ingest_file() { + local token="$1" + local file_path="$2" + local folder_path="$3" + local metadata="$4" + + local response + if [[ -n "$token" ]]; then + response=$(curl -sS -X POST "$BASE_URL/v2/documents" \ + -H "Authorization: Bearer $token" \ + -F "file=@${file_path}" \ + -F "folder_path=${folder_path}" \ + -F "metadata=${metadata}") + else + response=$(curl -sS -X POST "$BASE_URL/v2/documents" \ + -F "file=@${file_path}" \ + -F "folder_path=${folder_path}" \ + -F "metadata=${metadata}") + fi + + local doc_id + doc_id=$(echo "$response" | python3 -c "import sys,json; print(json.load(sys.stdin).get('document_id',''))" 2>/dev/null || echo "") + + if [[ -z "$doc_id" ]]; then + echo "ERROR:$response" + else + echo "$doc_id" + fi +} + +ingest_content() { + local token="$1" + local content="$2" + local filename="$3" + local folder_path="$4" + local metadata="$5" + + local response + if [[ -n "$token" ]]; then + response=$(curl -sS -X POST "$BASE_URL/v2/documents" \ + -H "Authorization: Bearer $token" \ + -F "content=${content}" \ + -F "filename=${filename}" \ + -F "folder_path=${folder_path}" \ + -F "metadata=${metadata}") + else + response=$(curl -sS -X POST "$BASE_URL/v2/documents" \ + -F "content=${content}" \ + -F "filename=${filename}" \ + -F "folder_path=${folder_path}" \ + -F "metadata=${metadata}") + fi + + local doc_id + doc_id=$(echo "$response" | python3 -c "import sys,json; print(json.load(sys.stdin).get('document_id',''))" 2>/dev/null || echo "") + + if [[ -z "$doc_id" ]]; then + echo "ERROR:$response" + else + echo "$doc_id" + fi +} + +# ============================================================================ +# Status Helpers +# ============================================================================ + +wait_for_completion() { + local token="$1" + local doc_id="$2" + local elapsed=0 + local status="" + + while [[ "$elapsed" -lt "$WAIT_TIMEOUT" ]]; do + local response + if [[ -n "$token" ]]; then + response=$(curl -sS -X GET "$BASE_URL/documents/${doc_id}/status" \ + -H "Authorization: Bearer $token" || true) + else + response=$(curl -sS -X GET "$BASE_URL/documents/${doc_id}/status" || true) + fi + + local status + status=$(echo "$response" | python3 -c "import sys,json; print(json.load(sys.stdin).get('status',''))" 2>/dev/null || echo "") + + if [[ "$status" == "completed" ]]; then + return 0 + fi + if [[ "$status" == 
"failed" ]]; then + local err + err=$(echo "$response" | python3 -c "import sys,json; print(json.load(sys.stdin).get('error',''))" 2>/dev/null || echo "") + log_error "Ingestion failed for $doc_id: ${err:-$response}" + return 1 + fi + + sleep "$POLL_INTERVAL" + elapsed=$((elapsed + POLL_INTERVAL)) + done + + log_error "Timed out waiting for document $doc_id to complete (last status: $status)" + return 1 +} + +# ============================================================================ +# Retrieval Helper +# ============================================================================ + +retrieve_chunks() { + local token="$1" + local query="$2" + local doc_id="$3" + local top_k="${4:-5}" + + local payload + payload=$(python3 - </dev/null || echo "0" +} + +# ============================================================================ +# Format Tests +# ============================================================================ + +test_txt_format() { + log_section "Testing TXT Format" + + local doc_id + doc_id=$(ingest_content "$APP_A_TOKEN" \ + "V2 TXT test content. Keywords: txt_unique_keyword_xyz" \ + "v2_test.txt" \ + "/v2/txt" \ + '{"format":"txt","test":"v2_sanity"}') + + if [[ "$doc_id" == ERROR:* ]]; then + log_error "TXT ingest failed: ${doc_id#ERROR:}" + return 1 + fi + log_success "TXT ingest ok (doc_id=$doc_id)" + if ! wait_for_completion "$APP_A_TOKEN" "$doc_id"; then + return 1 + fi + + local response chunk_count + response=$(retrieve_chunks "$APP_A_TOKEN" "txt_unique_keyword_xyz" "$doc_id") + chunk_count=$(count_chunks "$response") + + if [[ "$chunk_count" -ge 1 ]]; then + log_success "TXT retrieve ok (chunks=$chunk_count)" + else + log_error "TXT retrieve failed: $response" + return 1 + fi + + echo "$doc_id" +} + +test_md_format() { + log_section "Testing Markdown Format" + + local md_content + md_content=$(cat "$TMP_DIR/test.md") + + local doc_id + doc_id=$(ingest_content "$APP_A_TOKEN" \ + "$md_content" \ + "v2_test.md" \ + "/v2/md" \ + '{"format":"md","test":"v2_sanity"}') + + if [[ "$doc_id" == ERROR:* ]]; then + log_error "MD ingest failed: ${doc_id#ERROR:}" + return 1 + fi + log_success "MD ingest ok (doc_id=$doc_id)" + if ! wait_for_completion "$APP_A_TOKEN" "$doc_id"; then + return 1 + fi + + local response chunk_count + response=$(retrieve_chunks "$APP_A_TOKEN" "v2_md_test delta" "$doc_id") + chunk_count=$(count_chunks "$response") + + if [[ "$chunk_count" -ge 1 ]]; then + log_success "MD retrieve ok (chunks=$chunk_count)" + else + log_error "MD retrieve failed: $response" + return 1 + fi + + echo "$doc_id" +} + +test_pdf_format() { + log_section "Testing PDF Format" + + if [[ ! -f "$TMP_DIR/test.pdf" ]]; then + log_info "Skipping PDF test - no test file available" + return 0 + fi + + local doc_id + doc_id=$(ingest_file "$APP_A_TOKEN" "$TMP_DIR/test.pdf" "/v2/pdf" '{"format":"pdf","test":"v2_sanity"}') + + if [[ "$doc_id" == ERROR:* ]]; then + log_error "PDF ingest failed: ${doc_id#ERROR:}" + return 1 + fi + log_success "PDF ingest ok (doc_id=$doc_id)" + if ! 
wait_for_completion "$APP_A_TOKEN" "$doc_id"; then + return 1 + fi + + local response chunk_count + response=$(retrieve_chunks "$APP_A_TOKEN" "test document" "$doc_id") + chunk_count=$(count_chunks "$response") + + if [[ "$chunk_count" -ge 1 ]]; then + log_success "PDF retrieve ok (chunks=$chunk_count)" + # Verify XML structure with loc attributes + local has_loc + has_loc=$(echo "$response" | python3 -c " +import sys,json +data = json.load(sys.stdin) +chunks = data.get('chunks', []) +for c in chunks: + if 'loc=' in c.get('content',''): + print('yes') + break +else: + print('no') +" 2>/dev/null || echo "no") + if [[ "$has_loc" == "yes" ]]; then + log_success "PDF chunks contain bbox (loc=) attributes" + else + log_info "PDF chunks may not have bbox - check content" + fi + else + log_error "PDF retrieve failed: $response" + return 1 + fi + + echo "$doc_id" +} + +test_docx_format() { + log_section "Testing DOCX Format" + + if [[ ! -f "$TMP_DIR/test.docx" ]]; then + log_info "Skipping DOCX test - no test file available" + return 0 + fi + + local doc_id + doc_id=$(ingest_file "$APP_A_TOKEN" "$TMP_DIR/test.docx" "/v2/docx" '{"format":"docx","test":"v2_sanity"}') + + if [[ "$doc_id" == ERROR:* ]]; then + log_error "DOCX ingest failed: ${doc_id#ERROR:}" + return 1 + fi + log_success "DOCX ingest ok (doc_id=$doc_id)" + if ! wait_for_completion "$APP_A_TOKEN" "$doc_id"; then + return 1 + fi + + local response chunk_count + response=$(retrieve_chunks "$APP_A_TOKEN" "document" "$doc_id") + chunk_count=$(count_chunks "$response") + + if [[ "$chunk_count" -ge 1 ]]; then + log_success "DOCX retrieve ok (chunks=$chunk_count)" + else + log_error "DOCX retrieve failed: $response" + return 1 + fi + + echo "$doc_id" +} + +test_pptx_format() { + log_section "Testing PPTX Format" + + if [[ ! -f "$TMP_DIR/test.pptx" ]]; then + log_info "Skipping PPTX test - no test file available" + return 0 + fi + + local doc_id + doc_id=$(ingest_file "$APP_A_TOKEN" "$TMP_DIR/test.pptx" "/v2/pptx" '{"format":"pptx","test":"v2_sanity"}') + + if [[ "$doc_id" == ERROR:* ]]; then + log_error "PPTX ingest failed: ${doc_id#ERROR:}" + return 1 + fi + log_success "PPTX ingest ok (doc_id=$doc_id)" + if ! wait_for_completion "$APP_A_TOKEN" "$doc_id"; then + return 1 + fi + + local response chunk_count + response=$(retrieve_chunks "$APP_A_TOKEN" "presentation slide" "$doc_id") + chunk_count=$(count_chunks "$response") + + if [[ "$chunk_count" -ge 1 ]]; then + log_success "PPTX retrieve ok (chunks=$chunk_count)" + else + log_error "PPTX retrieve failed: $response" + return 1 + fi + + echo "$doc_id" +} + +# ============================================================================ +# Tenant Isolation Test +# ============================================================================ + +test_tenant_isolation() { + log_section "Testing Tenant Isolation (Cross-App Security)" + + if [[ "$BYPASS_MODE" == "true" ]]; then + log_info "Skipping tenant isolation test - bypass mode enabled" + return 0 + fi + + # Ingest document as App A + local app_a_doc_id + app_a_doc_id=$(ingest_content "$APP_A_TOKEN" \ + "Secret document for App A only. Keywords: app_a_secret_data_xyz" \ + "app_a_secret.txt" \ + "/v2/app_a" \ + '{"owner":"app_a","secret":"true"}') + + if [[ "$app_a_doc_id" == ERROR:* ]]; then + log_error "App A ingest failed: ${app_a_doc_id#ERROR:}" + return 1 + fi + log_success "App A document ingested (doc_id=$app_a_doc_id)" + if ! 
wait_for_completion "$APP_A_TOKEN" "$app_a_doc_id"; then + return 1 + fi + + # Ingest document as App B + local app_b_doc_id + app_b_doc_id=$(ingest_content "$APP_B_TOKEN" \ + "Secret document for App B only. Keywords: app_b_secret_data_xyz" \ + "app_b_secret.txt" \ + "/v2/app_b" \ + '{"owner":"app_b","secret":"true"}') + + if [[ "$app_b_doc_id" == ERROR:* ]]; then + log_error "App B ingest failed: ${app_b_doc_id#ERROR:}" + return 1 + fi + log_success "App B document ingested (doc_id=$app_b_doc_id)" + if ! wait_for_completion "$APP_B_TOKEN" "$app_b_doc_id"; then + return 1 + fi + + # Test 1: App A should find its own document + local response_a_own chunk_count_a_own + response_a_own=$(retrieve_chunks "$APP_A_TOKEN" "app_a_secret_data_xyz" "$app_a_doc_id") + chunk_count_a_own=$(count_chunks "$response_a_own") + + if [[ "$chunk_count_a_own" -ge 1 ]]; then + log_success "App A can retrieve its own document" + else + log_error "App A cannot retrieve its own document!" + return 1 + fi + + # Test 2: App B should find its own document + local response_b_own chunk_count_b_own + response_b_own=$(retrieve_chunks "$APP_B_TOKEN" "app_b_secret_data_xyz" "$app_b_doc_id") + chunk_count_b_own=$(count_chunks "$response_b_own") + + if [[ "$chunk_count_b_own" -ge 1 ]]; then + log_success "App B can retrieve its own document" + else + log_error "App B cannot retrieve its own document!" + return 1 + fi + + # Test 3: App A should NOT find App B's document (even with doc_id) + local response_a_cross chunk_count_a_cross + response_a_cross=$(retrieve_chunks "$APP_A_TOKEN" "app_b_secret_data_xyz" "$app_b_doc_id") + chunk_count_a_cross=$(count_chunks "$response_a_cross") + + if [[ "$chunk_count_a_cross" -eq 0 ]]; then + log_success "App A cannot access App B's document (isolation verified)" + else + log_error "SECURITY VIOLATION: App A retrieved App B's document!" + log_error "Response: $response_a_cross" + return 1 + fi + + # Test 4: App B should NOT find App A's document + local response_b_cross chunk_count_b_cross + response_b_cross=$(retrieve_chunks "$APP_B_TOKEN" "app_a_secret_data_xyz" "$app_a_doc_id") + chunk_count_b_cross=$(count_chunks "$response_b_cross") + + if [[ "$chunk_count_b_cross" -eq 0 ]]; then + log_success "App B cannot access App A's document (isolation verified)" + else + log_error "SECURITY VIOLATION: App B retrieved App A's document!" + log_error "Response: $response_b_cross" + return 1 + fi + + # Test 5: Open query should only return own app's results + local response_a_open + response_a_open=$(curl -sS -X POST "$BASE_URL/v2/retrieve/chunks" \ + -H "Authorization: Bearer $APP_A_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"query":"secret_data_xyz","top_k":10}') + + local has_b_content + has_b_content=$(echo "$response_a_open" | grep -c "app_b_secret" 2>/dev/null || true) + has_b_content="${has_b_content:-0}" + # Ensure we have a clean integer + has_b_content=$(echo "$has_b_content" | tr -d '[:space:]' | head -c 10) + if [[ -z "$has_b_content" || "$has_b_content" == "0" ]]; then + log_success "Open query from App A does not leak App B's data" + else + log_error "SECURITY VIOLATION: Open query leaked App B's data!" 
+        return 1
+    fi
+
+    log_success "All tenant isolation tests passed"
+}
+
+# ============================================================================
+# Main
+# ============================================================================
+
+main() {
+    log_section "V2 API Sanity Tests"
+    log_info "Server: $BASE_URL"
+    log_info "Test Run: $TEST_RUN_ID"
+
+    # Check server
+    if ! curl -sf "$BASE_URL/health" > /dev/null 2>&1; then
+        log_error "Server not responding at $BASE_URL"
+        exit 1
+    fi
+    log_success "Server is running"
+
+    # Setup
+    setup_auth
+    create_test_files
+
+    # Format tests
+    test_txt_format || true
+    test_md_format || true
+    test_pdf_format || true
+    test_docx_format || true
+    test_pptx_format || true
+
+    # Security tests
+    test_tenant_isolation || true
+
+    # Cleanup
+    cleanup_apps
+
+    # Summary
+    log_section "Test Summary"
+    echo -e "${GREEN}Passed: $TESTS_PASSED${NC}"
+    echo -e "${RED}Failed: $TESTS_FAILED${NC}"
+
+    if [[ "$TESTS_FAILED" -gt 0 ]]; then
+        exit 1
+    fi
+
+    log_success "All V2 API sanity tests completed successfully"
+}
+
+main "$@"
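Taken together, the path this script exercises is: multipart ingest via `POST /v2/documents`, polling `GET /documents/{id}/status`, then retrieval via `POST /v2/retrieve/chunks`. A rough Python equivalent is sketched below; the endpoint paths and form fields come from the curl calls above, while the exact retrieve payload (notably the `document_ids` filter) and the bearer-token handling are assumptions, not a documented client API.

```python
# Rough Python equivalent of the ingest -> poll -> retrieve path tested above.
# Assumes the requests library; token handling and the retrieve payload shape
# (document_ids / top_k) are illustrative, not a documented client API.
import time

import requests

BASE_URL = "http://localhost:8000"
HEADERS = {"Authorization": "Bearer <token>"}  # drop when bypass_auth_mode is enabled


def ingest_and_retrieve(content: str, query: str) -> list:
    # Multipart form upload, mirroring `curl -F content=... -F filename=...`
    resp = requests.post(
        f"{BASE_URL}/v2/documents",
        headers=HEADERS,
        files={
            "content": (None, content),
            "filename": (None, "example.txt"),
            "folder_path": (None, "/v2/example"),
            "metadata": (None, '{"test": "v2_sanity"}'),
        },
        timeout=30,
    )
    resp.raise_for_status()
    doc_id = resp.json()["document_id"]

    # Poll until the worker marks the document completed (or failed).
    for _ in range(60):
        status = requests.get(
            f"{BASE_URL}/documents/{doc_id}/status", headers=HEADERS, timeout=30
        ).json()
        if status.get("status") == "completed":
            break
        if status.get("status") == "failed":
            raise RuntimeError(status.get("error") or "v2 ingestion failed")
        time.sleep(3)

    chunks = requests.post(
        f"{BASE_URL}/v2/retrieve/chunks",
        headers=HEADERS,
        json={"query": query, "document_ids": [doc_id], "top_k": 5},
        timeout=30,
    )
    chunks.raise_for_status()
    return chunks.json().get("chunks", [])
```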