Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
f0cfe56
feat: publish message
t-gazzy Oct 16, 2025
c614fc8
feat: publish message test
t-gazzy Oct 16, 2025
1538f23
feat: publish_ok
t-gazzy Oct 16, 2025
c3483ae
feat: add tests
t-gazzy Oct 16, 2025
300d197
chore: forget commit
t-gazzy Oct 16, 2025
bc9aa9a
chore: test fixed
t-gazzy Oct 16, 2025
9c19d61
feat: subscribe tests
t-gazzy Oct 17, 2025
cc33221
fix: subscribe_ok
t-gazzy Oct 17, 2025
7cdc21a
chore: forward as bool
t-gazzy Oct 17, 2025
44293ae
chore: publish/publish_ok
t-gazzy Oct 17, 2025
171df11
chore: renames
t-gazzy Oct 17, 2025
d7bc284
chore: rename
t-gazzy Oct 17, 2025
8f1c866
feat: pub/sub message
t-gazzy Oct 17, 2025
239e11f
feat: session and context
t-gazzy Oct 20, 2025
adb7c74
chore: temp
t-gazzy Oct 22, 2025
229c47e
feat: impl DatagramSender/Receiver
t-gazzy Oct 23, 2025
a0d9e2d
feat: impl datagramsender
t-gazzy Oct 23, 2025
2939c61
chore: add tests
t-gazzy Oct 23, 2025
2c229c1
chore: add subgroup tests
t-gazzy Oct 23, 2025
abfa7e7
chore: redundant mut
t-gazzy Oct 23, 2025
8f9418d
chore: test
t-gazzy Oct 23, 2025
0ead9fb
chore: temporary
t-gazzy Oct 24, 2025
f701dc5
feat: impl relay
t-gazzy Oct 25, 2025
49a7a68
feat: subscribe on server
t-gazzy Oct 25, 2025
ecd0372
fix: check first packet
t-gazzy Oct 27, 2025
20e3001
chore: temp
t-gazzy Oct 29, 2025
1f48153
chore: temp
t-gazzy Oct 29, 2025
4ee34ba
chore: temp
t-gazzy Oct 30, 2025
c68fda4
chore: temp
t-gazzy Oct 30, 2025
5711554
feat: datagram binding
t-gazzy Oct 31, 2025
afe7c62
feat: pub/sub succeeded
t-gazzy Oct 31, 2025
f0ff977
feat: datagram connection
t-gazzy Oct 31, 2025
cad91ce
chore: temp
t-gazzy Nov 1, 2025
81e4ff9
fix: maybe resolve deadlock
t-gazzy Nov 1, 2025
19245cd
chore: for hackathon
t-gazzy Nov 1, 2025
cfc697f
chore: for cloud
t-gazzy Nov 1, 2025
7720d4e
chore: log out
t-gazzy Nov 1, 2025
2a5b599
feat: datagram ok!
t-gazzy Nov 1, 2025
e0c8fe9
fix: try to down
t-gazzy Nov 2, 2025
535ba2c
chore: insert cert
t-gazzy Nov 2, 2025
7eb6561
fix: load lots of certs
t-gazzy Nov 2, 2025
b0ca1e3
chore: forget to commit
t-gazzy Nov 2, 2025
707c007
fix: typo
t-gazzy Nov 2, 2025
d4abd3a
feat: impl `MOQT_IMPLEMENTATION`
t-gazzy Nov 2, 2025
3c67419
fix: client
t-gazzy Nov 3, 2025
b7e0ec7
feat: impl byteencoder/decoder
t-gazzy Nov 4, 2025
db57eb9
chore: rename
t-gazzy Nov 4, 2025
d94b963
chore: use bytereader/writer
t-gazzy Nov 4, 2025
e551a33
chore: fix: message
t-gazzy Nov 4, 2025
1f42849
fix: bug
t-gazzy Nov 5, 2025
8b88fb9
fix: stream receive
t-gazzy Nov 5, 2025
75ea763
feat: add subscribe
t-gazzy Nov 6, 2025
2ce50e9
chore: subscribe
t-gazzy Nov 6, 2025
8d00c7f
chore: add log
t-gazzy Nov 7, 2025
ceb3b92
fix: GroupOrder::Publisher
t-gazzy Nov 7, 2025
cfb124f
chore: add log
t-gazzy Nov 7, 2025
d180139
chore: fix encode/decode u8
t-gazzy Nov 7, 2025
5cb6e5e
chore: miss
t-gazzy Nov 7, 2025
c050af2
fix: BytesWriter/Reader
t-gazzy Nov 14, 2025
3aaf448
fix: use bytes extension and ReceiveStream is released from Mutex
t-gazzy Nov 17, 2025
8f0a72d
fix: replace ext
t-gazzy Nov 18, 2025
4f56342
chore: remove unused files
t-gazzy Nov 18, 2025
8f0f684
chore: file move
t-gazzy Nov 18, 2025
392e0a4
chore: directory changes
t-gazzy Nov 19, 2025
c6b3bd2
chore: move endpoint
t-gazzy Nov 21, 2025
690000d
chore: move directory
t-gazzy Nov 21, 2025
86d8031
feat: add flag for certificate validation
t-gazzy Nov 21, 2025
210659c
chore: remove unused api
t-gazzy Nov 21, 2025
3776e04
chore: rename
t-gazzy Nov 21, 2025
816f4e8
chore: mis rename
t-gazzy Nov 21, 2025
6d98169
fix: subroutine
t-gazzy Nov 26, 2025
523fc93
fix: tests
t-gazzy Nov 27, 2025
af8d6ab
fix: test
t-gazzy Nov 27, 2025
9b0cbd6
fix: datagram
t-gazzy Nov 27, 2025
d4f6c7a
fix: Bytes->Arc
t-gazzy Dec 3, 2025
81c6e53
fix: remove redundant mutex
t-gazzy Dec 17, 2025
180b7b1
fix: datagram ok
t-gazzy Dec 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ cargo-flamegraph.trace/*
key.pem
cert.pem

Cargo.lock
Cargo.lock

**.log
9 changes: 5 additions & 4 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
"rust-analyzer.check.command": "clippy",
"rust-analyzer.cargo.extraEnv": {
"RUSTFLAGS": "--cfg web_sys_unstable_apis --cfg tokio_unstable"
// "RUST_BACKTRACE": "1"
},
"rust-analyzer.cargo.features": [
// "web_sys_unstable_apis"
],
"rust-analyzer.linkedProjects": [
"moqt-wasm/Cargo.toml",
"moqt-wasm/moqt/Cargo.toml",
"moqt-wasm/sample/client/Cargo.toml",
"moqt-wasm/sample/server/Cargo.toml"
"Cargo.toml",
"moqt/Cargo.toml",
"client/Cargo.toml",
"relay/Cargo.toml"
]
}
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ members = [
'moqt-server-sample',
'moqt-client-sample',
'moqt',
'sample/server',
'sample/client',
'relay',
'client',
]
2 changes: 1 addition & 1 deletion sample/client/Cargo.toml → client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2024"

[dependencies]
moqt = {path = "../../moqt"}
moqt = {path = "../moqt"}
tokio = { version = "1.45.0", features = ["full", "tracing"] }
tracing = "0.1.37"
async-trait = "0.1.74"
Expand Down
303 changes: 303 additions & 0 deletions client/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
use std::{
net::ToSocketAddrs,
str::FromStr,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
};

use anyhow::bail;
use moqt::{DatagramField, Endpoint, ObjectDatagram, QUIC, Session, SubscribeOption};

use crate::stream_runner::StreamTaskRunner;

pub(crate) struct Client {
label: String,
join_handle: tokio::task::JoinHandle<()>,
track_alias: Arc<AtomicU64>,
publisher: moqt::Publisher<moqt::QUIC>,
subscriber: moqt::Subscriber<moqt::QUIC>,
runner: StreamTaskRunner,
}

impl Client {
pub(crate) async fn new(cert_path: String, label: String) -> anyhow::Result<Self> {
// let endpoint = Endpoint::<QUIC>::create_client_with_custom_cert(0, &cert_path)?;
let config = moqt::ClientConfig::default();
let endpoint = Endpoint::<QUIC>::create_client(config)?;
// let url = url::Url::from_str("moqt://interop-relay.cloudflare.mediaoverquic.com:443")?;
let url = url::Url::from_str("moqt://fb.mvfst.net:9448")?;
// let url = url::Url::from_str("moqt://lminiero.it:9000")?;
// let url = url::Url::from_str("moqt://moqt.research.skyway.io:4434")?;
// let url = url::Url::from_str("moqt://localhost:4434")?;
let host = url.host_str().unwrap();
let remote_address = (host, url.port().unwrap_or(4433))
.to_socket_addrs()?
.next()
.unwrap();

tracing::info!("remote_address: {} host: {}", remote_address, host);

let session = match endpoint.connect(remote_address, host).await {
Ok(s) => s,
Err(e) => {
bail!("test failed: {:?}", e)
}
};
let track_alias = Arc::new(AtomicU64::new(0));
let (publisher, subscriber) = session.create_publisher_subscriber_pair();
let join_handle = Self::create_receiver(label.clone(), session, track_alias.clone());

Ok(Self {
label,
join_handle,
track_alias,
publisher,
subscriber,
runner: StreamTaskRunner::new(),
})
}

fn create_receiver(
label: String,
session: Session<moqt::QUIC>,
track_alias: Arc<AtomicU64>,
) -> tokio::task::JoinHandle<()> {
tokio::task::Builder::new()
.spawn(async move {
let runner = StreamTaskRunner::new();
loop {
let result = session.receive_event().await;
if let Err(e) = result {
tracing::error!("Failed to receive event: {}", e);
break;
}
let event = result.unwrap();
match event {
moqt::SessionEvent::PublishNamespace(publish_namespace_handler) => {
tracing::info!(
"Received: {} Publish Namespace: {}",
label,
publish_namespace_handler.track_namespace
);
let _ = publish_namespace_handler.ok().await;
}
moqt::SessionEvent::SubscribeNameSpace(subscribe_namespace_handler) => {
tracing::info!(
"Received: {} Subscribe Namespace: {}",
label,
subscribe_namespace_handler.track_namespace_prefix
);
let _ = subscribe_namespace_handler.ok().await;
}
moqt::SessionEvent::Publish(publish_handler) => {
tracing::info!("Received: {} Publish", label);
match publish_handler
.ok(128, moqt::FilterType::LatestObject)
.await
{
Ok(h) => h,
Err(_) => {
tracing::error!("failed to send");
return;
}
};
// Self::subscribe(label.clone(), publish_handler, &runner).await;
}
moqt::SessionEvent::Subscribe(subscribe_handler) => {
tracing::info!("Received: {} Subscribe", label);
let track_alias = track_alias.load(Ordering::SeqCst);
let _ = subscribe_handler
.ok(track_alias, 1000000, moqt::ContentExists::False)
.await;
let publication = subscribe_handler.into_publication(track_alias);
Self::create_stream(label.clone(), publication, &runner).await;
}
moqt::SessionEvent::ProtocolViolation() => {
tracing::info!("Received: {} ProtocolViolation", label);
}
};
}
})
.unwrap()
}

pub(crate) async fn publish_namespace(&self, track_namespace: String) {
let result = self.publisher.publish_namespace(track_namespace).await;
if result.is_err() {
tracing::info!("{}: publish namespace error", self.label);
} else {
tracing::info!("{}: publish namespace ok", self.label);
}
}

pub(crate) async fn subscribe_namespace(&self, track_namespace_prefix: String) {
let result = self
.subscriber
.subscribe_namespace(track_namespace_prefix)
.await;
if result.is_err() {
tracing::info!("{}: subscribe namespace error", self.label);
} else {
tracing::info!("{}: subscribe namespace ok", self.label);
}
}

pub(crate) async fn publish(&self, track_namespace: String, track_name: String) {
let option = moqt::PublishOption::default();
let pub_result = self
.publisher
.publish(track_namespace, track_name, option)
.await;
if let Ok(p) = pub_result {
tracing::info!("{}: publish ok", self.label);
self.track_alias
.fetch_add(p.track_alias, std::sync::atomic::Ordering::SeqCst);
} else {
tracing::error!("{}: publish error", self.label);
}
}

async fn subscribe(
label: String,
publish_handler: moqt::PublishHandler<moqt::QUIC>,
runner: &StreamTaskRunner,
) {
let full_name = format!(
"{}/{}",
publish_handler.track_namespace, publish_handler.track_name
);
let task = async move {
tracing::info!("{} :subscribe {}", label, full_name);
let subscription = publish_handler.into_subscription(0);
let acceptance = match subscription.accept_stream_or_datagram().await {
Ok(acceptance) => acceptance,
Err(_) => {
tracing::error!("Failed to accept stream or datagram");
return;
}
};
match acceptance {
moqt::Acceptance::Stream(stream) => todo!(),
moqt::Acceptance::Datagram(mut receiver, object) => {
tracing::info!("{} :subscribe datagram :{:?}", label, object);
loop {
let result = receiver.receive().await;
if let Err(e) = result {
tracing::error!("Failed to receive: {}", e);
break;
}
let object = result.unwrap();
tracing::info!("{} :subscribe datagram, message: {:?}", label, object);
}
}
}
};
runner.add_task(Box::pin(task)).await;
}

async fn create_stream(
label: String,
publication: moqt::PublishedResource<moqt::QUIC>,
runner: &StreamTaskRunner,
) {
tracing::info!("{} :create stream", label);
let datagram = publication.create_datagram();
// let _ = publication
// .create_stream()
// .await
// .inspect(|_| {
// tracing::info!("{} :create stream ok", label);
// })
// .inspect_err(|_| {
// tracing::error!("{} :create stream error", label);
// });
let task = async move {
let mut id = 0;
tracing::info!("{} :create stream start", label);
loop {
let format_text = format!("hello from {}! id: {}", label, id);
let data = DatagramField::to_bytes(format_text);
let field = DatagramField::Payload0x00 {
object_id: id,
payload: data,
payload: Arc::new(format_text.as_bytes().to_vec()),
};
let obj = ObjectDatagram::new(publication.track_alias, id, field);
match datagram.send(obj).await {
Ok(_) => {
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
id += 1
}
Err(_) => {
tracing::error!("failed to send");
break;
}
}
}
};
runner.add_task(Box::pin(task)).await;
}

pub(crate) async fn active_subscribe(
&self,
label: String,
track_namespace: String,
track_name: String,
) {
let full_name = format!("{}/{}", track_namespace, track_name);
let option = SubscribeOption {
subscriber_priority: 128,
group_order: moqt::GroupOrder::Ascending,
forward: true,
filter_type: moqt::FilterType::LatestObject,
};
tracing::info!("{} :subscribe {}", label, full_name);
let subscription = match self
.subscriber
.subscribe(track_namespace, track_name, option)
.await
{
Ok(s) => s,
Err(e) => {
tracing::error!("Failed to subscribe: {}", e);
return;
}
};
tracing::warn!("qqq subscribe ok");
let task = async move {
let acceptance = match subscription.accept_stream_or_datagram().await {
Ok(acceptance) => acceptance,
Err(e) => {
tracing::error!("Failed to accept stream or datagram: {}", e);
return;
}
};
tracing::warn!("qqq accept ok");
match acceptance {
moqt::Acceptance::Stream(stream) => todo!(),
moqt::Acceptance::Datagram(mut receiver, object) => {
tracing::info!("{} :subscribe datagram: {:?}", label, object,);
loop {
let result = receiver.receive().await;
if let Err(e) = result {
tracing::error!("Failed to receive: {}", e);
break;
}
let object = result.unwrap();
tracing::info!("{} :subscribe datagram: {:?}", label, object);
}
}
};
};
self.runner.add_task(Box::pin(task)).await;
}
}

impl Drop for Client {
fn drop(&mut self) {
tracing::info!("Client has been dropped.");
self.join_handle.abort();
}
}
Loading
Loading