diff --git a/dev-requirements.txt b/dev-requirements.txt
index 5351cb14..1a36a4e5 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -21,7 +21,7 @@ flake8
 mypy<=1.0.1  # https://github.com/pydantic/pydantic/issues/5192
 types-paramiko
-types-pkg-resources
+types-setuptools
 types-PyYAML
 types-pycurl
 types-requests
diff --git a/docker/coexecutor/Dockerfile b/docker/coexecutor/Dockerfile
index e94d9a49..9d3a5374 100644
--- a/docker/coexecutor/Dockerfile
+++ b/docker/coexecutor/Dockerfile
@@ -1,26 +1,64 @@
-FROM conda/miniconda3
+# use the root of the repository as context, i.e. `docker build . -f ./docker/coexecutor/Dockerfile`
+
+FROM python:3.12-bookworm as build_wheel
+
+ENV PIP_ROOT_USER_ACTION=ignore
+
+WORKDIR /build
+
+# install requirements
+COPY requirements.txt .
+COPY dev-requirements.txt .
+RUN pip install --no-cache-dir --upgrade pip \
+    && pip install --no-cache-dir setuptools -r requirements.txt -r dev-requirements.txt
+
+# build Pulsar wheel
+COPY . .
+RUN python setup.py sdist bdist_wheel
+
+
+FROM python:3.12-bookworm
 
 ENV PYTHONUNBUFFERED 1
+ENV PIP_ROOT_USER_ACTION=ignore
 ENV DEBIAN_FRONTEND noninteractive
 ENV PULSAR_CONFIG_CONDA_PREFIX /usr/local
 
+# set up Galaxy Depot repository (provides SLURM DRMAA packages for Debian Buster and newer releases)
+RUN apt-get update \
+    && apt-get install -y --no-install-recommends ca-certificates curl gnupg \
+    && apt-get clean && rm -rf /var/lib/apt/lists/* \
+    && curl -fsSL "http://keyserver.ubuntu.com/pks/lookup?op=get&search=0x18381AC8832160AF" | gpg --dearmor -o /etc/apt/trusted.gpg.d/galaxy-depot.gpg \
+    && echo "deb https://depot.galaxyproject.org/apt/ $(bash -c '. /etc/os-release; echo ${VERSION_CODENAME:-bookworm}') main" | tee /etc/apt/sources.list.d/galaxy-depot.list
+
+# set up Debian Bullseye repository (and use it only for libslurm36, needed by slurm-drmaa1, and slurm)
+RUN echo "deb http://deb.debian.org/debian/ bullseye main" > /etc/apt/sources.list.d/bullseye.list && \
+cat <<EOF > /etc/apt/preferences.d/bullseye.pref
+Package: *
+Pin: release n=bullseye
+Pin-Priority: -1
+
+Package: libslurm36, slurm
+Pin: release n=bullseye
+Pin-Priority: 100
+EOF
+
+# set up CVMFS repository
+RUN apt-get update \
+    && apt-get install -y --no-install-recommends lsb-release wget \
+    && wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \
+    && dpkg -i cvmfs-release-latest_all.deb && rm -f cvmfs-release-latest_all.deb
+
 # wget, gcc, pip - to build and install Pulsar.
 # bzip2 for Miniconda.
 # TODO: pycurl stuff...
-RUN apt-get update \ - && apt-get install -y --no-install-recommends apt-transport-https \ +RUN apt-get update && apt-get install -y --no-install-recommends \ # Install CVMFS client - && apt-get install -y --no-install-recommends lsb-release wget \ - && wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \ - && dpkg -i cvmfs-release-latest_all.deb \ - && rm -f cvmfs-release-latest_all.deb \ + cvmfs cvmfs-config-default \ # Install packages - && apt-get update \ - && apt-get install -y --no-install-recommends gcc \ - libcurl4-openssl-dev \ - cvmfs cvmfs-config-default \ - slurm-llnl slurm-drmaa-dev \ - bzip2 \ + gcc libcurl4-openssl-dev \ + munge libmunge-dev slurm slurm-drmaa-dev \ + bzip2 \ # Install Pulsar Python requirements && pip install --no-cache-dir -U pip \ && pip install --no-cache-dir drmaa wheel kombu pykube pycurl \ @@ -28,14 +66,14 @@ RUN apt-get update \ # Remove build deps and cleanup && apt-get -y remove gcc wget lsb-release \ && apt-get -y autoremove \ - && apt-get autoclean \ - && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log \ - && /usr/sbin/create-munge-key + && apt-get clean && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log + +COPY --from=build_wheel /build/dist/pulsar_app-*-py2.py3-none-any.whl / -ADD pulsar_app-*-py2.py3-none-any.whl /pulsar_app-*-py2.py3-none-any.whl +SHELL ["/bin/bash", "-c"] -RUN pip install --upgrade setuptools && pip install pyOpenSSL --upgrade && pip install cryptography --upgrade -RUN pip install --no-cache-dir /pulsar_app-*-py2.py3-none-any.whl[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl +RUN pip install --no-cache-dir --upgrade setuptools pyOpenSSL cryptography +RUN pip install --no-cache-dir "$(echo /pulsar_app-*-py2.py3-none-any.whl)"[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl RUN pip install --upgrade 'importlib-metadata<5.0' RUN _pulsar-configure-galaxy-cvmfs RUN _pulsar-conda-init --conda_prefix=/pulsar_dependencies/conda diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index ce2f54ec..10fdc186 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -21,6 +21,7 @@ Any, Dict, List, + Optional, Type, ) from urllib.parse import urlencode @@ -190,6 +191,9 @@ def __init__(self, client=None, config=None): self.ssh_port = config.get("ssh_port", None) self.mappers = mappers_from_dicts(config.get("paths", [])) self.files_endpoint = config.get("files_endpoint", None) + self.actions = [] + # Might want to make the working directory available here so that we know where to place archive + # for archive action def action(self, source, type, mapper=None): path = source.get("path", None) @@ -200,10 +204,14 @@ def action(self, source, type, mapper=None): if mapper: file_lister = mapper.file_lister action_kwds = mapper.action_kwds - action = action_class(source, file_lister=file_lister, **action_kwds) + action = action_class(source, file_lister=file_lister, file_type=type, **action_kwds) self.__process_action(action, type) + self.actions.append(action) return action + def finalize(self): + return [_ for _ in (action.finalize() for action in self.actions) if _] + def unstructured_mappers(self): """ Return mappers that will map 'unstructured' files (i.e. go beyond mapping inputs, outputs, and config files). @@ -265,6 +273,7 @@ def __process_action(self, action, file_type): """ Extension point to populate extra action information after an action has been created. 
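
        As of this change the file type (e.g. ``input`` or ``output``) is also recorded on
        the action itself, so ``finalize()`` can later distinguish inputs from outputs.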
""" + action.file_type = file_type if getattr(action, "inject_url", False): self.__inject_url(action, file_type) if getattr(action, "inject_ssh_properties", False): @@ -300,10 +309,12 @@ class BaseAction: whole_directory_transfer_supported = False action_spec: Dict[str, Any] = {} action_type: str + file_type: Optional[str] = None - def __init__(self, source, file_lister=None): + def __init__(self, source, file_lister=None, file_type=None): self.source = source self.file_lister = file_lister or DEFAULT_FILE_LISTER + self.file_type = file_type @property def path(self): @@ -342,6 +353,9 @@ def _extend_base_dict(self, **kwds): base_dict.update(**kwds) return base_dict + def finalize(self): + pass + def to_dict(self): return self._extend_base_dict() @@ -390,8 +404,8 @@ class RewriteAction(BaseAction): action_type = "rewrite" staging = STAGING_ACTION_NONE - def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None): - super().__init__(source, file_lister=file_lister) + def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None, file_type=None): + super().__init__(source, file_lister=file_lister, file_type=file_type) self.source_directory = source_directory self.destination_directory = destination_directory @@ -467,8 +481,8 @@ class RemoteTransferAction(BaseAction): action_type = "remote_transfer" staging = STAGING_ACTION_REMOTE - def __init__(self, source, file_lister=None, url=None): - super().__init__(source, file_lister=file_lister) + def __init__(self, source, file_lister=None, url=None, file_type=None): + super().__init__(source, file_lister=file_lister, file_type=file_type) self.url = url def to_dict(self): @@ -495,8 +509,8 @@ class RemoteTransferTusAction(BaseAction): action_type = "remote_transfer_tus" staging = STAGING_ACTION_REMOTE - def __init__(self, source, file_lister=None, url=None): - super().__init__(source, file_lister=file_lister) + def __init__(self, source, file_lister=None, url=None, file_type=None): + super().__init__(source, file_lister=file_lister, file_type=file_type) self.url = url def to_dict(self): @@ -513,6 +527,42 @@ def write_from_path(self, pulsar_path): tus_upload_file(self.url, pulsar_path) +class JsonTransferAction(BaseAction): + """ + This action indicates that the pulsar server should create a JSON manifest that can be used to stage files by an + external system that can stage files in and out of the compute environment. 
+ """ + inject_url = True + whole_directory_transfer_supported = True + action_type = "json_transfer" + staging = STAGING_ACTION_REMOTE + + def __init__(self, source, file_lister=None, url=None, file_type=None): + super().__init__(source, file_lister, file_type) + self.url = url + self._from_path = None + self._to_path = None + + @classmethod + def from_dict(cls, action_dict): + return JsonTransferAction(source=action_dict["source"], url=action_dict["url"]) + + def to_dict(self): + return self._extend_base_dict(url=self.url) + + def write_to_path(self, path): + self._to_path = path + + def write_from_path(self, pulsar_path: str): + self._from_path = pulsar_path + + def finalize(self): + if self._to_path: + return {"url": self.url, "to_path": self._to_path} + else: + return {"url": self.url, "from_path": self._from_path} + + class RemoteObjectStoreCopyAction(BaseAction): """ """ @@ -664,6 +714,7 @@ def write_to_path(self, path): DICTIFIABLE_ACTION_CLASSES = [ + JsonTransferAction, RemoteCopyAction, RemoteTransferAction, RemoteTransferTusAction, @@ -844,6 +895,7 @@ def unstructured_map(self, path): ACTION_CLASSES: List[Type[BaseAction]] = [ NoneAction, + JsonTransferAction, RewriteAction, TransferAction, CopyAction, diff --git a/pulsar/client/client.py b/pulsar/client/client.py index e28d84ea..1f9b868e 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -1,6 +1,9 @@ import logging import os +import tempfile from enum import Enum +from pathlib import Path +from textwrap import dedent from typing import ( Any, Callable, @@ -10,7 +13,14 @@ Optional, ) from typing_extensions import Protocol +from xml.etree.ElementTree import ( + Element, + fromstring, + SubElement, + tostring, +) +from pulsar.managers.util.arc import ensure_pyarc, pyarcrest, arc_state_to_pulsar_state from pulsar.managers.util.tes import ( ensure_tes_client, TesClient, @@ -69,7 +79,7 @@ echo 'ran script'""" -PULSAR_CONTAINER_IMAGE = "galaxy/pulsar-pod-staging:0.15.0.0" +PULSAR_CONTAINER_IMAGE = "galaxy/pulsar-pod-staging:0.16.0" CONTAINER_STAGING_DIRECTORY = "/pulsar_staging/" @@ -168,7 +178,7 @@ def __init__(self, destination_params, job_id, job_manager_interface): self.job_manager_interface = job_manager_interface def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, - dynamic_file_sources=None, token_endpoint=None): + dynamic_file_sources=None, token_endpoint=None, staging_manifest=None): """ Queue up the execution of the supplied `command_line` on the remote server. 
Called launch for historical reasons, should be renamed to @@ -374,6 +384,90 @@ def _build_setup_message(self, command_line, dependencies_description, env, remo launch_params["setup_params"] = setup_params return launch_params + def get_pulsar_app_config( + self, + pulsar_app_config, + container, + wait_after_submission, + manager_name, + manager_type, + dependencies_description, + ): + + pulsar_app_config = pulsar_app_config or {} + manager_config = self._ensure_manager_config( + pulsar_app_config, + manager_name, + manager_type, + ) + + if ( + "staging_directory" not in manager_config and "staging_directory" not in pulsar_app_config + ): + pulsar_app_config["staging_directory"] = CONTAINER_STAGING_DIRECTORY + + if self.amqp_key_prefix: + pulsar_app_config["amqp_key_prefix"] = self.amqp_key_prefix + + if "monitor" not in manager_config: + manager_config["monitor"] = ( + MonitorStyle.BACKGROUND.value + if wait_after_submission + else MonitorStyle.NONE.value + ) + if "persistence_directory" not in pulsar_app_config: + pulsar_app_config["persistence_directory"] = os.path.join( + CONTAINER_STAGING_DIRECTORY, "persisted_data" + ) + elif "manager" in pulsar_app_config and manager_name != "_default_": + log.warning( + "'manager' set in app config but client has non-default manager '%s', this will cause communication" + " failures, remove `manager` from app or client config to fix", + manager_name, + ) + + using_dependencies = container is None and dependencies_description is not None + if using_dependencies and "dependency_resolution" not in pulsar_app_config: + # Setup default dependency resolution for container above... + dependency_resolution = { + "cache": False, + "use": True, + "default_base_path": "/pulsar_dependencies", + "cache_dir": "/pulsar_dependencies/_cache", + "resolvers": [ + { # TODO: add CVMFS resolution... 
+ "type": "conda", + "auto_init": True, + "auto_install": True, + "prefix": "/pulsar_dependencies/conda", + }, + { + "type": "conda", + "auto_init": True, + "auto_install": True, + "prefix": "/pulsar_dependencies/conda", + "versionless": True, + }, + ], + } + pulsar_app_config["dependency_resolution"] = dependency_resolution + return pulsar_app_config + + def _ensure_manager_config(self, pulsar_app_config, manager_name, manager_type): + if "manager" in pulsar_app_config: + manager_config = pulsar_app_config["manager"] + elif "managers" in pulsar_app_config: + managers_config = pulsar_app_config["managers"] + if manager_name not in managers_config: + managers_config[manager_name] = {} + manager_config = managers_config[manager_name] + else: + manager_config = {} + pulsar_app_config["manager"] = manager_config + if "type" not in manager_config: + manager_config["type"] = manager_type + return manager_config + class MessagingClientManagerProtocol(ClientManagerProtocol): status_cache: Dict[str, Dict[str, Any]] @@ -405,7 +499,7 @@ def _build_status_request_message(self): class MessageJobClient(BaseMessageJobClient): def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, - dynamic_file_sources=None, token_endpoint=None): + dynamic_file_sources=None, token_endpoint=None, staging_manifest=None): """ """ launch_params = self._build_setup_message( @@ -439,7 +533,7 @@ def __init__(self, destination_params, job_id, client_manager, shell): self.shell = shell def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, - dynamic_file_sources=None, token_endpoint=None): + dynamic_file_sources=None, token_endpoint=None, staging_manifest=None): """ """ launch_params = self._build_setup_message( @@ -477,6 +571,295 @@ class ExecutionType(str, Enum): PARALLEL = "parallel" +class ARCLaunchMixin(BaseRemoteConfiguredJobClient): + """Execute containers sequentially using ARC.""" + + ensure_library_available = ensure_pyarc + execution_type = ExecutionType.SEQUENTIAL + + def launch( + self, + command_line, + dependencies_description=None, + env=None, + remote_staging=None, + job_config=None, + dynamic_file_sources=None, + container_info=None, + token_endpoint=None, + pulsar_app_config=None, + staging_manifest=None, + ) -> Optional[ExternalId]: + container = container_info["container_id"] if container_info else None + guest_ports = [int(p) for p in container_info["guest_ports"]] if container_info else None + + # prepare auxiliary artifacts + auxiliary_artifacts_directory = Path(tempfile.mkdtemp(prefix="pulsar_arc_")) + input_manifest_path = auxiliary_artifacts_directory / "input_manifest.json" + staging_config_path = auxiliary_artifacts_directory / "staging_config.json" + with open(input_manifest_path, "w") as input_manifest_fh: + input_manifest_fh.write(json_dumps(staging_manifest)) + with open(staging_config_path, mode="w") as staging_config_fh: + staging_config_fh.write(json_dumps(remote_staging)) + + # build Pulsar submit command + launch_params = self._build_setup_message( + command_line, + dependencies_description=dependencies_description, + env=env, + remote_staging=remote_staging, + job_config=job_config, + dynamic_file_sources=dynamic_file_sources, + token_endpoint=token_endpoint, + ) + pulsar_app_config = self.get_pulsar_app_config( + pulsar_app_config=pulsar_app_config, + container=container, + wait_after_submission=False, + manager_name="_default_", + manager_type="unqueued", + 
dependencies_description=dependencies_description,
+        )
+        base64_message = to_base64_json(launch_params)
+        base64_app_conf = to_base64_json(pulsar_app_config)
+        pulsar_submit = CoexecutionContainerCommand(
+            self.pulsar_container_image,
+            "pulsar-submit",
+            [
+                "--base64",
+                base64_message,
+                "--app_conf_base64",
+                base64_app_conf,
+                "--no_wait",
+            ],
+            job_config["job_directory"],
+            None,
+        )
+
+        # build tool command
+        tool_container = CoexecutionContainerCommand(
+            container,
+            "bash",
+            [f"{job_config['job_directory']}/command.sh"],
+            job_config["job_directory"],
+            guest_ports,
+        )
+
+        # build Pulsar output manifest command
+        output_manifest_path = "output_manifest.json"
+        output_manifest = CoexecutionContainerCommand(
+            self.pulsar_container_image,
+            "pulsar-create-output-manifest",
+            [
+                "--job-directory",
+                job_config["job_directory"],
+                "--staging-config-path",
+                staging_config_path.name,
+                "--output-manifest-path",
+                output_manifest_path,
+            ],
+            job_config["job_directory"],
+            None,  # no ports, mirroring the other CoexecutionContainerCommand instantiations above
+        )
+
+        # build ARC job
+        executable_path = auxiliary_artifacts_directory / "job.sh"
+        with open(executable_path, "wb") as executable_fh:
+            executable: bytes = self._generate_executable(
+                job_directory=job_config["job_directory"],
+                output_manifest_path=output_manifest_path,
+                persistence_directory=pulsar_app_config["persistence_directory"],
+                staging_directory=str(
+                    Path(pulsar_app_config["staging_directory"]) / Path(job_config["job_directory"]).name
+                ),
+                pulsar_submit_command=pulsar_submit,
+                tool_container_command=tool_container,
+                pulsar_manifest_command=output_manifest,
+            )
+            executable_fh.write(executable)
+        job_description: bytes = self._generate_job_description(
+            job_directory=job_config["job_directory"],
+            input_manifest=staging_manifest,
+            executable_path=executable_path,
+            staging_config_path=Path(staging_config_path),
+        )
+
+        # submit arc job
+        arc_endpoint = self.destination_params["arc_url"]
+        oidc_token = self.destination_params["oidc_token"]
+        arc_job_id = self._launch_arc_job(
+            arc_endpoint,
+            oidc_token,
+            job_description.decode("utf-8"),
+        )
+
+        return arc_job_id
+
+    @staticmethod
+    def _generate_executable(
+        job_directory: str,
+        output_manifest_path: str,
+        persistence_directory: str,
+        staging_directory: str,
+        pulsar_submit_command: CoexecutionContainerCommand,
+        tool_container_command: CoexecutionContainerCommand,
+        pulsar_manifest_command: CoexecutionContainerCommand,
+    ) -> bytes:
+        return dedent(f"""\
+            #!/bin/bash
+            set -e
+
+            # clear SLURM variables (isolate the job's environment from ARC's internal infrastructure)
+            # https://hpcc.umd.edu/hpcc/help/slurmenv.html
+            unset SLURM_CPUS_ON_NODE
+            unset SLURM_CPUS_PER_TASK
+            unset SLURM_JOB_ID
+            unset SLURM_JOBID
+            unset SLURM_JOB_NAME
+            unset SLURM_JOB_NODELIST
+            unset SLURM_JOB_NUM_NODES
+            unset SLURM_LOCALID
+            unset SLURM_NODEID
+            unset SLURM_NNODES
+            unset SLURM_NODELIST
+            unset SLURM_NTASKS
+            unset SLURM_SUBMIT_DIR
+            unset SLURM_SUBMIT_HOST
+            unset SLURM_TASKS_PER_NODE
+
+            mkdir -p job_directory
+            mkdir -p persistence_directory
+            mkdir -p arc_staging_directory  # contains symlinks to files staged by ARC
+
+            singularity run \\
+                --no-mount bind-paths \\
+                --bind /grid:/grid \\
+                --bind "job_directory":{staging_directory} \\
+                --bind "persistence_directory":{persistence_directory} \\
+                --pwd {staging_directory} \\
+                https://github.com/kysrpex/galaxyproject-pulsar/releases/download/27ec4e75/galaxyproject-pulsar@27ec4e75.sif \\
+                {pulsar_submit_command.command} {" ".join(pulsar_submit_command.args)}
+
+            # copy symlinks to the job directory
+            cp -an arc_staging_directory/. job_directory
+
+            singularity run \\
+                --no-mount bind-paths \\
+                --bind /grid:/grid \\
+                --bind "job_directory":{job_directory} \\
+                --bind "job_directory":{staging_directory} \\
+                --bind "persistence_directory":{persistence_directory} \\
+                --pwd {tool_container_command.working_directory} \\
+                {tool_container_command.image} \\
+                {tool_container_command.command} {" ".join(tool_container_command.args)}
+
+            mv staging_config.json job_directory
+            singularity run \\
+                --no-mount bind-paths \\
+                --bind /grid:/grid \\
+                --bind "job_directory":{job_directory} \\
+                --bind "job_directory":{staging_directory} \\
+                --bind "persistence_directory":{persistence_directory} \\
+                --pwd {tool_container_command.working_directory} \\
+                https://github.com/kysrpex/galaxyproject-pulsar/releases/download/27ec4e75/galaxyproject-pulsar@27ec4e75.sif \\
+                {pulsar_manifest_command.command} {" ".join(pulsar_manifest_command.args)}
+
+            cat job_directory/{output_manifest_path} | jq -r '.[] | "\(.from_path | sub("^{job_directory}/"; "job_directory/")) \(.url | capture("^(?<protocolurlhostport>[^:]+://[^/]+)(?<fileandmetadataoptions>/.*)") | "\(.protocolurlhostport);overwrite=yes\(.fileandmetadataoptions)" )"' > output.files
+        """).encode("utf-8")
+
+    def _generate_job_description(
+        self,
+        job_directory: str,
+        input_manifest: dict,
+        executable_path: Path,
+        staging_config_path: Path,
+    ) -> bytes:
+        # job_directory = Path(self.job_directory.job_directory)
+        # metadata_directory = Path(self.job_directory.metadata_directory)
+
+        activity_description = Element("ActivityDescription")
+        activity_description.set("xmlns", "http://www.eu-emi.eu/es/2010/12/adl")
+        activity_description.set("xmlns:emiestypes", "http://www.eu-emi.eu/es/2010/12/types")
+        activity_description.set("xmlns:nordugrid-adl", "http://www.nordugrid.org/es/2011/12/nordugrid-adl")
+
+        activity_identification = SubElement(activity_description, "ActivityIdentification")
+        activity_identification_name = SubElement(activity_identification, "Name")
+        activity_identification_name.text = f"Galaxy job {self.job_id}"
+
+        application = SubElement(activity_description, "Application")
+        application_executable = SubElement(application, "Executable")
+        application_executable_path = SubElement(application_executable, "Path")
+        application_executable_path.text = executable_path.name
+        application_output = SubElement(application, "Output")
+        application_output.text = "arc.out"
+        application_error = SubElement(application, "Error")
+        application_error.text = "arc.err"
+
+        # resources = SubElement(activity_description, "Resources")
+        # resources_cpu_time = SubElement(resources, "IndividualCPUTime")
+        # resources_cpu_time.text = self.cpu_time
+        # resources_memory = SubElement(resources, "IndividualPhysicalMemory")
+        # resources_memory.text = self.memory
+
+        data_staging = SubElement(activity_description, "DataStaging")
+        staging_config = SubElement(data_staging, "InputFile")
+        staging_config_name = SubElement(staging_config, "Name")
+        staging_config_name.text = staging_config_path.name
+        staging_config_uri = SubElement(staging_config, "URI")
+        staging_config_uri.text = f"file://{staging_config_path.absolute()}"
+        executable = SubElement(data_staging, "InputFile")
+        executable_name = SubElement(executable, "Name")
+        executable_name.text = executable_path.name
+        executable_uri = SubElement(executable, "URI")
+        executable_uri.text = f"file://{executable_path.absolute()}"
+        for input_ in input_manifest:
+            input_file = SubElement(data_staging, "InputFile")
+            name = SubElement(input_file, "Name")
SubElement(input_file, "Name") + name.text = str(Path("arc_staging_directory") / Path(input_["to_path"]).relative_to(job_directory)) + source = SubElement(input_file, "Source") + uri = SubElement(source, "URI") + uri.text = input_["url"] + output_file = SubElement(data_staging, "OutputFile") + output_file_name = SubElement(output_file, "Name") + output_file_name.text = f"@output.files" + + return tostring(activity_description, encoding="UTF-8", method="xml") + + @staticmethod + def _launch_arc_job( + arc_endpoint: str, + oidc_token: str, + job_description: str, + ) -> Optional[ExternalId]: + client = pyarcrest.arc.ARCRest.getClient(url=arc_endpoint, token=oidc_token) + delegation_id = client.createDelegation() + + results = client.createJobs(job_description, delegationID=delegation_id)[0].value + if isinstance(results, Exception): + raise results + arc_job_id, status = results + + job_description_tree_root = fromstring(job_description) + inputs = { + node.find("{http://www.eu-emi.eu/es/2010/12/adl}Name").text: + node.find("{http://www.eu-emi.eu/es/2010/12/adl}URI").text + for node in job_description_tree_root.findall( + "./{http://www.eu-emi.eu/es/2010/12/adl}DataStaging/{http://www.eu-emi.eu/es/2010/12/adl}InputFile" + ) + if node.find("{http://www.eu-emi.eu/es/2010/12/adl}URI") is not None and node.find( + "{http://www.eu-emi.eu/es/2010/12/adl}URI").text.startswith("file://") + } + + upload_errors = client.uploadJobFiles([arc_job_id], [inputs])[0] + if upload_errors: # input upload error + raise Exception("Error uploading job files to ARC") + + return ExternalId(str(arc_job_id)) + + def kill(self): + # take from the original pull request + pass + + class CoexecutionLaunchMixin(BaseRemoteConfiguredJobClient): execution_type: ExecutionType pulsar_container_image: str @@ -491,7 +874,8 @@ def launch( dynamic_file_sources=None, container_info=None, token_endpoint=None, - pulsar_app_config=None + pulsar_app_config=None, + staging_manifest=None, ) -> Optional[ExternalId]: """ """ @@ -513,48 +897,15 @@ def launch( manager_name = self.client_manager.manager_name manager_type = "coexecution" if container is not None else "unqueued" - pulsar_app_config = pulsar_app_config or {} - manager_config = self._ensure_manager_config( - pulsar_app_config, manager_name, manager_type, + pulsar_app_config = self.get_pulsar_app_config( + pulsar_app_config=pulsar_app_config, + container=container, + wait_after_submission=wait_after_submission, + manager_name=manager_name, + manager_type=manager_type, + dependencies_description=dependencies_description, ) - if "staging_directory" not in manager_config and "staging_directory" not in pulsar_app_config: - pulsar_app_config["staging_directory"] = CONTAINER_STAGING_DIRECTORY - - if self.amqp_key_prefix: - pulsar_app_config["amqp_key_prefix"] = self.amqp_key_prefix - - if "monitor" not in manager_config: - manager_config["monitor"] = MonitorStyle.BACKGROUND.value if wait_after_submission else MonitorStyle.NONE.value - if "persistence_directory" not in pulsar_app_config: - pulsar_app_config["persistence_directory"] = os.path.join(CONTAINER_STAGING_DIRECTORY, "persisted_data") - elif "manager" in pulsar_app_config and manager_name != '_default_': - log.warning( - "'manager' set in app config but client has non-default manager '%s', this will cause communication" - " failures, remove `manager` from app or client config to fix", manager_name) - - using_dependencies = container is None and dependencies_description is not None - if using_dependencies and 
"dependency_resolution" not in pulsar_app_config: - # Setup default dependency resolution for container above... - dependency_resolution = { - "cache": False, - "use": True, - "default_base_path": "/pulsar_dependencies", - "cache_dir": "/pulsar_dependencies/_cache", - "resolvers": [{ # TODO: add CVMFS resolution... - "type": "conda", - "auto_init": True, - "auto_install": True, - "prefix": '/pulsar_dependencies/conda', - }, { - "type": "conda", - "auto_init": True, - "auto_install": True, - "prefix": '/pulsar_dependencies/conda', - "versionless": True, - }] - } - pulsar_app_config["dependency_resolution"] = dependency_resolution base64_message = to_base64_json(launch_params) base64_app_conf = to_base64_json(pulsar_app_config) pulsar_container_image = self.pulsar_container_image @@ -606,21 +957,6 @@ def _pulsar_script_args(self, manager_name, base64_job, base64_app_conf, wait_ar manager_args.extend(["--base64", base64_job, "--app_conf_base64", base64_app_conf]) return manager_args - def _ensure_manager_config(self, pulsar_app_config, manager_name, manager_type): - if "manager" in pulsar_app_config: - manager_config = pulsar_app_config["manager"] - elif "managers" in pulsar_app_config: - managers_config = pulsar_app_config["managers"] - if manager_name not in managers_config: - managers_config[manager_name] = {} - manager_config = managers_config[manager_name] - else: - manager_config = {} - pulsar_app_config["manager"] = manager_config - if "type" not in manager_config: - manager_config["type"] = manager_type - return manager_config - def _launch_containers( self, pulsar_submit_container: CoexecutionContainerCommand, @@ -756,6 +1092,38 @@ def raw_check_complete(self) -> Dict[str, Any]: } +class ARCPollingSequentialJobClient(BasePollingCoexecutionJobClient, ARCLaunchMixin): + """A client that (sequentially) executes containers in ARC and does not depend on AMQP.""" + + def __init__(self, destination_params, job_id, client_manager): + super().__init__(destination_params, job_id, client_manager) + + def get_status(self): + arc_endpoint = self.destination_params["arc_url"] + oidc_token = self.destination_params["oidc_token"] + arc_job_id = self._arc_job_id # injected by `PulsarARCJobRunner` when creating the client + + arc_client = pyarcrest.arc.ARCRest.getClient(url=arc_endpoint, token=oidc_token) + + arc_state = arc_client.getJobsStatus([arc_job_id])[0].value + if isinstance(arc_state, (pyarcrest.errors.ARCHTTPError, pyarcrest.errors.NoValueInARCResult)): + pulsar_state = manager_status.LOST + else: + pulsar_state = arc_state_to_pulsar_state(arc_state) + + return pulsar_state + + def full_status(self): + pulsar_state = self.get_status() + return { + "status": pulsar_state, + "complete": "true" if manager_status.is_job_done(pulsar_state) else "false", + # ancient John, what were you thinking? 
👀
+            "outputs_directory_contents": [],
+            # it needs to be defined, otherwise `PulsarOutputs.has_outputs` fails; it is ok that it is empty because
+            # ARC is responsible for staging the outputs (Galaxy does not have to collect any outputs)
+        }
+
+
 class TesPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesTesContainersMixin):
     """A client that co-executes pods via GA4GH TES and depends on amqp for status updates."""
diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py
index 9c429734..1716b173 100644
--- a/pulsar/client/manager.py
+++ b/pulsar/client/manager.py
@@ -29,6 +29,7 @@
+    ARCPollingSequentialJobClient,
     MessageJobClient,
     TesMessageCoexecutionJobClient,
     TesPollingCoexecutionJobClient,
 )
 from .destination import url_to_destination_params
 from .object_client import ObjectStoreClient
@@ -240,6 +241,8 @@ def get_client(self, destination_params, job_id, **kwargs):
             return K8sMessageCoexecutionJobClient(destination_params, job_id, self)
         elif destination_params.get("tes_url", False):
             return TesMessageCoexecutionJobClient(destination_params, job_id, self)
+        elif destination_params.get("arc_enabled", False):
+            return ARCPollingSequentialJobClient(destination_params, job_id, self)
         else:
             return MessageJobClient(destination_params, job_id, self)
 
@@ -256,6 +259,8 @@ def get_client(self, destination_params, job_id, **kwargs):
             return K8sPollingCoexecutionJobClient(destination_params, job_id, self)
         elif destination_params.get("tes_url", False):
             return TesPollingCoexecutionJobClient(destination_params, job_id, self)
+        elif destination_params.get("arc_enabled", False):
+            return ARCPollingSequentialJobClient(destination_params, job_id, self)
         else:
             raise Exception("Unknown client configuration")
 
@@ -268,7 +273,7 @@ def build_client_manager(**kwargs: Dict[str, Any]) -> ClientManagerInterface:
         return ClientManager(**kwargs)  # TODO: Consider more separation here.
     elif kwargs.get('amqp_url', None):
         return MessageQueueClientManager(**kwargs)
-    elif kwargs.get("k8s_enabled") or kwargs.get("tes_url"):
+    elif kwargs.get("k8s_enabled") or kwargs.get("tes_url") or kwargs.get("arc_enabled"):
         return PollingJobClientManager(**kwargs)
     else:
         return ClientManager(**kwargs)
diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py
index ea8f8aa1..82f9d348 100644
--- a/pulsar/client/staging/down.py
+++ b/pulsar/client/staging/down.py
@@ -72,6 +72,9 @@ def collect(self):
         self.__collect_other_working_directory_files()
         self.__collect_metadata_directory_files()
         self.__collect_job_directory_files()
+        # give actions that require a final step, like those that write a manifest, a chance to write out their content
+        self.__finalize_action_mapper()
+        # TODO: finalize collection here for executors that need this?
         return self.exception_tracker.collection_failure_exceptions
 
     def __collect_working_directory_outputs(self):
@@ -134,6 +137,9 @@ def __collect_job_directory_files(self):
             'output_jobdir',
         )
 
+    def __finalize_action_mapper(self):
+        self.action_mapper.finalize()
+
     def __realized_dynamic_file_source_references(self):
         references = {"filename": [], "extra_files": []}
 
diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py
index 9f08bef8..3b8f8eaa 100644
--- a/pulsar/client/staging/up.py
+++ b/pulsar/client/staging/up.py
@@ -71,6 +71,22 @@ def submit_job(client, client_job_description, job_config=None):
     # it needs to be in the response to Pulsar even Pulsar is inititing staging actions
     launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources
     launch_kwds["token_endpoint"] = client.token_endpoint
+
+    # populate `to_path` and build the staging manifest; only actions with a meaningful
+    # `finalize()` (e.g. json_transfer) contribute manifest entries
+    staging_manifest = []
+    for action in file_stager.action_mapper.actions:
+        if action.file_type not in ("output", "output_workdir"):
+            name = basename(action.path)
+            path = file_stager.job_directory.calculate_path(name, action.file_type)
+            action.write_to_path(path)
+            manifest_entry = action.finalize()
+            if manifest_entry:
+                staging_manifest.append(manifest_entry)
+
+    if staging_manifest:
+        launch_kwds["staging_manifest"] = staging_manifest
+
     # for pulsar modalities that skip the explicit "setup" step, give them a chance to set an external
     # id from the submission process (e.g. to TES).
     launch_response = client.launch(**launch_kwds)
diff --git a/pulsar/client/util.py b/pulsar/client/util.py
index e10531ed..09cedb69 100644
--- a/pulsar/client/util.py
+++ b/pulsar/client/util.py
@@ -60,7 +60,7 @@ def _copy_and_close(object, output):
 @wraps(_b64encode)
 def b64encode(val, **kwargs):
     try:
-        return _b64encode(val, **kwargs)
+        return _b64encode(val, **kwargs).decode("utf-8")
     except TypeError:
         return _b64encode(val.encode('UTF-8'), **kwargs).decode('UTF-8')
 
diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py
index 6ca07e91..b7d323a7 100644
--- a/pulsar/managers/base/__init__.py
+++ b/pulsar/managers/base/__init__.py
@@ -232,7 +232,7 @@ class JobDirectory(RemoteJobDirectory):
     def __init__(
         self,
         staging_directory,
-        job_id,
+        job_id=None,
         lock_manager=None,
         directory_maker=None
     ):
@@ -240,7 +240,8 @@ def __init__(
         self._directory_maker = directory_maker or DirectoryMaker()
         self.lock_manager = lock_manager
         # Assert this job id isn't hacking path somehow.
-        assert job_id == basename(job_id)
+        if job_id:
+            assert job_id == basename(job_id)
 
     def _job_file(self, name):
         return os.path.join(self.job_directory, name)
diff --git a/pulsar/managers/staging/post.py b/pulsar/managers/staging/post.py
index 6ca59390..3c5fdd32 100644
--- a/pulsar/managers/staging/post.py
+++ b/pulsar/managers/staging/post.py
@@ -20,14 +20,15 @@ def postprocess(job_directory, action_executor, was_cancelled):
             staging_config = job_directory.load_metadata("launch_config").get("remote_staging", None)
         else:
             staging_config = None
-        collected = __collect_outputs(job_directory, staging_config, action_executor, was_cancelled)
+        file_action_mapper, collected = _collect_outputs(job_directory, staging_config, action_executor, was_cancelled)
         return collected
     finally:
         job_directory.write_file("postprocessed", "")
     return False
 
 
-def __collect_outputs(job_directory, staging_config, action_executor, was_cancelled):
+def _collect_outputs(job_directory, staging_config, action_executor, was_cancelled):
     collected = True
+    file_action_mapper = None  # defined even when the staging config carries no action mapper
     if "action_mapper" in staging_config:
         file_action_mapper = action_mapper.FileActionMapper(config=staging_config["action_mapper"])
@@ -39,7 +39,7 @@ def __collect_outputs(job_directory, staging_config, action_executor, was_cancel
         if collection_failure_exceptions:
             log.warn("Failures collecting results %s" % collection_failure_exceptions)
             collected = False
-    return collected
+    return file_action_mapper, collected
 
 
 def realized_dynamic_file_sources(job_directory):
diff --git a/pulsar/managers/util/arc.py b/pulsar/managers/util/arc.py
new file mode 100644
index 00000000..f2d06c72
--- /dev/null
+++ b/pulsar/managers/util/arc.py
@@ -0,0 +1,89 @@
+import logging
+from enum import Enum
+from typing import Optional
+
+from pulsar.managers import status as state
+
+try:
+    import pyarcrest
+    import pyarcrest.arc
+except ImportError:
+    pyarcrest = None
+
+__all__ = ("ARCState", "arc_state_to_pulsar_state", "ensure_pyarc", "pyarcrest")
+
+
+log = logging.getLogger(__name__)
+
+PYARCREST_UNAVAILABLE_MESSAGE = (
+    "The Pulsar ARC client requires the Python package `pyarcrest`, but it is unavailable. Please install `pyarcrest`."
+)
+
+
+def ensure_pyarc():
+    if pyarcrest is None:
+        raise ImportError(PYARCREST_UNAVAILABLE_MESSAGE)
+
+
+class ARCState(str, Enum):
+    """
+    ARC job states that the REST interface may report.
+
+    References:
+        - [1] https://www.nordugrid.org/arc/arc7/tech/rest/rest.html#rest-interface-job-states
+    """
+
+    ACCEPTING = "ACCEPTING"
+    ACCEPTED = "ACCEPTED"
+    PREPARING = "PREPARING"
+    PREPARED = "PREPARED"
+    SUBMITTING = "SUBMITTING"
+    QUEUING = "QUEUING"
+    RUNNING = "RUNNING"
+    HELD = "HELD"
+    EXITINGLRMS = "EXITINGLRMS"
+    OTHER = "OTHER"
+    EXECUTED = "EXECUTED"
+    FINISHING = "FINISHING"
+    FINISHED = "FINISHED"
+    FAILED = "FAILED"
+    KILLING = "KILLING"
+    KILLED = "KILLED"
+    WIPED = "WIPED"
+
+
+ARC_STATE_TO_PULSAR_STATE_MAP = {
+    # Mapping from ARC REST interface job states to Pulsar job states.
+    ARCState.ACCEPTING: state.PREPROCESSING,  # Session created, files can be uploaded; not yet processed.
+    ARCState.ACCEPTED: state.PREPROCESSING,  # Detected by A-REX, can't proceed yet.
+    ARCState.PREPARING: state.PREPROCESSING,  # Data stage-in, input data gathering.
+    ARCState.PREPARED: state.QUEUED,  # Waiting in queue for batch submission.
+    ARCState.SUBMITTING: state.QUEUED,  # Preparing for submission.
+    ARCState.QUEUING: state.QUEUED,  # In batch system queue.
+    ARCState.RUNNING: state.RUNNING,  # Running.
+    ARCState.HELD: state.RUNNING,  # On hold/suspended in the batch system; reported as still running.
+    ARCState.EXITINGLRMS: state.RUNNING,  # Finishing execution in batch system.
+    ARCState.OTHER: state.RUNNING,  # Catch-all state; assume the job is still progressing.
+    ARCState.EXECUTED: state.POSTPROCESSING,  # Completed, waiting for post-processing.
+    ARCState.FINISHING: state.POSTPROCESSING,  # Data stage-out, cleaning up.
+    ARCState.FINISHED: state.COMPLETE,  # Successful completion.
+    ARCState.FAILED: state.FAILED,  # Failed.
+    ARCState.KILLING: state.CANCELLED,  # Being cancelled.
+    ARCState.KILLED: state.CANCELLED,  # Killed by user.
+    ARCState.WIPED: state.LOST,  # Data deleted, treat as lost.
+}
+
+
+def arc_state_to_pulsar_state(arc_state: Optional[ARCState]) -> str:
+    """
+    Map ARC REST interface job states to Pulsar job states.
+
+    Assign the Pulsar state FAILED to jobs whose ARC state does not match any of the states from the mapping
+    ``ARC_STATE_TO_PULSAR_STATE_MAP``.
+    """
+    pulsar_state = ARC_STATE_TO_PULSAR_STATE_MAP.get(arc_state)
+
+    if pulsar_state is None:
+        log.warning(f"Unknown ARC state encountered [{arc_state}]")
+        return state.FAILED
+
+    return pulsar_state
diff --git a/pulsar/scripts/collect_output_manifest.py b/pulsar/scripts/collect_output_manifest.py
new file mode 100644
index 00000000..72a7a1a4
--- /dev/null
+++ b/pulsar/scripts/collect_output_manifest.py
@@ -0,0 +1,45 @@
+import argparse
+import json
+
+from pulsar.managers.base import JobDirectory
+from pulsar.managers.staging.post import _collect_outputs
+from pulsar.managers.util.retry import RetryActionExecutor
+
+
+def make_parser():
+    """Construct an argument parser used to call the script from the command line."""
+
+    parser = argparse.ArgumentParser(description="Create output staging manifest")
+
+    parser.add_argument("--job-directory")
+    parser.add_argument("--staging-config-path", help="Path to staging config JSON file")
+    parser.add_argument("--output-manifest-path")
+
+    return parser
+
+
+def collect_outputs(job_directory: str, staging_config_path: str, output_manifest_path: str):
+    job_directory_ = JobDirectory(job_directory)
+    with open(staging_config_path) as staging_fh:
+        staging_config = json.load(staging_fh)
+
+    action_mapper, _ = _collect_outputs(
+        job_directory_,
+        staging_config=staging_config,
+        action_executor=RetryActionExecutor(),
+        was_cancelled=lambda: False
+    )
+    new_manifest = action_mapper.finalize()
+    with open(output_manifest_path, "w") as manifest_fh:
+        json.dump(new_manifest, manifest_fh)
+
+
+def main():
+    """Run the script from the command line."""
+    parser = make_parser()
+    args = parser.parse_args()
+    collect_outputs(args.job_directory, args.staging_config_path, args.output_manifest_path)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/setup.py b/setup.py
index 68faeed3..843f02cd 100644
--- a/setup.py
+++ b/setup.py
@@ -92,6 +92,7 @@
 pulsar-submit=pulsar.scripts.submit:main
 pulsar-finish=pulsar.scripts.finish:main
 pulsar-run=pulsar.scripts.run:main
+pulsar-create-output-manifest=pulsar.scripts.collect_output_manifest:main
 _pulsar-conda-init=pulsar.scripts._conda_init:main
 _pulsar-configure-slurm=pulsar.scripts._configure_slurm:main
 _pulsar-configure-galaxy-cvmfs=pulsar.scripts._configure_galaxy_cvmfs:main
diff --git a/test/action_mapper_test.py b/test/action_mapper_test.py
index 2131cb65..7783696f 100644
--- a/test/action_mapper_test.py
+++ b/test/action_mapper_test.py
@@ -4,6 +4,22 @@
 )
 
 
+def test_action_mapper_finalization():
+    client = _client("json_transfer")
+    mapper = FileActionMapper(client)
+    mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input')
+    mapper.action({'path': '/the_file'}, 'input')
+    # record staging paths (as `submit_job` would) so that `finalize()` has content to report
+    for path_action in mapper.actions:
+        path_action.write_to_path('/staging' + path_action.path)
+    mapper_summary = mapper.finalize()
+    assert len(mapper_summary) == 2
+    assert mapper_summary[0]["to_path"] == '/staging/opt/galaxy/tools/filters/catWrapper.py'
+    assert mapper_summary[0]["url"]
+    assert mapper_summary[1]["to_path"] == '/staging/the_file'
+    assert mapper_summary[1]["url"]
+
+
 def test_endpoint_validation():
     client = _min_client("remote_transfer")
     mapper = FileActionMapper(client)
diff --git a/test/integration_test_cli_submit.py b/test/test_cli_submit.py
similarity index 100%
rename from test/integration_test_cli_submit.py
rename to test/test_cli_submit.py
diff --git a/test/transfer_action_test.py b/test/transfer_action_test.py
index 30b927f9..0cf3053d 100644
--- a/test/transfer_action_test.py
+++ b/test/transfer_action_test.py
@@ -1,7 +1,34 @@
 import os
 
 from .test_utils import files_server
-from pulsar.client.action_mapper import RemoteTransferAction
+from pulsar.client.action_mapper import (
+    JsonTransferAction,
+    RemoteTransferAction,
+)
+
+
+def test_write_to_path_json():
+    with files_server() as (server, directory):
+        from_path = os.path.join(directory, "remote_get")
+        to_path = os.path.join(directory, "local_get")
+        url = server.application_url + "?path=%s" % from_path
+        action = JsonTransferAction({"path": to_path}, url=url)
+        action.write_to_path(to_path)
+        assert action.path == to_path
+        assert action.url == url
+        assert action.finalize() == {"url": url, "to_path": to_path}
+
+
+def test_write_from_file_json():
+    with files_server() as (server, directory):
+        from_path = os.path.join(directory, "local_post")
+        to_path = os.path.join(directory, "remote_post")
+        url = server.application_url + "?path=%s" % to_path
+        action = JsonTransferAction({"path": to_path}, url=url)
+        action.write_from_path(from_path)
+        assert action.path == to_path
+        assert action.url == url
+        assert action.finalize() == {"url": url, "from_path": from_path}
 
 
 def test_write_to_file():