
internal_logs source silently drops logs under high load #24220

@thomasqueirozb

Description


A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

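The internal_logs source drops logs under high load. The BroadcastStream in TraceSubscription::into_stream yields Lagged errors (the broadcast receiver's RecvError::Lagged, surfaced by tokio-stream as BroadcastStreamRecvError::Lagged). into_stream discards these errors, so the skipped events are lost without any indication.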
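For context on the failure mode: a tokio broadcast channel has a fixed capacity, senders never wait for slow receivers, and once a receiver falls more than that many messages behind, the oldest messages are overwritten and the receiver's next receive reports how many it missed. The std-only sketch below is a simplified, hypothetical model of that behavior (the Ring and Recv types are illustrative, not tokio's implementation). It shows why a burst of trace-level logs makes the internal_logs consumer lose events instead of applying backpressure.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified model of a bounded broadcast channel observed by
/// one receiver. This is NOT tokio's implementation, only an illustration.
struct Ring<T> {
    capacity: usize,
    buf: VecDeque<(u64, T)>, // (sequence number, message)
    next_seq: u64,           // sequence number assigned to the next send
}

#[derive(Debug, PartialEq)]
enum Recv<T> {
    Msg(T),
    Lagged(u64), // number of messages the receiver skipped
    Empty,
}

impl<T: Clone> Ring<T> {
    fn new(capacity: usize) -> Self {
        Ring { capacity, buf: VecDeque::new(), next_seq: 0 }
    }

    /// The sender never blocks: when the buffer is full, the oldest message is overwritten.
    fn send(&mut self, msg: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
        }
        self.buf.push_back((self.next_seq, msg));
        self.next_seq += 1;
    }

    /// `cursor` is the sequence number this receiver expects to read next.
    fn recv(&self, cursor: &mut u64) -> Recv<T> {
        match self.buf.front() {
            None => Recv::Empty,
            Some(&(oldest, _)) if *cursor < oldest => {
                // Everything between the cursor and the oldest retained message was overwritten.
                let missed = oldest - *cursor;
                *cursor = oldest;
                Recv::Lagged(missed)
            }
            Some(&(oldest, _)) => match self.buf.get((*cursor - oldest) as usize) {
                Some((_, msg)) => {
                    *cursor += 1;
                    Recv::Msg(msg.clone())
                }
                None => Recv::Empty,
            },
        }
    }
}

fn main() {
    let mut ring = Ring::new(4);
    let mut cursor = 0;
    // A burst of 10 log lines is sent before the receiver polls even once.
    for i in 0..10 {
        ring.send(format!("log {i}"));
    }
    // The receiver first learns that it missed 6 messages, then reads the 4 that remain.
    assert_eq!(ring.recv(&mut cursor), Recv::Lagged(6));
    assert_eq!(ring.recv(&mut cursor), Recv::Msg("log 6".to_string()));
}
```

In this model, as in the report, nothing upstream slows down. The only signal of loss is the Lagged value, which is why discarding it hides the problem.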

Configuration

Write this config file to tmp/config.yml:

api:
  enabled: true

sources:
  internal_logs:
    type: internal_logs

sinks:
  show_internal_logs:
    type: console
    inputs:
      - internal_logs
    encoding:
      codec: json

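Apply this debugging patch. It raises the heartbeat message from trace to error level, prints BEEPING to stdout on every heartbeat, and prints LAGGED along with the error whenever the broadcast stream reports a lag: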

diff --git a/src/internal_events/heartbeat.rs b/src/internal_events/heartbeat.rs
index 8197237bb..4dc4aa71a 100644
--- a/src/internal_events/heartbeat.rs
+++ b/src/internal_events/heartbeat.rs
@@ -11,8 +11,10 @@ pub struct Heartbeat {
 }

 impl InternalEvent for Heartbeat {
+    #[allow(clippy::print_stdout)]
     fn emit(self) {
-        trace!(target: "vector", message = "Beep.");
+        error!(target: "vector", message = "Beep.");
+        println!("BEEPING");
         gauge!("uptime_seconds").set(self.since.elapsed().as_secs() as f64);
         gauge!(
             "build_info",
diff --git a/src/trace.rs b/src/trace.rs
index 6ecec8045..b1d30a7bc 100644
--- a/src/trace.rs
+++ b/src/trace.rs
@@ -282,7 +282,12 @@ impl TraceSubscription {
     pub fn into_stream(self) -> impl Stream<Item = LogEvent> + Unpin {
         // We ignore errors because the only error we get is when the broadcast receiver lags, and there's nothing we
         // can actually do about that so there's no reason to force callers to even deal with it.
-        BroadcastStream::new(self.trace_rx).filter_map(|event| ready(event.ok()))
+        BroadcastStream::new(self.trace_rx).filter_map(|event| {
+            if event.is_err() {
+                println!("LAGGED {:?}", &event)
+            }
+            ready(event.ok())
+        })
     }
 }
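The original filter_map(|event| ready(event.ok())) turns every Lagged(n) into nothing, so the n skipped events disappear without a trace. The synchronous sketch below uses std iterators in place of futures streams. The Lagged struct and the drop_errors and count_dropped helpers are hypothetical stand-ins, with Lagged mirroring BroadcastStreamRecvError::Lagged. It contrasts discarding the errors with tallying them, which is one possible way to make the loss visible.

```rust
/// Hypothetical stand-in for tokio-stream's BroadcastStreamRecvError::Lagged(u64).
#[derive(Debug, Clone, Copy, PartialEq)]
struct Lagged(u64);

/// Mirrors the current behavior: errors are filtered out and forgotten.
fn drop_errors<I>(items: I) -> Vec<String>
where
    I: IntoIterator<Item = Result<String, Lagged>>,
{
    items.into_iter().filter_map(|event| event.ok()).collect()
}

/// Same filtering, but also sums how many messages the receiver skipped.
fn count_dropped<I>(items: I) -> (Vec<String>, u64)
where
    I: IntoIterator<Item = Result<String, Lagged>>,
{
    let mut dropped = 0;
    let events: Vec<String> = items
        .into_iter()
        .filter_map(|event| match event {
            Ok(log) => Some(log),
            Err(Lagged(n)) => {
                dropped += n;
                None
            }
        })
        .collect();
    (events, dropped)
}

fn main() {
    let received = vec![
        Ok("Beep.".to_string()),
        Err(Lagged(311)),
        Err(Lagged(4)),
        Ok("Beep.".to_string()),
    ];
    let kept = drop_errors(received.clone());
    let (counted, dropped) = count_dropped(received);
    // Both variants forward the same events; only the second knows what was lost.
    assert_eq!(kept, counted);
    println!("kept {} events, silently lost {}", counted.len(), dropped);
}
```

In the real stream, a counter like this could feed an internal metric or a rate-limited warning. Whether Vector should do that is a design question this sketch does not settle.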

Version

master/0.51.0

Debug Output

Running:

VECTOR_LOG=debug cargo run -- -c tmp/config.yml 2>&1 | rg 'Beep|BEEPING|LAGGED'

Output:

2025-11-12T19:09:25.646829Z ERROR vector: Beep.
BEEPING
{"host":"XXX","message":"Beep.","metadata":{"kind":"event","level":"ERROR","module_path":"vector::internal_events::heartbeat","target":"vector"},"pid":38081,"source_type":"internal_logs","timestamp":"2025-11-12T19:09:25.645812Z"}
2025-11-12T19:09:26.649567Z ERROR vector: Beep.
BEEPING
{"host":"XXX","message":"Beep.","metadata":{"kind":"event","level":"ERROR","module_path":"vector::internal_events::heartbeat","target":"vector"},"pid":38081,"source_type":"internal_logs","timestamp":"2025-11-12T19:09:26.649511Z"}
2025-11-12T19:09:27.645633Z ERROR vector: Beep.
BEEPING
{"host":"XXX","message":"Beep.","metadata":{"kind":"event","level":"ERROR","module_path":"vector::internal_events::heartbeat","target":"vector"},"pid":38081,"source_type":"internal_logs","timestamp":"2025-11-12T19:09:27.645543Z"}
...

This is the expected behavior: one internal_logs event per Beep.

However, running with VECTOR_LOG=trace (to generate a very large volume of logs) produces the following:

VECTOR_LOG=trace cargo run -- -c tmp/config.yml 2>&1 | rg 'Beep|BEEPING'

Output:

2025-11-12T19:11:57.922701Z ERROR vector: Beep.
BEEPING
{"host":"XXX","message":"Beep.","metadata":{"kind":"event","level":"ERROR","module_path":"vector::internal_events::heartbeat","target":"vector"},"pid":38539,"source_type":"internal_logs","timestamp":"2025-11-12T19:11:57.922690Z"}
2025-11-12T19:11:58.922668Z ERROR vector: Beep.
BEEPING
2025-11-12T19:11:59.922867Z ERROR vector: Beep.
BEEPING
2025-11-12T19:12:00.922409Z ERROR vector: Beep.
BEEPING
2025-11-12T19:12:01.922655Z ERROR vector: Beep.
BEEPING
{"host":"XXX","message":"Beep.","metadata":{"kind":"event","level":"ERROR","module_path":"vector::internal_events::heartbeat","target":"vector"},"pid":38539,"source_type":"internal_logs","timestamp":"2025-11-12T19:12:01.922641Z"}
2025-11-12T19:12:02.922624Z ERROR vector: Beep.
BEEPING
2025-11-12T19:12:03.922599Z ERROR vector: Beep.
BEEPING
2025-11-12T19:12:04.922512Z ERROR vector: Beep.
BEEPING
2025-11-12T19:12:05.922613Z ERROR vector: Beep.
BEEPING
2025-11-12T19:12:06.922878Z ERROR vector: Beep.
BEEPING
2025-11-12T19:12:07.922769Z ERROR vector: Beep.
BEEPING
2025-11-12T19:12:08.923229Z ERROR vector: Beep.
BEEPING
{"host":"XXX","message":"Beep.","metadata":{"kind":"event","level":"ERROR","module_path":"vector::internal_events::heartbeat","target":"vector"},"pid":38539,"source_type":"internal_logs","timestamp":"2025-11-12T19:12:08.923216Z"}

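There are more Beeps than internal_logs events. Of the twelve heartbeats logged between 19:11:57 and 19:12:08, only three (19:11:57, 19:12:01 and 19:12:08) produced an internal_logs event.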
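The mismatch can be counted mechanically from the captured output by comparing heartbeat markers with forwarded events. The following is a minimal sketch. It assumes the output was saved to a string and that forwarded heartbeats are the JSON lines containing both "source_type":"internal_logs" and "message":"Beep.". The count_heartbeats helper is hypothetical.

```rust
/// Returns (heartbeats emitted, heartbeats forwarded by the internal_logs source).
fn count_heartbeats(output: &str) -> (usize, usize) {
    // The debug patch prints BEEPING on its own line for every heartbeat.
    let emitted = output.lines().filter(|l| l.trim() == "BEEPING").count();
    // Forwarded heartbeats are the console sink's JSON lines for the Beep. message.
    let forwarded = output
        .lines()
        .filter(|l| {
            l.contains(r#""source_type":"internal_logs""#) && l.contains(r#""message":"Beep.""#)
        })
        .count();
    (emitted, forwarded)
}

fn main() {
    let sample = r#"2025-11-12T19:11:57.922701Z ERROR vector: Beep.
BEEPING
{"host":"XXX","message":"Beep.","source_type":"internal_logs"}
2025-11-12T19:11:58.922668Z ERROR vector: Beep.
BEEPING
2025-11-12T19:11:59.922867Z ERROR vector: Beep.
BEEPING"#;
    let (emitted, forwarded) = count_heartbeats(sample);
    println!("{forwarded} of {emitted} heartbeats reached the sink");
}
```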

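Also grepping for LAGGED shows the stream lagging continuously. Each Lagged(n) means the receiver skipped n messages, for example 311 at once in the third LAGGED line below:

VECTOR_LOG=trace cargo run -- -c tmp/config.yml 2>&1 | rg 'Beep|BEEPING|LAGGED'

Output: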
2025-11-12T19:18:17.712980Z ERROR vector: Beep.
BEEPING
LAGGED Err(Lagged(2))
LAGGED Err(Lagged(1))
{"host":"XXX","message":"Beep.","metadata":{"kind":"event","level":"ERROR","module_path":"vector::internal_events::heartbeat","target":"vector"},"pid":39584,"source_type":"internal_logs","timestamp":"2025-11-12T19:18:17.711641Z"}
LAGGED Err(Lagged(311))
LAGGED Err(Lagged(4))
LAGGED Err(Lagged(3))
LAGGED Err(Lagged(4))
LAGGED Err(Lagged(3))
LAGGED Err(Lagged(3))
LAGGED Err(Lagged(3))
LAGGED Err(Lagged(3))
LAGGED Err(Lagged(3))
LAGGED Err(Lagged(1))
LAGGED Err(Lagged(5))
LAGGED Err(Lagged(3))
LAGGED Err(Lagged(1))
LAGGED Err(Lagged(3))
LAGGED Err(Lagged(1))
LAGGED Err(Lagged(3))
LAGGED Err(Lagged(4))
LAGGED Err(Lagged(3))
LAGGED Err(Lagged(4))
LAGGED Err(Lagged(3))
LAGGED Err(Lagged(1))
LAGGED Err(Lagged(4))
LAGGED Err(Lagged(4))
LAGGED Err(Lagged(4))
...
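Because each Lagged(n) reports n skipped messages, summing the values over a run gives the total number of broadcast messages this receiver missed. Most of those are trace-level events, not only heartbeats. Below is a small sketch that parses lines of the exact form LAGGED Err(Lagged(n)), as printed by the debugging patch. The total_lagged helper is hypothetical.

```rust
/// Sums n over every "LAGGED Err(Lagged(n))" line printed by the debug patch;
/// all other lines are ignored.
fn total_lagged(output: &str) -> u64 {
    output
        .lines()
        .filter_map(|line| line.strip_prefix("LAGGED Err(Lagged("))
        .filter_map(|rest| rest.strip_suffix("))"))
        .filter_map(|n| n.parse::<u64>().ok())
        .sum()
}

fn main() {
    let excerpt = "LAGGED Err(Lagged(2))\nLAGGED Err(Lagged(1))\nBEEPING\nLAGGED Err(Lagged(311))";
    println!("skipped {} messages", total_lagged(excerpt));
}
```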

