Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ flake8

mypy<=1.0.1 # https://github.com/pydantic/pydantic/issues/5192
types-paramiko
types-pkg-resources
types-setuptools
types-PyYAML
types-pycurl
types-requests
Expand Down
57 changes: 38 additions & 19 deletions docker/coexecutor/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,41 +1,60 @@
FROM conda/miniconda3
FROM python:3.12-bookworm

ENV PYTHONUNBUFFERED 1
ENV PIP_ROOT_USER_ACTION=ignore
ENV DEBIAN_FRONTEND noninteractive
ENV PULSAR_CONFIG_CONDA_PREFIX /usr/local

# set up Galaxy Depot repository (provides SLURM DRMAA packages for Debian Buster and newer releases)
RUN apt-get update \
&& apt-get install -y --no-install-recommends ca-certificates curl gnupg \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
&& curl -fsSL "http://keyserver.ubuntu.com/pks/lookup?op=get&search=0x18381AC8832160AF" | gpg --dearmor -o /etc/apt/trusted.gpg.d/galaxy-depot.gpg \
&& echo "deb https://depot.galaxyproject.org/apt/ $(bash -c '. /etc/os-release; echo ${VERSION_CODENAME:-bookworm}') main" | tee /etc/apt/sources.list.d/galaxy-depot.list

# set up Debian Bullseye repository (and use it only for libslurm36, needed by slurm-drmaa1, and slurm)
RUN echo "deb http://deb.debian.org/debian/ bullseye main" > /etc/apt/sources.list.d/bullseye.list && \
cat <<EOF > /etc/apt/preferences.d/bullseye.pref
Package: *
Pin: release n=bullseye
Pin-Priority: -1

Package: libslurm36, slurm
Pin: release n=bullseye
Pin-Priority: 100
EOF

# set up CVMFS repository
RUN apt-get update \
&& apt-get install -y --no-install-recommends lsb-release wget \
&& wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \
&& dpkg -i cvmfs-release-latest_all.deb && rm -f cvmfs-release-latest_all.deb

