Skip to content

Introduce file staging delegation via JSON staging manifest #399

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ flake8

mypy<=1.0.1 # https://github.com/pydantic/pydantic/issues/5192
types-paramiko
types-pkg-resources
types-setuptools
types-PyYAML
types-pycurl
types-requests
Expand Down
76 changes: 57 additions & 19 deletions docker/coexecutor/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,41 +1,79 @@
FROM conda/miniconda3
# use the root of the repository as context, i.e. `docker build . -f ./docker/coexecutor/Dockerfile`

FROM python:3.12-bookworm as build_wheel

ENV PIP_ROOT_USER_ACTION=ignore

WORKDIR /build

# install requirements
COPY requirements.txt .
COPY dev-requirements.txt .
RUN pip install --no-cache-dir --upgrade pip \
&& pip install --no-cache-dir setuptools -r requirements.txt -r dev-requirements.txt

# build Pulsar wheel
COPY . .
RUN python setup.py sdist bdist_wheel


FROM python:3.12-bookworm

ENV PYTHONUNBUFFERED 1
ENV PIP_ROOT_USER_ACTION=ignore
ENV DEBIAN_FRONTEND noninteractive
ENV PULSAR_CONFIG_CONDA_PREFIX /usr/local

# set up Galaxy Depot repository (provides SLURM DRMAA packages for Debian Buster and newer releases)
RUN apt-get update \
&& apt-get install -y --no-install-recommends ca-certificates curl gnupg \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
&& curl -fsSL "http://keyserver.ubuntu.com/pks/lookup?op=get&search=0x18381AC8832160AF" | gpg --dearmor -o /etc/apt/trusted.gpg.d/galaxy-depot.gpg \
&& echo "deb https://depot.galaxyproject.org/apt/ $(bash -c '. /etc/os-release; echo ${VERSION_CODENAME:-bookworm}') main" | tee /etc/apt/sources.list.d/galaxy-depot.list

# set up Debian Bullseye repository (and use it only for libslurm36, needed by slurm-drmaa1, and slurm)
RUN echo "deb http://deb.debian.org/debian/ bullseye main" > /etc/apt/sources.list.d/bullseye.list && \
cat <<EOF > /etc/apt/preferences.d/bullseye.pref
Package: *
Pin: release n=bullseye
Pin-Priority: -1

Package: libslurm36, slurm
Pin: release n=bullseye
Pin-Priority: 100
EOF

# set up CVMFS repository
RUN apt-get update \
&& apt-get install -y --no-install-recommends lsb-release wget \
&& wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \
&& dpkg -i cvmfs-release-latest_all.deb && rm -f cvmfs-release-latest_all.deb

