Skip to content

Commit 6186cf8

Browse files
jmchiltonclaude
andcommitted
Implement GA4GH WES API with task logs & comprehensive test coverage
Implement full Workflow Execution Service (WES) API support for running Galaxy workflows via the GA4GH standard. Includes complete core functionality with run submission, status tracking, cancellation, input/output handling, and task logs. Core WES Features: - 6 WES API endpoints (service-info, runs CRUD, cancel, status) - Support for both workflow_url and workflow_attachment input methods - Support for gxworkflow:// URI scheme for direct database workflow references - Automatic history creation with optional custom naming - Full workflow support: gx_workflow_ga and gx_workflow_format2 - DRS URI generation for all workflow outputs - State mapping between Galaxy invocation states and WES states - Cursor-based pagination for run listings Task Log Features: - /api/jobs/{job_id}/stdout - Job stdout as plain text - /api/jobs/{job_id}/stderr - Job stderr as plain text - /ga4gh/wes/v1/runs/{run_id}/tasks - Paginated task list - /ga4gh/wes/v1/runs/{run_id}/tasks/{task_id} - Task details - Build TaskLogs from workflow invocation steps - Populate RunLog with task_logs list and task_logs_url Implementation Highlights: - Auto-generated Pydantic models from GA4GH WES OpenAPI spec - FastAPI CBV router following Galaxy API patterns - Service layer with full workflow submission/execution pipeline - Shared GA4GH utilities for DRS code reuse - Reduced test duplication with helper functions Files Added: - lib/galaxy/schema/wes/__init__.py - Generated Pydantic models - lib/galaxy/webapps/galaxy/api/wes.py - WES API router - lib/galaxy/webapps/galaxy/services/wes.py - WesService - lib/galaxy/webapps/galaxy/services/ga4gh.py - Shared GA4GH utilities - WES_PLAN.md - Implementation plan 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent bef42b4 commit 6186cf8

File tree

9 files changed

+2325
-42
lines changed

9 files changed

+2325
-42
lines changed

