Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions docs/concepts/transforms.rst
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,16 @@ about the state of the tasks at given points. Here is an example:

.. code-block:: python

from voluptuous import Optional, Required

from typing import Optional
from taskgraph.transforms.base import TransformSequence
from taskgraph.util.schema import Schema

my_schema = Schema({
Required("foo"): str,
Optional("bar"): bool,
})
class MySchema(Schema):
foo: str # Required field
bar: Optional[bool] = None # Optional field

transforms.add_validate(my_schema)
transforms = TransformSequence()
transforms.add_validate(MySchema)

In the above example, we can be sure that every task dict has a string field
called ``foo``, and may or may not have a boolean field called ``bar``.
Expand Down
16 changes: 8 additions & 8 deletions docs/tutorials/creating-a-task-graph.rst
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,16 @@ comments for explanations):

.. code-block:: python

from voluptuous import Optional, Required

from taskgraph.transforms.base import TransformSequence
from typing import Optional
from taskgraph.util.schema import Schema
from taskgraph.transforms.base import TransformSequence

# Define the schema using Schema base class for better type checking and performance.
class HelloDescriptionSchema(Schema):
text: str # Required field
description: Optional[str] = None # Optional field

# Define the schema. We use the `voluptuous` package to handle validation.
hello_description_schema = Schema({
Required("text"): str,
Optional("description"): str,
})
hello_description_schema = HelloDescriptionSchema

# Create a 'TransformSequence' instance. This class collects transform
# functions to run later.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ dependencies = [
"cookiecutter~=2.1",
"json-e>=2.7",
"mozilla-repo-urls",
"msgspec>=0.18.6",
"PyYAML>=5.3.1",
"redo>=2.0",
"requests>=2.25",
"slugid>=2.0",
"taskcluster-urls>=11.0",
"voluptuous>=0.12.1",
]

[project.optional-dependencies]
Expand Down
218 changes: 126 additions & 92 deletions src/taskgraph/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict
from typing import Any, Dict, List, Literal, Optional, Union

from voluptuous import ALLOW_EXTRA, All, Any, Extra, Length, Optional, Required
import msgspec

from .util.caches import CACHES
from .util.python_path import find_object
from .util.schema import Schema, optionally_keyed_by, validate_schema
from .util.vcs import get_repository
Expand All @@ -21,94 +20,127 @@
logger = logging.getLogger(__name__)


#: Schema for the graph config
graph_config_schema = Schema(
{
# The trust-domain for this graph.
# (See https://firefox-source-docs.mozilla.org/taskcluster/taskcluster/taskgraph.html#taskgraph-trust-domain) # noqa
Required("trust-domain"): str,
Optional(
"docker-image-kind",
description="Name of the docker image kind (default: docker-image)",
): str,
Required("task-priority"): optionally_keyed_by(
"project",
"level",
Any(
"highest",
"very-high",
"high",
"medium",
"low",
"very-low",
"lowest",
),
),
Optional(
"task-deadline-after",
description="Default 'deadline' for tasks, in relative date format. "
"Eg: '1 week'",
): optionally_keyed_by("project", str),
Optional(
"task-expires-after",
description="Default 'expires-after' for level 1 tasks, in relative date format. "
"Eg: '90 days'",
): str,
Required("workers"): {
Required("aliases"): {
str: {
Required("provisioner"): optionally_keyed_by("level", str),
Required("implementation"): str,
Required("os"): str,
Required("worker-type"): optionally_keyed_by("level", str),
}
},
},
Required("taskgraph"): {
Optional(
"register",
description="Python function to call to register extensions.",
): str,
Optional("decision-parameters"): str,
Optional(
"cached-task-prefix",
description="The taskcluster index prefix to use for caching tasks. "
"Defaults to `trust-domain`.",
): str,
Optional(
"cache-pull-requests",
description="Should tasks from pull requests populate the cache",
): bool,
Optional(
"index-path-regexes",
description="Regular expressions matching index paths to be summarized.",
): [str],
Optional(
"run",
description="Configuration related to the 'run' transforms.",
): {
Optional(
"use-caches",
description="List of caches to enable, or a boolean to "
"enable/disable all of them.",
): Any(bool, list(CACHES.keys())),
},
Required("repositories"): All(
{
str: {
Required("name"): str,
Optional("project-regex"): str,
Optional("ssh-secret-name"): str,
# FIXME
Extra: str,
}
},
Length(min=1),
),
},
},
extra=ALLOW_EXTRA,
)
# TaskPriority type for the priority levels
TaskPriority = Literal[
"highest", "very-high", "high", "medium", "low", "very-low", "lowest"
]


class WorkerAlias(Schema):
"""Worker alias configuration."""

