diff --git a/core/services/v2_document_service.py b/core/services/v2_document_service.py
index a6f13deb..43c4594c 100644
--- a/core/services/v2_document_service.py
+++ b/core/services/v2_document_service.py
@@ -20,11 +20,12 @@
 from core.models.auth import AuthContext
 from core.models.chunk import Chunk
 from core.models.documents import Document
+from core.models.folders import Folder
 from core.parser.docling_v2 import DoclingV2Parser
 from core.parser.morphik_parser import MorphikParser
 from core.storage.base_storage import BaseStorage
 from core.storage.utils_file_extensions import detect_content_type
-from core.utils.folder_utils import normalize_ingest_folder_inputs
+from core.utils.folder_utils import normalize_folder_path, normalize_ingest_folder_inputs
 from core.utils.typed_metadata import MetadataBundle, normalize_metadata
 from core.vector_store.chunk_v2_store import ChunkV2Store
 
@@ -223,6 +224,108 @@ def _reset_processing_metadata(system_metadata: Optional[Dict[str, Any]]) -> Dic
         cleaned_metadata["updated_at"] = datetime.now(UTC)
         return cleaned_metadata
 
+    @staticmethod
+    def _folder_update_fields(folder_obj: Folder) -> Dict[str, Any]:
+        """Build a consistent update payload for folder metadata columns.
+
+        Args:
+            folder_obj: Folder whose id, name and path are copied into the payload.
+
+        Returns:
+            Dict with ``folder_id``, ``folder_path`` and ``folder_name`` keys. ``folder_path`` is
+            ``folder_obj.full_path`` when set; otherwise the normalized folder name (``None`` if the
+            folder has no name), falling back to the raw name when normalization raises ``ValueError``.
+        """
+        try:
+            path_value = folder_obj.full_path or (normalize_folder_path(folder_obj.name) if folder_obj.name else None)
+        except ValueError:
+            path_value = folder_obj.name
+
+        return {
+            "folder_id": folder_obj.id,
+            "folder_path": path_value,
+            "folder_name": folder_obj.name,
+        }
+
+    async def _ensure_folder_exists(self, folder_path: str, document_id: str, auth: AuthContext) -> Optional[Folder]:
+        """
+        Ensure a folder path exists (creating ancestors as needed) and add the document to the leaf.
+
+        This is best-effort: every failure is logged and reported as ``None`` instead of raised.
+
+        Args:
+            folder_path: Folder path to resolve; it is passed through ``normalize_folder_path`` first.
+            document_id: ID of the document to attach to the leaf folder.
+            auth: Auth context forwarded to every database call.
+
+        Returns:
+            The leaf ``Folder``, or ``None`` when the path is the root ``/``, no leaf folder could be
+            resolved, or any step raised an exception.
+        """
+        try:
+            canonical_path = normalize_folder_path(folder_path)
+
+            # Reject the root up front and return directly: raising here would only be swallowed by
+            # the blanket handler below and logged a second time.
+            if canonical_path == "/":
+                logger.error("Cannot ingest into root folder '/'")
+                return None
+
+            segments = canonical_path.strip("/").split("/") if canonical_path else []
+            leaf_idx = len(segments) - 1
+
+            parent_id: Optional[str] = None
+            current_path_parts: List[str] = []
+            target_folder: Optional[Folder] = None
+
+            for idx, segment in enumerate(segments):
+                is_leaf = idx == leaf_idx
+                current_path_parts.append(segment)
+                current_path = "/" + "/".join(current_path_parts)
+                existing = await self.db.get_folder_by_full_path(current_path, auth)
+                if existing:
+                    parent_id = existing.id
+                    if is_leaf:
+                        target_folder = existing
+                    continue
+
+                # Only the leaf is created with the document attached; ancestors start empty.
+                folder = Folder(
+                    name=segment,
+                    full_path=current_path,
+                    parent_id=parent_id,
+                    depth=idx + 1,
+                    document_ids=[document_id] if is_leaf else [],
+                    app_id=auth.app_id,
+                )
+                await self.db.create_folder(folder, auth)
+                parent_id = folder.id
+                if is_leaf:
+                    target_folder = folder
+
+            if target_folder is None:
+                logger.error("Failed to ensure target folder for path %s", canonical_path)
+                return None
+
+            # A leaf created above already lists the document, so only pre-existing leaves need the link.
+            if document_id not in (target_folder.document_ids or []):
+                success = await self.db.add_document_to_folder(target_folder.id, document_id, auth)
+                if not success:
+                    logger.warning(
+                        "Failed to add document %s to folder %s. This may be due to a race condition.",
+                        document_id,
+                        target_folder.name,
+                    )
+                else:
+                    logger.info("Successfully added document %s to folder %s", document_id, target_folder.name)
+
+            return target_folder
+
+        except Exception as exc:  # noqa: BLE001
+            # logger.exception keeps the traceback, which a plain error("%s", exc) drops.
+            logger.exception("Error ensuring folder exists: %s", exc)
+            return None
+
     @classmethod
     def _build_ingestion_job_payload(
         cls,
@@ -589,6 +692,22 @@ async def ingest_document(
             await self._mark_document_failed(doc, auth, f"Storage upload failed: {exc}")
             raise HTTPException(status_code=500, detail=f"Failed to upload file to storage: {exc}") from exc
 
+        if folder_path_value:
+            try:
+                folder_obj = await self._ensure_folder_exists(folder_path_value, doc.external_id, auth)
+                if folder_obj and folder_obj.id:
+                    doc.folder_id = folder_obj.id
+                    folder_updates = self._folder_update_fields(folder_obj)
+                    await self.db.update_document(doc.external_id, folder_updates, auth=auth)
+                    logger.debug("Ensured folder '%s' exists and contains document %s", folder_path_value, doc.external_id)
+            except Exception as exc:  # noqa: BLE001
+                logger.error(
+                    "Error ensuring folder exists for doc %s (path=%s): %s",
+                    doc.external_id,
+                    folder_path_value,
+                    exc,
+                )
+
         if queue is None:
             queue = redis is not None
 