lib/galaxy/schema/wes/__init__.py

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
# generated by datamodel-codegen:
2+
# filename: https://raw.githubusercontent.com/ga4gh/workflow-execution-service-schemas/develop/openapi/workflow_execution_service.openapi.yaml
3+
# timestamp: 2025-11-18T17:23:28+00:00
4+
5+
from __future__ import annotations
6+
7+
from enum import Enum
8+
from typing import (
9+
Any,
10+
Dict,
11+
List,
12+
Optional,
13+
Union,
14+
)
15+
16+
from pydantic import (
17+
AnyUrl,
18+
AwareDatetime,
19+
BaseModel,
20+
Field,
21+
)
22+
23+
24+
class Organization(BaseModel):
25+
name: str = Field(
26+
..., description="Name of the organization responsible for the service", examples=["My organization"]
27+
)
28+
url: AnyUrl = Field(
29+
..., description="URL of the website of the organization (RFC 3986 format)", examples=["https://example.com"]
30+
)
31+
32+
33+
class ServiceType(BaseModel):
34+
group: str = Field(
35+
...,
36+
description="Namespace in reverse domain name format. Use `org.ga4gh` for implementations compliant with official GA4GH specifications. For services with custom APIs not standardized by GA4GH, or implementations diverging from official GA4GH specifications, use a different namespace (e.g. your organization's reverse domain name).",
37+
examples=["org.ga4gh"],
38+
)
39+
artifact: str = Field(
40+
...,
41+
description="Name of the API or GA4GH specification implemented. Official GA4GH types should be assigned as part of standards approval process. Custom artifacts are supported.",
42+
examples=["beacon"],
43+
)
44+
version: str = Field(
45+
...,
46+
description="Version of the API or specification. GA4GH specifications use semantic versioning.",
47+
examples=["1.0.0"],
48+
)
49+
50+
51+
class RunId(BaseModel):
52+
run_id: Optional[str] = Field(None, description="workflow run ID")
53+
54+
55+
class State(Enum):
56+
UNKNOWN = "UNKNOWN"
57+
QUEUED = "QUEUED"
58+
INITIALIZING = "INITIALIZING"
59+
RUNNING = "RUNNING"
60+
PAUSED = "PAUSED"
61+
COMPLETE = "COMPLETE"
62+
EXECUTOR_ERROR = "EXECUTOR_ERROR"
63+
SYSTEM_ERROR = "SYSTEM_ERROR"
64+
CANCELED = "CANCELED"
65+
CANCELING = "CANCELING"
66+
PREEMPTED = "PREEMPTED"
67+
68+
69+
class RunStatus(BaseModel):
70+
run_id: str
71+
state: Optional[State] = None
72+
73+
74+
class RunSummary(RunStatus):
75+
start_time: Optional[str] = Field(
76+
None, description='When the run started executing, in ISO 8601 format "%Y-%m-%dT%H:%M:%SZ"'
77+
)
78+
end_time: Optional[str] = Field(
79+
None,
80+
description='When the run stopped executing (completed, failed, or cancelled), in ISO 8601 format "%Y-%m-%dT%H:%M:%SZ"',
81+
)
82+
tags: Dict[str, str] = Field(..., description="Arbitrary key/value tags added by the client during run creation")
83+
84+
85+
class RunRequest(BaseModel):
86+
workflow_params: Optional[Dict[str, Any]] = Field(
87+
None,
88+
description="REQUIRED\nThe workflow run parameterizations (JSON encoded), including input and output file locations",
89+
)
90+
workflow_type: str = Field(
91+
...,
92+
description='REQUIRED\nThe workflow descriptor type, must be "CWL" or "WDL" currently (or another alternative supported by this WES instance)',
93+
)
94+
workflow_type_version: str = Field(
95+
..., description="REQUIRED\nThe workflow descriptor type version, must be one supported by this WES instance"
96+
)
97+
tags: Optional[Dict[str, str]] = None
98+
workflow_engine_parameters: Optional[Dict[str, str]] = None
99+
workflow_engine: Optional[str] = Field(
100+
None,
101+
description="The workflow engine, must be one supported by this WES instance. Required if workflow_engine_version is provided.",
102+
)
103+
workflow_engine_version: Optional[str] = Field(
104+
None,
105+
description="The workflow engine version, must be one supported by this WES instance. If workflow_engine is provided, but workflow_engine_version is not, servers can make no assumptions with regard to the engine version the WES instance uses to process the request if that WES instance supports multiple versions of the requested engine.",
106+
)
107+
workflow_url: str = Field(
108+
...,
109+
description="REQUIRED\nThe workflow CWL or WDL document. When `workflow_attachments` is used to attach files, the `workflow_url` may be a relative path to one of the attachments.",
110+
)
111+
112+
113+
class Log(BaseModel):
114+
name: Optional[str] = Field(None, description="The task or workflow name")
115+
cmd: Optional[List[str]] = Field(None, description="The command line that was executed")
116+
start_time: Optional[str] = Field(
117+
None, description='When the command started executing, in ISO 8601 format "%Y-%m-%dT%H:%M:%SZ"'
118+
)
119+
end_time: Optional[str] = Field(
120+
None,
121+
description='When the command stopped executing (completed, failed, or cancelled), in ISO 8601 format "%Y-%m-%dT%H:%M:%SZ"',
122+
)
123+
stdout: Optional[str] = Field(
124+
None,
125+
description="A URL to retrieve standard output logs of the workflow run or task. This URL may change between status requests, or may not be available until the task or workflow has finished execution. Should be available using the same credentials used to access the WES endpoint.",
126+
)
127+
stderr: Optional[str] = Field(
128+
None,
129+
description="A URL to retrieve standard error logs of the workflow run or task. This URL may change between status requests, or may not be available until the task or workflow has finished execution. Should be available using the same credentials used to access the WES endpoint.",
130+
)
131+
exit_code: Optional[int] = Field(None, description="Exit code of the program")
132+
system_logs: Optional[List[str]] = Field(
133+
None,
134+
description="System logs are any logs the system decides are relevant,\nwhich are not tied directly to a workflow.\nContent is implementation specific: format, size, etc.\n\nSystem logs may be collected here to provide convenient access.\n\nFor example, the system may include an error message that caused\na SYSTEM_ERROR state (e.g. disk is full), etc.",
135+
)
136+
137+
138+
class DefaultWorkflowEngineParameter(BaseModel):
139+
name: Optional[str] = Field(None, description="The name of the parameter")
140+
type: Optional[str] = Field(None, description="Describes the type of the parameter, e.g. float.")
141+
default_value: Optional[str] = Field(
142+
None, description='The stringified version of the default parameter. e.g. "2.45".'
143+
)
144+
145+
146+
class WorkflowTypeVersion(BaseModel):
147+
workflow_type_version: Optional[List[str]] = Field(
148+
None, description="an array of one or more acceptable types for the `workflow_type`"
149+
)
150+
151+
152+
class TaskLog(Log):
153+
id: str = Field(..., description="A unique identifier which may be used to reference the task")
154+
system_logs: Optional[List[str]] = Field(
155+
None,
156+
description="System logs are any logs the system decides are relevant,\nwhich are not tied directly to a task.\nContent is implementation specific: format, size, etc.\n\nSystem logs may be collected here to provide convenient access.\n\nFor example, the system may include the name of the host\nwhere the task is executing, an error message that caused\na SYSTEM_ERROR state (e.g. disk is full), etc.",
157+
)
158+
tes_uri: Optional[str] = Field(
159+
None,
160+
description="An optional URL pointing to an extended task definition defined by a [TES api](https://github.com/ga4gh/task-execution-schemas)",
161+
)
162+
name: str = Field(..., description="The task or workflow name")
163+
164+
165+
class WorkflowEngineVersion(BaseModel):
166+
workflow_engine_version: Optional[List[str]] = Field(
167+
None, description="An array of one or more acceptable engines versions for the `workflow_engine`"
168+
)
169+
170+
171+
class RunListResponse(BaseModel):
172+
runs: Optional[List[Union[RunStatus, RunSummary]]] = Field(
173+
None,
174+
description="A list of workflow runs that the service has executed or is executing. The list is filtered to only include runs that the caller has permission to see.",
175+
)
176+
next_page_token: Optional[str] = Field(
177+
None,
178+
description="A token which may be supplied as `page_token` in workflow run list request to get the next page of results. An empty string indicates there are no more items to return.",
179+
)
180+
181+
182+
class ErrorResponse(BaseModel):
183+
msg: Optional[str] = Field(None, description="A detailed error message.")
184+
status_code: Optional[int] = Field(
185+
None, description="The integer representing the HTTP status code (e.g. 200, 404)."
186+
)
187+
188+
189+
class Service(BaseModel):
190+
id: str = Field(
191+
...,
192+
description="Unique ID of this service. Reverse domain name notation is recommended, though not required. The identifier should attempt to be globally unique so it can be used in downstream aggregator services e.g. Service Registry.",
193+
examples=["org.ga4gh.myservice"],
194+
)
195+
name: str = Field(..., description="Name of this service. Should be human readable.", examples=["My project"])
196+
type: ServiceType
197+
description: Optional[str] = Field(
198+
None,
199+
description="Description of the service. Should be human readable and provide information about the service.",
200+
examples=["This service provides..."],
201+
)
202+
organization: Organization = Field(..., description="Organization providing the service")
203+
contactUrl: Optional[AnyUrl] = Field(
204+
None,
205+
description="URL of the contact for the provider of this service, e.g. a link to a contact form (RFC 3986 format), or an email (RFC 2368 format).",
206+
examples=["mailto:[email protected]"],
207+
)
208+
documentationUrl: Optional[AnyUrl] = Field(
209+
None,
210+
description="URL of the documentation of this service (RFC 3986 format). This should help someone learn how to use your service, including any specifics required to access data, e.g. authentication.",
211+
examples=["https://docs.myservice.example.com"],
212+
)
213+
createdAt: Optional[AwareDatetime] = Field(
214+
None,
215+
description="Timestamp describing when the service was first deployed and available (RFC 3339 format)",
216+
examples=["2019-06-04T12:58:19Z"],
217+
)
218+
updatedAt: Optional[AwareDatetime] = Field(
219+
None,
220+
description="Timestamp describing when the service was last updated (RFC 3339 format)",
221+
examples=["2019-06-04T12:58:19Z"],
222+
)
223+
environment: Optional[str] = Field(
224+
None,
225+
description="Environment the service is running in. Use this to distinguish between production, development and testing/staging deployments. Suggested values are prod, test, dev, staging. However this is advised and not enforced.",
226+
examples=["test"],
227+
)
228+
version: str = Field(
229+
...,
230+
description="Version of the service being described. Semantic versioning is recommended, but other identifiers, such as dates or commit hashes, are also allowed. The version should be changed whenever the service is updated.",
231+
examples=["1.0.0"],
232+
)
233+
234+
235+
class RunLog(BaseModel):
236+
run_id: Optional[str] = Field(None, description="workflow run ID")
237+
request: Optional[RunRequest] = None
238+
state: Optional[State] = None
239+
run_log: Optional[Log] = None
240+
task_logs_url: Optional[str] = Field(
241+
None,
242+
description="A reference to the complete url which may be used to obtain a paginated list of task logs for this workflow",
243+
)
244+
task_logs: Optional[List[Union[Log, TaskLog]]] = Field(
245+
None,
246+
description="The logs, and other key info like timing and exit code, for each step in the workflow run. This field is deprecated and the `task_logs_url` should be used to retrieve a paginated list of steps from the workflow run. This field will be removed in the next major version of the specification (2.0.0)",
247+
)
248+
outputs: Optional[Dict[str, Any]] = Field(None, description="The outputs from the workflow run.")
249+
250+
251+
class TaskListResponse(BaseModel):
252+
task_logs: Optional[List[TaskLog]] = Field(
253+
None, description="The logs, and other key info like timing and exit code, for each step in the workflow run."
254+
)
255+
next_page_token: Optional[str] = Field(
256+
None,
257+
description="A token which may be supplied as `page_token` in workflow run task list request to get the next page of results. An empty string indicates there are no more items to return.",
258+
)
259+
260+
261+
class ServiceInfo(Service):
262+
workflow_type_versions: Dict[str, WorkflowTypeVersion]
263+
supported_wes_versions: List[str] = Field(
264+
..., description="The version(s) of the WES schema supported by this service"
265+
)
266+
supported_filesystem_protocols: List[str] = Field(
267+
...,
268+
description="The filesystem protocols supported by this service, currently these may include common protocols using the terms 'http', 'https', 'sftp', 's3', 'gs', 'file', or 'synapse', but others are possible and the terms beyond these core protocols are currently not fixed. This section reports those protocols (either common or not) supported by this WES service.",
269+
)
270+
workflow_engine_versions: Dict[str, WorkflowEngineVersion]
271+
default_workflow_engine_parameters: List[DefaultWorkflowEngineParameter] = Field(
272+
...,
273+
description="Each workflow engine can present additional parameters that can be sent to the workflow engine. This message will list the default values, and their types for each workflow engine.",
274+
)
275+
system_state_counts: Dict[str, int] = Field(
276+
...,
277+
description="The system statistics, key is the statistic, value is the count of runs in that state. See the State enum for the possible keys.",
278+
)
279+
auth_instructions_url: str = Field(
280+
...,
281+
description="A web page URL with human-readable instructions on how to get an authorization token for use with a specific WES endpoint.",
282+
)
283+
tags: Dict[str, str]

