Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 10 additions & 11 deletions moqt-client-wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1740,11 +1740,11 @@ impl SubscriptionNode {
}

fn unregister_publisher_namespace(&mut self, namespace: Vec<String>) -> bool {
if let Some(producer) = &mut self.producer {
if producer.has_namespace(namespace.clone()) {
let _ = producer.delete_namespace(namespace);
return true;
}
if let Some(producer) = &mut self.producer
&& producer.has_namespace(namespace.clone())
{
let _ = producer.delete_namespace(namespace);
return true;
}
false
}
Expand All @@ -1758,15 +1758,14 @@ impl SubscriptionNode {
}

fn unregister_subscriber_namespace_prefix(&mut self, namespace_prefix: Vec<String>) -> bool {
if let Some(consumer) = &mut self.consumer {
if consumer
if let Some(consumer) = &mut self.consumer
&& consumer
.get_namespace_prefixes()
.map(|prefixes| prefixes.contains(&namespace_prefix))
.unwrap_or(false)
{
let _ = consumer.delete_namespace_prefix(namespace_prefix);
return true;
}
{
let _ = consumer.delete_namespace_prefix(namespace_prefix);
return true;
}
false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ pub enum ExtensionHeaderValue {

impl ExtensionHeader {
pub fn new(header_type: u64, value: ExtensionHeaderValue) -> Result<Self> {
if header_type % 2 == 0 && matches!(value, ExtensionHeaderValue::OddTypeValue(_)) {
if header_type.is_multiple_of(2) && matches!(value, ExtensionHeaderValue::OddTypeValue(_)) {
bail!("Mismatched value type: expected even, but got odd");
}

if header_type % 2 != 0 && matches!(value, ExtensionHeaderValue::EvenTypeValue(_)) {
if !header_type.is_multiple_of(2) && matches!(value, ExtensionHeaderValue::EvenTypeValue(_))
{
bail!("Mismatched value type: expected odd, but got even");
}

Expand Down
11 changes: 6 additions & 5 deletions moqt-core/src/modules/messages/data_streams/subgroup_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,12 @@ impl Object {
}

// Any object with a status code other than zero MUST have an empty payload.
if let Some(status) = object_status {
if status != ObjectStatus::Normal && object_payload_length != 0 {
// TODO: return Termination Error Code
bail!("Any object with a status code other than zero MUST have an empty payload.");
}
if let Some(status) = object_status
&& status != ObjectStatus::Normal
&& object_payload_length != 0
{
// TODO: return Termination Error Code
bail!("Any object with a status code other than zero MUST have an empty payload.");
}

// length of total byte of extension headers
Expand Down
20 changes: 10 additions & 10 deletions moqt-ingest-gateway/src/rtmp/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,16 @@ pub async fn handle_event(
namespace_path.split('/').map(|s| s.to_string()).collect();
let track_name = "audio".to_string();
state.counters.audio += 1;
if state.counters.audio == 1 || state.counters.audio % 1000 == 0 {
if state.counters.audio == 1 || state.counters.audio.is_multiple_of(1000) {
println!(
"[rtmp {label}] audio packets={} app={app_name} track={track_name}",
state.counters.audio
);
}
if let Some(recorder) = state.recorder.as_mut() {
if let Err(err) = recorder.write_audio(timestamp.value, data.as_ref()).await {
eprintln!("[rtmp {label}] write_audio failed: {err:?}");
}
if let Some(recorder) = state.recorder.as_mut()
&& let Err(err) = recorder.write_audio(timestamp.value, data.as_ref()).await
{
eprintln!("[rtmp {label}] write_audio failed: {err:?}");
}
if let Some(moqt) = state.moqt.as_ref() {
let key = (namespace_path.clone(), track_name.clone());
Expand Down Expand Up @@ -162,16 +162,16 @@ pub async fn handle_event(
namespace_path.split('/').map(|s| s.to_string()).collect();
let track_name = "video".to_string();
state.counters.video += 1;
if state.counters.video == 1 || state.counters.video % 1000 == 0 {
if state.counters.video == 1 || state.counters.video.is_multiple_of(1000) {
println!(
"[rtmp {label}] video packets={} app={app_name} track={track_name}",
state.counters.video
);
}
if let Some(recorder) = state.recorder.as_mut() {
if let Err(err) = recorder.write_video(timestamp.value, data.as_ref()).await {
eprintln!("[rtmp {label}] write_video failed: {err:?}");
}
if let Some(recorder) = state.recorder.as_mut()
&& let Err(err) = recorder.write_video(timestamp.value, data.as_ref()).await
{
eprintln!("[rtmp {label}] write_video failed: {err:?}");
}
if let Some(moqt) = state.moqt.as_ref() {
let namespace = namespace_vec.as_slice();
Expand Down
2 changes: 1 addition & 1 deletion moqt-ingest-gateway/src/srt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async fn handle_request(request: ConnectionRequest) {
let (_, data) = packet?;
count += 1;

if count == 1 || count % 200 == 0 {
if count == 1 || count.is_multiple_of(200) {
println!(
"[srt {remote}] packets={count} stream_id={stream_id} last_size={}",
data.len()
Expand Down
1 change: 1 addition & 0 deletions moqt-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ bytes = "1"
async-trait = "0.1.74"
ttl_cache = "0.5.1"
tracing-appender = "0.2.3"
thiserror = "1.0"
50 changes: 33 additions & 17 deletions moqt-server/src/modules/object_cache_storage/storage.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
use super::commands::ObjectCacheStorageCommand;
use crate::modules::object_cache_storage::cache::{
Cache, CacheKey, datagram::DatagramCache, subgroup_stream::SubgroupStreamsCache,
};
use crate::modules::object_cache_storage::cache::Cache;
use crate::modules::object_cache_storage::cache::CacheKey;
use crate::modules::object_cache_storage::cache::datagram::DatagramCache;
use crate::modules::object_cache_storage::cache::subgroup_stream::SubgroupStreamsCache;
use std::{collections::HashMap, time::Duration};
use thiserror::Error;
use tokio::sync::mpsc;

#[derive(Debug, Error)]
pub(crate) enum ObjectCacheError {
#[error("subgroup stream not found")]
SubgroupStreamNotFound,
#[error("datagram not found")]
DatagramNotFound,
#[error("cache not found")]
NotFound,
// 将来の未知エラーを受けるためのプレースホルダ
#[allow(dead_code)]
#[error("unknown error: {0}")]
Unknown(String),
}

pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStorageCommand>) {
tracing::trace!("object_cache_storage start");

Expand Down Expand Up @@ -79,7 +95,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStor
let subgroup_stream_cache = match cache {
Some(Cache::SubgroupStream(subgroup_stream_cache)) => subgroup_stream_cache,
_ => {
resp.send(Err(anyhow::anyhow!("subgroup stream cache not found")))
resp.send(Err(ObjectCacheError::SubgroupStreamNotFound.into()))
.unwrap();
continue;
}
Expand All @@ -98,7 +114,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStor
let datagram_cache = match cache {
Some(Cache::Datagram(datagram_cache)) => datagram_cache,
_ => {
resp.send(Err(anyhow::anyhow!("datagram cache not found")))
resp.send(Err(ObjectCacheError::DatagramNotFound.into()))
.unwrap();
continue;
}
Expand All @@ -119,7 +135,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStor
let subgroup_streams_cache = match cache {
Some(Cache::SubgroupStream(subgroup_stream_cache)) => subgroup_stream_cache,
_ => {
resp.send(Err(anyhow::anyhow!("subgroup stream cache not found")))
resp.send(Err(ObjectCacheError::SubgroupStreamNotFound.into()))
.unwrap();
continue;
}
Expand All @@ -143,7 +159,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStor
let datagram_cache = match cache {
Some(Cache::Datagram(datagram_cache)) => datagram_cache,
_ => {
resp.send(Err(anyhow::anyhow!("datagram cache not found")))
resp.send(Err(ObjectCacheError::DatagramNotFound.into()))
.unwrap();
continue;
}
Expand All @@ -163,7 +179,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStor
let subgroup_streams_cache = match cache {
Some(Cache::SubgroupStream(subgroup_stream_cache)) => subgroup_stream_cache,
_ => {
resp.send(Err(anyhow::anyhow!("subgroup stream cache not found")))
resp.send(Err(ObjectCacheError::SubgroupStreamNotFound.into()))
.unwrap();
continue;
}
Expand All @@ -185,7 +201,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStor
let datagram_cache = match cache {
Some(Cache::Datagram(datagram_cache)) => datagram_cache,
_ => {
resp.send(Err(anyhow::anyhow!("datagram cache not found")))
resp.send(Err(ObjectCacheError::DatagramNotFound.into()))
.unwrap();
continue;
}
Expand All @@ -205,7 +221,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStor
let subgroup_streams_cache = match cache {
Some(Cache::SubgroupStream(subgroup_stream_cache)) => subgroup_stream_cache,
_ => {
resp.send(Err(anyhow::anyhow!("subgroup stream cache not found")))
resp.send(Err(ObjectCacheError::SubgroupStreamNotFound.into()))
.unwrap();
continue;
}
Expand All @@ -220,7 +236,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStor
let datagram_cache = match cache {
Some(Cache::Datagram(datagram_cache)) => datagram_cache,
_ => {
resp.send(Err(anyhow::anyhow!("datagram cache not found")))
resp.send(Err(ObjectCacheError::DatagramNotFound.into()))
.unwrap();
continue;
}
Expand All @@ -234,7 +250,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStor
let datagram_cache = match cache {
Some(Cache::Datagram(datagram_cache)) => datagram_cache,
_ => {
resp.send(Err(anyhow::anyhow!("datagram cache not found")))
resp.send(Err(ObjectCacheError::DatagramNotFound.into()))
.unwrap();
continue;
}
Expand All @@ -253,7 +269,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStor
let subgroup_streams_cache = match cache {
Some(Cache::SubgroupStream(subgroup_stream_cache)) => subgroup_stream_cache,
_ => {
resp.send(Err(anyhow::anyhow!("subgroup stream cache not found")))
resp.send(Err(ObjectCacheError::SubgroupStreamNotFound.into()))
.unwrap();
continue;
}
Expand All @@ -273,7 +289,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStor
let subgroup_streams_cache = match cache {
Some(Cache::SubgroupStream(subgroup_stream_cache)) => subgroup_stream_cache,
_ => {
resp.send(Err(anyhow::anyhow!("subgroup stream cache not found")))
resp.send(Err(ObjectCacheError::SubgroupStreamNotFound.into()))
.unwrap();
continue;
}
Expand All @@ -292,7 +308,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStor
let subgroup_streams_cache = match cache {
Some(Cache::SubgroupStream(subgroup_stream_cache)) => subgroup_stream_cache,
_ => {
resp.send(Err(anyhow::anyhow!("subgroup stream cache not found")))
resp.send(Err(ObjectCacheError::SubgroupStreamNotFound.into()))
.unwrap();
continue;
}
Expand All @@ -313,7 +329,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStor

resp.send(Ok(largest_group_id)).unwrap();
} else {
resp.send(Err(anyhow::anyhow!("cache not found"))).unwrap();
resp.send(Err(ObjectCacheError::NotFound.into())).unwrap();
}
}
ObjectCacheStorageCommand::GetLargestObjectId { cache_key, resp } => {
Expand All @@ -328,7 +344,7 @@ pub(crate) async fn object_cache_storage(rx: &mut mpsc::Receiver<ObjectCacheStor

resp.send(Ok(largest_object_id)).unwrap();
} else {
resp.send(Err(anyhow::anyhow!("cache not found"))).unwrap();
resp.send(Err(ObjectCacheError::NotFound.into())).unwrap();
}
}
ObjectCacheStorageCommand::DeleteClient { session_id, resp } => {
Expand Down
Loading