4 changes: 4 additions & 0 deletions core/api.py
@@ -52,6 +52,7 @@
from core.routes.logs import router as logs_router # noqa: E402 – import after FastAPI app
from core.routes.models import router as models_router
from core.routes.usage import router as usage_router
from core.routes.v2 import router as v2_router
from core.services.telemetry import TelemetryService
from core.services_init import document_service, ingestion_service
from core.utils.folder_utils import normalize_folder_selector
@@ -326,6 +327,9 @@ def _extract_provider(model_name: str) -> str:
# Register models router
app.include_router(models_router)

# Register v2 router
app.include_router(v2_router)

# Register logs router
app.include_router(logs_router)

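For orientation, here is a minimal sketch of the kind of router this registration expects; core/routes/v2.py is not part of this section of the diff, so the prefix, tag, and endpoint below are assumptions rather than the PR's actual code.

```python
# Hypothetical sketch only: the real core/routes/v2.py is not shown in this diff.
from fastapi import APIRouter

from core.models.v2 import V2RetrieveRequest, V2RetrieveResponse

router = APIRouter(prefix="/v2", tags=["v2"])


@router.post("/retrieve", response_model=V2RetrieveResponse)
async def retrieve(request: V2RetrieveRequest) -> V2RetrieveResponse:
    # Placeholder handler; the actual route would query the v2 chunk store.
    return V2RetrieveResponse(query=request.query, chunks=[])
```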
14 changes: 13 additions & 1 deletion core/app_factory.py
@@ -27,7 +27,7 @@ async def lifespan(app_instance: FastAPI):
# ------------------------------------------------------------------
# Import services directly from services_init instead of through api_module
# ------------------------------------------------------------------
from core.services_init import database, settings, vector_store
from core.services_init import database, settings, v2_chunk_store, vector_store

# --- BEGIN MOVED STARTUP LOGIC ---
logger.info("Lifespan: Initializing Database…")
@@ -57,6 +57,18 @@
exc_info=True,
)

logger.info("Lifespan: Initializing V2 Chunk Store…")
try:
if hasattr(v2_chunk_store, "initialize"):
await v2_chunk_store.initialize()
logger.info("Lifespan: V2 Chunk Store initialization successful (or not applicable).")
except Exception as exc: # noqa: BLE001
logger.error(
"Lifespan: CRITICAL - Failed to initialize V2 Chunk Store: %s",
exc,
exc_info=True,
)

# Initialize ColPali vector store if it exists
# Note: max_sim function creation happens in MultiVectorStore.initialize()
logger.info("Lifespan: Initializing ColPali Vector Store…")
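The lifespan hook above only awaits initialize() when the chunk store actually exposes it, so the store can be any object with an optional async initialize method. A minimal sketch of a store satisfying that contract; the class name, constructor, and table definition are assumptions, not code from this PR.

```python
# Hypothetical example of an object compatible with the hasattr/initialize guard above.
class PGChunkStore:
    def __init__(self, engine) -> None:
        self.engine = engine  # e.g. a SQLAlchemy AsyncEngine

    async def initialize(self) -> None:
        # Create the backing table on first startup; a no-op on later runs.
        async with self.engine.begin() as conn:
            await conn.exec_driver_sql(
                "CREATE TABLE IF NOT EXISTS v2_chunks ("
                "chunk_id TEXT PRIMARY KEY, document_id TEXT, "
                "content TEXT, chunk_metadata JSONB)"
            )
```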
81 changes: 54 additions & 27 deletions core/database/metadata_filters.py
@@ -29,9 +29,16 @@ class InvalidMetadataFilterError(ValueError):
class MetadataFilterBuilder:
"""Translate JSON-style metadata filters into SQL, covering arrays, regex, and substring operators."""

_COLUMN_FIELDS = {
"filename": "filename",
}
def __init__(
self,
*,
metadata_column: str = "doc_metadata",
metadata_types_column: Optional[str] = "metadata_types",
column_fields: Optional[Dict[str, str]] = None,
) -> None:
self.metadata_column = metadata_column
self.metadata_types_column = metadata_types_column
self._column_fields = column_fields or {"filename": "filename"}

def build(self, filters: Optional[Dict[str, Any]]) -> str:
"""Construct a SQL WHERE clause from a metadata filter dictionary."""
@@ -113,7 +120,7 @@ def _combine_clauses(self, clauses: List[str], operator: str, context: str) -> s

def _build_field_metadata_clause(self, field: str, value: Any) -> str:
"""Build SQL clause for a single metadata field."""
if field in self._COLUMN_FIELDS:
if field in self._column_fields:
return self._build_column_field_clause(field, value)

if isinstance(value, dict) and not any(key.startswith("$") for key in value):
@@ -192,7 +199,7 @@ def _build_single_value_clause(self, field: str, value: Any) -> str:

def _build_column_field_clause(self, field: str, value: Any) -> str:
"""Build SQL clause for a reserved column field (e.g., filename)."""
column = self._COLUMN_FIELDS[field]
column = self._column_fields[field]
builder = TextColumnFilterBuilder(column)

if isinstance(value, dict):
@@ -220,7 +227,7 @@ def _build_exists_clause(self, field: str, operand: Any) -> str:
raise InvalidMetadataFilterError(f"$exists operator for field '{field}' expects a boolean value.")

field_key = self._escape_single_quotes(field)
clause = f"(doc_metadata ? '{field_key}')"
clause = f"({self.metadata_column} ? '{field_key}')"
return clause if expected else f"(NOT {clause})"

def _build_comparison_clause(self, field: str, operator: str, operand: Any) -> str:
@@ -270,9 +277,9 @@ def _build_numeric_comparison_clause(self, field: str, sql_operator: str, operan

field_key = self._escape_single_quotes(field)
type_expr = self._metadata_type_expr(field_key)
# Use CASE to ensure casting only happens when type is correct
value_expr = (
f"(CASE WHEN {type_expr} = 'number' THEN (doc_metadata ->> '{field_key}')::double precision ELSE NULL END)"
f"(CASE WHEN {type_expr} = 'number' THEN ({self.metadata_column} ->> '{field_key}')::double precision "
"ELSE NULL END)"
)
return f"({value_expr} {sql_operator} {literal})"

@@ -285,8 +292,10 @@ def _build_decimal_comparison_clause(self, field: str, sql_operator: str, operan

field_key = self._escape_single_quotes(field)
type_expr = self._metadata_type_expr(field_key)
# Use CASE to ensure casting only happens when type is correct
value_expr = f"(CASE WHEN {type_expr} = 'decimal' THEN (doc_metadata ->> '{field_key}')::numeric ELSE NULL END)"
value_expr = (
f"(CASE WHEN {type_expr} = 'decimal' THEN ({self.metadata_column} ->> '{field_key}')::numeric "
"ELSE NULL END)"
)
return f"({value_expr} {sql_operator} {literal}::numeric)"

def _build_datetime_comparison_clause(self, field: str, sql_operator: str, operand: Any) -> str:
@@ -298,9 +307,9 @@ def _build_datetime_comparison_clause(self, field: str, sql_operator: str, opera

field_key = self._escape_single_quotes(field)
type_expr = self._metadata_type_expr(field_key)
# Use CASE to ensure casting only happens when type is correct
value_expr = (
f"(CASE WHEN {type_expr} = 'datetime' THEN (doc_metadata ->> '{field_key}')::timestamptz ELSE NULL END)"
f"(CASE WHEN {type_expr} = 'datetime' THEN ({self.metadata_column} ->> '{field_key}')::timestamptz "
"ELSE NULL END)"
)
return f"({value_expr} {sql_operator} {literal})"

@@ -313,16 +322,17 @@ def _build_date_comparison_clause(self, field: str, sql_operator: str, operand:

field_key = self._escape_single_quotes(field)
type_expr = self._metadata_type_expr(field_key)
# Use CASE to ensure casting only happens when type is correct
value_expr = f"(CASE WHEN {type_expr} = 'date' THEN (doc_metadata ->> '{field_key}')::date ELSE NULL END)"
value_expr = (
f"(CASE WHEN {type_expr} = 'date' THEN ({self.metadata_column} ->> '{field_key}')::date " "ELSE NULL END)"
)
return f"({value_expr} {sql_operator} {literal})"

def _build_string_comparison_clause(self, field: str, sql_operator: str, operand: str) -> str:
"""Build comparison clause for 'string' typed metadata (only for $eq/$ne)."""
field_key = self._escape_single_quotes(field)
escaped_value = self._escape_single_quotes(operand)
type_expr = self._metadata_type_expr(field_key)
value_expr = f"(doc_metadata ->> '{field_key}')"
value_expr = f"({self.metadata_column} ->> '{field_key}')"
# For strings without explicit type, assume string type (COALESCE handles missing metadata_types)
return f"((COALESCE({type_expr}, 'string') = 'string') AND {value_expr} {sql_operator} '{escaped_value}')"

@@ -345,8 +355,23 @@ def _build_type_clause(self, field: str, operand: Any) -> str:
raise InvalidMetadataFilterError(str(exc)) from exc

field_key = self._escape_single_quotes(field)
type_expr = f"COALESCE(metadata_types ->> '{field_key}', 'string')"
clauses = [f"({type_expr} = '{type_name}')" for type_name in canonical_types]
if not self.metadata_types_column:
jsonb_type_expr = f"jsonb_typeof({self.metadata_column} -> '{field_key}')"
type_map = {
"string": "string",
"number": "number",
"decimal": "number",
"boolean": "boolean",
"object": "object",
"array": "array",
"null": "null",
"datetime": "string",
"date": "string",
}
clauses = [f"({jsonb_type_expr} = '{type_map.get(type_name, type_name)}')" for type_name in canonical_types]
else:
type_expr = f"COALESCE({self.metadata_types_column} ->> '{field_key}', 'string')"
clauses = [f"({type_expr} = '{type_name}')" for type_name in canonical_types]
if len(clauses) == 1:
return clauses[0]
return "(" + " OR ".join(clauses) + ")"
@@ -367,7 +392,7 @@ def _jsonb_contains_clause(self, field: str, value: Any) -> str:
) from exc

escaped_payload = json_payload.replace("'", "''")
base_clause = f"(doc_metadata @> '{escaped_payload}'::jsonb)"
base_clause = f"({self.metadata_column} @> '{escaped_payload}'::jsonb)"

array_clause = self._build_array_membership_clause(field, value)
if array_clause:
@@ -391,8 +416,8 @@ def _build_array_membership_clause(self, field: str, value: Any) -> str:
field_key = self._escape_single_quotes(field)

return (
f"((jsonb_typeof(doc_metadata -> '{field_key}') = 'array') "
f"AND ((doc_metadata -> '{field_key}') @> '{escaped_array_payload}'::jsonb))"
f"((jsonb_typeof({self.metadata_column} -> '{field_key}') = 'array') "
f"AND (({self.metadata_column} -> '{field_key}') @> '{escaped_array_payload}'::jsonb))"
)

def _build_regex_clause(self, field: str, operand: Any) -> str:
Expand All @@ -403,7 +428,7 @@ def _build_regex_clause(self, field: str, operand: Any) -> str:
escaped_pattern = pattern.replace("\\", "\\\\").replace("'", "''")
field_key = self._escape_single_quotes(field)

base_clause = f"((doc_metadata ->> '{field_key}') {regex_operator} '{escaped_pattern}')"
base_clause = f"(({self.metadata_column} ->> '{field_key}') {regex_operator} '{escaped_pattern}')"
array_clause = self._build_array_regex_clause(field, regex_operator, escaped_pattern)
if array_clause:
return f"({base_clause} OR {array_clause})"
@@ -440,8 +465,8 @@ def _build_array_regex_clause(self, field: str, regex_operator: str, escaped_pat
field_key = self._escape_single_quotes(field)
array_value_expr = "trim('\"' FROM arr.value::text)"
return (
f"((jsonb_typeof(doc_metadata -> '{field_key}') = 'array') AND EXISTS ("
f"SELECT 1 FROM jsonb_array_elements(doc_metadata -> '{field_key}') AS arr(value) "
f"((jsonb_typeof({self.metadata_column} -> '{field_key}') = 'array') AND EXISTS ("
f"SELECT 1 FROM jsonb_array_elements({self.metadata_column} -> '{field_key}') AS arr(value) "
f"WHERE jsonb_typeof(arr.value) = 'string' AND {array_value_expr} {regex_operator} '{escaped_pattern}'))"
)

Expand All @@ -453,7 +478,7 @@ def _build_contains_clause(self, field: str, operand: Any) -> str:
escaped_pattern = self._escape_like_pattern(value)
field_key = self._escape_single_quotes(field)

base_clause = f"((doc_metadata ->> '{field_key}') {like_operator} '%{escaped_pattern}%')"
base_clause = f"(({self.metadata_column} ->> '{field_key}') {like_operator} '%{escaped_pattern}%')"
array_clause = self._build_array_like_clause(field, like_operator, escaped_pattern)
if array_clause:
return f"({base_clause} OR {array_clause})"
@@ -492,15 +517,17 @@ def _build_array_like_clause(self, field: str, like_operator: str, escaped_patte
field_key = self._escape_single_quotes(field)
array_value_expr = "trim('\"' FROM arr.value::text)"
return (
f"((jsonb_typeof(doc_metadata -> '{field_key}') = 'array') AND EXISTS ("
f"SELECT 1 FROM jsonb_array_elements(doc_metadata -> '{field_key}') AS arr(value) "
f"((jsonb_typeof({self.metadata_column} -> '{field_key}') = 'array') AND EXISTS ("
f"SELECT 1 FROM jsonb_array_elements({self.metadata_column} -> '{field_key}') AS arr(value) "
f"WHERE jsonb_typeof(arr.value) = 'string' AND "
f"{array_value_expr} {like_operator} '%{escaped_pattern}%'))"
)

def _metadata_type_expr(self, field_key: str) -> str:
"""Return SQL expression fetching the stored metadata type for a field."""
return f"(metadata_types ->> '{field_key}')"
if not self.metadata_types_column:
return "NULL"
return f"({self.metadata_types_column} ->> '{field_key}')"

def _map_comparison_operator(self, operator: str) -> str:
"""Map comparison operators to SQL symbols."""
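With the builder now parameterized, the same SQL generation can target a table whose JSONB column is not doc_metadata and which has no metadata_types companion column (in which case $type falls back to jsonb_typeof). A small usage sketch; the chunk_metadata column and v2_chunks table names are assumptions.

```python
from core.database.metadata_filters import MetadataFilterBuilder

# Builder aimed at a hypothetical chunk table: JSONB lives in "chunk_metadata"
# and there is no metadata_types column, so $type uses the jsonb_typeof fallback.
builder = MetadataFilterBuilder(
    metadata_column="chunk_metadata",
    metadata_types_column=None,
    column_fields={"filename": "filename"},
)

where_clause = builder.build({"department": "legal", "tags": {"$contains": "confidential"}})
# The clause can then be appended to a query over the chunk table, e.g.:
# SELECT * FROM v2_chunks WHERE <where_clause>
```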
43 changes: 43 additions & 0 deletions core/models/v2.py
@@ -0,0 +1,43 @@
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field, model_validator


class V2IngestResponse(BaseModel):
document_id: str
filename: str
chunk_count: int
status: Optional[str] = None


class V2RetrieveFilters(BaseModel):
document_ids: Optional[List[str]] = Field(default=None, description="Limit to specific document IDs")
folder_paths: Optional[List[str]] = Field(default=None, description="Limit to specific folder paths")
metadata: Optional[Dict[str, Any]] = Field(default=None, description="Chunk-level metadata filters")


class V2RetrieveRequest(BaseModel):
query: str
filters: Optional[V2RetrieveFilters] = None
top_k: int = Field(default=5, ge=1, le=100)
end_user_id: Optional[str] = Field(default=None, description="Optional end-user scope")

@model_validator(mode="after")
def validate_query(self):
if not self.query or not self.query.strip():
raise ValueError("query must be a non-empty string")
return self


class V2ChunkResult(BaseModel):
chunk_id: str
document_id: str
page_number: Optional[int] = None
chunk_number: Optional[int] = None
score: float
content: str


class V2RetrieveResponse(BaseModel):
query: str
chunks: List[V2ChunkResult]
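A brief illustration of how the request model behaves, based only on the fields and validator defined above:

```python
from core.models.v2 import V2RetrieveFilters, V2RetrieveRequest

# top_k defaults to 5 and is constrained to 1..100; filters are optional.
request = V2RetrieveRequest(
    query="termination clauses",
    filters=V2RetrieveFilters(folder_paths=["/contracts/2024"]),
    top_k=10,
)

# A blank query is rejected by the model_validator (pydantic wraps the
# ValueError in a ValidationError, which is itself a ValueError subclass).
try:
    V2RetrieveRequest(query="   ")
except ValueError as exc:
    print(exc)
```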