diff --git a/ci/fingerprints/dogstatsd_http/lading.yaml b/ci/fingerprints/dogstatsd_http/lading.yaml new file mode 100644 index 000000000..2d0a8b40e --- /dev/null +++ b/ci/fingerprints/dogstatsd_http/lading.yaml @@ -0,0 +1,19 @@ +generator: + - id: "dogstatsd_http" + http: + seed: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 5] + target_uri: "http://127.0.0.1:8125" + headers: + content-type: application/x-protobuf + method: + post: + variant: + dogstatsd_http: {} + maximum_prebuild_cache_size_bytes: "10 Mb" + bytes_per_second: "100 Mb" + maximum_block_size: "1 Mb" + parallel_connections: 1 + +blackhole: + - http: + binding_addr: "127.0.0.1:9999" diff --git a/lading_payload/Cargo.toml b/lading_payload/Cargo.toml index ef9be7d54..efe4a05dd 100644 --- a/lading_payload/Cargo.toml +++ b/lading_payload/Cargo.toml @@ -64,6 +64,10 @@ harness = false name = "dogstatsd" harness = false +[[bench]] +name = "dogstatsd_http" +harness = false + [[bench]] name = "fluent" harness = false diff --git a/lading_payload/benches/dogstatsd_http.rs b/lading_payload/benches/dogstatsd_http.rs new file mode 100644 index 000000000..44489e318 --- /dev/null +++ b/lading_payload/benches/dogstatsd_http.rs @@ -0,0 +1,60 @@ +//! Benchmarks for `DogStatsDHttp` payload generation. + +use criterion::{BatchSize, BenchmarkId, Criterion, Throughput, criterion_group, criterion_main}; +use lading_payload::{Serialize, dogstatsd, dogstatsd_http::DogStatsDHttp}; +use rand::{SeedableRng, rngs::SmallRng}; +use std::time::Duration; + +const MIB: usize = 1_048_576; + +fn dogstatsd_http_setup(c: &mut Criterion) { + c.bench_function("dogstatsd_http_setup", |b| { + b.iter(|| { + let mut rng = SmallRng::seed_from_u64(19_690_716); + let config = dogstatsd::Config::default(); + let _dd = DogStatsDHttp::new(&config, &mut rng); + }); + }); +} + +fn dogstatsd_http_throughput(c: &mut Criterion) { + let mut group = c.benchmark_group("dogstatsd_http_throughput"); + for size in &[MIB, 10 * MIB, 100 * MIB] { + group.throughput(Throughput::Bytes(*size as u64)); + group.bench_with_input(BenchmarkId::from_parameter(size), size, |b, &size| { + b.iter_batched( + || { + let mut rng = SmallRng::seed_from_u64(19_690_716); + let config = dogstatsd::Config::default(); + let dd = DogStatsDHttp::new(&config, &mut rng) + .expect("failed to create DogStatsDHttp"); + (rng, dd, Vec::with_capacity(size)) + }, + |(rng, mut dd, mut writer)| { + dd.to_bytes(rng, size, &mut writer) + .expect("failed to convert to bytes"); + }, + BatchSize::PerIteration, + ); + }); + } + group.finish(); +} + +criterion_group!( + name = setup_benches; + config = Criterion::default() + .measurement_time(Duration::from_secs(5)) + .warm_up_time(Duration::from_secs(1)); + targets = dogstatsd_http_setup, +); + +criterion_group!( + name = throughput_benches; + config = Criterion::default() + .measurement_time(Duration::from_secs(30)) + .warm_up_time(Duration::from_secs(1)); + targets = dogstatsd_http_throughput, +); + +criterion_main!(setup_benches, throughput_benches); diff --git a/lading_payload/proptest-regressions/dogstatsd_http.txt b/lading_payload/proptest-regressions/dogstatsd_http.txt new file mode 100644 index 000000000..1617a759b --- /dev/null +++ b/lading_payload/proptest-regressions/dogstatsd_http.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc 7bb60665af78073dc7447e24d33711ca088d8e001cbbf923670d7d598094ce35 # shrinks to seed = 0, max_bytes = 1622 diff --git a/lading_payload/src/block.rs b/lading_payload/src/block.rs index 4acb0d4e5..2bf5ee880 100644 --- a/lading_payload/src/block.rs +++ b/lading_payload/src/block.rs @@ -401,6 +401,27 @@ impl Cache { construct_block_cache_inner(rng, &mut pyld, maximum_block_bytes, total_bytes.get())? } + crate::Config::DogStatsDHttp(conf) => { + match conf.valid() { + Ok(()) => (), + Err(e) => { + warn!("Invalid DogStatsDHttp configuration: {}", e); + return Err(Error::InvalidConfig(e)); + } + } + + let mut serializer = crate::DogStatsDHttp::new(conf, &mut rng)?; + + let span = span!(Level::INFO, "fixed", payload = "dogstatsd-http"); + let _guard = span.enter(); + + construct_block_cache_inner( + &mut rng, + &mut serializer, + maximum_block_bytes, + total_bytes.get(), + )? + } }; let total_cycle_size = blocks diff --git a/lading_payload/src/dogstatsd.rs b/lading_payload/src/dogstatsd.rs index 0553a147f..9e6bf730e 100644 --- a/lading_payload/src/dogstatsd.rs +++ b/lading_payload/src/dogstatsd.rs @@ -22,7 +22,7 @@ use self::{ use super::Generator; -mod common; +pub(crate) mod common; pub mod event; pub mod metric; pub mod service_check; @@ -72,12 +72,12 @@ impl Default for KindWeights { #[serde(rename_all = "snake_case")] #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct MetricWeights { - count: u8, - gauge: u8, - timer: u8, - distribution: u8, - set: u8, - histogram: u8, + pub(crate) count: u8, + pub(crate) gauge: u8, + pub(crate) timer: u8, + pub(crate) distribution: u8, + pub(crate) set: u8, + pub(crate) histogram: u8, } impl MetricWeights { @@ -354,10 +354,10 @@ struct MemberGenerator { /// Cheap to clone. #[derive(Clone, Debug)] pub(crate) struct StringPools { - tag_name: Rc, - tag_value: Rc, - name: Rc, - randomstring: Rc, + pub(crate) tag_name: Rc, + pub(crate) tag_value: Rc, + pub(crate) name: Rc, + pub(crate) randomstring: Rc, } impl MemberGenerator { diff --git a/lading_payload/src/dogstatsd_http.rs b/lading_payload/src/dogstatsd_http.rs new file mode 100644 index 000000000..9aac70144 --- /dev/null +++ b/lading_payload/src/dogstatsd_http.rs @@ -0,0 +1,566 @@ +//! `DogStatsD`-over-HTTP payload (protobuf). +//! +//! Serializes `DogStatsD` Count and Gauge metrics into the binary protobuf format +//! defined by the Datadog Agent's `dogstatsdhttp/payload.proto`. Metric names +//! and tag strings are stored in byte dictionaries with varint-length prefixes; +//! per-metric arrays use 1-based index references with delta encoding. + +use std::io::Write; + +use rustc_hash::FxHashMap; + +use bytes::BytesMut; +use rand::Rng; + +use crate::common::strings::Pool; +use crate::dogstatsd::common::NumValue; +use crate::dogstatsd::metric::Metric; +use crate::{Generator, Serialize}; + +// ───────────────────────────────────────────────────────────────────────────── +// Prost message structs (mirrors dogstatsdhttp/payload.proto) +// ───────────────────────────────────────────────────────────────────────────── + +#[derive(Clone, prost::Message)] +struct Payload { + #[prost(message, optional, tag = "2")] + metadata: Option, + #[prost(message, optional, tag = "3")] + metric_data: Option, +} + +#[derive(Clone, prost::Message)] +struct Metadata { + #[prost(string, repeated, tag = "1")] + tags: Vec, + #[prost(string, repeated, tag = "2")] + resources: Vec, +} + +#[derive(Clone, prost::Message)] +struct MetricData { + /// Dictionary of metric name strings: varint(len) || utf8 bytes per entry. + #[prost(bytes = "vec", tag = "1")] + dict_name_str: Vec, + /// Dictionary of tag strings: varint(len) || utf8 bytes per entry. + #[prost(bytes = "vec", tag = "2")] + dict_tag_str: Vec, + /// Flat delta-encoded tagset entries: `[num_tags, delta_idx1, delta_idx2, …]` + #[prost(sint64, repeated, tag = "3")] + dict_tagsets: Vec, + + /// Bitwise-OR of metricType | valueType | metricFlags per metric. + #[prost(uint64, repeated, tag = "10")] + types: Vec, + /// Delta-encoded 1-based name indices. + #[prost(sint64, repeated, tag = "11")] + name_refs: Vec, + /// Delta-encoded tagset references (0 = no tags). + #[prost(sint64, repeated, tag = "12")] + tagset_refs: Vec, + /// Reporting interval per metric (unused; kept for proto compatibility). + #[prost(uint64, repeated, tag = "14")] + intervals: Vec, + /// Number of values per metric point (always 1 here). + #[prost(uint64, repeated, tag = "15")] + num_points: Vec, + /// Delta-encoded timestamps (0 = inherit/delta from previous). + #[prost(sint64, repeated, tag = "16")] + timestamps: Vec, + /// Integer values for metrics whose valueType is Sint64 (0x10). + #[prost(sint64, repeated, tag = "17")] + vals_sint64: Vec, + /// Float64 values for metrics whose valueType is Float64 (0x30). + #[prost(double, repeated, tag = "19")] + vals_float64: Vec, +} + +// ───────────────────────────────────────────────────────────────────────────── +// Dictionary helpers +// ───────────────────────────────────────────────────────────────────────────── + +/// Builds `dictNameStr` / `dictTagStr`: varint(len) || utf8 bytes per entry. +/// +/// Indices are **1-based**; 0 is reserved for "empty / not present". +struct StringDict { + map: FxHashMap, + bytes: Vec, +} + +impl StringDict { + fn new() -> Self { + Self { + map: FxHashMap::default(), + bytes: Vec::new(), + } + } + + /// Return the 1-based index for `s`, inserting it if not already present. + fn get_or_insert(&mut self, s: &str) -> usize { + if let Some(&idx) = self.map.get(s) { + return idx; + } + let idx = self.map.len() + 1; + self.map.insert(s.to_owned(), idx); + let mut buf = BytesMut::new(); + prost::encoding::encode_varint(s.len() as u64, &mut buf); + self.bytes.extend_from_slice(&buf); + self.bytes.extend_from_slice(s.as_bytes()); + idx + } +} + +/// Builds `dictTagsets` (flat repeated sint64). +/// +/// Each tagset entry occupies: `[num_tags, delta_idx1, delta_idx2, …]`. +/// A tagset reference of 0 means "no tags" and is never stored in this dict. +struct TagsetDict { + map: FxHashMap, usize>, + flat: Vec, +} + +impl TagsetDict { + fn new() -> Self { + Self { + map: FxHashMap::default(), + flat: Vec::new(), + } + } + + /// Return the 1-based tagset reference for `tag_indices` (sorted for + /// canonical identity), inserting a new entry if not already present. + #[expect(clippy::cast_possible_wrap)] + fn get_or_insert(&mut self, mut tag_indices: Vec) -> usize { + tag_indices.sort_unstable(); + if let Some(&idx) = self.map.get(&tag_indices) { + return idx; + } + let idx = self.map.len() + 1; + self.map.insert(tag_indices.clone(), idx); + self.flat.push(tag_indices.len() as i64); + let mut prev = 0usize; + for &ti in &tag_indices { + self.flat.push((ti as i64) - (prev as i64)); + prev = ti; + } + idx + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// DogStatsDHttp +// ───────────────────────────────────────────────────────────────────────────── + +#[derive(Clone, Debug)] +#[allow(clippy::module_name_repetitions)] +/// Generator for DogStatsD-over-HTTP protobuf payloads. +/// +/// Only **Count** and **Gauge** metric types are encoded; all other types +/// (Timer, Histogram, Distribution, Set) are skipped silently. +pub struct DogStatsDHttp { + metric_generator: crate::dogstatsd::metric::MetricGenerator, +} + +impl DogStatsDHttp { + /// Create a new `DogStatsDHttp` instance from a [`crate::dogstatsd::Config`]. + /// + /// # Errors + /// + /// Returns an error if the configuration is invalid or internal string pool + /// creation fails. + pub fn new( + config: &crate::dogstatsd::Config, + rng: &mut R, + ) -> Result { + use crate::common::strings::{RandomStringPool, StringListPool, PoolKind, random_strings_with_length}; + use rand::distr::weighted::WeightedIndex; + use std::rc::Rc; + + let pool = RandomStringPool::with_size(rng, 8_000_000); + let small_strings = random_strings_with_length(&pool, 16..1024, 8, rng); + + let str_pool = Rc::new(pool.clone()); + let pool_kind = Rc::new(PoolKind::RandomStringPool(pool)); + + let tag_name_pool = if config.tag_names.is_empty() { + Rc::clone(&pool_kind) + } else { + Rc::new(PoolKind::StringListPool(StringListPool::new( + config.tag_names.clone(), + ))) + }; + + let tag_value_pool = if config.tag_values.is_empty() { + Rc::clone(&pool_kind) + } else { + Rc::new(PoolKind::StringListPool(StringListPool::new( + config.tag_values.clone(), + ))) + }; + + let name_pool = if config.metric_names.is_empty() { + Rc::clone(&pool_kind) + } else { + Rc::new(PoolKind::StringListPool(StringListPool::new( + config.metric_names.clone(), + ))) + }; + + let pools = crate::dogstatsd::StringPools { + tag_name: tag_name_pool, + tag_value: tag_value_pool, + name: name_pool, + randomstring: str_pool, + }; + + let num_contexts = config.contexts.sample(rng); + + let mut tags_generator = match crate::dogstatsd::common::tags::Generator::new( + rng.random(), + config.tags_per_msg, + config.tag_length, + num_contexts as usize, + Rc::clone(&pools.tag_value), + Rc::clone(&pools.tag_name), + config.unique_tag_ratio, + ) { + Ok(tg) => tg, + Err(e) => { + tracing::warn!("Encountered error while constructing tag generator: {e}"); + return Err(crate::Error::StringGenerate); + } + }; + + let metric_choices = [ + u16::from(config.metric_weights.count), + u16::from(config.metric_weights.gauge), + u16::from(config.metric_weights.timer), + u16::from(config.metric_weights.distribution), + u16::from(config.metric_weights.set), + u16::from(config.metric_weights.histogram), + ]; + + let metric_generator = crate::dogstatsd::metric::MetricGenerator::new( + num_contexts as usize, + config.name_length, + config.multivalue_count, + config.multivalue_pack_probability, + config.sampling_range, + config.sampling_probability, + &WeightedIndex::new(metric_choices)?, + small_strings, + &mut tags_generator, + &pools, + config.value, + rng, + )?; + + Ok(DogStatsDHttp { metric_generator }) + } +} + +impl Serialize for DogStatsDHttp { + #[expect(clippy::too_many_lines)] + #[expect(clippy::cast_possible_wrap)] + fn to_bytes(&mut self, mut rng: R, max_bytes: usize, writer: &mut W) -> Result<(), crate::Error> + where + W: Write, + R: Rng + Sized, + { + use prost::Message; + + let mut md = MetricData::default(); + let mut name_dict = StringDict::new(); + let mut tag_dict = StringDict::new(); + let mut tagset_dict = TagsetDict::new(); + let mut prev_name_ref: i64 = 0; + let mut prev_tagset_ref: i64 = 0; + + loop { + let member = self.metric_generator.generate(&mut rng)?; + + // Extract metric type and first value; skip unsupported types. + let (proto_metric_type, num_value) = match &member { + Metric::Count(count) => (1u64, count.values[0]), + Metric::Gauge(gauge) => (3u64, gauge.values[0]), + _ => continue, + }; + + // Name is already a resolved &str on the metric struct. + let name_str = match &member { + Metric::Count(count) => count.name, + Metric::Gauge(gauge) => gauge.name, + _ => unreachable!(), + }; + + // Checkpoint dict state so we can roll back if this metric doesn't fit. + let name_bytes_checkpoint = name_dict.bytes.len(); + let name_map_checkpoint = name_dict.map.len(); + let tag_bytes_checkpoint = tag_dict.bytes.len(); + let tag_map_checkpoint = tag_dict.map.len(); + let tagset_flat_checkpoint = tagset_dict.flat.len(); + let tagset_map_checkpoint = tagset_dict.map.len(); + + let name_idx = name_dict.get_or_insert(name_str) as i64; + + // Resolve tags from handle → string and look up in tag dict. + let tag_indices: Vec = { + let (tags, pools) = match &member { + Metric::Count(count) => (count.tags, count.pools), + Metric::Gauge(gauge) => (gauge.tags, gauge.pools), + _ => unreachable!(), + }; + tags.iter() + .map(|tag| { + let k = pools + .tag_name + .using_handle(tag.key) + .expect("invalid tag key handle"); + let v = pools + .tag_value + .using_handle(tag.value) + .expect("invalid tag value handle"); + tag_dict.get_or_insert(&format!("{k}:{v}")) + }) + .collect() + }; + + let tagset_idx = if tag_indices.is_empty() { + 0i64 // 0 = no tags + } else { + tagset_dict.get_or_insert(tag_indices) as i64 + }; + + // Determine value encoding. + let (value_type, value_sint64, value_f64): (u64, Option, Option) = + match num_value { + NumValue::Int(i) => (0x10, Some(i), None), + NumValue::Float(f) => (0x30, None, Some(f)), + }; + + // Append to MetricData arrays. + md.types.push(proto_metric_type | value_type); + md.name_refs.push(name_idx - prev_name_ref); + prev_name_ref = name_idx; + md.tagset_refs.push(tagset_idx - prev_tagset_ref); + prev_tagset_ref = tagset_idx; + md.num_points.push(1); + md.timestamps.push(0); // 0 = delta from previous (first = epoch) + if let Some(v) = value_sint64 { + md.vals_sint64.push(v); + } + if let Some(v) = value_f64 { + md.vals_float64.push(v); + } + + // Budget check: snapshot dict bytes into MetricData to get the real + // encoded size. This clone is intentional — profiling can determine + // if a cheaper heuristic is worthwhile. + let snapshot = MetricData { + dict_name_str: name_dict.bytes.clone(), + dict_tag_str: tag_dict.bytes.clone(), + dict_tagsets: tagset_dict.flat.clone(), + ..md.clone() + }; + let required = Payload { + metadata: None, + metric_data: Some(snapshot), + } + .encoded_len(); + + if required > max_bytes { + // Undo metric data array appends. + md.types.pop(); + md.name_refs.pop(); + md.tagset_refs.pop(); + md.num_points.pop(); + md.timestamps.pop(); + if value_sint64.is_some() { + md.vals_sint64.pop(); + } + if value_f64.is_some() { + md.vals_float64.pop(); + } + // Undo dict mutations: truncate bytes/flat and remove new map entries. + // New entries have indices > checkpoint lengths (indices are 1-based). + name_dict.bytes.truncate(name_bytes_checkpoint); + name_dict.map.retain(|_, &mut v| v <= name_map_checkpoint); + tag_dict.bytes.truncate(tag_bytes_checkpoint); + tag_dict.map.retain(|_, &mut v| v <= tag_map_checkpoint); + tagset_dict.flat.truncate(tagset_flat_checkpoint); + tagset_dict.map.retain(|_, &mut v| v <= tagset_map_checkpoint); + break; + } + } + + // Nothing fit within the budget. + if md.types.is_empty() { + return Ok(()); + } + + // Finalize: move dict bytes into MetricData. + md.dict_name_str = name_dict.bytes; + md.dict_tag_str = tag_dict.bytes; + md.dict_tagsets = tagset_dict.flat; + + let payload = Payload { + metadata: None, + metric_data: Some(md), + }; + let mut buf = Vec::with_capacity(payload.encoded_len()); + payload.encode(&mut buf)?; + writer.write_all(&buf)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::dogstatsd::Config; + use proptest::prelude::*; + use rand::{SeedableRng, rngs::SmallRng}; + + fn make_generator(seed: u64) -> DogStatsDHttp { + let mut rng = SmallRng::seed_from_u64(seed); + DogStatsDHttp::new(&Config::default(), &mut rng).expect("failed to create DogStatsDHttp") + } + + /// Decode the serialized bytes and verify basic structural invariants. + #[test] + fn roundtrip_decodes_and_has_metrics() { + use prost::Message; + + let mut dd = make_generator(42); + let mut rng = SmallRng::seed_from_u64(42); + let max_bytes = 65_536; + + let mut buf = Vec::with_capacity(max_bytes); + dd.to_bytes(&mut rng, max_bytes, &mut buf) + .expect("serialization failed"); + + assert!(!buf.is_empty(), "expected non-empty output"); + + let payload = Payload::decode(bytes::Bytes::from(buf)).expect("decode failed"); + let md = payload.metric_data.expect("metric_data should be present"); + + assert!(!md.types.is_empty(), "types array should not be empty"); + assert_eq!( + md.name_refs.len(), + md.types.len(), + "name_refs and types must have the same length" + ); + assert_eq!( + md.tagset_refs.len(), + md.types.len(), + "tagset_refs and types must have the same length" + ); + assert_eq!( + md.num_points.len(), + md.types.len(), + "num_points and types must have the same length" + ); + assert_eq!( + md.timestamps.len(), + md.types.len(), + "timestamps and types must have the same length" + ); + // Every value goes into exactly one of the two value arrays. + assert_eq!( + md.vals_sint64.len() + md.vals_float64.len(), + md.types.len(), + "total value count must equal number of metrics" + ); + } + + /// All `types` entries should be Count (1) or Gauge (3) combined with a + /// valid value type (0x10 or 0x30). + #[test] + fn metric_types_are_count_or_gauge() { + use prost::Message; + + let mut dd = make_generator(7); + let mut rng = SmallRng::seed_from_u64(7); + + let mut buf = Vec::new(); + dd.to_bytes(&mut rng, 32_768, &mut buf) + .expect("serialization failed"); + + if buf.is_empty() { + return; // nothing generated — budget too small, skip + } + + let payload = Payload::decode(bytes::Bytes::from(buf)).expect("decode failed"); + let md = payload.metric_data.unwrap(); + + for &t in &md.types { + let metric_type = t & 0x0F; + let value_type = t & 0xF0; + assert!( + metric_type == 1 || metric_type == 3, + "unexpected metric type {metric_type} in type word {t:#04x}" + ); + assert!( + value_type == 0x10 || value_type == 0x30, + "unexpected value type {value_type:#04x} in type word {t:#04x}" + ); + } + } + + /// An empty output is expected when `max_bytes` is too small to hold even a + /// single metric. + #[test] + fn tiny_budget_produces_empty_output() { + let mut dd = make_generator(1); + let mut rng = SmallRng::seed_from_u64(1); + let mut buf = Vec::new(); + // 1 byte is never enough for a full proto message. + dd.to_bytes(&mut rng, 1, &mut buf) + .expect("serialization failed"); + assert!(buf.is_empty()); + } + + proptest! { + /// The serialized output must never exceed `max_bytes`. + #[test] + fn payload_never_exceeds_max_bytes(seed: u64, max_bytes: u16) { + let max_bytes = max_bytes as usize; + let mut rng = SmallRng::seed_from_u64(seed); + let mut dd = DogStatsDHttp::new(&Config::default(), &mut rng) + .expect("failed to create generator"); + + let mut buf = Vec::with_capacity(max_bytes); + dd.to_bytes(rng, max_bytes, &mut buf) + .expect("serialization failed"); + + prop_assert!( + buf.len() <= max_bytes, + "output {} bytes exceeds max_bytes {}", + buf.len(), + max_bytes + ); + } + } + + proptest! { + /// Whatever is written must always be valid protobuf. + #[test] + fn output_is_always_valid_proto(seed: u64, max_bytes in 0usize..65_536) { + let mut rng = SmallRng::seed_from_u64(seed); + let mut dd = DogStatsDHttp::new(&Config::default(), &mut rng) + .expect("failed to create generator"); + + let mut buf = Vec::new(); + dd.to_bytes(rng, max_bytes, &mut buf) + .expect("serialization failed"); + + if buf.is_empty() { + return Ok(()); + } + + use prost::Message as _; + let result = Payload::decode(bytes::Bytes::from(buf)); + prop_assert!(result.is_ok(), "invalid proto: {:?}", result.err()); + } + } +} diff --git a/lading_payload/src/lib.rs b/lading_payload/src/lib.rs index 7da716c56..b2514ea10 100644 --- a/lading_payload/src/lib.rs +++ b/lading_payload/src/lib.rs @@ -20,6 +20,7 @@ pub use apache_common::ApacheCommon; pub use ascii::Ascii; pub use datadog_logs::DatadogLog; pub use dogstatsd::DogStatsD; +pub use dogstatsd_http::DogStatsDHttp; pub use fluent::Fluent; pub use json::Json; pub use opentelemetry::log::OpentelemetryLogs; @@ -36,6 +37,7 @@ pub mod ascii; pub mod common; pub mod datadog_logs; pub mod dogstatsd; +pub mod dogstatsd_http; pub mod fluent; pub mod json; pub mod opentelemetry; @@ -162,6 +164,9 @@ pub enum Config { /// Generates `DogStatsD` #[serde(rename = "dogstatsd")] DogStatsD(crate::dogstatsd::Config), + /// Generates DogStatsD-over-HTTP protobuf payloads + #[serde(rename = "dogstatsd_http")] + DogStatsDHttp(crate::dogstatsd::Config), /// Generates `TraceAgent` payloads in `MsgPack` format #[serde(rename = "trace_agent")] TraceAgent(crate::trace_agent::Config), @@ -203,6 +208,8 @@ pub enum Payload { OtelMetrics(OpentelemetryMetrics), /// `DogStatsD` metrics DogStatsdD(DogStatsD), + /// DogStatsD-over-HTTP protobuf + DogStatsDHttp(DogStatsDHttp), /// Datadog Trace Agent format TraceAgent(crate::trace_agent::v04::V04), /// JSON generated from a user-supplied template file @@ -229,6 +236,7 @@ impl Serialize for Payload { Payload::OtelLogs(ser) => ser.to_bytes(rng, max_bytes, writer), Payload::OtelMetrics(ser) => ser.to_bytes(rng, max_bytes, writer), Payload::DogStatsdD(ser) => ser.to_bytes(rng, max_bytes, writer), + Payload::DogStatsDHttp(ser) => ser.to_bytes(rng, max_bytes, writer), Payload::TraceAgent(ser) => ser.to_bytes(rng, max_bytes, writer), Payload::TemplatedJson(ser) => ser.to_bytes(rng, max_bytes, writer), }