Skip to content

[25.0] Fix deferred datasets in multiple dataset parameters. #20650

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: release_25.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions lib/galaxy/job_execution/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,25 @@
ABCMeta,
abstractmethod,
)
from typing import Union
from typing import (
List,
Union,
)

from galaxy.model import (
DatasetCollectionElement,
DatasetInstance,
HistoryDatasetCollectionAssociation,
)

DeferrableObjectsT = Union[DatasetInstance, HistoryDatasetCollectionAssociation, DatasetCollectionElement]
DeferrableObjectsT = Union[
DatasetInstance,
HistoryDatasetCollectionAssociation,
DatasetCollectionElement,
List[DatasetInstance],
List[Union[HistoryDatasetCollectionAssociation, DatasetCollectionElement]],
List[Union[DatasetInstance, HistoryDatasetCollectionAssociation, DatasetCollectionElement]],
]


def dataset_path_rewrites(dataset_paths):
Expand Down
51 changes: 47 additions & 4 deletions lib/galaxy/tools/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
List,
Optional,
TYPE_CHECKING,
Union,
)

from packaging.version import Version
Expand Down Expand Up @@ -287,6 +288,30 @@ def _materialize_objects(
assert isinstance(value, (model.HistoryDatasetAssociation, model.LibraryDatasetDatasetAssociation))
undeferred = dataset_materializer.ensure_materialized(value)
undeferred_objects[key] = undeferred
elif isinstance(value, list):
undeferred_list: List[
Union[
model.DatasetInstance, model.HistoryDatasetCollectionAssociation, model.DatasetCollectionElement
]
] = []
for potentially_deferred in value:
if isinstance(potentially_deferred, model.DatasetInstance):
if potentially_deferred.state != model.Dataset.states.DEFERRED:
undeferred_list.append(potentially_deferred)
else:
assert isinstance(
potentially_deferred,
(model.HistoryDatasetAssociation, model.LibraryDatasetDatasetAssociation),
)
undeferred = dataset_materializer.ensure_materialized(potentially_deferred)
undeferred_list.append(undeferred)
elif isinstance(
potentially_deferred,
(model.HistoryDatasetCollectionAssociation, model.DatasetCollectionElement),
):
undeferred_collection = materialize_collection_input(potentially_deferred, dataset_materializer)
undeferred_list.append(undeferred_collection)
undeferred_objects[key] = undeferred_list
else:
undeferred_collection = materialize_collection_input(value, dataset_materializer)
undeferred_objects[key] = undeferred_collection
Expand Down Expand Up @@ -348,10 +373,6 @@ def _deferred_objects(
Walk input datasets and collections and find inputs that need to be materialized.
"""
deferred_objects: Dict[str, DeferrableObjectsT] = {}
for key, value in input_datasets.items():
if value is not None and value.state == model.Dataset.states.DEFERRED:
if self._should_materialize_deferred_input(key, value):
deferred_objects[key] = value

def find_deferred_collections(input, value, context, prefixed_name=None, **kwargs):
if (
Expand All @@ -360,6 +381,28 @@ def find_deferred_collections(input, value, context, prefixed_name=None, **kwarg
):
deferred_objects[prefixed_name] = value

def find_deferred_datasets(input, value, context, prefixed_name=None, **kwargs):
    # Visitor for visit_input_values: record data-parameter values containing
    # deferred datasets into `deferred_objects` (closure dict) so they can be
    # materialized before the job runs. Only DataToolParameter values are examined.
    if isinstance(input, DataToolParameter):
        if isinstance(value, model.DatasetInstance) and value.state == model.Dataset.states.DEFERRED:
            # Single dataset value that is itself deferred.
            deferred_objects[prefixed_name] = value
        elif isinstance(value, list):
            # handle single list reduction as a collection input
            # NOTE(review): a 1-element list wrapping a collection is recorded
            # unconditionally (no deferred-state check here) — presumably the
            # collection walk handles the actual deferred check; confirm.
            if (
                value
                and len(value) == 1
                and isinstance(
                    value[0], (model.HistoryDatasetCollectionAssociation, model.DatasetCollectionElement)
                )
            ):
                deferred_objects[prefixed_name] = value
                return

            # Multi-dataset value: record the whole list if any element needs
            # materialization (one hit is enough — hence the break).
            for v in value:
                if self._should_materialize_deferred_input(prefixed_name, v):
                    deferred_objects[prefixed_name] = value
                    break

visit_input_values(self.tool.inputs, incoming, find_deferred_datasets)
visit_input_values(self.tool.inputs, incoming, find_deferred_collections)

return deferred_objects
Expand Down
32 changes: 32 additions & 0 deletions lib/galaxy_test/api/test_tool_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,3 +613,35 @@ def test_null_to_text_tool_with_validation(required_tool: RequiredTool, tool_inp
required_tool.execute.with_inputs(tool_input_format.when.any({})).assert_fails()
required_tool.execute.with_inputs(tool_input_format.when.any({"parameter": None})).assert_fails()
required_tool.execute.with_inputs(tool_input_format.when.any({"parameter": ""})).assert_fails()


@requires_tool_id("cat|cat1")
def test_deferred_basic(required_tool: RequiredTool, target_history: TargetHistory):
    """A single deferred HDA input materializes and the cat tool produces expected output."""
    deferred = target_history.with_deferred_dataset_for_test_file("1.bed", ext="bed")
    execution = required_tool.execute.with_inputs({"input1": deferred.src_dict})
    result = execution.assert_has_single_job.with_single_output
    result.assert_contains("chr1 147962192 147962580 CCDS989.1_cds_0_0_chr1_147962193_r 0 -")


@requires_tool_id("metadata_bam")
def test_deferred_with_metadata_options_filter(required_tool: RequiredTool, target_history: TargetHistory):
    """A deferred BAM input still drives the metadata-based options filter (ref_names)."""
    deferred_bam = target_history.with_deferred_dataset_for_test_file("1.bam", ext="bam")
    tool_inputs = {"input_bam": deferred_bam.src_dict, "ref_names": "chrM"}
    execution = required_tool.execute.with_inputs(tool_inputs)
    execution.assert_has_single_job.with_single_output.with_contents_stripped("chrM")


@requires_tool_id("cat_list")
def test_deferred_multi_input(required_tool: RequiredTool, target_history: TargetHistory):
    """Multiple deferred datasets in one multi-dataset parameter all materialize and concatenate."""
    deferred_bed = target_history.with_deferred_dataset_for_test_file("1.bed", ext="bed")
    deferred_txt = target_history.with_deferred_dataset_for_test_file("1.txt", ext="txt")
    execution = required_tool.execute.with_inputs(
        {"input1": [deferred_bed.src_dict, deferred_txt.src_dict]}
    )
    result = execution.assert_has_single_job.with_single_output
    # Expect one representative line from each of the two concatenated inputs.
    result.assert_contains("chr1 147962192 147962580 CCDS989.1_cds_0_0_chr1_147962193_r 0 -")
    result.assert_contains("chr1 4225 19670")
32 changes: 0 additions & 32 deletions lib/galaxy_test/api/test_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -2826,38 +2826,6 @@ def test_group_tag_selection_multiple(self, history_id):
output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
assert output_content.strip() == "123\n456\n456\n0ab"

@skip_without_tool("cat1")
def test_run_deferred_dataset(self, history_id):
    """Running cat1 on a deferred (URI-backed) HDA materializes it and yields ok output."""
    deferred = self.dataset_populator.create_deferred_hda(
        history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed", ext="bed"
    )
    outputs = self._cat1_outputs(history_id, inputs={"input1": dataset_to_param(deferred)})
    first_output = outputs[0]
    output_details = self.dataset_populator.get_history_dataset_details(
        history_id, dataset=first_output, wait=True, assert_ok=True
    )
    assert output_details["state"] == "ok"
    content = self.dataset_populator.get_history_dataset_content(history_id, dataset=first_output)
    assert content.startswith("chr1 147962192 147962580 CCDS989.1_cds_0_0_chr1_147962193_r 0 -")

@skip_without_tool("metadata_bam")
def test_run_deferred_dataset_with_metadata_options_filter(self, history_id):
    """Metadata-driven select options (ref_names) work when the input BAM is deferred."""
    deferred_bam = self.dataset_populator.create_deferred_hda(
        history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bam", ext="bam"
    )
    run_response = self.dataset_populator.run_tool(
        tool_id="metadata_bam",
        inputs={"input_bam": dataset_to_param(deferred_bam), "ref_names": "chrM"},
        history_id=history_id,
    )
    output = run_response["outputs"][0]
    output_details = self.dataset_populator.get_history_dataset_details(
        history_id, dataset=output, wait=True, assert_ok=True
    )
    assert output_details["state"] == "ok"
    content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
    assert content.startswith("chrM")

@skip_without_tool("pileup")
def test_metadata_validator_on_deferred_input(self, history_id):
deferred_bam_details = self.dataset_populator.create_deferred_hda(
Expand Down
25 changes: 25 additions & 0 deletions lib/galaxy_test/base/populators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4137,6 +4137,31 @@ def with_dataset(
)
return HasSrcDict("hda", new_dataset)

def with_deferred_dataset(
    self,
    uri: str,
    named: Optional[str] = None,
    ext: Optional[str] = None,
) -> "HasSrcDict":
    """Create a deferred HDA in this history sourced from ``uri`` and wrap it as a src dict.

    :param uri: source URI the deferred dataset will reference.
    :param named: optional name to assign to the new dataset.
    :param ext: optional datatype extension for the new dataset.
    """
    kwd = {}
    if named is not None:
        kwd["name"] = named
    # Fix: kwd was previously built but never forwarded, so `named` was
    # silently ignored. Forward it to the populator call.
    # NOTE(review): assumes create_deferred_hda accepts a `name` keyword —
    # confirm against the DatasetPopulator API.
    new_dataset = self._dataset_populator.create_deferred_hda(
        history_id=self._history_id,
        uri=uri,
        ext=ext,
        **kwd,
    )
    return HasSrcDict("hda", new_dataset)

def with_deferred_dataset_for_test_file(
    self,
    filename: str,
    named: Optional[str] = None,
    ext: Optional[str] = None,
) -> "HasSrcDict":
    """Like :meth:`with_deferred_dataset`, but sources a bundled test-data file via a base64 URI."""
    test_file_uri = self._dataset_populator.base64_url_for_test_file(filename)
    return self.with_deferred_dataset(test_file_uri, named=named, ext=ext)

def with_unpaired(self) -> "HasSrcDict":
return self._fetch_response(
self._dataset_collection_populator.create_unpaired_in_history(self._history_id, wait=True)
Expand Down
Loading