Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ flake8

mypy<=1.0.1 # https://github.com/pydantic/pydantic/issues/5192
types-paramiko
types-pkg-resources
types-setuptools
types-PyYAML
types-pycurl
types-requests
Expand Down
76 changes: 57 additions & 19 deletions docker/coexecutor/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,41 +1,79 @@
FROM conda/miniconda3
# use the root of the repository as context, i.e. `docker build . -f ./docker/coexecutor/Dockerfile`

FROM python:3.12-bookworm as build_wheel

ENV PIP_ROOT_USER_ACTION=ignore

WORKDIR /build

# install requirements
COPY requirements.txt .
COPY dev-requirements.txt .
RUN pip install --no-cache-dir --upgrade pip \
&& pip install --no-cache-dir setuptools -r requirements.txt -r dev-requirements.txt

# build Pulsar wheel
COPY . .
RUN python setup.py sdist bdist_wheel


FROM python:3.12-bookworm

ENV PYTHONUNBUFFERED 1
ENV PIP_ROOT_USER_ACTION=ignore
ENV DEBIAN_FRONTEND noninteractive
ENV PULSAR_CONFIG_CONDA_PREFIX /usr/local

# set up Galaxy Depot repository (provides SLURM DRMAA packages for Debian Buster and newer releases)
RUN apt-get update \
&& apt-get install -y --no-install-recommends ca-certificates curl gnupg \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
&& curl -fsSL "http://keyserver.ubuntu.com/pks/lookup?op=get&search=0x18381AC8832160AF" | gpg --dearmor -o /etc/apt/trusted.gpg.d/galaxy-depot.gpg \
&& echo "deb https://depot.galaxyproject.org/apt/ $(bash -c '. /etc/os-release; echo ${VERSION_CODENAME:-bookworm}') main" | tee /etc/apt/sources.list.d/galaxy-depot.list

# set up Debian Bullseye repository (and use it only for libslurm36, needed by slurm-drmaa1, and slurm)
RUN echo "deb http://deb.debian.org/debian/ bullseye main" > /etc/apt/sources.list.d/bullseye.list && \
cat <<EOF > /etc/apt/preferences.d/bullseye.pref
Package: *
Pin: release n=bullseye
Pin-Priority: -1

Package: libslurm36, slurm
Pin: release n=bullseye
Pin-Priority: 100
EOF

# set up CVMFS repository
RUN apt-get update \
&& apt-get install -y --no-install-recommends lsb-release wget \
&& wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \
&& dpkg -i cvmfs-release-latest_all.deb && rm -f cvmfs-release-latest_all.deb

