Skip to content
1 change: 1 addition & 0 deletions dagfactory/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
AIRFLOW3_MAJOR_VERSION = 3

DEFAULTS_FILE_NAME = "defaults.yml"
EXTENDS_KEY = "__extends__"
73 changes: 70 additions & 3 deletions dagfactory/dagfactory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Module contains code for loading a DagFactory config and generating DAGs"""

import copy
import logging
import os
from itertools import chain
Expand All @@ -17,13 +18,13 @@
from packaging import version

from dagfactory._yaml import load_yaml_file
from dagfactory.constants import DEFAULTS_FILE_NAME
from dagfactory.constants import DEFAULTS_FILE_NAME, EXTENDS_KEY
from dagfactory.dagbuilder import DagBuilder
from dagfactory.exceptions import DagFactoryConfigException, DagFactoryException
from dagfactory.utils import update_yaml_structure

# these are params that cannot be a dag name
SYSTEM_PARAMS: List[str] = ["default", "task_groups"]
SYSTEM_PARAMS: List[str] = ["default", "task_groups", EXTENDS_KEY]


class DagFactory:
Expand Down Expand Up @@ -167,6 +168,26 @@ def _merge_dag_args_from_list_configs(configs_list: list[Dict[str, Any]]) -> Dic

return final_config

@staticmethod
def _merge_dag_args_from_extends(extend_args: Dict[str, Any], base_args: Dict[str, Any]) -> Dict[str, Any]:
    """
    Merge ``base_args`` on top of ``extend_args`` and return a new dict.

    On key conflicts the value from ``base_args`` wins. The nested
    "default_args" mapping is merged with ``_merge_default_args_from_list_configs``;
    every other dag-level key is merged with ``_merge_dag_args_from_list_configs``.

    :param extend_args: lower-priority (inherited) configuration
    :param base_args: higher-priority (overriding) configuration
    :returns: a freshly built merged configuration dict
    """
    layered_configs = [extend_args, base_args]

    merged = DagFactory._merge_dag_args_from_list_configs(layered_configs)
    default_args = DagFactory._merge_default_args_from_list_configs(layered_configs)

    # Only attach "default_args" when the merge produced something.
    if default_args:
        merged["default_args"] = default_args

    return merged

@staticmethod
def _serialise_config_md(dag_name, dag_config, default_config):
# Remove empty task_groups if it exists
Expand Down Expand Up @@ -228,11 +249,57 @@ def get_default_config(self) -> Dict[str, Any]:
"""
return self.config.get("default", {})

def get_default_args_from_extends(self) -> Dict[str, Any]:
    """
    Extract and merge the ``default`` configuration from all extended files.

    Walks the ``__extends__`` chain breadth-first, loading each referenced
    YAML file (relative to ``self.default_args_config_path``) and merging its
    ``default`` block. Precedence: a file overrides the files it extends, and
    later entries within one ``__extends__`` list override earlier entries.

    :returns: dict with the merged default configuration from all extends
    :raises DagFactoryConfigException: if the same file is reached twice
        (cyclic — or diamond-shaped — extends graph)
    """
    if EXTENDS_KEY not in self.config:
        return {}

    base_dir = self.default_args_config_path
    merged_extends_defaults: Dict[str, Any] = {}
    visited_files = set()  # Track visited relative paths to detect cycles

    # Enqueue each extends list in reverse so later entries are processed
    # first. Since the first-merged config wins on conflicts (see the merge
    # call below), this makes later files in an ``__extends__`` list take
    # precedence over earlier ones, as documented — while a file still takes
    # precedence over the deeper files it extends.
    extend_config_queue = list(reversed(copy.deepcopy(self.config.get(EXTENDS_KEY, []))))
    while extend_config_queue:
        extend_config_relative_path = extend_config_queue.pop(0)

        # Check for infinite loop by tracking visited relative paths
        if extend_config_relative_path in visited_files:
            raise DagFactoryConfigException(
                f"Infinite extending loop detected: '{extend_config_relative_path}' "
                f"has already been processed. Visited files: {sorted(visited_files)}"
            )

        visited_files.add(extend_config_relative_path)
        extend_config_path = os.path.join(base_dir, extend_config_relative_path)
        extend_config_path = os.path.abspath(extend_config_path)  # Ensure absolute path

        extend_config = DagFactory(config_filepath=extend_config_path).config
        # Only 'default' is recognized in the extended config
        extend_defaults = extend_config.get("default", {})

        # Configs merged earlier (closer to the root of the chain) win over
        # the newly loaded, deeper config.
        merged_extends_defaults = self._merge_dag_args_from_extends(extend_defaults, merged_extends_defaults)
        extend_config_queue.extend(reversed(extend_config.get(EXTENDS_KEY, [])))

    return merged_extends_defaults


