diff --git a/core/routes/ingest.py b/core/routes/ingest.py
index 32bd24ef..4e6ae0e3 100644
--- a/core/routes/ingest.py
+++ b/core/routes/ingest.py
@@ -326,7 +326,7 @@ async def _process_document(doc: Document, override_flag: Optional[bool]) -> Non
             use_colpali_flag = bool(raw_value)
             break
     if use_colpali_flag is None:
-        use_colpali_flag = True
+        use_colpali_flag = False
 
     system_metadata = doc.system_metadata or {}
     if isinstance(system_metadata, str):
diff --git a/core/vector_store/multi_vector_store.py b/core/vector_store/multi_vector_store.py
index 0660eed9..8e99a62d 100644
--- a/core/vector_store/multi_vector_store.py
+++ b/core/vector_store/multi_vector_store.py
@@ -936,6 +936,7 @@ async def delete_chunks_by_document_id(self, document_id: str, app_id: Optional[
         query = f"DELETE FROM multi_vector_embeddings WHERE document_id = '{document_id}'"
 
         with self.get_connection() as conn:
             conn.execute(query)
+            conn.commit()
 
         logger.info(f"Deleted all chunks for document {document_id} from multi-vector store")
diff --git a/core/workers/ingestion_worker.py b/core/workers/ingestion_worker.py
index 204f4e3e..a4b51fb4 100644
--- a/core/workers/ingestion_worker.py
+++ b/core/workers/ingestion_worker.py
@@ -552,7 +552,7 @@ def _meta_resolver(): # noqa: D401
            phase_times["html_to_pdf"] = time.time() - html_conversion_start
 
        # Check if we're using ColPali
-        using_colpali = (
+        using_colpali = bool(
            use_colpali and ingestion_service.colpali_embedding_model and ingestion_service.colpali_vector_store
        )
        logger.debug(
@@ -870,6 +870,38 @@ def _meta_resolver(): # noqa: D401
            phase_times["generate_embeddings"] = 0
            phase_times["create_chunk_objects"] = 0
 
+        # 11b. Delete old chunks if this is a re-ingestion (requeue)
+        # Must run before any new chunks are stored (both regular and ColPali)
+        if doc.chunk_ids:
+            logger.info(f"Re-ingestion detected for {document_id}, deleting {len(doc.chunk_ids)} old chunks")
+            deletion_tasks = []
+            if hasattr(vector_store, "delete_chunks_by_document_id"):
+                deletion_tasks.append(vector_store.delete_chunks_by_document_id(document_id, auth.app_id))
+            # Always try to clean colpali store — the doc may have been ingested
+            # with colpali previously even if this re-ingestion doesn't use it
+            cleanup_colpali_store = colpali_vector_store
+            if not cleanup_colpali_store and settings.ENABLE_COLPALI:
+                try:
+                    cleanup_colpali_store = await _get_worker_colpali_store(database)
+                except Exception as e:
+                    logger.warning(f"Could not init colpali store for cleanup: {e}")
+            if cleanup_colpali_store and hasattr(cleanup_colpali_store, "delete_chunks_by_document_id"):
+                deletion_tasks.append(cleanup_colpali_store.delete_chunks_by_document_id(document_id, auth.app_id))
+            chunk_v2_store = ctx.get("chunk_v2_store")
+            if chunk_v2_store and auth.app_id and hasattr(chunk_v2_store, "delete_chunks_by_document_id"):
+                deletion_tasks.append(chunk_v2_store.delete_chunks_by_document_id(document_id, auth))
+            if deletion_tasks:
+                try:
+                    results = await asyncio.wait_for(
+                        asyncio.gather(*deletion_tasks, return_exceptions=True),
+                        timeout=30,
+                    )
+                    for i, result in enumerate(results):
+                        if isinstance(result, Exception):
+                            logger.error(f"Error deleting old chunks (task {i}): {result}")
+                except asyncio.TimeoutError:
+                    logger.error(f"Timeout deleting old chunks for {document_id}, proceeding anyway")
+
        # 12. Handle ColPali embeddings
        chunk_objects_multivector = []
        colpali_chunk_ids: List[str] = []
@@ -1108,6 +1140,7 @@ def _meta_resolver(): # noqa: D401
        # Update document status to completed after all processing
        doc.system_metadata["page_count"] = final_page_count
        doc.system_metadata["status"] = "completed"
+        doc.system_metadata["use_colpali"] = using_colpali
        doc.system_metadata["updated_at"] = datetime.now(UTC)
        # Clear progress info on completion
        doc.system_metadata.pop("progress", None)