Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class Settings(BaseSettings):
# Vector store configuration
VECTOR_STORE_PROVIDER: Literal["pgvector"]
VECTOR_STORE_DATABASE_NAME: Optional[str] = None
VECTOR_IVFFLAT_PROBES: int = 100

# Multivector store configuration
MULTIVECTOR_STORE_PROVIDER: Literal["postgres", "morphik"] = "postgres"
Expand Down Expand Up @@ -361,6 +362,12 @@ def get_settings() -> Settings:
if "POSTGRES_URI" not in os.environ:
raise ValueError(em.format(missing_value="POSTGRES_URI", field="vector_store.provider", value="pgvector"))

ivfflat_probes = config["vector_store"].get("ivfflat_probes", 100)
try:
settings_dict["VECTOR_IVFFLAT_PROBES"] = max(1, int(ivfflat_probes))
except (TypeError, ValueError):
settings_dict["VECTOR_IVFFLAT_PROBES"] = 100

# Load morphik config
api_domain = config["morphik"].get("api_domain", "api.morphik.ai")
# morphik_embedding_api_domain is always a list of endpoints
Expand Down
154 changes: 144 additions & 10 deletions core/parser/docling_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,80 @@
from html import escape as html_escape
from typing import Dict, List, Optional, Tuple

from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import EasyOcrOptions, PdfPipelineOptions, TableStructureOptions
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling_core.types.doc.document import ContentLayer
from docling_core.types.doc.labels import DocItemLabel
from httpx import AsyncClient, Timeout

logger = logging.getLogger(__name__)

# Lazy imports for docling (heavy dependencies, only needed for local mode)
DocumentConverter = None
InputFormat = None
PdfFormatOption = None
PdfPipelineOptions = None
EasyOcrOptions = None
TableStructureOptions = None
ContentLayer = None
DocItemLabel = None


def _ensure_docling_imports():
    """Lazily import docling dependencies only when needed for local parsing.

    Binds the heavy docling classes to the module-level placeholders on the
    first call; later calls are no-ops once ``DocumentConverter`` is set.
    """
    global DocumentConverter, InputFormat, PdfFormatOption, PdfPipelineOptions
    global EasyOcrOptions, TableStructureOptions, ContentLayer, DocItemLabel

    if DocumentConverter is not None:
        # Already imported on an earlier call; nothing left to do.
        return

    from docling import document_converter as _converter_mod
    from docling.datamodel import base_models as _base_mod
    from docling.datamodel import pipeline_options as _options_mod
    from docling_core.types.doc import document as _doc_mod
    from docling_core.types.doc import labels as _labels_mod

    DocumentConverter = _converter_mod.DocumentConverter
    InputFormat = _base_mod.InputFormat
    PdfFormatOption = _converter_mod.PdfFormatOption
    PdfPipelineOptions = _options_mod.PdfPipelineOptions
    EasyOcrOptions = _options_mod.EasyOcrOptions
    TableStructureOptions = _options_mod.TableStructureOptions
    ContentLayer = _doc_mod.ContentLayer
    DocItemLabel = _labels_mod.DocItemLabel

class DoclingV2Parser:
"""Docling parser that returns page-wise XML chunks with bbox metadata."""

_docling_converter: Optional[DocumentConverter] = None
class DoclingV2Parser:
"""Docling parser that returns page-wise XML chunks with bbox metadata.

Supports both local parsing (with GPU) and remote API parsing.
When PARSER_MODE is 'api', documents are sent to GPU servers for processing.
"""

_docling_converter = None

