Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dbt/adapters/hive/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,11 @@ def log_relation(self, incremental_strategy):
"incremental_strategy": incremental_strategy,
}
)

def _render_subquery_alias(self, namespace: str) -> str:
"""Some databases require an alias for subqueries (postgres, mysql, Hive) for all others we want to avoid adding
an alias as it has the potential to introduce issues with the query if the user also defines an alias.
"""
if self.require_alias:
return f" dbt_{namespace}_subq_{self.table}"
return ""
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@

{#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#}
{%- set raw_file_format = config.get('file_format', default='parquet') -%}
{%- set raw_file_format = dbt_hive_validate_get_file_format(raw_file_format) -%}

{%- set incremental_strategy = config.get('incremental_strategy', default='append') -%}
{% if incremental_strategy == None %}
{% set incremental_strategy = 'append' %}
{% endif %}
{%- set incremental_strategy = dbt_hive_validate_get_incremental_strategy(incremental_strategy) -%}

{% do target_relation.log_relation(incremental_strategy) %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
{%- if strategy == 'append' -%}
{#-- insert new records into existing table, without updating or overwriting #}
{{ get_insert_into_sql(source, target, dest_columns) }}
{%- elif strategy == 'insert_overwrite' -%}
{%- elif strategy == 'insert_overwrite' or strategy == 'microbatch' -%}
{#-- insert statements don't like CTEs, so support them via a temp view #}
{{ get_insert_overwrite_sql(source, target) }}
{%- elif strategy == 'merge' -%}
Expand Down
23 changes: 19 additions & 4 deletions dbt/include/hive/macros/materializations/incremental/validate.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
{% macro dbt_hive_validate_get_file_format(raw_file_format) %}
{#-- Validate the file format #}

{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'] %}
{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm', 'avro'] %}

{% set invalid_file_format_msg -%}
Invalid file format provided: {{ raw_file_format }}
Expand All @@ -32,12 +32,12 @@
{% endmacro %}


{% macro dbt_hive_validate_get_incremental_strategy(raw_strategy, file_format) %}
{% macro dbt_hive_validate_get_incremental_strategy(raw_strategy) %}
{#-- Validate the incremental strategy #}

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
Expected one of: 'append', 'merge', 'insert_overwrite'
Expected one of: 'append', 'merge', 'insert_overwrite', 'microbatch'
{%- endset %}

{% set invalid_insert_overwrite_endpoint_msg -%}
Expand All @@ -46,13 +46,28 @@
Use the 'append' or 'merge' strategy instead
{%- endset %}

{% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
{% if raw_strategy not in ['append', 'merge', 'insert_overwrite', 'microbatch'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'insert_overwrite' and target.endpoint %}
{% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
{% endif %}
{% endif %}

{% if raw_strategy == 'microbatch' %}
{{ validate_partition_key_for_microbatch_strategy() }}
{%- endif -%}

{% do return(raw_strategy) %}
{% endmacro %}

{% macro validate_partition_key_for_microbatch_strategy() %}
{#-- Raise a compile-time error when a model configured with the
     'microbatch' incremental strategy has no `partition_by` config.
     Each batch is written as a whole partition, so a partition key is
     mandatory; presumably its granularity must match `batch_size`
     (the error message says so) — confirm against the strategy docs. --#}
{% set microbatch_partition_key_missing_msg -%}
dbt-hive 'microbatch' incremental strategy requires a `partition_by` config.
Ensure you are using a `partition_by` column that is of granularity {{ config.get('batch_size') }}.
{%- endset %}

{#-- Any falsy value (missing key, empty string/list) triggers the error. --#}
{%- if not config.get('partition_by') -%}
{{ exceptions.raise_compiler_error(microbatch_partition_key_missing_msg) }}
{%- endif -%}
{% endmacro %}
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
dbt-tests-adapter==1.9.*
dbt-tests-adapter==1.10.*
pre-commit~=2.21;python_version=="3.7"
pre-commit~=3.2;python_version>="3.8"
pytest
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import pytest

from dbt.tests.util import run_dbt_and_capture

from dbt.tests.adapter.incremental.test_incremental_microbatch import (
BaseMicrobatch,
patch_microbatch_end_time,
)

_input_timestamp_modified_model_sql = """
{{ config(materialized='table', event_time='event_time') }}
select 1 as id, TIMESTAMP '2020-01-01 00:00:00' as event_time
union all
select 2 as id, TIMESTAMP '2020-01-02 00:00:00' as event_time
union all
select 3 as id, TIMESTAMP '2020-01-03 00:00:00' as event_time
"""

_microbatch_uniqueid_modified_model_sql = """
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='event_time',
batch_size='day',
partition_by='date_day',
begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0))
}}
select *, to_date(event_time) as date_day from {{ ref('input_model') }}
"""
_microbatch_no_partition_by_sql = """
{{ config(
materialized='incremental',
incremental_strategy='microbatch',
event_time='event_time',
batch_size='day',
begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0))
}}
select *, to_date(event_time) as date_day from {{ ref('input_model') }}
"""


class TestHiveMicrobatch(BaseMicrobatch):
    """Hive variant of the adapter-standard microbatch suite.

    Overrides the base fixtures with models that use explicit TIMESTAMP
    literals and a `date_day` partition column.
    """

    @pytest.fixture(scope="class")
    def input_model_sql(self) -> str:
        """Seed model whose `event_time` values are TIMESTAMP literals."""
        return _input_timestamp_modified_model_sql

    @pytest.fixture(scope="class")
    def microbatch_model_sql(self) -> str:
        """Microbatch model partitioned by the derived `date_day` column."""
        return _microbatch_uniqueid_modified_model_sql

    @pytest.fixture(scope="class")
    def insert_two_rows_sql(self, project) -> str:
        """SQL appending rows 4 and 5 so later batches see new data."""
        relation = project.adapter.Relation.create(
            database=project.database, schema=project.test_schema
        )
        values_clause = (
            "(4, TIMESTAMP '2020-01-04 00:00:00'), (5, TIMESTAMP '2020-01-05 00:00:00')"
        )
        return f"insert into {relation}.input_model (id, event_time) values {values_clause}"


class TestHiveMicroBatchNoPartitionKey:
    """A microbatch model without `partition_by` must fail at compile time."""

    @pytest.fixture(scope="class")
    def models(self):
        """Project models: the invalid microbatch model plus its input."""
        model_files = {
            "microbatch.sql": _microbatch_no_partition_by_sql,
            "input_model.sql": _input_timestamp_modified_model_sql,
        }
        return model_files

    def test_no_partition_by(self, project):
        """Run should fail and surface the missing-partition-key message."""
        expected_error = (
            "dbt-hive 'microbatch' incremental strategy requires a `partition_by` config"
        )
        with patch_microbatch_end_time("2020-01-03 13:57:00"):
            _, stdout = run_dbt_and_capture(["run"], expect_pass=False)
        assert expected_error in stdout