Skip to content

Conversation

@kaarolch
Copy link
Contributor

Summary

This PR adds event-time aggregation support to the aggregate transform, addressing issues where metrics with different source timestamps but the same processing time are incorrectly aggregated together.

I made this PR to address some gaps in #23694. Thank you @adiwab for providing initial implementation.

Problem

Currently, the aggregate transform uses system processing time to bucket metrics. This causes issues when:

  • Multiple events have different source timestamps but arrive at the same processing time, or have the same source timestamp and after processing have different tag series.
  • In the first case they get aggregated into the same bucket and sent downstream with identical timestamps and in the second one they could be shipped in different batch.
  • Systems like Datadog overwrite the earlier values, resulting in data loss

Solution

Introduced a new time_source configuration option with two modes:

  • SystemTime (default): Existing behavior, maintains backward compatibility
  • EventTime: Uses metric timestamps for bucketing, with watermark-based out-of-order event rejection

Key Changes

Configuration Options:

  • time_source: Choose between SystemTime (default) or EventTime aggregation
  • allowed_lateness_ms: Grace period for accepting late-arriving events (default: 0)
  • use_system_time_for_missing_timestamps: Fallback behavior for events without timestamps (default: false, drops events)
  • max_future_ms: Maximum allowed future timestamp offset to reject clock-skewed events (default: 10000ms)

Implementation:

  • Event-time bucketing based on metric timestamps rounded down to interval_ms boundaries
  • Watermark tracking to identify and reject out-of-order events past the grace period
  • Support for all aggregation modes.
  • Separate bucket storage (event_time_buckets, event_time_prev_buckets, event_time_multi_buckets) for event-time mode
  • Dropped event metrics via AggregateEventDropped internal event

Vector configuration

api:
  enabled: true
log_schema:
  level: debug
sources:
  http_metrics:
    type: http_server
    address: "0.0.0.0:8080"
    decoding:
      codec: influxdb
    path: "/api/v1/write"
transforms:
  aggregate_metrics:
    type: aggregate
    inputs:
      - http_metrics
    time_source: EventTime
    interval_ms: 5000
sinks:
  console_debug:
    type: console
    inputs:
      -  aggregate_metrics
    encoding:
      codec: json
    buffer:
       max_events: 10000
       type: memory
       when_full: drop_newest

How did you test this PR?

Added 6 new unit tests covering event-time aggregation scenarios:

  • Basic incremental aggregation within same bucket
  • Out-of-order event rejection with watermark
  • Multiple time buckets
  • Missing timestamp rejection
  • Absolute gauge (latest value selection)
  • Multiple different metrics in same bucket

I've used Sonnet 4.5 to create some scripts that push influxdb metrics to vector with multiple values:

[TEST 1] Basic Aggregation - 3 buckets
============================================================

Bucket 1 (timestamp 07:29:11):
 ✓ test_counter=10.0 @ 07:29:11
 ✓ test_counter=20.0 @ 07:29:12
 ✓ test_counter=30.0 @ 07:29:13
 Expected aggregate: 60.0

Bucket 2 (timestamp 07:29:21):
 ✓ test_counter=10.0 @ 07:29:21
 ✓ test_counter=20.0 @ 07:29:22
 ✓ test_counter=30.0 @ 07:29:23
 Expected aggregate: 60.0

Bucket 3 (timestamp 07:29:31):
 ✓ test_counter=10.0 @ 07:29:31
 ✓ test_counter=20.0 @ 07:29:32
 ✓ test_counter=30.0 @ 07:29:33
 Expected aggregate: 60.0

[TEST 2] Out-of-Order Rejection
============================================================

Bucket 1 (timestamp 07:29:31):
 ✓ test_ooo=10.0 @ 07:29:31
 ✓ test_ooo=20.0 @ 07:29:32
 Expected aggregate: 30.0

Bucket 2 (timestamp 07:29:41):
 ✓ test_ooo=15.0 @ 07:29:41
 ✓ test_ooo=25.0 @ 07:29:42
 Expected aggregate: 40.0

Out-of-order event (timestamp 07:29:33):
 Sending to already-flushed bucket - should be DROPPED
 ✓ test_ooo=999.0 @ 07:29:33

 Check Vector output - 999.0 should NOT appear!

[TEST 3] Multiple Buckets (10s span = 2 buckets)
============================================================

Sending events across 10 seconds:
 ✓ test_multi=100.0 @ 07:29:46
 ✓ test_multi=200.0 @ 07:29:48
 ✓ test_multi=300.0 @ 07:29:51
 ✓ test_multi=400.0 @ 07:29:53

 Expected:
   Bucket 1 (0-5s): 300.0
   Bucket 2 (5-10s): 700.0

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

Notes

  • Please read our Vector contributor resources.
  • Do not hesitate to use @vectordotdev/vector to reach out to us regarding this PR.
  • Some CI checks run only after we manually approve them.
    • We recommend adding a pre-push hook, please see this template.
    • Alternatively, we recommend running the following locally before pushing to the remote branch:
      • make fmt
      • make check-clippy (if there are failures it's possible some of them can be fixed with make clippy-fix)
      • make test
  • After a review is requested, please avoid force pushes to help us review incrementally.
    • Feel free to push as many commits as you want. They will be squashed into one before merging.
    • For example, you can run git merge origin master and git push.
  • If this PR introduces changes Vector dependencies (modifies Cargo.lock), please
    run make build-licenses to regenerate the license inventory and commit the changes (if any). More details here.

@kaarolch kaarolch requested review from a team as code owners December 30, 2025 09:07
@github-actions github-actions bot added domain: transforms Anything related to Vector's transform components domain: external docs Anything related to Vector's external, public documentation labels Dec 30, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

domain: external docs Anything related to Vector's external, public documentation domain: transforms Anything related to Vector's transform components

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants