[DON'T MERGE] Add new datastreams to Kafka Integration #14417

Draft. Wants to merge 7 commits into base: main.
54 changes: 51 additions & 3 deletions packages/kafka/_dev/build/docs/README.md
@@ -6,9 +6,9 @@ This integration collects logs and metrics from [Kafka](https://kafka.apache.org

The `log` dataset is tested with logs from Kafka 0.9, 1.1.0 and 2.0.0.

-The `broker`, `consumergroup`, `partition` datastreams are tested with Kafka 0.10.2.1, 1.1.0, 2.1.1, 2.2.2 and 3.6.0.
+The `broker`, `consumergroup`, `partition`, `jvm`, `network`, `logmanager`, `replicamanager` datastreams are tested with Kafka 0.10.2.1, 1.1.0, 2.1.1, 2.2.2 and 3.6.0.

-The `broker` metricset requires Jolokia to fetch JMX metrics. Refer to the Metricbeat documentation about Jolokia for more information.
+The `broker`, `jvm`, `network`, `logmanager`, and `replicamanager` metricsets require Jolokia to fetch JMX metrics. Refer to the Metricbeat documentation about Jolokia for more information.

## Logs

@@ -54,4 +54,52 @@ Please refer to the following [document](https://www.elastic.co/guide/en/ecs/cur

Please refer to the following [document](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for detailed information on ECS fields.

-{{fields "partition"}}
+{{fields "partition"}}

### jvm

The `jvm` dataset collects JVM metrics from Kafka brokers using Jolokia. This includes information about buffer pools, class loading, compilation, garbage collection, memory usage, memory pools, runtime, and threading.

{{event "jvm"}}

**ECS Field Reference**

Please refer to the following [document](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for detailed information on ECS fields.

{{fields "jvm"}}

### network

The `network` dataset collects network metrics from Kafka brokers using Jolokia. This includes information about network acceptors, processors, request channels, request metrics, and socket servers.

{{event "network"}}

**ECS Field Reference**

Please refer to the following [document](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for detailed information on ECS fields.

{{fields "network"}}

### logmanager

The `logmanager` dataset collects log management metrics from Kafka brokers using Jolokia. This includes information about log segments, log cleaners, log cleaner managers, log flush statistics, and log managers.

{{event "logmanager"}}

**ECS Field Reference**

Please refer to the following [document](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for detailed information on ECS fields.

{{fields "logmanager"}}

### replicamanager

The `replicamanager` dataset collects replica management metrics from Kafka brokers using Jolokia. This includes information about ISR (In-Sync Replicas), partition counts, leader replicas, offline replicas, and reassignment operations.

{{event "replicamanager"}}

**ECS Field Reference**

Please refer to the following [document](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for detailed information on ECS fields.

{{fields "replicamanager"}}
5 changes: 5 additions & 0 deletions packages/kafka/changelog.yml
@@ -1,4 +1,9 @@
# newer versions go on top
- version: "1.19.0"
  changes:
    - description: Add JVM, Network, LogManager, and ReplicaManager metrics data streams for Kafka via JMX/Jolokia
      type: enhancement
      link: https://github.com/elastic/integrations/pull/14417
- version: "1.18.4"
  changes:
    - description: Update supported kafka versions in README.
184 changes: 184 additions & 0 deletions packages/kafka/data_stream/jvm/agent/stream/stream.yml.hbs
@@ -0,0 +1,184 @@
metricsets: ["jmx"]
period: {{period}}
http_method: "GET"
hosts:
{{#each hosts}}
  - {{this}}
{{/each}}
namespace: {{namespace}}
{{#if username}}
username: {{username}}
{{/if}}
{{#if password}}
password: {{password}}
{{/if}}
{{#if processors}}
processors:
{{processors}}
{{/if}}
jmx.mappings:
  # Buffer Pool MBeans
  - mbean: 'java.nio:type=BufferPool,name=direct'
    attributes:
      - attr: UsedMemory
        field: buffer_pool.used.bytes
      - attr: TotalCapacity
        field: buffer_pool.capacity.bytes
      - attr: Count
        field: buffer_pool.count
  - mbean: 'java.nio:type=BufferPool,name=mapped'
    attributes:
      - attr: UsedMemory
        field: buffer_pool.used.bytes
      - attr: TotalCapacity
        field: buffer_pool.capacity.bytes
      - attr: Count
        field: buffer_pool.count

  # Class Loading MBeans
  - mbean: 'java.lang:type=ClassLoading'
    attributes:
      - attr: LoadedClassCount
        field: classes.loaded.current
      - attr: TotalLoadedClassCount
        field: classes.loaded.total
      - attr: UnloadedClassCount
        field: classes.unloaded.total

  # Compilation MBeans
  - mbean: 'java.lang:type=Compilation'
    attributes:
      - attr: TotalCompilationTime
        field: compilation.time.ms

  # Garbage Collection MBeans
  - mbean: 'java.lang:type=GarbageCollector,name=G1 Young Generation'
    attributes:
      - attr: CollectionCount
        field: gc.collection.count
      - attr: CollectionTime
        field: gc.collection.time.ms
  - mbean: 'java.lang:type=GarbageCollector,name=G1 Old Generation'
    attributes:
      - attr: CollectionCount
        field: gc.collection.count
      - attr: CollectionTime
        field: gc.collection.time.ms
  - mbean: 'java.lang:type=GarbageCollector,name=ParNew'
    attributes:
      - attr: CollectionCount
        field: gc.collection.count
      - attr: CollectionTime
        field: gc.collection.time.ms
  - mbean: 'java.lang:type=GarbageCollector,name=ConcurrentMarkSweep'
    attributes:
      - attr: CollectionCount
        field: gc.collection.count
      - attr: CollectionTime
        field: gc.collection.time.ms

  # Memory MBeans
  - mbean: 'java.lang:type=Memory'
    attributes:
      - attr: HeapMemoryUsage.committed
        field: memory.heap.committed
      - attr: HeapMemoryUsage.init
        field: memory.heap.init
      - attr: HeapMemoryUsage.max
        field: memory.heap.max
      - attr: HeapMemoryUsage.used
        field: memory.heap.used
      - attr: NonHeapMemoryUsage.committed
        field: memory.non_heap.committed
      - attr: NonHeapMemoryUsage.init
        field: memory.non_heap.init
      - attr: NonHeapMemoryUsage.max
        field: memory.non_heap.max
      - attr: NonHeapMemoryUsage.used
        field: memory.non_heap.used
Comment on lines +80 to +98

Screenshot 2025-07-07 at 16 22 36

@mykola-elastic (Contributor), Jul 8, 2025

I figured out that I am able to access those subobjects in the browser via URLs:

but for some reason it doesn't work if I put that in JMX mappings (using slash in Mbean)
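
The URLs from this comment are not reproduced here. As a rough sketch of the mechanism being described: Jolokia's GET `read` endpoint takes the MBean name, the attribute, and an optional inner path as slash-separated URL segments, which is how a sub-value such as `HeapMemoryUsage/used` can be fetched directly in a browser. The base URL (`http://localhost:8778/jolokia`) and the helper name `jolokia_read_url` are assumptions for illustration, not values from this PR, and the sketch does not handle Jolokia's `!` escaping for names that themselves contain slashes.

```python
from urllib.parse import quote


def jolokia_read_url(base_url: str, mbean: str, attribute: str, inner_path: str = "") -> str:
    """Build a Jolokia GET 'read' URL: <base>/read/<mbean>/<attribute>[/<inner path>]."""
    # ':', '=' and ',' are left unescaped so the MBean name stays readable.
    parts = [
        base_url.rstrip("/"),
        "read",
        quote(mbean, safe=":=,"),
        quote(attribute, safe=""),
    ]
    if inner_path:
        # Each inner path element becomes its own URL segment.
        parts.extend(quote(p, safe="") for p in inner_path.split("/"))
    return "/".join(parts)


if __name__ == "__main__":
    # Hypothetical agent address; adjust to wherever the Jolokia agent listens.
    print(
        jolokia_read_url(
            "http://localhost:8778/jolokia",
            "java.lang:type=Memory",
            "HeapMemoryUsage",
            "used",
        )
    )
```

The slash here is part of the HTTP request path, not of the MBean name, which may be why the same string is not accepted as an `mbean` value in `jmx.mappings`.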

      - attr: ObjectPendingFinalizationCount
        field: memory.objects_pending_finalization
Comment on lines +80 to +100

This way of getting nested params worked for me:

Suggested change
  # Memory MBeans
  - mbean: 'java.lang:type=Memory'
    attributes:
      - attr: HeapMemoryUsage.committed
        field: memory.heap.committed
      - attr: HeapMemoryUsage.init
        field: memory.heap.init
      - attr: HeapMemoryUsage.max
        field: memory.heap.max
      - attr: HeapMemoryUsage.used
        field: memory.heap.used
      - attr: NonHeapMemoryUsage.committed
        field: memory.non_heap.committed
      - attr: NonHeapMemoryUsage.init
        field: memory.non_heap.init
      - attr: NonHeapMemoryUsage.max
        field: memory.non_heap.max
      - attr: NonHeapMemoryUsage.used
        field: memory.non_heap.used
      - attr: ObjectPendingFinalizationCount
        field: memory.objects_pending_finalization
  # Memory MBeans
  - mbean: 'java.lang:type=Memory'
    attributes:
      - attr: HeapMemoryUsage
        field: memory.heap
        paths:
          committed: "heap_memory.committed"
          init: "heap_memory.init"
          max: "heap_memory.max"
          used: "heap_memory.used"
      - attr: NonHeapMemoryUsage
        field: memory.non_heap
        paths:
          committed: "non_heap_memory.committed"
          init: "non_heap_memory.init"
          max: "non_heap_memory.max"
          used: "non_heap_memory.used"
      - attr: ObjectPendingFinalizationCount
        field: memory.objects_pending_finalization
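
To make the intent of this suggestion concrete: a composite JMX attribute such as `HeapMemoryUsage` comes back from Jolokia as a nested object, and the goal is to end up with one dotted field per sub-value. The following is only an illustrative Python sketch of that flattening, not how Metricbeat implements it; the function name `flatten_composite` and the sample numbers are made up.

```python
def flatten_composite(prefix: str, value: dict) -> dict:
    """Flatten a nested attribute value into dotted field names under `prefix`."""
    flat = {}
    for key, sub in value.items():
        name = f"{prefix}.{key}"
        if isinstance(sub, dict):
            # Recurse into nested composites so every leaf gets its own field.
            flat.update(flatten_composite(name, sub))
        else:
            flat[name] = sub
    return flat


if __name__ == "__main__":
    # Made-up sample values shaped like a JMX MemoryUsage composite.
    heap = {"init": 1, "committed": 2, "max": 3, "used": 4}
    for field, val in flatten_composite("memory.heap", heap).items():
        print(field, val)
```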


  # Memory Pool MBeans
  - mbean: 'java.lang:type=MemoryPool,name=Eden Space'
    attributes:
      - attr: Usage.committed
        field: memory_pool.usage.committed
      - attr: Usage.init
        field: memory_pool.usage.init
      - attr: Usage.max
        field: memory_pool.usage.max
      - attr: Usage.used
        field: memory_pool.usage.used
      - attr: CollectionUsage.committed
        field: memory_pool.collection_usage.committed
      - attr: CollectionUsage.init
        field: memory_pool.collection_usage.init
      - attr: CollectionUsage.max
        field: memory_pool.collection_usage.max
      - attr: CollectionUsage.used
        field: memory_pool.collection_usage.used
  - mbean: 'java.lang:type=MemoryPool,name=Survivor Space'
    attributes:
      - attr: Usage.committed
        field: memory_pool.usage.committed
      - attr: Usage.init
        field: memory_pool.usage.init
      - attr: Usage.max
        field: memory_pool.usage.max
      - attr: Usage.used
        field: memory_pool.usage.used
      - attr: CollectionUsage.committed
        field: memory_pool.collection_usage.committed
      - attr: CollectionUsage.init
        field: memory_pool.collection_usage.init
      - attr: CollectionUsage.max
        field: memory_pool.collection_usage.max
      - attr: CollectionUsage.used
        field: memory_pool.collection_usage.used
  - mbean: 'java.lang:type=MemoryPool,name=Tenured Gen'
    attributes:
      - attr: Usage.committed
        field: memory_pool.usage.committed
      - attr: Usage.init
        field: memory_pool.usage.init
      - attr: Usage.max
        field: memory_pool.usage.max
      - attr: Usage.used
        field: memory_pool.usage.used
      - attr: CollectionUsage.committed
        field: memory_pool.collection_usage.committed
      - attr: CollectionUsage.init
        field: memory_pool.collection_usage.init
      - attr: CollectionUsage.max
        field: memory_pool.collection_usage.max
      - attr: CollectionUsage.used
        field: memory_pool.collection_usage.used

  # Runtime MBeans
  - mbean: 'java.lang:type=Runtime'
    attributes:
      - attr: VmName
        field: runtime.name
      - attr: VmVendor
        field: runtime.vendor
      - attr: VmVersion
        field: runtime.version
      - attr: SpecName
        field: runtime.spec.name
      - attr: SpecVendor
        field: runtime.spec.vendor
      - attr: SpecVersion
        field: runtime.spec.version

  # Threading MBeans
  - mbean: 'java.lang:type=Threading'
    attributes:
      - attr: ThreadCount
        field: threads.count
      - attr: DaemonThreadCount
        field: threads.daemon
      - attr: PeakThreadCount
        field: threads.peak
      - attr: TotalStartedThreadCount
        field: threads.started.total
20 changes: 20 additions & 0 deletions packages/kafka/data_stream/jvm/fields/base-fields.yml
@@ -0,0 +1,20 @@
- name: data_stream.type
  type: constant_keyword
  description: Data stream type.
- name: data_stream.dataset
  type: constant_keyword
  description: Data stream dataset.
- name: data_stream.namespace
  type: constant_keyword
  description: Data stream namespace.
- name: '@timestamp'
  type: date
  description: Event timestamp.
- name: event.module
  type: constant_keyword
  description: Event module
  value: jolokia
- name: event.dataset
  type: constant_keyword
  description: Event dataset
  value: kafka.jvm