Skip to content

Commit 440201c

Browse files
authored
Merge branch 'main' into feat/no_retries_on_test_failure
2 parents 9d1aa7e + 0811e46 commit 440201c

File tree

23 files changed

+353
-26
lines changed

23 files changed

+353
-26
lines changed

.github/workflows/docs.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ on:
77

88
jobs:
99
pages:
10-
runs-on: ubuntu-20.04
10+
runs-on: ubuntu-latest
1111
environment:
1212
name: github-pages
1313
url: ${{ steps.deployment.outputs.page_url }}

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ repos:
5757
- --py37-plus
5858
- --keep-runtime-typing
5959
- repo: https://github.com/astral-sh/ruff-pre-commit
60-
rev: v0.9.6
60+
rev: v0.9.7
6161
hooks:
6262
- id: ruff
6363
args:

CHANGELOG.rst

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
Changelog
22
=========
33

4-
1.9.0a5 (2025-02-03)
4+
1.9.1a1 (2025-02-20)
5+
--------------------
6+
7+
Bug Fixes
8+
9+
* Fix import error in dbt bigquery adapter mock for ``dbt-bigquery<1.8`` for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1548
10+
11+
12+
1.9.0 (2025-02-19)
513
--------------------
614

715
Breaking changes
@@ -19,23 +27,43 @@ Features
1927
* Add structure to support multiple databases for async operator execution by @pankajastro in #1483
2028
* Support overriding the ``profile_config`` per dbt node or folder using config by @tatiana in #1492. More information `here <https://astronomer.github.io/astronomer-cosmos/profiles/#profile-customise-per-node>`_.
2129
* Create and run accurate SQL statements when using ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti, @tatiana and @pankajastro in #1474
30+
* Add AWS ECS task run execution mode by @CarlosGitto and @aoelvp94 in #1507
31+
* Add support for running ``DbtSourceOperator`` individually by @victormacaubas in #1510
32+
* Add setup task for async executions by @pankajastro in #1518
33+
* Add teardown task for async executions by @pankajastro in #1529
34+
* Add ``ProjectConfig.install_dbt_deps`` & change operator ``install_deps=True`` as default by @tatiana in #1521
35+
* Extend Virtualenv operator and mock dbt adapters for setup & teardown tasks in ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti, @tatiana and @pankajastro in #1544
2236

2337
Bug Fixes
2438

2539
* Fix select complex intersection of three tag-based graph selectors by @tatiana in #1466
40+
* Fix custom selector behaviour when the model name contains periods by @yakovlevvs and @60098727 in #1499
41+
* Filter dbt and non-dbt kwargs correctly for async operator by @pankajastro in #1526
2642

2743
Enhancement
2844

2945
* Fix OpenLineage deprecation warning by @CorsettiS in #1449
3046
* Move ``DbtRunner`` related functions into ``dbt/runner.py`` module by @tatiana in #1480
3147
* Add ``on_warning_callback`` to ``DbtSourceKubernetesOperator`` and refactor previous operators by @LuigiCerone in #1501
48+
* Gracefully error when users set incompatible ``RenderConfig.dbt_deps`` and ``operator_args`` ``install_deps`` by @tatiana in #1505
49+
* Store compiled SQL as template field for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1534
50+
51+
Docs
3252

53+
* Improve ``RenderConfig`` arguments documentation by @tatiana in #1514
54+
* Improve callback documentation by @tatiana in #1516
55+
* Improve partial parsing docs by @tatiana in #1520
56+
* Fix typo in selecting & excluding docs by @pankajastro in #1523
57+
* Document ``async_py_requirements`` added in ``ExecutionConfig`` for ``ExecutionMode.AIRFLOW_ASYNC`` by @pankajkoti in #1545
3358

3459
Others
3560

3661
* Ignore dbt package tests when running Cosmos tests by @tatiana in #1502
62+
* Refactor to consolidate async dbt adapter code by @pankajkoti in #1509
63+
* Log elapsed time for sql file(s) upload/download by @pankajastro in #1536
64+
* Remove the fallback operator for async task by @pankajastro in #1538
3765
* GitHub Actions Dependabot: #1487
38-
* Pre-commit updates: #1473, #1493
66+
* Pre-commit updates: #1473, #1493, #1503, #1531
3967

