diff --git a/Cargo.toml b/Cargo.toml index a9675ff..ef91bde 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,8 @@ mctp-estack = { git = "https://github.com/OpenPRoT/mctp-rs.git", branch = "sync- mctp = { git = "https://github.com/OpenPRoT/mctp-rs.git", branch = "sync-features", default-features = false } [dev-dependencies] +mctp = { git = "https://github.com/OpenPRoT/mctp-rs.git", branch = "sync-features" } +standalone = { path = "standalone" } [package.metadata.docs.rs] all-features = true diff --git a/examples/echo/main.rs b/examples/echo/main.rs new file mode 100644 index 0000000..e1b39e5 --- /dev/null +++ b/examples/echo/main.rs @@ -0,0 +1,65 @@ +// Copyright 2025 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Example that listens for a request and echoes the payload in the response. +//! +//! Uses the standalone std implementation for the Stack and attaches to a specified serial port. +//! (Use a tool like _socat_ to attach to the linux MCTP stack through PTYs) +//! +//! Errors after the specified timeout. + +const MSG_TYPE: MsgType = MsgType(1); +const OWN_EID: Eid = Eid(8); +const TIMEOUT_SECS: u64 = 10; +const TTY_PATH: &str = "pts1"; + +use std::{fs::File, thread::spawn, time::Duration}; + +use mctp::{Eid, Listener, MsgType, RespChannel}; +use standalone::{ + Stack, + serial_sender::IoSerialSender, + util::{inbound_loop, update_loop}, +}; + +fn main() { + let serial = File::options() + .write(true) + .read(true) + .open(TTY_PATH) + .unwrap(); + + let serial_sender = IoSerialSender::new(serial.try_clone().unwrap()); + + let mut stack = Stack::new(serial_sender); + + stack.set_eid(OWN_EID).unwrap(); + + let update_stack = stack.clone(); + spawn(move || update_loop(update_stack)); + + let driver_stack = stack.clone(); + spawn(move || inbound_loop(driver_stack, serial)); + + let mut listener = stack + .listener(MSG_TYPE, Some(Duration::from_secs(TIMEOUT_SECS))) + .unwrap(); + + let mut buf = [0; 256]; + let (_, _, msg, mut rsp) = listener.recv(&mut buf).unwrap(); + + println!("Got message: {:#x?}", msg); + + rsp.send(msg).unwrap(); +} diff --git a/src/lib.rs b/src/lib.rs index f10bcfa..1a5f5a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -94,15 +94,20 @@ impl /// Provide an incoming packet to the router. /// /// This expects a single MCTP packet, without a transport binding header. - pub fn inbound(&mut self, pkt: &[u8]) -> Result<()> { + /// + /// Returns `Ok(Some(AppCookie))` for a associated listener or request, + /// or `Ok(None)` if the message was discarded. + pub fn inbound(&mut self, pkt: &[u8]) -> Result> { let own_eid = self.stack.eid(); let Some(mut msg) = self.stack.receive(pkt)? else { - return Ok(()); + return Ok(None); }; - if msg.dest != own_eid { - // Drop messages if eid does not match (for now) - return Ok(()); + if msg.dest != own_eid && msg.dest != Eid(0) { + // Drop messages if eid does not match (for now). + // EID 0 messages are used for physical addressing + // and will thus be processed. + return Ok(None); } match msg.tag { @@ -113,7 +118,7 @@ impl .is_some_and(|i| self.requests.get(i).is_some_and(|r| r.is_some())) { msg.retain(); - return Ok(()); + return Ok(Some(cookie)); } // In this case an unowned message not associated with a request was received. // This might happen if this endpoint was intended to route the packet to a different @@ -124,16 +129,17 @@ impl // check for matching listeners and retain with cookie for i in 0..self.listeners.len() { if self.listeners.get(i).ok_or(Error::InternalError)? == &Some(msg.typ) { - msg.set_cookie(Some(Self::listener_cookie_from_index(i))); + let cookie = Some(Self::listener_cookie_from_index(i)); + msg.set_cookie(cookie); msg.retain(); - return Ok(()); + return Ok(cookie); } } } } // Return Ok(()) even if a message has been discarded - Ok(()) + Ok(None) } /// Allocate a new request "_Handle_" @@ -221,12 +227,14 @@ impl Some(cookie), )?; - self.sender.send_vectored(frag, bufs) + self.sender.send_vectored(eid, frag, bufs) } /// Receive a message associated with a [`AppCookie`] /// /// Returns `None` when no message is available for the listener/request. + /// + /// The message can be retained and received at a later point again (see [MctpMessage::retain()]). pub fn recv(&mut self, cookie: AppCookie) -> Option> { self.stack.get_deferred_bycookie(&[cookie]) } @@ -326,7 +334,8 @@ impl /// Implemented by a transport binding for sending packets. pub trait Sender { /// Send a packet fragmented by `fragmenter` with the payload `payload` - fn send_vectored(&mut self, fragmenter: Fragmenter, payload: &[&[u8]]) -> Result; + fn send_vectored(&mut self, eid: Eid, fragmenter: Fragmenter, payload: &[&[u8]]) + -> Result; /// Get the MTU of a MCTP packet fragment (without transport headers) fn get_mtu(&self) -> usize; } @@ -345,6 +354,7 @@ mod test { impl Sender for DoNothingSender { fn send_vectored( &mut self, + _eid: Eid, fragmenter: mctp_estack::fragment::Fragmenter, payload: &[&[u8]], ) -> core::result::Result { @@ -364,6 +374,7 @@ mod test { impl Sender for BufferSender<'_, MTU> { fn send_vectored( &mut self, + _eid: Eid, mut fragmenter: mctp_estack::fragment::Fragmenter, payload: &[&[u8]], ) -> core::result::Result { diff --git a/standalone/Cargo.toml b/standalone/Cargo.toml new file mode 100644 index 0000000..4042179 --- /dev/null +++ b/standalone/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "standalone" +version = "0.1.0" +edition = "2024" +authors = ["OpenPRoT Contributors"] +license = "Apache-2.0" +repository = "https://github.com/OpenPRoT/mctp-lib" +description = "Standalone std implementation of mctp-lib" + +[dependencies] +mctp-lib = { path = "../" } +mctp = { git = "https://github.com/OpenPRoT/mctp-rs.git", branch = "sync-features" } +embedded-io-adapters = { version = "0.6.0", features = ["std"] } diff --git a/standalone/src/lib.rs b/standalone/src/lib.rs new file mode 100644 index 0000000..c5e917f --- /dev/null +++ b/standalone/src/lib.rs @@ -0,0 +1,344 @@ +// Copyright 2025 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Standalone implementation of mctp-lib usind [std] platform abstactions. +//! +//! Intended for use in examples and tests. + +pub mod serial_sender; + +use std::collections::HashMap; +use std::sync::{Arc, Condvar, Mutex}; +use std::time::{Duration, Instant}; + +use mctp::{Eid, Error, Listener, MsgIC, MsgType, ReqChannel, RespChannel, Tag}; +use mctp_lib::{AppCookie, Router, Sender}; + +const MAX_LISTENER_HANDLES: usize = 128; +const MAX_REQ_HANDLES: usize = 128; + +/// STD MCTP stack +/// +/// Encapsulates a inner [Router] in a thread safe and sharable manner. +/// Provides implementations for the [mctp] traits that hold references to the `stack`. +pub struct Stack { + inner: Arc>>, + /// Notifiers to inform _requests_ and _listeners_ about new messages. + notifiers: Arc>>>, + start_time: Instant, +} + +/// A request implementing [ReqChannel] +#[derive(Debug)] +pub struct Request { + /// Thread safe reference to a stack + stack: Arc>>, + cookie: AppCookie, + /// The [Condvar] that nofifies the request once the response is available + notifier: Arc, + timeout: Option, + tag: Option, +} +/// A listener implementing [Listener] +#[derive(Debug)] +pub struct ReqListener { + stack: Arc>>, + notifiers: Arc>>>, + cookie: AppCookie, + notifier: Arc, + timeout: Option, +} +/// A response for a request received by a [ReqListener] +#[derive(Debug)] +pub struct Response { + stack: Arc>>, + notifiers: Arc>>>, + tag: Tag, + typ: MsgType, + remote_eid: Eid, +} + +impl Stack { + pub fn new(outbound: S) -> Self { + let inner = Router::new(Eid(0), 0, outbound); + Self { + inner: Arc::new(Mutex::new(inner)), + notifiers: Arc::new(Mutex::new(HashMap::new())), + start_time: Instant::now(), + } + } + pub fn request(&mut self, dest: Eid, timeout: Option) -> mctp::Result> { + let handle = self + .inner + .lock() + .map_err(|_| Error::InternalError)? + .req(dest)?; + let mut notifiers = self.notifiers.lock().map_err(|_| Error::InternalError)?; + let notifier = Arc::new(Condvar::new()); + notifiers.insert(handle, Arc::clone(¬ifier)); + Ok(Request { + stack: self.inner.clone(), + cookie: handle, + notifier, + timeout, + tag: None, + }) + } + pub fn listener( + &mut self, + typ: MsgType, + timeout: Option, + ) -> mctp::Result> { + let handle = self + .inner + .lock() + .map_err(|_| Error::InternalError)? + .listener(typ)?; + let mut notifiers = self.notifiers.lock().map_err(|_| Error::InternalError)?; + let notifier = Arc::new(Condvar::new()); + notifiers.insert(handle, Arc::clone(¬ifier)); + Ok(ReqListener { + stack: self.inner.clone(), + cookie: handle, + notifier, + timeout, + notifiers: Arc::clone(&self.notifiers), + }) + } + + pub fn inbound(&mut self, pkt: &[u8]) -> Result<(), Error> { + let cookie = self + .inner + .lock() + .map_err(|_| Error::InternalError)? + .inbound(pkt)?; + if let Some(handle) = cookie { + let notifiers = self.notifiers.lock().map_err(|_| Error::InternalError)?; + let notifier = notifiers.get(&handle); + notifier.inspect(|c| c.notify_all()); + } + Ok(()) + } + + /// Call the update function of the inner stack with the current timestamp + /// + /// Convenience function that gets the current timestamp by calculating the duration since the stack was initialized (using [std::time]). + pub fn update(&mut self) -> Result { + self.inner + .lock() + .map_err(|_| Error::InternalError)? + .update(Instant::now().duration_since(self.start_time).as_millis() as u64) + } + + /// Set the stacks EID + pub fn set_eid(&mut self, eid: Eid) -> Result<(), Error> { + self.inner + .lock() + .map_err(|_| Error::InternalError)? + .set_eid(eid) + } +} + +impl Clone for Stack { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + notifiers: Arc::clone(&self.notifiers), + start_time: self.start_time, + } + } +} + +impl ReqChannel for Request { + fn send_vectored( + &mut self, + typ: mctp::MsgType, + integrity_check: mctp::MsgIC, + bufs: &[&[u8]], + ) -> mctp::Result<()> { + let tag = self + .stack + .lock() + .map_err(|_| Error::InternalError)? + .send_vectored(None, typ, None, integrity_check, self.cookie, bufs)?; + self.tag = Some(tag); + Ok(()) + } + + fn recv<'f>( + &mut self, + buf: &'f mut [u8], + ) -> mctp::Result<(mctp::MsgType, mctp::MsgIC, &'f mut [u8])> { + let Some(tag) = self.tag else { + return Err(Error::BadArgument); + }; + let mut stack = self.stack.lock().unwrap(); + loop { + if let Some(mut msg) = stack.recv(self.cookie) { + if msg.tag.tag() != tag.tag() { + msg.retain(); + return Err(Error::InternalError); + } + buf.get_mut(..msg.payload.len()) + .ok_or(Error::NoSpace)? + .copy_from_slice(msg.payload); + return Ok((msg.typ, msg.ic, &mut buf[..msg.payload.len()])); + } + if let Some(timeout) = self.timeout { + let (stack_result, timeout_result) = + self.notifier.wait_timeout(stack, timeout).unwrap(); + if timeout_result.timed_out() { + return Err(Error::TimedOut); + } else { + stack = stack_result; + } + } else { + stack = self.notifier.wait(stack).unwrap(); + } + } + } + + fn remote_eid(&self) -> Eid { + todo!() + } +} + +impl Listener for ReqListener { + type RespChannel<'a> + = Response + where + Self: 'a; + + fn recv<'f>( + &mut self, + buf: &'f mut [u8], + ) -> mctp::Result<(MsgType, MsgIC, &'f mut [u8], Self::RespChannel<'_>)> { + let mut stack = self.stack.lock().unwrap(); + loop { + if let Some(msg) = stack.recv(self.cookie) { + buf.get_mut(..msg.payload.len()) + .ok_or(Error::NoSpace)? + .copy_from_slice(msg.payload); + let resp = Response { + stack: Arc::clone(&self.stack), + tag: Tag::Unowned(msg.tag.tag()), + remote_eid: msg.source, + typ: msg.typ, + notifiers: Arc::clone(&self.notifiers), + }; + return Ok((msg.typ, msg.ic, &mut buf[..msg.payload.len()], resp)); + } + if let Some(timeout) = self.timeout { + let (stack_result, timeout_result) = + self.notifier.wait_timeout(stack, timeout).unwrap(); + if timeout_result.timed_out() { + return Err(Error::TimedOut); + } else { + stack = stack_result; + } + } else { + stack = self.notifier.wait(stack).unwrap(); + } + } + } +} + +impl RespChannel for Response { + type ReqChannel = Request; + + fn send_vectored(&mut self, integrity_check: MsgIC, bufs: &[&[u8]]) -> mctp::Result<()> { + self.stack + .lock() + .map_err(|_| Error::InternalError)? + .send_vectored( + Some(self.remote_eid), + self.typ, + Some(self.tag), + integrity_check, + AppCookie(255), // TODO improve this in mctp-lib + bufs, + )?; + Ok(()) + } + + fn remote_eid(&self) -> Eid { + self.remote_eid + } + + fn req_channel(&self) -> mctp::Result { + let handle = self + .stack + .lock() + .map_err(|_| Error::InternalError)? + .req(self.remote_eid)?; + let mut notifiers = self.notifiers.lock().map_err(|_| Error::InternalError)?; + let notifier = Arc::new(Condvar::new()); + notifiers.insert(handle, Arc::clone(¬ifier)); + Ok(Request { + stack: self.stack.clone(), + cookie: handle, + notifier, + timeout: None, + tag: None, + }) + } +} + +pub mod util { + use std::{ + io::{BufReader, Read}, + thread::sleep, + time::Duration, + }; + + use crate::Stack; + use embedded_io_adapters::std::FromStd; + use mctp_lib::Sender; + + /// Loop that updates the `stack` periodically + /// + /// The stack gets updated atleast once every 100 ms. + pub fn update_loop(mut stack: Stack) -> ! { + loop { + let timeout = match stack.update() { + Ok(t) => t, + Err(e) => { + println!("Error updating stack: {e}"); + 100 + } + }; + + sleep(Duration::from_millis(timeout)); + } + } + + /// Loop that reads packets from the `serial` line into the stack + pub fn inbound_loop(mut stack: Stack, serial: R) -> ! { + let mut reader = FromStd::new(BufReader::new(serial)); + let mut serial_transport = mctp_lib::serial::MctpSerialHandler::new(); + loop { + let Ok(pkt) = serial_transport + .recv_sync(&mut reader) + .inspect_err(|e| println!("Error receiving serial data: {e}")) + else { + continue; + }; + + stack + .inbound(pkt) + .inspect_err(|e| println!("Error processing inbound packet: {e}")) + .ok(); + } + } +} diff --git a/standalone/src/serial_sender.rs b/standalone/src/serial_sender.rs new file mode 100644 index 0000000..fa25bba --- /dev/null +++ b/standalone/src/serial_sender.rs @@ -0,0 +1,58 @@ +// Copyright 2025 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use embedded_io_adapters::std::FromStd; +use mctp::Error; +use std::io::Write; + +use mctp_lib::{Sender, fragment::SendOutput, serial::MctpSerialHandler}; + +pub struct IoSerialSender { + writer: FromStd, + serial_handler: MctpSerialHandler, +} +impl IoSerialSender { + pub fn new(writer: W) -> Self { + IoSerialSender { + writer: FromStd::new(writer), + serial_handler: MctpSerialHandler::new(), + } + } +} + +impl Sender for IoSerialSender { + fn send_vectored( + &mut self, + _eid: mctp::Eid, + mut fragmenter: mctp_lib::fragment::Fragmenter, + payload: &[&[u8]], + ) -> mctp::Result { + loop { + let mut pkt = [0; mctp_lib::serial::MTU_MAX]; + let fragment = fragmenter.fragment_vectored(payload, &mut pkt); + match fragment { + SendOutput::Packet(items) => { + self.serial_handler.send_sync(items, &mut self.writer)?; + self.writer.inner_mut().flush().map_err(Error::Io)?; + } + SendOutput::Complete { tag, cookie: _ } => return Ok(tag), + SendOutput::Error { err, cookie: _ } => return Err(err), + } + } + } + + fn get_mtu(&self) -> usize { + mctp_lib::serial::MTU_MAX + } +}