Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{App, UserId};
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use futures::{future::select, pin_mut, SinkExt, StreamExt};
use rand::{Rng, SeedableRng};
use rand::{thread_rng, Rng, SeedableRng};
use std::net::IpAddr;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
Expand Down Expand Up @@ -131,11 +131,15 @@ pub async fn handle_user_socket(
let expect_pong = &expect_pong;

let transmit = async {
// Use faster random generator for generating ping messages, they dont need to be
// cryptographically secure. It is also OK to use same sequence for every connection.
let mut rng = rand::rngs::SmallRng::seed_from_u64(0);
// Use faster random generator for generating ping messages and time smearthey dont need to be
// cryptographically secure.
let mut rng =
rand::rngs::SmallRng::from_rng(thread_rng()).expect("Failed to initialize rng");

let mut send_queue = SendQueue::default();
// for each connection we randomize the max debounce time to remove the chance that many connections
// get messages at the same time and cause load peaks
let debounce_factor = rng.gen_range(0.5..1.5);
let mut send_queue = SendQueue::new(opts.max_debounce_time, debounce_factor);

let mut reset = app.reset_rx();

Expand All @@ -162,7 +166,7 @@ pub async fn handle_user_socket(
break 'tx_loop;
}

for msg in send_queue.drain(now, METRICS.active_connection_count() + 50000, opts.max_debounce_time) {
for msg in send_queue.drain(now, METRICS.active_connection_count()) {
last_send = now;
METRICS.add_message(msg.message_type());
log::debug!(target: "notify_push::send", "Sending debounced {msg} to {user_id}");
Expand Down
69 changes: 44 additions & 25 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,21 @@ impl PushMessage {
}
}

pub fn debounce_time(&self, connection_count: usize, max_debounce_time: usize) -> Duration {
pub fn debounce_time(
&self,
connection_count: usize,
max_debounce_time: usize,
debounce_factor: f32,
) -> Duration {
// scale the debounce time between 1s and 15s based on the number of active connections
// this provide a decent balance between performance and load
let time = max(1, min(connection_count / 10, max_debounce_time));
// this provides a decent balance between performance and load.
// Additionally, each connection will have a random debounce_factor between 0.5 and 1.5
// to spread out the load of notifications.
let time = max(1, min(connection_count / 10, max_debounce_time)) as f32;
let time = time * debounce_factor;
match self {
PushMessage::File(_) => Duration::from_secs(time as u64),
PushMessage::Activity => Duration::from_secs(time as u64),
PushMessage::File(_) => Duration::from_secs_f32(time),
PushMessage::Activity => Duration::from_secs_f32(time),
PushMessage::Notification => Duration::from_secs(1),
PushMessage::Custom(..) => Duration::from_millis(1), // no debouncing for custom messages
}
Expand Down Expand Up @@ -132,14 +140,23 @@ impl Default for SendQueueItem {
}
}

#[derive(Default, Debug)]
/// Queue for sending outgoing messages to a user for debounce
///
/// The server maintains once queue per connection
#[derive(Debug)]
pub struct SendQueue {
max_debounce_time: usize,
debounce_factor: f32,
items: [SendQueueItem; 3],
}

impl SendQueue {
pub fn new() -> Self {
SendQueue::default()
pub fn new(max_debounce_time: usize, debounce_factor: f32) -> Self {
SendQueue {
max_debounce_time,
debounce_factor,
items: Default::default(),
}
}

fn item_mut(&mut self, message: &PushMessage) -> Option<&mut SendQueueItem> {
Expand Down Expand Up @@ -177,13 +194,15 @@ impl SendQueue {
&mut self,
now: Instant,
connection_count: usize,
max_debounce_time: usize,
) -> impl Iterator<Item = PushMessage> + '_ {
let max_debounce_time = self.max_debounce_time;
let debounce_factor = self.debounce_factor;
self.items.iter_mut().filter_map(move |item| {
let debounce_time = item
.message
.as_ref()?
.debounce_time(connection_count, max_debounce_time);
let debounce_time = item.message.as_ref()?.debounce_time(
connection_count,
max_debounce_time,
debounce_factor,
);
if now.duration_since(item.sent) > debounce_time {
if now.duration_since(item.received) > Duration::from_millis(100) {
item.sent = now;
Expand All @@ -201,7 +220,7 @@ impl SendQueue {
#[test]
fn test_send_queue_100() {
let base_time = Instant::now();
let mut queue = SendQueue::new();
let mut queue = SendQueue::new(15, 1.0);
queue.push(PushMessage::Activity, base_time);
queue.push(
PushMessage::File(UpdatedFiles::Known(vec![1].into())),
Expand All @@ -216,7 +235,7 @@ fn test_send_queue_100() {
assert_eq!(
Vec::<PushMessage>::new(),
queue
.drain(base_time + Duration::from_millis(20), 100, 15)
.drain(base_time + Duration::from_millis(20), 100)
.collect::<Vec<_>>()
);

Expand All @@ -227,7 +246,7 @@ fn test_send_queue_100() {
PushMessage::Activity
],
queue
.drain(base_time + Duration::from_millis(200), 100, 15)
.drain(base_time + Duration::from_millis(200), 100)
.collect::<Vec<_>>()
);

Expand All @@ -243,31 +262,31 @@ fn test_send_queue_100() {
assert_eq!(
Vec::<PushMessage>::new(),
queue
.drain(base_time + Duration::from_secs(10), 100, 15)
.drain(base_time + Duration::from_secs(10), 100)
.collect::<Vec<_>>()
);

// after debounce time we get the merged messages from the timeframe
assert_eq!(
vec![PushMessage::File(UpdatedFiles::Known(vec![3, 4].into()))],
queue
.drain(base_time + Duration::from_secs(70), 100, 15)
.drain(base_time + Duration::from_secs(70), 100)
.collect::<Vec<_>>()
);

// nothing left
assert_eq!(
Vec::<PushMessage>::new(),
queue
.drain(base_time + Duration::from_secs(300), 100, 15)
.drain(base_time + Duration::from_secs(300), 100)
.collect::<Vec<_>>()
);
}

#[test]
fn test_send_queue_1() {
let base_time = Instant::now();
let mut queue = SendQueue::new();
let mut queue = SendQueue::new(15, 1.0);
queue.push(PushMessage::Activity, base_time);
queue.push(
PushMessage::File(UpdatedFiles::Known(vec![1].into())),
Expand All @@ -282,7 +301,7 @@ fn test_send_queue_1() {
assert_eq!(
Vec::<PushMessage>::new(),
queue
.drain(base_time + Duration::from_millis(20), 1, 15)
.drain(base_time + Duration::from_millis(20), 1)
.collect::<Vec<_>>()
);

Expand All @@ -293,7 +312,7 @@ fn test_send_queue_1() {
PushMessage::Activity
],
queue
.drain(base_time + Duration::from_millis(200), 1, 15)
.drain(base_time + Duration::from_millis(200), 1)
.collect::<Vec<_>>()
);

Expand All @@ -309,23 +328,23 @@ fn test_send_queue_1() {
assert_eq!(
Vec::<PushMessage>::new(),
queue
.drain(base_time + Duration::from_secs(1), 1, 15)
.drain(base_time + Duration::from_secs(1), 1)
.collect::<Vec<_>>()
);

// after debounce time we get the merged messages from the timeframe
assert_eq!(
vec![PushMessage::File(UpdatedFiles::Known(vec![3, 4].into()))],
queue
.drain(base_time + Duration::from_secs(3), 1, 15)
.drain(base_time + Duration::from_secs(3), 1)
.collect::<Vec<_>>()
);

// nothing left
assert_eq!(
Vec::<PushMessage>::new(),
queue
.drain(base_time + Duration::from_secs(5), 1, 15)
.drain(base_time + Duration::from_secs(5), 1)
.collect::<Vec<_>>()
);
}