Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions macros/create_streamline_udfs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
{% if var("UPDATE_UDFS_AND_SPS") %}
{{ create_udf_bulk_rest_api_v2() }}
{{ create_udf_bulk_decode_logs() }}
{{ create_udf_bulk_decode_traces() }}
{% endif %}
{% endmacro %}
122 changes: 114 additions & 8 deletions macros/streamline/models.sql
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ WHERE

{% macro streamline_external_table_query_v2(
model,
partition_function
partition_function,
evm_balances=False
) %}
WITH meta AS (
SELECT
Expand All @@ -107,7 +108,10 @@ WHERE
SELECT
s.*,
b.file_name,
_inserted_timestamp
b._inserted_timestamp
{% if evm_balances %}
, r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
FROM
{{ source(
"bronze_streamline",
Expand All @@ -117,15 +121,21 @@ WHERE
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
{% if evm_balances %}
JOIN {{ ref('_block_ranges') }} r
ON r.block_number = COALESCE(s.VALUE :"BLOCK_NUMBER" :: INT,s.VALUE :"block_number" :: INT)
{% endif %}
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA is not null
AND DATA IS NOT NULL
{% endmacro %}

{% macro streamline_external_table_FR_query_v2(
model,
partition_function
partition_function,
partition_column="partition_key",
evm_balances=False
) %}
WITH meta AS (
SELECT
Expand All @@ -142,7 +152,10 @@ WHERE
SELECT
s.*,
b.file_name,
_inserted_timestamp
b._inserted_timestamp
{% if evm_balances %}
, r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
FROM
{{ source(
"bronze_streamline",
Expand All @@ -151,9 +164,102 @@ FROM
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
AND b.partition_key = s.{{ partition_column }}
{% if evm_balances %}
JOIN {{ ref('_block_ranges') }} r
ON r.block_number = COALESCE(s.VALUE :"BLOCK_NUMBER" :: INT,s.VALUE :"block_number" :: INT)
{% endif %}
WHERE
b.partition_key = s.partition_key
b.partition_key = s.{{ partition_column }}
AND DATA :error IS NULL
AND DATA is not null
AND DATA IS NOT NULL
{% endmacro %}

{% macro streamline_external_table_query_decoder(
model
) %}

WITH meta AS (

SELECT
job_created_time AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
TO_DATE(
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
) AS _partition_by_created_date
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
block_number,
id :: STRING AS id,
DATA,
_inserted_timestamp,
s._partition_by_block_number AS _partition_by_block_number,
s._partition_by_created_date AS _partition_by_created_date
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
WHERE
b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
AND s._partition_by_created_date >= DATEADD('day', -2, CURRENT_TIMESTAMP())
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}

{% macro streamline_external_table_FR_query_decoder(
model
) %}

WITH meta AS (

SELECT
registered_on AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 6), '_', 1) AS INTEGER) AS _partition_by_block_number,
TO_DATE(
concat_ws('-', SPLIT_PART(file_name, '/', 3), SPLIT_PART(file_name, '/', 4), SPLIT_PART(file_name, '/', 5))
) AS _partition_by_created_date
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
block_number,
id :: STRING AS id,
DATA,
_inserted_timestamp,
s._partition_by_block_number AS _partition_by_block_number,
s._partition_by_created_date AS _partition_by_created_date
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
WHERE
b._partition_by_block_number = s._partition_by_block_number
AND b._partition_by_created_date = s._partition_by_created_date
AND DATA :error IS NULL
AND DATA IS NOT NULL
{% endmacro %}
24 changes: 24 additions & 0 deletions macros/streamline/udfs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,30 @@
{% do adapter.execute(sql) %}
{% endmacro %}

{% macro create_udf_bulk_decode_traces() %}
{{ log("Creating udf udf_bulk_decode_traces_v2 for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}

{% set sql %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_traces_v2(json object) returns array api_integration =
{% if target.name == "prod" %}
{{ log("Creating prod udf_bulk_decode_traces_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces'
{% elif target.name == "dev" %}
{{ log("Creating dev udf_bulk_decode_traces_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces'
{% elif target.name == "sbx" %}
{{ log("Creating stg udf_bulk_decode_traces_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces'
{% else %}
{{ log("Creating default (dev) udf_bulk_decode_traces_v2", info=True) }}
{{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}bulk_decode_traces'
{% endif %};
{% endset %}
{{ log(sql, info=True) }}
{% do adapter.execute(sql) %}
{% endmacro %}

{% macro create_aws_api_integrations() %}
{{ log("Creating api integration for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
Expand Down
43 changes: 43 additions & 0 deletions macros/streamline/utils.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,47 @@
SELECT
NULL
{% endif %}
{% endmacro %}

{% macro if_data_call_wait() %}
{% if var(
"STREAMLINE_INVOKE_STREAMS"
) %}
{% set query %}
SELECT
1
WHERE
EXISTS(
SELECT
1
FROM
{{ model.schema ~ "." ~ model.alias }}
LIMIT
1
) {% endset %}
{% if execute %}
{% set results = run_query(
query
) %}
{% if results %}
{{ log(
"Waiting...",
info = True
) }}

{% set wait_query %}
SELECT
system$wait(
{{ var(
"WAIT",
400
) }}
) {% endset %}
{% do run_query(wait_query) %}
{% else %}
SELECT
NULL;
{% endif %}
{% endif %}
{% endif %}
{% endmacro %}