diff --git a/core/config.py b/core/config.py index ffb4c549..6c4978dc 100644 --- a/core/config.py +++ b/core/config.py @@ -112,6 +112,7 @@ class Settings(BaseSettings): # Vector store configuration VECTOR_STORE_PROVIDER: Literal["pgvector"] VECTOR_STORE_DATABASE_NAME: Optional[str] = None + VECTOR_IVFFLAT_PROBES: int = 100 # Multivector store configuration MULTIVECTOR_STORE_PROVIDER: Literal["postgres", "morphik"] = "postgres" @@ -361,6 +362,12 @@ def get_settings() -> Settings: if "POSTGRES_URI" not in os.environ: raise ValueError(em.format(missing_value="POSTGRES_URI", field="vector_store.provider", value="pgvector")) + ivfflat_probes = config["vector_store"].get("ivfflat_probes", 100) + try: + settings_dict["VECTOR_IVFFLAT_PROBES"] = max(1, int(ivfflat_probes)) + except (TypeError, ValueError): + settings_dict["VECTOR_IVFFLAT_PROBES"] = 100 + # Load morphik config api_domain = config["morphik"].get("api_domain", "api.morphik.ai") # morphik_embedding_api_domain is always a list of endpoints diff --git a/core/parser/docling_v2.py b/core/parser/docling_v2.py index 851cf891..2f7c459b 100644 --- a/core/parser/docling_v2.py +++ b/core/parser/docling_v2.py @@ -4,23 +4,80 @@ from html import escape as html_escape from typing import Dict, List, Optional, Tuple -from docling.datamodel.base_models import InputFormat -from docling.datamodel.pipeline_options import EasyOcrOptions, PdfPipelineOptions, TableStructureOptions -from docling.document_converter import DocumentConverter, PdfFormatOption -from docling_core.types.doc.document import ContentLayer -from docling_core.types.doc.labels import DocItemLabel +from httpx import AsyncClient, Timeout logger = logging.getLogger(__name__) +# Lazy imports for docling (heavy dependencies, only needed for local mode) +DocumentConverter = None +InputFormat = None +PdfFormatOption = None +PdfPipelineOptions = None +EasyOcrOptions = None +TableStructureOptions = None +ContentLayer = None +DocItemLabel = None + + +def 
_ensure_docling_imports(): + """Lazily import docling dependencies only when needed for local parsing.""" + global DocumentConverter, InputFormat, PdfFormatOption, PdfPipelineOptions + global EasyOcrOptions, TableStructureOptions, ContentLayer, DocItemLabel + + if DocumentConverter is None: + from docling.datamodel.base_models import InputFormat as _InputFormat + from docling.datamodel.pipeline_options import EasyOcrOptions as _EasyOcrOptions + from docling.datamodel.pipeline_options import PdfPipelineOptions as _PdfPipelineOptions + from docling.datamodel.pipeline_options import TableStructureOptions as _TableStructureOptions + from docling.document_converter import DocumentConverter as _DocumentConverter + from docling.document_converter import PdfFormatOption as _PdfFormatOption + from docling_core.types.doc.document import ContentLayer as _ContentLayer + from docling_core.types.doc.labels import DocItemLabel as _DocItemLabel + + DocumentConverter = _DocumentConverter + InputFormat = _InputFormat + PdfFormatOption = _PdfFormatOption + PdfPipelineOptions = _PdfPipelineOptions + EasyOcrOptions = _EasyOcrOptions + TableStructureOptions = _TableStructureOptions + ContentLayer = _ContentLayer + DocItemLabel = _DocItemLabel -class DoclingV2Parser: - """Docling parser that returns page-wise XML chunks with bbox metadata.""" - _docling_converter: Optional[DocumentConverter] = None +class DoclingV2Parser: + """Docling parser that returns page-wise XML chunks with bbox metadata. + + Supports both local parsing (with GPU) and remote API parsing. + When PARSER_MODE is 'api', documents are sent to GPU servers for processing. + """ + + _docling_converter = None + + def __init__(self, settings=None): + """Initialize the parser with optional settings for API mode. + + Args: + settings: Settings object with PARSER_MODE, MORPHIK_EMBEDDING_API_DOMAIN, + and MORPHIK_EMBEDDING_API_KEY for API mode configuration. 
+ """ + self.settings = settings + self._parse_api_endpoints: Optional[List[str]] = None + self._parse_api_key: Optional[str] = None + + if settings and getattr(settings, "PARSER_MODE", "local") == "api": + api_domain = getattr(settings, "MORPHIK_EMBEDDING_API_DOMAIN", None) + if api_domain: + if isinstance(api_domain, str): + api_domain = [api_domain] + self._parse_api_endpoints = [f"{d.rstrip('/')}/parse/v2" for d in api_domain] + self._parse_api_key = getattr(settings, "MORPHIK_EMBEDDING_API_KEY", None) + logger.info(f"DoclingV2Parser API mode enabled with {len(self._parse_api_endpoints)} endpoint(s)") @classmethod - def _get_converter(cls) -> DocumentConverter: + def _get_converter(cls): + """Get or create the Docling converter for local parsing.""" if cls._docling_converter is None: + _ensure_docling_imports() pipeline_options = PdfPipelineOptions() pipeline_options.do_ocr = True try: @@ -46,6 +103,7 @@ def _get_converter(cls) -> DocumentConverter: @classmethod def convert_bytes(cls, file_bytes: bytes, filename: str): """Convert a file (bytes) to a Docling document.""" + _ensure_docling_imports() suffix = os.path.splitext(filename)[1] or ".pdf" with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file: temp_file.write(file_bytes) @@ -92,7 +150,8 @@ def build_page_xml_chunks( filename: str, ) -> List[Tuple[str, int]]: """Build one XML chunk per page with bbox metadata.""" - label_to_tag: Dict[DocItemLabel, str] = {} + _ensure_docling_imports() + label_to_tag: Dict = {} def _add(label_name: str, tag: str) -> None: label = getattr(DocItemLabel, label_name, None) @@ -216,3 +275,78 @@ def _add(label_name: str, tag: str) -> None: xml_chunks.append((xml, page_no)) return xml_chunks + + async def _parse_via_api( + self, + file_bytes: bytes, + filename: str, + document_id: str, + display_filename: Optional[str], + ) -> List[Tuple[str, int]]: + """Parse document via remote API (GPU server).""" + if not self._parse_api_endpoints or not 
self._parse_api_key: + raise RuntimeError("Parser V2 API not configured") + + headers = {"Authorization": f"Bearer {self._parse_api_key}"} + timeout = Timeout(read=300.0, connect=30.0, write=60.0, pool=30.0) + + last_error: Optional[Exception] = None + for endpoint in self._parse_api_endpoints: + try: + async with AsyncClient(timeout=timeout) as client: + files = {"file": (filename, file_bytes)} + data = {"filename": filename, "document_id": document_id} + if display_filename and display_filename != filename: + data["display_filename"] = display_filename + resp = await client.post(endpoint, files=files, data=data, headers=headers) + resp.raise_for_status() + result = resp.json() + + # Convert API response to list of (xml, page_number) tuples + chunks = result.get("chunks", []) + return [(c["xml"], c["page_number"]) for c in chunks] + except Exception as e: + logger.warning(f"Parse V2 API call to {endpoint} failed: {e}") + last_error = e + continue + + raise RuntimeError(f"All parse V2 API endpoints failed. Last error: {last_error}") + + def _parse_local( + self, + file_bytes: bytes, + filename: str, + document_id: str, + display_filename: Optional[str], + ) -> List[Tuple[str, int]]: + """Parse document using local Docling.""" + doc = self.convert_bytes(file_bytes, filename) + return self.build_page_xml_chunks(doc, document_id, display_filename or filename) + + async def parse( + self, + file_bytes: bytes, + filename: str, + document_id: str, + display_filename: Optional[str] = None, + ) -> List[Tuple[str, int]]: + """Parse document and return page-wise XML chunks with bbox metadata. + + Uses API if configured, otherwise falls back to local parsing. 
+ + Args: + file_bytes: Raw file content + filename: Original filename + document_id: Document identifier for the XML + + Returns: + List of (xml_string, page_number) tuples + """ + if self._parse_api_endpoints: + try: + return await self._parse_via_api(file_bytes, filename, document_id, display_filename) + except Exception as e: + logger.warning(f"API parsing failed, falling back to local: {e}") + return self._parse_local(file_bytes, filename, document_id, display_filename) + else: + return self._parse_local(file_bytes, filename, document_id, display_filename) diff --git a/core/services/v2_document_service.py b/core/services/v2_document_service.py index 0ed96cae..a6f13deb 100644 --- a/core/services/v2_document_service.py +++ b/core/services/v2_document_service.py @@ -72,7 +72,7 @@ def __init__( self.parser = parser self.embedding_model = embedding_model self.chunk_store = chunk_store - self.docling_parser = DoclingV2Parser() + self.docling_parser = DoclingV2Parser(settings) # ------------------------------------------------------------------ # Helpers @@ -365,10 +365,14 @@ async def _build_xml_chunks( parse_bytes = self._convert_office_to_pdf_bytes(file_bytes, ".docx", "Word document") parse_filename = f"{Path(filename).stem}.pdf" - docling_doc = None if ext == ".pptx": try: - docling_doc = self.docling_parser.convert_bytes(parse_bytes, parse_filename) + xml_chunks = await self.docling_parser.parse( + parse_bytes, + parse_filename, + document_id, + display_filename=filename, + ) except Exception: # noqa: BLE001 parse_bytes = self._convert_office_to_pdf_bytes( file_bytes, @@ -376,14 +380,19 @@ async def _build_xml_chunks( "PowerPoint presentation", ) parse_filename = f"{Path(filename).stem}.pdf" - docling_doc = self.docling_parser.convert_bytes(parse_bytes, parse_filename) + xml_chunks = await self.docling_parser.parse( + parse_bytes, + parse_filename, + document_id, + display_filename=filename, + ) else: - docling_doc = 
self.docling_parser.convert_bytes(parse_bytes, parse_filename) - xml_chunks = self.docling_parser.build_page_xml_chunks( - docling_doc, - document_id, - filename, - ) + xml_chunks = await self.docling_parser.parse( + parse_bytes, + parse_filename, + document_id, + display_filename=filename, + ) if not xml_chunks: raise ValueError("No page chunks extracted from document") diff --git a/core/vector_store/chunk_v2_store.py b/core/vector_store/chunk_v2_store.py index f763de6c..2f2895fb 100644 --- a/core/vector_store/chunk_v2_store.py +++ b/core/vector_store/chunk_v2_store.py @@ -111,6 +111,7 @@ def __init__( metadata_column="doc_metadata", metadata_types_column="metadata_types", ) + self.ivfflat_probes = max(1, int(getattr(settings, "VECTOR_IVFFLAT_PROBES", 100) or 100)) @asynccontextmanager async def get_session_with_retry(self) -> AsyncContextManager[AsyncSession]: @@ -398,6 +399,7 @@ async def query_similar( ) async with self.get_session_with_retry() as session: + await session.execute(text("SELECT set_config('ivfflat.probes', :probes, true)"), {"probes": str(self.ivfflat_probes)}) result = await session.execute(query) rows = result.all() diff --git a/core/vector_store/pgvector_store.py b/core/vector_store/pgvector_store.py index 3795c67d..34a9e1b6 100644 --- a/core/vector_store/pgvector_store.py +++ b/core/vector_store/pgvector_store.py @@ -122,6 +122,7 @@ def __init__( pool_recycle = getattr(settings, "DB_POOL_RECYCLE", 3600) pool_timeout = getattr(settings, "DB_POOL_TIMEOUT", 10) pool_pre_ping = getattr(settings, "DB_POOL_PRE_PING", True) + self.ivfflat_probes = max(1, int(getattr(settings, "VECTOR_IVFFLAT_PROBES", 100) or 100)) # Strip parameters that asyncpg doesn't accept as keyword arguments # These will raise "unexpected keyword argument" errors @@ -457,6 +458,7 @@ async def query_similar( """ try: async with self.get_session_with_retry() as session: + await session.execute(text("SELECT set_config('ivfflat.probes', :probes, true)"), {"probes": str(self.ivfflat_probes)}) # Build query with cosine
distance calculation, which is normalized to [0, 2]. # A distance of 0 is perfect similarity. distance = VectorEmbedding.embedding.op("<=>")(query_embedding)