diff --git a/Cargo.lock b/Cargo.lock index b065f784..6ce653d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1388,6 +1388,7 @@ dependencies = [ "console-subscriber", "moqt-core", "once_cell", + "thiserror 1.0.69", "tokio", "tracing", "tracing-appender", diff --git a/moqt-client-wasm/src/lib.rs b/moqt-client-wasm/src/lib.rs index 547570d1..d6b190f2 100644 --- a/moqt-client-wasm/src/lib.rs +++ b/moqt-client-wasm/src/lib.rs @@ -1740,11 +1740,11 @@ impl SubscriptionNode { } fn unregister_publisher_namespace(&mut self, namespace: Vec) -> bool { - if let Some(producer) = &mut self.producer { - if producer.has_namespace(namespace.clone()) { - let _ = producer.delete_namespace(namespace); - return true; - } + if let Some(producer) = &mut self.producer + && producer.has_namespace(namespace.clone()) + { + let _ = producer.delete_namespace(namespace); + return true; } false } @@ -1758,15 +1758,14 @@ impl SubscriptionNode { } fn unregister_subscriber_namespace_prefix(&mut self, namespace_prefix: Vec) -> bool { - if let Some(consumer) = &mut self.consumer { - if consumer + if let Some(consumer) = &mut self.consumer + && consumer .get_namespace_prefixes() .map(|prefixes| prefixes.contains(&namespace_prefix)) .unwrap_or(false) - { - let _ = consumer.delete_namespace_prefix(namespace_prefix); - return true; - } + { + let _ = consumer.delete_namespace_prefix(namespace_prefix); + return true; } false } diff --git a/moqt-core/src/modules/messages/data_streams/extension_header.rs b/moqt-core/src/modules/messages/data_streams/extension_header.rs index c570db42..54307132 100644 --- a/moqt-core/src/modules/messages/data_streams/extension_header.rs +++ b/moqt-core/src/modules/messages/data_streams/extension_header.rs @@ -21,11 +21,12 @@ pub enum ExtensionHeaderValue { impl ExtensionHeader { pub fn new(header_type: u64, value: ExtensionHeaderValue) -> Result { - if header_type % 2 == 0 && matches!(value, ExtensionHeaderValue::OddTypeValue(_)) { + if header_type.is_multiple_of(2) && matches!(value, ExtensionHeaderValue::OddTypeValue(_)) { bail!("Mismatched value type: expected even, but got odd"); } - if header_type % 2 != 0 && matches!(value, ExtensionHeaderValue::EvenTypeValue(_)) { + if !header_type.is_multiple_of(2) && matches!(value, ExtensionHeaderValue::EvenTypeValue(_)) + { bail!("Mismatched value type: expected odd, but got even"); } diff --git a/moqt-core/src/modules/messages/data_streams/subgroup_stream.rs b/moqt-core/src/modules/messages/data_streams/subgroup_stream.rs index 370f8744..9e2b5c99 100644 --- a/moqt-core/src/modules/messages/data_streams/subgroup_stream.rs +++ b/moqt-core/src/modules/messages/data_streams/subgroup_stream.rs @@ -107,11 +107,12 @@ impl Object { } // Any object with a status code other than zero MUST have an empty payload. - if let Some(status) = object_status { - if status != ObjectStatus::Normal && object_payload_length != 0 { - // TODO: return Termination Error Code - bail!("Any object with a status code other than zero MUST have an empty payload."); - } + if let Some(status) = object_status + && status != ObjectStatus::Normal + && object_payload_length != 0 + { + // TODO: return Termination Error Code + bail!("Any object with a status code other than zero MUST have an empty payload."); } // length of total byte of extension headers diff --git a/moqt-ingest-gateway/src/rtmp/session.rs b/moqt-ingest-gateway/src/rtmp/session.rs index c5dfda80..d023d323 100644 --- a/moqt-ingest-gateway/src/rtmp/session.rs +++ b/moqt-ingest-gateway/src/rtmp/session.rs @@ -108,16 +108,16 @@ pub async fn handle_event( namespace_path.split('/').map(|s| s.to_string()).collect(); let track_name = "audio".to_string(); state.counters.audio += 1; - if state.counters.audio == 1 || state.counters.audio % 1000 == 0 { + if state.counters.audio == 1 || state.counters.audio.is_multiple_of(1000) { println!( "[rtmp {label}] audio packets={} app={app_name} track={track_name}", state.counters.audio ); } - if let Some(recorder) = state.recorder.as_mut() { - if let Err(err) = recorder.write_audio(timestamp.value, data.as_ref()).await { - eprintln!("[rtmp {label}] write_audio failed: {err:?}"); - } + if let Some(recorder) = state.recorder.as_mut() + && let Err(err) = recorder.write_audio(timestamp.value, data.as_ref()).await + { + eprintln!("[rtmp {label}] write_audio failed: {err:?}"); } if let Some(moqt) = state.moqt.as_ref() { let key = (namespace_path.clone(), track_name.clone()); @@ -162,16 +162,16 @@ pub async fn handle_event( namespace_path.split('/').map(|s| s.to_string()).collect(); let track_name = "video".to_string(); state.counters.video += 1; - if state.counters.video == 1 || state.counters.video % 1000 == 0 { + if state.counters.video == 1 || state.counters.video.is_multiple_of(1000) { println!( "[rtmp {label}] video packets={} app={app_name} track={track_name}", state.counters.video ); } - if let Some(recorder) = state.recorder.as_mut() { - if let Err(err) = recorder.write_video(timestamp.value, data.as_ref()).await { - eprintln!("[rtmp {label}] write_video failed: {err:?}"); - } + if let Some(recorder) = state.recorder.as_mut() + && let Err(err) = recorder.write_video(timestamp.value, data.as_ref()).await + { + eprintln!("[rtmp {label}] write_video failed: {err:?}"); } if let Some(moqt) = state.moqt.as_ref() { let namespace = namespace_vec.as_slice(); diff --git a/moqt-ingest-gateway/src/srt.rs b/moqt-ingest-gateway/src/srt.rs index 1363c23d..ae56e2ec 100644 --- a/moqt-ingest-gateway/src/srt.rs +++ b/moqt-ingest-gateway/src/srt.rs @@ -33,7 +33,7 @@ async fn handle_request(request: ConnectionRequest) { let (_, data) = packet?; count += 1; - if count == 1 || count % 200 == 0 { + if count == 1 || count.is_multiple_of(200) { println!( "[srt {remote}] packets={count} stream_id={stream_id} last_size={}", data.len() diff --git a/moqt-server/Cargo.toml b/moqt-server/Cargo.toml index f1464695..21ae6809 100644 --- a/moqt-server/Cargo.toml +++ b/moqt-server/Cargo.toml @@ -18,3 +18,4 @@ bytes = "1" async-trait = "0.1.74" ttl_cache = "0.5.1" tracing-appender = "0.2.3" +thiserror = "1.0" diff --git a/moqt-server/src/modules/object_cache_storage/storage.rs b/moqt-server/src/modules/object_cache_storage/storage.rs index e9656ae0..4ad12ce4 100644 --- a/moqt-server/src/modules/object_cache_storage/storage.rs +++ b/moqt-server/src/modules/object_cache_storage/storage.rs @@ -1,10 +1,26 @@ use super::commands::ObjectCacheStorageCommand; -use crate::modules::object_cache_storage::cache::{ - Cache, CacheKey, datagram::DatagramCache, subgroup_stream::SubgroupStreamsCache, -}; +use crate::modules::object_cache_storage::cache::Cache; +use crate::modules::object_cache_storage::cache::CacheKey; +use crate::modules::object_cache_storage::cache::datagram::DatagramCache; +use crate::modules::object_cache_storage::cache::subgroup_stream::SubgroupStreamsCache; use std::{collections::HashMap, time::Duration}; +use thiserror::Error; use tokio::sync::mpsc; +#[derive(Debug, Error)] +pub(crate) enum ObjectCacheError { + #[error("subgroup stream not found")] + SubgroupStreamNotFound, + #[error("datagram not found")] + DatagramNotFound, + #[error("cache not found")] + NotFound, + // 将来の未知エラーを受けるためのプレースホルダ + #[allow(dead_code)] + #[error("unknown error: {0}")] + Unknown(String), +} + pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver) { tracing::trace!("object_cache_storage start"); @@ -79,7 +95,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver subgroup_stream_cache, _ => { - resp.send(Err(anyhow::anyhow!("subgroup stream cache not found"))) + resp.send(Err(ObjectCacheError::SubgroupStreamNotFound.into())) .unwrap(); continue; } @@ -98,7 +114,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver datagram_cache, _ => { - resp.send(Err(anyhow::anyhow!("datagram cache not found"))) + resp.send(Err(ObjectCacheError::DatagramNotFound.into())) .unwrap(); continue; } @@ -119,7 +135,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver subgroup_stream_cache, _ => { - resp.send(Err(anyhow::anyhow!("subgroup stream cache not found"))) + resp.send(Err(ObjectCacheError::SubgroupStreamNotFound.into())) .unwrap(); continue; } @@ -143,7 +159,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver datagram_cache, _ => { - resp.send(Err(anyhow::anyhow!("datagram cache not found"))) + resp.send(Err(ObjectCacheError::DatagramNotFound.into())) .unwrap(); continue; } @@ -163,7 +179,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver subgroup_stream_cache, _ => { - resp.send(Err(anyhow::anyhow!("subgroup stream cache not found"))) + resp.send(Err(ObjectCacheError::SubgroupStreamNotFound.into())) .unwrap(); continue; } @@ -185,7 +201,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver datagram_cache, _ => { - resp.send(Err(anyhow::anyhow!("datagram cache not found"))) + resp.send(Err(ObjectCacheError::DatagramNotFound.into())) .unwrap(); continue; } @@ -205,7 +221,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver subgroup_stream_cache, _ => { - resp.send(Err(anyhow::anyhow!("subgroup stream cache not found"))) + resp.send(Err(ObjectCacheError::SubgroupStreamNotFound.into())) .unwrap(); continue; } @@ -220,7 +236,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver datagram_cache, _ => { - resp.send(Err(anyhow::anyhow!("datagram cache not found"))) + resp.send(Err(ObjectCacheError::DatagramNotFound.into())) .unwrap(); continue; } @@ -234,7 +250,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver datagram_cache, _ => { - resp.send(Err(anyhow::anyhow!("datagram cache not found"))) + resp.send(Err(ObjectCacheError::DatagramNotFound.into())) .unwrap(); continue; } @@ -253,7 +269,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver subgroup_stream_cache, _ => { - resp.send(Err(anyhow::anyhow!("subgroup stream cache not found"))) + resp.send(Err(ObjectCacheError::SubgroupStreamNotFound.into())) .unwrap(); continue; } @@ -273,7 +289,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver subgroup_stream_cache, _ => { - resp.send(Err(anyhow::anyhow!("subgroup stream cache not found"))) + resp.send(Err(ObjectCacheError::SubgroupStreamNotFound.into())) .unwrap(); continue; } @@ -292,7 +308,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver subgroup_stream_cache, _ => { - resp.send(Err(anyhow::anyhow!("subgroup stream cache not found"))) + resp.send(Err(ObjectCacheError::SubgroupStreamNotFound.into())) .unwrap(); continue; } @@ -313,7 +329,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver { @@ -328,7 +344,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver { diff --git a/moqt-server/src/modules/server_processes/data_streams/subgroup_stream/forwarder.rs b/moqt-server/src/modules/server_processes/data_streams/subgroup_stream/forwarder.rs index 2f93114d..9bfc0d92 100644 --- a/moqt-server/src/modules/server_processes/data_streams/subgroup_stream/forwarder.rs +++ b/moqt-server/src/modules/server_processes/data_streams/subgroup_stream/forwarder.rs @@ -4,19 +4,22 @@ use crate::{ modules::{ buffer_manager::BufferCommand, moqt_client::MOQTClient, - object_cache_storage::{cache::CacheKey, wrapper::ObjectCacheStorageWrapper}, + object_cache_storage::{ + cache::CacheKey, storage::ObjectCacheError, wrapper::ObjectCacheStorageWrapper, + }, pubsub_relation_manager::wrapper::PubSubRelationManagerWrapper, server_processes::senders::Senders, }, signal_dispatcher::{DataStreamThreadSignal, SignalDispatcher, TerminateReason}, }; -use anyhow::{Ok, Result, bail}; +use anyhow::{Error as AnyError, Result}; use bytes::BytesMut; use moqt_core::{ data_stream_type::DataStreamType, + messages::data_streams::DataStreams, messages::{ control_messages::subscribe::FilterType, - data_streams::{DataStreams, object_status::ObjectStatus, subgroup_stream}, + data_streams::{object_status::ObjectStatus, subgroup_stream}, }, models::{ range::{ObjectRange, ObjectStart}, @@ -36,6 +39,22 @@ use tokio::{ }; use tracing::{self}; +#[derive(Debug)] +pub(crate) enum SubgroupForwarderError { + CacheMissing, + ForwardingPreferenceMismatch, + SendFailed(AnyError), + Other(AnyError), +} + +type ForwarderResult = std::result::Result; + +impl From for SubgroupForwarderError { + fn from(err: AnyError) -> Self { + SubgroupForwarderError::Other(err) + } +} + pub(crate) struct SubgroupStreamObjectForwarder { stream: UniSendStream, senders: Arc, @@ -51,6 +70,17 @@ pub(crate) struct SubgroupStreamObjectForwarder { } impl SubgroupStreamObjectForwarder { + fn map_cache_error(err: AnyError) -> SubgroupForwarderError { + if let Some(cache_err) = err.downcast_ref::() { + match cache_err { + ObjectCacheError::SubgroupStreamNotFound => SubgroupForwarderError::CacheMissing, + _ => SubgroupForwarderError::Other(err), + } + } else { + SubgroupForwarderError::Other(err) + } + } + pub(crate) async fn init( stream: UniSendStream, downstream_subscribe_id: u64, @@ -137,7 +167,7 @@ impl SubgroupStreamObjectForwarder { Ok(stream_object_forwarder) } - pub(crate) async fn start(&mut self) -> Result<()> { + pub(crate) async fn start(&mut self) -> ForwarderResult<()> { let mut object_cache_storage = ObjectCacheStorageWrapper::new(self.senders.object_cache_tx().clone()); @@ -156,7 +186,9 @@ impl SubgroupStreamObjectForwarder { Ok(()) } - async fn get_upstream_forwarding_preference(&self) -> Result> { + async fn get_upstream_forwarding_preference( + &self, + ) -> ForwarderResult> { let pubsub_relation_manager = PubSubRelationManagerWrapper::new(self.senders.pubsub_relation_tx().clone()); @@ -166,24 +198,23 @@ impl SubgroupStreamObjectForwarder { pubsub_relation_manager .get_upstream_forwarding_preference(upstream_session_id, upstream_subscribe_id) .await + .map_err(SubgroupForwarderError::from) } async fn validate_forwarding_preference( &self, upstream_forwarding_preference: &Option, - ) -> Result<()> { + ) -> ForwarderResult<()> { match upstream_forwarding_preference { Some(ForwardingPreference::Subgroup) => Ok(()), - _ => { - bail!("Forwarding preference is not Subgroup Stream"); - } + _ => Err(SubgroupForwarderError::ForwardingPreferenceMismatch), } } async fn set_forwarding_preference( &self, downstream_forwarding_preference: Option, - ) -> Result<()> { + ) -> ForwarderResult<()> { let forwarding_preference = downstream_forwarding_preference.unwrap(); let downstream_session_id = self.stream.stable_id(); let downstream_subscribe_id = self.downstream_subscribe_id; @@ -205,7 +236,7 @@ impl SubgroupStreamObjectForwarder { async fn forward_header( &mut self, object_cache_storage: &mut ObjectCacheStorageWrapper, - ) -> Result<()> { + ) -> ForwarderResult<()> { let upstream_header = self.get_upstream_header(object_cache_storage).await?; let downstream_header = self.generate_downstream_header(&upstream_header).await; @@ -219,11 +250,12 @@ impl SubgroupStreamObjectForwarder { async fn get_upstream_header( &self, object_cache_storage: &mut ObjectCacheStorageWrapper, - ) -> Result { + ) -> ForwarderResult { let (group_id, subgroup_id) = self.subgroup_stream_id; let subgroup_stream_header = object_cache_storage .get_subgroup_stream_header(&self.cache_key, group_id, subgroup_id) - .await?; + .await + .map_err(Self::map_cache_error)?; Ok(subgroup_stream_header) } @@ -231,7 +263,7 @@ impl SubgroupStreamObjectForwarder { async fn forward_objects( &mut self, object_cache_storage: &mut ObjectCacheStorageWrapper, - ) -> Result<()> { + ) -> ForwarderResult<()> { let (is_end, first_object_cache_id) = self.forward_first_object(object_cache_storage).await?; tracing::debug!( @@ -254,7 +286,7 @@ impl SubgroupStreamObjectForwarder { async fn forward_first_object( &mut self, object_cache_storage: &mut ObjectCacheStorageWrapper, - ) -> Result<(bool, Option)> { + ) -> ForwarderResult<(bool, Option)> { let object_cache_id = loop { if self.is_terminated.load(Ordering::Relaxed) { return Ok((true, None)); @@ -289,7 +321,7 @@ impl SubgroupStreamObjectForwarder { &mut self, object_cache_storage: &mut ObjectCacheStorageWrapper, mut object_cache_id: usize, - ) -> Result<()> { + ) -> ForwarderResult<()> { loop { if self.is_terminated.load(Ordering::Relaxed) { break; @@ -326,7 +358,7 @@ impl SubgroupStreamObjectForwarder { async fn get_first_object( &self, object_cache_storage: &mut ObjectCacheStorageWrapper, - ) -> Result> { + ) -> ForwarderResult> { let downstream_session_id = self.stream.stable_id(); let downstream_subscribe_id = self.downstream_subscribe_id; @@ -376,7 +408,7 @@ impl SubgroupStreamObjectForwarder { async fn get_first_object_for_first_stream( &self, object_cache_storage: &mut ObjectCacheStorageWrapper, - ) -> Result> { + ) -> ForwarderResult> { let (group_id, subgroup_id) = self.subgroup_stream_id; match self.filter_type { @@ -385,6 +417,7 @@ impl SubgroupStreamObjectForwarder { object_cache_storage .get_first_subgroup_stream_object(&self.cache_key, group_id, subgroup_id) .await + .map_err(Self::map_cache_error) } FilterType::LatestObject => { // If the subscriber is the first subscriber for this track, the Relay needs to @@ -392,6 +425,7 @@ impl SubgroupStreamObjectForwarder { object_cache_storage .get_first_subgroup_stream_object(&self.cache_key, group_id, subgroup_id) .await + .map_err(Self::map_cache_error) } FilterType::AbsoluteStart | FilterType::AbsoluteRange => { let start_group_id = self.requested_object_range.start_group_id().unwrap(); @@ -406,10 +440,12 @@ impl SubgroupStreamObjectForwarder { start_object_id, ) .await + .map_err(Self::map_cache_error) } else { object_cache_storage .get_first_subgroup_stream_object(&self.cache_key, group_id, subgroup_id) .await + .map_err(Self::map_cache_error) } } } @@ -419,7 +455,7 @@ impl SubgroupStreamObjectForwarder { &self, object_cache_storage: &mut ObjectCacheStorageWrapper, actual_object_start: ObjectStart, - ) -> Result> { + ) -> ForwarderResult> { let (group_id, subgroup_id) = self.subgroup_stream_id; if group_id == actual_object_start.group_id() { @@ -431,10 +467,12 @@ impl SubgroupStreamObjectForwarder { actual_object_start.object_id(), ) .await + .map_err(Self::map_cache_error) } else { object_cache_storage .get_first_subgroup_stream_object(&self.cache_key, group_id, subgroup_id) .await + .map_err(Self::map_cache_error) } } @@ -442,7 +480,7 @@ impl SubgroupStreamObjectForwarder { &self, object_cache_storage: &mut ObjectCacheStorageWrapper, object_cache_id: usize, - ) -> Result> { + ) -> ForwarderResult> { let (group_id, subgroup_id) = self.subgroup_stream_id; object_cache_storage .get_next_subgroup_stream_object( @@ -452,6 +490,7 @@ impl SubgroupStreamObjectForwarder { object_cache_id, ) .await + .map_err(Self::map_cache_error) } async fn generate_downstream_header( @@ -467,7 +506,10 @@ impl SubgroupStreamObjectForwarder { .unwrap() } - async fn packetize_header(&self, header: &subgroup_stream::Header) -> Result { + async fn packetize_header( + &self, + header: &subgroup_stream::Header, + ) -> ForwarderResult { let downstream_session_id = self.stream.stable_id(); let downstream_subscribe_id = self.downstream_subscribe_id; @@ -498,17 +540,17 @@ impl SubgroupStreamObjectForwarder { async fn packetize_object( &mut self, stream_object: &subgroup_stream::Object, - ) -> Result { + ) -> ForwarderResult { let mut buf = BytesMut::new(); stream_object.packetize(&mut buf); Ok(buf) } - async fn send(&mut self, message_buf: BytesMut) -> Result<()> { + async fn send(&mut self, message_buf: BytesMut) -> ForwarderResult<()> { if let Err(e) = self.stream.write_all(&message_buf).await { tracing::warn!("Failed to write to stream: {:?}", e); - bail!(e); + return Err(SubgroupForwarderError::SendFailed(AnyError::new(e))); } Ok(()) @@ -527,7 +569,7 @@ impl SubgroupStreamObjectForwarder { ) } - async fn get_stream_ids_for_same_group(&self) -> Result> { + async fn get_stream_ids_for_same_group(&self) -> ForwarderResult> { let downstream_session_id = self.stream.stable_id(); let downstream_subscribe_id = self.downstream_subscribe_id; let (group_id, _) = self.subgroup_stream_id; @@ -560,7 +602,10 @@ impl SubgroupStreamObjectForwarder { Ok(stream_ids) } - async fn send_termination_signal_to_forwarders(&self, reason: ObjectStatus) -> Result<()> { + async fn send_termination_signal_to_forwarders( + &self, + reason: ObjectStatus, + ) -> ForwarderResult<()> { let signal_dispatcher = SignalDispatcher::new(self.senders.signal_dispatch_tx().clone()); let terminate_reason = TerminateReason::ObjectStatus(reason); let signal = Box::new(DataStreamThreadSignal::Terminate(terminate_reason)); diff --git a/moqt-server/src/modules/server_processes/thread_starters.rs b/moqt-server/src/modules/server_processes/thread_starters.rs index 1db8f7c9..17d9fed0 100644 --- a/moqt-server/src/modules/server_processes/thread_starters.rs +++ b/moqt-server/src/modules/server_processes/thread_starters.rs @@ -5,7 +5,7 @@ use super::{ data_streams::{ datagram::{forwarder::DatagramObjectForwarder, receiver::DatagramObjectReceiver}, subgroup_stream::{ - forwarder::SubgroupStreamObjectForwarder, + forwarder::{SubgroupForwarderError, SubgroupStreamObjectForwarder}, receiver::SubgroupStreamObjectReceiver, uni_stream::{UniRecvStream, UniSendStream}, }, @@ -215,9 +215,37 @@ async fn spawn_subgroup_stream_object_forwarder_thread( .await { Ok(_) => {} - Err(e) => { + Err(SubgroupForwarderError::CacheMissing) => { + tracing::warn!( + "StreamObjectForwarder: Cache missing: finish forwarder worker and stream" + ); + } + Err(SubgroupForwarderError::SendFailed(e)) => { + let code = TerminationErrorCode::InternalError; + let reason = format!("StreamObjectForwarder send failed: {:?}", e); + + tracing::error!(reason); + + let _ = senders + .close_session_tx() + .send((u8::from(code) as u64, reason.to_string())) + .await; + } + Err(SubgroupForwarderError::ForwardingPreferenceMismatch) => { + let code = TerminationErrorCode::InternalError; + let reason = + "StreamObjectForwarder forwarding preference mismatch".to_string(); + + tracing::error!(reason); + + let _ = senders + .close_session_tx() + .send((u8::from(code) as u64, reason.clone())) + .await; + } + Err(SubgroupForwarderError::Other(err)) => { let code = TerminationErrorCode::InternalError; - let reason = format!("StreamObjectForwarder: {:?}", e); + let reason = format!("StreamObjectForwarder: {:?}", err); tracing::error!(reason);