Merged

26 commits
b339532
initial generate code
derrickaw Nov 13, 2025
1fccd9b
update yaml to jive with java template file
derrickaw Nov 13, 2025
3b64df9
add java template file
derrickaw Nov 13, 2025
3549765
update generate file
derrickaw Nov 13, 2025
996004d
add new workflow for yaml blueprint to java template
derrickaw Nov 13, 2025
457ba94
add the resulting changes to the KafkaToBigQueryYaml.java file that w…
derrickaw Nov 13, 2025
0d50633
with mvn spotless
derrickaw Nov 14, 2025
f660460
remove mvn spotless
derrickaw Nov 14, 2025
148006f
update template
derrickaw Nov 14, 2025
83437e8
update a different template :)
derrickaw Nov 14, 2025
455936e
add distribution for jdk
derrickaw Nov 14, 2025
893cea5
spotless apply on yaml template
derrickaw Nov 14, 2025
7797e73
make spotless run only on yaml module
derrickaw Nov 14, 2025
b477955
add -B
derrickaw Nov 14, 2025
fdc8a8e
add more comments
derrickaw Nov 14, 2025
2ef9170
remove redundant workflow step since changed script to handle both di…
derrickaw Nov 17, 2025
4d772af
remove workflow for now - will revisit on next iteration
derrickaw Nov 18, 2025
e741603
add note about spotless
derrickaw Nov 18, 2025
b007e38
add generic readme on how to add new templates
derrickaw Nov 18, 2025
ded6680
update readme per comments
derrickaw Nov 19, 2025
437942b
update generate script per comments
derrickaw Nov 19, 2025
5f5e779
update to handle any test.py files
derrickaw Dec 3, 2025
7721fd0
move files to test location
derrickaw Dec 3, 2025
10988af
__init__.py to allow import for test file
derrickaw Dec 3, 2025
01fe746
forgot input yaml file for unit test
derrickaw Dec 3, 2025
906e80e
update readme based on updated path
derrickaw Dec 3, 2025
23 changes: 23 additions & 0 deletions yaml/README.md
@@ -0,0 +1,23 @@
# Creating a YAML Template

This document outlines the steps required to create a new YAML template for Dataflow.
More detailed instructions on how to run the template can be found in the associated README files for each template.

## Steps

1. **Contribution Instructions:** Read the code contribution guidelines located [here](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/contributor-docs/code-contributions.md).

1. **Add YAML Blueprint:** Create the YAML blueprint file that defines the template's structure and parameters.
Place this file in the [`yaml/src/main/yaml`](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/yaml/src/main/yaml) directory. A quick way to sanity-check the blueprint locally is sketched after these steps.

1. **Generate YAML Template (Optional):** Generate the Java template file for the YAML blueprint by running the generation script.

```shell
python yaml/src/main/python/generate_yaml_java_templates.py yaml/src/main/yaml
```

1. **Create Integration Test:** Develop an integration test to validate the functionality of the new template.
Place this file in the [`yaml/src/test/java`](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/yaml/src/test/java) directory.
Additional instructions are available [here](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/contributor-docs/add-integration-or-load-test.md).

1. **Generate Documentation:** Run the documentation generation script to create the official documentation for the template. You can find instructions on how to do this in the [contributor documentation](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/contributor-docs/code-contributions.md#generated-documentation).
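As a quick local check for the blueprint added in step 2, you can confirm that the file at least parses as YAML before running the full build. This is only a sketch and not part of the repository: it assumes PyYAML is available, and the script name and example path are illustrative.

```python
# check_blueprint.py (hypothetical helper script, not part of this repository).
# Verifies that a blueprint file is well-formed YAML with a top-level mapping.
import sys

import yaml  # PyYAML: pip install pyyaml


def check_blueprint(path: str) -> None:
    with open(path, "r", encoding="utf-8") as handle:
        blueprint = yaml.safe_load(handle)
    if not isinstance(blueprint, dict):
        raise ValueError(f"{path}: expected a top-level YAML mapping")
    print(f"{path}: parsed OK; top-level keys: {sorted(blueprint)}")


if __name__ == "__main__":
    # Example (assumed blueprint location):
    #   python check_blueprint.py yaml/src/main/yaml/KafkaToBigQuery.yaml
    check_blueprint(sys.argv[1])
```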
6 changes: 5 additions & 1 deletion yaml/pom.xml
@@ -136,7 +136,11 @@
<arguments>
<argument>-m</argument>
<argument>unittest</argument>
<argument>src/test/python/yaml_syntax_test.py</argument>
<argument>discover</argument>
<argument>-s</argument>
<argument>src/test/python</argument>
<argument>-p</argument>
<argument>*_test.py</argument>
</arguments>
</configuration>
</execution>
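This pom.xml change switches the Python test execution from a single hard-coded test file to `unittest` discovery, so every module matching `*_test.py` under `src/test/python` is run automatically; the empty `__init__.py` files added below presumably make the surrounding directories importable packages for those tests. A minimal, hypothetical example of a file the new discovery pattern would pick up:

```python
# yaml/src/test/python/example_blueprint_test.py (hypothetical file name, shown
# only to illustrate what `discover -s src/test/python -p *_test.py` matches).
import unittest


class ExampleBlueprintTest(unittest.TestCase):
    def test_discovery_picks_this_module_up(self):
        # Any unittest.TestCase in a module whose name ends in `_test.py`
        # under src/test/python is now executed by the Maven-driven test run.
        self.assertTrue(True)


if __name__ == "__main__":
    unittest.main()
```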
Empty file added yaml/src/__init__.py
Empty file.
Empty file added yaml/src/main/__init__.py
Empty file.
KafkaToBigQueryYaml.java
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2024 Google LLC
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -27,9 +27,7 @@
type = Template.TemplateType.YAML,
displayName = "Kafka to BigQuery (YAML)",
description =
"The Apache Kafka to BigQuery template is a streaming pipeline which ingests text data from Apache Kafka, executes a user-defined function (UDF), and outputs the resulting records to BigQuery. "
+ "Any errors which occur in the transformation of the data, execution of the UDF, or inserting into the output table are inserted into a separate errors table in BigQuery. "
+ "If the errors table does not exist prior to execution, then it is created.",
"The Apache Kafka to BigQuery template is a streaming pipeline which ingests text data from Apache Kafka, executes a user-defined function (UDF), and outputs the resulting records to BigQuery. Any errors which occur in the transformation of the data, execution of the UDF, or inserting into the output table are inserted into a separate errors table in BigQuery. If the errors table does not exist prior to execution, then it is created.",
flexContainerName = "kafka-to-bigquery-yaml",
yamlTemplateFile = "KafkaToBigQuery.yaml",
filesToCopy = {"template.yaml", "main.py", "requirements.txt"},
@@ -44,14 +42,13 @@
streaming = true,
hidden = false)
public interface KafkaToBigQueryYaml {

@TemplateParameter.Text(
order = 1,
name = "readBootstrapServers",
optional = false,
description = "Kafka Bootstrap Server list",
helpText =
"Kafka Bootstrap Server list, separated by commas. This "
+ "parameter should be provided either through this parameter or jinjaVariables.",
helpText = "Kafka Bootstrap Server list, separated by commas.",
example = "localhost:9092,127.0.0.1:9093")
@Validation.Required
String getReadBootstrapServers();
@@ -60,10 +57,8 @@
order = 2,
name = "kafkaReadTopics",
optional = false,
description = "Kafka topic(s) to read input from.",
helpText =
"Kafka topic(s) to read input from. This parameter should be "
+ "provided either through this parameter or jinjaVariables.\",",
description = "Kafka topic(s) to read the input from.",
helpText = "Kafka topic(s) to read the input from.",
example = "topic1,topic2")
@Validation.Required
String getKafkaReadTopics();
@@ -74,9 +69,8 @@
optional = false,
description = "BigQuery output table",
helpText =
"BigQuery table location to write the output to. The name should be in the format "
+ "`<project>:<dataset>.<table_name>`. The table's schema must match input objects."
+ "This parameter should be provided either through this parameter or jinjaVariables.")
"BigQuery table location to write the output to. The name should be in the format <project>:<dataset>.<table_name>`. The table's schema must match input objects.",
example = "")
@Validation.Required
String getOutputTableSpec();

@@ -86,10 +80,7 @@
optional = false,
description = "The dead-letter table name to output failed messages to BigQuery",
helpText =
"BigQuery table for failed messages. Messages failed to reach the output table for different reasons "
+ "(e.g., mismatched schema, malformed json) are written to this table. If it doesn't exist, it will"
+ " be created during pipeline execution. If not specified, \"outputTableSpec_error_records\" is used instead."
+ "This parameter should be provided either through this parameter or jinjaVariables.",
"BigQuery table for failed messages. Messages failed to reach the output table for different reasons (e.g., mismatched schema, malformed json) are written to this table. If it doesn't exist, it will be created during pipeline execution. If not specified, 'outputTableSpec_error_records' is used instead. The dead-letter table name to output failed messages to BigQuery.",
example = "your-project-id:your-dataset.your-table-name")
@Validation.Required
String getOutputDeadletterTable();
@@ -98,10 +89,9 @@
order = 5,
name = "messageFormat",
optional = true,
description = "The message format",
helpText =
"The message format. One of: AVRO, JSON, PROTO, RAW, or STRING."
+ "This parameter should be provided either through this parameter or jinjaVariables.\",")
description = "The message format.",
helpText = "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING.",
example = "")
@Default.String("JSON")
String getMessageFormat();

@@ -110,7 +100,9 @@
name = "schema",
optional = false,
description = "Kafka schema.",
helpText = "Kafka schema. A schema is required if data format is JSON, AVRO or PROTO.")
helpText = "Kafka schema. A schema is required if data format is JSON, AVRO or PROTO.",
example = "")
@Validation.Required
String getSchema();

@TemplateParameter.Integer(
@@ -119,11 +111,8 @@
optional = true,
description = "Number of streams for BigQuery Storage Write API",
helpText =
"Number of streams defines the parallelism of the BigQueryIO’s Write transform and"
+ " roughly corresponds to the number of Storage Write API’s streams which will be"
+ " used by the pipeline. See"
+ " https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api"
+ " for the recommended values. The default value is 1.")
"Number of streams defines the parallelism of the BigQueryIO’s Write transform and roughly corresponds to the number of Storage Write API’s streams which will be used by the pipeline. See https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api for the recommended values. The default value is 1.",
example = "")
@Default.Integer(1)
Integer getNumStorageWriteApiStreams();

@@ -133,10 +122,8 @@
optional = true,
description = "Triggering frequency in seconds for BigQuery Storage Write API",
helpText =
"Triggering frequency will determine how soon the data will be visible for querying in"
+ " BigQuery. See"
+ " https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api"
+ " for the recommended values. The default value is 5.")
"Triggering frequency will determine how soon the data will be visible for querying in BigQuery. See https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api for the recommended values. The default value is 5.",
example = "")
@Default.Integer(5)
Integer getStorageWriteApiTriggeringFrequencySec();
}