[DON'T MERGE] Add new datastreams to Kafka Integration #14417
Conversation
Testing setup:

```yaml
services:
  kafka:
    image: mykafka:latest
    ports:
      - "9092:9092"
      - "8779:8779"
      - "8775:8775"
      - "8774:8774"
    environment:
      KAFKA_ADVERTISED_HOST: host.docker.internal:9092
```

In the Kafka integration config I used host.docker.internal with ports 9092 and 8779. For the partition and consumergroup datastreams I used SASL with user admin and password admin-secret.
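As a quick way to sanity-check which MBeans the Jolokia endpoint on port 8779 actually exposes, one can construct read URLs by hand. This is a minimal sketch; `jolokia_read_url` is a hypothetical helper, not part of the integration:

```python
# Sketch: compose Jolokia GET read URLs of the form
#   <base>/read/<mbean>[/<attribute>[/<inner-path>]]
# jolokia_read_url is a hypothetical helper for illustration only.
# Note: MBean names containing '/' or '!' need Jolokia's '!' escaping,
# which this simple sketch does not handle.

def jolokia_read_url(base, mbean, *path):
    """Build a Jolokia read URL from a base URL, an MBean name, and
    optional attribute / inner-path segments."""
    return "/".join([base.rstrip("/"), "read", mbean, *path])

url = jolokia_read_url("http://localhost:8779/jolokia",
                       "java.lang:type=Memory", "HeapMemoryUsage", "used")
print(url)
# http://localhost:8779/jolokia/read/java.lang:type=Memory/HeapMemoryUsage/used
```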
packages/kafka/data_stream/replicamanager/agent/stream/stream.yml.hbs
💚 Build Succeeded
```yaml
# Memory MBeans
- mbean: 'java.lang:type=Memory'
  attributes:
    - attr: HeapMemoryUsage.committed
      field: memory.heap.committed
    - attr: HeapMemoryUsage.init
      field: memory.heap.init
    - attr: HeapMemoryUsage.max
      field: memory.heap.max
    - attr: HeapMemoryUsage.used
      field: memory.heap.used
    - attr: NonHeapMemoryUsage.committed
      field: memory.non_heap.committed
    - attr: NonHeapMemoryUsage.init
      field: memory.non_heap.init
    - attr: NonHeapMemoryUsage.max
      field: memory.non_heap.max
    - attr: NonHeapMemoryUsage.used
      field: memory.non_heap.used
```
I figured out that I am able to access those subobjects in the browser via URLs:
- http://localhost:8779/jolokia/read/java.lang:type=Memory/HeapMemoryUsage
- http://localhost:8779/jolokia/read/java.lang:type=Memory/NonHeapMemoryUsage
but for some reason it doesn't work if I put that in the JMX mappings (using a slash in the mbean).
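For context on what the mapping has to resolve: the slash-separated URL path above corresponds to drilling into the nested JSON of the Jolokia response, which the dot-notation `attr` form has to do as well. A rough sketch of that lookup (sample values made up, not from a real broker):

```python
# Shape of a Jolokia read of java.lang:type=Memory (sample values made up).
response = {
    "value": {
        "HeapMemoryUsage": {"committed": 1073741824, "init": 1073741824,
                            "max": 1073741824, "used": 260046848},
        "NonHeapMemoryUsage": {"committed": 166723584, "init": 7667712,
                               "max": -1, "used": 163157648},
    },
    "status": 200,
}

def read_path(value, dotted):
    """Resolve a dotted attribute path like 'HeapMemoryUsage.used'
    against the nested 'value' object of a Jolokia response."""
    node = value
    for part in dotted.split("."):
        node = node[part]
    return node

print(read_path(response["value"], "HeapMemoryUsage.used"))  # 260046848
```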
```yaml
# Network Request Metrics MBeans
- mbean: 'kafka.network:type=RequestMetrics,name=ErrorsPerSec'
  attributes:
    - attr: OneMinuteRate
      field: request_metrics.errors_per_sec
- mbean: 'kafka.network:type=RequestMetrics,name=LocalTimeMs'
  attributes:
    - attr: Mean
      field: request_metrics.local_time_ms
- mbean: 'kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs'
  attributes:
    - attr: Mean
      field: request_metrics.message_conversions_time_ms
- mbean: 'kafka.network:type=RequestMetrics,name=RemoteTimeMs'
  attributes:
    - attr: Mean
      field: request_metrics.remote_time_ms
- mbean: 'kafka.network:type=RequestMetrics,name=RequestBytes'
  attributes:
    - attr: Mean
      field: request_metrics.request_bytes
- mbean: 'kafka.network:type=RequestMetrics,name=RequestQueueTimeMs'
  attributes:
    - attr: Mean
      field: request_metrics.request_queue_time_ms
- mbean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec'
  attributes:
    - attr: OneMinuteRate
      field: request_metrics.requests_per_sec
- mbean: 'kafka.network:type=RequestMetrics,name=ResponseQueueTimeMs'
  attributes:
    - attr: Mean
      field: request_metrics.response_queue_time_ms
- mbean: 'kafka.network:type=RequestMetrics,name=ResponseSendTimeMs'
  attributes:
    - attr: Mean
      field: request_metrics.response_send_time_ms
- mbean: 'kafka.network:type=RequestMetrics,name=TemporaryMemoryBytes'
  attributes:
    - attr: Mean
      field: request_metrics.temporary_memory_bytes
- mbean: 'kafka.network:type=RequestMetrics,name=ThrottleTimeMs'
  attributes:
    - attr: Mean
      field: request_metrics.throttle_time_ms
- mbean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs'
  attributes:
    - attr: Mean
      field: request_metrics.total_time_ms
```
These are not available just like that: one needs to append the request parameter, for example:
http://localhost:8779/jolokia/read/kafka.network:type=RequestMetrics,name=*,request=Produce
For the request above I got:

```json
{"request":{"mbean":"kafka.network:name=*,request=Produce,type=RequestMetrics","type":"read"},"value":{"kafka.network:name=ResponseQueueTimeMs,request=Produce,type=RequestMetrics":{"StdDev":0.6401002810127467,"Mean":0.2440944881889764,"75thPercentile":0.0,"98thPercentile":2.0,"Min":0.0,"95thPercentile":1.0,"99thPercentile":3.0,"Max":6.0,"999thPercentile":6.0,"Count":508,"50thPercentile":0.0},"kafka.network:name=ThrottleTimeMs,request=Produce,type=RequestMetrics":{"StdDev":0.0,"Mean":0.0,"75thPercentile":0.0,"98thPercentile":0.0,"Min":0.0,"95thPercentile":0.0,"99thPercentile":0.0,"Max":0.0,"999thPercentile":0.0,"Count":508,"50thPercentile":0.0},"kafka.network:name=MessageConversionsTimeMs,request=Produce,type=RequestMetrics":{"StdDev":0.0,"Mean":0.0,"75thPercentile":0.0,"98thPercentile":0.0,"Min":0.0,"95thPercentile":0.0,"99thPercentile":0.0,"Max":0.0,"999thPercentile":0.0,"Count":508,"50thPercentile":0.0},"kafka.network:name=RequestQueueTimeMs,request=Produce,type=RequestMetrics":{"StdDev":0.43361864410532475,"Mean":0.1515748031496063,"75thPercentile":0.0,"98thPercentile":1.0,"Min":0.0,"95thPercentile":1.0,"99thPercentile":2.0,"Max":4.0,"999thPercentile":4.0,"Count":508,"50thPercentile":0.0},"kafka.network:name=LocalTimeMs,request=Produce,type=RequestMetrics":{"StdDev":1.158614549759072,"Mean":1.2322834645669292,"75thPercentile":2.0,"98thPercentile":4.0,"Min":0.0,"95thPercentile":4.0,"99thPercentile":5.0,"Max":7.0,"999thPercentile":7.0,"Count":508,"50thPercentile":1.0},"kafka.network:name=ResponseSendTimeMs,request=Produce,type=RequestMetrics":{"StdDev":0.1638670579438843,"Mean":0.027559055118110236,"75thPercentile":0.0,"98thPercentile":1.0,"Min":0.0,"95thPercentile":0.0,"99thPercentile":1.0,"Max":1.0,"999thPercentile":1.0,"Count":508,"50thPercentile":0.0},"kafka.network:name=RemoteTimeMs,request=Produce,type=RequestMetrics":{"StdDev":0.0,"Mean":0.0,"75thPercentile":0.0,"98thPercentile":0.0,"Min":0.0,"95thPercentile":0.0,"99thPercentile":0.0,"Max":0.0,"999thPercentile":0.0,"Count":508,"50thPercentile":0.0},"kafka.network:name=TemporaryMemoryBytes,request=Produce,type=RequestMetrics":{"StdDev":0.0,"Mean":0.0,"75thPercentile":0.0,"98thPercentile":0.0,"Min":0.0,"95thPercentile":0.0,"99thPercentile":0.0,"Max":0.0,"999thPercentile":0.0,"Count":508,"50thPercentile":0.0},"kafka.network:name=TotalTimeMs,request=Produce,type=RequestMetrics":{"StdDev":1.4895763978304433,"Mean":1.9901574803149606,"75thPercentile":3.0,"98thPercentile":6.819999999999993,"Min":0.0,"95thPercentile":5.0,"99thPercentile":7.0,"Max":9.0,"999thPercentile":9.0,"Count":508,"50thPercentile":1.0},"kafka.network:name=RequestBytes,request=Produce,type=RequestMetrics":{"StdDev":6.708776911517584,"Mean":101.13385826771653,"75thPercentile":112.0,"98thPercentile":112.0,"Min":97.0,"95thPercentile":112.0,"99thPercentile":112.0,"Max":112.0,"999thPercentile":112.0,"Count":508,"50thPercentile":97.0}},"timestamp":1751977660,"status":200}
```
From the docs (https://docs.confluent.io/platform/current/kafka/monitoring.html) the format seems to be:
`kafka.network:type=RequestMetrics,name=LocalTimeMs,request={Produce|FetchConsumer|FetchFollower}`
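If the integration's JMX mappings accept extra key properties in the ObjectName (which would need verifying against the Jolokia input), the per-request variants might be declared along these lines. The field names below are illustrative, not from the PR:

```yaml
# Sketch, not tested: one mapping per request type (Produce shown;
# FetchConsumer / FetchFollower would be analogous).
- mbean: 'kafka.network:type=RequestMetrics,name=LocalTimeMs,request=Produce'
  attributes:
    - attr: Mean
      field: request_metrics.produce.local_time_ms  # illustrative field name
```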
```yaml
- mbean: 'kafka.network:type=Acceptor,name=AcceptorBlockedPercent'
  attributes:
    - attr: Value
      field: acceptor.blocked_percent
```
For this one the listener parameter is needed, for example `kafka.network:type=Acceptor,name=AcceptorBlockedPercent,listener=INSIDE`.
For http://localhost:8779/jolokia/read/kafka.network:type=Acceptor,name=AcceptorBlockedPercent,listener=INSIDE I got:

```json
{"request":{"mbean":"kafka.network:listener=INSIDE,name=AcceptorBlockedPercent,type=Acceptor","type":"read"},"value":{"RateUnit":"NANOSECONDS","OneMinuteRate":0.0,"EventType":"blocked time","Count":0,"FifteenMinuteRate":0.0,"FiveMinuteRate":0.0,"MeanRate":0.0},"timestamp":1751978436,"status":200}
```
```yaml
# Network Processor MBeans
- mbean: 'kafka.network:type=Processor,name=IdlePercent'
  attributes:
    - attr: Value
      field: processor.idle_percent
```
For this one the networkProcessor parameter is needed, for example:
http://localhost:8779/jolokia/read/kafka.network:type=Processor,name=IdlePercent,networkProcessor=*
gets us:

```json
{"request":{"mbean":"kafka.network:name=IdlePercent,networkProcessor=*,type=Processor","type":"read"},"value":{"kafka.network:name=IdlePercent,networkProcessor=1,type=Processor":{"Value":1.0},"kafka.network:name=IdlePercent,networkProcessor=2,type=Processor":{"Value":1.0},"kafka.network:name=IdlePercent,networkProcessor=3,type=Processor":{"Value":1.0},"kafka.network:name=IdlePercent,networkProcessor=5,type=Processor":{"Value":1.0},"kafka.network:name=IdlePercent,networkProcessor=4,type=Processor":{"Value":1.0},"kafka.network:name=IdlePercent,networkProcessor=0,type=Processor":{"Value":1.0}},"timestamp":1751978648,"status":200}
```
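Mirroring the wildcard URL above in the mapping might look like the sketch below, assuming the Jolokia input accepts `*` in key properties (this would need to be verified; if it doesn't, one mapping per processor id would be required instead):

```yaml
# Sketch, not tested: wildcard over per-processor MBeans, assuming the
# input supports '*' in ObjectName key properties.
- mbean: 'kafka.network:type=Processor,name=IdlePercent,networkProcessor=*'
  attributes:
    - attr: Value
      field: processor.idle_percent
```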
```yaml
# Log Manager MBeans
- mbean: 'kafka.log:type=LogManager,name=LogDirectoryOffline'
  attributes:
    - attr: Value
      field: log_manager.log_directory_offline
- mbean: 'kafka.log:type=LogManager,name=OfflineLogDirectoryCount'
  attributes:
    - attr: Value
      field: log_manager.offline_log_directory_count
- mbean: 'kafka.log:type=LogManager,name=RemainingLogsToRecover'
  attributes:
    - attr: Value
      field: log_manager.remaining_logs_to_recover
- mbean: 'kafka.log:type=LogManager,name=RemainingSegmentsToRecover'
  attributes:
    - attr: Value
      field: log_manager.remaining_segments_to_recover
```
These look good, but only OfflineLogDirectoryCount is supported on 3.6.0; the other three were added in 4.0.0.
(Comparing the latest doc https://kafka.apache.org/documentation/ against the 3.6.0 doc https://kafka.apache.org/36/documentation.html.)
```yaml
- mbean: 'kafka.log:type=LogCleanerManager,name=MaxDirtyPercent'
  attributes:
    - attr: Value
      field: log_cleaner_manager.max_dirty_percent
- mbean: 'kafka.log:type=LogCleanerManager,name=TimeSinceLastRunMs'
  attributes:
    - attr: Value
      field: log_cleaner_manager.time_since_last_run_ms
- mbean: 'kafka.log:type=LogCleanerManager,name=UncleanableBytes'
  attributes:
    - attr: Value
      field: log_cleaner_manager.uncleanable_bytes
- mbean: 'kafka.log:type=LogCleanerManager,name=UncleanablePartitionsCount'
  attributes:
    - attr: Value
      field: log_cleaner_manager.uncleanable_partitions_count
```
According to https://docs.confluent.io/platform/current/kafka/monitoring.html?#search-for-metric, LogCleanerManager MBeans have a logDirectory parameter.
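Since LogCleanerManager is registered per log directory, matching it may require including that key property. A sketch, assuming the input supports `*` in key properties (unverified; otherwise one mapping per directory path would be needed):

```yaml
# Sketch, not tested: wildcard over the per-directory logDirectory key property.
- mbean: 'kafka.log:type=LogCleanerManager,name=TimeSinceLastRunMs,logDirectory=*'
  attributes:
    - attr: Value
      field: log_cleaner_manager.time_since_last_run_ms
```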
```yaml
# Memory MBeans
- mbean: 'java.lang:type=Memory'
  attributes:
    - attr: HeapMemoryUsage.committed
      field: memory.heap.committed
    - attr: HeapMemoryUsage.init
      field: memory.heap.init
    - attr: HeapMemoryUsage.max
      field: memory.heap.max
    - attr: HeapMemoryUsage.used
      field: memory.heap.used
    - attr: NonHeapMemoryUsage.committed
      field: memory.non_heap.committed
    - attr: NonHeapMemoryUsage.init
      field: memory.non_heap.init
    - attr: NonHeapMemoryUsage.max
      field: memory.non_heap.max
    - attr: NonHeapMemoryUsage.used
      field: memory.non_heap.used
    - attr: ObjectPendingFinalizationCount
      field: memory.objects_pending_finalization
```
This way of getting nested params worked for me, using `paths` instead of the dot-notation attributes:

```yaml
# Memory MBeans
- mbean: 'java.lang:type=Memory'
  attributes:
    - attr: HeapMemoryUsage
      field: memory.heap
      paths:
        committed: "heap_memory.committed"
        init: "heap_memory.init"
        max: "heap_memory.max"
        used: "heap_memory.used"
    - attr: NonHeapMemoryUsage
      field: memory.non_heap
      paths:
        committed: "non_heap_memory.committed"
        init: "non_heap_memory.init"
        max: "non_heap_memory.max"
        used: "non_heap_memory.used"
    - attr: ObjectPendingFinalizationCount
      field: memory.objects_pending_finalization
```
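To make the intent of a `paths` mapping concrete, here is a rough simulation in plain Python of how it flattens one composite attribute into event fields. This is illustrative only, not the actual input implementation, and the sample values are made up:

```python
# Rough simulation of a `paths:` mapping: a composite MBean attribute
# (a dict of inner keys) is flattened into named event fields.
# Sample values are made up; this is NOT the real Jolokia input code.
heap = {"committed": 1073741824, "init": 1073741824,
        "max": 1073741824, "used": 260046848}

paths = {  # composite key -> target field, as in the suggestion above
    "committed": "heap_memory.committed",
    "init": "heap_memory.init",
    "max": "heap_memory.max",
    "used": "heap_memory.used",
}

# Keep only mapped keys and rename them to their target fields.
event = {paths[key]: value for key, value in heap.items() if key in paths}
print(event["heap_memory.used"])  # 260046848
```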
The new datastreams in this integration were created using agentic coding.
Proposed commit message:
Add jvm, logmanager, replicamanager, and network datastreams using the Jolokia input.
The dashboards are not part of this PR.