Skip to content

Commit 6e48d9c

Browse files
committed
[rfc][rip] dedup serdes
1 parent 3170315 commit 6e48d9c

File tree

3 files changed

+206
-9
lines changed

3 files changed

+206
-9
lines changed

python_modules/dagster/dagster/_core/definitions/partitions/subset/time_window.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def before_pack(self, value: "TimeWindowPartitionsSubset") -> "TimeWindowPartiti
5555
# value.num_partitions will calculate the number of partitions if the field is None
5656
# We want to check if the field is None and replace the value with the calculated value
5757
# for serialization
58+
5859
if value._asdict()["num_partitions"] is None:
5960
return TimeWindowPartitionsSubset(
6061
partitions_def=value.partitions_def,
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import dagster as dg
2+
from dagster import DagsterInstance
3+
from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor
4+
from dagster_shared.record import record
5+
from dagster_shared.serdes import whitelist_for_serdes
6+
from dagster_shared.serdes.serdes import deserialize_value_with_dedup, serialize_value_with_dedup
7+
8+
9+
@whitelist_for_serdes
10+
@record
11+
class Inner:
12+
number: float
13+
14+
15+
@whitelist_for_serdes
16+
@record
17+
class Foo:
18+
name: str
19+
value: int
20+
inner: Inner
21+
22+
23+
@whitelist_for_serdes
24+
@record
25+
class Bar:
26+
name: str
27+
single: Foo
28+
multiple: list[Foo]
29+
30+
31+
def test_dedup():
32+
# same object, different ids
33+
f1 = Foo(name="f1", value=1, inner=Inner(number=1.0))
34+
f1_same = Foo(name="f1", value=1, inner=Inner(number=1.0))
35+
36+
f2 = Foo(name="f2", value=2, inner=Inner(number=2.0))
37+
38+
bar = Bar(name="bar", single=f1, multiple=[f1, f1, f1_same, f1_same, f2])
39+
40+
serialized = serialize_value_with_dedup(bar)
41+
assert "__dedup_mapping__" in serialized
42+
assert "__dedup_ref__" in serialized
43+
deserialized = deserialize_value_with_dedup(serialized, as_type=Bar)
44+
assert deserialized == bar
45+
46+
47+
def test_cursor():
48+
daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")
49+
50+
@dg.asset(partitions_def=daily_partitions)
51+
def upstream_1() -> None: ...
52+
53+
@dg.asset(partitions_def=daily_partitions)
54+
def upstream_2() -> None: ...
55+
56+
@dg.asset(partitions_def=daily_partitions)
57+
def upstream_3() -> None: ...
58+
59+
@dg.asset(partitions_def=daily_partitions)
60+
def upstream_4() -> None: ...
61+
62+
@dg.asset(partitions_def=daily_partitions)
63+
def upstream_5() -> None: ...
64+
65+
@dg.asset(
66+
deps=[upstream_1, upstream_2, upstream_3, upstream_4, upstream_5],
67+
automation_condition=dg.AutomationCondition.on_cron(cron_schedule="0 * * * *"),
68+
)
69+
def downstream() -> None: ...
70+
71+
defs = dg.Definitions(
72+
assets=[upstream_1, upstream_2, upstream_3, upstream_4, upstream_5, downstream]
73+
)
74+
instance = DagsterInstance.ephemeral()
75+
76+
result = dg.evaluate_automation_conditions(defs=defs, instance=instance)
77+
cursor = result.cursor
78+
assert isinstance(cursor, AssetDaemonCursor)
79+
80+
serialized = serialize_value_with_dedup(cursor)
81+
assert "__dedup_mapping__" in serialized
82+
assert "__dedup_ref__" in serialized
83+
deserialized = deserialize_value_with_dedup(serialized, as_type=AssetDaemonCursor)
84+
assert deserialized == cursor

0 commit comments

Comments
 (0)