Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 93 additions & 1 deletion core/services/v2_document_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
from core.models.auth import AuthContext
from core.models.chunk import Chunk
from core.models.documents import Document
from core.models.folders import Folder
from core.parser.docling_v2 import DoclingV2Parser
from core.parser.morphik_parser import MorphikParser
from core.storage.base_storage import BaseStorage
from core.storage.utils_file_extensions import detect_content_type
from core.utils.folder_utils import normalize_ingest_folder_inputs
from core.utils.folder_utils import normalize_folder_path, normalize_ingest_folder_inputs
from core.utils.typed_metadata import MetadataBundle, normalize_metadata
from core.vector_store.chunk_v2_store import ChunkV2Store

Expand Down Expand Up @@ -223,6 +224,81 @@ def _reset_processing_metadata(system_metadata: Optional[Dict[str, Any]]) -> Dic
cleaned_metadata["updated_at"] = datetime.now(UTC)
return cleaned_metadata

@staticmethod
def _folder_update_fields(folder_obj: Folder) -> Dict[str, Any]:
"""Build a consistent update payload for folder metadata columns."""
try:
path_value = folder_obj.full_path or (normalize_folder_path(folder_obj.name) if folder_obj.name else None)
except ValueError:
path_value = folder_obj.name

return {
"folder_id": folder_obj.id,
"folder_path": path_value,
"folder_name": folder_obj.name,
}

async def _ensure_folder_exists(self, folder_path: str, document_id: str, auth: AuthContext) -> Optional[Folder]:
"""
Ensure a folder path exists (creating ancestors as needed) and add the document to the leaf.
"""
try:
canonical_path = normalize_folder_path(folder_path)
segments = canonical_path.strip("/").split("/") if canonical_path and canonical_path != "/" else []

if canonical_path == "/":
logger.error("Cannot ingest into root folder '/'")
raise ValueError("Cannot ingest into root folder '/'")

parent_id: Optional[str] = None
current_path_parts: List[str] = []
target_folder: Optional[Folder] = None

for idx, segment in enumerate(segments):
current_path_parts.append(segment)
current_path = "/" + "/".join(current_path_parts)
existing = await self.db.get_folder_by_full_path(current_path, auth)
if existing:
parent_id = existing.id
if idx == len(segments) - 1:
target_folder = existing
continue

folder_depth = idx + 1
folder = Folder(
name=segment,
full_path=current_path,
parent_id=parent_id,
depth=folder_depth,
document_ids=[document_id] if idx == len(segments) - 1 else [],
app_id=auth.app_id,
)
await self.db.create_folder(folder, auth)
parent_id = folder.id
if idx == len(segments) - 1:
target_folder = folder

if target_folder is None:
logger.error("Failed to ensure target folder for path %s", canonical_path)
return None

if document_id not in (target_folder.document_ids or []):
success = await self.db.add_document_to_folder(target_folder.id, document_id, auth)
if not success:
logger.warning(
"Failed to add document %s to folder %s. This may be due to a race condition.",
document_id,
target_folder.name,
)
else:
logger.info("Successfully added document %s to folder %s", document_id, target_folder.name)

return target_folder

except Exception as exc: # noqa: BLE001
logger.error("Error ensuring folder exists: %s", exc)
return None

@classmethod
def _build_ingestion_job_payload(
cls,
Expand Down Expand Up @@ -589,6 +665,22 @@ async def ingest_document(
await self._mark_document_failed(doc, auth, f"Storage upload failed: {exc}")
raise HTTPException(status_code=500, detail=f"Failed to upload file to storage: {exc}") from exc

if folder_path_value:
try:
folder_obj = await self._ensure_folder_exists(folder_path_value, doc.external_id, auth)
if folder_obj and folder_obj.id:
doc.folder_id = folder_obj.id
folder_updates = self._folder_update_fields(folder_obj)
await self.db.update_document(doc.external_id, folder_updates, auth=auth)
logger.debug("Ensured folder '%s' exists and contains document %s", folder_path_value, doc.external_id)
except Exception as exc: # noqa: BLE001
logger.error(
"Error ensuring folder exists for doc %s (path=%s): %s",
doc.external_id,
folder_path_value,
exc,
)

if queue is None:
queue = redis is not None

Expand Down
Loading