def __init__(self, settings=None):
    """Initialize the parser with optional settings for API mode.

    Args:
        settings: Settings object with PARSER_MODE, MORPHIK_EMBEDDING_API_DOMAIN,
            and MORPHIK_EMBEDDING_API_KEY for API mode configuration. When None,
            or when PARSER_MODE is not "api", documents are parsed locally.
    """
    self.settings = settings
    # Populated only when API mode is fully configured; None means local parsing.
    self._parse_api_endpoints: Optional[List[str]] = None
    self._parse_api_key: Optional[str] = None

    if settings and getattr(settings, "PARSER_MODE", "local") == "api":
        api_domain = getattr(settings, "MORPHIK_EMBEDDING_API_DOMAIN", None)
        if api_domain:
            # Accept a single domain string or a list of domains.
            if isinstance(api_domain, str):
                api_domain = [api_domain]
            self._parse_api_endpoints = [f"{d.rstrip('/')}/parse/v2" for d in api_domain]
            self._parse_api_key = getattr(settings, "MORPHIK_EMBEDDING_API_KEY", None)
            logger.info(f"DoclingV2Parser API mode enabled with {len(self._parse_api_endpoints)} endpoint(s)")
        else:
            # PARSER_MODE requested API parsing but no endpoints were configured;
            # previously this fell back to local parsing silently, hiding misconfiguration.
            logger.warning(
                "PARSER_MODE is 'api' but MORPHIK_EMBEDDING_API_DOMAIN is not set; using local parsing"
            )

@classmethod
def _get_converter(cls) -> DocumentConverter:
def _get_converter(cls):
"""Get or create the Docling converter for local parsing."""
if cls._docling_converter is None:
_ensure_docling_imports()
pipeline_options = PdfPipelineOptions()
pipeline_options.do_ocr = True
try:
Expand All @@ -46,6 +103,7 @@ def _get_converter(cls) -> DocumentConverter:
@classmethod
def convert_bytes(cls, file_bytes: bytes, filename: str):
"""Convert a file (bytes) to a Docling document."""
_ensure_docling_imports()
suffix = os.path.splitext(filename)[1] or ".pdf"
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
temp_file.write(file_bytes)
Expand Down Expand Up @@ -92,7 +150,8 @@ def build_page_xml_chunks(
filename: str,
) -> List[Tuple[str, int]]:
"""Build one XML chunk per page with bbox metadata."""
label_to_tag: Dict[DocItemLabel, str] = {}
_ensure_docling_imports()
label_to_tag: Dict = {}

def _add(label_name: str, tag: str) -> None:
label = getattr(DocItemLabel, label_name, None)
Expand Down Expand Up @@ -216,3 +275,78 @@ def _add(label_name: str, tag: str) -> None:
xml_chunks.append((xml, page_no))

return xml_chunks

async def _parse_via_api(
    self,
    file_bytes: bytes,
    filename: str,
    document_id: str,
    display_filename: Optional[str],
) -> List[Tuple[str, int]]:
    """Parse document via remote API (GPU server).

    Tries each configured endpoint in order and returns the result from the
    first one that succeeds; raises when every endpoint fails.
    """
    if not (self._parse_api_endpoints and self._parse_api_key):
        raise RuntimeError("Parser V2 API not configured")

    auth_headers = {"Authorization": f"Bearer {self._parse_api_key}"}
    request_timeout = Timeout(read=300.0, connect=30.0, write=60.0, pool=30.0)

    # The multipart payload is identical for every endpoint, so build it once.
    upload = {"file": (filename, file_bytes)}
    form = {"filename": filename, "document_id": document_id}
    if display_filename and display_filename != filename:
        form["display_filename"] = display_filename

    failure: Optional[Exception] = None
    for url in self._parse_api_endpoints:
        try:
            async with AsyncClient(timeout=request_timeout) as client:
                response = await client.post(url, files=upload, data=form, headers=auth_headers)
                response.raise_for_status()
                payload = response.json()
                # Convert API response to list of (xml, page_number) tuples.
                return [(chunk["xml"], chunk["page_number"]) for chunk in payload.get("chunks", [])]
        except Exception as exc:
            logger.warning(f"Parse V2 API call to {url} failed: {exc}")
            failure = exc

    raise RuntimeError(f"All parse V2 API endpoints failed. Last error: {failure}")

def _parse_local(
    self,
    file_bytes: bytes,
    filename: str,
    document_id: str,
    display_filename: Optional[str],
) -> List[Tuple[str, int]]:
    """Parse document using local Docling."""
    docling_doc = self.convert_bytes(file_bytes, filename)
    effective_name = display_filename or filename
    return self.build_page_xml_chunks(docling_doc, document_id, effective_name)

