|
59 | 59 | {{ dropTempTable }}
|
60 | 60 | {%- endcall %}
|
61 | 61 | {% endmacro %}
|
| 62 | +{% materialization snapshot, adapter='synapse' %} |
| 63 | + {# |
| 64 | + Synapse-specific snapshot materialization. |
| 65 | + |
| 66 | + Purpose: |
| 67 | + This materialization is designed to handle snapshotting in Azure Synapse Analytics. |
| 68 | + It manages the creation of temporary tables, the application of snapshot strategies, |
| 69 | + and the finalization of the target table. |
| 70 | + |
| 71 | + Configuration Options: |
| 72 | + - `strategy`: The snapshot strategy to use (e.g., 'timestamp', 'check'). |
| 73 | + - `unique_key`: The unique key for identifying records in the snapshot. |
| 74 | + - `grants`: Permissions to apply to the target table after creation. |
| 75 | + |
| 76 | + Usage Example: |
| 77 | + Configure the snapshot in your model file: |
| 78 | + ``` |
| 79 | + {{ |
| 80 | + config( |
| 81 | + materialized='snapshot', |
| 82 | + strategy='timestamp', |
| 83 | + unique_key='id', |
| 84 | + grants=[{'role': 'db_owner', 'privileges': ['select']}] |
| 85 | + ) |
| 86 | + }} |
| 87 | + ``` |
| 88 | + #} |
| 89 | + {%- set config = model['config'] -%} |
| 90 | + {%- set target_table = model.get('alias', model.get('name')) -%} |
| 91 | + {%- set strategy_name = config.get('strategy') -%} |
| 92 | + {%- set unique_key = config.get('unique_key') %} |
| 93 | + -- grab current tables grants config for comparision later on |
| 94 | + {%- set grant_config = config.get('grants') -%} |
| 95 | + |
| 96 | + {% set target_relation_exists, target_relation = get_or_create_relation( |
| 97 | + database=model.database, |
| 98 | + schema=model.schema, |
| 99 | + identifier=target_table, |
| 100 | + type='table') -%} |
| 101 | + |
| 102 | + {%- if not target_relation.is_table -%} |
| 103 | + {% do exceptions.relation_wrong_type(target_relation, 'table') %} |
| 104 | + {%- endif -%} |
| 105 | + |
| 106 | + {{ run_hooks(pre_hooks, inside_transaction=False) }} |
| 107 | + {{ run_hooks(pre_hooks, inside_transaction=True) }} |
| 108 | + |
| 109 | + {% set strategy_macro = strategy_dispatch(strategy_name) %} |
| 110 | + {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %} |
| 111 | + |
| 112 | + {% set temp_snapshot_relation_exists, temp_snapshot_relation = get_or_create_relation( |
| 113 | + database=model.database, |
| 114 | + schema=model.schema, |
| 115 | + identifier=target_table+"_snapshot_staging_temp_view", |
| 116 | + type='view') -%} |
| 117 | + |
| 118 | + -- Create a temporary view to manage if user SQl uses CTE |
| 119 | + {% set temp_snapshot_relation_sql = model['compiled_code'] %} |
| 120 | + {{ adapter.drop_relation(temp_snapshot_relation) }} |
| 121 | + |
| 122 | + {% call statement('create temp_snapshot_relation') -%} |
| 123 | + {{ get_create_view_as_sql(temp_snapshot_relation, temp_snapshot_relation_sql) }} |
| 124 | + {%- endcall %} |
| 125 | + |
| 126 | + {% if not target_relation_exists %} |
| 127 | + |
| 128 | + {% set build_sql = build_snapshot_table(strategy, temp_snapshot_relation) %} |
| 129 | + |
| 130 | + -- naming a temp relation |
| 131 | + {% set tmp_relation_view = target_relation.incorporate(path={"identifier": target_relation.identifier ~ '__dbt_tmp_vw'}, type='view')-%} |
| 132 | + -- Fabric & Synapse adapters use temp relation because of lack of CTE support for CTE in CTAS, Insert |
| 133 | + -- drop temp relation if exists |
| 134 | + {{ adapter.drop_relation(tmp_relation_view) }} |
| 135 | + {% set final_sql = get_create_table_as_sql(False, target_relation, build_sql) %} |
| 136 | + {{ adapter.drop_relation(tmp_relation_view) }} |
| 137 | + |
| 138 | + {% else %} |
| 139 | + |
| 140 | + {{ adapter.valid_snapshot_target(target_relation) }} |
| 141 | + {% set staging_table = build_snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %} |
| 142 | + -- this may no-op if the database does not require column expansion |
| 143 | + {% do adapter.expand_target_column_types(from_relation=staging_table, |
| 144 | + to_relation=target_relation) %} |
| 145 | + {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation) |
| 146 | + | rejectattr('name', 'equalto', 'dbt_change_type') |
| 147 | + | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') |
| 148 | + | rejectattr('name', 'equalto', 'dbt_unique_key') |
| 149 | + | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') |
| 150 | + | list %} |
| 151 | + {% if missing_columns|length > 0 %} |
| 152 | + {{log("Missing columns length is: "~ missing_columns|length)}} |
| 153 | + {% do create_columns(target_relation, missing_columns) %} |
| 154 | + {% endif %} |
| 155 | + {% set source_columns = adapter.get_columns_in_relation(staging_table) |
| 156 | + | rejectattr('name', 'equalto', 'dbt_change_type') |
| 157 | + | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') |
| 158 | + | rejectattr('name', 'equalto', 'dbt_unique_key') |
| 159 | + | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') |
| 160 | + | list %} |
| 161 | + {% set quoted_source_columns = [] %} |
| 162 | + {% for column in source_columns %} |
| 163 | + {% do quoted_source_columns.append(adapter.quote(column.name)) %} |
| 164 | + {% endfor %} |
| 165 | + |
| 166 | + {% set final_sql = snapshot_merge_sql( |
| 167 | + target = target_relation, |
| 168 | + source = staging_table, |
| 169 | + insert_cols = quoted_source_columns |
| 170 | + ) |
| 171 | + %} |
| 172 | + {% endif %} |
| 173 | + |
| 174 | + {% call statement('main') %} |
| 175 | + {{ final_sql }} |
| 176 | + {% endcall %} |
| 177 | + |
| 178 | + {{ adapter.drop_relation(temp_snapshot_relation) }} |
| 179 | + {% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %} |
| 180 | + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} |
| 181 | + |
| 182 | + {% do persist_docs(target_relation, model) %} |
| 183 | + |
| 184 | + {% if not target_relation_exists %} |
| 185 | + {% do create_indexes(target_relation) %} |
| 186 | + {% endif %} |
| 187 | + |
| 188 | + {{ run_hooks(post_hooks, inside_transaction=True) }} |
| 189 | + {{ adapter.commit() }} |
| 190 | + |
| 191 | + {% if staging_table is defined %} |
| 192 | + {% do post_snapshot(staging_table) %} |
| 193 | + {% endif %} |
| 194 | + |
| 195 | + {{ run_hooks(post_hooks, inside_transaction=False) }} |
| 196 | + {{ return({'relations': [target_relation]}) }} |
| 197 | + |
| 198 | +{% endmaterialization %} |
0 commit comments