132 changes: 127 additions & 5 deletions client/client_tests/client/test_interface.py
@@ -11,13 +11,14 @@
import tempfile
from concurrent.futures import Future
from unittest.mock import ANY
from unittest.mock import call
from unittest.mock import MagicMock
from unittest.mock import patch

import nv_ingest_client.client.interface as module_under_test
import pytest

from client.client_tests.utilities_for_test import (
from ..utilities_for_test import (
    cleanup_test_workspace,
    create_test_workspace,
    get_git_root,
@@ -873,7 +874,10 @@ def test_vdb_upload_with_failures_return_failures_true(workspace, monkeypatch, c
    os.makedirs(results_dir)

    # Mock successful results for 2 jobs and failures for 1 job
    successful_results = [[{"data": "embedding1", "source": "doc1"}], [{"data": "embedding2", "source": "doc2"}]]
    successful_results = [
        [{"data": "embedding1", "source": "doc1"}],
        [{"data": "embedding2", "source": "doc2"}],
    ]
    failures = [("job_3", "Processing failed")]

    def fake_processor(completion_callback=None, **kwargs):
@@ -886,7 +890,10 @@ def fake_processor(completion_callback=None, **kwargs):
        return (successful_results, failures)

    mock_client.process_jobs_concurrently.side_effect = fake_processor
    mock_client._job_index_to_job_spec = {"0": MagicMock(source_name=doc1_path), "1": MagicMock(source_name="doc2.txt")}
    mock_client._job_index_to_job_spec = {
        "0": MagicMock(source_name=doc1_path),
        "1": MagicMock(source_name="doc2.txt"),
    }

    with Ingestor(documents=[doc1_path]) as ingestor:
        ingestor.save_to_disk(output_directory=results_dir, cleanup=False, compression=None)
Expand Down Expand Up @@ -993,7 +1000,10 @@ def fake_processor(completion_callback=None, **kwargs):
        return (successful_results, failures)

    mock_client.process_jobs_concurrently.side_effect = fake_processor
    mock_client._job_index_to_job_spec = {"0": MagicMock(source_name=doc1_path), "1": MagicMock(source_name="doc2.txt")}
    mock_client._job_index_to_job_spec = {
        "0": MagicMock(source_name=doc1_path),
        "1": MagicMock(source_name="doc2.txt"),
    }

    with Ingestor(documents=[doc1_path]) as ingestor:
        ingestor.save_to_disk(output_directory=results_dir, cleanup=False, compression=compression)
Expand Down Expand Up @@ -1034,7 +1044,11 @@ def test_vdb_upload_with_all_failures_return_failures_true(workspace, monkeypatc

    # Mock no successful results, only failures
    successful_results = []
    failures = [("job_1", "Processing failed"), ("job_2", "Processing failed"), ("job_3", "Processing failed")]
    failures = [
        ("job_1", "Processing failed"),
        ("job_2", "Processing failed"),
        ("job_3", "Processing failed"),
    ]

    def fake_processor(completion_callback=None, **kwargs):
        return (successful_results, failures)
@@ -1102,3 +1116,111 @@ def fake_processor(completion_callback=None, **kwargs):
    mock_vdb_op.run.assert_called_once()
    called_args = mock_vdb_op.run.call_args[0][0]
    assert called_args == successful_results  # Should be raw data when save_to_disk() is not used


@pytest.fixture
def mock_files_in_workspace(tmp_path):
    """
    A factory fixture that creates a specified number of dummy files
    in a temporary directory and returns their paths.
    """

    def _create_files(num_files: int):
        file_paths = []
        for i in range(num_files):
            file_path = tmp_path / f"doc{i+1}.txt"
            file_path.write_text(f"content of doc {i+1}")
            file_paths.append(str(file_path))
        return file_paths

    return _create_files


def test_ingest_in_chunks_raises_error_if_vdb_not_configured(ingestor_without_doc, mock_files_in_workspace):
    """
    Verifies that ingest_in_chunks raises a ValueError if vdb_upload has not been called.
    """
    doc_files = mock_files_in_workspace(num_files=2)
    ingestor = ingestor_without_doc.files(doc_files)

    with pytest.raises(ValueError, match=r"`vdb_upload\(\)` must be configured to use `ingest_in_chunks`."):
        ingestor.ingest_in_chunks(chunk_size=1)


def test_ingest_in_chunks_splits_correctly_and_aggregates_results(ingestor_without_doc, mock_files_in_workspace):
    """
    Tests the happy path for ingest_in_chunks.
    Verifies that it splits documents into correct chunks, calls ingest for each,
    and aggregates the results and failures.
    """
    five_docs = mock_files_in_workspace(num_files=5)
    ingestor = ingestor_without_doc.files(five_docs)

    ingestor._vdb_bulk_upload = MagicMock()

    mock_results_per_chunk = [
        (["result1"], [("failure1", "error")]),
        (["result2", "result3"], []),
        (["result4"], [("failure2", "error")]),
    ]

    with patch.object(ingestor, "ingest", side_effect=mock_results_per_chunk) as mock_ingest:
        final_results, final_failures = ingestor.ingest_in_chunks(chunk_size=2, return_failures=True)

    assert mock_ingest.call_count == 3

    expected_calls = [
        call(return_failures=True, show_progress=False),
        call(return_failures=True, show_progress=False),
        call(return_failures=True, show_progress=False),
    ]
    mock_ingest.assert_has_calls(expected_calls, any_order=False)

    expected_final_results = ["result1", "result2", "result3", "result4"]
    expected_final_failures = [("failure1", "error"), ("failure2", "error")]

    assert final_results == expected_final_results
    assert final_failures == expected_final_failures


def test_ingest_in_chunks_handles_no_failures(ingestor_without_doc, mock_files_in_workspace):
    """
    Tests ingest_in_chunks when the underlying ingest calls produce no failures.
    """
    three_docs = mock_files_in_workspace(num_files=3)
    ingestor = ingestor_without_doc.files(three_docs)
    ingestor._vdb_bulk_upload = MagicMock()

    mock_results_per_chunk = [
        (["result1"], []),
        (["result2", "result3"], []),
    ]

    with patch.object(ingestor, "ingest", side_effect=mock_results_per_chunk) as mock_ingest:
        final_results, final_failures = ingestor.ingest_in_chunks(chunk_size=2, return_failures=True)

    assert mock_ingest.call_count == 2
    assert final_results == ["result1", "result2", "result3"]
    assert final_failures == []


def test_ingest_in_chunks_returns_only_results_when_failures_not_requested(
    ingestor_without_doc, mock_files_in_workspace
):
    """
    Tests that ingest_in_chunks returns only the results list when return_failures=False.
    """
    three_docs = mock_files_in_workspace(num_files=3)
    ingestor = ingestor_without_doc.files(three_docs)
    ingestor._vdb_bulk_upload = MagicMock()

    mock_results_per_chunk = [
        (["result1"], [("failure1", "error")]),
        (["result2"], []),
    ]

    with patch.object(ingestor, "ingest", side_effect=mock_results_per_chunk) as mock_ingest:
        final_results = ingestor.ingest_in_chunks(chunk_size=2, return_failures=False)

    assert mock_ingest.call_count == 2
    assert final_results == ["result1", "result2"]
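
For orientation, the call pattern these tests exercise looks roughly like the sketch below. It assumes the fluent Ingestor API used by this client (files(), vdb_upload(), and the new ingest_in_chunks()); the file paths are placeholders and any vdb_upload() configuration arguments are omitted.

from nv_ingest_client.client.interface import Ingestor

# vdb_upload() must be configured first, otherwise ingest_in_chunks() raises ValueError.
ingestor = Ingestor().files(["doc1.pdf", "doc2.pdf", "doc3.pdf"]).vdb_upload()

# Each chunk of two documents is ingested, uploaded, and cleaned up before the next
# chunk starts, which bounds peak disk usage.
results, failures = ingestor.ingest_in_chunks(chunk_size=2, return_failures=True)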
80 changes: 79 additions & 1 deletion client/src/nv_ingest_client/client/interface.py
@@ -402,6 +402,7 @@ def ingest(
        show_progress: bool = False,
        return_failures: bool = False,
        save_to_disk: bool = False,
        chunk_size: Optional[int] = None,
        **kwargs: Any,
    ) -> Union[
        List[List[Dict[str, Any]]],  # In-memory: List of (response['data'] for each doc)
@@ -420,6 +421,12 @@
            Whether to display a progress bar. Default is False.
        return_failures : bool, optional
            If True, return a tuple (results, failures); otherwise, return only results. Default is False.
        save_to_disk : bool, optional
            A convenience flag to enable saving results to disk in a temporary directory.
            Equivalent to calling `.save_to_disk()` with default settings. Default is False.
        chunk_size : int, optional
            If set, documents will be processed in chunks of this size to limit disk usage.
            If None (default), all documents are processed in a single pass.
        **kwargs : Any
            Additional keyword arguments for the underlying client methods. Supported keys:
            'concurrency_limit', 'timeout', 'max_job_retries', 'retry_delay',
@@ -436,6 +443,12 @@
        if save_to_disk and (not self._output_config):
            self.save_to_disk()

        if chunk_size is not None:
            results, failures = self.ingest_in_chunks(
                chunk_size=chunk_size, show_progress=show_progress, return_failures=True
            )
            return (results, failures) if return_failures else results

        self._prepare_ingest_run()

        # Add jobs locally first
@@ -613,6 +626,72 @@ def _in_memory_callback(

        return (results, failures) if return_failures else results

    def ingest_in_chunks(
        self,
        chunk_size: int = 100,
        show_progress: bool = False,
        return_failures: bool = False,
        **kwargs: Any,
    ) -> Union[List[Any], Tuple[List[Any], List[Tuple[str, str]]]]:
"""
Ingests documents in smaller chunks to limit peak disk space usage.

This method processes the full list of documents in batches of `chunk_size`,
performing a full ingest, VDB upload, and cleanup for each chunk.

Parameters
----------
chunk_size : int, optional
The number of documents to process in each chunk. Default is 100.
show_progress : bool, optional
Whether to display a progress bar for the chunks. Default is False.
**kwargs : Any
Additional keyword arguments passed to the underlying `ingest` method.

Returns
-------
Tuple containing aggregated successful results and failure information.
"""
        if not self._vdb_bulk_upload:
            raise ValueError("`vdb_upload()` must be configured to use `ingest_in_chunks`.")

        job_spec_chunks = {}
        total_count = 0
        for file_type, job_specs in self._job_specs.job_specs.items():
            job_spec_chunks[file_type] = []
            for i in range(0, len(job_specs), chunk_size):
                chunk = job_specs[i : i + chunk_size]
                job_spec_chunks[file_type].append(chunk)
                total_count += len(chunk)

        aggregated_results = []
        aggregated_failures = []

        pbar = tqdm(total=total_count, desc="Processing", unit="doc") if show_progress else None

        original_job_specs = self._job_specs
        for file_type, chunks in job_spec_chunks.items():
            for chunk in chunks:
                # Temporarily replace the main job_specs
                chunk_job_spec = BatchJobSpec(chunk)
                self._job_specs = chunk_job_spec

                results, failures = self.ingest(return_failures=True, show_progress=False, **kwargs)

                aggregated_results.extend(results)
                aggregated_failures.extend(failures)

                if pbar:
                    pbar.update(len(chunk))

        # Restore original job_specs
        self._job_specs = original_job_specs

        if pbar:
            pbar.close()

        return (aggregated_results, aggregated_failures) if return_failures else aggregated_results

    def ingest_async(self, **kwargs: Any) -> Future:
        """
        Asynchronously submits jobs and returns a single future that completes when all jobs have finished.
@@ -742,7 +821,6 @@ def embed(self, **kwargs: Any) -> "Ingestor":
        ----------
        kwargs : dict
            Parameters specific to the EmbedTask.

        Returns
        -------
        Ingestor
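
The same chunked path is reachable through ingest() itself: passing chunk_size delegates to ingest_in_chunks(). A minimal sketch of that convenience path, assuming the same fluent API as above (file names and vdb_upload() configuration are placeholders):

ingestor = Ingestor().files(["report1.pdf", "report2.pdf"]).vdb_upload()

# Delegates to ingest_in_chunks(chunk_size=50, ...); with return_failures omitted,
# only the aggregated results list is returned.
results = ingestor.ingest(chunk_size=50, show_progress=True)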