# wget, gcc, pip - to build and install Pulsar.
# bzip2 for Miniconda.
# TODO: pycurl stuff...
RUN apt-get update \
&& apt-get install -y --no-install-recommends apt-transport-https \
RUN apt-get update && apt-get install -y --no-install-recommends \
# Install CVMFS client
&& apt-get install -y --no-install-recommends lsb-release wget \
&& wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \
&& dpkg -i cvmfs-release-latest_all.deb \
&& rm -f cvmfs-release-latest_all.deb \
cvmfs cvmfs-config-default \
# Install packages
&& apt-get update \
&& apt-get install -y --no-install-recommends gcc \
libcurl4-openssl-dev \
cvmfs cvmfs-config-default \
slurm-llnl slurm-drmaa-dev \
bzip2 \
gcc libcurl4-openssl-dev \
munge libmunge-dev slurm slurm-drmaa-dev \
bzip2 \
# Install Pulsar Python requirements
&& pip install --no-cache-dir -U pip \
&& pip install --no-cache-dir drmaa wheel kombu pykube pycurl \
webob psutil PasteDeploy pyyaml paramiko \
# Remove build deps and cleanup
&& apt-get -y remove gcc wget lsb-release \
&& apt-get -y autoremove \
&& apt-get autoclean \
&& rm -rf /var/lib/apt/lists/* /var/log/dpkg.log \
&& /usr/sbin/create-munge-key
&& apt-get clean && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log

COPY --from=build_wheel /build/dist/pulsar_app-*-py2.py3-none-any.whl /

ADD pulsar_app-*-py2.py3-none-any.whl /pulsar_app-*-py2.py3-none-any.whl
SHELL ["/bin/bash", "-c"]

RUN pip install --upgrade setuptools && pip install pyOpenSSL --upgrade && pip install cryptography --upgrade
RUN pip install --no-cache-dir /pulsar_app-*-py2.py3-none-any.whl[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl
RUN pip install --no-cache-dir --upgrade setuptools pyOpenSSL cryptography
RUN pip install --no-cache-dir "$(echo /pulsar_app-*-py2.py3-none-any.whl)"[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl
RUN pip install --upgrade 'importlib-metadata<5.0'
RUN _pulsar-configure-galaxy-cvmfs
RUN _pulsar-conda-init --conda_prefix=/pulsar_dependencies/conda
68 changes: 60 additions & 8 deletions pulsar/client/action_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Any,
Dict,
List,
Optional,
Type,
)
from urllib.parse import urlencode
Expand Down Expand Up @@ -190,6 +191,9 @@ def __init__(self, client=None, config=None):
self.ssh_port = config.get("ssh_port", None)
self.mappers = mappers_from_dicts(config.get("paths", []))
self.files_endpoint = config.get("files_endpoint", None)
self.actions = []
# Might want to make the working directory available here so that we know where to place archive
# for archive action

def action(self, source, type, mapper=None):
path = source.get("path", None)
Expand All @@ -200,10 +204,14 @@ def action(self, source, type, mapper=None):
if mapper:
file_lister = mapper.file_lister
action_kwds = mapper.action_kwds
action = action_class(source, file_lister=file_lister, **action_kwds)
action = action_class(source, file_lister=file_lister, file_type=type, **action_kwds)
self.__process_action(action, type)
self.actions.append(action)
return action

def finalize(self):
return [_ for _ in (action.finalize() for action in self.actions) if _]

def unstructured_mappers(self):
""" Return mappers that will map 'unstructured' files (i.e. go beyond
mapping inputs, outputs, and config files).
Expand Down Expand Up @@ -265,6 +273,7 @@ def __process_action(self, action, file_type):
""" Extension point to populate extra action information after an
action has been created.
"""
action.file_type = file_type
if getattr(action, "inject_url", False):
self.__inject_url(action, file_type)
if getattr(action, "inject_ssh_properties", False):
Expand Down Expand Up @@ -300,10 +309,12 @@ class BaseAction:
whole_directory_transfer_supported = False
action_spec: Dict[str, Any] = {}
action_type: str
file_type: Optional[str] = None

def __init__(self, source, file_lister=None):
def __init__(self, source, file_lister=None, file_type=None):
self.source = source
self.file_lister = file_lister or DEFAULT_FILE_LISTER
self.file_type = file_type

@property
def path(self):
Expand Down Expand Up @@ -342,6 +353,9 @@ def _extend_base_dict(self, **kwds):
base_dict.update(**kwds)
return base_dict

def finalize(self):
pass

def to_dict(self):
return self._extend_base_dict()

Expand Down Expand Up @@ -390,8 +404,8 @@ class RewriteAction(BaseAction):
action_type = "rewrite"
staging = STAGING_ACTION_NONE

def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.source_directory = source_directory
self.destination_directory = destination_directory

Expand Down Expand Up @@ -467,8 +481,8 @@ class RemoteTransferAction(BaseAction):
action_type = "remote_transfer"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.url = url

def to_dict(self):
Expand All @@ -495,8 +509,8 @@ class RemoteTransferTusAction(BaseAction):
action_type = "remote_transfer_tus"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.url = url

def to_dict(self):
Expand All @@ -513,6 +527,42 @@ def write_from_path(self, pulsar_path):
tus_upload_file(self.url, pulsar_path)


class JsonTransferAction(BaseAction):
"""
This action indicates that the pulsar server should create a JSON manifest that can be used to stage files by an
external system that can stage files in and out of the compute environment.
"""
inject_url = True
whole_directory_transfer_supported = True
action_type = "json_transfer"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister, file_type)
self.url = url
self._from_path = None
self._to_path = None

@classmethod
def from_dict(cls, action_dict):
return JsonTransferAction(source=action_dict["source"], url=action_dict["url"])

def to_dict(self):
return self._extend_base_dict(url=self.url)

def write_to_path(self, path):
self._to_path = path

def write_from_path(self, pulsar_path: str):
self._from_path = pulsar_path

def finalize(self):
if self._to_path:
return {"url": self.url, "to_path": self._to_path}
else:
return {"url": self.url, "from_path": self._from_path}


class RemoteObjectStoreCopyAction(BaseAction):
"""
"""
Expand Down Expand Up @@ -664,6 +714,7 @@ def write_to_path(self, path):


DICTIFIABLE_ACTION_CLASSES = [
JsonTransferAction,
RemoteCopyAction,
RemoteTransferAction,
RemoteTransferTusAction,
Expand Down Expand Up @@ -844,6 +895,7 @@ def unstructured_map(self, path):

ACTION_CLASSES: List[Type[BaseAction]] = [
NoneAction,
JsonTransferAction,
RewriteAction,
TransferAction,
CopyAction,
Expand Down
Loading
Loading