# wget, gcc, pip - to build and install Pulsar.
# bzip2 for Miniconda.
# TODO: pycurl stuff...
RUN apt-get update \
&& apt-get install -y --no-install-recommends apt-transport-https \
RUN apt-get update && apt-get install -y --no-install-recommends \
# Install CVMFS client
&& apt-get install -y --no-install-recommends lsb-release wget \
&& wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \
&& dpkg -i cvmfs-release-latest_all.deb \
&& rm -f cvmfs-release-latest_all.deb \
cvmfs cvmfs-config-default \
# Install packages
&& apt-get update \
&& apt-get install -y --no-install-recommends gcc \
libcurl4-openssl-dev \
cvmfs cvmfs-config-default \
slurm-llnl slurm-drmaa-dev \
bzip2 \
gcc libcurl4-openssl-dev \
munge libmunge-dev slurm slurm-drmaa-dev \
bzip2 \
# Install Pulsar Python requirements
&& pip install --no-cache-dir -U pip \
&& pip install --no-cache-dir drmaa wheel kombu pykube pycurl \
webob psutil PasteDeploy pyyaml paramiko \
# Remove build deps and cleanup
&& apt-get -y remove gcc wget lsb-release \
&& apt-get -y autoremove \
&& apt-get autoclean \
&& rm -rf /var/lib/apt/lists/* /var/log/dpkg.log \
&& /usr/sbin/create-munge-key
&& apt-get clean && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log

ADD pulsar_app-*-py2.py3-none-any.whl /

ADD pulsar_app-*-py2.py3-none-any.whl /pulsar_app-*-py2.py3-none-any.whl
SHELL ["/bin/bash", "-c"]

RUN pip install --upgrade setuptools && pip install pyOpenSSL --upgrade && pip install cryptography --upgrade
RUN pip install --no-cache-dir /pulsar_app-*-py2.py3-none-any.whl[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl
RUN pip install --no-cache-dir --upgrade setuptools pyOpenSSL cryptography
RUN pip install --no-cache-dir "$(echo /pulsar_app-*-py2.py3-none-any.whl)"[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl
RUN pip install --upgrade 'importlib-metadata<5.0'
RUN _pulsar-configure-galaxy-cvmfs
RUN _pulsar-conda-init --conda_prefix=/pulsar_dependencies/conda
68 changes: 60 additions & 8 deletions pulsar/client/action_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Any,
Dict,
List,
Optional,
Type,
)
from urllib.parse import urlencode
Expand Down Expand Up @@ -190,6 +191,9 @@ def __init__(self, client=None, config=None):
self.ssh_port = config.get("ssh_port", None)
self.mappers = mappers_from_dicts(config.get("paths", []))
self.files_endpoint = config.get("files_endpoint", None)
self.actions = []
# Might want to make the working directory available here so that we know where to place archive
# for archive action

def action(self, source, type, mapper=None):
path = source.get("path", None)
Expand All @@ -200,10 +204,14 @@ def action(self, source, type, mapper=None):
if mapper:
file_lister = mapper.file_lister
action_kwds = mapper.action_kwds
action = action_class(source, file_lister=file_lister, **action_kwds)
action = action_class(source, file_lister=file_lister, file_type=type, **action_kwds)
self.__process_action(action, type)
self.actions.append(action)
return action

def finalize(self):
return [_ for _ in (action.finalize() for action in self.actions) if _]

def unstructured_mappers(self):
""" Return mappers that will map 'unstructured' files (i.e. go beyond
mapping inputs, outputs, and config files).
Expand Down Expand Up @@ -265,6 +273,7 @@ def __process_action(self, action, file_type):
""" Extension point to populate extra action information after an
action has been created.
"""
action.file_type = file_type
if getattr(action, "inject_url", False):
self.__inject_url(action, file_type)
if getattr(action, "inject_ssh_properties", False):
Expand Down Expand Up @@ -300,10 +309,12 @@ class BaseAction:
whole_directory_transfer_supported = False
action_spec: Dict[str, Any] = {}
action_type: str
file_type: Optional[str] = None

def __init__(self, source, file_lister=None):
def __init__(self, source, file_lister=None, file_type=None):
self.source = source
self.file_lister = file_lister or DEFAULT_FILE_LISTER
self.file_type = file_type

@property
def path(self):
Expand Down Expand Up @@ -342,6 +353,9 @@ def _extend_base_dict(self, **kwds):
base_dict.update(**kwds)
return base_dict

def finalize(self):
pass

def to_dict(self):
return self._extend_base_dict()

Expand Down Expand Up @@ -390,8 +404,8 @@ class RewriteAction(BaseAction):
action_type = "rewrite"
staging = STAGING_ACTION_NONE

def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.source_directory = source_directory
self.destination_directory = destination_directory

Expand Down Expand Up @@ -467,8 +481,8 @@ class RemoteTransferAction(BaseAction):
action_type = "remote_transfer"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.url = url

def to_dict(self):
Expand All @@ -495,8 +509,8 @@ class RemoteTransferTusAction(BaseAction):
action_type = "remote_transfer_tus"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.url = url

def to_dict(self):
Expand All @@ -513,6 +527,42 @@ def write_from_path(self, pulsar_path):
tus_upload_file(self.url, pulsar_path)


class JsonTransferAction(BaseAction):
"""
This action indicates that the pulsar server should create a JSON manifest that can be used to stage files by an
external system that can stage files in and out of the compute environment.
"""
inject_url = True
whole_directory_transfer_supported = True
action_type = "json_transfer"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister, file_type)
self.url = url
self._from_path = None
self._to_path = None

@classmethod
def from_dict(cls, action_dict):
return JsonTransferAction(source=action_dict["source"], url=action_dict["url"])

def to_dict(self):
return self._extend_base_dict(url=self.url)

def write_to_path(self, path):
self._to_path = path

def write_from_path(self, pulsar_path: str):
self._from_path = pulsar_path

def finalize(self):
if self._to_path:
return {"url": self.url, "to_path": self._to_path}
else:
return {"url": self.url, "from_path": self._from_path}


class RemoteObjectStoreCopyAction(BaseAction):
"""
"""
Expand Down Expand Up @@ -664,6 +714,7 @@ def write_to_path(self, path):


DICTIFIABLE_ACTION_CLASSES = [
JsonTransferAction,
RemoteCopyAction,
RemoteTransferAction,
RemoteTransferTusAction,
Expand Down Expand Up @@ -844,6 +895,7 @@ def unstructured_map(self, path):

ACTION_CLASSES: List[Type[BaseAction]] = [
NoneAction,
JsonTransferAction,
RewriteAction,
TransferAction,
CopyAction,
Expand Down
9 changes: 5 additions & 4 deletions pulsar/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def __init__(self, destination_params, job_id, job_manager_interface):
self.job_manager_interface = job_manager_interface

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
"""
Queue up the execution of the supplied `command_line` on the remote
server. Called launch for historical reasons, should be renamed to
Expand Down Expand Up @@ -405,7 +405,7 @@ def _build_status_request_message(self):
class MessageJobClient(BaseMessageJobClient):

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
"""
"""
launch_params = self._build_setup_message(
Expand Down Expand Up @@ -439,7 +439,7 @@ def __init__(self, destination_params, job_id, client_manager, shell):
self.shell = shell

def launch(self, command_line, dependencies_description=None, env=None, remote_staging=None, job_config=None,
dynamic_file_sources=None, token_endpoint=None):
dynamic_file_sources=None, token_endpoint=None, staging_manifest=None):
"""
"""
launch_params = self._build_setup_message(
Expand Down Expand Up @@ -491,7 +491,8 @@ def launch(
dynamic_file_sources=None,
container_info=None,
token_endpoint=None,
pulsar_app_config=None
pulsar_app_config=None,
staging_manifest=None
) -> Optional[ExternalId]:
"""
"""
Expand Down
6 changes: 6 additions & 0 deletions pulsar/client/staging/down.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ def collect(self):
self.__collect_other_working_directory_files()
self.__collect_metadata_directory_files()
self.__collect_job_directory_files()
# Give actions that require a final action, like those that write a manifest, to write out their content
self.__finalize_action_mapper()
# finalize collection here for executors that need this ?
return self.exception_tracker.collection_failure_exceptions

def __collect_working_directory_outputs(self):
Expand Down Expand Up @@ -134,6 +137,9 @@ def __collect_job_directory_files(self):
'output_jobdir',
)

def __finalize_action_mapper(self):
self.action_mapper.finalize()

def __realized_dynamic_file_source_references(self):
references = {"filename": [], "extra_files": []}

Expand Down
13 changes: 13 additions & 0 deletions pulsar/client/staging/up.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ def submit_job(client, client_job_description, job_config=None):
# it needs to be in the response to Pulsar even Pulsar is inititing staging actions
launch_kwds["dynamic_file_sources"] = client_job_description.client_outputs.dynamic_file_sources
launch_kwds["token_endpoint"] = client.token_endpoint

# populate `to_path`
staging_manifest = []
for action in file_stager.action_mapper.actions:
if action.file_type not in ("output", "output_workdir"):
name = basename(action.path)
path = file_stager.job_directory.calculate_path(name, action.file_type)
action.write_to_path(path)
staging_manifest.append(action.finalize())

if staging_manifest:
launch_kwds["staging_manifest"] = staging_manifest

# for pulsar modalities that skip the explicit "setup" step, give them a chance to set an external
# id from the submission process (e.g. to TES).
launch_response = client.launch(**launch_kwds)
Expand Down
5 changes: 3 additions & 2 deletions pulsar/managers/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,15 +232,16 @@ class JobDirectory(RemoteJobDirectory):
def __init__(
self,
staging_directory,
job_id,
job_id=None,
lock_manager=None,
directory_maker=None
):
super().__init__(staging_directory, remote_id=job_id, remote_sep=sep)
self._directory_maker = directory_maker or DirectoryMaker()
self.lock_manager = lock_manager
# Assert this job id isn't hacking path somehow.
assert job_id == basename(job_id)
if job_id:
assert job_id == basename(job_id)

def _job_file(self, name):
return os.path.join(self.job_directory, name)
Expand Down
6 changes: 3 additions & 3 deletions pulsar/managers/staging/post.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ def postprocess(job_directory, action_executor, was_cancelled):
staging_config = job_directory.load_metadata("launch_config").get("remote_staging", None)
else:
staging_config = None
collected = __collect_outputs(job_directory, staging_config, action_executor, was_cancelled)
file_action_mapper, collected = _collect_outputs(job_directory, staging_config, action_executor, was_cancelled)
return collected
finally:
job_directory.write_file("postprocessed", "")
return False


def __collect_outputs(job_directory, staging_config, action_executor, was_cancelled):
def _collect_outputs(job_directory, staging_config, action_executor, was_cancelled):
collected = True
if "action_mapper" in staging_config:
file_action_mapper = action_mapper.FileActionMapper(config=staging_config["action_mapper"])
Expand All @@ -39,7 +39,7 @@ def __collect_outputs(job_directory, staging_config, action_executor, was_cancel
if collection_failure_exceptions:
log.warn("Failures collecting results %s" % collection_failure_exceptions)
collected = False
return collected
return file_action_mapper, collected


def realized_dynamic_file_sources(job_directory):
Expand Down
Loading