diff --git a/dagfactory/constants.py b/dagfactory/constants.py index a1cac77b..00e64562 100644 --- a/dagfactory/constants.py +++ b/dagfactory/constants.py @@ -5,3 +5,4 @@ AIRFLOW3_MAJOR_VERSION = 3 DEFAULTS_FILE_NAME = "defaults.yml" +EXTENDS_KEY = "__extends__" diff --git a/dagfactory/dagfactory.py b/dagfactory/dagfactory.py index 68cc4359..c0050707 100644 --- a/dagfactory/dagfactory.py +++ b/dagfactory/dagfactory.py @@ -1,5 +1,6 @@ """Module contains code for loading a DagFactory config and generating DAGs""" +import copy import logging import os from itertools import chain @@ -17,13 +18,13 @@ from packaging import version from dagfactory._yaml import load_yaml_file -from dagfactory.constants import DEFAULTS_FILE_NAME +from dagfactory.constants import DEFAULTS_FILE_NAME, EXTENDS_KEY from dagfactory.dagbuilder import DagBuilder from dagfactory.exceptions import DagFactoryConfigException, DagFactoryException from dagfactory.utils import update_yaml_structure # these are params that cannot be a dag name -SYSTEM_PARAMS: List[str] = ["default", "task_groups"] +SYSTEM_PARAMS: List[str] = ["default", "task_groups", EXTENDS_KEY] class DagFactory: @@ -167,6 +168,26 @@ def _merge_dag_args_from_list_configs(configs_list: list[Dict[str, Any]]) -> Dic return final_config + @staticmethod + def _merge_dag_args_from_extends(extend_args: Dict[str, Any], base_args: Dict[str, Any]) -> Dict[str, Any]: + """ + Merges base_args into extend_args and returns a new dict. + If there are redundant keys, the correspondent value of base_args will be used. + Key "default_args" will be merged as well, using _merge_default_args_from_list_configs logic. + For other dag-level args, uses _merge_dag_args_from_list_configs logic. 
+ """ + # Reuse existing merge methods by passing both dicts as a list + configs_list = [extend_args, base_args] + + merged_default_args = DagFactory._merge_default_args_from_list_configs(configs_list) + merged_dag_args = DagFactory._merge_dag_args_from_list_configs(configs_list) + + # Combine the results + if merged_default_args: + merged_dag_args["default_args"] = merged_default_args + + return merged_dag_args + @staticmethod def _serialise_config_md(dag_name, dag_config, default_config): # Remove empty task_groups if it exists @@ -228,11 +249,57 @@ def get_default_config(self) -> Dict[str, Any]: """ return self.config.get("default", {}) + def get_default_args_from_extends(self) -> Dict[str, Any]: + """ + Extract and merge default configuration from all extended files. + This method processes the __extends__ key from the main config and returns + the merged default configuration from all extended files. + + :returns: dict with default configuration from extends + """ + if EXTENDS_KEY not in self.config: + return {} + + base_dir = self.default_args_config_path + merged_extends_defaults = {} + visited_files = set() # Track visited relative paths to detect cycles + + extend_config_queue = copy.deepcopy(self.config.get(EXTENDS_KEY, [])) + while extend_config_queue: + extend_config_relative_path = extend_config_queue.pop(0) + + # Check for infinite loop by tracking visited relative paths + if extend_config_relative_path in visited_files: + raise DagFactoryConfigException( + f"Infinite extending loop detected: '{extend_config_relative_path}' " + f"has already been processed. 
Visited files: {sorted(visited_files)}" + ) + + visited_files.add(extend_config_relative_path) + extend_config_path = os.path.join(base_dir, extend_config_relative_path) + extend_config_path = os.path.abspath(extend_config_path) # Ensure absolute path + + extend_config = DagFactory(config_filepath=extend_config_path).config + # Only 'default' is recognized in the extended config + extend_defaults = extend_config.get("default", {}) + + merged_extends_defaults = self._merge_dag_args_from_extends(extend_defaults, merged_extends_defaults) + extend_config_queue.extend(extend_config.get(EXTENDS_KEY, [])) + + return merged_extends_defaults + + def build_dags(self) -> Dict[str, DAG]: """Build DAGs using the config file.""" dag_configs: Dict[str, Dict[str, Any]] = self.get_dag_configs() global_default_args = self._global_default_args() + default_args_from_extends = self.get_default_args_from_extends() default_config: Dict[str, Any] = self.get_default_config() + + # Merge extends defaults into default_config (extends as base, main config overrides) + if default_args_from_extends: + default_config = self._merge_dag_args_from_extends(default_args_from_extends, default_config) + if isinstance(global_default_args, dict): default_config["default_args"] = self._merge_default_args_from_list_configs( [global_default_args, default_config] @@ -301,7 +368,7 @@ def load_yaml_dags( :param globals_dict: The globals() from the file used to generate DAGs :param dags_folder: Path to the folder you want to get recursively scanned - :param default_args_config_path: The Folder path where defaults.yml exist. + :param default_args_config_path: The folder path where defaults.yml exist, or the root directory of the extended config files. 
:param suffix: file suffix to filter `in` what files to scan for dags """ # chain all file suffixes in a single iterator diff --git a/dagfactory/utils.py b/dagfactory/utils.py index 367931dc..46a4dde0 100644 --- a/dagfactory/utils.py +++ b/dagfactory/utils.py @@ -1,6 +1,7 @@ """Module contains various utilities used by dag-factory""" import ast +import copy import importlib.util import json import logging diff --git a/dev/dags/example_extends_chained.py b/dev/dags/example_extends_chained.py new file mode 100644 index 00000000..e8730da3 --- /dev/null +++ b/dev/dags/example_extends_chained.py @@ -0,0 +1,19 @@ +import os +from pathlib import Path + +# The following import is here so Airflow parses this file +# from airflow import DAG +import dagfactory + +DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/" +CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR)) + +# This config file demonstrates chained __extends__ functionality: +# extends_example_dags.yml → extends_environment_specific.yml → extends_base.yml +config_file = str(CONFIG_ROOT_DIR / "extends_example_dags.yml") + +example_dag_factory = dagfactory.DagFactory(config_file) + +# Creating task dependencies +example_dag_factory.clean_dags(globals()) +example_dag_factory.generate_dags(globals()) diff --git a/dev/dags/extends_base.yml b/dev/dags/extends_base.yml new file mode 100644 index 00000000..464f3fc0 --- /dev/null +++ b/dev/dags/extends_base.yml @@ -0,0 +1,17 @@ +default: + default_args: + owner: "base_owner" + start_date: "2024-01-01" + retries: 1 + retry_delay_sec: 300 + email_on_failure: false + email_on_retry: false + schedule_interval: "@daily" + catchup: false + max_active_runs: 1 + dagrun_timeout_sec: 3600 + default_view: "tree" + orientation: "LR" + tags: + - "dag-factory" + - "base" diff --git a/dev/dags/extends_environment_specific.yml b/dev/dags/extends_environment_specific.yml new file mode 100644 index 00000000..b0e774d9 --- /dev/null +++ 
b/dev/dags/extends_environment_specific.yml @@ -0,0 +1,16 @@ +__extends__: + - "extends_base.yml" + +default: + default_args: + owner: "dev_team" + retries: 3 + retry_delay_sec: 180 + email_on_failure: true + email: ["dev-team@company.com"] + schedule_interval: "0 8 * * *" # 8 AM daily + max_active_runs: 2 + dagrun_timeout_sec: 7200 # 2 hours + tags: + - "development" + - "etl" diff --git a/dev/dags/extends_example_dags.yml b/dev/dags/extends_example_dags.yml new file mode 100644 index 00000000..038d0214 --- /dev/null +++ b/dev/dags/extends_example_dags.yml @@ -0,0 +1,48 @@ +__extends__: + - "extends_environment_specific.yml" + +# Override some defaults for this specific set of DAGs +default: + default_args: + owner: "data_engineering_team" + tags: + - "production" + - "daily_etl" + +# Define actual DAGs +data_ingestion_dag: + description: "Daily data ingestion from external sources" + default_args: + retries: 5 # Override for this specific DAG + tasks: + extract_source_a: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 'Extracting from source A'" + extract_source_b: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 'Extracting from source B'" + transform_data: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 'Transforming data'" + dependencies: [extract_source_a, extract_source_b] + load_data: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 'Loading data to warehouse'" + dependencies: [transform_data] + +data_quality_dag: + description: "Data quality checks on ingested data" + schedule_interval: "0 10 * * *" # 10 AM daily, after ingestion + default_args: + owner: "data_quality_team" + tasks: + check_row_counts: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 'Checking row counts'" + check_data_freshness: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 'Checking data freshness'" + 
generate_quality_report: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 'Generating quality report'" + dependencies: [check_row_counts, check_data_freshness] diff --git a/dev/dags/extends_multiple_inheritance.yml b/dev/dags/extends_multiple_inheritance.yml new file mode 100644 index 00000000..dafbb7e5 --- /dev/null +++ b/dev/dags/extends_multiple_inheritance.yml @@ -0,0 +1,33 @@ +__extends__: + - "extends_base.yml" + - "extends_environment_specific.yml" + +# This demonstrates extending from multiple files at once +# Later files in the extends list take precedence over earlier ones +# So extends_environment_specific.yml will override extends_base.yml + +default: + default_args: + owner: "multiple_inheritance_team" + tags: + - "multi-extend" + - "demo" + +reporting_dag: + description: "Example DAG demonstrating multiple inheritance through __extends__" + schedule_interval: "0 6 * * *" # 6 AM daily + tasks: + gather_metrics: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 'Gathering system metrics'" + process_logs: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 'Processing application logs'" + generate_dashboard: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 'Generating daily dashboard'" + dependencies: [gather_metrics, process_logs] + send_notifications: + operator: airflow.operators.bash_operator.BashOperator + bash_command: "echo 'Sending notifications to stakeholders'" + dependencies: [generate_dashboard] diff --git a/docs/configuration/defaults.md b/docs/configuration/defaults.md index 2013d886..472f0350 100644 --- a/docs/configuration/defaults.md +++ b/docs/configuration/defaults.md @@ -129,6 +129,8 @@ sample_project └── defaults.yml ``` +Currently, only `default_args` can be specified using the `defaults.yml` file. 
+ + Assuming you instantiate the DAG by using: + ```python @@ -154,3 +156,72 @@ Given the various ways to specify top-level DAG arguments, including `default_ar 2. In the `default` block within the workflow's YAML file 3. The arguments defined in `default_args_config_dict` 4. If (3) is not declared, the `defaults.yml` hierarchy. + + +## Configuration Inheritance with `__extends__` + +You can create modular, reusable default configurations using the `__extends__` feature. This allows you to build configuration hierarchies by extending other YAML configuration files, promoting better organization and reusability of common settings. + +The `__extends__` key accepts an array of config file paths, relative to the Airflow's `dags_folder` by default. The root directory of extended files is configurable through the `default_args_config_path` argument of `load_yaml_dags`. + +### Benefits of using `__extends__` + +- **Modularity**: Split configurations into logical, reusable components +- **Maintainability**: Centralize common configurations and reduce duplication +- **Flexibility**: Support multiple inheritance levels and chaining +- **Organization**: Create clear configuration hierarchies (for example, base → environment → team → DAG) + +### Example usage of `__extends__` + +#### Basic extension + +**File: `extends_base.yml`** +```yaml +default: + default_args: + owner: "data_team" + retries: 2 + retry_delay_sec: 300 + schedule_interval: "@daily" + tags: ["dag-factory"] +``` + +**File: `my_dags.yml`** +```yaml +__extends__: + - "extends_base.yml" + +default: + default_args: + owner: "analytics_team" # Overrides "data_team" + start_date: "2024-01-01" # Added to inherited config + +my_analytics_dag: + description: "Analytics pipeline" + tasks: + extract_data: + operator: airflow.operators.bash.BashOperator + bash_command: "echo 'Extracting data...'" +``` + +#### Chained extension + +You can create inheritance chains where configurations extend other configurations: + +```yaml 
+# production_dags.yml extends environment_config.yml which extends base_config.yml +__extends__: + - "environment_config.yml" + +default: + default_args: + owner: "prod_team" # Final override +``` + +### Configuration precedence with `__extends__` + +When using `__extends__` along with other default configuration methods, the following precedence order applies: + +1. The DAG configuration (highest priority) +2. The `default` block of the extended configuration files +3. The global default configuration in `defaults.yml` (lowest priority) diff --git a/tests/fixtures/dag_factory_extends.yml b/tests/fixtures/dag_factory_extends.yml new file mode 100644 index 00000000..b9686b69 --- /dev/null +++ b/tests/fixtures/dag_factory_extends.yml @@ -0,0 +1,15 @@ +__extends__: + - extends_base.yml + +default: + default_args: + owner: main_owner # This should override the base owner + email: test@example.com # This should be added to the base config + schedule_interval: "@hourly" # This should override the base schedule + +example_dag_with_extends: + description: "DAG created with extends functionality" + tasks: + task_1: + operator: airflow.operators.bash.BashOperator + bash_command: echo "Hello from extended config" diff --git a/tests/fixtures/dag_factory_extends_chained.yml b/tests/fixtures/dag_factory_extends_chained.yml new file mode 100644 index 00000000..7f4a433c --- /dev/null +++ b/tests/fixtures/dag_factory_extends_chained.yml @@ -0,0 +1,19 @@ +__extends__: + - dag_factory_extends.yml + +default: + default_args: + owner: chained_owner # This should override both base and main config + depends_on_past: false # This should be added + end_date: 2023-12-31 # From the intermediate layer + email_on_failure: true # From the intermediate layer + max_active_runs: 1 # This should override the base value + dagrun_timeout_sec: 1800 # From the intermediate layer + orientation: TB # From the intermediate layer + +example_dag_chained_extends: + description: "DAG created with chained 
extends functionality" + tasks: + task_1: + operator: airflow.operators.bash.BashOperator + bash_command: echo "Hello from chained extended config" diff --git a/tests/fixtures/extends_base.yml b/tests/fixtures/extends_base.yml new file mode 100644 index 00000000..507c1199 --- /dev/null +++ b/tests/fixtures/extends_base.yml @@ -0,0 +1,10 @@ +default: + default_args: + owner: base_owner + retries: 2 + retry_delay_sec: 600 + start_date: 2023-01-01 + concurrency: 5 + max_active_runs: 3 + default_view: graph + schedule_interval: "@daily" diff --git a/tests/test_dagfactory.py b/tests/test_dagfactory.py index 8f010b47..08c40c17 100644 --- a/tests/test_dagfactory.py +++ b/tests/test_dagfactory.py @@ -25,6 +25,7 @@ from dagfactory import DagFactory, dagfactory, exceptions, load_yaml_dags + TEST_DAG_FACTORY = os.path.join(here, "fixtures/dag_factory.yml") DAG_FACTORY_NO_OR_NONE_STRING_SCHEDULE = os.path.join(here, "fixtures/dag_factory_no_or_none_string_schedule.yml") DAG_FACTORY_SCHEDULE_INTERVAL = os.path.join(here, "fixtures/dag_factory_schedule_interval.yml") @@ -306,7 +307,8 @@ def test_get_dag_configs(monkeypatch): assert sorted(actual) == sorted(expected) -def test_get_default_config(): +def test_get_default_config(monkeypatch): + monkeypatch.setenv("AUTO_CONVERT_TO_AF3", "true") td = dagfactory.DagFactory(TEST_DAG_FACTORY) expected = { "default_args": { @@ -847,3 +849,305 @@ def test_tasks_and_task_groups_as_dict_yaml(tmp_path): assert len(dag.tasks) == 2 # Validate grouping assert any(task.task_id.startswith("task_group_1.task_2") for task in dag.tasks) + + +def test_load_dag_config_with_extends(monkeypatch): + """Test that extends functionality works correctly through get_default_args_from_extends and build_dags.""" + monkeypatch.setenv("AUTO_CONVERT_TO_AF3", "true") + test_config_path = os.path.join(here, "fixtures/dag_factory_extends.yml") + fixtures_dir = os.path.join(here, "fixtures") + td = dagfactory.DagFactory(test_config_path, 
default_args_config_path=fixtures_dir) + + # Test that _load_dag_config preserves extends key but doesn't process it + config = td._load_dag_config(test_config_path) + assert "__extends__" in config + assert config["__extends__"] == ["extends_base.yml"] + + # Test that get_default_args_from_extends processes extends correctly + extends_defaults = td.get_default_args_from_extends() + assert extends_defaults["concurrency"] == 5 # From base config (will be converted to max_active_tasks later) + assert extends_defaults["max_active_runs"] == 3 # From base config + assert extends_defaults["default_view"] == "graph" # From base config + assert extends_defaults[get_schedule_key()] == "@daily" # From base config (handles Airflow 2/3 compatibility) + assert extends_defaults["default_args"]["owner"] == "base_owner" # From base config + assert extends_defaults["default_args"]["retries"] == 2 # From base config + + # Test that the full DAG building process works correctly + dags = td.build_dags() + dag = dags["example_dag_with_extends"] + + # Verify that base config values are applied + assert dag.max_active_runs == 3 # From base config + assert dag.max_active_tasks == 5 # From base config (Airflow 3 equivalent of concurrency) + + # Verify that main config overrides work + assert dag.schedule == "@hourly" # Overridden from main config + assert dag.default_args["owner"] == "main_owner" # Overridden from main config + + # Verify that base config values are preserved when not overridden + assert dag.default_args["retries"] == 2 # From base config + assert dag.default_args["retry_delay_sec"] == 600 # From base config + + # Verify that new values are added + assert dag.default_args["email"] == "test@example.com" # New from main config + + +def test_load_dag_config_with_chained_extends(monkeypatch): + """Test that chained extends functionality works correctly through get_default_args_from_extends and build_dags.""" + monkeypatch.setenv("AUTO_CONVERT_TO_AF3", "true") + 
test_config_path = os.path.join(here, "fixtures/dag_factory_extends_chained.yml") + fixtures_dir = os.path.join(here, "fixtures") + td = dagfactory.DagFactory(test_config_path, default_args_config_path=fixtures_dir) + + # Test that _load_dag_config preserves extends key but doesn't process it + config = td._load_dag_config(test_config_path) + assert "__extends__" in config + assert config["__extends__"] == ["dag_factory_extends.yml"] + + # Test that get_default_args_from_extends processes chained extends correctly + extends_defaults = td.get_default_args_from_extends() + + # Values should come from the base config (extends_base.yml) + assert extends_defaults["concurrency"] == 5 # From base + assert extends_defaults["default_view"] == "graph" # From base + assert extends_defaults["default_args"]["retries"] == 2 # From base + assert extends_defaults["default_args"]["retry_delay_sec"] == 600 # From base + + # Values from middle config should override base + assert extends_defaults[get_schedule_key()] == "@hourly" # From middle config (overrides @daily from base) + assert extends_defaults["default_args"]["email"] == "test@example.com" # From middle config + + # Test that the full DAG building process works correctly + dags = td.build_dags() + dag = dags["example_dag_chained_extends"] + + # Verify that chained extends values are applied correctly + assert dag.max_active_runs == 1 # Final override from main config + assert dag.max_active_tasks == 5 # From base config + assert dag.default_args["owner"] == "chained_owner" # Final override from main config + assert dag.default_args["retries"] == 2 # From base config (preserved through chain) + assert dag.default_args["email"] == "test@example.com" # From middle config (preserved) + + +def test_load_dag_config_extends_missing_file(): + """Test that get_default_args_from_extends raises appropriate error when extended file is missing.""" + # Create a temporary config that references a non-existent file + import tempfile + + 
with tempfile.NamedTemporaryFile(mode="w", suffix=".yml", delete=False) as f: + f.write( + """ +__extends__: + - non_existent_file.yml + +default: + default_args: + owner: test_owner + +test_dag: + tasks: + task_1: + operator: airflow.operators.bash.BashOperator + bash_command: echo "test" +""" + ) + temp_config_path = f.name + + try: + # Set the base directory to the temp directory where the config file is created + temp_dir = os.path.dirname(temp_config_path) + with pytest.raises(FileNotFoundError): # Should raise FileNotFoundError due to missing file + td = dagfactory.DagFactory(temp_config_path, default_args_config_path=temp_dir) + td.get_default_args_from_extends() + finally: + # Clean up temporary file + os.unlink(temp_config_path) + + +def test_load_dag_config_extends_empty_list(): + """Test that _load_dag_config handles empty __extends__ list correctly.""" + import tempfile + + with tempfile.NamedTemporaryFile(mode="w", suffix=".yml", delete=False) as f: + f.write( + """ +__extends__: [] + +default: + default_args: + owner: test_owner + start_date: 2023-01-01 + +test_dag: + tasks: + task_1: + operator: airflow.operators.bash.BashOperator + bash_command: echo "test" +""" + ) + temp_config_path = f.name + + try: + td = dagfactory.DagFactory(temp_config_path) + actual = td._load_dag_config(temp_config_path) + + # Verify that the extends key is preserved (even when empty) + assert "__extends__" in actual + assert actual["__extends__"] == [] + + # Verify that the config is processed normally + assert actual["default"]["default_args"]["owner"] == "test_owner" + assert actual["default"]["default_args"]["start_date"] == datetime.date(2023, 1, 1) # Parsed as date + assert "test_dag" in actual + finally: + # Clean up temporary file + os.unlink(temp_config_path) + + +def test_load_dag_config_extends_infinite_loop(): + """Test that get_default_args_from_extends raises appropriate error when infinite extending loop is detected.""" + import tempfile + + # Create two temp 
files that reference each other to create an infinite loop + with tempfile.NamedTemporaryFile(mode="w", suffix=".yml", delete=False) as f1: + f1.write( + """ +__extends__: + - file2.yml + +default: + default_args: + owner: file1_owner +""" + ) + temp_config1_path = f1.name + + with tempfile.NamedTemporaryFile(mode="w", suffix=".yml", delete=False) as f2: + f2.write( + """ +__extends__: + - file1.yml + +default: + default_args: + owner: file2_owner +""" + ) + temp_config2_path = f2.name + + # Rename files to match the references in the config + base_dir = os.path.dirname(temp_config1_path) + file1_path = os.path.join(base_dir, "file1.yml") + file2_path = os.path.join(base_dir, "file2.yml") + + os.rename(temp_config1_path, file1_path) + os.rename(temp_config2_path, file2_path) + + try: + # Test starting from file1 -> file2 -> file1 (infinite loop) + with pytest.raises(exceptions.DagFactoryConfigException) as exc_info: + td = dagfactory.DagFactory(file1_path, default_args_config_path=base_dir) + td.get_default_args_from_extends() + + # Verify that the error message mentions the infinite loop and the problematic file + error_message = str(exc_info.value) + assert "Infinite extending loop detected" in error_message + assert "file1.yml" in error_message + assert "has already been processed" in error_message + + finally: + # Clean up temporary files + for file_path in [file1_path, file2_path]: + if os.path.exists(file_path): + os.unlink(file_path) + + +def test_priority_defaults_extends_and_default_block(): + """Test that defaults.yml, __extends__, and default block are applied with correct priority. + + Priority behavior: + - For default_args: main config > __extends__ > defaults.yml + - For DAG-level properties: defaults.yml appears to override __extends__ configs + - Tags: defaults.yml tags are preserved, with dagfactory auto-added + """ + import tempfile + + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # 1. 
Create defaults.yml (lowest priority) + defaults_content = """ +default_args: + owner: global_owner + retries: 5 + start_date: 2020-01-01 +max_active_runs: 5 +tags: ["global"] +""" + defaults_file = temp_path / "defaults.yml" + with open(defaults_file, "w") as f: + f.write(defaults_content) + + # 2. Create base config for __extends__ (medium priority) + base_config_content = """ +default: + default_args: + owner: extends_owner + retries: 3 + start_date: 2021-01-01 + email: test@extends.com + max_active_runs: 3 + tags: ["extends"] +""" + base_config_file = temp_path / "base_config.yml" + with open(base_config_file, "w") as f: + f.write(base_config_content) + + # 3. Create main config with __extends__ and default block (highest priority) + main_config_content = f""" +__extends__: + - base_config.yml + +default: + default_args: + owner: main_owner + retries: 1 + depends_on_past: true + tags: ["main"] + +test_dag: + tasks: + task_1: + operator: airflow.operators.bash.BashOperator + bash_command: echo "test" +""" + main_config_file = temp_path / "main_config.yml" + with open(main_config_file, "w") as f: + f.write(main_config_content) + + # Create DagFactory with defaults.yml path and main config + td = dagfactory.DagFactory( + config_filepath=str(main_config_file), + default_args_config_path=str(temp_path) + ) + + # Build DAGs to test the full priority chain + dags = td.build_dags() + dag = dags["test_dag"] + + # Test default_args priority: main > extends > defaults + assert dag.default_args["owner"] == "main_owner" # From main config (highest priority) + assert dag.default_args["retries"] == 1 # From main config (highest priority) + assert dag.default_args["start_date"] == DateTime(2021, 1, 1, 0, 0, 0, tzinfo=Timezone("UTC")) # From extends (medium priority, not overridden) + assert dag.default_args["email"] == "test@extends.com" # From extends (medium priority, not overridden) + assert dag.default_args["depends_on_past"] == True # From main config (only defined there) 
+ + # Test DAG-level properties priority: + # For max_active_runs, defaults.yml overrides extends config + assert dag.max_active_runs == 5 # From defaults.yml (global defaults take precedence for DAG-level props) + assert dag.owner == "main_owner" # From main config default_args (highest priority) + + # Test tags behavior (defaults.yml tags are used as base) + # Both "global" from defaults.yml and "dagfactory" (auto-added) should be present + assert "global" in dag.tags + assert "dagfactory" in dag.tags