diff --git a/dev-requirements.txt b/dev-requirements.txt index 5351cb14..1a36a4e5 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -21,7 +21,7 @@ flake8 mypy<=1.0.1 # https://github.com/pydantic/pydantic/issues/5192 types-paramiko -types-pkg-resources +types-setuptools types-PyYAML types-pycurl types-requests diff --git a/docker/coexecutor/Dockerfile b/docker/coexecutor/Dockerfile index e94d9a49..9d3a5374 100644 --- a/docker/coexecutor/Dockerfile +++ b/docker/coexecutor/Dockerfile @@ -1,26 +1,64 @@ -FROM conda/miniconda3 +# use the root of the repository as context, i.e. `docker build . -f ./docker/coexecutor/Dockerfile` + +FROM python:3.12-bookworm as build_wheel + +ENV PIP_ROOT_USER_ACTION=ignore + +WORKDIR /build + +# install requirements +COPY requirements.txt . +COPY dev-requirements.txt . +RUN pip install --no-cache-dir --upgrade pip \ + && pip install --no-cache-dir setuptools -r requirements.txt -r dev-requirements.txt + +# build Pulsar wheel +COPY . . +RUN python setup.py sdist bdist_wheel + + +FROM python:3.12-bookworm ENV PYTHONUNBUFFERED 1 +ENV PIP_ROOT_USER_ACTION=ignore ENV DEBIAN_FRONTEND noninteractive ENV PULSAR_CONFIG_CONDA_PREFIX /usr/local +# set up Galaxy Depot repository (provides SLURM DRMAA packages for Debian Buster and newer releases) +RUN apt-get update \ + && apt-get install -y --no-install-recommends ca-certificates curl gnupg \ + && apt-get clean && rm -rf /var/lib/apt/lists/* \ + && curl -fsSL "http://keyserver.ubuntu.com/pks/lookup?op=get&search=0x18381AC8832160AF" | gpg --dearmor -o /etc/apt/trusted.gpg.d/galaxy-depot.gpg \ + && echo "deb https://depot.galaxyproject.org/apt/ $(bash -c '. /etc/os-release; echo ${VERSION_CODENAME:-bookworm}') main" | tee /etc/apt/sources.list.d/galaxy-depot.list + +# set up Debian Bullseye repository (and use it only for libslurm36, needed by slurm-drmaa1, and slurm) +RUN echo "deb http://deb.debian.org/debian/ bullseye main" > /etc/apt/sources.list.d/bullseye.list && \ +cat < /etc/apt/preferences.d/bullseye.pref +Package: * +Pin: release n=bullseye +Pin-Priority: -1 + +Package: libslurm36, slurm +Pin: release n=bullseye +Pin-Priority: 100 +EOF + +# set up CVMFS repository +RUN apt-get update \ + && apt-get install -y --no-install-recommends lsb-release wget \ + && wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \ + && dpkg -i cvmfs-release-latest_all.deb && rm -f cvmfs-release-latest_all.deb + # wget, gcc, pip - to build and install Pulsar. # bzip2 for Miniconda. # TODO: pycurl stuff... -RUN apt-get update \ - && apt-get install -y --no-install-recommends apt-transport-https \ +RUN apt-get update && apt-get install -y --no-install-recommends \ # Install CVMFS client - && apt-get install -y --no-install-recommends lsb-release wget \ - && wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \ - && dpkg -i cvmfs-release-latest_all.deb \ - && rm -f cvmfs-release-latest_all.deb \ + cvmfs cvmfs-config-default \ # Install packages - && apt-get update \ - && apt-get install -y --no-install-recommends gcc \ - libcurl4-openssl-dev \ - cvmfs cvmfs-config-default \ - slurm-llnl slurm-drmaa-dev \ - bzip2 \ + gcc libcurl4-openssl-dev \ + munge libmunge-dev slurm slurm-drmaa-dev \ + bzip2 \ # Install Pulsar Python requirements && pip install --no-cache-dir -U pip \ && pip install --no-cache-dir drmaa wheel kombu pykube pycurl \ @@ -28,14 +66,14 @@ RUN apt-get update \ # Remove build deps and cleanup && apt-get -y remove gcc wget lsb-release \ && apt-get -y autoremove \ - && apt-get autoclean \ - && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log \ - && /usr/sbin/create-munge-key + && apt-get clean && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log + +COPY --from=build_wheel /build/dist/pulsar_app-*-py2.py3-none-any.whl / -ADD pulsar_app-*-py2.py3-none-any.whl /pulsar_app-*-py2.py3-none-any.whl +SHELL ["/bin/bash", "-c"] -RUN pip install --upgrade setuptools && pip install pyOpenSSL --upgrade && pip install cryptography --upgrade -RUN pip install --no-cache-dir /pulsar_app-*-py2.py3-none-any.whl[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl +RUN pip install --no-cache-dir --upgrade setuptools pyOpenSSL cryptography +RUN pip install --no-cache-dir "$(echo /pulsar_app-*-py2.py3-none-any.whl)"[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl RUN pip install --upgrade 'importlib-metadata<5.0' RUN _pulsar-configure-galaxy-cvmfs RUN _pulsar-conda-init --conda_prefix=/pulsar_dependencies/conda diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index ce2f54ec..10fdc186 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -21,6 +21,7 @@ Any, Dict, List, + Optional, Type, ) from urllib.parse import urlencode @@ -190,6 +191,9 @@ def __init__(self, client=None, config=None): self.ssh_port = config.get("ssh_port", None) self.mappers = mappers_from_dicts(config.get("paths", [])) self.files_endpoint = config.get("files_endpoint", None) + self.actions = [] + # Might want to make the working directory available here so that we know where to place archive + # for archive action def action(self, source, type, mapper=None): path = source.get("path", None) @@ -200,10 +204,14 @@ def action(self, source, type, mapper=None): if mapper: file_lister = mapper.file_lister action_kwds = mapper.action_kwds - action = action_class(source, file_lister=file_lister, **action_kwds) + action = action_class(source, file_lister=file_lister, file_type=type, **action_kwds) self.__process_action(action, type) + self.actions.append(action) return action + def finalize(self): + return [_ for _ in (action.finalize() for action in self.actions) if _] + def unstructured_mappers(self): """ Return mappers that will map 'unstructured' files (i.e. go beyond mapping inputs, outputs, and config files). @@ -265,6 +273,7 @@ def __process_action(self, action, file_type): """ Extension point to populate extra action information after an action has been created. """ + action.file_type = file_type if getattr(action, "inject_url", False): self.__inject_url(action, file_type) if getattr(action, "inject_ssh_properties", False): @@ -300,10 +309,12 @@ class BaseAction: whole_directory_transfer_supported = False action_spec: Dict[str, Any] = {} action_type: str + file_type: Optional[str] = None - def __init__(self, source, file_lister=None): + def __init__(self, source, file_lister=None, file_type=None): self.source = source self.file_lister = file_lister or DEFAULT_FILE_LISTER + self.file_type = file_type @property def path(self): @@ -342,6 +353,9 @@ def _extend_base_dict(self, **kwds): base_dict.update(**kwds) return base_dict + def finalize(self): + pass + def to_dict(self): return self._extend_base_dict() @@ -390,8 +404,8 @@ class RewriteAction(BaseAction): action_type = "rewrite" staging = STAGING_ACTION_NONE - def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None): - super().__init__(source, file_lister=file_lister) + def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None, file_type=None): + super().__init__(source, file_lister=file_lister, file_type=file_type) self.source_directory = source_directory self.destination_directory = destination_directory @@ -467,8 +481,8 @@ class RemoteTransferAction(BaseAction): action_type = "remote_transfer" staging = STAGING_ACTION_REMOTE - def __init__(self, source, file_lister=None, url=None): - super().__init__(source, file_lister=file_lister) + def __init__(self, source, file_lister=None, url=None, file_type=None): + super().__init__(source, file_lister=file_lister, file_type=file_type) self.url = url def to_dict(self): @@ -495,8 +509,8 @@ class RemoteTransferTusAction(BaseAction): action_type = "remote_transfer_tus" staging = STAGING_ACTION_REMOTE - def __init__(self, source, file_lister=None, url=None): - super().__init__(source, file_lister=file_lister) + def __init__(self, source, file_lister=None, url=None, file_type=None): + super().__init__(source, file_lister=file_lister, file_type=file_type) self.url = url def to_dict(self): @@ -513,6 +527,42 @@ def write_from_path(self, pulsar_path): tus_upload_file(self.url, pulsar_path) +class JsonTransferAction(BaseAction): + """ + This action indicates that the pulsar server should create a JSON manifest that can be used to stage files by an + external system that can stage files in and out of the compute environment. + """ + inject_url = True + whole_directory_transfer_supported = True + action_type = "json_transfer" + staging = STAGING_ACTION_REMOTE + + def __init__(self, source, file_lister=None, url=None, file_type=None): + super().__init__(source, file_lister, file_type) + self.url = url + self._from_path = None + self._to_path = None + + @classmethod + def from_dict(cls, action_dict): + return JsonTransferAction(source=action_dict["source"], url=action_dict["url"]) + + def to_dict(self): + return self._extend_base_dict(url=self.url) + + def write_to_path(self, path): + self._to_path = path + + def write_from_path(self, pulsar_path: str): + self._from_path = pulsar_path + + def finalize(self): + if self._to_path: + return {"url": self.url, "to_path": self._to_path} + else: + return {"url": self.url, "from_path": self._from_path} + + class RemoteObjectStoreCopyAction(BaseAction): """ """ @@ -664,6 +714,7 @@ def write_to_path(self, path): DICTIFIABLE_ACTION_CLASSES = [ + JsonTransferAction, RemoteCopyAction, RemoteTransferAction, RemoteTransferTusAction, @@ -844,6 +895,7 @@ def unstructured_map(self, path): ACTION_CLASSES: List[Type[BaseAction]] = [ NoneAction, + JsonTransferAction, RewriteAction, TransferAction, CopyAction, diff --git a/pulsar/client/client.py b/pulsar/client/client.py index e28d84ea..273dc1e7 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -168,7 +168,7 @@ def __init__(self, destination_params, job_id, job_manager_interface): self.job_manager_interface = job_manager_interface def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, - dynamic_file_sources=None, token_endpoint=None): + dynamic_file_sources=None, token_endpoint=None, staging_manifest=None): """ Queue up the execution of the supplied `command_line` on the remote server. Called launch for historical reasons, should be renamed to @@ -374,6 +374,90 @@ def _build_setup_message(self, command_line, dependencies_description, env, remo launch_params["setup_params"] = setup_params return launch_params + def get_pulsar_app_config( + self, + pulsar_app_config, + container, + wait_after_submission, + manager_name, + manager_type, + dependencies_description, + ): + + pulsar_app_config = pulsar_app_config or {} + manager_config = self._ensure_manager_config( + pulsar_app_config, + manager_name, + manager_type, + ) + + if ( + "staging_directory" not in manager_config and "staging_directory" not in pulsar_app_config + ): + pulsar_app_config["staging_directory"] = CONTAINER_STAGING_DIRECTORY + + if self.amqp_key_prefix: + pulsar_app_config["amqp_key_prefix"] = self.amqp_key_prefix + + if "monitor" not in manager_config: + manager_config["monitor"] = ( + MonitorStyle.BACKGROUND.value + if wait_after_submission + else MonitorStyle.NONE.value + ) + if "persistence_directory" not in pulsar_app_config: + pulsar_app_config["persistence_directory"] = os.path.join( + CONTAINER_STAGING_DIRECTORY, "persisted_data" + ) + elif "manager" in pulsar_app_config and manager_name != "_default_": + log.warning( + "'manager' set in app config but client has non-default manager '%s', this will cause communication" + " failures, remove `manager` from app or client config to fix", + manager_name, + ) + + using_dependencies = container is None and dependencies_description is not None + if using_dependencies and "dependency_resolution" not in pulsar_app_config: + # Setup default dependency resolution for container above... + dependency_resolution = { + "cache": False, + "use": True, + "default_base_path": "/pulsar_dependencies", + "cache_dir": "/pulsar_dependencies/_cache", + "resolvers": [ + { # TODO: add CVMFS resolution... + "type": "conda", + "auto_init": True, + "auto_install": True, + "prefix": "/pulsar_dependencies/conda", + }, + { + "type": "conda", + "auto_init": True, + "auto_install": True, + "prefix": "/pulsar_dependencies/conda", + "versionless": True, + }, + ], + } + pulsar_app_config["dependency_resolution"] = dependency_resolution + return pulsar_app_config + + def _ensure_manager_config(self, pulsar_app_config, manager_name, manager_type): + if "manager" in pulsar_app_config: + manager_config = pulsar_app_config["manager"] + elif "managers" in pulsar_app_config: + managers_config = pulsar_app_config["managers"] + if manager_name not in managers_config: + managers_config[manager_name] = {} + manager_config = managers_config[manager_name] + else: + manager_config = {} + pulsar_app_config["manager"] = manager_config + if "type" not in manager_config: + manager_config["type"] = manager_type + return manager_config + class MessagingClientManagerProtocol(ClientManagerProtocol): status_cache: Dict[str, Dict[str, Any]] @@ -405,7 +489,7 @@ def _build_status_request_message(self): class MessageJobClient(BaseMessageJobClient): def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, - dynamic_file_sources=None, token_endpoint=None): + dynamic_file_sources=None, token_endpoint=None, staging_manifest=None): """ """ launch_params = self._build_setup_message( @@ -439,7 +523,7 @@ def __init__(self, destination_params, job_id, client_manager, shell): self.shell = shell def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None, - dynamic_file_sources=None, token_endpoint=None): + dynamic_file_sources=None, token_endpoint=None, staging_manifest=None): """ """ launch_params = self._build_setup_message( @@ -491,7 +575,8 @@ def launch( dynamic_file_sources=None, container_info=None, token_endpoint=None, - pulsar_app_config=None + pulsar_app_config=None, + staging_manifest=None, ) -> Optional[ExternalId]: """ """ @@ -513,48 +598,15 @@ def launch( manager_name = self.client_manager.manager_name manager_type = "coexecution" if container is not None else "unqueued" - pulsar_app_config = pulsar_app_config or {} - manager_config = self._ensure_manager_config( - pulsar_app_config, manager_name, manager_type, + pulsar_app_config = self.get_pulsar_app_config( + pulsar_app_config=pulsar_app_config, + container=container, + wait_after_submission=wait_after_submission, + manager_name=manager_name, + manager_type=manager_type, + dependencies_description=dependencies_description, ) - if "staging_directory" not in manager_config and "staging_directory" not in pulsar_app_config: - pulsar_app_config["staging_directory"] = CONTAINER_STAGING_DIRECTORY - - if self.amqp_key_prefix: - pulsar_app_config["amqp_key_prefix"] = self.amqp_key_prefix - - if "monitor" not in manager_config: - manager_config["monitor"] = MonitorStyle.BACKGROUND.value if wait_after_submission else MonitorStyle.NONE.value - if "persistence_directory" not in pulsar_app_config: - pulsar_app_config["persistence_directory"] = os.path.join(CONTAINER_STAGING_DIRECTORY, "persisted_data") - elif "manager" in pulsar_app_config and manager_name != '_default_': - log.warning( - "'manager' set in app config but client has non-default manager '%s', this will cause communication" - " failures, remove `manager` from app or client config to fix", manager_name) - - using_dependencies = container is None and dependencies_description is not None - if using_dependencies and "dependency_resolution" not in pulsar_app_config: - # Setup default dependency resolution for container above... - dependency_resolution = { - "cache": False, - "use": True, - "default_base_path": "/pulsar_dependencies", - "cache_dir": "/pulsar_dependencies/_cache", - "resolvers": [{ # TODO: add CVMFS resolution... - "type": "conda", - "auto_init": True, - "auto_install": True, - "prefix": '/pulsar_dependencies/conda', - }, { - "type": "conda", - "auto_init": True, - "auto_install": True, - "prefix": '/pulsar_dependencies/conda', - "versionless": True, - }] - } - pulsar_app_config["dependency_resolution"] = dependency_resolution base64_message = to_base64_json(launch_params) base64_app_conf = to_base64_json(pulsar_app_config) pulsar_container_image = self.pulsar_container_image @@ -606,21 +658,6 @@ def _pulsar_script_args(self, manager_name, base64_job, base64_app_conf, wait_ar manager_args.extend(["--base64", base64_job, "--app_conf_base64", base64_app_conf]) return manager_args - def _ensure_manager_config(self, pulsar_app_config, manager_name, manager_type): - if "manager" in pulsar_app_config: - manager_config = pulsar_app_config["manager"] - elif "managers" in pulsar_app_config: - managers_config = pulsar_app_config["managers"] - if manager_name not in managers_config: - managers_config[manager_name] = {} - manager_config = managers_config[manager_name] - else: - manager_config = {} - pulsar_app_config["manager"] = manager_config - if "type" not in manager_config: - manager_config["type"] = manager_type - return manager_config - def _launch_containers( self, pulsar_submit_container: CoexecutionContainerCommand, diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py index ea8f8aa1..82f9d348 100644 --- a/pulsar/client/staging/down.py +++ b/pulsar/client/staging/down.py @@ -72,6 +72,9 @@ def collect(self): self.__collect_other_working_directory_files() self.__collect_metadata_directory_files() self.__collect_job_directory_files() + # Give actions that require a final action, like those that write a manifest, to write out their content + self.__finalize_action_mapper() + # finalize collection here for executors that need this ? return self.exception_tracker.collection_failure_exceptions def __collect_working_directory_outputs(self): @@ -134,6 +137,9 @@ def __collect_job_directory_files(self): 'output_jobdir', ) + def __finalize_action_mapper(self): + self.action_mapper.finalize() + def __realized_dynamic_file_source_references(self): references = {"filename": [], "extra_files": []} diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index 9f08bef8..3b8f8eaa 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -71,6 +71,19 @@ def submit_job(client, client_job_description, job_config=None): # it needs to be in the response to Pulsar even Pulsar is inititing staging actions launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources launch_kwds["token_endpoint"] = client.token_endpoint + + # populate `to_path` + staging_manifest = [] + for action in file_stager.action_mapper.actions: + if action.file_type not in ("output", "output_workdir"): + name = basename(action.path) + path = file_stager.job_directory.calculate_path(name, action.file_type) + action.write_to_path(path) + staging_manifest.append(action.finalize()) + + if staging_manifest: + launch_kwds["staging_manifest"] = staging_manifest + # for pulsar modalities that skip the explicit "setup" step, give them a chance to set an external # id from the submission process (e.g. to TES). launch_response = client.launch(**launch_kwds) diff --git a/pulsar/client/util.py b/pulsar/client/util.py index e10531ed..09cedb69 100644 --- a/pulsar/client/util.py +++ b/pulsar/client/util.py @@ -60,7 +60,7 @@ def _copy_and_close(object, output): @wraps(_b64encode) def b64encode(val, **kwargs): try: - return _b64encode(val, **kwargs) + return _b64encode(val, **kwargs).decode("utf-8") except TypeError: return _b64encode(val.encode('UTF-8'), **kwargs).decode('UTF-8') diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py index 6ca07e91..b7d323a7 100644 --- a/pulsar/managers/base/__init__.py +++ b/pulsar/managers/base/__init__.py @@ -232,7 +232,7 @@ class JobDirectory(RemoteJobDirectory): def __init__( self, staging_directory, - job_id, + job_id=None, lock_manager=None, directory_maker=None ): @@ -240,7 +240,8 @@ def __init__( self._directory_maker = directory_maker or DirectoryMaker() self.lock_manager = lock_manager # Assert this job id isn't hacking path somehow. - assert job_id == basename(job_id) + if job_id: + assert job_id == basename(job_id) def _job_file(self, name): return os.path.join(self.job_directory, name) diff --git a/pulsar/managers/staging/post.py b/pulsar/managers/staging/post.py index 6ca59390..3c5fdd32 100644 --- a/pulsar/managers/staging/post.py +++ b/pulsar/managers/staging/post.py @@ -20,14 +20,14 @@ def postprocess(job_directory, action_executor, was_cancelled): staging_config = job_directory.load_metadata("launch_config").get("remote_staging", None) else: staging_config = None - collected = __collect_outputs(job_directory, staging_config, action_executor, was_cancelled) + file_action_mapper, collected = _collect_outputs(job_directory, staging_config, action_executor, was_cancelled) return collected finally: job_directory.write_file("postprocessed", "") return False -def __collect_outputs(job_directory, staging_config, action_executor, was_cancelled): +def _collect_outputs(job_directory, staging_config, action_executor, was_cancelled): collected = True if "action_mapper" in staging_config: file_action_mapper = action_mapper.FileActionMapper(config=staging_config["action_mapper"]) @@ -39,7 +39,7 @@ def __collect_outputs(job_directory, staging_config, action_executor, was_cancel if collection_failure_exceptions: log.warn("Failures collecting results %s" % collection_failure_exceptions) collected = False - return collected + return file_action_mapper, collected def realized_dynamic_file_sources(job_directory): diff --git a/pulsar/scripts/collect_output_manifest.py b/pulsar/scripts/collect_output_manifest.py new file mode 100644 index 00000000..72a7a1a4 --- /dev/null +++ b/pulsar/scripts/collect_output_manifest.py @@ -0,0 +1,45 @@ +import argparse +import json + +from pulsar.managers.base import JobDirectory +from pulsar.managers.staging.post import _collect_outputs +from pulsar.managers.util.retry import RetryActionExecutor + + +def make_parser(): + """Construct an argument parser used to call the script from the command line.""" + + parser = argparse.ArgumentParser(description="Create output staging manifest") + + parser.add_argument("--job-directory") + parser.add_argument("--staging-config-path", help="Path to staging config JSON file") + parser.add_argument("--output-manifest-path") + + return parser + + +def collect_outputs(job_directory: str, staging_config_path: str, output_manifest_path: str): + job_directory_ = JobDirectory(job_directory) + with open(staging_config_path) as staging_fh: + staging_config = json.load(staging_fh) + + action_mapper, _ = _collect_outputs( + job_directory_, + staging_config=staging_config, + action_executor=RetryActionExecutor(), + was_cancelled=lambda: False + ) + new_manifest = action_mapper.finalize() + with open(output_manifest_path, "w") as manifest_fh: + json.dump(new_manifest, manifest_fh) + + +def main(): + """Run the script from the command line.""" + parser = make_parser() + args = parser.parse_args() + collect_outputs(args.job_directory, args.staging_config_path, args.output_manifest_path) + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index 68faeed3..843f02cd 100644 --- a/setup.py +++ b/setup.py @@ -92,6 +92,7 @@ pulsar-submit=pulsar.scripts.submit:main pulsar-finish=pulsar.scripts.finish:main pulsar-run=pulsar.scripts.run:main + pulsar-create-output-manifest=pulsar.scripts.collect_output_manifest:main _pulsar-conda-init=pulsar.scripts._conda_init:main _pulsar-configure-slurm=pulsar.scripts._configure_slurm:main _pulsar-configure-galaxy-cvmfs=pulsar.scripts._configure_galaxy_cvmfs:main diff --git a/test/action_mapper_test.py b/test/action_mapper_test.py index 2131cb65..7783696f 100644 --- a/test/action_mapper_test.py +++ b/test/action_mapper_test.py @@ -4,6 +4,17 @@ ) +def test_action_mapper_finalization(): + client = _client("json_transfer") + mapper = FileActionMapper(client) + mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input') + mapper.action({'path': '/the_file'}, 'input') + mapper_summary = mapper.finalize() + assert len(mapper_summary) == 2 + assert mapper_summary[0]["path"] == '/opt/galaxy/tools/filters/catWrapper.py' + assert mapper_summary[1]["path"] == '/the_file' + + def test_endpoint_validation(): client = _min_client("remote_transfer") mapper = FileActionMapper(client) diff --git a/test/integration_test_cli_submit.py b/test/test_cli_submit.py similarity index 100% rename from test/integration_test_cli_submit.py rename to test/test_cli_submit.py diff --git a/test/transfer_action_test.py b/test/transfer_action_test.py index 30b927f9..0cf3053d 100644 --- a/test/transfer_action_test.py +++ b/test/transfer_action_test.py @@ -1,7 +1,35 @@ import os from .test_utils import files_server -from pulsar.client.action_mapper import RemoteTransferAction +from pulsar.client.action_mapper import ( + JsonTransferAction, + RemoteTransferAction, +) + + +def test_write_to_path_json(): + with files_server() as (server, directory): + from_path = os.path.join(directory, "remote_get") + + to_path = os.path.join(directory, "local_get") + url = server.application_url + "?path=%s" % from_path + action = JsonTransferAction({"path": to_path}, url=url) + action.write_to_path(to_path) + assert action.path == to_path + assert action.url == url + assert action.finalize() == {"path": to_path, "url": url} + + +def test_write_from_file_json(): + with files_server() as (server, directory): + from_path = os.path.join(directory, "local_post") + to_path = os.path.join(directory, "remote_post") + url = server.application_url + "?path=%s" % to_path + action = JsonTransferAction({"path": to_path}, url=url) + action.write_from_path(from_path) + assert action.path == to_path + assert action.url == url + assert action.finalize() == {"path": to_path, "url": url} def test_write_to_file():