Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
SensorDefinition,
SensorReturnTypesUnion,
SensorType,
resolve_jobs_from_targets_for_with_attributes,
validate_and_get_resource_dict,
)
from dagster._core.definitions.target import ExecutableDefinition
from dagster._core.definitions.utils import check_valid_name
from dagster._utils import IHasInternalInit


class AssetSensorParamNames(NamedTuple):
Expand All @@ -44,7 +46,7 @@ def get_asset_sensor_param_names(fn: Callable[..., Any]) -> AssetSensorParamName


@public
class AssetSensorDefinition(SensorDefinition):
class AssetSensorDefinition(SensorDefinition, IHasInternalInit):
"""Define an asset sensor that initiates a set of runs based on the materialization of a given
asset.

Expand Down Expand Up @@ -95,6 +97,8 @@ def __init__(
metadata: Optional[RawMetadataMapping] = None,
):
self._asset_key = check.inst_param(asset_key, "asset_key", AssetKey)
self._asset_materialization_fn = asset_materialization_fn
self._job_name = job_name

from dagster._core.event_api import AssetRecordsFilter

Expand All @@ -106,6 +110,7 @@ def __init__(
check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str)
| resource_arg_names
)
self._raw_required_resource_keys = combined_required_resource_keys

def _wrap_asset_fn(materialization_fn) -> Any:
def _fn(context) -> Any:
Expand Down Expand Up @@ -184,3 +189,58 @@ def asset_key(self) -> AssetKey:
@property
def sensor_type(self) -> SensorType:
return SensorType.ASSET

@staticmethod
def dagster_internal_init( # type: ignore
*,
name: str,
asset_key: AssetKey,
job_name: Optional[str],
asset_materialization_fn: Callable[..., SensorReturnTypesUnion],
minimum_interval_seconds: Optional[int],
description: Optional[str],
job: Optional[ExecutableDefinition],
jobs: Optional[Sequence[ExecutableDefinition]],
default_status: DefaultSensorStatus,
required_resource_keys: Optional[set[str]],
tags: Optional[Mapping[str, str]],
metadata: Optional[RawMetadataMapping],
) -> "AssetSensorDefinition":
return AssetSensorDefinition(
name=name,
asset_key=asset_key,
job_name=job_name,
asset_materialization_fn=asset_materialization_fn,
minimum_interval_seconds=minimum_interval_seconds,
description=description,
job=job,
jobs=jobs,
default_status=default_status,
required_resource_keys=required_resource_keys,
tags=tags,
metadata=metadata,
)

def with_attributes(
self,
*,
jobs: Optional[Sequence[ExecutableDefinition]] = None,
metadata: Optional[RawMetadataMapping] = None,
) -> "AssetSensorDefinition":
"""Returns a copy of this sensor with the attributes replaced."""
job_name, new_job, new_jobs = resolve_jobs_from_targets_for_with_attributes(self, jobs)

return AssetSensorDefinition.dagster_internal_init(
name=self.name,
asset_key=self._asset_key,
job_name=job_name,
asset_materialization_fn=self._asset_materialization_fn,
minimum_interval_seconds=self.minimum_interval_seconds,
description=self.description,
job=new_job,
jobs=new_jobs,
default_status=self.default_status,
required_resource_keys=self._raw_required_resource_keys,
tags=self._tags,
metadata=metadata if metadata is not None else self._metadata,
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections.abc import Mapping
from collections.abc import Mapping, Sequence
from functools import partial
from typing import Any, Optional, cast

Expand All @@ -16,8 +16,10 @@
SensorEvaluationContext,
SensorType,
)
from dagster._core.definitions.target import ExecutableDefinition
from dagster._core.definitions.utils import check_valid_name
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._utils import IHasInternalInit
from dagster._utils.tags import normalize_tags

MAX_ENTITIES = 500
Expand Down Expand Up @@ -79,7 +81,7 @@ def not_supported(context) -> None:
@public
@beta_param(param="use_user_code_server")
@beta_param(param="default_condition")
class AutomationConditionSensorDefinition(SensorDefinition):
class AutomationConditionSensorDefinition(SensorDefinition, IHasInternalInit):
"""Targets a set of assets and repeatedly evaluates all the AutomationConditions on all of
those assets to determine which to request runs for.

Expand Down Expand Up @@ -171,6 +173,8 @@ def __init__(
)

self._run_tags = normalize_tags(run_tags)
self._sensor_target = target
self._emit_backfills = emit_backfills

# only store this value in the metadata if it's True
if emit_backfills:
Expand Down Expand Up @@ -210,3 +214,56 @@ def default_condition(self) -> Optional[AutomationCondition]:
@property
def sensor_type(self) -> SensorType:
return SensorType.AUTOMATION if self._use_user_code_server else SensorType.AUTO_MATERIALIZE

