|
| 1 | +# This example demonstrates how to build an **incremental dbt Python model** |
| 2 | +# using BigFrames. |
| 3 | +# |
| 4 | +# Incremental models are essential for efficiently processing large datasets by |
| 5 | +# only transforming new or changed data, rather than reprocessing the entire |
| 6 | +# dataset every time. If the target table already exists, dbt will perform a |
| 7 | +# merge based on the specified unique keys; otherwise, it will create a new |
| 8 | +# table automatically. |
| 9 | +# |
| 10 | +# This model also showcases the definition and application of a **BigFrames |
| 11 | +# User-Defined Function (UDF)** to add a descriptive summary column based on |
| 12 | +# temperature data. BigFrames UDFs allow you to execute custom Python logic |
| 13 | +# directly within BigQuery, leveraging BigQuery's scalability. |
| 14 | + |
| 15 | + |
| 16 | +import bigframes.pandas as bpd |
| 17 | + |
def model(dbt, session):
    """Incremental dbt Python model that annotates daily temperature extremes.

    On the first run dbt creates the target table; afterwards the 'merge'
    incremental strategy updates only rows matched on the composite
    unique key, so the full dataset is never reprocessed.
    """
    # Per-model settings. When the same option also appears in
    # dbt_project.yml, the value given here wins.
    dbt.config(
        # Execute this Python model in BigFrames mode, enabling
        # pandas-style operations that run inside BigQuery.
        submission_method="bigframes",
        # Incremental materialization: after the initial build, only new
        # or changed data is processed on each run.
        materialized='incremental',
        # Existing rows are updated in place via a MERGE statement.
        incremental_strategy='merge',
        # Composite key the MERGE uses to match rows already present in
        # the target table.
        unique_key=["state_name", "county_name", "date_local"],
    )

    # Pull the upstream dbt model's output as a BigFrames DataFrame, so
    # this model transforms the result of another model seamlessly.
    weather = dbt.ref("dbt_bigframes_code_sample_1")

    # A BigFrames UDF: custom Python logic registered as a routine in
    # BigQuery and executed there, scaling with the warehouse.
    @bpd.udf(dataset='dbt_sample_dataset', name='describe_udf')
    def summarize(
        max_temperature: float,
        min_temperature: float,
    ) -> str:
        hot = max_temperature > 85.0
        cold = min_temperature < 50.0

        if hot and cold:
            return "Expect both hot and cold conditions today."
        if hot:
            return "Overall, it's a hot day."
        if cold:
            return "Overall, it's a cold day."
        return "Comfortable throughout the day."

    # Pair each day's max with its min and evaluate the UDF row-wise,
    # storing the human-readable summary in a new "describe" column.
    weather["describe"] = weather["max_temperature"].combine(
        weather["min_temperature"], summarize
    )

    # dbt merges this DataFrame into the target BigQuery table; subsequent
    # runs touch only rows identified as new or changed via `unique_key`.
    return weather