Skip to content

feat: switch to RabbitMQ streams from Redis Pub/Sub #422

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
472 changes: 458 additions & 14 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions crates/bonfire/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition = "2021"
[dependencies]
# util
log = "*"
rand = "*"
sentry = "0.31.5"
lru = "0.7.6"
ulid = "0.5.0"
Expand Down Expand Up @@ -38,6 +39,7 @@ async-std = { version = "1.8.0", features = [
# core
authifier = { version = "1.0.15" }
revolt-result = { path = "../core/result" }
revolt-broker = { path = "../core/broker" }
revolt-models = { path = "../core/models" }
revolt-config = { path = "../core/config" }
revolt-database = { path = "../core/database" }
Expand All @@ -46,3 +48,6 @@ revolt-presence = { path = "../core/presence", features = ["redis-is-patched"] }

# redis
fred = { version = "8.0.1", features = ["subscriber-client"] }

# rabbit
lapin = { version = "3.0.0" }
129 changes: 129 additions & 0 deletions crates/bonfire/src/client/core.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use async_std::{net::TcpStream, sync::Mutex};
use async_tungstenite::WebSocketStream;
use futures::{join, SinkExt, StreamExt, TryStreamExt};
use revolt_config::report_internal_error;
use revolt_database::{
events::{client::EventV1, server::ClientMessage},
iso8601_timestamp::Timestamp,
Database, User, UserHint,
};
use revolt_presence::{create_session, delete_session};
use revolt_result::create_error;

use crate::{
client::{
subscriber::client_subscriber,
worker::{client_worker, WorkerRef},
},
config::ProtocolConfiguration,
events::state::State,
};

/// Core event loop of gateway clients
pub async fn client_core(
db: &'static Database,
ws: WebSocketStream<TcpStream>,
mut config: ProtocolConfiguration,
) {
// Split the socket for simultaneously read and write.
let (mut write, mut read) = ws.split();

// If the user has not provided authentication, request information.
if config.get_session_token().is_none() {
while let Ok(Some(message)) = read.try_next().await {
if let Ok(ClientMessage::Authenticate { token }) = config.decode(&message) {
config.set_session_token(token);
break;
}
}
}

// Try to authenticate the user.
let Some(token) = config.get_session_token().as_ref() else {
write
.send(config.encode(&EventV1::Error {
data: create_error!(InvalidSession),
}))
.await
.ok();
return;
};

let (user, session_id) = match User::from_token(db, token, UserHint::Any).await {
Ok(user) => user,
Err(err) => {
write
.send(config.encode(&EventV1::Error { data: err }))
.await
.ok();
return;
}
};

info!(
"Authenticated user {}#{}",
user.username, user.discriminator
);

db.update_session_last_seen(&session_id, Timestamp::now_utc())
.await
.ok();

// Create local state.
let mut state = State::from(user, session_id);
let user_id = state.cache.user_id.clone();

// Notify socket we have authenticated.
if report_internal_error!(write.send(config.encode(&EventV1::Authenticated)).await).is_err() {
return;
}

// Download required data to local cache and send Ready payload.
let ready_payload = match report_internal_error!(
state
.generate_ready_payload(db, config.get_ready_payload_fields())
.await
) {
Ok(ready_payload) => ready_payload,
Err(_) => return,
};

if report_internal_error!(write.send(config.encode(&ready_payload)).await).is_err() {
return;
}

// Create presence session.
let (first_session, session_id) = create_session(&user_id, 0).await;

// If this was the first session, notify other users that we just went online.
if first_session {
state.broadcast_presence_change(true).await;
}

{
let worker_ref = WorkerRef::from(&state);
let write = Mutex::new(write);
let (reload, reloaded) = async_channel::bounded(1);
let (cancel_1, cancelled_1) = async_channel::bounded(1);
let (cancel_2, cancelled_2) = async_channel::bounded(1);

join!(
async {
client_subscriber(&write, cancelled_1, reloaded, &config, db, &mut state).await;
cancel_2.send(()).await.ok();
},
async {
client_worker(read, &write, cancelled_2, reload, &config, worker_ref).await;
cancel_1.send(()).await.ok();
}
);
}

// Clean up presence session.
let last_session = delete_session(&user_id, session_id).await;

// If this was the last session, notify other users that we just went offline.
if last_session {
state.broadcast_presence_change(false).await;
}
}
3 changes: 3 additions & 0 deletions crates/bonfire/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod core;
pub mod subscriber;
pub mod worker;
112 changes: 112 additions & 0 deletions crates/bonfire/src/client/subscriber.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use async_channel::Receiver;
use async_std::{net::TcpStream, sync::Mutex};
use async_tungstenite::WebSocketStream;
use authifier::AuthifierEvent;
use futures::{pin_mut, select, stream::SplitSink, FutureExt, SinkExt};
use revolt_broker::event_stream;
use revolt_database::{events::client::EventV1, Database};
use sentry::Level;

use crate::{
config::ProtocolConfiguration,
events::state::{State, SubscriptionStateChange},
};

/// Event subscriber loop
pub async fn client_subscriber(
write: &Mutex<SplitSink<WebSocketStream<TcpStream>, async_tungstenite::tungstenite::Message>>,
cancelled: Receiver<()>,
reloaded: Receiver<()>,
protocol_config: &ProtocolConfiguration,
db: &'static Database,
state: &mut State,
) {
let mut consumer = event_stream::Consumer::new().await;
consumer.set_topics(state.subscribed.read().await.clone());

let mut cancel = false;

loop {
// Reload consumer if subscriptions change
if !matches!(state.apply_state().await, SubscriptionStateChange::None) {
consumer.set_topics(state.subscribed.read().await.clone());
}

// Read incoming events
loop {
let reloaded = reloaded.recv().fuse();
let cancelled = cancelled.recv().fuse();
let delivery = consumer.next().fuse();
pin_mut!(delivery, reloaded, cancelled);

select! {
_ = reloaded => {
break;
}
_ = cancelled => {
cancel = true;
break;
}
event = delivery => {
if let Some(mut event) = event {
// Handle the event
if let EventV1::Auth(auth) = &event {
if let AuthifierEvent::DeleteSession { session_id, .. } = auth {
if &state.session_id == session_id {
event = EventV1::Logout;
}
} else if let AuthifierEvent::DeleteAllSessions {
exclude_session_id, ..
} = auth
{
if let Some(excluded) = exclude_session_id {
if &state.session_id != excluded {
event = EventV1::Logout;
}
} else {
event = EventV1::Logout;
}
}
} else {
let should_send = state.handle_incoming_event_v1(db, &mut event).await;
if !should_send {
continue;
}
}

let result = write.lock().await.send(protocol_config.encode(&event)).await;
if let Err(e) = result {
use async_tungstenite::tungstenite::Error;
if !matches!(e, Error::AlreadyClosed | Error::ConnectionClosed) {
let err = format!("Error while sending an event: {e:?}");
warn!("{}", err);
sentry::capture_message(&err, Level::Warning);
}

cancel = true;
break;
}

if let EventV1::Logout = event {
info!("User {} received log out event!", state.user_id);
cancel = true;
break;
}

break;
} else {
cancel = true;
break;
}
}
}
}

// Break out if cancelled
if cancel {
break;
}
}

consumer.dispose_channel().await;
}
124 changes: 124 additions & 0 deletions crates/bonfire/src/client/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::{collections::HashSet, sync::Arc};

use async_channel::{Receiver, Sender};
use async_std::{
net::TcpStream,
sync::{Mutex, RwLock},
};
use async_tungstenite::WebSocketStream;
use futures::{
pin_mut, select,
stream::{SplitSink, SplitStream},
FutureExt, SinkExt, TryStreamExt,
};
use revolt_database::events::{client::EventV1, server::ClientMessage};
use sentry::Level;

use crate::{config::ProtocolConfiguration, events::state::State};

pub struct WorkerRef {
user_id: String,
active_servers: Arc<Mutex<lru_time_cache::LruCache<String, ()>>>,
subscribed: Arc<RwLock<HashSet<String>>>,
}

impl WorkerRef {
pub fn from(state: &State) -> WorkerRef {
WorkerRef {
user_id: state.user_id.clone(),
active_servers: state.active_servers.clone(),
subscribed: state.subscribed.clone(),
}
}
}

/// Incoming message handling
pub async fn client_worker(
mut read: SplitStream<WebSocketStream<TcpStream>>,
write: &Mutex<SplitSink<WebSocketStream<TcpStream>, async_tungstenite::tungstenite::Message>>,
cancelled: Receiver<()>,
reload: Sender<()>,
config: &ProtocolConfiguration,
state: WorkerRef,
) {
loop {
let read = read.try_next().fuse();
let cancelled = cancelled.recv().fuse();
pin_mut!(read, cancelled);

select! {
_ = cancelled => { return; },
msg = read => {
let msg = match msg {
Ok(Some(msg)) => msg,
Ok(None) => {
warn!("Received a None message!");
return;
}
Err(e) => {
use async_tungstenite::tungstenite::Error;
if !matches!(e, Error::AlreadyClosed | Error::ConnectionClosed | Error::Protocol(_)) {
let err = format!("Error while reading an event: {e:?}");
warn!("{}", err);
sentry::capture_message(&err, Level::Warning);
}

return;
}
};

let Ok(payload) = config.decode(&msg) else {
continue;
};

match payload {
ClientMessage::BeginTyping { channel } => {
if !state.subscribed.read().await.contains(&channel) {
continue;
}

EventV1::ChannelStartTyping {
id: channel.clone(),
user: state.user_id.clone(),
}
.p(channel.clone())
.await;
}
ClientMessage::EndTyping { channel } => {
if !state.subscribed.read().await.contains(&channel) {
continue;
}

EventV1::ChannelStopTyping {
id: channel.clone(),
user: state.user_id.clone(),
}
.p(channel.clone())
.await;
}
ClientMessage::Subscribe { server_id } => {
let mut servers = state.active_servers.lock().await;
let has_item = servers.contains_key(&server_id);
servers.insert(server_id, ());

if !has_item {
// Poke the listener to adjust subscriptions
reload.send(()).await.ok();
}
}
ClientMessage::Ping { data, responded } => {
if responded.is_none() {
write
.lock()
.await
.send(config.encode(&EventV1::Pong { data }))
.await
.ok();
}
}
_ => {}
}
}
}
}
}
Loading
Loading