def build_dags(self) -> Dict[str, DAG]:
"""Build DAGs using the config file."""
dag_configs: Dict[str, Dict[str, Any]] = self.get_dag_configs()
global_default_args = self._global_default_args()
default_args_from_extends = self.get_default_args_from_extends()
default_config: Dict[str, Any] = self.get_default_config()

# Merge extends defaults into default_config (extends as base, main config overrides)
if default_args_from_extends:
default_config = self._merge_dag_args_from_extends(default_args_from_extends, default_config)

if isinstance(global_default_args, dict):
default_config["default_args"] = self._merge_default_args_from_list_configs(
[global_default_args, default_config]
Expand Down Expand Up @@ -301,7 +368,7 @@ def load_yaml_dags(

:param globals_dict: The globals() from the file used to generate DAGs
:param dags_folder: Path to the folder you want to get recursively scanned
:param default_args_config_path: The Folder path where defaults.yml exist.
:param default_args_config_path: The folder path where defaults.yml exists, or the root directory of the extended config files.
:param suffix: file suffix to filter `in` what files to scan for dags
"""
# chain all file suffixes in a single iterator
Expand Down
1 change: 1 addition & 0 deletions dagfactory/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Module contains various utilities used by dag-factory"""

import ast
import copy
import importlib.util
import json
import logging
Expand Down
19 changes: 19 additions & 0 deletions dev/dags/example_extends_chained.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

# This config file demonstrates chained __extends__ functionality:
# extends_example_dags.yml → extends_environment_specific.yml → extends_base.yml
config_path = CONFIG_ROOT_DIR / "extends_example_dags.yml"

factory = dagfactory.DagFactory(str(config_path))

# Creating task dependencies
factory.clean_dags(globals())
factory.generate_dags(globals())
17 changes: 17 additions & 0 deletions dev/dags/extends_base.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
default:
default_args:
owner: "base_owner"
start_date: "2024-01-01"
retries: 1
retry_delay_sec: 300
email_on_failure: false
email_on_retry: false
schedule_interval: "@daily"
catchup: false
max_active_runs: 1
dagrun_timeout_sec: 3600
default_view: "tree"
orientation: "LR"
tags:
- "dag-factory"
- "base"
16 changes: 16 additions & 0 deletions dev/dags/extends_environment_specific.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
__extends__:
- "extends_base.yml"

default:
default_args:
owner: "dev_team"
retries: 3
retry_delay_sec: 180
email_on_failure: true
email: ["[email protected]"]
schedule_interval: "0 8 * * *" # 8 AM daily
max_active_runs: 2
dagrun_timeout_sec: 7200 # 2 hours
tags:
- "development"
- "etl"
48 changes: 48 additions & 0 deletions dev/dags/extends_example_dags.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
__extends__:
- "extends_environment_specific.yml"

# Override some defaults for this specific set of DAGs
default:
default_args:
owner: "data_engineering_team"
tags:
- "production"
- "daily_etl"

# Define actual DAGs
data_ingestion_dag:
description: "Daily data ingestion from external sources"
default_args:
retries: 5 # Override for this specific DAG
tasks:
extract_source_a:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'Extracting from source A'"
extract_source_b:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'Extracting from source B'"
transform_data:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'Transforming data'"
dependencies: [extract_source_a, extract_source_b]
load_data:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'Loading data to warehouse'"
dependencies: [transform_data]

data_quality_dag:
description: "Data quality checks on ingested data"
schedule_interval: "0 10 * * *" # 10 AM daily, after ingestion
default_args:
owner: "data_quality_team"
tasks:
check_row_counts:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'Checking row counts'"
check_data_freshness:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'Checking data freshness'"
generate_quality_report:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'Generating quality report'"
dependencies: [check_row_counts, check_data_freshness]
33 changes: 33 additions & 0 deletions dev/dags/extends_multiple_inheritance.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
__extends__:
- "extends_base.yml"
- "extends_environment_specific.yml"

# This demonstrates extending from multiple files at once
# Later files in the extends list take precedence over earlier ones
# So extends_environment_specific.yml will override extends_base.yml

default:
default_args:
owner: "multiple_inheritance_team"
tags:
- "multi-extend"
- "demo"

reporting_dag:
description: "Example DAG demonstrating multiple inheritance through __extends__"
schedule_interval: "0 6 * * *" # 6 AM daily
tasks:
gather_metrics:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'Gathering system metrics'"
process_logs:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'Processing application logs'"
generate_dashboard:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'Generating daily dashboard'"
dependencies: [gather_metrics, process_logs]
send_notifications:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 'Sending notifications to stakeholders'"
dependencies: [generate_dashboard]
71 changes: 71 additions & 0 deletions docs/configuration/defaults.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ sample_project
└── defaults.yml
```

Currently, only `default_args` can be specified using the `defaults.yml` file.

Assuming you instantiate the DAG by using:

```python
Expand All @@ -154,3 +156,72 @@ Given the various ways to specify top-level DAG arguments, including `default_ar
2. In the `default` block within the workflow's YAML file
3. The arguments defined in `default_args_config_dict`
4. If (3) is not declared, the `defaults.yml` hierarchy.


## Configuration Inheritance with `__extends__`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this documentation would have to live in "### Combining multiple defaults.yml files", and we'd either replace the current description or we'd have both as sub-cases of this broader approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having both as sub-cases is exactly what I tried to do here. It has these H2 level title:

## YAML top-level `default`
## Specifying `default` arguments via a Python dictionary
## Declaring default values using the `defaults.yml` file
## Configuration Inheritance with `__extends__`


You can create modular, reusable default configurations using the `__extends__` feature. This allows you to build configuration hierarchies by extending other YAML configuration files, promoting better organization and reusability of common settings.

The `__extends__` key accepts an array of config file paths, relative to the Airflow's `dags_folder` by default. The root directory of extended files is configurable through the `default_args_config_path` argument of `load_yaml_dags`.

### Benefits of using `__extends__`

- **Modularity**: Split configurations into logical, reusable components
- **Maintainability**: Centralize common configurations and reduce duplication
- **Flexibility**: Support multiple inheritance levels and chaining
- **Organization**: Create clear configuration hierarchies (For example, base → environment → team → DAG)

### Example usage of `__extends__`

#### Basic extension

**File: `extends_base.yml`**
```yaml
default:
default_args:
owner: "data_team"
retries: 2
retry_delay_sec: 300
schedule_interval: "@daily"
tags: ["dag-factory"]
```

**File: `my_dags.yml`**
```yaml
__extends__:
- "extends_base.yml"

default:
default_args:
owner: "analytics_team" # Overrides "data_team"
start_date: "2024-01-01" # Added to inherited config

my_analytics_dag:
description: "Analytics pipeline"
tasks:
extract_data:
operator: airflow.operators.bash.BashOperator
bash_command: "echo 'Extracting data...'"
```

#### Chained extension

You can create inheritance chains where configurations extend other configurations:

```yaml
# production_dags.yml extends environment_config.yml which extends base_config.yml
__extends__:
- "environment_config.yml"

default:
default_args:
owner: "prod_team" # Final override
```

### Configuration precedence with `__extends__`

When using `__extends__` along with other default configuration methods, the following precedence order applies:

1. The DAG configuration (highest priority)
2. The `default` block of the extended configuration files
3. The global default configuration in `defaults.yml` (lowest priority)
15 changes: 15 additions & 0 deletions tests/fixtures/dag_factory_extends.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
__extends__:
- extends_base.yml

default:
default_args:
owner: main_owner # This should override the base owner
email: [email protected] # This should be added to the base config
schedule_interval: "@hourly" # This should override the base schedule

example_dag_with_extends:
description: "DAG created with extends functionality"
tasks:
task_1:
operator: airflow.operators.bash.BashOperator
bash_command: echo "Hello from extended config"
19 changes: 19 additions & 0 deletions tests/fixtures/dag_factory_extends_chained.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
__extends__:
- dag_factory_extends.yml

default:
default_args:
owner: chained_owner # This should override both base and main config
depends_on_past: false # This should be added
end_date: 2023-12-31 # From the intermediate layer
email_on_failure: true # From the intermediate layer
max_active_runs: 1 # This should override the base value
dagrun_timeout_sec: 1800 # From the intermediate layer
orientation: TB # From the intermediate layer

example_dag_chained_extends:
description: "DAG created with chained extends functionality"
tasks:
task_1:
operator: airflow.operators.bash.BashOperator
bash_command: echo "Hello from chained extended config"
10 changes: 10 additions & 0 deletions tests/fixtures/extends_base.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
default:
default_args:
owner: base_owner
retries: 2
retry_delay_sec: 600
start_date: 2023-01-01
concurrency: 5
max_active_runs: 3
default_view: graph
schedule_interval: "@daily"
Loading