
feat(outputs.snowpipe_streaming): Add plugin #18721

Open
aleksclark wants to merge 5 commits into influxdata:master from aleksclark:feat/snowpipe-output

Conversation

@aleksclark aleksclark commented Apr 15, 2026

Summary

Add a dedicated Snowflake Snowpipe Streaming output plugin for low-latency, high-throughput metric ingestion directly into Snowflake tables.

The existing SQL output plugin supports Snowflake via gosnowflake, but it is a generic SQL driver: no batch optimization, no schema management, no Snowpipe Streaming awareness. This dedicated plugin provides production-grade Snowflake ingestion with:

  • Efficient batch inserts using gosnowflake array binding (leverages Snowpipe Streaming internally)
  • Template-based table routing (`metrics_{{.Name}}`)
  • Auto table creation and schema evolution
  • Retry with exponential backoff
  • NaN/Inf sanitization

Features

| Feature | Description |
| --- | --- |
| Key-pair auth | RSA private key authentication (Snowflake best practice) |
| Table routing | Go templates for metric-name-based table routing |
| Batch inserts | Configurable batch size with multi-row `INSERT VALUES` |
| Retry logic | Exponential backoff on transient errors |
| Column filtering | Select which tags/fields to include |
| Auto DDL | Create tables and add columns automatically |
| Schema cache | Configurable TTL to avoid repeated `DESCRIBE TABLE` calls |
| Data sanitization | NaN/Inf to NULL conversion |
| Thread safety | Safe for concurrent writes |

Configuration

```toml
[[outputs.snowpipe_streaming]]
  account = "myaccount"
  user = "myuser"
  private_key_path = "/etc/telegraf/snowflake_rsa_key.p8"
  database = "METRICS_DB"
  schema = "PUBLIC"
  table = "metrics_{{.Name}}"
  batch_size = 1000
  create_table = true
```
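The `table = "metrics_{{.Name}}"` setting uses Go's standard `text/template` package, so the routing can be sketched independently of the plugin. The `metricCtx` struct and `renderTable` helper below are invented names for illustration; only `{{.Name}}` expansion is taken from the config above.

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// metricCtx is the (hypothetical) data made available to the table template;
// {{.Name}} expands to the metric name.
type metricCtx struct {
	Name string
}

// renderTable resolves a pattern like "metrics_{{.Name}}" for one metric.
func renderTable(pattern string, m metricCtx) (string, error) {
	tmpl, err := template.New("table").Parse(pattern)
	if err != nil {
		return "", fmt.Errorf("parsing table template: %w", err)
	}
	var buf bytes.Buffer
	if err := tmpl.Execute(&buf, m); err != nil {
		return "", fmt.Errorf("executing table template: %w", err)
	}
	return buf.String(), nil
}

func main() {
	name, err := renderTable("metrics_{{.Name}}", metricCtx{Name: "cpu"})
	if err != nil {
		panic(err)
	}
	fmt.Println(name) // metrics_cpu
}
```

In practice the plugin would parse the template once at startup and only execute it per metric, since `template.Parse` is the expensive step.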

Tests

  • 7 unit tests: config validation, metric-to-row conversion, table templates, batching, retry logic, column filtering, NaN handling
  • 7 integration tests with mocked Snowflake driver: full write cycle, table creation, schema evolution, batch retry, concurrent writes, connection errors

Dependencies

Uses gosnowflake which is already in go.mod (used by the SQL output plugin). No new dependencies.

Files Added

  • plugins/outputs/snowpipe_streaming/snowpipe_streaming.go (595 lines)
  • plugins/outputs/snowpipe_streaming/snowpipe_streaming_test.go (785 lines)
  • plugins/outputs/snowpipe_streaming/sample.conf
  • plugins/outputs/snowpipe_streaming/README.md (197 lines)
  • plugins/outputs/all/snowpipe_streaming.go (registration)

Checklist

Related issues

resolves #18735

Add a dedicated Snowflake output plugin that streams metrics directly into
Snowflake tables using gosnowflake's efficient batch inserts with array
binding, which leverages Snowpipe Streaming internally.

Features:
- Key-pair authentication via RSA private keys
- Go template-based table routing (e.g. metrics_{{.Name}})
- Configurable batch size with multi-row INSERT VALUES
- Exponential backoff retry on transient errors
- Tag/field column filtering
- Auto table creation and schema evolution (ALTER TABLE ADD COLUMN)
- Table schema caching with configurable TTL
- NaN/Inf sanitization to NULL
- Thread-safe concurrent writes

🐮 Generated with Crush

Assisted-by: AWS Claude Opus 4.6 via Crush <crush@charm.land>
@telegraf-tiger
Contributor

Thanks so much for the pull request!
🤝 ✒️ Just a reminder that the CLA has not yet been signed, and we'll need it before merging. Please sign the CLA when you get a chance, then post a comment here saying !signed-cla

@telegraf-tiger telegraf-tiger Bot added feat Improvement on an existing feature such as adding a new setting/mode to an existing plugin plugin/output 1. Request for new output plugins 2. Issues/PRs that are related to output plugins labels Apr 15, 2026
@aleksclark
Author

!signed-cla

- Omit unused receiver names instead of using underscore
- Remove stale //nolint:revive directives
- Add //nolint:gosec for G201 (quoteIdent sanitises identifier)
- Check s.Write error in goroutine with assert.NoError

@srebhan
Member

srebhan commented Apr 17, 2026

@aleksclark can you please restore the PR description template, especially the AI section, as we cannot review your PR otherwise!

@srebhan srebhan added the ready for final review This pull request has been reviewed and/or tested by multiple users and is ready for a final review. label Apr 17, 2026
@aleksclark
Author

@srebhan sure thing, sorry about that!

@srebhan srebhan removed the ready for final review This pull request has been reviewed and/or tested by multiple users and is ready for a final review. label Apr 17, 2026
@srebhan srebhan changed the title feat(outputs.snowpipe_streaming): add Snowpipe Streaming output plugin feat(outputs.snowpipe_streaming): Add plugin Apr 20, 2026
Member

@srebhan srebhan left a comment


Thanks for your contribution @aleksclark! I started reviewing and added some comments, but the real question is why this plugin is needed.
There already is a SQL output plugin and I don't see any reason why you can't add Snowpipe support there!?

[snowflake]: https://www.snowflake.com/
[gosnowflake]: https://github.com/snowflakedb/gosnowflake

⭐ Telegraf v1.35.0
Member


I think this is

Suggested change
⭐ Telegraf v1.35.0
⭐ Telegraf v1.39.0

;-)

Comment on lines +4 to +6
inserts via the [gosnowflake][gosnowflake] driver with array binding, which
leverages Snowpipe Streaming internally for low-latency, high-throughput
ingest without staging files.
Member


These are implementation-oriented, developer-targeted details, so we should remove them as this documentation is user-focused.

Suggested change
inserts via the [gosnowflake][gosnowflake] driver with array binding, which
leverages Snowpipe Streaming internally for low-latency, high-throughput
ingest without staging files.
inserts.

Comment on lines +15 to +33
## Prerequisites

1. A Snowflake account with a database and schema already created.
2. Key-pair authentication configured for the Snowflake user:
- Generate an RSA key pair:

```bash
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
```

- Assign the public key to the user:

```sql
ALTER USER my_user SET RSA_PUBLIC_KEY='<public key contents>';
```

3. The user must have INSERT privileges on the target table(s).
4. If `create_table = true`, the user must also have CREATE TABLE privileges.
Member


Please cut this down to really Telegraf related details. For the key-pair generation please link to the upstream documentation as nobody will maintain this here if things change in the database.

3. The user must have INSERT privileges on the target table(s).
4. If `create_table = true`, the user must also have CREATE TABLE privileges.

## Global configuration options
Member


Please keep the inclusion "comment" which is used to auto-insert the global configuration section so we can change it in one common place

Suggested change
## Global configuration options
## Global configuration options <!-- @/docs/includes/plugin_config.md -->

## {{.Name}} - metric name
## {{.Tag "key"}} - tag value
## Example: "metrics_{{.Name}}" routes each metric name to a separate table
table = ""
Member


Can we please have the metric name as default here?


### NaN/Inf field values

Fields containing NaN or Inf float values are inserted as NULL to avoid
Member


Suggested change
Fields containing NaN or Inf float values are inserted as NULL to avoid
Fields containing `NaN` or infinite float values are inserted as `NULL` to avoid

@@ -0,0 +1,588 @@
//go:generate ../../../tools/readme_config_includer/generator
package snowpipe_streaming
Member


How about just naming this snowpipe?

import (
"crypto/rsa"
"crypto/x509"
gosql "database/sql"
Member


Why do we need this alias?

Comment on lines +88 to +95
if strings.Contains(s.Table, "{{") {
tmpl, err := template.New("table").Parse(s.Table)
if err != nil {
return fmt.Errorf("parsing table template: %w", err)
}
s.tableTmpl = tmpl
s.tableHasTmpl = true
}
Member


Why do you use this fragile setup? Wouldn't it be better to always use the template?

Or even better avoid this altogether and always use the metric name and let the user adapt this using processor plugins.

Comment on lines +31 to +32
## Number of rows per insert batch
# batch_size = 1000
Member


The metric_batch_size is a global setting that should determine the batch-size instead of adding another parameter!

@srebhan srebhan self-assigned this Apr 20, 2026
@srebhan srebhan added the waiting for response waiting for response from contributor label Apr 22, 2026


Development

Successfully merging this pull request may close these issues.

feat(outputs): add dedicated Snowpipe Streaming output plugin for Snowflake
