diff --git a/client/src/api/schema/schema.ts b/client/src/api/schema/schema.ts index 67f4cd7e1821..0d87592004a1 100644 --- a/client/src/api/schema/schema.ts +++ b/client/src/api/schema/schema.ts @@ -2997,6 +2997,44 @@ export interface paths { patch?: never; trace?: never; }; + "/api/jobs/{job_id}/files": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + /** + * Get a file required to staging a job. + * @description Get a file required to staging a job (proper datasets, extra inputs, task-split inputs, working directory + * files). + * + * This API method is intended only for consumption by job runners, not end users. + */ + get: operations["index_api_jobs__job_id__files_get"]; + put?: never; + /** + * Populate an output file. + * @description Populate an output file (formal dataset, task split part, working directory file (such as those related to + * metadata). This should be a multipart POST with a 'file' parameter containing the contents of the actual file to + * create. + * + * This API method is intended only for consumption by job runners, not end users. + */ + post: operations["create_api_jobs__job_id__files_post"]; + delete?: never; + options?: never; + /** + * Get a file required to staging a job. + * @description Get a file required to staging a job (proper datasets, extra inputs, task-split inputs, working directory + * files). + * + * This API method is intended only for consumption by job runners, not end users. + */ + head: operations["index_api_jobs__job_id__files_head"]; + patch?: never; + trace?: never; + }; "/api/jobs/{job_id}/inputs": { parameters: { query?: never; @@ -6628,6 +6666,34 @@ export interface components { /** Name */ name?: unknown; }; + /** Body_create_api_jobs__job_id__files_post */ + Body_create_api_jobs__job_id__files_post: { + /** + * File + * Format: binary + */ + __file?: string; + /** File Path */ + __file_path?: string; + /** + * File + * Format: binary + * @description Contents of the file to create. + */ + file?: string; + /** + * Job Key + * @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. + */ + job_key?: string | null; + /** + * Path + * @description Path to file to create. + */ + path?: string | null; + /** Session Id */ + session_id?: string; + }; /** Body_create_form_api_libraries__library_id__contents_post */ Body_create_form_api_libraries__library_id__contents_post: { /** Create Type */ @@ -31164,6 +31230,171 @@ export interface operations { }; }; }; + index_api_jobs__job_id__files_get: { + parameters: { + query: { + /** @description Path to file. */ + path: string; + /** @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ + job_key: string; + }; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path: { + /** @description Encoded id string of the job. */ + job_id: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description Contents of file. */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/octet-stream": unknown; + }; + }; + /** @description File not found, path does not refer to a file, or input dataset(s) for job have been purged. */ + 400: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; + create_api_jobs__job_id__files_post: { + parameters: { + query?: { + /** @description Path to file to create. */ + path?: string | null; + /** @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ + job_key?: string | null; + }; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path: { + /** @description Encoded id string of the job. */ + job_id: string; + }; + cookie?: never; + }; + requestBody?: { + content: { + "multipart/form-data": components["schemas"]["Body_create_api_jobs__job_id__files_post"]; + }; + }; + responses: { + /** @description An okay message. */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": unknown; + }; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; + index_api_jobs__job_id__files_head: { + parameters: { + query: { + /** @description Path to file. */ + path: string; + /** @description A key used to authenticate this request as acting on behalf or a job runner for the specified job. */ + job_key: string; + }; + header?: { + /** @description The user ID that will be used to effectively make this API call. Only admins and designated users can make API calls on behalf of other users. */ + "run-as"?: string | null; + }; + path: { + /** @description Encoded id string of the job. */ + job_id: string; + }; + cookie?: never; + }; + requestBody?: never; + responses: { + /** @description Contents of file. */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/octet-stream": unknown; + }; + }; + /** @description File not found, path does not refer to a file, or input dataset(s) for job have been purged. */ + 400: { + headers: { + [name: string]: unknown; + }; + content?: never; + }; + /** @description Request Error */ + "4XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + /** @description Server Error */ + "5XX": { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["MessageExceptionModel"]; + }; + }; + }; + }; get_inputs_api_jobs__job_id__inputs_get: { parameters: { query?: never; diff --git a/lib/galaxy/webapps/galaxy/api/job_files.py b/lib/galaxy/webapps/galaxy/api/job_files.py index 31a4974376e7..a8d3e792cba9 100644 --- a/lib/galaxy/webapps/galaxy/api/job_files.py +++ b/lib/galaxy/webapps/galaxy/api/job_files.py @@ -1,11 +1,28 @@ -"""API for asynchronous job running mechanisms can use to fetch or put files -related to running and queued jobs. +""" +API for asynchronous job running mechanisms can use to fetch or put files related to running and queued jobs. """ import logging import os import re import shutil +from typing import ( + cast, + IO, + Optional, +) +from urllib.parse import unquote + +from fastapi import ( + File, + Form, + Path, + Query, + Request, + UploadFile, +) +from fastapi.params import Depends +from typing_extensions import Annotated from galaxy import ( exceptions, @@ -13,55 +30,144 @@ ) from galaxy.managers.context import ProvidesAppContext from galaxy.model import Job -from galaxy.web import ( - expose_api_anonymous_and_sessionless, - expose_api_raw_anonymous_and_sessionless, +from galaxy.web import expose_api_anonymous_and_sessionless +from galaxy.webapps.base.api import GalaxyFileResponse +from galaxy.webapps.galaxy.api import ( + DependsOnTrans, + Router, ) from . import BaseGalaxyAPIController +__all__ = ("FastAPIJobFiles", "JobFilesAPIController", "router") + log = logging.getLogger(__name__) -class JobFilesAPIController(BaseGalaxyAPIController): - """This job files controller allows remote job running mechanisms to - read and modify the current state of files for queued and running jobs. - It is certainly not meant to represent part of Galaxy's stable, user - facing API. - - Furthermore, even if a user key corresponds to the user running the job, - it should not be accepted for authorization - this API allows access to - low-level unfiltered files and such authorization would break Galaxy's - security model for tool execution. +router = Router( + # keep the endpoint in the undocumented section of the API docs `/api/docs`, as all endpoints from `FastAPIJobFiles` + # are certainly not meant to represent part of Galaxy's stable, user facing API + tags=["undocumented"] +) + + +def path_query_or_form( + request: Request, + path_query: Annotated[Optional[str], Query(alias="path", description="Path to file to create.")] = None, + path: Annotated[Optional[str], Form(alias="path", description="Path to file to create.")] = None, +): """ + Accept `path` parameter both in query and form format. - @expose_api_raw_anonymous_and_sessionless - def index(self, trans: ProvidesAppContext, job_id, **kwargs): - """ - GET /api/jobs/{job_id}/files + This method does not force the client to provide the parameter, it could simply not submit the parameter in either + format. To force the client to provide the parameter, coerce the output of the method to a string, i.e. + `path: str = Depends(path_query_or_form)` so that FastAPI responds with status code 500 when the parameter is not + provided. + """ + return path_query or path + + +def job_key_query_or_form( + request: Request, + job_key_query: Annotated[ + Optional[str], + Query( + alias="job_key", + description=( + "A key used to authenticate this request as acting on behalf or a job runner for the specified job." + ), + ), + ] = None, + job_key: Annotated[ + Optional[str], + Form( + alias="job_key", + description=( + "A key used to authenticate this request as acting on behalf or a job runner for the specified job." + ), + ), + ] = None, +): + """ + Accept `job_key` parameter both in query and form format. + + This method does not force the client to provide the parameter, it could simply not submit the parameter in either + format. To force the client to provide the parameter, coerce the output of the method to a string, i.e. + `job_key: str = Depends(job_key_query_or_form)` so that FastAPI responds with status code 500 when the parameter is + not provided. + """ + return job_key_query or job_key - Get a file required to staging a job (proper datasets, extra inputs, - task-split inputs, working directory files). - :type job_id: str - :param job_id: encoded id string of the job - :type path: str - :param path: Path to file. - :type job_key: str - :param job_key: A key used to authenticate this request as acting on - behalf or a job runner for the specified job. +@router.cbv +class FastAPIJobFiles: + """ + This job files controller allows remote job running mechanisms to read and modify the current state of files for + queued and running jobs. It is certainly not meant to represent part of Galaxy's stable, user facing API. - ..note: - This API method is intended only for consumption by job runners, - not end users. + Furthermore, even if a user key corresponds to the user running the job, it should not be accepted for authorization + - this API allows access to low-level unfiltered files and such authorization would break Galaxy's security model + for tool execution. + """ - :rtype: binary - :returns: contents of file + # FastAPI answers HEAD requests automatically for GET endpoints. However, because of the way legacy WSGI endpoints + # are injected into the FastAPI app (using `app.mount("/", wsgi_handler)`), the built-in support for `HEAD` requests + # breaks, because such requests are passed to the `wsgi_handler` sub-application. This means that the endpoint still + # needs to include some code to handle this behavior, as tests existing before the migration to FastAPI expect HEAD + # requests to work. + # + @router.get( + # simplify me (remove `_args` and `_kwargs` defined using the walrus operator) when ALL endpoints have been + # migrated to FastAPI, this is a workaround for FastAPI bug https://github.com/fastapi/fastapi/issues/13175 + *(_args := ["/api/jobs/{job_id}/files"]), + **( + _kwargs := dict( + summary="Get a file required to staging a job.", + responses={ + 200: { + "description": "Contents of file.", + "content": {"application/json": None, "application/octet-stream": {"example": None}}, + }, + 400: { + "description": ( + "File not found, path does not refer to a file, or input dataset(s) for job have been purged." + ) + }, + }, + ) + ), + ) + @router.head(*_args, **_kwargs) # type: ignore[name-defined] + # remove `@router.head(...)` when ALL endpoints have been migrated to FastAPI + def index( + self, + job_id: Annotated[str, Path(description="Encoded id string of the job.")], + path: Annotated[ + str, + Query( + description="Path to file.", + ), + ], + job_key: Annotated[ + str, + Query( + description=( + "A key used to authenticate this request as acting on behalf or a job runner for the specified job." + ), + ), + ], + trans: ProvidesAppContext = DependsOnTrans, + ) -> GalaxyFileResponse: """ - job = self.__authorize_job_access(trans, job_id, **kwargs) - path = kwargs["path"] - try: - return open(path, "rb") - except FileNotFoundError: + Get a file required to staging a job (proper datasets, extra inputs, task-split inputs, working directory + files). + + This API method is intended only for consumption by job runners, not end users. + """ + path = unquote(path) + + job = self.__authorize_job_access(trans, job_id, path=path, job_key=job_key) + + if not os.path.exists(path): # We know that the job is not terminal, but users (or admin scripts) can purge input datasets. # Here we discriminate that case from truly unexpected bugs. # Not failing the job here, this is or should be handled by pulsar. @@ -70,55 +176,67 @@ def index(self, trans: ProvidesAppContext, job_id, **kwargs): # This looks like a galaxy dataset, check if any job input has been deleted. if any(jtid.dataset.dataset.purged for jtid in job.input_datasets): raise exceptions.ItemDeletionException("Input dataset(s) for job have been purged.") - else: - raise - @expose_api_anonymous_and_sessionless - def create(self, trans, job_id, payload, **kwargs): + return GalaxyFileResponse(path) + + @router.post( + "/api/jobs/{job_id}/files", + summary="Populate an output file.", + responses={ + 200: {"description": "An okay message.", "content": {"application/json": {"example": {"message": "ok"}}}}, + }, + ) + def create( + self, + job_id: Annotated[str, Path(description="Encoded id string of the job.")], + path: Annotated[str, Depends(path_query_or_form)], + job_key: Annotated[str, Depends(job_key_query_or_form)], + file: UploadFile = File(None, description="Contents of the file to create."), + session_id: str = Form(None), + nginx_upload_module_file_path: str = Form( + None, + alias="__file_path", + validation_alias="__file_path", + # both `alias` and `validation_alias` are needed for body parameters, see + # https://github.com/fastapi/fastapi/issues/10286#issuecomment-1727642960 + ), + underscore_file: UploadFile = File( + None, + alias="__file", + validation_alias="__file", + # both `alias` and `validation_alias` are needed for body parameters, see + # https://github.com/fastapi/fastapi/issues/10286#issuecomment-1727642960 + ), + trans: ProvidesAppContext = DependsOnTrans, + ): """ - create( self, trans, job_id, payload, **kwargs ) - * POST /api/jobs/{job_id}/files - Populate an output file (formal dataset, task split part, working - directory file (such as those related to metadata)). This should be - a multipart post with a 'file' parameter containing the contents of - the actual file to create. - - :type job_id: str - :param job_id: encoded id string of the job - :type payload: dict - :param payload: dictionary structure containing:: - 'job_key' = Key authenticating - 'path' = Path to file to create. - - ..note: - This API method is intended only for consumption by job runners, - not end users. - - :rtype: dict - :returns: an okay message + Populate an output file (formal dataset, task split part, working directory file (such as those related to + metadata). This should be a multipart POST with a 'file' parameter containing the contents of the actual file to + create. + + This API method is intended only for consumption by job runners, not end users. """ - job = self.__authorize_job_access(trans, job_id, **payload) - path = payload.get("path") - if not path: - raise exceptions.RequestParameterInvalidException("'path' parameter not provided or empty.") + path = unquote(path) + + job = self.__authorize_job_access(trans, job_id, path=path, job_key=job_key) self.__check_job_can_write_to_path(trans, job, path) + input_file: Optional[IO[bytes]] = None + input_file_path: Optional[str] = None # Is this writing an unneeded file? Should this just copy in Python? - if "__file_path" in payload: - file_path = payload.get("__file_path") + if nginx_upload_module_file_path: + input_file_path = nginx_upload_module_file_path upload_store = trans.app.config.nginx_upload_job_files_store assert upload_store, ( "Request appears to have been processed by" " nginx_upload_module but Galaxy is not" " configured to recognize it" ) - assert file_path.startswith( + assert input_file_path.startswith( upload_store - ), f"Filename provided by nginx ({file_path}) is not in correct directory ({upload_store})" - input_file = open(file_path) - elif "session_id" in payload: + ), f"Filename provided by nginx ({input_file_path}) is not in correct directory ({upload_store})" + elif session_id: # code stolen from basic.py - session_id = payload["session_id"] upload_store = ( trans.app.config.tus_upload_store_job_files or trans.app.config.tus_upload_store @@ -127,76 +245,41 @@ def create(self, trans, job_id, payload, **kwargs): if re.match(r"^[\w-]+$", session_id) is None: raise ValueError("Invalid session id format.") local_filename = os.path.abspath(os.path.join(upload_store, session_id)) - input_file = open(local_filename) + input_file_path = local_filename + elif file: + input_file = file.file + elif underscore_file: + input_file = underscore_file.file else: - input_file = payload.get("file", payload.get("__file", None)).file + raise exceptions.RequestParameterMissingException("No file uploaded.") + target_dir = os.path.dirname(path) util.safe_makedirs(target_dir) - try: - if os.path.exists(path) and (path.endswith("tool_stdout") or path.endswith("tool_stderr")): - with open(path, "ab") as destination: - shutil.copyfileobj(open(input_file.name, "rb"), destination) + if os.path.exists(path) and (path.endswith("tool_stdout") or path.endswith("tool_stderr")): + with open(path, "ab") as destination: + if input_file_path: + with open(input_file_path, "rb") as input_file_handle: + shutil.copyfileobj(input_file_handle, destination) + else: + shutil.copyfileobj(cast(IO[bytes], input_file), destination) + else: + # prior to migrating to FastAPI, this operation was done more efficiently for all cases using + # `shutil.move(input_file_path, path)`, but FastAPI stores the uploaded file as + # `tempfile.SpooledTemporaryFile` + # (https://docs.python.org/3/library/tempfile.html#tempfile.SpooledTemporaryFile), so now there is not even + # a path where uploaded files can be accessed on disk + if input_file_path: + shutil.move(input_file_path, path) else: - shutil.move(input_file.name, path) - finally: - try: - input_file.close() - except OSError: - # Fails to close file if not using nginx upload because the - # tempfile has moved and Python wants to delete it. - pass - return {"message": "ok"} - - @expose_api_anonymous_and_sessionless - def tus_patch(self, trans, **kwds): - """ - Exposed as PATCH /api/job_files/resumable_upload. - - I think based on the docs, a separate tusd server is needed for job files if - also hosting one for use facing uploads. - - Setting up tusd for job files should just look like (I think): - - tusd -host localhost -port 1080 -upload-dir=/database/tmp - - See more discussion of checking upload access, but we shouldn't need the - API key and session stuff the user upload tusd server should be configured with. + with open(path, "wb") as destination: + shutil.copyfileobj(cast(IO[bytes], input_file), destination) - Also shouldn't need a hooks endpoint for this reason but if you want to add one - the target CLI entry would be -hooks-http=/api/job_files/tus_hooks - and the action is featured below. - - I would love to check the job state with __authorize_job_access on the first - POST but it seems like TusMiddleware doesn't default to coming in here for that - initial POST the way it does for the subsequent PATCHes. Ultimately, the upload - is still authorized before the write done with POST /api/jobs//files - so I think there is no route here to mess with user data - the worst of the security - issues that can be caused is filling up the sever with needless files that aren't - acted on. Since this endpoint is not meant for public consumption - all the job - files stuff and the TUS server should be blocked to public IPs anyway and restricted - to your Pulsar servers and similar targeting could be accomplished with a user account - and the user facing upload endpoints. - """ - return None - - @expose_api_anonymous_and_sessionless - def tus_hooks(self, trans, **kwds): - """No-op but if hook specified the way we do for user upload it would hit this action. - - Exposed as PATCH /api/job_files/tus_hooks and documented in the docstring for - tus_patch. - """ - pass - - def __authorize_job_access(self, trans, encoded_job_id, **kwargs): - for key in ["path", "job_key"]: - if key not in kwargs: - error_message = f"Job files action requires a valid '{key}'." - raise exceptions.ObjectAttributeMissingException(error_message) + return {"message": "ok"} + def __authorize_job_access(self, trans, encoded_job_id, path, job_key): job_id = trans.security.decode_id(encoded_job_id) - job_key = trans.security.encode_id(job_id, kind="jobs_files") - if not util.safe_str_cmp(str(kwargs["job_key"]), job_key): + job_key_from_job_id = trans.security.encode_id(job_id, kind="jobs_files") + if not util.safe_str_cmp(str(job_key), job_key_from_job_id): raise exceptions.ItemAccessibilityException("Invalid job_key supplied.") # Verify job is active. Don't update the contents of complete jobs. @@ -207,9 +290,9 @@ def __authorize_job_access(self, trans, encoded_job_id, **kwargs): return job def __check_job_can_write_to_path(self, trans, job, path): - """Verify an idealized job runner should actually be able to write to - the specified path - it must be a dataset output, a dataset "extra - file", or a some place in the working directory of this job. + """ + Verify an idealized job runner should actually be able to write to the specified path - it must be a dataset + output, a dataset "extra file", or a some place in the working directory of this job. Would like similar checks for reading the unstructured nature of loc files make this very difficult. (See abandoned work here @@ -220,8 +303,8 @@ def __check_job_can_write_to_path(self, trans, job, path): raise exceptions.ItemAccessibilityException("Job is not authorized to write to supplied path.") def __is_output_dataset_path(self, job, path): - """Check if is an output path for this job or a file in the an - output's extra files path. + """ + Check if is an output path for this job or a file in the output's extra files path. """ da_lists = [job.output_datasets, job.output_library_datasets] for da_list in da_lists: @@ -240,3 +323,59 @@ def __in_working_directory(self, job, path, app): job, base_dir="job_work", dir_only=True, extra_dir=str(job.id) ) return util.in_directory(path, working_directory) + + +class JobFilesAPIController(BaseGalaxyAPIController): + """ + Legacy WSGI endpoints dedicated to TUS uploads. + + TUS upload endpoints work in tandem with the WSGI middleware `TusMiddleware` from the `tuswsgi` package. Both + WSGI middlewares and endpoints are injected into the FastAPI app after FastAPI routes as a single sub-application + `wsgi_handler` using `app.mount("/", wsgi_handler)`, meaning that requests are passed to the `wsgi_handler` + sub-application (and thus to `TusMiddleware`) only if there was no FastAPI endpoint defined to handle them. + + Therefore, these legacy WSGI endpoints cannot be migrated to FastAPI unless `TusMiddleware` is migrated to ASGI. + """ + + @expose_api_anonymous_and_sessionless + def tus_patch(self, trans, **kwds): + """ + Exposed as PATCH /api/job_files/resumable_upload. + + I think based on the docs, a separate tusd server is needed for job files if + also hosting one for use facing uploads. + + Setting up tusd for job files should just look like (I think): + + `tusd -host localhost -port 1080 -upload-dir=/database/tmp` + + See more discussion of checking upload access, but we shouldn't need the + API key and session stuff the user upload tusd server should be configured with. + + Also shouldn't need a hooks endpoint for this reason but if you want to add one + the target CLI entry would be `-hooks-http=/api/job_files/tus_hooks` + and the action is featured below. + + I would love to check the job state with `__authorize_job_access` on the first + POST but it seems like `TusMiddleware` doesn't default to coming in here for that + initial POST the way it does for the subsequent PATCHes. Ultimately, the upload + is still authorized before the write done with POST `/api/jobs//files` + so I think there is no route here to mess with user data - the worst of the security + issues that can be caused is filling up the sever with needless files that aren't + acted on. Since this endpoint is not meant for public consumption - all the job + files stuff and the TUS server should be blocked to public IPs anyway and restricted + to your Pulsar servers and similar targeting could be accomplished with a user account + and the user facing upload endpoints. + """ + ... + return None + + @expose_api_anonymous_and_sessionless + def tus_hooks(self, trans, **kwds): + """ + No-op but if hook specified the way we do for user upload it would hit this action. + + Exposed as PATCH /api/job_files/tus_hooks and documented in the docstring for tus_patch. + """ + ... + return None diff --git a/lib/galaxy/webapps/galaxy/buildapp.py b/lib/galaxy/webapps/galaxy/buildapp.py index e603abf6a1ef..ad1962de7188 100644 --- a/lib/galaxy/webapps/galaxy/buildapp.py +++ b/lib/galaxy/webapps/galaxy/buildapp.py @@ -870,24 +870,6 @@ def populate_api_routes(webapp, app): conditions=dict(method=["GET"]), ) - # Job files controllers. Only for consumption by remote job runners. - webapp.mapper.resource( - "file", - "files", - controller="job_files", - name_prefix="job_", - path_prefix="/api/jobs/{job_id}", - parent_resources=dict(member_name="job", collection_name="jobs"), - ) - - webapp.mapper.connect( - "index", - "/api/jobs/{job_id}/files", - controller="job_files", - action="index", - conditions=dict(method=["HEAD"]), - ) - webapp.mapper.resource( "port", "ports", diff --git a/test/integration/test_job_files.py b/test/integration/test_job_files.py index 4f244419156b..1d6a450211dd 100644 --- a/test/integration/test_job_files.py +++ b/test/integration/test_job_files.py @@ -17,8 +17,14 @@ import io import os +import shutil import tempfile -from typing import Dict +from typing import ( + Any, + Dict, + IO, + Optional, +) import requests from sqlalchemy import select @@ -42,26 +48,39 @@ class TestJobFilesIntegration(integration_util.IntegrationTestCase): initialized = False dataset_populator: DatasetPopulator + input_hda: model.HistoryDatasetAssociation + input_hda_dict: Dict[str, Any] + _nginx_upload_job_files_store: str + @classmethod def handle_galaxy_config_kwds(cls, config): super().handle_galaxy_config_kwds(config) + config["job_config_file"] = SIMPLE_JOB_CONFIG_FILE config["object_store_store_by"] = "uuid" config["server_name"] = "files" + config["nginx_upload_job_files_store"] = tempfile.mkdtemp() + cls._nginx_upload_job_files_store = config["nginx_upload_job_files_store"] cls.initialized = False + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls._nginx_upload_job_files_store) + super().tearDownClass() + def setUp(self): super().setUp() - self.dataset_populator = DatasetPopulator(self.galaxy_interactor) - if not TestJobFilesIntegration.initialized: - history_id = self.dataset_populator.new_history() + cls = TestJobFilesIntegration + cls.dataset_populator = DatasetPopulator(self.galaxy_interactor) + if not cls.initialized: + history_id = cls.dataset_populator.new_history() sa_session = self.sa_session stmt = select(model.HistoryDatasetAssociation) assert len(sa_session.scalars(stmt).all()) == 0 - self.input_hda_dict = self.dataset_populator.new_dataset(history_id, content=TEST_INPUT_TEXT, wait=True) + cls.input_hda_dict = cls.dataset_populator.new_dataset(history_id, content=TEST_INPUT_TEXT, wait=True) assert len(sa_session.scalars(stmt).all()) == 1 - self.input_hda = sa_session.scalars(stmt).all()[0] - TestJobFilesIntegration.initialized = True + cls.input_hda = sa_session.scalars(stmt).all()[0] + cls.initialized = True def test_read_by_state(self): job, _, _ = self.create_static_job_with_state("running") @@ -160,6 +179,78 @@ def test_write_with_tus(self): api_asserts.assert_status_code_is_ok(response) assert open(path).read() == "some initial text data" + def test_write_with_nginx_upload_module(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + file: Optional[IO[bytes]] = None + try: + with open(os.path.join(self._app.config.nginx_upload_job_files_store, "nginx_upload"), "wb") as file: + file.write(b"some initial text data") + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=dict(**data, __file_path=file.name)) + + api_asserts.assert_status_code_is_ok(response) + assert not os.path.exists(file.name) + assert os.path.exists(path) + with open(path) as uploaded_file: + assert uploaded_file.read() == "some initial text data" + finally: + # remove `file.name` + try: + if file is not None: + os.remove(file.name) + except FileNotFoundError: + pass + + def test_write_with_session_id(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + upload_store = ( + self._app.config.tus_upload_store_job_files + or self._app.config.tus_upload_store + or self._app.config.new_file_path + ) + upload_id = "35a7c8d3-e659-430e-8579-8d085e7e569d" + upload_path = os.path.join(upload_store, "35a7c8d3-e659-430e-8579-8d085e7e569d") + try: + with open(upload_path, "w") as upload_file: + upload_file.write("some initial text data") + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=dict(**data, session_id=upload_id)) + api_asserts.assert_status_code_is_ok(response) + assert not os.path.exists(upload_path) + assert os.path.exists(path) + with open(path) as uploaded_file: + assert uploaded_file.read() == "some initial text data" + finally: + # remove `upload_path` + try: + os.remove(upload_path) + except FileNotFoundError: + pass + + def test_write_with_underscored_file_param(self): + job, output_hda, working_directory = self.create_static_job_with_state("running") + job_id, job_key = self._api_job_keys(job) + path = self._app.object_store.get_filename(output_hda.dataset) + assert path + data = {"path": path, "job_key": job_key} + + post_url = self._api_url(f"jobs/{job_id}/files", use_key=False) + response = requests.post(post_url, data=data, files={"__file": io.StringIO("some initial text data")}) + api_asserts.assert_status_code_is_ok(response) + assert open(path).read() == "some initial text data" + def test_write_protection(self): job, _, _ = self.create_static_job_with_state("running") job_id, job_key = self._api_job_keys(job)