
help request: Multiple Buffers in "cluster-wide and namespaced FluentdConfig multi-tenant scenarios" #1461

Open
@Crazyigor1987

Description


Describe the issue

Hi everyone,
has anybody managed to get fluent-operator fully working on Kubernetes with mixed ClusterInput, ClusterFilter, and ClusterOutput resources written by CloudOps and a namespaced Filter written by DevOps?

In Fluentd, I want something like this:

  1. The source is defined as a ClusterInput by CloudOps.
  2. A ClusterFilter adds company-related meta information to the incoming logs. In my example, I want to append the OpenSearch index name (zzz-namespace-index).
  3. CloudOps sets up an OpenSearch ClusterOutput with a buffer section for log persistence.
  4. The developer creates their own Filter, which refers to the ClusterOutput.
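
In raw Fluentd terms, the intended pipeline is roughly this (a sketch only; the actual operator-generated config is shown further below):

# Intended pipeline (sketch):
<source>          # 1. ClusterInput by CloudOps: forward input
  @type forward
</source>
# 4. the developer's namespaced Filter should run here, before ...
# 2. ... the ClusterFilter that sets the OpenSearch index name
<match **>        # 3. one shared OpenSearch ClusterOutput with a file buffer
  @type opensearch
</match>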

These are the manifests I managed to get running so far:

apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFluentdConfig
metadata:
  labels:
    fluentd.default.config: "true"
  name: fluentd-config
  namespace: fluent
spec:
  clusterInputSelector:
    matchLabels:
      fluentd.default.input: "true"
  clusterFilterSelector:
    matchLabels:
      fluentd.default.filter: "true"
  clusterOutputSelector:
    matchLabels:
      fluentd.default.output: "true"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: Fluentd
metadata:
  labels:
    app.kubernetes.io/name: fluentd
  name: fluentd
  namespace: fluent
spec:
  envVars:
  - name: FLUENTD_OUTPUT_LOGLEVEL
    value: debug
  - name: OPENSEARCH_USERNAME
    valueFrom:
      secretKeyRef:
        name: fluentbit-credentials
        key: username
  - name: OPENSEARCH_PASSWORD
    valueFrom:
      secretKeyRef:
        name: fluentbit-credentials
        key: password
  - name: OPENSEARCH_HOST
    value: opensearch-ingest.my-domain.org
  fluentdCfgSelector:
    matchLabels:
      fluentd.default.config: "true"
  buffer:
    pvc:
      apiVersion: v1
      kind: PersistentVolumeClaim
      spec:
        accessModes:
        - ReadWriteOnce
        resources:
          requests:
            storage: 16Gi
        storageClassName: default
  defaultFilterSelector:
    matchLabels:
      fluentd.default.filter: "true"
  defaultOutputSelector:
    matchLabels:
      fluentd.default.output: "true"
  globalInputs:
  - forward:
      bind: 0.0.0.0
      port: 24224
  image: ghcr.io/fluent/fluent-operator/fluentd:v1.17.0
  logLevel: info
  mode: collector
  positionDB: {}
  replicas: 3
  resources:
    limits:
      cpu: 1000m
      memory: 2Gi
    requests:
      cpu: 500m
      memory: 1Gi
  service: {}
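  # fail the liveness probe (and restart the pod) once the file buffer exceeds ~90 MiB (94371840 bytes)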
  livenessProbe:
    exec:
      command: 
      - /bin/sh
      - -c
      - "[ $(du -sb /buffers/opensearch-buffer | cut -f1) -gt 94371840 ] && exit 1 || true"
  volumeMounts:
  - name: cm-opensearch-client-certs
    mountPath: /etc/opensearch-tls
  volumes:
  - name: cm-opensearch-client-certs
    secret:
      defaultMode: 420
      secretName: cm-opensearch-client-certs
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFilter
metadata:
  name: 001-istio-proxy-0-parsing
  namespace: fluent
  labels:
    fluentd.default.filter: "true"
spec:
  filters:
    - parser:
        keyName: log
        parse:
          type: json
        reserveData: true
        removeKeyNameField: false
        hashValueField: "istio-access-log"
        emitInvalidRecordToError: false
      tag: "istio-proxy.**"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFilter
metadata:
  name: 001-istio-proxy-1-transforming
  namespace: fluent
  labels:
    fluentd.default.filter: "true"
spec:
  filters:
    - recordTransformer:
        enableRuby: true
        records:
          - key: log
            value: "${record['kubernetes'] && record['kubernetes']['container_name'] == 'istio-proxy' && record['istio-access-log'] && !record['istio-access-log'].empty? ? nil : record['log']}"
      tag: "istio-proxy.**"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFilter
metadata:
  name: zzz-namespace-index
  namespace: fluent
  labels:
    fluentd.default.filter: "true"
spec:
  filters:
    - recordTransformer:
        enableRuby: true
        records:
          - key: logstash_prefix
            value: "${record['namespaceindex'] or 'adm-unknown'}"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterOutput
metadata:
  labels:
    fluentd.default.output: "true"
  name: fluentd-output-opensearch
  namespace: fluent
spec:
  outputs:
  - customPlugin:
      config: |
        <match **>
          @type opensearch
          enable_ruby true

          host "#{ENV['OPENSEARCH_HOST']}"
          port 9200
          scheme http
          user "#{ENV['OPENSEARCH_USERNAME']}"
          password "#{ENV['OPENSEARCH_PASSWORD']}"

          logstash_format true
          logstash_prefix ${$.logstash_prefix}
          
          #connection settings
          request_timeout 15s
          reload_connections true
          reload_on_failure true
          resurrect_after 5s
          log_os_400_reason true

          <buffer tag, $.logstash_prefix>
            @type file
            path /buffers/opensearch-buffer
            flush_thread_count 16
            flush_interval 1s
            chunk_limit_size 90M
            overflow_action block
            queue_limit_length 16
            flush_mode interval
            retry_max_interval 30
            retry_forever true
          </buffer>
        </match>
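
One detail worth calling out: logstash_prefix ${$.logstash_prefix} only resolves because $.logstash_prefix is declared as a chunk key in the buffer section, so Fluentd groups chunks by that record field and substitutes the placeholder per chunk at flush time:

<buffer tag, $.logstash_prefix>    # chunk keys: the tag plus the record field $.logstash_prefix
  @type file
  path /buffers/opensearch-buffer  # note: file buffer paths must be unique per plugin instance
</buffer>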

It results in this config:


<source>
  @type  forward
  bind  0.0.0.0
  port  24224
</source>
<match **>
  @id  main
  @type  label_router
  <route>
    @label  @db681e4cb763ca5b7cdbf9ab76f67bbe
    <match>
    </match>
  </route>
</match>
<label @db681e4cb763ca5b7cdbf9ab76f67bbe>
  <filter istio-proxy.**>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::001-istio-proxy-0-parsing-0
    @type  parser
    emit_invalid_record_to_error  false
    hash_value_field  istio-access-log
    key_name  log
    remove_key_name_field  false
    reserve_data  true
    <parse>
      @type  json
    </parse>
  </filter>
  <filter istio-proxy.**>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::001-istio-proxy-1-transforming-0
    @type  record_transformer
    enable_ruby  true
    <record>
      log  ${record['kubernetes'] && record['kubernetes']['container_name'] == 'istio-proxy' && record['istio-access-log'] && !record['istio-access-log'].empty? ? nil : record['log']}
    </record>
  </filter>
  <filter **>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::zzz-namespace-index-0
    @type  record_transformer
    enable_ruby  true
    <record>
      logstash_prefix  ${record['namespaceindex'] or 'adm-unknown'}
    </record>
  </filter>
  <match **>
    @type opensearch
    enable_ruby true
    host "#{ENV['OPENSEARCH_HOST']}"
    port 9200
    scheme http
    user "#{ENV['OPENSEARCH_USERNAME']}"
    password "#{ENV['OPENSEARCH_PASSWORD']}"
    logstash_format true
    logstash_prefix ${$.logstash_prefix}
    
    #connection settings
    request_timeout 15s
    reload_connections true
    reload_on_failure true
    resurrect_after 5s
    log_os_400_reason true
    <buffer tag, $.logstash_prefix>
      @type file
      path /buffers/opensearch-buffer
      flush_thread_count 16
      flush_interval 1s
      chunk_limit_size 90M
      overflow_action block
      queue_limit_length 16
      flush_mode interval
      retry_max_interval 30
      retry_forever true
    </buffer>
  </match>
</label>

So far, so good. Now I am struggling with the developer part. The developer should be able to get their own filter running before our zzz-namespace-index ClusterFilter appends the index name. These are my test manifests for the developer part in their namespace:

apiVersion: fluentd.fluent.io/v1alpha1
kind: FluentdConfig
metadata:
  name: fluentd-clusterconfig-referrer
  namespace: development
  labels:
    fluentd.default.config: "true"
spec:
  clusterOutputSelector:
    matchLabels:
      fluentd.default.output: "true"
  filterSelector:
    matchLabels:
      my-own-namespaced-filter: "true"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: Filter
metadata:
  labels:
    my-own-namespaced-filter: "true"
  name: development-fulllog-parsing
  namespace: development
spec:
  filters:
  - grep:
      regexp:
        - key: "log"
          pattern: '\[fulllog\]'
  - parser:
      keyName: log
      reserveData: true
      removeKeyNameField: false
      injectKeyPrefix: "processed."
      emitInvalidRecordToError: false
      parse:
        type: regexp
        expression: '^(?<date>\S+)\s+(?<time>\S+)\s+(?<pid>\S+)\s+(?<logger>\S+)\s+\[(?<level>\S+)\]:\s+\[fulllog\]\s+(?<logmessage>.*)$'
  - parser:
      keyName: processed.logmessage
      parse:
        type: json
      reserveData: true
      removeKeyNameField: false
      injectKeyPrefix: "fulllog."
      emitInvalidRecordToError: false
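
For reference, the grep filter and the two parsers above expect application lines of this shape (the values here are made up):

2025-02-03 20:58:42,123 12345 com.example.Service [INFO]: [fulllog] {"orderId": 42, "status": "ok"}

The first parser splits the line into date, time, pid, logger, level, and logmessage under the processed. prefix; the second parses the JSON payload from processed.logmessage under the fulllog. prefix.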

As soon as I deploy these manifests, Fluentd stops working. As an error, I get:

2025-02-03 20:58:42 +0000 [error]: config error file="/fluentd/etc/fluent.conf" error_class=Fluent::ConfigError error="Other 'opensearch' plugin already use same buffer path: type = opensearch, buffer path = /buffers/opensearch-buffer"
level=error msg="Fluentd exited" error="exit status 1"
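
The error itself is Fluentd's startup check that no two file buffers may share the same path. A minimal reproduction outside the operator (with made-up tags) would be:

# fluent.conf: two outputs sharing one file-buffer path fail at startup with
# "Other 'opensearch' plugin already use same buffer path"
<match tenant-a.**>
  @type opensearch
  <buffer>
    @type file
    path /buffers/opensearch-buffer
  </buffer>
</match>
<match tenant-b.**>
  @type opensearch
  <buffer>
    @type file
    path /buffers/opensearch-buffer   # same path as above -> Fluent::ConfigError
  </buffer>
</match>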

The created config looks like this:

<source>
  @type  forward
  bind  0.0.0.0
  port  24224
</source>
  <source>
    @type prometheus
    @id in_prometheus
    bind "0.0.0.0"
    port 2021
    metrics_path "/metrics"
  </source>
  <source>
    @type prometheus_output_monitor
    interval 10
    <labels>
      hostname ${hostname}
    </labels>
  </source>
<match **>
  @id  main
  @type  label_router
  <route>
    @label  @b7d2982af63a444d6468354108b8c5f1
    <match>
      namespaces  development
    </match>
  </route>
  <route>
    @label  @db681e4cb763ca5b7cdbf9ab76f67bbe
    <match>
    </match>
  </route>
</match>
<label @b7d2982af63a444d6468354108b8c5f1>
  <filter **>
    @id  FluentdConfig-development-fluentd-clusterconfig-referrer::development::filter::development-fulllog-parsing-0
    @type  grep
    <regexp>
      key  log
      pattern  \[fulllog\]
    </regexp>
  </filter>
  <filter **>
    @id  FluentdConfig-development-fluentd-clusterconfig-referrer::development::filter::development-fulllog-parsing-1
    @type  parser
    emit_invalid_record_to_error  false
    inject_key_prefix  processed.
    key_name  log
    remove_key_name_field  false
    reserve_data  true
    <parse>
      @type  regexp
      expression  ^(?<date>\S+)\s+(?<time>\S+)\s+(?<pid>\S+)\s+(?<logger>\S+)\s+\[(?<level>\S+)\]:\s+\[fulllog\]\s+(?<logmessage>.*)$
    </parse>
  </filter>
  <filter **>
    @id  FluentdConfig-development-fluentd-clusterconfig-referrer::development::filter::development-fulllog-parsing-2
    @type  parser
    emit_invalid_record_to_error  false
    inject_key_prefix  fulllog.
    key_name  processed.logmessage
    remove_key_name_field  false
    reserve_data  true
    <parse>
      @type  json
    </parse>
  </filter>
  <match **>
    @type opensearch
    enable_ruby true
    host "#{ENV['OPENSEARCH_HOST']}"
    port 9200
    scheme http
    user "#{ENV['OPENSEARCH_USERNAME']}"
    password "#{ENV['OPENSEARCH_PASSWORD']}"
    logstash_format true
    logstash_prefix ${$.logstash_prefix}
    
    #connection settings
    request_timeout 15s
    reload_connections true
    reload_on_failure true
    resurrect_after 5s
    log_os_400_reason true
    <buffer tag, $.logstash_prefix>
      @type file
      path /buffers/opensearch-buffer
      flush_thread_count 16
      flush_interval 1s
      chunk_limit_size 90M
      overflow_action block
      queue_limit_length 16
      flush_mode interval
      retry_max_interval 30
      retry_forever true
    </buffer>
  </match>
</label>
<label @db681e4cb763ca5b7cdbf9ab76f67bbe>
  <filter istio-proxy.**>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::001-istio-proxy-0-parsing-0
    @type  parser
    emit_invalid_record_to_error  false
    hash_value_field  istio-access-log
    key_name  log
    remove_key_name_field  false
    reserve_data  true
    <parse>
      @type  json
    </parse>
  </filter>
  <filter istio-proxy.**>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::001-istio-proxy-1-transforming-0
    @type  record_transformer
    enable_ruby  true
    <record>
      log  ${record['kubernetes'] && record['kubernetes']['container_name'] == 'istio-proxy' && record['istio-access-log'] && !record['istio-access-log'].empty? ? nil : record['log']}
    </record>
  </filter>
  <filter **>
    @id  ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::zzz-namespace-index-0
    @type  record_transformer
    enable_ruby  true
    <record>
      logstash_prefix  ${record['namespaceindex'] or 'adm-unknown'}
    </record>
  </filter>
  <match **>
    @type opensearch
    enable_ruby true
    host "#{ENV['OPENSEARCH_HOST']}"
    port 9200
    scheme http
    user "#{ENV['OPENSEARCH_USERNAME']}"
    password "#{ENV['OPENSEARCH_PASSWORD']}"
    logstash_format true
    logstash_prefix ${$.logstash_prefix}
    
    #connection settings
    request_timeout 15s
    reload_connections true
    reload_on_failure true
    resurrect_after 5s
    log_os_400_reason true
    <buffer tag, $.logstash_prefix>
      @type file
      path /buffers/opensearch-buffer
      flush_thread_count 16
      flush_interval 1s
      chunk_limit_size 90M
      overflow_action block
      queue_limit_length 16
      flush_mode interval
      retry_max_interval 30
      retry_forever true
    </buffer>
  </match>
</label>

Contrary to what I expected, it does not reuse the route I had created as CloudOps at the end; instead, the operator renders a second copy of the ClusterOutput inside the namespace's own label section. Both copies point at the same file buffer path, which leads to the buffer collision.
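
The only workaround I can think of (an untested sketch; the resource name and label are mine) is to give the tenant its own namespaced Output with a unique buffer path, selected via outputSelector instead of clusterOutputSelector:

apiVersion: fluentd.fluent.io/v1alpha1
kind: Output
metadata:
  name: development-opensearch        # hypothetical per-tenant output
  namespace: development
  labels:
    my-own-namespaced-output: "true"
spec:
  outputs:
  - customPlugin:
      config: |
        <match **>
          @type opensearch
          host "#{ENV['OPENSEARCH_HOST']}"
          port 9200
          logstash_format true
          logstash_prefix ${$.logstash_prefix}
          <buffer tag, $.logstash_prefix>
            @type file
            path /buffers/opensearch-buffer-development   # unique path, no collision
          </buffer>
        </match>

But that multiplies the file buffers per namespace instead of keeping ONE shared output.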

The questions are:

  • Is my approach correct? If not, what is the best practice here?

  • How do I manage the file buffer per namespace?

  • How can I set up ONE global ClusterOutput for logs?

How did you install fluent operator?

No response

Additional context

No response

Metadata

Assignees

No one assigned

Labels

multi-tenant: issue complaining about multi-tenant usage
