diff --git a/datasets/sentinel-1-grd/dataset.yaml b/datasets/sentinel-1-grd/dataset.yaml
index f3365181..35e232dc 100644
--- a/datasets/sentinel-1-grd/dataset.yaml
+++ b/datasets/sentinel-1-grd/dataset.yaml
@@ -1,5 +1,5 @@
 id: sentinel-1-grd
-image: ${{ args.registry }}/pctasks-sentinel-1-grd:20250708.1
+image: ${{ args.registry }}/pctasks-sentinel-1-grd:20250814.1
 
 args:
 - registry
diff --git a/datasets/sentinel-3/Dockerfile b/datasets/sentinel-3/Dockerfile
index 7bdffadf..2652b9d4 100644
--- a/datasets/sentinel-3/Dockerfile
+++ b/datasets/sentinel-3/Dockerfile
@@ -21,15 +21,15 @@ RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 10
 
 # See https://github.com/mapbox/rasterio/issues/1289
 ENV CURL_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
 
-# Install Python 3.8
-RUN curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/download/Mambaforge-$(uname)-$(uname -m).sh" \
-    && bash "Mambaforge-$(uname)-$(uname -m).sh" -b -p /opt/conda \
-    && rm -rf "Mambaforge-$(uname)-$(uname -m).sh"
+# Install Python 3.10
+RUN curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-$(uname)-$(uname -m).sh" \
+    && bash "Miniforge3-$(uname)-$(uname -m).sh" -b -p /opt/conda \
+    && rm -rf "Miniforge3-$(uname)-$(uname -m).sh"
 
 ENV PATH /opt/conda/bin:$PATH
 ENV LD_LIBRARY_PATH /opt/conda/lib/:$LD_LIBRARY_PATH
 
-RUN mamba install -y -c conda-forge python=3.8 gdal=3.3.3 pip setuptools cython numpy==1.21.5
+RUN mamba install -y -c conda-forge python=3.10 gdal libgdal-netcdf pip setuptools cython numpy
 
 RUN python -m pip install --upgrade pip
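Note on the install step above: the `mamba install` line drops the old `gdal=3.3.3` and `numpy==1.21.5` pins and adds `libgdal-netcdf`, since newer conda-forge GDAL builds ship the netCDF driver as a separate plugin package. A minimal smoke test for the rebuilt image, assuming it is executed with the image's `/opt/conda` Python (the check itself is illustrative and not part of this change):

```python
# Confirm the GDAL netCDF driver is registered in the rebuilt image; with the
# plugin-based conda-forge packaging it is only present when libgdal-netcdf is
# installed alongside gdal.
from osgeo import gdal

driver = gdal.GetDriverByName("netCDF")
assert driver is not None, "netCDF driver missing; check the libgdal-netcdf install"
print(gdal.VersionInfo("RELEASE_NAME"), "- netCDF driver available")
```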
diff --git a/datasets/sentinel-3/README.md b/datasets/sentinel-3/README.md
index e559d41d..f8cceed7 100644
--- a/datasets/sentinel-3/README.md
+++ b/datasets/sentinel-3/README.md
@@ -32,12 +32,13 @@ $ PYTHONPATH=datasets/sentinel-3 python -m pytest datasets/sentinel-3/tests/
 
 ```console
 $ ls datasets/sentinel-3/collection/ | xargs -I {} \
-    pctasks dataset process-items '${{ args.since }}' \
+    pctasks dataset process-items \
     -d datasets/sentinel-3/dataset.yaml \
     -c {} \
     --workflow-id={}-update \
-    --is-update-workflow \
-    --upsert
+    --is-update-workflow {}-update \
+    -u \
+    -y
 ```
 
 **Notes:**
diff --git a/datasets/sentinel-3/dataset.yaml b/datasets/sentinel-3/dataset.yaml
index 7261428e..632f235d 100644
--- a/datasets/sentinel-3/dataset.yaml
+++ b/datasets/sentinel-3/dataset.yaml
@@ -1,5 +1,5 @@
 id: sentinel-3
-image: ${{ args.registry }}/pctasks-sentinel-3:20230630.1
+image: ${{ args.registry }}/pctasks-sentinel-3:20250922.1
 
 args:
 - registry
diff --git a/datasets/sentinel-3/requirements.txt b/datasets/sentinel-3/requirements.txt
index 3aa350eb..2387c907 100644
--- a/datasets/sentinel-3/requirements.txt
+++ b/datasets/sentinel-3/requirements.txt
@@ -1 +1 @@
-git+https://github.com/stactools-packages/sentinel3.git@36375cc63c053087380664ff931ceed5ad3b5f83
+git+https://github.com/stactools-packages/sentinel3.git@93518a430556f290d5e55d3ae0fa1d76cec26197
\ No newline at end of file
diff --git a/datasets/sentinel-3/sentinel_3.py b/datasets/sentinel-3/sentinel_3.py
index 6d648552..b3b90a00 100644
--- a/datasets/sentinel-3/sentinel_3.py
+++ b/datasets/sentinel-3/sentinel_3.py
@@ -12,7 +12,7 @@
 
 import pctasks.dataset.collection
 from pctasks.core.models.task import WaitTaskResult
-from pctasks.core.storage import Storage, StorageFactory
+from pctasks.core.storage import StorageFactory
 from pctasks.core.utils.backoff import is_common_throttle_exception, with_backoff
 
 handler = logging.StreamHandler()
@@ -240,7 +240,6 @@ class Sentinel3Collections(pctasks.dataset.collection.Collection):
     def create_item(
         cls, asset_uri: str, storage_factory: StorageFactory
     ) -> Union[List[pystac.Item], WaitTaskResult]:
-        # Only create Items for NT (Not Time critical) products
         sen3_archive = os.path.dirname(asset_uri)
         assert sen3_archive.endswith(".SEN3")
 
diff --git a/datasets/sentinel-5p/Dockerfile b/datasets/sentinel-5p/Dockerfile
index 828da64b..f136c5ad 100644
--- a/datasets/sentinel-5p/Dockerfile
+++ b/datasets/sentinel-5p/Dockerfile
@@ -21,15 +21,15 @@ RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 10
 
 # See https://github.com/mapbox/rasterio/issues/1289
 ENV CURL_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
 
-# Install Python 3.8
-RUN curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/download/Mambaforge-$(uname)-$(uname -m).sh" \
-    && bash "Mambaforge-$(uname)-$(uname -m).sh" -b -p /opt/conda \
-    && rm -rf "Mambaforge-$(uname)-$(uname -m).sh"
+# Install Python 3.10
+RUN curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-$(uname)-$(uname -m).sh" \
+    && bash "Miniforge3-$(uname)-$(uname -m).sh" -b -p /opt/conda \
+    && rm -rf "Miniforge3-$(uname)-$(uname -m).sh"
 
 ENV PATH /opt/conda/bin:$PATH
 ENV LD_LIBRARY_PATH /opt/conda/lib/:$LD_LIBRARY_PATH
 
-RUN mamba install -y -c conda-forge python=3.8 gdal=3.3.3 pip setuptools cython numpy==1.21.5
+RUN mamba install -y -c conda-forge python=3.10 gdal libgdal-netcdf pip setuptools cython numpy
 
 RUN python -m pip install --upgrade pip
diff --git a/datasets/sentinel-5p/README.md b/datasets/sentinel-5p/README.md
index 90565025..803f3a92 100644
--- a/datasets/sentinel-5p/README.md
+++ b/datasets/sentinel-5p/README.md
@@ -15,12 +15,13 @@ az acr build -r {the registry} --subscription {the subscription} -t pctasks-sent
 This collection is updated regularly.
 
 ```console
-$ pctasks dataset process-items '${{ args.since }}' \
+$ pctasks dataset process-items \
     -d datasets/sentinel-5p/dataset.yaml \
     -c sentinel-5p-l2-netcdf \
     --workflow-id=sentinel-5p-l2-netcdf-update \
-    --is-update-workflow \
-    --upsert
+    --is-update-workflow sentinel-5p-l2-netcdf-update \
+    -u \
+    -y
 ```
 
 **Notes:**
diff --git a/datasets/sentinel-5p/dataset.yaml b/datasets/sentinel-5p/dataset.yaml
index dbdeb57e..da6cd42d 100644
--- a/datasets/sentinel-5p/dataset.yaml
+++ b/datasets/sentinel-5p/dataset.yaml
@@ -1,5 +1,5 @@
 id: sentinel_5p
-image: ${{ args.registry }}/pctasks-sentinel-5p:20230630.3
+image: ${{ args.registry }}/pctasks-sentinel-5p:20250910.1
 
 args:
 - registry
diff --git a/datasets/sentinel-5p/sentinel_5p.py b/datasets/sentinel-5p/sentinel_5p.py
index a155f76b..94d9c1f2 100644
--- a/datasets/sentinel-5p/sentinel_5p.py
+++ b/datasets/sentinel-5p/sentinel_5p.py
@@ -23,7 +23,6 @@ class Sentinel5pNetCDFCollection(Collection):
     def create_item(
         cls, asset_uri: str, storage_factory: StorageFactory
     ) -> Union[List[pystac.Item], WaitTaskResult]:
-
         storage, nc_path = storage_factory.get_storage_for_file(asset_uri)
 
         with TemporaryDirectory() as tmp_dir:
diff --git a/pctasks/dataset/pctasks/dataset/items/task.py b/pctasks/dataset/pctasks/dataset/items/task.py
index e719cc9e..9b4bd14a 100644
--- a/pctasks/dataset/pctasks/dataset/items/task.py
+++ b/pctasks/dataset/pctasks/dataset/items/task.py
@@ -178,21 +178,22 @@ def create_items(
             try:
                 with traced_create_item(args.asset_uri, args.collection_id):
                     result = self._create_item(args.asset_uri, storage_factory)
+                    if isinstance(result, WaitTaskResult):
+                        return result
+                    elif result is None:
+                        logger.warning(f"No items created from {args.asset_uri}")
+                    else:
+                        results.extend(
+                            validate_create_items_result(
+                                result,
+                                collection_id=args.collection_id,
+                                skip_validation=args.options.skip_validation,
+                            )
+                        )
             except Exception as e:
-                raise CreateItemsError(
-                    f"Failed to create item from {args.asset_uri}"
-                ) from e
-            if isinstance(result, WaitTaskResult):
-                return result
-            elif result is None:
-                logger.warning(f"No items created from {args.asset_uri}")
-            else:
-                results.extend(
-                    validate_create_items_result(
-                        result,
-                        collection_id=args.collection_id,
-                        skip_validation=args.options.skip_validation,
-                    )
+                tb_str = traceback.format_exc()
+                logger.error(
+                    f"Failed to create item from {args.asset_uri}: {type(e).__name__}: {str(e)}\n{tb_str}"  # noqa: E501
                 )
         elif args.asset_chunk_info:
             chunk_storage, chunk_path = storage_factory.get_storage_for_file(
@@ -208,24 +209,25 @@
                         asset_uri, args.collection_id, i=i, asset_count=asset_count
                     ):
                         result = self._create_item(asset_uri, storage_factory)
+                        if isinstance(result, WaitTaskResult):
+                            return result
+                        else:
+                            if not result:
+                                logger.warning(f"No items created from {asset_uri}")
+                            else:
+                                results.extend(
+                                    validate_create_items_result(
+                                        result,
+                                        collection_id=args.collection_id,
+                                        skip_validation=args.options.skip_validation,
+                                    )
+                                )
                 except Exception as e:
                     tb_str = traceback.format_exc()
                     logger.error(
                         f"Failed to create item from {asset_uri}: {type(e).__name__}: {str(e)}\n{tb_str}"  # noqa: E501
                     )
-                if isinstance(result, WaitTaskResult):
-                    return result
-                else:
-                    if not result:
-                        logger.warning(f"No items created from {asset_uri}")
-                    else:
-                        results.extend(
-                            validate_create_items_result(
-                                result,
-                                collection_id=args.collection_id,
-                                skip_validation=args.options.skip_validation,
-                            )
-                        )
+                    continue
 
         else:
             # Should be prevented by validator
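Note on the two `task.py` hunks above: result handling now happens inside the `try` block, the single-asset branch logs the failure (with traceback) instead of raising `CreateItemsError`, and the chunk branch `continue`s to the next asset after logging. A simplified sketch of the resulting log-and-continue pattern, with illustrative names rather than the actual pctasks API:

```python
# Per-asset failures are logged and skipped instead of aborting the chunk;
# because validation also runs inside the try block, validation errors are
# handled the same way as creation errors.
import logging
import traceback
from typing import Callable, Dict, List

logger = logging.getLogger(__name__)


def create_items_for_chunk(
    asset_uris: List[str], create_item: Callable[[str], List[Dict]]
) -> List[Dict]:
    results: List[Dict] = []
    for asset_uri in asset_uris:
        try:
            items = create_item(asset_uri)
            if not items:
                logger.warning(f"No items created from {asset_uri}")
            else:
                results.extend(items)  # validation would happen here as well
        except Exception as e:
            tb_str = traceback.format_exc()
            logger.error(
                f"Failed to create item from {asset_uri}: {type(e).__name__}: {e}\n{tb_str}"
            )
            continue  # skip this asset and keep processing the rest of the chunk
    return results
```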
diff --git a/pctasks/ingest_task/pctasks/ingest_task/pgstac.py b/pctasks/ingest_task/pctasks/ingest_task/pgstac.py
index 09dcfe0b..b48f7762 100644
--- a/pctasks/ingest_task/pctasks/ingest_task/pgstac.py
+++ b/pctasks/ingest_task/pctasks/ingest_task/pgstac.py
@@ -54,16 +54,19 @@ def ingest_items(
         mode: Methods = Methods.upsert,
         insert_group_size: Optional[int] = None,
     ) -> None:
+        all_unique_items = list(
+            self.unique_items(items, lambda b: orjson.loads(b)["id"])
+        )
         if insert_group_size:
-            groups = grouped(items, insert_group_size)
+            groups = grouped(all_unique_items, insert_group_size)
         else:
-            groups = [items]
+            groups = [all_unique_items]
 
         for i, group in enumerate(groups):
             logger.info(f" ...Loading group {i + 1}")
             self._with_connection_retry(
                 lambda: self.loader.load_items(
-                    iter(self.unique_items(group, lambda b: orjson.loads(b)["id"])),
+                    iter(group),
                     insert_mode=mode,
                 )
             )
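Note on the `pgstac.py` hunk above: previously `unique_items` ran inside the per-group `load_items` call, so a duplicate id that landed in a different insert group could still be loaded twice; the change deduplicates the whole batch once and only then splits it into groups. A standalone illustration of the difference in plain Python (the `grouped` helper below is a stand-in for `pctasks.core.utils.grouped`, not the real implementation):

```python
# Deduplicating before grouping guarantees each id is loaded exactly once;
# per-group deduplication misses duplicates that fall into different groups.
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def grouped(items: Iterable[T], size: int) -> Iterator[List[T]]:
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


ids = ["item1", "item2", "item1", "item3", "item2"]

# Old behaviour: dedup inside each group; "item1" and "item2" still load twice.
per_group = [list(dict.fromkeys(group)) for group in grouped(ids, 2)]
assert sum(len(group) for group in per_group) == 5

# New behaviour: dedup once (first occurrence wins), then group.
unique = list(dict.fromkeys(ids))
assert list(grouped(unique, 2)) == [["item1", "item2"], ["item3"]]
```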
diff --git a/pctasks/ingest_task/tests/test_items.py b/pctasks/ingest_task/tests/test_items.py
index 05c8dad2..fbdc7f99 100644
--- a/pctasks/ingest_task/tests/test_items.py
+++ b/pctasks/ingest_task/tests/test_items.py
@@ -1,9 +1,15 @@
 import json
 import pathlib
+from pathlib import Path
+from typing import Dict, Generator, List, Optional, Tuple
+from unittest.mock import MagicMock, Mock, patch
 
 import orjson
+import pytest
+from pypgstac.load import Methods
 
 from pctasks.core.models.task import FailedTaskResult
+from pctasks.core.utils import grouped
 from pctasks.dev.mocks import MockTaskContext
 from pctasks.ingest.models import (
     IngestNdjsonInput,
@@ -22,7 +28,43 @@
 TEST_DUPE_NDJSON = HERE / "data-files/items/items_dupe.ndjson"
 
 
-def test_single_item_ingest():
+@pytest.fixture
+def pgstac_fixture() -> Generator[Tuple[PgSTAC, Mock, Mock], None, None]:
+    with (
+        patch("pypgstac.db.PgstacDB") as MockPgstacDB,
+        patch("pypgstac.load.Loader") as MockLoader,
+    ):
+        mock_db = MockPgstacDB.return_value
+        mock_loader = MockLoader.return_value
+
+        pgstac = PgSTAC("postgresql://username:password@database:5432/postgis")
+        yield (pgstac, mock_db, mock_loader)
+
+
+@pytest.fixture
+def dupe_ndjson_lines() -> List[bytes]:
+    with open(TEST_DUPE_NDJSON, "r") as f:
+        return [line.strip().encode("utf-8") for line in f.readlines() if line.strip()]
+
+
+@pytest.fixture
+def capture_loader_calls() -> Tuple[MagicMock, List[dict]]:
+    captured_calls = []
+
+    def mock_load_items(
+        items_iter: Generator[bytes, None, None], insert_mode: Methods
+    ) -> None:
+        items_list = list(items_iter)
+        captured_calls.append({"items": items_list, "mode": insert_mode})
+        return None
+
+    mock_loader = MagicMock()
+    mock_loader.load_items.side_effect = mock_load_items
+
+    return mock_loader, captured_calls
+
+
+def test_single_item_ingest() -> None:
     """Test ingesting Items through the ingest task logic."""
 
     task_context = MockTaskContext.default()
@@ -47,7 +89,7 @@ def test_single_item_ingest():
     assert not isinstance(result, FailedTaskResult)
 
 
-def test_ndjson_ingest():
+def test_ndjson_ingest() -> None:
     """Test ingesting Items through the ingest task logic."""
 
     task_context = MockTaskContext.default()
@@ -70,7 +112,7 @@ def test_ndjson_ingest():
     assert not isinstance(result, FailedTaskResult)
 
 
-def test_ingest_ndjson_add_service_principal():
+def test_ingest_ndjson_add_service_principal() -> None:
     result = IngestTaskConfig.from_ndjson(
         ndjson_data=IngestNdjsonInput(ndjson_folder=NdjsonFolder(uri="test/")),
         add_service_principal=True,
@@ -101,13 +143,13 @@ def test_ingest_ndjson_add_service_principal():
     result = IngestTaskConfig.from_ndjson(
         ndjson_data=IngestNdjsonInput(ndjson_folder=NdjsonFolder(uri="test/")),
     )
-
-    assert "AZURE_TENANT_ID" not in result.environment
-    assert "AZURE_TENANT_ID" not in result.environment
+    assert result.environment is not None
     assert "AZURE_TENANT_ID" not in result.environment
+    assert "AZURE_CLIENT_ID" not in result.environment
+    assert "AZURE_CLIENT_SECRET" not in result.environment
 
 
-def test_empty_ndjson_ingest(tmp_path):
+def test_empty_ndjson_ingest(tmp_path: Path) -> None:
     """Test ingesting an empty item collection works."""
 
     task_context = MockTaskContext.default()
@@ -133,7 +175,7 @@ def test_empty_ndjson_ingest(tmp_path):
     assert not isinstance(result, FailedTaskResult)
 
 
-def test_ingest_dupe_items_ndjson():
+def test_ingest_dupe_items_ndjson() -> None:
     task_context = MockTaskContext.default()
 
     with ingest_test_environment():
@@ -154,17 +196,115 @@
     assert not isinstance(result, FailedTaskResult)
 
 
-def test_unique_items_deduplication():
-    pgstac = PgSTAC("postgresql://dummy:dummy@localhost:5432/dummy")
+def test_unique_items_deduplication(
+    pgstac_fixture: Tuple[PgSTAC, Mock, Mock],
+    dupe_ndjson_lines: List[bytes],
+) -> None:
+    unique_items = list(
+        pgstac_fixture[0].unique_items(
+            dupe_ndjson_lines, lambda b: orjson.loads(b)["id"]
+        )
+    )
 
-    with open(TEST_DUPE_NDJSON, "r") as f:
-        lines = [line.strip().encode("utf-8") for line in f.readlines() if line.strip()]
+    assert len(dupe_ndjson_lines) == 5
+    assert len(unique_items) == 3
 
-    unique_items = list(pgstac.unique_items(lines, lambda b: orjson.loads(b)["id"]))
+    unique_ids = [orjson.loads(item)["id"] for item in unique_items]
+    assert len(unique_ids) == 3
+    assert unique_ids == ["item1", "item2", "item3"]
 
-    assert len(lines) == 5
-    assert len(unique_items) == 3
 
+def test_unique_items_grouped_deduplication(
+    pgstac_fixture: Tuple[PgSTAC, Mock, Mock],
+    dupe_ndjson_lines: List[bytes],
+) -> None:
+    unique_items = list(
+        pgstac_fixture[0].unique_items(
+            dupe_ndjson_lines, lambda b: orjson.loads(b)["id"]
+        )
+    )
     unique_ids = [orjson.loads(item)["id"] for item in unique_items]
-    assert len(set(unique_ids)) == 3
-    assert set(unique_ids) == {"item1", "item2", "item3"}
+
+    assert len(dupe_ndjson_lines) == 5
+    assert len(unique_ids) == 3
+
+    groups = grouped(unique_items, size=3)
+
+    seen_items: List[str] = []
+    for group in groups:
+        assert group
+        seen_items.extend(orjson.loads(item)["id"] for item in group)
+
+    assert len(seen_items) == 3
+    assert unique_ids == seen_items
+
+
+@pytest.mark.parametrize(
+    "insert_group_size, mode", [(None, Methods.upsert), (2, Methods.insert)]
+)
+def test_ingest_items_deduplication_and_grouping(
+    pgstac_fixture: Tuple[PgSTAC, Mock, Mock],
+    dupe_ndjson_lines: List[bytes],
+    insert_group_size: Optional[int],
+    mode: Methods,
+) -> None:
+    captured_groups: List[Dict[str, List[bytes]]] = []
+    modes_passed: List[Methods] = []
+
+    # gets the groups from the actual code path to evaluate without side effects
+    def mock_load_items(
+        items_iter: Generator[bytes, None, None], insert_mode: Methods
+    ) -> None:
+        items_list = list(items_iter)
+        captured_groups.append({"items": items_list})
+        modes_passed.append(insert_mode)
+        return None
+
+    pgstac_fixture[2].load_items = mock_load_items
+    pgstac_fixture[2].format_item = lambda item: item
+
+    with patch.object(pgstac_fixture[0], "loader", pgstac_fixture[2]):
+        pgstac_fixture[0].ingest_items(
+            dupe_ndjson_lines, mode=Methods.upsert, insert_group_size=insert_group_size
+        )
+
+    expected_group_count = 1 if insert_group_size is None else insert_group_size
+
+    assert (
+        len(captured_groups) == expected_group_count
+    ), f"Expected {expected_group_count} groups"
+
+    all_items = []
+    for group in captured_groups:
+        all_items.extend(group["items"])
+
+    assert len(all_items) == 3, "Should have 3 unique items after deduplication"
+
+    all_ids = [orjson.loads(item)["id"] for item in all_items]
+    assert len(all_ids) == len(set(all_ids)), "No duplicates across groups"
+    assert set(all_ids) == {"item1", "item2", "item3"}, "All items correctly included"
+
+
+@pytest.mark.parametrize("mode", [Methods.upsert, Methods.insert])
+def test_ingest_items_with_different_modes(
+    pgstac_fixture: Tuple[PgSTAC, Mock, Mock],
+    dupe_ndjson_lines: List[bytes],
+    mode: Methods,
+) -> None:
+    modes_passed = []
+
+    def mock_load_items(
+        items_iter: Generator[bytes, None, None], insert_mode: Methods
+    ) -> None:
+        modes_passed.append(insert_mode)
+        list(items_iter)
+        return None
+
+    pgstac_fixture[2].load_items = mock_load_items
+    pgstac_fixture[2].format_item = lambda item: item
+
+    with patch.object(pgstac_fixture[0], "loader", pgstac_fixture[2]):
+        pgstac_fixture[0].ingest_items(dupe_ndjson_lines, mode=mode)
+
+    assert len(modes_passed) == 1, "load_items should be called once"
+    assert modes_passed[0] == mode, f"Mode should be {mode}"
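Note on the test changes: the old `test_unique_items_deduplication` built a real `PgSTAC` against a dummy connection string, while the new tests exercise deduplication, grouping, and insert modes against mocked `pypgstac` objects via the `pgstac_fixture` and `dupe_ndjson_lines` fixtures. Assuming the test dependencies are installed and the command is run from the repository root, a focused run of just these tests could look like this (the `-k` expression is only a convenience, not part of the change):

```python
# Run only the deduplication/grouping/mode tests added above; equivalent to
# invoking pytest from the command line with the same arguments.
import sys

import pytest

sys.exit(
    pytest.main(
        [
            "pctasks/ingest_task/tests/test_items.py",
            "-k",
            "unique_items or ingest_items",
            "-v",
        ]
    )
)
```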