@staticmethod
def dagster_internal_init( # type: ignore
*,
name: str,
target: CoercibleToAssetSelection,
tags: Optional[Mapping[str, str]],
run_tags: Optional[Mapping[str, Any]],
default_status: DefaultSensorStatus,
minimum_interval_seconds: Optional[int],
description: Optional[str],
metadata: Optional[RawMetadataMapping],
emit_backfills: bool,
use_user_code_server: bool,
default_condition: Optional[AutomationCondition],
) -> "AutomationConditionSensorDefinition":
return AutomationConditionSensorDefinition(
name=name,
target=target,
tags=tags,
run_tags=run_tags,
default_status=default_status,
minimum_interval_seconds=minimum_interval_seconds,
description=description,
metadata=metadata,
emit_backfills=emit_backfills,
use_user_code_server=use_user_code_server,
default_condition=default_condition,
)

def with_attributes(
self,
*,
jobs: Optional[Sequence[ExecutableDefinition]] = None,
metadata: Optional[RawMetadataMapping] = None,
) -> "AutomationConditionSensorDefinition":
"""Returns a copy of this sensor with the attributes replaced.

Note: jobs parameter is ignored for AutomationConditionSensorDefinition as it doesn't use jobs.
"""
return AutomationConditionSensorDefinition.dagster_internal_init(
name=self.name,
target=self._sensor_target,
tags=self._tags,
run_tags=self._run_tags,
default_status=self.default_status,
minimum_interval_seconds=self.minimum_interval_seconds,
description=self.description,
metadata=metadata if metadata is not None else self._metadata,
emit_backfills=self._emit_backfills,
use_user_code_server=self._use_user_code_server,
default_condition=self._default_condition,
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
SensorType,
get_context_param_name,
get_sensor_context_from_args_or_kwargs,
resolve_jobs_from_targets_for_with_attributes,
validate_and_get_resource_dict,
)
from dagster._core.definitions.target import ExecutableDefinition
Expand All @@ -33,7 +34,7 @@
)
from dagster._core.instance import DagsterInstance
from dagster._core.instance.ref import InstanceRef
from dagster._utils import normalize_to_repository
from dagster._utils import IHasInternalInit, normalize_to_repository
from dagster._utils.warnings import deprecation_warning, normalize_renamed_param

if TYPE_CHECKING:
Expand Down Expand Up @@ -1103,7 +1104,7 @@ def build_multi_asset_sensor_context(
"multi_asset_sensors may be used."
)
@public
class MultiAssetSensorDefinition(SensorDefinition):
class MultiAssetSensorDefinition(SensorDefinition, IHasInternalInit):
"""Define an asset sensor that initiates a set of runs based on the materialization of a list of
assets.

Expand Down Expand Up @@ -1245,6 +1246,9 @@ def _check_cursor_not_set(sensor_result: SensorResult):
return _fn

self._raw_asset_materialization_fn = asset_materialization_fn
self._monitored_assets = monitored_assets
self._job_name = job_name
self._raw_required_resource_keys = combined_required_resource_keys

super().__init__(
name=check_valid_name(name),
Expand Down Expand Up @@ -1288,3 +1292,61 @@ def __call__(self, *args, **kwargs) -> AssetMaterializationFunctionReturn:
@property
def sensor_type(self) -> SensorType:
return SensorType.MULTI_ASSET

@staticmethod
def dagster_internal_init( # type: ignore
*,
name: str,
monitored_assets: Union[Sequence[AssetKey], AssetSelection],
job_name: Optional[str],
asset_materialization_fn: MultiAssetMaterializationFunction,
minimum_interval_seconds: Optional[int],
description: Optional[str],
job: Optional[ExecutableDefinition],
jobs: Optional[Sequence[ExecutableDefinition]],
default_status: DefaultSensorStatus,
request_assets: Optional[AssetSelection],
required_resource_keys: Optional[set[str]],
tags: Optional[Mapping[str, str]],
metadata: Optional[RawMetadataMapping],
) -> "MultiAssetSensorDefinition":
return MultiAssetSensorDefinition(
name=name,
monitored_assets=monitored_assets,
job_name=job_name,
asset_materialization_fn=asset_materialization_fn,
minimum_interval_seconds=minimum_interval_seconds,
description=description,
job=job,
jobs=jobs,
default_status=default_status,
request_assets=request_assets,
required_resource_keys=required_resource_keys,
tags=tags,
metadata=metadata,
)

def with_attributes(
self,
*,
jobs: Optional[Sequence[ExecutableDefinition]] = None,
metadata: Optional[RawMetadataMapping] = None,
) -> "MultiAssetSensorDefinition":
"""Returns a copy of this sensor with the attributes replaced."""
job_name, new_job, new_jobs = resolve_jobs_from_targets_for_with_attributes(self, jobs)

return MultiAssetSensorDefinition.dagster_internal_init(
name=self.name,
monitored_assets=self._monitored_assets,
job_name=job_name,
asset_materialization_fn=self._raw_asset_materialization_fn,
minimum_interval_seconds=self.minimum_interval_seconds,
description=self.description,
job=new_job,
jobs=new_jobs,
default_status=self.default_status,
request_assets=self.asset_selection,
required_resource_keys=self._raw_required_resource_keys,
tags=self._tags,
metadata=metadata if metadata is not None else self._metadata,
)
Loading
Loading