# wget, gcc, pip - to build and install Pulsar.
# bzip2 for Miniconda.
# TODO: pycurl stuff...
RUN apt-get update \
&& apt-get install -y --no-install-recommends apt-transport-https \
RUN apt-get update && apt-get install -y --no-install-recommends \
# Install CVMFS client
&& apt-get install -y --no-install-recommends lsb-release wget \
&& wget https://ecsft.cern.ch/dist/cvmfs/cvmfs-release/cvmfs-release-latest_all.deb \
&& dpkg -i cvmfs-release-latest_all.deb \
&& rm -f cvmfs-release-latest_all.deb \
cvmfs cvmfs-config-default \
# Install packages
&& apt-get update \
&& apt-get install -y --no-install-recommends gcc \
libcurl4-openssl-dev \
cvmfs cvmfs-config-default \
slurm-llnl slurm-drmaa-dev \
bzip2 \
gcc libcurl4-openssl-dev \
munge libmunge-dev slurm slurm-drmaa-dev \
bzip2 \
# Install Pulsar Python requirements
&& pip install --no-cache-dir -U pip \
&& pip install --no-cache-dir drmaa wheel kombu pykube pycurl \
webob psutil PasteDeploy pyyaml paramiko \
# Remove build deps and cleanup
&& apt-get -y remove gcc wget lsb-release \
&& apt-get -y autoremove \
&& apt-get autoclean \
&& rm -rf /var/lib/apt/lists/* /var/log/dpkg.log \
&& /usr/sbin/create-munge-key
&& apt-get clean && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log

COPY --from=build_wheel /build/dist/pulsar_app-*-py2.py3-none-any.whl /

ADD pulsar_app-*-py2.py3-none-any.whl /pulsar_app-*-py2.py3-none-any.whl
SHELL ["/bin/bash", "-c"]

RUN pip install --upgrade setuptools && pip install pyOpenSSL --upgrade && pip install cryptography --upgrade
RUN pip install --no-cache-dir /pulsar_app-*-py2.py3-none-any.whl[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl
RUN pip install --no-cache-dir --upgrade setuptools pyOpenSSL cryptography
RUN pip install --no-cache-dir "$(echo /pulsar_app-*-py2.py3-none-any.whl)"[galaxy_extended_metadata] && rm /pulsar_app-*-py2.py3-none-any.whl
RUN pip install --upgrade 'importlib-metadata<5.0'
RUN _pulsar-configure-galaxy-cvmfs
RUN _pulsar-conda-init --conda_prefix=/pulsar_dependencies/conda
68 changes: 60 additions & 8 deletions pulsar/client/action_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Any,
Dict,
List,
Optional,
Type,
)
from urllib.parse import urlencode
Expand Down Expand Up @@ -190,6 +191,9 @@ def __init__(self, client=None, config=None):
self.ssh_port = config.get("ssh_port", None)
self.mappers = mappers_from_dicts(config.get("paths", []))
self.files_endpoint = config.get("files_endpoint", None)
self.actions = []
# Might want to make the working directory available here so that we know where to place archive
# for archive action

def action(self, source, type, mapper=None):
path = source.get("path", None)
Expand All @@ -200,10 +204,14 @@ def action(self, source, type, mapper=None):
if mapper:
file_lister = mapper.file_lister
action_kwds = mapper.action_kwds
action = action_class(source, file_lister=file_lister, **action_kwds)
action = action_class(source, file_lister=file_lister, file_type=type, **action_kwds)
self.__process_action(action, type)
self.actions.append(action)
return action

def finalize(self):
return [_ for _ in (action.finalize() for action in self.actions) if _]

def unstructured_mappers(self):
""" Return mappers that will map 'unstructured' files (i.e. go beyond
mapping inputs, outputs, and config files).
Expand Down Expand Up @@ -265,6 +273,7 @@ def __process_action(self, action, file_type):
""" Extension point to populate extra action information after an
action has been created.
"""
action.file_type = file_type
if getattr(action, "inject_url", False):
self.__inject_url(action, file_type)
if getattr(action, "inject_ssh_properties", False):
Expand Down Expand Up @@ -300,10 +309,12 @@ class BaseAction:
whole_directory_transfer_supported = False
action_spec: Dict[str, Any] = {}
action_type: str
file_type: Optional[str] = None

def __init__(self, source, file_lister=None):
def __init__(self, source, file_lister=None, file_type=None):
self.source = source
self.file_lister = file_lister or DEFAULT_FILE_LISTER
self.file_type = file_type

@property
def path(self):
Expand Down Expand Up @@ -342,6 +353,9 @@ def _extend_base_dict(self, **kwds):
base_dict.update(**kwds)
return base_dict

def finalize(self):
pass

def to_dict(self):
return self._extend_base_dict()

Expand Down Expand Up @@ -390,8 +404,8 @@ class RewriteAction(BaseAction):
action_type = "rewrite"
staging = STAGING_ACTION_NONE

def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.source_directory = source_directory
self.destination_directory = destination_directory

Expand Down Expand Up @@ -467,8 +481,8 @@ class RemoteTransferAction(BaseAction):
action_type = "remote_transfer"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.url = url

def to_dict(self):
Expand All @@ -495,8 +509,8 @@ class RemoteTransferTusAction(BaseAction):
action_type = "remote_transfer_tus"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None):
super().__init__(source, file_lister=file_lister)
def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister=file_lister, file_type=file_type)
self.url = url

def to_dict(self):
Expand All @@ -513,6 +527,42 @@ def write_from_path(self, pulsar_path):
tus_upload_file(self.url, pulsar_path)


class JsonTransferAction(BaseAction):
"""
This action indicates that the pulsar server should create a JSON manifest that can be used to stage files by an
external system that can stage files in and out of the compute environment.
"""
inject_url = True
whole_directory_transfer_supported = True
action_type = "json_transfer"
staging = STAGING_ACTION_REMOTE

def __init__(self, source, file_lister=None, url=None, file_type=None):
super().__init__(source, file_lister, file_type)
self.url = url
self._from_path = None
self._to_path = None

@classmethod
def from_dict(cls, action_dict):
return JsonTransferAction(source=action_dict["source"], url=action_dict["url"])

def to_dict(self):
return self._extend_base_dict(url=self.url)

def write_to_path(self, path):
self._to_path = path

def write_from_path(self, pulsar_path: str):
self._from_path = pulsar_path

def finalize(self):
if self._to_path:
return {"url": self.url, "to_path": self._to_path}
else:
return {"url": self.url, "from_path": self._from_path}


class RemoteObjectStoreCopyAction(BaseAction):
"""
"""
Expand Down Expand Up @@ -664,6 +714,7 @@ def write_to_path(self, path):


DICTIFIABLE_ACTION_CLASSES = [
JsonTransferAction,
RemoteCopyAction,
RemoteTransferAction,
RemoteTransferTusAction,
Expand Down Expand Up @@ -844,6 +895,7 @@ def unstructured_map(self, path):

ACTION_CLASSES: List[Type[BaseAction]] = [
NoneAction,
JsonTransferAction,
RewriteAction,
TransferAction,
CopyAction,
Expand Down
Loading
Loading