diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs index b1fcff036..2a55d0d37 100644 --- a/src/service/rooms/read_receipt/mod.rs +++ b/src/service/rooms/read_receipt/mod.rs @@ -5,6 +5,7 @@ use std::{collections::BTreeMap, sync::Arc}; use futures::{Stream, TryFutureExt, try_join}; use ruma::{ OwnedEventId, OwnedUserId, RoomId, UserId, + api::appservice::event::push_events::v1::EphemeralData, events::{ AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent, receipt::{ReceiptEvent, ReceiptEventContent, Receipts}, @@ -21,6 +22,7 @@ use tuwunel_core::{ }; use self::data::{Data, ReceiptItem}; +use crate::sending::EduBuf; pub struct Service { services: Arc, @@ -51,6 +53,19 @@ impl Service { .readreceipt_update(user_id, room_id, event) .await; + // update appservices + let edu = EphemeralData::Receipt(ReceiptEvent { + content: event.content.clone(), + room_id: room_id.to_owned(), + }); + let mut buf = EduBuf::new(); + serde_json::to_writer(&mut buf, &edu).expect("Serialized EphemeralData::Receipt"); + let _: Result = self + .services + .sending + .send_edu_appservice_room(room_id, buf) + .await; + if self.services.globals.user_is_local(user_id) { self.services .sending diff --git a/src/service/rooms/typing/mod.rs b/src/service/rooms/typing/mod.rs index 74731082a..430fe59df 100644 --- a/src/service/rooms/typing/mod.rs +++ b/src/service/rooms/typing/mod.rs @@ -3,7 +3,11 @@ use std::{collections::BTreeMap, sync::Arc}; use futures::StreamExt; use ruma::{ OwnedRoomId, OwnedUserId, RoomId, UserId, - api::federation::transactions::edu::{Edu, TypingContent}, + api::{ + appservice::event::push_events::v1::EphemeralData, + federation::transactions::edu::{Edu, TypingContent}, + }, + events::{EphemeralRoomEvent, typing::TypingEventContent}, }; use tokio::sync::{RwLock, broadcast}; use tuwunel_core::{ @@ -65,6 +69,9 @@ impl Service { trace!("receiver found what it was looking for and is no longer interested"); } + // update appservices + self.appservice_send(room_id).await?; + // update federation if self.services.globals.user_is_local(user_id) { self.federation_send(room_id, user_id, true) @@ -100,6 +107,9 @@ impl Service { trace!("receiver found what it was looking for and is no longer interested"); } + // update appservices + self.appservice_send(room_id).await?; + // update federation if self.services.globals.user_is_local(user_id) { self.federation_send(room_id, user_id, false) @@ -160,6 +170,9 @@ impl Service { trace!("receiver found what it was looking for and is no longer interested"); } + // update appservices + self.appservice_send(room_id).await?; + // update federation for user in &removable { if self.services.globals.user_is_local(user) { @@ -183,6 +196,36 @@ impl Service { .unwrap_or(0)) } + /// Returns the typing content with all typing users in the room. + async fn typings_content(&self, room_id: &RoomId) -> TypingEventContent { + let room_typing_indicators = self.typing.read().await.get(room_id).cloned(); + + let Some(typing_indicators) = room_typing_indicators else { + return TypingEventContent { user_ids: Vec::new() }; + }; + + TypingEventContent { + user_ids: typing_indicators.into_keys().collect(), + } + } + + /// Sends a typing EDU to all appservices interested in the room. + async fn appservice_send(&self, room_id: &RoomId) -> Result { + let content = self.typings_content(room_id).await; + let edu = + EphemeralData::Typing(EphemeralRoomEvent { content, room_id: room_id.to_owned() }); + + let mut buf = EduBuf::new(); + serde_json::to_writer(&mut buf, &edu).expect("Serialized EphemeralData::Typing"); + + self.services + .sending + .send_edu_appservice_room(room_id, buf) + .await?; + + Ok(()) + } + /// Returns a new typing EDU. pub async fn typing_users_for_user( &self, diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index cd1aeeacc..91c74dfb7 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -210,6 +210,55 @@ impl Service { self.send_edu_servers(servers, serialized).await } + /// Queue an EDU for delivery to a specific appservice. + #[tracing::instrument(skip(self, serialized), level = "debug")] + pub fn send_edu_appservice(&self, appservice_id: String, serialized: EduBuf) -> Result { + let dest = Destination::Appservice(appservice_id); + let event = SendingEvent::Edu(serialized); + let _cork = self.db.db.cork(); + let keys = self.db.queue_requests(once((&event, &dest))); + self.dispatch(Msg { + dest, + event, + queue_id: keys + .into_iter() + .next() + .expect("request queue key"), + }) + } + + /// Sends an EDU to all appservices interested in a room. + /// The `serialized` data must be in `EphemeralData` format, not federation + /// `Edu`. + #[tracing::instrument(skip(self, room_id, serialized), level = "debug")] + pub async fn send_edu_appservice_room(&self, room_id: &RoomId, serialized: EduBuf) -> Result { + for appservice in self.services.appservice.read().await.values() { + if !appservice.registration.receive_ephemeral { + continue; + } + + let matching_aliases = self + .services + .alias + .local_aliases_for_room(room_id) + .ready_any(|room_alias| appservice.aliases.is_match(room_alias.as_str())) + .await; + + if appservice.rooms.is_match(room_id.as_str()) + || matching_aliases + || self + .services + .state_cache + .appservice_in_room(room_id, appservice) + .await + { + self.send_edu_appservice(appservice.registration.id.clone(), serialized.clone())?; + } + } + + Ok(()) + } + #[tracing::instrument(skip(self, servers, serialized), level = "debug")] pub async fn send_edu_servers<'a, S>(&self, servers: S, serialized: EduBuf) -> Result where