diff --git a/dev-requirements.txt b/dev-requirements.txt
index 5351cb14..1a36a4e5 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -21,7 +21,7 @@ flake8
 mypy<=1.0.1  # https://github.com/pydantic/pydantic/issues/5192
 types-paramiko
-types-pkg-resources
+types-setuptools
 types-PyYAML
 types-pycurl
 types-requests
diff --git a/docker/coexecutor/Dockerfile b/docker/coexecutor/Dockerfile
index e94d9a49..9d3a5374 100644
--- a/docker/coexecutor/Dockerfile
+++ b/docker/coexecutor/Dockerfile
@@ -1,26 +1,64 @@
-FROM conda/miniconda3
+# use the root of the repository as context, i.e. `docker build . -f ./docker/coexecutor/Dockerfile`
+
+FROM python:3.12-bookworm as build_wheel
+
+ENV PIP_ROOT_USER_ACTION=ignore
+
+WORKDIR /build
+
+# install requirements
+COPY requirements.txt .
+COPY dev-requirements.txt .
+RUN pip install --no-cache-dir --upgrade pip \
+    && pip install --no-cache-dir setuptools -r requirements.txt -r dev-requirements.txt
+
+# build Pulsar wheel
+COPY . .
+RUN python setup.py sdist bdist_wheel
+
+
+FROM python:3.12-bookworm
 
 ENV PYTHONUNBUFFERED 1
+ENV PIP_ROOT_USER_ACTION=ignore
 ENV DEBIAN_FRONTEND noninteractive
 ENV PULSAR_CONFIG_CONDA_PREFIX /usr/local
 
+# set up Galaxy Depot repository (provides SLURM DRMAA packages for Debian Buster and newer releases)
+RUN apt-get update \
+    && apt-get install -y --no-install-recommends ca-certificates curl gnupg \
+    && apt-get clean && rm -rf /var/lib/apt/lists/* \
+    && curl -fsSL "http://keyserver.ubuntu.com/pks/lookup?op=get&search=0x18381AC8832160AF" | gpg --dearmor -o /etc/apt/trusted.gpg.d/galaxy-depot.gpg \
+    && echo "deb https://depot.galaxyproject.org/apt/ $(bash -c '. /etc/os-release; echo ${VERSION_CODENAME:-bookworm}') main" | tee /etc/apt/sources.list.d/galaxy-depot.list
+
+# set up Debian Bullseye repository (and use it only for libslurm36, needed by slurm-drmaa1, and slurm)
+RUN echo "deb http://deb.debian.org/debian/ bullseye main" > /etc/apt/sources.list.d/bullseye.list && \
+cat <<EOF > /etc/apt/preferences.d/bullseye.pref
+Package: *
+Pin: release n=bullseye
+Pin-Priority: -1
+
+Package: libslurm36, slurm
+Pin: release n=bullseye
+Pin-Priority: 100
+EOF
+
+# set up CVMFS repository
+RUN apt-get update \
+    && apt-get install -y --no-install-recommends lsb-release wget \
+    && wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \
+    && dpkg -i cvmfs-release-latest_all.deb && rm -f cvmfs-release-latest_all.deb
+
 # wget, gcc, pip - to build and install Pulsar.
 # bzip2 for Miniconda.
 # TODO: pycurl stuff...
-RUN apt-get update \ - && apt-get install -y --no-install-recommends apt-transport-https \ +RUN apt-get update && apt-get install -y --no-install-recommends \ # Install CVMFS client - && apt-get install -y --no-install-recommends lsb-release wget \ - && wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \ - && dpkg -i cvmfs-release-latest_all.deb \ - && rm -f cvmfs-release-latest_all.deb \ + cvmfs cvmfs-config-default \ # Install packages - && apt-get update \ - && apt-get install -y --no-install-recommends gcc \ - libcurl4-openssl-dev \ - cvmfs cvmfs-config-default \ - slurm-llnl slurm-drmaa-dev \ - bzip2 \ + gcc libcurl4-openssl-dev \ + munge libmunge-dev slurm slurm-drmaa-dev \ + bzip2 \ # Install Pulsar Python requirements && pip install --no-cache-dir -U pip \ && pip install --no-cache-dir drmaa wheel kombu pykube pycurl \ @@ -28,14 +66,14 @@ RUN apt-get update \ # Remove build deps and cleanup && apt-get -y remove gcc wget lsb-release \ && apt-get -y autoremove \ - && apt-get autoclean \ - && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log \ - && /usr/sbin/create-munge-key + && apt-get clean && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log + +COPY --from=build_wheel /build/dist/pulsar_app-*-py2.py3-none-any.whl / -ADD pulsar_app-*-py2.py3-none-any.whl /pulsar_app-*-py2.py3-none-any.whl +SHELL ["/bin/bash", "-c"] -RUN pip install --upgrade setuptools && pip install pyOpenSSL --upgrade && pip install cryptography --upgrade -RUN pip install --no-cache-dir /pulsar_app-*-py2.py3-none-any.whl[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl +RUN pip install --no-cache-dir --upgrade setuptools pyOpenSSL cryptography +RUN pip install --no-cache-dir "$(echo /pulsar_app-*-py2.py3-none-any.whl)"[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl RUN pip install --upgrade 'importlib-metadata<5.0' RUN _pulsar-configure-galaxy-cvmfs RUN _pulsar-conda-init --conda_prefix=/pulsar_dependencies/conda diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index ce2f54ec..5cca1d83 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -21,6 +21,7 @@ Any, Dict, List, + Optional, Type, ) from urllib.parse import urlencode @@ -190,6 +191,9 @@ def __init__(self, client=None, config=None): self.ssh_port = config.get("ssh_port", None) self.mappers = mappers_from_dicts(config.get("paths", [])) self.files_endpoint = config.get("files_endpoint", None) + self.actions = [] + # Might want to make the working directory available here so that we know where to place archive + # for archive action def action(self, source, type, mapper=None): path = source.get("path", None) @@ -200,10 +204,14 @@ def action(self, source, type, mapper=None): if mapper: file_lister = mapper.file_lister action_kwds = mapper.action_kwds - action = action_class(source, file_lister=file_lister, **action_kwds) + action = action_class(source, file_lister=file_lister, file_type=type, **action_kwds) self.__process_action(action, type) + self.actions.append(action) return action + def finalize(self): + return [_ for _ in (action.finalize() for action in self.actions) if _] + def unstructured_mappers(self): """ Return mappers that will map 'unstructured' files (i.e. go beyond mapping inputs, outputs, and config files). @@ -265,6 +273,7 @@ def __process_action(self, action, file_type): """ Extension point to populate extra action information after an action has been created. 
""" + action.file_type = file_type if getattr(action, "inject_url", False): self.__inject_url(action, file_type) if getattr(action, "inject_ssh_properties", False): @@ -300,10 +309,12 @@ class BaseAction: whole_directory_transfer_supported = False action_spec: Dict[str, Any] = {} action_type: str + file_type: Optional[str] = None - def __init__(self, source, file_lister=None): + def __init__(self, source, file_lister=None, file_type=None): self.source = source self.file_lister = file_lister or DEFAULT_FILE_LISTER + self.file_type = file_type @property def path(self): @@ -342,6 +353,9 @@ def _extend_base_dict(self, **kwds): base_dict.update(**kwds) return base_dict + def finalize(self): + pass + def to_dict(self): return self._extend_base_dict() @@ -390,8 +404,8 @@ class RewriteAction(BaseAction): action_type = "rewrite" staging = STAGING_ACTION_NONE - def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None): - super().__init__(source, file_lister=file_lister) + def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None, file_type=None): + super().__init__(source, file_lister=file_lister, file_type=file_type) self.source_directory = source_directory self.destination_directory = destination_directory @@ -467,8 +481,8 @@ class RemoteTransferAction(BaseAction): action_type = "remote_transfer" staging = STAGING_ACTION_REMOTE - def __init__(self, source, file_lister=None, url=None): - super().__init__(source, file_lister=file_lister) + def __init__(self, source, file_lister=None, url=None, file_type=None): + super().__init__(source, file_lister=file_lister, file_type=file_type) self.url = url def to_dict(self): @@ -495,8 +509,8 @@ class RemoteTransferTusAction(BaseAction): action_type = "remote_transfer_tus" staging = STAGING_ACTION_REMOTE - def __init__(self, source, file_lister=None, url=None): - super().__init__(source, file_lister=file_lister) + def __init__(self, source, file_lister=None, url=None, file_type=None): + super().__init__(source, file_lister=file_lister, file_type=file_type) self.url = url def to_dict(self): @@ -513,6 +527,42 @@ def write_from_path(self, pulsar_path): tus_upload_file(self.url, pulsar_path) +class JsonTransferAction(BaseAction): + """ + This action indicates that the pulsar server should create a JSON manifest that can be used to stage files by an + external system that can stage files in and out of the compute environment. 
+ """ + inject_url = True + whole_directory_transfer_supported = True + action_type = "json_transfer" + staging = STAGING_ACTION_REMOTE + + def __init__(self, source, file_lister=None, files_endpoint=None, file_type=None): + super().__init__(source, file_lister, file_type) + self.files_endpoint = files_endpoint + self._from_path = None + self._to_path = None + + @classmethod + def from_dict(cls, action_dict): + return JsonTransferAction(source=action_dict["source"], files_endpoint=action_dict["files_endpoint"]) + + def to_dict(self): + return self._extend_base_dict(files_endpoint=self.files_endpoint) + + def write_to_path(self, path): + self._to_path = path + + def write_from_path(self, pulsar_path: str): + self._from_path = pulsar_path + + def finalize(self): + if self._to_path: + return {"files_endpoint": self.files_endpoint, "to_path": self._to_path} + else: + return {"files_endpoint": self.files_endpoint, "from_path": self._from_path} + + class RemoteObjectStoreCopyAction(BaseAction): """ """ @@ -664,6 +714,7 @@ def write_to_path(self, path): DICTIFIABLE_ACTION_CLASSES = [ + JsonTransferAction, RemoteCopyAction, RemoteTransferAction, RemoteTransferTusAction, @@ -844,6 +895,7 @@ def unstructured_map(self, path): ACTION_CLASSES: List[Type[BaseAction]] = [ NoneAction, + JsonTransferAction, RewriteAction, TransferAction, CopyAction, diff --git a/pulsar/client/client.py b/pulsar/client/client.py index e28d84ea..79d5734d 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -168,7 +168,7 @@ def __init__(self, destination_params, job_id, job_manager_interface): self.job_manager_interface = job_manager_interface def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, - dynamic_file_sources=None, token_endpoint=None): + dynamic_file_sources=None, token_endpoint=None, staging_manifest=None): """ Queue up the execution of the supplied `command_line` on the remote server. Called launch for historical reasons, should be renamed to @@ -405,7 +405,7 @@ def _build_status_request_message(self): class MessageJobClient(BaseMessageJobClient): def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, - dynamic_file_sources=None, token_endpoint=None): + dynamic_file_sources=None, token_endpoint=None, staging_manifest=None): """ """ launch_params = self._build_setup_message( @@ -439,7 +439,7 @@ def __init__(self, destination_params, job_id, client_manager, shell): self.shell = shell def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, - dynamic_file_sources=None, token_endpoint=None): + dynamic_file_sources=None, token_endpoint=None, staging_manifest=None): """ """ launch_params = self._build_setup_message( @@ -477,6 +477,55 @@ class ExecutionType(str, Enum): PARALLEL = "parallel" +class LocalSequentialLaunchMixin(BaseRemoteConfiguredJobClient): + + def launch( + self, + command_line, + dependencies_description=None, + env=None, + remote_staging=None, + job_config=None, + dynamic_file_sources=None, + container_info=None, + token_endpoint=None, + pulsar_app_config=None, + staging_manifest=None + ) -> Optional[ExternalId]: + # 1. call staging script with staging manifest [handled by ARC] + # 2. call actual command_line + # 3. call script that does output collection (similar to __collect_outputs) and outputs staging manifest + # 4. 
stage outputs back using manifest [handled by ARC]
+        import importlib.resources
+        import tempfile
+        import subprocess
+        import sys
+        from pulsar import scripts
+        # importlib.resources.path() returns a context manager, not a filesystem path - use files() instead
+        STAGING_SCRIPT = str(importlib.resources.files(scripts) / "staging_arc.py")
+        MANIFEST_SCRIPT = str(importlib.resources.files(scripts) / "collect_output_manifest.py")
+
+        with tempfile.NamedTemporaryFile(mode="w") as temp_fh:
+            temp_fh.write(json_dumps(staging_manifest))
+            temp_fh.flush()
+            staging_process = subprocess.run([sys.executable, STAGING_SCRIPT, "--json", temp_fh.name], capture_output=True)
+            assert staging_process.returncode == 0, staging_process.stderr.decode()
+            job_process = subprocess.run(command_line, shell=True, capture_output=True)
+            assert job_process.returncode == 0, job_process.stderr.decode()
+
+        job_directory = self.job_directory.job_directory
+
+        output_manifest_path = os.path.join(job_directory, "output_manifest.json")
+
+        with tempfile.NamedTemporaryFile(mode="w") as staging_config_fh:
+            staging_config_fh.write(json_dumps(remote_staging))
+            staging_config_fh.flush()
+
+            p = subprocess.run([sys.executable, MANIFEST_SCRIPT, "--job-directory", job_directory, "--staging-config-path", staging_config_fh.name, "--output-manifest-path", output_manifest_path])
+            assert p.returncode == 0
+
+        stage_out_process = subprocess.run([sys.executable, STAGING_SCRIPT, "--json", output_manifest_path], capture_output=True)
+        assert stage_out_process.returncode == 0, stage_out_process.stderr.decode()
+
+
 class CoexecutionLaunchMixin(BaseRemoteConfiguredJobClient):
     execution_type: ExecutionType
     pulsar_container_image: str
@@ -491,7 +540,8 @@ def launch(
         self,
         command_line,
         dependencies_description=None,
         env=None,
         remote_staging=None,
         job_config=None,
         dynamic_file_sources=None,
         container_info=None,
         token_endpoint=None,
-        pulsar_app_config=None
+        pulsar_app_config=None,
+        staging_manifest=None
     ) -> Optional[ExternalId]:
         """ """
@@ -756,6 +806,12 @@ def raw_check_complete(self) -> Dict[str, Any]:
         }
 
 
+class LocalSequentialClient(BaseMessageCoexecutionJobClient, LocalSequentialLaunchMixin):
+
+    def __init__(self, destination_params, job_id, client_manager):
+        super().__init__(destination_params, job_id, client_manager)
+
+
 class TesPollingCoexecutionJobClient(BasePollingCoexecutionJobClient, LaunchesTesContainersMixin):
     """A client that co-executes pods via GA4GH TES and depends on amqp for status updates."""
 
diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py
index 9c429734..0e931004 100644
--- a/pulsar/client/manager.py
+++ b/pulsar/client/manager.py
@@ -29,6 +29,7 @@
     MessageJobClient,
     TesMessageCoexecutionJobClient,
     TesPollingCoexecutionJobClient,
+    LocalSequentialClient,
 )
 from .destination import url_to_destination_params
 from .object_client import ObjectStoreClient
@@ -256,6 +257,8 @@ def get_client(self, destination_params, job_id, **kwargs):
             return K8sPollingCoexecutionJobClient(destination_params, job_id, self)
         elif destination_params.get("tes_url", False):
             return TesPollingCoexecutionJobClient(destination_params, job_id, self)
+        elif destination_params.get("arc_url", False):
+            return LocalSequentialClient(destination_params, job_id, self)
         else:
             raise Exception("Unknown client configuration")
 
@@ -268,7 +271,7 @@ def build_client_manager(**kwargs: Dict[str, Any]) -> ClientManagerInterface:
         return ClientManager(**kwargs)  # TODO: Consider more separation here.
 elif kwargs.get('amqp_url', None):
         return MessageQueueClientManager(**kwargs)
-    elif kwargs.get("k8s_enabled") or kwargs.get("tes_url"):
+    elif kwargs.get("k8s_enabled") or kwargs.get("tes_url") or kwargs.get("arc_enabled"):
         return PollingJobClientManager(**kwargs)
     else:
         return ClientManager(**kwargs)
diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py
index ea8f8aa1..82f9d348 100644
--- a/pulsar/client/staging/down.py
+++ b/pulsar/client/staging/down.py
@@ -72,6 +72,9 @@ def collect(self):
         self.__collect_other_working_directory_files()
         self.__collect_metadata_directory_files()
         self.__collect_job_directory_files()
+        # Give actions that require a final step, like those that write a manifest, a chance to write out their content
+        self.__finalize_action_mapper()
+        # TODO: finalize collection here for executors that need this?
         return self.exception_tracker.collection_failure_exceptions
 
     def __collect_working_directory_outputs(self):
@@ -134,6 +137,9 @@ def __collect_job_directory_files(self):
             'output_jobdir',
         )
 
+    def __finalize_action_mapper(self):
+        self.action_mapper.finalize()
+
     def __realized_dynamic_file_source_references(self):
         references = {"filename": [], "extra_files": []}
diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py
index 9f08bef8..81b5117b 100644
--- a/pulsar/client/staging/up.py
+++ b/pulsar/client/staging/up.py
@@ -71,6 +71,19 @@ def submit_job(client, client_job_description, job_config=None):
     # it needs to be in the response to Pulsar even if Pulsar is initiating staging actions
     launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources
     launch_kwds["token_endpoint"] = client.token_endpoint
+
+    # populate `to_path`
+    staging_manifest = []
+    for action in file_stager.action_mapper.actions:
+        if action.file_type not in ("output", "output_workdir"):
+            name = basename(action.path)
+            path = file_stager.job_directory.calculate_path(name, action.file_type)
+            action.write_to_path(path)
+            staging_manifest.append(action.finalize())
+
+    if staging_manifest:
+        launch_kwds["staging_manifest"] = staging_manifest
+
     # for pulsar modalities that skip the explicit "setup" step, give them a chance to set an external
     # id from the submission process (e.g. to TES).
     launch_response = client.launch(**launch_kwds)
@@ -559,7 +572,8 @@ def register_rewrite(self, local_path, remote_path, type, force=False):
     def register_rewrite_action(self, action, remote_path, force=False):
         if action.staging_needed or force:
             path = getattr(action, 'path', None)
-            if path:
+            if path and path not in self.file_renames:
+                # a duplicate path here should only happen in unit testing ... reason unclear
                 self.file_renames[path] = remote_path
 
     def rewrite_input_paths(self):
diff --git a/pulsar/client/test/check.py b/pulsar/client/test/check.py
index 5ddc00f4..638b38c8 100644
--- a/pulsar/client/test/check.py
+++ b/pulsar/client/test/check.py
@@ -382,7 +382,7 @@ def on_update(message):
 
         self.client_manager.ensure_has_status_update_callback(on_update)
 
-    def wait(self, seconds=120):
+    def wait(self, seconds=1200):
         final_status = None
         if not self.background:
             i = 0
diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py
index 6ca07e91..b7d323a7 100644
--- a/pulsar/managers/base/__init__.py
+++ b/pulsar/managers/base/__init__.py
@@ -232,7 +232,7 @@ class JobDirectory(RemoteJobDirectory):
     def __init__(
         self,
         staging_directory,
-        job_id,
+        job_id=None,
         lock_manager=None,
         directory_maker=None
     ):
@@ -240,7 +240,8 @@
         self._directory_maker = directory_maker or DirectoryMaker()
         self.lock_manager = lock_manager
         # Assert this job id isn't hacking path somehow.
-        assert job_id == basename(job_id)
+        if job_id:
+            assert job_id == basename(job_id)
 
     def _job_file(self, name):
         return os.path.join(self.job_directory, name)
diff --git a/pulsar/managers/staging/post.py b/pulsar/managers/staging/post.py
index 6ca59390..3c5fdd32 100644
--- a/pulsar/managers/staging/post.py
+++ b/pulsar/managers/staging/post.py
@@ -20,14 +20,14 @@ def postprocess(job_directory, action_executor, was_cancelled):
             staging_config = job_directory.load_metadata("launch_config").get("remote_staging", None)
         else:
             staging_config = None
-        collected = __collect_outputs(job_directory, staging_config, action_executor, was_cancelled)
+        file_action_mapper, collected = _collect_outputs(job_directory, staging_config, action_executor, was_cancelled)
         return collected
     finally:
         job_directory.write_file("postprocessed", "")
     return False
 
 
-def __collect_outputs(job_directory, staging_config, action_executor, was_cancelled):
+def _collect_outputs(job_directory, staging_config, action_executor, was_cancelled):
     collected = True
     if "action_mapper" in staging_config:
         file_action_mapper = action_mapper.FileActionMapper(config=staging_config["action_mapper"])
@@ -39,7 +39,7 @@ def _collect_outputs(job_directory, staging_config, action_executor, was_cancel
         if collection_failure_exceptions:
             log.warn("Failures collecting results %s" % collection_failure_exceptions)
             collected = False
-    return collected
+    return file_action_mapper, collected
 
 
 def realized_dynamic_file_sources(job_directory):
diff --git a/pulsar/scripts/collect_output_manifest.py b/pulsar/scripts/collect_output_manifest.py
new file mode 100644
index 00000000..a1d2e80a
--- /dev/null
+++ b/pulsar/scripts/collect_output_manifest.py
@@ -0,0 +1,35 @@
+import argparse
+import json
+
+from pulsar.managers.base import JobDirectory
+from pulsar.managers.staging.post import _collect_outputs
+from pulsar.managers.util.retry import RetryActionExecutor
+
+
+def make_parser():
+    """Construct an argument parser used to call the script from the command line."""
+
+    parser = argparse.ArgumentParser(description="Create output staging manifest")
+
+    parser.add_argument("--job-directory")
+    parser.add_argument("--staging-config-path", help="Path to staging config JSON file")
+    parser.add_argument("--output-manifest-path")
+
+    return parser
+
+
+def collect_outputs(job_directory: str, staging_config_path: str, output_manifest_path: str):
+    job_dir = JobDirectory(job_directory)
+    with open(staging_config_path) as staging_fh:
+        staging_config = json.load(staging_fh)
+
+    action_mapper, _ = _collect_outputs(job_dir, staging_config=staging_config, action_executor=RetryActionExecutor(), was_cancelled=lambda: False)
+    new_manifest = action_mapper.finalize()
+    with open(output_manifest_path, "w") as manifest_fh:
+        json.dump(new_manifest, manifest_fh)
+
+
+if __name__ == "__main__":
+    parser = make_parser()
+    args = parser.parse_args()
+    collect_outputs(args.job_directory, args.staging_config_path, args.output_manifest_path)
\ No newline at end of file
diff --git a/pulsar/scripts/staging_arc.py b/pulsar/scripts/staging_arc.py
new file mode 100644
index 00000000..0995e580
--- /dev/null
+++ b/pulsar/scripts/staging_arc.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+
+"""Stage files in or out of a compute environment made available via the Advanced Resource Connector (ARC) [1].
+
+This script reads a set of source and target URLs (with `http`, `https`, or `file` as the URL scheme) or paths,
+passed either as command line arguments or as a JSON manifest read from a file. It then reads the files from the
+source URLs and posts them (copies them, for `file://` URLs) to the target URLs. Manifest entries pair a `url`
+with a local `to_path` (stage in) or `from_path` (stage out).
+
+Example usage:
+
+```shell
+$ ./staging_arc.py --stage https://example.org file.dat --stage file:///home/user/text.txt https://example.org/files \
+    --json staging_manifest.json
+```
+
+_staging_manifest.json_
+```json
+[
+    {
+        "url": "https://example.org/files/data.txt",
+        "to_path": "/home/user/data.txt"
+    },
+    {
+        "url": "https://example.org/files/analysis.txt",
+        "from_path": "/home/user/analysis.txt"
+    }
+]
+```
+
+References:
+- [1] https://www.nordugrid.org/arc/about-arc.html
+"""
+
+# When the URL is the target, use POST.
+
+import json
+import sys
+from argparse import ArgumentParser
+
+from pulsar.client.transport import (
+    get_file,
+    post_file,
+)
+
+
+def main(args):
+    for entry in parse_json_manifest(args.json):
+        if entry.get("to_path"):
+            get_file(entry["url"], entry["to_path"])
+        elif entry.get("from_path"):
+            post_file(entry["url"], entry["from_path"])
+        else:
+            raise Exception(f"Didn't expect this in the staging manifest: {entry}")
+
+
+def parse_json_manifest(json_path):
+    with open(json_path) as fh:
+        return json.load(fh)
+
+
+HELP_STAGE = "Read a file from `source` and save it to `target`."
+HELP_JSON = "Read a list of `source` and `target` URLs from a JSON file."
+ + +def make_parser() -> ArgumentParser: + """Construct an argument parser used to call the script from the command line.""" + + module_docstring = sys.modules[__name__].__doc__ + + parser = ArgumentParser(description=module_docstring) + + parser.add_argument( + "--stage", dest="stage", metavar=("source", "target"), nargs=2, action="append", help=HELP_STAGE + ) + parser.add_argument("--json", help=HELP_JSON) + + return parser + + +if __name__ == "__main__": + """Invoke script from the command line.""" + argument_parser = make_parser() + args = argument_parser.parse_args() + main(args) diff --git a/pulsar/scripts/submit_util.py b/pulsar/scripts/submit_util.py index c00a2c32..186c06d0 100644 --- a/pulsar/scripts/submit_util.py +++ b/pulsar/scripts/submit_util.py @@ -4,6 +4,9 @@ import logging import time +from pulsar.client import ClientJobDescription, ClientOutputs, ClientInput +from pulsar.client import submit_job as submit_client_job +from pulsar.client.manager import build_client_manager from pulsar.client.util import from_base64_json from pulsar.main import ( load_pulsar_app, diff --git a/test/action_mapper_test.py b/test/action_mapper_test.py index 2131cb65..7783696f 100644 --- a/test/action_mapper_test.py +++ b/test/action_mapper_test.py @@ -4,6 +4,17 @@ ) +def test_action_mapper_finalization(): + client = _client("json_transfer") + mapper = FileActionMapper(client) + mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input') + mapper.action({'path': '/the_file'}, 'input') + mapper_summary = mapper.finalize() + assert len(mapper_summary) == 2 + assert mapper_summary[0]["path"] == '/opt/galaxy/tools/filters/catWrapper.py' + assert mapper_summary[1]["path"] == '/the_file' + + def test_endpoint_validation(): client = _min_client("remote_transfer") mapper = FileActionMapper(client) diff --git a/test/client_test.py b/test/client_test.py index 27abe112..31dfe235 100644 --- a/test/client_test.py +++ b/test/client_test.py @@ -125,6 +125,10 @@ def test_launch(): request_checker.assert_called() +def test_sequential_local(): + pass + + def __test_upload(upload_type): client = TestClient() (temp_fileno, temp_file_path) = tempfile.mkstemp() diff --git a/test/integration_test_cli_submit.py b/test/integration_test_cli_submit.py deleted file mode 100644 index 64ee3e5c..00000000 --- a/test/integration_test_cli_submit.py +++ /dev/null @@ -1,98 +0,0 @@ -import os -import yaml - -from .test_utils import ( - TempDirectoryTestCase, - files_server, - integration_test, - skip_unless_module, - temp_directory_persist, -) - -from pulsar.client import ClientOutputs -from pulsar.client.util import to_base64_json -from pulsar.scripts import submit - - -class BaseCliTestCase(TempDirectoryTestCase): - - def run_and_check_submission(self): - job_id = "0" - galaxy_working = temp_directory_persist() - output_name = "dataset_1211231231231231231.dat" - galaxy_output = os.path.join(galaxy_working, output_name) - pulsar_output = os.path.join(self.staging_directory, job_id, "outputs", output_name) - pulsar_input = os.path.join(self.staging_directory, job_id, "inputs", "cow") - with files_server("/") as test_files_server: - files_endpoint = test_files_server.application_url - action = {"name": "cow", "type": "input", "action": {"action_type": "message", "contents": "cow file contents\n"}} - client_outputs = ClientOutputs( - working_directory=galaxy_working, - output_files=[os.path.join(galaxy_working, output_name)], - ) - launch_params = dict( - command_line="cat '{}' > '{}'".format(pulsar_input, 
pulsar_output), - job_id=job_id, - setup_params=dict( - job_id=job_id, - ), - setup=True, - remote_staging={ - "setup": [action], - "action_mapper": { - "default_action": "remote_transfer", - "files_endpoint": files_endpoint, - }, - "client_outputs": client_outputs.to_dict(), - }, - ) - base64 = to_base64_json(launch_params) - assert not os.path.exists(galaxy_output) - submit.main(["--base64", base64] + self._encode_application()) - assert os.path.exists(galaxy_output) - out_contents = open(galaxy_output).read() - assert out_contents == "cow file contents\n", out_contents - - @property - def staging_directory(self): - return os.path.join(self.temp_directory, "staging") - - @property - def config_directory(self): - config_directory = os.path.join(self.temp_directory, "config") - os.makedirs(config_directory) - return config_directory - - -class CliFileAppConfigTestCase(BaseCliTestCase): - - @skip_unless_module("kombu") - @integration_test - def test(self): - self.run_and_check_submission() - - def _encode_application(self): - app_conf = dict( - staging_directory=self.staging_directory, - message_queue_url="memory://submittest" - ) - app_conf_path = os.path.join(self.config_directory, "app.yml") - with open(app_conf_path, "w") as f: - f.write(yaml.dump(app_conf)) - - return ["--app_conf_path", app_conf_path] - - -class CliCommandLineAppConfigTestCase(BaseCliTestCase): - - @skip_unless_module("kombu") - @integration_test - def test(self): - self.run_and_check_submission() - - def _encode_application(self): - app_conf = dict( - staging_directory=self.staging_directory, - message_queue_url="memory://submittest" - ) - return ["--app_conf_base64", to_base64_json(app_conf)] diff --git a/test/test_cli_submit.py b/test/test_cli_submit.py new file mode 100644 index 00000000..636ca879 --- /dev/null +++ b/test/test_cli_submit.py @@ -0,0 +1,152 @@ +import os +from abc import ( + ABC, + abstractmethod, +) + +import yaml + +from .test_utils import ( + TempDirectoryTestCase, + files_server, + integration_test, + skip_unless_module, +) + +from pulsar.client import ClientOutputs, ClientInput +from pulsar.client.util import to_base64_json +from pulsar.scripts import submit + + +class BaseCliTestCase(ABC, TempDirectoryTestCase): + + def run_and_check_submission(self): + # prepare job input directory + input_directory = os.path.join(self.temp_directory, "input_files") + os.makedirs(input_directory, exist_ok=False) + self.setup_input_directory(input_directory) + + # prepare job output directory + output_directory = os.path.join(self.temp_directory, "output_files") + output_file_name = "dataset_1211231231231231231.dat" + output_file_path = os.path.join(output_directory, output_file_name) + os.makedirs(output_directory, exist_ok=False) + + # prepare Galaxy working directory + galaxy_working_directory = os.path.join(self.temp_directory, "galaxy_working") + os.makedirs(galaxy_working_directory, exist_ok=False) + + job_id = "0" + with files_server("/", allow_multiple_downloads=True) as test_files_server: + launch_params = self.setup_launch_params( + job_id=job_id, + files_endpoint=test_files_server.application_url, + galaxy_working_directory=galaxy_working_directory, + pulsar_staging_directory=self.staging_directory, + input_directory=input_directory, + output_file_path=output_file_path, + ) + launch_params_base64 = to_base64_json(launch_params) + + # submit job and test results + assert not os.path.exists(output_file_path) + submit.main(["--base64", launch_params_base64] + self.encode_application()) + assert 
os.path.exists(output_file_path) + out_contents = open(output_file_path).read() + assert out_contents == "cow file contents\n", out_contents + + def setup_input_directory(self, directory): + pass + + def setup_launch_params( + self, + *, + job_id, + files_endpoint, + galaxy_working_directory, + pulsar_staging_directory, + output_file_path, + **kwargs + ): + output_file_name = os.path.basename(output_file_path) + pulsar_output = os.path.join(pulsar_staging_directory, job_id, "outputs", output_file_name) + pulsar_input = os.path.join(pulsar_staging_directory, job_id, "inputs", "cow") + action = { + "name": "cow", + "type": "input", + "action": { + "action_type": "message", + "contents": "cow file contents\n" + } + } + client_outputs = ClientOutputs( + working_directory=galaxy_working_directory, + output_files=[output_file_path], + ) + launch_params = dict( + command_line="cat '{}' > '{}'".format(pulsar_input, pulsar_output), + job_id=job_id, + setup_params=dict( + job_id=job_id, + ), + setup=True, + remote_staging={ + "setup": [action], + "action_mapper": { + "default_action": "remote_transfer", + "files_endpoint": files_endpoint, + }, + "client_outputs": client_outputs.to_dict(), + }, + ) + return launch_params + + @abstractmethod + def encode_application(self): + pass + + @property + def staging_directory(self): + return os.path.join(self.temp_directory, "pulsar_staging") + + @property + def config_directory(self): + config_directory = os.path.join(self.temp_directory, "config") + os.makedirs(config_directory, exist_ok=True) + return config_directory + + +class CliFileAppConfigTestCase(BaseCliTestCase): + + @skip_unless_module("kombu") + @integration_test + def test(self): + self.run_and_check_submission() + + def encode_application(self): + app_conf = dict( + staging_directory=self.staging_directory, + message_queue_url="memory://submittest", + conda_auto_init=False, + ) + app_conf_path = os.path.join(self.config_directory, "app.yml") + with open(app_conf_path, "w") as f: + f.write(yaml.dump(app_conf)) + + return ["--app_conf_path", app_conf_path] + + +class CliCommandLineAppConfigTestCase(BaseCliTestCase): + + @skip_unless_module("kombu") + @integration_test + def test(self): + self.run_and_check_submission() + + def encode_application(self): + app_conf = dict( + staging_directory=self.staging_directory, + message_queue_url="memory://submittest", + conda_auto_init=False, + ) + return ["--app_conf_base64", to_base64_json(app_conf)] diff --git a/test/test_utils.py b/test/test_utils.py index e41f9130..eb852248 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -68,7 +68,7 @@ def wrapper(*args, **kwargs): return outer_wrapper -INTEGRATION_MAXIMUM_TEST_TIME = 120 +INTEGRATION_MAXIMUM_TEST_TIME = 1200 integration_test = timed(INTEGRATION_MAXIMUM_TEST_TIME) TEST_DIR = dirname(__file__) @@ -466,7 +466,7 @@ def get_authorization(self, tool_id): @contextmanager -def files_server(directory=None): +def files_server(directory=None, allow_multiple_downloads=False): external_url = os.environ.get("PULSAR_TEST_EXTERNAL_JOB_FILES_URL") if external_url: if directory is None: @@ -481,11 +481,11 @@ def files_server(directory=None): else: if not directory: with temp_directory() as directory: - app = TestApp(JobFilesApp(directory)) + app = TestApp(JobFilesApp(directory, allow_multiple_downloads=allow_multiple_downloads)) with server_for_test_app(app) as server: yield server, directory else: - app = TestApp(JobFilesApp(directory)) + app = TestApp(JobFilesApp(directory, 
allow_multiple_downloads=allow_multiple_downloads))
         with server_for_test_app(app) as server:
             yield server
diff --git a/test/transfer_action_test.py b/test/transfer_action_test.py
index 30b927f9..0cf3053d 100644
--- a/test/transfer_action_test.py
+++ b/test/transfer_action_test.py
@@ -1,7 +1,35 @@
 import os
 
 from .test_utils import files_server
-from pulsar.client.action_mapper import RemoteTransferAction
+from pulsar.client.action_mapper import (
+    JsonTransferAction,
+    RemoteTransferAction,
+)
+
+
+def test_write_to_path_json():
+    with files_server() as (server, directory):
+        from_path = os.path.join(directory, "remote_get")
+
+        to_path = os.path.join(directory, "local_get")
+        url = server.application_url + "?path=%s" % from_path
+        action = JsonTransferAction({"path": to_path}, files_endpoint=url)
+        action.write_to_path(to_path)
+        assert action.path == to_path
+        assert action.files_endpoint == url
+        assert action.finalize() == {"files_endpoint": url, "to_path": to_path}
+
+
+def test_write_from_file_json():
+    with files_server() as (server, directory):
+        from_path = os.path.join(directory, "local_post")
+        to_path = os.path.join(directory, "remote_post")
+        url = server.application_url + "?path=%s" % to_path
+        action = JsonTransferAction({"path": to_path}, files_endpoint=url)
+        action.write_from_path(from_path)
+        assert action.path == to_path
+        assert action.files_endpoint == url
+        assert action.finalize() == {"files_endpoint": url, "from_path": from_path}
 
 
 def test_write_to_file():
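
For reference, a minimal sketch of the manifest round trip the diff introduces: build a stage-in/stage-out manifest in the shape consumed by `main()` in `staging_arc.py` (`url` plus `to_path` or `from_path`) and drive the script with it, the same way `LocalSequentialLaunchMixin.launch` does. The endpoint URL and paths below are hypothetical.

```python
# Illustrative sketch only (not part of the diff); URLs and paths are hypothetical.
import json
import subprocess
import sys
import tempfile

manifest = [
    # stage-in: GET `url` and write the response body to `to_path`
    {"url": "https://files.example.org/jobs/0/inputs/cow", "to_path": "/tmp/job/inputs/cow"},
    # stage-out: POST the file at `from_path` back to `url`
    {"url": "https://files.example.org/jobs/0/outputs/dataset_1.dat", "from_path": "/tmp/job/outputs/dataset_1.dat"},
]

with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as fh:
    json.dump(manifest, fh)
    manifest_path = fh.name

# Run the new script; invoked here via -m since pulsar.scripts.staging_arc is importable.
completed = subprocess.run(
    [sys.executable, "-m", "pulsar.scripts.staging_arc", "--json", manifest_path],
    capture_output=True,
)
assert completed.returncode == 0, completed.stderr.decode()
```

Entries carrying `to_path` are fetched with `get_file` and entries carrying `from_path` are posted with `post_file`, mirroring the branch logic in `staging_arc.py`.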