Skip to content

Commit 3eb9388

Browse files
authored
Adding a metadata flag for signaling successful ingestion (#140)
* Adding a metadata flag for signaling successful ingestion * Removing the valid metadata KV on each run before termination
1 parent b0be051 commit 3eb9388

File tree

3 files changed

+67
-0
lines changed

3 files changed

+67
-0
lines changed

tests/integration/converters/test_ome_tiff.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,3 +297,28 @@ def test_ome_tiff_converter_different_compressors(
297297
with open_bioimg(str(tmp_path / "l_1.tdb")) as A:
298298
assert len(A.schema.attr(0).filters) == 1
299299
assert isinstance(A.schema.attr(0).filters[0], type(single_compressor))
300+
301+
302+
@pytest.mark.parametrize(
303+
"filename", ["CMU-1-Small-Region.ome.tiff", "CMU-1-Small-Region-rgb.ome.tiff"]
304+
)
305+
def test_valid_ingestion_check(tmp_path, filename):
306+
input_path = str(get_path(filename))
307+
output_path = str(tmp_path)
308+
OMETiffConverter.to_tiledb(input_path, output_path, compressor=None)
309+
310+
with tiledb.Group(output_path, "r") as grp:
311+
# Check that ingestion was successful
312+
assert grp.meta.get("valid")
313+
314+
# Manually change the valid KV simulating ingestion was not completed
315+
with tiledb.Group(output_path, "w") as grp:
316+
grp.meta.update(valid=False)
317+
318+
# Initialize ingestion again - Simulating a retry on the same destination path
319+
OMETiffConverter.to_tiledb(input_path, output_path, compressor=None)
320+
321+
# Check that ingestion was successful after the group was overwritten
322+
with tiledb.Group(output_path, "r") as grp:
323+
# Check that ingestion was successful
324+
assert grp.meta.get("valid")

tiledb/bioimg/converters/base.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,26 @@ def to_tiledb(
410410
metadata = {}
411411
original_metadata = {}
412412

413+
# Checks group validity
414+
overwrite = False
415+
with rw_group:
416+
if not rw_group.valid_group:
417+
logger.debug(
418+
f"Already existing on disk Group at {output_path} evaluated as corrupted."
419+
)
420+
# Overwrite destination group
421+
rw_group.m_group.delete(recursive=True)
422+
overwrite = True
423+
else:
424+
# If group is valid we remove the KV and we will re-introduce it
425+
# after the current ingestion process as the final step
426+
del rw_group.w_group.meta["valid"]
427+
428+
if overwrite:
429+
# Re-initializes the destination Group for re-ingestion
430+
logger.debug(f"Group {output_path} will be overwritten.")
431+
rw_group = ReadWriteGroup(output_path, ctx=reader.dest_ctx)
432+
413433
with rw_group, reader:
414434
# Update MIME type
415435
rw_group.w_group.meta.update(dataset_type=DATASET_TYPE)
@@ -541,6 +561,11 @@ def to_tiledb(
541561
metadata=jsonpickle.encode(metadata, unpicklable=False),
542562
original_metadata=jsonpickle.encode(original_metadata),
543563
)
564+
565+
# This is the last step of the ingestion unless the following metadata KV
566+
# is not stored then we consider the ingested asset as corrupted
567+
with rw_group:
568+
rw_group.w_group.meta.update(valid=True)
544569
return cls
545570

546571

tiledb/bioimg/helpers.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,29 @@ def __init__(self, uri: str, ctx: Optional[Ctx] = None):
4141
# Windows paths produce single letter scheme matching the drive letter
4242
# Unix absolute path produce an empty scheme
4343
self._ctx = ctx
44+
self._group_exists = False
45+
46+
# At construction time we consider it as valid, this value will be valuated upon
47+
# opening the Group under a context
48+
self._valid_group = True
4449
if len(parsed_uri.scheme) < 2 or parsed_uri.scheme == "file":
4550
uri = str(Path(parsed_uri.path).resolve()).replace("\\", "/")
4651
if tiledb.object_type(uri, ctx=ctx) != "group":
4752
tiledb.group_create(uri, ctx=ctx)
53+
else:
54+
# The destination path is already a group thus we check the validity of
55+
# a previously successful ingestion by checking the existence of
56+
# the ACK metadata KV
57+
self._group_exists = True
4858
self._uri = uri if uri.endswith("/") else uri + "/"
4959
self._is_cloud = parsed_uri.scheme == "tiledb"
5060

5161
def __enter__(self) -> ReadWriteGroup:
5262
self.r_group = tiledb.Group(self._uri, "r", ctx=self._ctx)
5363
self.w_group = tiledb.Group(self._uri, "w", ctx=self._ctx)
64+
self.m_group = tiledb.Group(self._uri, "m", ctx=self._ctx)
65+
if self._group_exists:
66+
self._valid_group = self.r_group.meta.get("valid", False)
5467
return self
5568

5669
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
@@ -93,6 +106,10 @@ def get_or_create(self, name: str, schema: tiledb.ArraySchema) -> Tuple[str, boo
93106
self.w_group.add(name, name, relative=True)
94107
return uri, create
95108

109+
@property
110+
def valid_group(self) -> bool:
111+
return self._valid_group
112+
96113

97114
def validate_ingestion(uri: str, ctx: tiledb.Ctx = None) -> bool:
98115
"""

0 commit comments

Comments
 (0)