lib/galaxy/schema/wes/gen.sh

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#!/bin/bash
2+
3+
# must be run from a virtualenv with...
4+
# https://github.com/koxudaxi/datamodel-code-generator
5+
6+
# Use the installed datamodel-codegen
7+
CODEGEN="datamodel-codegen"
8+
9+
# Base URL for WES OpenAPI spec
10+
WES_SPEC_URL="https://raw.githubusercontent.com/ga4gh/workflow-execution-service-schemas/develop/openapi/workflow_execution_service.openapi.yaml"
11+
12+
# Generate models from full OpenAPI spec
13+
$CODEGEN --input-file-type openapi --output-model-type pydantic_v2.BaseModel --url "$WES_SPEC_URL" --output "__init__.py"

lib/galaxy/webapps/galaxy/api/drs.py

Lines changed: 9 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,9 @@
1313
from galaxy.config import GalaxyAppConfiguration
1414
from galaxy.exceptions import ObjectNotFound
1515
from galaxy.managers.context import ProvidesHistoryContext
16-
from galaxy.schema.drs import (
17-
DrsObject,
18-
Organization,
19-
Service,
20-
ServiceType,
21-
)
22-
from galaxy.version import VERSION
16+
from galaxy.schema.drs import DrsObject
2317
from galaxy.webapps.galaxy.services.datasets import DatasetsService
18+
from galaxy.webapps.galaxy.services.ga4gh import build_service_info
2419
from . import (
2520
depends,
2621
DependsOnTrans,
@@ -44,36 +39,14 @@ class DrsApi:
4439
config: GalaxyAppConfiguration = depends(GalaxyAppConfiguration)
4540

4641
@router.get("/ga4gh/drs/v1/service-info", public=True)
47-
def service_info(self, request: Request) -> Service:
48-
components = request.url.components
49-
hostname = components.hostname
50-
assert hostname
51-
default_organization_id = ".".join(reversed(hostname.split(".")))
52-
config = self.config
53-
organization_id = config.ga4gh_service_id or default_organization_id
54-
organization_name = config.organization_name or organization_id
55-
organization_url = config.organization_url or f"{components.scheme}://{components.netloc}"
56-
57-
organization = Organization(
58-
url=organization_url,
59-
name=organization_name,
60-
)
61-
service_type = ServiceType(
62-
group="org.ga4gh",
42+
def service_info(self, request: Request):
43+
return build_service_info(
44+
config=self.config,
45+
request_url=str(request.url),
6346
artifact="drs",
64-
version="1.2.0",
65-
)
66-
extra_kwds = {}
67-
if environment := config.ga4gh_service_environment:
68-
extra_kwds["environment"] = environment
69-
return Service(
70-
id=organization_id + ".drs",
71-
name=DRS_SERVICE_NAME,
72-
description=DRS_SERVICE_DESCRIPTION,
73-
organization=organization,
74-
type=service_type,
75-
version=VERSION,
76-
**extra_kwds,
47+
service_name=DRS_SERVICE_NAME,
48+
service_description=DRS_SERVICE_DESCRIPTION,
49+
artifact_version="1.2.0",
7750
)
7851

7952
@router.get("/ga4gh/drs/v1/objects/{object_id}", public=True)

0 commit comments

Comments
 (0)