1
1
use std:: time:: Duration ;
2
2
3
+ use chrono:: Utc ;
3
4
use futures:: StreamExt ;
4
5
use serde_with:: serde_as;
5
6
use tokio:: time;
@@ -14,7 +15,8 @@ use vector_lib::{
14
15
15
16
use crate :: {
16
17
SourceSender ,
17
- config:: { SourceConfig , SourceContext , SourceOutput , log_schema} ,
18
+ config:: { SharedTopologyMetadata , SourceConfig , SourceContext , SourceOutput , log_schema} ,
19
+ event:: { Metric , MetricKind , MetricTags , MetricValue } ,
18
20
internal_events:: { EventsReceived , StreamClosedError } ,
19
21
metrics:: Controller ,
20
22
shutdown:: ShutdownSignal ,
@@ -122,6 +124,7 @@ impl SourceConfig for InternalMetricsConfig {
122
124
interval,
123
125
out : cx. out ,
124
126
shutdown : cx. shutdown ,
127
+ topology_metadata : cx. topology_metadata . clone ( ) ,
125
128
}
126
129
. run ( ) ,
127
130
) )
@@ -144,6 +147,7 @@ struct InternalMetrics<'a> {
144
147
interval : time:: Duration ,
145
148
out : SourceSender ,
146
149
shutdown : ShutdownSignal ,
150
+ topology_metadata : Option < SharedTopologyMetadata > ,
147
151
}
148
152
149
153
impl InternalMetrics < ' _ > {
@@ -164,25 +168,35 @@ impl InternalMetrics<'_> {
164
168
bytes_received. emit ( ByteSize ( byte_size) ) ;
165
169
events_received. emit ( CountByteSize ( count, json_size) ) ;
166
170
167
- let batch = metrics. into_iter ( ) . map ( |mut metric| {
168
- // A metric starts out with a default "vector" namespace, but will be overridden
169
- // if an explicit namespace is provided to this source.
170
- if self . namespace != "vector" {
171
- metric = metric. with_namespace ( Some ( self . namespace . clone ( ) ) ) ;
172
- }
173
-
174
- if let Some ( host_key) = & self . host_key . path
175
- && let Ok ( hostname) = & hostname
176
- {
177
- metric. replace_tag ( host_key. to_string ( ) , hostname. to_owned ( ) ) ;
178
- }
179
- if let Some ( pid_key) = & self . pid_key {
180
- metric. replace_tag ( pid_key. to_owned ( ) , pid. clone ( ) ) ;
181
- }
182
- metric
183
- } ) ;
184
-
185
- if ( self . out . send_batch ( batch) . await ) . is_err ( ) {
171
+ let mut batch: Vec < Metric > = metrics
172
+ . into_iter ( )
173
+ . map ( |mut metric| {
174
+ // A metric starts out with a default "vector" namespace, but will be overridden
175
+ // if an explicit namespace is provided to this source.
176
+ if self . namespace != "vector" {
177
+ metric = metric. with_namespace ( Some ( self . namespace . clone ( ) ) ) ;
178
+ }
179
+
180
+ if let Some ( host_key) = & self . host_key . path
181
+ && let Ok ( hostname) = & hostname
182
+ {
183
+ metric. replace_tag ( host_key. to_string ( ) , hostname. to_owned ( ) ) ;
184
+ }
185
+ if let Some ( pid_key) = & self . pid_key {
186
+ metric. replace_tag ( pid_key. to_owned ( ) , pid. clone ( ) ) ;
187
+ }
188
+ metric
189
+ } )
190
+ . collect ( ) ;
191
+
192
+ // Add topology metrics if available
193
+ if let Some ( topology_metadata) = & self . topology_metadata {
194
+ let topology = topology_metadata. read ( ) . unwrap ( ) ;
195
+ let topology_metrics = generate_topology_metrics ( & topology, Utc :: now ( ) ) ;
196
+ batch. extend ( topology_metrics) ;
197
+ }
198
+
199
+ if ( self . out . send_batch ( batch. into_iter ( ) ) . await ) . is_err ( ) {
186
200
emit ! ( StreamClosedError { count } ) ;
187
201
return Err ( ( ) ) ;
188
202
}
@@ -192,6 +206,50 @@ impl InternalMetrics<'_> {
192
206
}
193
207
}
194
208
209
+ /// Generate metrics for topology connections
210
+ fn generate_topology_metrics (
211
+ topology : & crate :: config:: TopologyMetadata ,
212
+ timestamp : chrono:: DateTime < Utc > ,
213
+ ) -> Vec < Metric > {
214
+ let mut metrics = Vec :: new ( ) ;
215
+
216
+ for ( to_component, inputs) in & topology. inputs {
217
+ for input in inputs {
218
+ let mut tags = MetricTags :: default ( ) ;
219
+
220
+ // Source component labels
221
+ tags. insert ( "from_component_id" . to_string ( ) , input. component . to_string ( ) ) ;
222
+ if let Some ( ( type_name, kind) ) = topology. component_types . get ( & input. component ) {
223
+ tags. insert ( "from_component_type" . to_string ( ) , type_name. clone ( ) ) ;
224
+ tags. insert ( "from_component_kind" . to_string ( ) , kind. clone ( ) ) ;
225
+ }
226
+ if let Some ( port) = & input. port {
227
+ tags. insert ( "from_output" . to_string ( ) , port. clone ( ) ) ;
228
+ }
229
+
230
+ // Target component labels
231
+ tags. insert ( "to_component_id" . to_string ( ) , to_component. to_string ( ) ) ;
232
+ if let Some ( ( type_name, kind) ) = topology. component_types . get ( to_component) {
233
+ tags. insert ( "to_component_type" . to_string ( ) , type_name. clone ( ) ) ;
234
+ tags. insert ( "to_component_kind" . to_string ( ) , kind. clone ( ) ) ;
235
+ }
236
+
237
+ metrics. push (
238
+ Metric :: new (
239
+ "component_connections" ,
240
+ MetricKind :: Absolute ,
241
+ MetricValue :: Gauge { value : 1.0 } ,
242
+ )
243
+ . with_namespace ( Some ( "vector" . to_string ( ) ) )
244
+ . with_tags ( Some ( tags) )
245
+ . with_timestamp ( Some ( timestamp) ) ,
246
+ ) ;
247
+ }
248
+ }
249
+
250
+ metrics
251
+ }
252
+
195
253
#[ cfg( test) ]
196
254
mod tests {
197
255
use std:: collections:: BTreeMap ;
@@ -201,6 +259,7 @@ mod tests {
201
259
202
260
use super :: * ;
203
261
use crate :: {
262
+ config:: { ComponentKey , OutputId } ,
204
263
event:: {
205
264
Event ,
206
265
metric:: { Metric , MetricValue } ,
@@ -344,4 +403,79 @@ mod tests {
344
403
345
404
assert_eq ! ( event. as_metric( ) . namespace( ) , Some ( namespace) ) ;
346
405
}
406
+
407
+ #[ test]
408
+ fn test_topology_metrics_generation ( ) {
409
+ let mut topology = crate :: config:: TopologyMetadata :: new ( ) ;
410
+
411
+ // Add a source -> transform connection
412
+ topology. inputs . insert (
413
+ ComponentKey :: from ( "my_transform" ) ,
414
+ vec ! [ OutputId {
415
+ component: ComponentKey :: from( "my_source" ) ,
416
+ port: None ,
417
+ } ] ,
418
+ ) ;
419
+
420
+ // Add a transform -> sink connection
421
+ topology. inputs . insert (
422
+ ComponentKey :: from ( "my_sink" ) ,
423
+ vec ! [ OutputId {
424
+ component: ComponentKey :: from( "my_transform" ) ,
425
+ port: Some ( "output1" . to_string( ) ) ,
426
+ } ] ,
427
+ ) ;
428
+
429
+ // Add component types
430
+ topology. component_types . insert (
431
+ ComponentKey :: from ( "my_source" ) ,
432
+ ( "file" . to_string ( ) , "source" . to_string ( ) ) ,
433
+ ) ;
434
+ topology. component_types . insert (
435
+ ComponentKey :: from ( "my_transform" ) ,
436
+ ( "remap" . to_string ( ) , "transform" . to_string ( ) ) ,
437
+ ) ;
438
+ topology. component_types . insert (
439
+ ComponentKey :: from ( "my_sink" ) ,
440
+ ( "console" . to_string ( ) , "sink" . to_string ( ) ) ,
441
+ ) ;
442
+
443
+ let timestamp = Utc :: now ( ) ;
444
+ let metrics = generate_topology_metrics ( & topology, timestamp) ;
445
+
446
+ // Should have 2 connection metrics
447
+ assert_eq ! ( metrics. len( ) , 2 ) ;
448
+
449
+ // Find the source -> transform connection
450
+ let source_to_transform = metrics
451
+ . iter ( )
452
+ . find ( |m| m. tags ( ) . and_then ( |t| t. get ( "from_component_id" ) ) == Some ( "my_source" ) )
453
+ . expect ( "Should find source -> transform metric" ) ;
454
+
455
+ assert_eq ! ( source_to_transform. name( ) , "component_connections" ) ;
456
+ assert_eq ! ( source_to_transform. namespace( ) , Some ( "vector" ) ) ;
457
+ match source_to_transform. value ( ) {
458
+ MetricValue :: Gauge { value } => assert_eq ! ( * value, 1.0 ) ,
459
+ _ => panic ! ( "Expected gauge metric" ) ,
460
+ }
461
+
462
+ let tags1 = source_to_transform. tags ( ) . expect ( "Should have tags" ) ;
463
+ assert_eq ! ( tags1. get( "from_component_id" ) , Some ( "my_source" ) ) ;
464
+ assert_eq ! ( tags1. get( "from_component_type" ) , Some ( "file" ) ) ;
465
+ assert_eq ! ( tags1. get( "from_component_kind" ) , Some ( "source" ) ) ;
466
+ assert_eq ! ( tags1. get( "to_component_id" ) , Some ( "my_transform" ) ) ;
467
+ assert_eq ! ( tags1. get( "to_component_type" ) , Some ( "remap" ) ) ;
468
+ assert_eq ! ( tags1. get( "to_component_kind" ) , Some ( "transform" ) ) ;
469
+
470
+ // Find the transform -> sink connection
471
+ let transform_to_sink = metrics
472
+ . iter ( )
473
+ . find ( |m| m. tags ( ) . and_then ( |t| t. get ( "from_component_id" ) ) == Some ( "my_transform" ) )
474
+ . expect ( "Should find transform -> sink metric" ) ;
475
+
476
+ let tags2 = transform_to_sink. tags ( ) . expect ( "Should have tags" ) ;
477
+ assert_eq ! ( tags2. get( "from_component_id" ) , Some ( "my_transform" ) ) ;
478
+ assert_eq ! ( tags2. get( "from_output" ) , Some ( "output1" ) ) ;
479
+ assert_eq ! ( tags2. get( "to_component_id" ) , Some ( "my_sink" ) ) ;
480
+ }
347
481
}
0 commit comments