Skip to content

Conversation

@rauanmayemir
Copy link
Contributor

When user specifies Content-Type: application/json, handler will attempt to parse logsData from JSON instead of assuming it's a raw protobuf payload. Fixes #580.

This PR also includes a fix for panic error due to CNPG output not containing "kubernetes" section.
K8s pod/namespace information is not part of a record anyway as it only contains what CNPG container logs into stderr.

If we want to act on kubernetes metadata like k8s namespace, pod name, labels, it likely needs to be in ResourceLogs[].Resource.

@lfittl I would like to include native support for more kinds of logs from https://cloudnative-pg.io/documentation/1.26/logging/#other-logs, e.g wal-archive in particular. It would give great visibility into issues like failure to upload wal. Seeking guidance on how to approach this best.

image

@rauanmayemir
Copy link
Contributor Author

The ultimate working setup with this PR. Would be great to have this added to the docs.

Source

Vector has to run as privileged DaemonSet with hostPath /var/log/pods mounted.

# sources/kubernetes_logs.toml
type = "kubernetes_logs"

Transform

I run one pganalyze-collector instance per cluster, so I have additional routing and filtering. But for simplicity, we assume there is only one collector instance.

First we use route transformation to split out CNPG traffic.

# transforms/cnpg_route.toml
type = "route"
inputs = ["kubernetes_logs"]
reroute_unmatched = false
route.cnpg = '''
  exists(.kubernetes.pod_labels."cnpg.io/cluster") &&
  exists(.kubernetes.pod_labels."cnpg.io/podRole") &&
  .kubernetes.pod_labels."cnpg.io/podRole" == "instance" &&
  ## you can optionally filter out replicas if you only care about primary server
  # exists(.kubernetes.pod_labels."cnpg.io/instanceRole") &&
  # .kubernetes.pod_labels."cnpg.io/instanceRole" == "primary" &&
  ## ingest only the clusters you want
  includes(
    [
      "cluster-name1",
      "cluster-name2",
    ],
    .kubernetes.pod_labels."cnpg.io/cluster"
  )
'''

Now we remap our logs to opentelemetry format.

# transforms/cnpg_remap.toml
type = "remap"
inputs = ["cnpg_route"]
source = '''
    structured = parse_json!(.message)

    severity_text = if includes(["emerg", "err", "crit", "alert"], structured.level) {
        "ERROR"
      } else if structured.level == "warning" {
        "WARN"
      } else if structured.level == "debug" {
        "DEBUG"
      } else if includes(["info", "notice"], structured.level) {
        "INFO"
      } else {
        structured.level
      }

    structuredLogger = del(structured.logger)
    structuredLevel = del(structured.level)

    bodyValues = [
      {"key": "logger", "value": {"stringValue": structuredLogger }},
      {"key": "error_severity", "value": {"stringValue": structuredLevel}},
    ]

    if structuredLogger == "postgres" {
      recordValues = []
      for_each(zip(keys!(structured.record), values!(structured.record))) -> |_index, values| {
        recordValues = push(recordValues, {
          "key": values[0],
          "value": {"stringValue": values[1]}
        })
      }

      bodyValues = push(bodyValues, {
        "key": "record",
        "value": { "kvlistValue": {"values": recordValues}}
      })
    } else {
      for_each(zip(keys!(structured), values!(structured))) -> |_index, values| {
        stringValue = if is_string(values[1]) {
            values[1]
          } else {
            encode_json(values[1])
          }

        bodyValues = push(bodyValues, {
          "key": values[0],
          "value": {"stringValue": stringValue}
        })
      }
    }

    output = {
      "resource": {
        "attributes": [
          {"key": "source_type", "value": {"stringValue": .source_type }},
          {"key": "service.name", "value": {"stringValue": .kubernetes.pod_labels."cnpg.io/cluster" }},
          {"key": "host.hostname", "value": {"stringValue": .kubernetes.pod_name }}
        ]
      },
      "scopeLogs": [{
        "logRecords": [{
          "timeUnixNano": to_unix_timestamp(parse_timestamp!(.timestamp, "%+"), unit: "nanoseconds"),
          "observedTimeUnixNano": to_unix_timestamp(parse_timestamp!(structured.ts, "%+"), unit: "nanoseconds"),
          "body": {"kvlistValue": {"values": bodyValues}},
          "severityText": severity_text
        }]
      }]
    }

    . = output
'''

Sink

Send our logs to collector in batches.

# sinks/pganalyze.toml
type = "opentelemetry"
inputs = [ "cnpg_remap" ]
protocol.type = "http"
protocol.uri = "http://pganalyze-collector.namespace.svc.cluster.local:4318/v1/logs"
protocol.method = "post"
protocol.encoding.codec = "json"
protocol.framing.method = "character_delimited"
protocol.framing.character_delimited.delimiter = ','
protocol.payload_prefix = '{"resourceLogs":'
protocol.payload_suffix = '}'
 # choose appropriate batch size and timeout values
protocol.batch.max_events = 50
protocol.batch.timeout_secs = 5
protocol.request.headers.content-type = "application/json"

Copy link

@vfoucault vfoucault left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉 I struggled to debug / make it work without touching the collector's code. Thanks for that. Once I've found your issue I was relieved that one's has already done it.

Comment on lines +79 to +82
if kubernetes == nil {
return false
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd personally keep that away form skipDueToK8sFilter which might be missleading, because this function is only about filtering out results and not assessing payload's contents.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean? I'm only guarding against kubernetes *common.KeyValueList which could be nil, but the function doesn't seem to expect it to be nil so it panic crashes..

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like

if kubernetes != nil && skipDueToK8sFilter(kubernetes, server, prefixedLogger) {
	continue
}

This is just a style comment where to leave the skipDueToK8sFilter doing only what it is meant to be. No big deal here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually didn't end up including this in my open PR for other OTEL things, but I did originally have this and also prefer failing early here rather than inside of the function for what it's worth.

https://github.com/pganalyze/collector/pull/719/files#diff-9b41a22ed8cdaeb1a5377b0a3245b598b04071bfda5edd565de5707b5b5cef24R170

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should kubernetes then still be a reference instead of concrete type?

switch contentType {
case "application/json":
if err := protojson.Unmarshal(b, logsData); err != nil {
prefixedLogger.PrintError("Could not unmarshal otel body, json expected")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't we add here the err.Error() for clarity. In my experience it was hard to understand what was the issue without running locally the patched collector.

Of course, protobuf's error wire protocol error wouldn't help much though.

@vfoucault
Copy link

Hi there, is there any news about this ?

@rauanmayemir
Copy link
Contributor Author

Bummed to see all suggestions being totally ignored.

@ryanbooz
Copy link
Contributor

ryanbooz commented Sep 8, 2025

Bummed to see all suggestions being totally ignored.

Hi @rauanmayemir - Your suggestions are not being ignored, but as a small team with a lot of competing priorities, some things take longer an hoped. It's really great to see others contributing to this open-source project and I'm thankful that you can still build your own instance if necessary to serve your purposes!

That said, this PR has actually been on my list of items to get setup and verify for a few weeks, as Vector isn't something we've used as a team before. With summer-end holidays and recent travel to support the wider Postgres community at conferences, it taking longer than expected.

There were a few other priority PRs that touched the same code areas, so it would be helpful if you were to rebase and update this PR to move it forward. Thanks!

@rauanmayemir
Copy link
Contributor Author

Thank you for clarifying, this puts me at ease. I'll rebase and get rid of the conflicts.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

OTEL Collector: allow plain json payloads

3 participants