From 278f92f60d4a9fdb11cc3fa11349df5cc9cef59c Mon Sep 17 00:00:00 2001 From: Robin Appelman Date: Tue, 5 Aug 2025 19:57:28 +0200 Subject: [PATCH 1/2] store max-debounce-time in sendqueue Signed-off-by: Robin Appelman --- src/connection.rs | 4 ++-- src/message.rs | 39 +++++++++++++++++++++++---------------- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 988b3514..6a1a08ff 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -135,7 +135,7 @@ pub async fn handle_user_socket( // cryptographically secure. It is also OK to use same sequence for every connection. let mut rng = rand::rngs::SmallRng::seed_from_u64(0); - let mut send_queue = SendQueue::default(); + let mut send_queue = SendQueue::new(opts.max_debounce_time); let mut reset = app.reset_rx(); @@ -162,7 +162,7 @@ pub async fn handle_user_socket( break 'tx_loop; } - for msg in send_queue.drain(now, METRICS.active_connection_count() + 50000, opts.max_debounce_time) { + for msg in send_queue.drain(now, METRICS.active_connection_count()) { last_send = now; METRICS.add_message(msg.message_type()); log::debug!(target: "notify_push::send", "Sending debounced {msg} to {user_id}"); diff --git a/src/message.rs b/src/message.rs index 25cbd911..cf757498 100644 --- a/src/message.rs +++ b/src/message.rs @@ -132,14 +132,21 @@ impl Default for SendQueueItem { } } -#[derive(Default, Debug)] +/// Queue for sending outgoing messages to a user for debounce +/// +/// The server maintains once queue per connection +#[derive(Debug)] pub struct SendQueue { + max_debounce_time: usize, items: [SendQueueItem; 3], } impl SendQueue { - pub fn new() -> Self { - SendQueue::default() + pub fn new(max_debounce_time: usize) -> Self { + SendQueue { + max_debounce_time, + items: Default::default(), + } } fn item_mut(&mut self, message: &PushMessage) -> Option<&mut SendQueueItem> { @@ -177,8 +184,8 @@ impl SendQueue { &mut self, now: Instant, connection_count: usize, - max_debounce_time: usize, ) -> impl Iterator + '_ { + let max_debounce_time = self.max_debounce_time; self.items.iter_mut().filter_map(move |item| { let debounce_time = item .message @@ -201,7 +208,7 @@ impl SendQueue { #[test] fn test_send_queue_100() { let base_time = Instant::now(); - let mut queue = SendQueue::new(); + let mut queue = SendQueue::new(15); queue.push(PushMessage::Activity, base_time); queue.push( PushMessage::File(UpdatedFiles::Known(vec![1].into())), @@ -216,7 +223,7 @@ fn test_send_queue_100() { assert_eq!( Vec::::new(), queue - .drain(base_time + Duration::from_millis(20), 100, 15) + .drain(base_time + Duration::from_millis(20), 100) .collect::>() ); @@ -227,7 +234,7 @@ fn test_send_queue_100() { PushMessage::Activity ], queue - .drain(base_time + Duration::from_millis(200), 100, 15) + .drain(base_time + Duration::from_millis(200), 100) .collect::>() ); @@ -243,7 +250,7 @@ fn test_send_queue_100() { assert_eq!( Vec::::new(), queue - .drain(base_time + Duration::from_secs(10), 100, 15) + .drain(base_time + Duration::from_secs(10), 100) .collect::>() ); @@ -251,7 +258,7 @@ fn test_send_queue_100() { assert_eq!( vec![PushMessage::File(UpdatedFiles::Known(vec![3, 4].into()))], queue - .drain(base_time + Duration::from_secs(70), 100, 15) + .drain(base_time + Duration::from_secs(70), 100) .collect::>() ); @@ -259,7 +266,7 @@ fn test_send_queue_100() { assert_eq!( Vec::::new(), queue - .drain(base_time + Duration::from_secs(300), 100, 15) + .drain(base_time + Duration::from_secs(300), 100) .collect::>() ); } @@ -267,7 +274,7 @@ fn test_send_queue_100() { #[test] fn test_send_queue_1() { let base_time = Instant::now(); - let mut queue = SendQueue::new(); + let mut queue = SendQueue::new(15); queue.push(PushMessage::Activity, base_time); queue.push( PushMessage::File(UpdatedFiles::Known(vec![1].into())), @@ -282,7 +289,7 @@ fn test_send_queue_1() { assert_eq!( Vec::::new(), queue - .drain(base_time + Duration::from_millis(20), 1, 15) + .drain(base_time + Duration::from_millis(20), 1) .collect::>() ); @@ -293,7 +300,7 @@ fn test_send_queue_1() { PushMessage::Activity ], queue - .drain(base_time + Duration::from_millis(200), 1, 15) + .drain(base_time + Duration::from_millis(200), 1) .collect::>() ); @@ -309,7 +316,7 @@ fn test_send_queue_1() { assert_eq!( Vec::::new(), queue - .drain(base_time + Duration::from_secs(1), 1, 15) + .drain(base_time + Duration::from_secs(1), 1) .collect::>() ); @@ -317,7 +324,7 @@ fn test_send_queue_1() { assert_eq!( vec![PushMessage::File(UpdatedFiles::Known(vec![3, 4].into()))], queue - .drain(base_time + Duration::from_secs(3), 1, 15) + .drain(base_time + Duration::from_secs(3), 1) .collect::>() ); @@ -325,7 +332,7 @@ fn test_send_queue_1() { assert_eq!( Vec::::new(), queue - .drain(base_time + Duration::from_secs(5), 1, 15) + .drain(base_time + Duration::from_secs(5), 1) .collect::>() ); } From 9fe46cb49bb692ab93bba1af23eddcb1a03828d3 Mon Sep 17 00:00:00 2001 From: Robin Appelman Date: Tue, 5 Aug 2025 20:16:03 +0200 Subject: [PATCH 2/2] spread out debouce times per-connection Signed-off-by: Robin Appelman --- src/connection.rs | 14 +++++++++----- src/message.rs | 36 ++++++++++++++++++++++++------------ 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 6a1a08ff..e4cf55ae 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -12,7 +12,7 @@ use crate::{App, UserId}; use dashmap::mapref::entry::Entry; use dashmap::DashMap; use futures::{future::select, pin_mut, SinkExt, StreamExt}; -use rand::{Rng, SeedableRng}; +use rand::{thread_rng, Rng, SeedableRng}; use std::net::IpAddr; use std::num::NonZeroUsize; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; @@ -131,11 +131,15 @@ pub async fn handle_user_socket( let expect_pong = &expect_pong; let transmit = async { - // Use faster random generator for generating ping messages, they dont need to be - // cryptographically secure. It is also OK to use same sequence for every connection. - let mut rng = rand::rngs::SmallRng::seed_from_u64(0); + // Use faster random generator for generating ping messages and time smearthey dont need to be + // cryptographically secure. + let mut rng = + rand::rngs::SmallRng::from_rng(thread_rng()).expect("Failed to initialize rng"); - let mut send_queue = SendQueue::new(opts.max_debounce_time); + // for each connection we randomize the max debounce time to remove the chance that many connections + // get messages at the same time and cause load peaks + let debounce_factor = rng.gen_range(0.5..1.5); + let mut send_queue = SendQueue::new(opts.max_debounce_time, debounce_factor); let mut reset = app.reset_rx(); diff --git a/src/message.rs b/src/message.rs index cf757498..8aefb065 100644 --- a/src/message.rs +++ b/src/message.rs @@ -59,13 +59,21 @@ impl PushMessage { } } - pub fn debounce_time(&self, connection_count: usize, max_debounce_time: usize) -> Duration { + pub fn debounce_time( + &self, + connection_count: usize, + max_debounce_time: usize, + debounce_factor: f32, + ) -> Duration { // scale the debounce time between 1s and 15s based on the number of active connections - // this provide a decent balance between performance and load - let time = max(1, min(connection_count / 10, max_debounce_time)); + // this provides a decent balance between performance and load. + // Additionally, each connection will have a random debounce_factor between 0.5 and 1.5 + // to spread out the load of notifications. + let time = max(1, min(connection_count / 10, max_debounce_time)) as f32; + let time = time * debounce_factor; match self { - PushMessage::File(_) => Duration::from_secs(time as u64), - PushMessage::Activity => Duration::from_secs(time as u64), + PushMessage::File(_) => Duration::from_secs_f32(time), + PushMessage::Activity => Duration::from_secs_f32(time), PushMessage::Notification => Duration::from_secs(1), PushMessage::Custom(..) => Duration::from_millis(1), // no debouncing for custom messages } @@ -138,13 +146,15 @@ impl Default for SendQueueItem { #[derive(Debug)] pub struct SendQueue { max_debounce_time: usize, + debounce_factor: f32, items: [SendQueueItem; 3], } impl SendQueue { - pub fn new(max_debounce_time: usize) -> Self { + pub fn new(max_debounce_time: usize, debounce_factor: f32) -> Self { SendQueue { max_debounce_time, + debounce_factor, items: Default::default(), } } @@ -186,11 +196,13 @@ impl SendQueue { connection_count: usize, ) -> impl Iterator + '_ { let max_debounce_time = self.max_debounce_time; + let debounce_factor = self.debounce_factor; self.items.iter_mut().filter_map(move |item| { - let debounce_time = item - .message - .as_ref()? - .debounce_time(connection_count, max_debounce_time); + let debounce_time = item.message.as_ref()?.debounce_time( + connection_count, + max_debounce_time, + debounce_factor, + ); if now.duration_since(item.sent) > debounce_time { if now.duration_since(item.received) > Duration::from_millis(100) { item.sent = now; @@ -208,7 +220,7 @@ impl SendQueue { #[test] fn test_send_queue_100() { let base_time = Instant::now(); - let mut queue = SendQueue::new(15); + let mut queue = SendQueue::new(15, 1.0); queue.push(PushMessage::Activity, base_time); queue.push( PushMessage::File(UpdatedFiles::Known(vec![1].into())), @@ -274,7 +286,7 @@ fn test_send_queue_100() { #[test] fn test_send_queue_1() { let base_time = Instant::now(); - let mut queue = SendQueue::new(15); + let mut queue = SendQueue::new(15, 1.0); queue.push(PushMessage::Activity, base_time); queue.push( PushMessage::File(UpdatedFiles::Known(vec![1].into())),