provisioner: Union[str, dict] # Can be keyed-by level
implementation: str
os: str
worker_type: Union[str, dict] # Can be keyed-by level, maps from "worker-type"

def __post_init__(self):
"""Validate keyed-by fields."""
# Validate provisioner can be keyed-by level
if isinstance(self.provisioner, dict):
validator = optionally_keyed_by("level", str)
# Just validate - it will raise an error if invalid
validator(self.provisioner)

# Validate worker_type can be keyed-by level
if isinstance(self.worker_type, dict):
validator = optionally_keyed_by("level", str)
# Just validate - it will raise an error if invalid
validator(self.worker_type)


class Workers(Schema, rename=None):
"""Workers configuration."""

aliases: Dict[str, WorkerAlias]


class Repository(Schema):
"""Repository configuration."""

# Required fields first
name: str

# Optional fields
project_regex: Optional[str] = None # Maps from "project-regex"
ssh_secret_name: Optional[str] = None # Maps from "ssh-secret-name"
# Allow extra fields for flexibility
__extras__: Dict[str, Any] = msgspec.field(default_factory=dict)


class RunConfig(Schema):
"""Run transforms configuration."""

use_caches: Optional[Union[bool, List[str]]] = None # Maps from "use-caches"


class TaskGraphConfig(Schema):
"""Taskgraph specific configuration."""

# Required fields first
repositories: Dict[str, Repository]

# Optional fields
register: Optional[str] = None
decision_parameters: Optional[str] = None # Maps from "decision-parameters"
cached_task_prefix: Optional[str] = None # Maps from "cached-task-prefix"
cache_pull_requests: Optional[bool] = None # Maps from "cache-pull-requests"
index_path_regexes: Optional[List[str]] = None # Maps from "index-path-regexes"
run: Optional[RunConfig] = None


class GraphConfigSchema(Schema):
"""Main graph configuration schema."""

# Required fields first
trust_domain: str # Maps from "trust-domain"
task_priority: Union[
TaskPriority, dict
] # Maps from "task-priority", can be keyed-by project or level
workers: Workers
taskgraph: TaskGraphConfig

# Optional fields
docker_image_kind: Optional[str] = None # Maps from "docker-image-kind"
task_deadline_after: Optional[Union[str, dict]] = (
None # Maps from "task-deadline-after", can be keyed-by project
)
task_expires_after: Optional[str] = None # Maps from "task-expires-after"
# Allow extra fields for flexibility
__extras__: Dict[str, Any] = msgspec.field(default_factory=dict)

def __post_init__(self):
"""Validate keyed-by fields."""
# Validate task_priority can be keyed-by project or level
if isinstance(self.task_priority, dict):
# Create a validator that accepts TaskPriority values
def validate_priority(x):
valid_priorities = [
"highest",
"very-high",
"high",
"medium",
"low",
"very-low",
"lowest",
]
if x not in valid_priorities:
raise ValueError(f"Invalid task priority: {x}")
return x

validator = optionally_keyed_by("project", "level", validate_priority)
# Just validate - it will raise an error if invalid
validator(self.task_priority)

# Validate task_deadline_after can be keyed-by project
if self.task_deadline_after and isinstance(self.task_deadline_after, dict):
validator = optionally_keyed_by("project", str)
# Just validate - it will raise an error if invalid
validator(self.task_deadline_after)


# Msgspec schema is now the main schema
graph_config_schema = GraphConfigSchema


@dataclass(frozen=True, eq=False)
Expand Down Expand Up @@ -177,7 +209,9 @@ def kinds_dir(self):


def validate_graph_config(config):
validate_schema(graph_config_schema, config, "Invalid graph configuration:")
"""Validate graph configuration using msgspec."""
# With rename="kebab", msgspec handles the conversion automatically
validate_schema(GraphConfigSchema, config, "Invalid graph configuration:")


def load_graph_config(root_dir):
Expand Down
13 changes: 7 additions & 6 deletions src/taskgraph/decision.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
import shutil
import time
from pathlib import Path
from typing import Any, Dict, Optional

import yaml
from voluptuous import Optional

from taskgraph.actions import render_actions_json
from taskgraph.create import create_tasks
Expand Down Expand Up @@ -40,11 +40,12 @@


#: Schema for try_task_config.json version 2
try_task_config_schema_v2 = Schema(
{
Optional("parameters"): {str: object},
}
)
class TryTaskConfigSchemaV2(Schema):
# All fields are optional
parameters: Optional[Dict[str, Any]] = None


try_task_config_schema_v2 = TryTaskConfigSchemaV2


def full_task_graph_to_runnable_tasks(full_task_json):
Expand Down
Loading
Loading