async def parse(
    self,
    file_bytes: bytes,
    filename: str,
    document_id: str,
    display_filename: Optional[str] = None,
) -> List[Tuple[str, int]]:
    """Parse document and return page-wise XML chunks with bbox metadata.

    Uses the remote parse API when endpoints are configured, and falls back
    to local Docling parsing when every API endpoint fails.

    Args:
        file_bytes: Raw file content
        filename: Original filename
        document_id: Document identifier for the XML
        display_filename: Optional user-facing filename; defaults to ``filename``

    Returns:
        List of (xml_string, page_number) tuples
    """
    if self._parse_api_endpoints:
        try:
            return await self._parse_via_api(file_bytes, filename, document_id, display_filename)
        except Exception as e:
            # Any API failure (network, auth, bad payload) triggers local fallback.
            logger.warning(f"API parsing failed, falling back to local: {e}")
            return self._parse_local(file_bytes, filename, document_id, display_filename)
    else:
        return self._parse_local(file_bytes, filename, document_id, display_filename)
29 changes: 19 additions & 10 deletions core/services/v2_document_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def __init__(
self.parser = parser
self.embedding_model = embedding_model
self.chunk_store = chunk_store
self.docling_parser = DoclingV2Parser()
self.docling_parser = DoclingV2Parser(settings)

# ------------------------------------------------------------------
# Helpers
Expand Down Expand Up @@ -365,25 +365,34 @@ async def _build_xml_chunks(
parse_bytes = self._convert_office_to_pdf_bytes(file_bytes, ".docx", "Word document")
parse_filename = f"{Path(filename).stem}.pdf"

docling_doc = None
if ext == ".pptx":
try:
docling_doc = self.docling_parser.convert_bytes(parse_bytes, parse_filename)
xml_chunks = await self.docling_parser.parse(
parse_bytes,
parse_filename,
document_id,
display_filename=filename,
)
except Exception: # noqa: BLE001
parse_bytes = self._convert_office_to_pdf_bytes(
file_bytes,
".pptx",
"PowerPoint presentation",
)
parse_filename = f"{Path(filename).stem}.pdf"
docling_doc = self.docling_parser.convert_bytes(parse_bytes, parse_filename)
xml_chunks = await self.docling_parser.parse(
parse_bytes,
parse_filename,
document_id,
display_filename=filename,
)
else:
docling_doc = self.docling_parser.convert_bytes(parse_bytes, parse_filename)
xml_chunks = self.docling_parser.build_page_xml_chunks(
docling_doc,
document_id,
filename,
)
xml_chunks = await self.docling_parser.parse(
parse_bytes,
parse_filename,
document_id,
display_filename=filename,
)

if not xml_chunks:
raise ValueError("No page chunks extracted from document")
Expand Down
2 changes: 2 additions & 0 deletions core/vector_store/chunk_v2_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def __init__(
metadata_column="doc_metadata",
metadata_types_column="metadata_types",
)
self.ivfflat_probes = max(1, int(getattr(settings, "VECTOR_IVFFLAT_PROBES", 100) or 100))

@asynccontextmanager
async def get_session_with_retry(self) -> AsyncContextManager[AsyncSession]:
Expand Down Expand Up @@ -398,6 +399,7 @@ async def query_similar(
)

async with self.get_session_with_retry() as session:
await session.execute(text("SET LOCAL ivfflat.probes = :probes"), {"probes": self.ivfflat_probes})
result = await session.execute(query)
rows = result.all()

Expand Down
2 changes: 2 additions & 0 deletions core/vector_store/pgvector_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def __init__(
pool_recycle = getattr(settings, "DB_POOL_RECYCLE", 3600)
pool_timeout = getattr(settings, "DB_POOL_TIMEOUT", 10)
pool_pre_ping = getattr(settings, "DB_POOL_PRE_PING", True)
self.ivfflat_probes = max(1, int(getattr(settings, "VECTOR_IVFFLAT_PROBES", 100) or 100))

# Strip parameters that asyncpg doesn't accept as keyword arguments
# These will raise "unexpected keyword argument" errors
Expand Down Expand Up @@ -457,6 +458,7 @@ async def query_similar(
"""
try:
async with self.get_session_with_retry() as session:
await session.execute(text("SET LOCAL ivfflat.probes = :probes"), {"probes": self.ivfflat_probes})
# Build query with cosine distance calculation, which is normalized to [0, 2].
# A distance of 0 is perfect similarity.
distance = VectorEmbedding.embedding.op("<=>")(query_embedding)
Expand Down
Loading