Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/service/rooms/read_receipt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{collections::BTreeMap, sync::Arc};
use futures::{Stream, TryFutureExt, try_join};
use ruma::{
OwnedEventId, OwnedUserId, RoomId, UserId,
api::appservice::event::push_events::v1::EphemeralData,
events::{
AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent,
receipt::{ReceiptEvent, ReceiptEventContent, Receipts},
Expand All @@ -21,6 +22,7 @@ use tuwunel_core::{
};

use self::data::{Data, ReceiptItem};
use crate::sending::EduBuf;

pub struct Service {
services: Arc<crate::services::OnceServices>,
Expand Down Expand Up @@ -51,6 +53,19 @@ impl Service {
.readreceipt_update(user_id, room_id, event)
.await;

// update appservices
let edu = EphemeralData::Receipt(ReceiptEvent {
content: event.content.clone(),
room_id: room_id.to_owned(),
});
let mut buf = EduBuf::new();
serde_json::to_writer(&mut buf, &edu).expect("Serialized EphemeralData::Receipt");
let _: Result = self
.services
.sending
.send_edu_appservice_room(room_id, buf)
.await;

if self.services.globals.user_is_local(user_id) {
self.services
.sending
Expand Down
45 changes: 44 additions & 1 deletion src/service/rooms/typing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ use std::{collections::BTreeMap, sync::Arc};
use futures::StreamExt;
use ruma::{
OwnedRoomId, OwnedUserId, RoomId, UserId,
api::federation::transactions::edu::{Edu, TypingContent},
api::{
appservice::event::push_events::v1::EphemeralData,
federation::transactions::edu::{Edu, TypingContent},
},
events::{EphemeralRoomEvent, typing::TypingEventContent},
};
use tokio::sync::{RwLock, broadcast};
use tuwunel_core::{
Expand Down Expand Up @@ -65,6 +69,9 @@ impl Service {
trace!("receiver found what it was looking for and is no longer interested");
}

// update appservices
self.appservice_send(room_id).await?;

// update federation
if self.services.globals.user_is_local(user_id) {
self.federation_send(room_id, user_id, true)
Expand Down Expand Up @@ -100,6 +107,9 @@ impl Service {
trace!("receiver found what it was looking for and is no longer interested");
}

// update appservices
self.appservice_send(room_id).await?;

// update federation
if self.services.globals.user_is_local(user_id) {
self.federation_send(room_id, user_id, false)
Expand Down Expand Up @@ -160,6 +170,9 @@ impl Service {
trace!("receiver found what it was looking for and is no longer interested");
}

// update appservices
self.appservice_send(room_id).await?;

// update federation
for user in &removable {
if self.services.globals.user_is_local(user) {
Expand All @@ -183,6 +196,36 @@ impl Service {
.unwrap_or(0))
}

/// Returns the typing content with all typing users in the room.
async fn typings_content(&self, room_id: &RoomId) -> TypingEventContent {
let room_typing_indicators = self.typing.read().await.get(room_id).cloned();

let Some(typing_indicators) = room_typing_indicators else {
return TypingEventContent { user_ids: Vec::new() };
};

TypingEventContent {
user_ids: typing_indicators.into_keys().collect(),
}
}

/// Sends a typing EDU to all appservices interested in the room.
async fn appservice_send(&self, room_id: &RoomId) -> Result {
let content = self.typings_content(room_id).await;
let edu =
EphemeralData::Typing(EphemeralRoomEvent { content, room_id: room_id.to_owned() });

let mut buf = EduBuf::new();
serde_json::to_writer(&mut buf, &edu).expect("Serialized EphemeralData::Typing");

self.services
.sending
.send_edu_appservice_room(room_id, buf)
.await?;

Ok(())
}

/// Returns a new typing EDU.
pub async fn typing_users_for_user(
&self,
Expand Down
49 changes: 49 additions & 0 deletions src/service/sending/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,55 @@ impl Service {
self.send_edu_servers(servers, serialized).await
}

/// Queue an EDU for delivery to a specific appservice.
#[tracing::instrument(skip(self, serialized), level = "debug")]
pub fn send_edu_appservice(&self, appservice_id: String, serialized: EduBuf) -> Result {
let dest = Destination::Appservice(appservice_id);
let event = SendingEvent::Edu(serialized);
let _cork = self.db.db.cork();
let keys = self.db.queue_requests(once((&event, &dest)));
self.dispatch(Msg {
dest,
event,
queue_id: keys
.into_iter()
.next()
.expect("request queue key"),
})
}

/// Sends an EDU to all appservices interested in a room.
/// The `serialized` data must be in `EphemeralData` format, not federation
/// `Edu`.
#[tracing::instrument(skip(self, room_id, serialized), level = "debug")]
pub async fn send_edu_appservice_room(&self, room_id: &RoomId, serialized: EduBuf) -> Result {
for appservice in self.services.appservice.read().await.values() {
if !appservice.registration.receive_ephemeral {
continue;
}

let matching_aliases = self
.services
.alias
.local_aliases_for_room(room_id)
.ready_any(|room_alias| appservice.aliases.is_match(room_alias.as_str()))
.await;

if appservice.rooms.is_match(room_id.as_str())
|| matching_aliases
|| self
.services
.state_cache
.appservice_in_room(room_id, appservice)
.await
{
self.send_edu_appservice(appservice.registration.id.clone(), serialized.clone())?;
}
}

Ok(())
}

#[tracing::instrument(skip(self, servers, serialized), level = "debug")]
pub async fn send_edu_servers<'a, S>(&self, servers: S, serialized: EduBuf) -> Result
where
Expand Down
Loading