4068

4169
1.8.2 (2025-01-15)

cosmos/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
Contains dags, task groups, and operators.
77
"""
88

9-
__version__ = "1.9.0a6"
9+
__version__ = "1.9.1a1"
1010

1111

1212
from cosmos.airflow.dag import DbtDag

cosmos/airflow/graph.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from cosmos.core.airflow import get_airflow_task as create_airflow_task
2626
from cosmos.core.graph.entities import Task as TaskMetadata
2727
from cosmos.dbt.graph import DbtNode
28+
from cosmos.exceptions import CosmosValueError
2829
from cosmos.log import get_logger
2930
from cosmos.settings import enable_setup_async_task, enable_teardown_async_task
3031

@@ -413,14 +414,19 @@ def _add_dbt_setup_async_task(
413414
tasks_map: dict[str, Any],
414415
task_group: TaskGroup | None,
415416
render_config: RenderConfig | None = None,
417+
async_py_requirements: list[str] | None = None,
416418
) -> None:
417419
if execution_mode != ExecutionMode.AIRFLOW_ASYNC:
418420
return
419421

422+
if not async_py_requirements:
423+
raise CosmosValueError("ExecutionConfig.AIRFLOW_ASYNC needs async_py_requirements to be set")
424+
420425
if render_config is not None:
421426
task_args["select"] = render_config.select
422427
task_args["selector"] = render_config.selector
423428
task_args["exclude"] = render_config.exclude
429+
task_args["py_requirements"] = async_py_requirements
424430

425431
setup_task_metadata = TaskMetadata(
426432
id=DBT_SETUP_ASYNC_TASK_ID,
@@ -495,14 +501,19 @@ def _add_teardown_task(
495501
tasks_map: dict[str, Any],
496502
task_group: TaskGroup | None,
497503
render_config: RenderConfig | None = None,
504+
async_py_requirements: list[str] | None = None,
498505
) -> None:
499506
if execution_mode != ExecutionMode.AIRFLOW_ASYNC:
500507
return
501508

509+
if not async_py_requirements:
510+
raise CosmosValueError("ExecutionConfig.AIRFLOW_ASYNC needs async_py_requirements to be set")
511+
502512
if render_config is not None:
503513
task_args["select"] = render_config.select
504514
task_args["selector"] = render_config.selector
505515
task_args["exclude"] = render_config.exclude
516+
task_args["py_requirements"] = async_py_requirements
506517

507518
teardown_task_metadata = TaskMetadata(
508519
id=DBT_TEARDOWN_ASYNC_TASK_ID,
@@ -529,6 +540,7 @@ def build_airflow_graph(
529540
render_config: RenderConfig,
530541
task_group: TaskGroup | None = None,
531542
on_warning_callback: Callable[..., Any] | None = None, # argument specific to the DBT test command
543+
async_py_requirements: list[str] | None = None,
532544
) -> dict[str, Union[TaskGroup, BaseOperator]]:
533545
"""
534546
Instantiate dbt `nodes` as Airflow tasks within the given `task_group` (optional) or `dag` (mandatory).
@@ -626,9 +638,25 @@ def build_airflow_graph(
626638

627639
create_airflow_task_dependencies(nodes, tasks_map)
628640
if enable_setup_async_task:
629-
_add_dbt_setup_async_task(dag, execution_mode, task_args, tasks_map, task_group, render_config=render_config)
641+
_add_dbt_setup_async_task(
642+
dag,
643+
execution_mode,
644+
task_args,
645+
tasks_map,
646+
task_group,
647+
render_config=render_config,
648+
async_py_requirements=async_py_requirements,
649+
)
630650
if enable_teardown_async_task:
631-
_add_teardown_task(dag, execution_mode, task_args, tasks_map, task_group, render_config=render_config)
651+
_add_teardown_task(
652+
dag,
653+
execution_mode,
654+
task_args,
655+
tasks_map,
656+
task_group,
657+
render_config=render_config,
658+
async_py_requirements=async_py_requirements,
659+
)
632660
return tasks_map
633661

634662

cosmos/config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ class ProjectConfig:
152152
:param snapshots_relative_path: The relative path to the dbt snapshots directory within the project. Defaults to
153153
snapshots
154154
:param manifest_path: The absolute path to the dbt manifest file. Defaults to None
155+
:param manifest_conn_id: Name of the Airflow connection used to access the manifest file if it is not stored locally. Defaults to None
155156
:param project_name: Allows the user to define the project name.
156157
Required if dbt_project_path is not defined. Defaults to the folder name of dbt_project_path.
157158
:param env_vars: Dictionary of environment variables that are used for both rendering and execution. Rendering with
@@ -175,6 +176,7 @@ class ProjectConfig:
175176
def __init__(
176177
self,
177178
dbt_project_path: str | Path | None = None,
179+
install_dbt_deps: bool = True,
178180
models_relative_path: str | Path = "models",
179181
seeds_relative_path: str | Path = "seeds",
180182
snapshots_relative_path: str | Path = "snapshots",
@@ -228,6 +230,7 @@ def __init__(
228230
self.env_vars = env_vars
229231
self.dbt_vars = dbt_vars
230232
self.partial_parse = partial_parse
233+
self.install_dbt_deps = install_dbt_deps
231234

232235
def validate_project(self) -> None:
233236
"""
@@ -398,6 +401,8 @@ class ExecutionConfig:
398401
:param dbt_project_path: Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
399402
:param virtualenv_dir: Directory path to locate the (cached) virtual env that
400403
should be used for execution when execution mode is set to `ExecutionMode.VIRTUALENV`
404+
:param async_py_requirements: A list of Python packages to install when `ExecutionMode.AIRFLOW_ASYNC` (Experimental) is used. This parameter is required only if both `enable_setup_async_task` and `enable_teardown_async_task` are set to `True`.
405+
Example: `["dbt-postgres==1.5.0"]`
401406
"""
402407

403408
execution_mode: ExecutionMode = ExecutionMode.LOCAL
@@ -409,6 +414,7 @@ class ExecutionConfig:
409414
virtualenv_dir: str | Path | None = None
410415

411416
project_path: Path | None = field(init=False)
417+
async_py_requirements: list[str] | None = None
412418

413419
def __post_init__(self, dbt_project_path: str | Path | None) -> None:
414420
if self.invocation_mode and self.execution_mode not in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV):

cosmos/converter.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ def override_configuration(
223223
if execution_config.invocation_mode:
224224
operator_args["invocation_mode"] = execution_config.invocation_mode
225225

226-
if execution_config in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV):
226+
if execution_config.execution_mode in (ExecutionMode.LOCAL, ExecutionMode.VIRTUALENV):
227227
if "install_deps" not in operator_args:
228228
operator_args["install_deps"] = project_config.install_dbt_deps
229229

@@ -335,6 +335,7 @@ def __init__(
335335
dbt_project_name=render_config.project_name,
336336
on_warning_callback=on_warning_callback,
337337
render_config=render_config,
338+
async_py_requirements=execution_config.async_py_requirements,
338339
)
339340

340341
current_time = time.perf_counter()

cosmos/operators/_asynchronous/__init__.py

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,74 @@
11
from __future__ import annotations
22

3+
import inspect
4+
import textwrap
5+
from pathlib import Path
36
from typing import Any
47

58
from airflow.utils.context import Context
69

7-
from cosmos.operators.local import DbtRunLocalOperator as DbtRunOperator
10+
from cosmos._utils.importer import load_method_from_module
11+
from cosmos.hooks.subprocess import FullOutputSubprocessResult
12+
from cosmos.operators.virtualenv import DbtRunVirtualenvOperator
813

914

10-
class SetupAsyncOperator(DbtRunOperator):
15+
class SetupAsyncOperator(DbtRunVirtualenvOperator):
1116
def __init__(self, *args: Any, **kwargs: Any):
1217
kwargs["emit_datasets"] = False
1318
super().__init__(*args, **kwargs)
1419

20+
def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult:
21+
profile_type = self.profile_config.get_profile_type()
22+
if not self._py_bin:
23+
raise AttributeError("_py_bin attribute not set for VirtualEnv operator")
24+
dbt_executable_path = str(Path(self._py_bin).parent / "dbt")
25+
asynchronous_operator_module = f"cosmos.operators._asynchronous.{profile_type}"
26+
mock_function_name = f"_mock_{profile_type}_adapter"
27+
mock_function = load_method_from_module(asynchronous_operator_module, mock_function_name)
28+
mock_function_full_source = inspect.getsource(mock_function)
29+
mock_function_body = textwrap.dedent("\n".join(mock_function_full_source.split("\n")[1:]))
30+
31+
with open(dbt_executable_path) as f:
32+
dbt_entrypoint_script = f.readlines()
33+
if dbt_entrypoint_script[0].startswith("#!"):
34+
dbt_entrypoint_script.insert(1, mock_function_body)
35+
with open(dbt_executable_path, "w") as f:
36+
f.writelines(dbt_entrypoint_script)
37+
38+
return super().run_subprocess(command, env, cwd)
39+
1540
def execute(self, context: Context, **kwargs: Any) -> None:
1641
async_context = {"profile_type": self.profile_config.get_profile_type()}
1742
self.build_and_run_cmd(
1843
context=context, cmd_flags=self.dbt_cmd_flags, run_as_async=True, async_context=async_context
1944
)
2045

2146

22-
class TeardownAsyncOperator(DbtRunOperator):
47+
class TeardownAsyncOperator(DbtRunVirtualenvOperator):
2348
def __init__(self, *args: Any, **kwargs: Any):
2449
kwargs["emit_datasets"] = False
2550
super().__init__(*args, **kwargs)
2651

52+
def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult:
53+
profile_type = self.profile_config.get_profile_type()
54+
if not self._py_bin:
55+
raise AttributeError("_py_bin attribute not set for VirtualEnv operator")
56+
dbt_executable_path = str(Path(self._py_bin).parent / "dbt")
57+
asynchronous_operator_module = f"cosmos.operators._asynchronous.{profile_type}"
58+
mock_function_name = f"_mock_{profile_type}_adapter"
59+
mock_function = load_method_from_module(asynchronous_operator_module, mock_function_name)
60+
mock_function_full_source = inspect.getsource(mock_function)
61+
mock_function_body = textwrap.dedent("\n".join(mock_function_full_source.split("\n")[1:]))
62+
63+
with open(dbt_executable_path) as f:
64+
dbt_entrypoint_script = f.readlines()
65+
if dbt_entrypoint_script[0].startswith("#!"):
66+
dbt_entrypoint_script.insert(1, mock_function_body)
67+
with open(dbt_executable_path, "w") as f:
68+
f.writelines(dbt_entrypoint_script)
69+
70+
return super().run_subprocess(command, env, cwd)
71+
2772
def execute(self, context: Context, **kwargs: Any) -> Any:
2873
async_context = {"profile_type": self.profile_config.get_profile_type(), "teardown_task": True}
2974
self.build_and_run_cmd(

cosmos/operators/_asynchronous/bigquery.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ def _mock_bigquery_adapter() -> None:
2828

2929
import agate
3030
from dbt.adapters.bigquery.connections import BigQueryAdapterResponse, BigQueryConnectionManager
31-
from dbt_common.clients.agate_helper import empty_table
31+
32+
try:
33+
from dbt_common.clients.agate_helper import empty_table
34+
except (ModuleNotFoundError, ImportError): # pragma: no cover
35+
from dbt.clients.agate_helper import empty_table
3236

3337
def execute( # type: ignore[no-untyped-def]
3438
self, sql, auto_begin=False, fetch=None, limit: Optional[int] = None

cosmos/operators/local.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@ def run_command(
531531
if self.install_deps:
532532
self._install_dependencies(tmp_dir_path, flags, env)
533533

534-
if run_as_async:
534+
if run_as_async and not enable_setup_async_task:
535535
self._mock_dbt_adapter(async_context)
536536

537537
full_cmd = cmd + flags

0 commit comments

Comments
 (0)