Skip to content

Commit 10ebdb5

Browse files
authored
add resync and poll (#32734)
1 parent 32af01f commit 10ebdb5

File tree

7 files changed

+403
-5
lines changed

7 files changed

+403
-5
lines changed

docs/docs/integrations/libraries/fivetran/fivetran-pythonic.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,29 @@ If you want to customize the sync of your connectors, you can use the <PyObject
8484
language="python"
8585
/>
8686

87+
### Perform historical resyncs of Fivetran assets
88+
89+
In addition to incremental syncs, you can perform full historical resyncs of your Fivetran connector tables by passing a `FivetranSyncConfig` with `resync=True` to the <PyObject section="libraries" module="dagster_fivetran" object="FivetranWorkspace" displayText="sync_and_poll()" /> method. This is useful when you need to backfill historical data or reload data after schema changes.
90+
91+
You can resync specific tables by providing `resync_parameters`, or resync all tables in a connector by omitting this parameter:
92+
93+
<CodeExample path="docs_snippets/docs_snippets/integrations/fivetran/resync_fivetran_assets.py" language="python" />
94+
95+
To resync all tables in a connector, launch the run with `resync=True` in the asset's `FivetranSyncConfig` and omit `resync_parameters`:
96+
97+
<CodeExample
98+
startAfter="start_resync_all"
99+
endBefore="end_resync_all"
100+
path="docs_snippets/docs_snippets/integrations/fivetran/resync_fivetran_assets.py"
101+
language="python"
102+
/>
103+
104+
:::note
105+
106+
Historical resyncs can be time-consuming and resource-intensive operations. Be mindful of the cost implications when resyncing large datasets.
107+
108+
:::
109+
87110
### Customize asset definition metadata for Fivetran assets
88111

89112
By default, Dagster will generate asset specs for each Fivetran asset and populate default metadata. You can further customize asset properties by passing an instance of the custom <PyObject section="libraries" module="dagster_fivetran" object="DagsterFivetranTranslator" /> to the <PyObject section="libraries" module="dagster_fivetran" object="load_fivetran_asset_specs" /> function.
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from dagster_fivetran import FivetranSyncConfig, FivetranWorkspace, fivetran_assets
2+
3+
import dagster as dg
4+
5+
fivetran_workspace = FivetranWorkspace(
6+
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
7+
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
8+
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
9+
)
10+
11+
12+
@fivetran_assets(
    connector_id="fivetran_connector_id",  # Replace with your connector ID
    name="fivetran_connector_name",  # Replace with your connection name
    group_name="fivetran_connector_name",
    workspace=fivetran_workspace,
)
def fivetran_connector_assets(
    context: dg.AssetExecutionContext,
    fivetran: FivetranWorkspace,
    config: FivetranSyncConfig,
):
    """Materialize the connector's tables, syncing or resyncing based on run config.

    The behavior is chosen when the run is launched, via ``FivetranSyncConfig``:
    - ``resync=False`` (the default): normal sync
    - ``resync=True`` plus ``resync_parameters``: historical resync of the listed tables
    - ``resync=True`` without ``resync_parameters``: historical resync of every table
    """
    yield from fivetran.sync_and_poll(context=context, config=config)
31+
32+
33+
# start_resync_all
34+
@fivetran_assets(
    connector_id="fivetran_connector_id",
    name="fivetran_connector_name_full_resync",
    group_name="fivetran_connector_name",
    workspace=fivetran_workspace,
)
def fivetran_connector_full_resync_assets(
    context: dg.AssetExecutionContext,
    fivetran: FivetranWorkspace,
    config: FivetranSyncConfig,
):
    """Run a full historical resync covering all tables of the connector.

    Launch the run with ``resync=True`` in the config and leave
    ``resync_parameters`` unset.
    """
    yield from fivetran.sync_and_poll(context=context, config=config)
50+
51+
52+
# end_resync_all
53+
54+
55+
defs = dg.Definitions(
56+
assets=[fivetran_connector_assets, fivetran_connector_full_resync_assets],
57+
resources={"fivetran": fivetran_workspace},
58+
)

examples/docs_snippets/docs_snippets_tests/test_integration_files_load.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
f"{snippets_folder}/fivetran/create_fivetran_all_assets_job.py",
3535
f"{snippets_folder}/fivetran/create_fivetran_selection_job.py",
3636
f"{snippets_folder}/fivetran/schedule_fivetran_jobs.py",
37+
f"{snippets_folder}/fivetran/resync_fivetran_assets.py",
3738
f"{snippets_folder}/airbyte_cloud/customize_airbyte_cloud_asset_defs.py",
3839
f"{snippets_folder}/airbyte_cloud/customize_airbyte_cloud_translator_asset_spec.py",
3940
f"{snippets_folder}/airbyte_cloud/define_downstream_dependencies.py",

python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
)
1616
from dagster_fivetran.resources import (
1717
FivetranResource as FivetranResource,
18+
FivetranSyncConfig as FivetranSyncConfig,
1819
FivetranWorkspace as FivetranWorkspace,
1920
fivetran_resource as fivetran_resource,
2021
load_fivetran_asset_specs as load_fivetran_asset_specs,

python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py

Lines changed: 138 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from dagster import (
1414
AssetExecutionContext,
1515
AssetMaterialization,
16+
Config,
1617
Definitions,
1718
Failure,
1819
InitResourceContext,
@@ -928,6 +929,29 @@ def _sync_and_poll(
928929
return FivetranOutput(connector_details=final_details, schema_config=schema_config_details)
929930

930931

932+
class FivetranSyncConfig(Config):
    """Configuration for controlling Fivetran sync behavior.

    Attributes:
        resync: If True, performs a historical resync. If False (the default),
            performs a normal sync.
        resync_parameters: Optional mapping of schema name to the list of table names
            to resync. Only consulted when resync=True; if not provided with
            resync=True, all tables will be resynced.
            Example: {"schema_name": ["table1", "table2"], "another_schema": ["table3"]}
    """

    resync: bool = Field(
        default=False,
        description="Whether to perform a historical resync instead of a normal sync",
    )
    # Typed as schema -> table names (rather than dict[str, Any]) so the field matches
    # the Mapping[str, Sequence[str]] that the resync code path expects, and malformed
    # values are rejected when the run config is validated instead of at request time.
    resync_parameters: Optional[dict[str, list[str]]] = Field(
        default=None,
        description=(
            "Optional parameters to control which tables to resync. "
            "If not provided with resync=True, all tables will be resynced."
        ),
    )
953+
954+
931955
class FivetranWorkspace(ConfigurableResource):
932956
"""This class represents a Fivetran workspace and provides utilities
933957
to interact with Fivetran APIs.
@@ -1185,22 +1209,84 @@ def _generate_materialization(
11851209

11861210
@public
def sync_and_poll(
    self,
    context: AssetExecutionContext,
    config: Optional[FivetranSyncConfig] = None,
) -> FivetranEventIterator[Union[AssetMaterialization, MaterializeResult]]:
    """Executes a sync and poll process to materialize Fivetran assets.
    This method can only be used in the context of an asset execution.

    Args:
        context (AssetExecutionContext): The execution context
            from within `@fivetran_assets`.
        config (Optional[FivetranSyncConfig]): Optional configuration to control sync behavior.
            If config.resync is True, performs a historical resync instead of a normal sync.
            If config.resync_parameters is provided, only the specified tables will be resynced.
            If config.resync_parameters is provided while config.resync is False, a warning
            is logged and a normal sync is performed.

    Returns:
        Iterator[Union[AssetMaterialization, MaterializeResult]]: An iterator of MaterializeResult
            or AssetMaterialization.

    Examples:
        Normal sync (without config):

        .. code-block:: python

            from dagster import AssetExecutionContext
            from dagster_fivetran import FivetranWorkspace, fivetran_assets

            @fivetran_assets(connector_id="my_connector", workspace=fivetran_workspace)
            def my_fivetran_assets(context: AssetExecutionContext, fivetran: FivetranWorkspace):
                yield from fivetran.sync_and_poll(context=context)

        Historical resync of specific tables (config passed at runtime):

        .. code-block:: python

            from dagster import AssetExecutionContext
            from dagster_fivetran import FivetranWorkspace, FivetranSyncConfig, fivetran_assets

            @fivetran_assets(connector_id="my_connector", workspace=fivetran_workspace)
            def my_fivetran_assets(
                context: AssetExecutionContext,
                fivetran: FivetranWorkspace,
                config: FivetranSyncConfig,
            ):
                # When materializing, pass config with:
                # resync=True
                # resync_parameters={"schema_name": ["table1", "table2"]}
                yield from fivetran.sync_and_poll(context=context, config=config)

        Full historical resync (config passed at runtime):

        .. code-block:: python

            from dagster import AssetExecutionContext
            from dagster_fivetran import FivetranWorkspace, FivetranSyncConfig, fivetran_assets

            @fivetran_assets(connector_id="my_connector", workspace=fivetran_workspace)
            def my_fivetran_assets(
                context: AssetExecutionContext,
                fivetran: FivetranWorkspace,
                config: FivetranSyncConfig,
            ):
                # When materializing, pass config with resync=True to resync all tables
                yield from fivetran.sync_and_poll(context=context, config=config)
    """
    if config is not None and config.resync:
        events = self._resync_and_poll(
            context=context, resync_parameters=config.resync_parameters
        )
    else:
        # resync_parameters only has meaning for a resync; surface the likely
        # misconfiguration instead of silently dropping the table selection.
        if config is not None and config.resync_parameters:
            context.log.warning(
                "resync_parameters were provided but resync is False. "
                "Performing a normal sync and ignoring resync_parameters."
            )
        events = self._sync_and_poll(context=context)

    return FivetranEventIterator(
        events=events,
        fivetran_workspace=self,
        context=context,
    )
12041290

12051291
def _sync_and_poll(self, context: AssetExecutionContext):
12061292
assets_def = context.assets_def
@@ -1245,6 +1331,54 @@ def _sync_and_poll(self, context: AssetExecutionContext):
12451331
if unmaterialized_asset_keys:
12461332
context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")
12471333

1334+
def _resync_and_poll(
    self,
    context: AssetExecutionContext,
    resync_parameters: Optional[Mapping[str, Sequence[str]]] = None,
):
    """Resync the connector behind the executing assets and yield the resulting events.

    Args:
        context: The asset execution context; supplies the assets definition,
            the selected asset keys and the logger.
        resync_parameters: Forwarded unchanged to the client's ``resync_and_poll``.

    Yields:
        A ``MaterializeResult`` for each generated materialization whose asset key
        is selected in this execution, and the materialization itself otherwise.
    """
    fivetran_assets_def = context.assets_def
    translator = get_translator_from_fivetran_assets(fivetran_assets_def)
    # The connector ID is read from the Fivetran metadata of the first spec.
    connector_id = next(
        check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
        for spec in fivetran_assets_def.specs
    )

    fivetran_output = self.get_client().resync_and_poll(
        connector_id=connector_id,
        resync_parameters=resync_parameters,
    )

    # No FivetranOutput means the connector was not resynced; warn and emit nothing.
    if not fivetran_output:
        context.log.warning(
            f"The connector with ID {connector_id} is currently paused and so it has not been resynced. "
            f"Make sure that your connector is enabled before resyncing it with Dagster."
        )
        return

    reported_keys = set()
    generated = self._generate_materialization(
        fivetran_output=fivetran_output, dagster_fivetran_translator=translator
    )
    for materialization in generated:
        # Tables outside the current selection are still reported, but as
        # runtime materializations rather than MaterializeResults.
        if materialization.asset_key not in context.selected_asset_keys:
            context.log.warning(
                f"An unexpected asset was materialized: {materialization.asset_key}. "
                f"Yielding a materialization event."
            )
            yield materialization
            continue

        yield MaterializeResult(
            asset_key=materialization.asset_key, metadata=materialization.metadata
        )
        reported_keys.add(materialization.asset_key)

    unmaterialized_asset_keys = context.selected_asset_keys - reported_keys
    if unmaterialized_asset_keys:
        context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")
1381+
12481382

12491383
def load_fivetran_asset_specs(
12501384
workspace: FivetranWorkspace,

python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/conftest.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,3 +775,41 @@ def sync_and_poll_fixture():
775775
paused_connector_output,
776776
]
777777
yield mocked_function
778+
779+
780+
@pytest.fixture(name="resync_and_poll")
def resync_and_poll_fixture():
    """Patch the client and legacy-resource ``resync_and_poll`` with canned results.

    Each mock returns, on successive calls: an output matching the workspace data,
    an output with an altered schema config, and ``None``. The ``FivetranClient``
    mock is what the fixture yields.
    """

    def _output_for(schema_config):
        # Both non-paused outputs share the same connector details and differ
        # only in the schema config they carry.
        return FivetranOutput(
            connector_details=get_sample_connection_details(
                succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR
            )["data"],
            schema_config=schema_config,
        )

    with (
        patch("dagster_fivetran.resources.FivetranClient.resync_and_poll") as mocked_function,
        patch(
            "dagster_fivetran.resources.FivetranResource.resync_and_poll"
        ) as mocked_resync_and_poll_legacy_resource,
    ):
        call_results = [
            # All resync'd tables match the workspace data used to create the assets def.
            _output_for(SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR["data"]),
            # A table is missing and an unexpected table is resync'd, compared to
            # the workspace data used to create the assets def.
            _output_for(ALTERED_SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR["data"]),
            # resync_and_poll returns None if the connector is paused.
            None,
        ]
        for patched in (mocked_function, mocked_resync_and_poll_legacy_resource):
            patched.side_effect = list(call_results)
        yield mocked_function

0 commit comments

Comments
 (0)