diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e558911..31dac1b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -17,9 +17,9 @@ jobs: with: cache-on-failure: true - name: cargo test - run: cargo test --all + run: sudo HOME=$HOME $(which cargo) test --all - name: cargo test all features - run: cargo test --all --all-features + run: sudo HOME=$HOME $(which cargo) test --all --all-features cargo-lint: runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 52d229f..c3eb4b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -723,6 +723,7 @@ dependencies = [ "msg-socket", "msg-transport", "msg-wire", + "nix 0.27.1", "pprof", "rand", "tokio", @@ -744,6 +745,7 @@ dependencies = [ name = "msg-sim" version = "0.1.3" dependencies = [ + "nix 0.27.1", "pnet", ] @@ -809,6 +811,17 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +dependencies = [ + "bitflags 2.4.1", + "cfg-if", + "libc", +] + [[package]] name = "no-std-net" version = "0.6.0" @@ -1076,7 +1089,7 @@ dependencies = [ "inferno", "libc", "log", - "nix", + "nix 0.26.4", "once_cell", "parking_lot", "smallvec", diff --git a/Cargo.toml b/Cargo.toml index 5fe2e46..074c5de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,11 @@ [workspace] members = [ - "msg", - "msg-socket", - "msg-wire", - "msg-transport", - "msg-common", - "msg-sim", + "msg", + "msg-socket", + "msg-wire", + "msg-transport", + "msg-common", + "msg-sim", ] resolver = "2" @@ -18,7 +18,21 @@ description = "A flexible and lightweight messaging library for distributed syst authors = ["Jonas Bostoen", "Nicolas Racchi"] homepage = "https://github.com/chainbound/msg-rs" repository = "https://github.com/chainbound/msg-rs" -keywords = ["messaging", "distributed", "systems", "networking", "quic", "quinn", "tokio", "async", "simulation", "pnet", "udp", "tcp", "socket"] +keywords = [ + "messaging", + "distributed", + "systems", + "networking", + "quic", + "quinn", + "tokio", + "async", + "simulation", + "pnet", + "udp", + "tcp", + "socket", +] [workspace.dependencies] msg-wire = { path = "./msg-wire" } @@ -55,6 +69,9 @@ pprof = { version = "0.13", features = ["flamegraph", "criterion"] } # simulation pnet = "0.34" +# *nix APIs +nix = { version = "0.27.1", features = ["sched"] } + [profile.dev] opt-level = 1 overflow-checks = false diff --git a/msg-sim/Cargo.toml b/msg-sim/Cargo.toml index 3c1595f..f652d36 100644 --- a/msg-sim/Cargo.toml +++ b/msg-sim/Cargo.toml @@ -13,3 +13,4 @@ repository.workspace = true [dependencies] pnet.workspace = true +nix.workspace = true diff --git a/msg-sim/README.md b/msg-sim/README.md index d72cb93..83f9ec7 100644 --- a/msg-sim/README.md +++ b/msg-sim/README.md @@ -1,6 +1,7 @@ # `msg-sim` ## Overview + This crate provides functionality to simulate real-world network conditions locally to and from a specific endpoint for testing and benchmarking purposes. It only works on MacOS and Linux. @@ -8,58 +9,123 @@ It only works on MacOS and Linux. ## Implementation ### MacOS + On MacOS, we use a combination of the `pfctl` and `dnctl` tools. -[`pfctl`](https://man.freebsd.org/cgi/man.cgi?query=pfctl&apropos=0&sektion=8&manpath=FreeBSD+14.0-RELEASE+and+Ports&arch=default&format=html) is a tool to manage the packet filter device. [`dnctl`](https://man.freebsd.org/cgi/man.cgi?query=dnctl&sektion=8&format=html) can manage -the [dummynet](http://info.iet.unipi.it/~luigi/papers/20100304-ccr.pdf) traffic shaper. +[`pfctl`](https://man.freebsd.org/cgi/man.cgi?query=pfctl&apropos=0&sektion=8&manpath=FreeBSD+14.0-RELEASE+and+Ports&arch=default&format=html) +is a tool to manage the packet filter device. +[`dnctl`](https://man.freebsd.org/cgi/man.cgi?query=dnctl&sektion=8&format=html) +can manage the +[dummynet](http://info.iet.unipi.it/~luigi/papers/20100304-ccr.pdf) traffic +shaper. The general flow is as follows: -* Create a dummynet pipe with `dnctl` and configure it with `bw`, `delay`, `plr` +- Create a dummynet pipe with `dnctl` and configure it with `bw`, `delay`, + `plr` Example: -```bash -sudo dnctl pipe 1 config bw 10Kbit/s delay 50 plr 0.1 -``` -* Create a loopback alias with `ifconfig` to simulate a different endpoint and -set the MTU to the usual value (1500) +`bash sudo dnctl pipe 1 config bw 10Kbit/s delay 50 plr 0.1 ` + +- Create a loopback alias with `ifconfig` to simulate a different endpoint and + set the MTU to the usual value (1500) Example: -```bash -sudo ifconfig lo0 alias 127.0.0.3 up -sudo ifconfig lo0 mtu 1500 -``` -* Use `pfctl` to create a rule to match traffic and send it through the pipe +`bash sudo ifconfig lo0 alias 127.0.0.3 up sudo ifconfig lo0 mtu 1500 ` + +- Use `pfctl` to create a rule to match traffic and send it through the pipe Example: + ```bash # Create an anchor (a named container for rules, close to a namespace) -(cat /etc/pf.conf && echo "dummynet-anchor \"msg-sim\"" && \ -echo "anchor \"msg-sim\"") | sudo pfctl -f - -# Create a rule to match traffic from any to the alias and send it through the pipe +(cat /etc/pf.conf && echo "dummynet-anchor \"msg-sim\"" && \ echo "anchor \"msg-sim\"") | sudo pfctl -f - + +# Create a rule to match traffic from any to the alias and send it through the + echo 'dummynet in from any to 127.0.0.3 pipe 1' | sudo pfctl -a msg-sim -f - # Enable the packet filter + sudo pfctl -E ``` -* Remove the rules and the pipe +- Remove the rules and the pipe + ```bash # Apply the default configuration + sudo pfctl -f /etc/pf.conf + # Disable the packet filter + sudo pfctl -d + # Remove the alias & reset the MTU sudo ifconfig lo0 -alias 127.0.0.3 sudo ifconfig lo0 mtu 16384 -# Remove the dummynet pipes + +# Remove the dummynet sudo dnctl pipe delete 1 ``` ### Questions -- Do we need to create 2 pipes to simulate a bidirectional link? MAN page seems to say so. + +- Do we need to create 2 pipes to simulate a bidirectional link? MAN page seems + to say so. ### Linux -On Linux, we use dummy interfaces and `tc` with `netem` to simulate and shape traffic. + +On Linux, we leverage network namespaces to simulate different networking +conditions, leveraging the `tc` and `netem` command to shape traffic. + +On each namespace, we create a veth pair, with one end in the default namespace, +and then we configure the veth devices as needed. + +The general flow is as follows: + +- Create a network namespace with `ip netns add` +- Add a veth pair to the namespace with `ip link add` +- Set the veth pair up with `ip link set` +- Set the IP address of the veth pair in the namespace with `ip netns exec` +- Set the network emulation parameters with `tc qdisc add dev` both in the host and + the namespaced environment + +Example: + +```bash +# create namespace ns1 +sudo ip netns add ns1 +# create veth devices linked together +sudo ip link add veth-host type veth peer name veth-ns1 +# move veth-ns1 device to ns1 namespace +sudo ip link set veth-ns1 netns ns1 + +# associate ip addr to veth-host device and spin it up +sudo ip addr add 192.168.1.2/24 dev veth-host +sudo ip link set veth-host up + +# same but from ns1 namespace +sudo ip netns exec ns1 ip addr add 192.168.1.1/24 dev veth-ns1 +sudo ip netns exec ns1 ip link set veth-ns1 up + +# add latency etc to veth-ns1 from ns1 namespace +sudo ip netns exec ns1 tc qdisc add dev veth-ns1 root netem delay 3000ms loss 50% + +# this should be slow +ping 192.168.1.1 +``` + +#### How to run tests + +Given that the tests require root privileges to modify the networking stack, +you can run them with the following command: + +```bash +sudo HOME=$HOME $(which cargo) test # add your arguments here +``` + +We need to provide the `$HOME` environment variable to `sudo` to ensure that +it can find the Rust toolchain, and then we also need to provide the path of `cargo`. diff --git a/msg-sim/src/assert.rs b/msg-sim/src/assert.rs new file mode 100644 index 0000000..45a7063 --- /dev/null +++ b/msg-sim/src/assert.rs @@ -0,0 +1,14 @@ +use std::{io, process::ExitStatus}; + +/// Assert that the given status is successful, otherwise return an error with the given message. +/// The type of the error will be `io::ErrorKind::Other`. +pub fn assert_status(status: ExitStatus, error: E) -> io::Result<()> +where + E: Into>, +{ + if !status.success() { + return Err(io::Error::new(io::ErrorKind::Other, error)); + } + + Ok(()) +} diff --git a/msg-sim/src/dummynet.rs b/msg-sim/src/dummynet.rs index 1125c7d..7b5823d 100644 --- a/msg-sim/src/dummynet.rs +++ b/msg-sim/src/dummynet.rs @@ -1,15 +1,15 @@ use std::{ io::{self, Read}, net::IpAddr, - process::{Command, ExitStatus, Stdio}, + process::{Command, Stdio}, }; -use crate::protocol::Protocol; +use crate::{assert::assert_status, protocol::Protocol}; /// Pipe represents a dummynet pipe. pub struct Pipe { /// The ID of the pipe. - pub id: usize, + pub id: u8, /// Optional bandwidth cap in Kbps. pub bandwidth: Option, /// Optional propagation delay in ms. @@ -20,7 +20,7 @@ pub struct Pipe { impl Pipe { /// Creates a new pipe with the given ID. The ID must be unique. - pub fn new(id: usize) -> Self { + pub fn new(id: u8) -> Self { Self { id, bandwidth: None, delay: None, plr: None } } @@ -42,7 +42,7 @@ impl Pipe { self } - pub fn id(&self) -> usize { + pub fn id(&self) -> u8 { self.id } @@ -111,9 +111,10 @@ pub struct PacketFilter { impl PacketFilter { /// Creates a new default packet filter from the given [`Pipe`]. pub fn new(pipe: Pipe) -> Self { + let id = pipe.id(); Self { pipe, - anchor: "msg-sim".to_string(), + anchor: format!("msg-sim-{}", id), protocols: vec![Protocol::TCP, Protocol::UDP, Protocol::ICMP], endpoint: None, loopback: get_loopback_name(), @@ -270,19 +271,6 @@ fn get_loopback_name() -> String { loopback.expect("No loopback interface").name } -/// Assert that the given status is successful, otherwise return an error with the given message. -/// The type of the error will be [`io::ErrorKind::Other`]. -fn assert_status(status: ExitStatus, error: E) -> io::Result<()> -where - E: Into>, -{ - if !status.success() { - return Err(io::Error::other(error)); - } - - Ok(()) -} - #[cfg(test)] mod tests { use std::time::Duration; diff --git a/msg-sim/src/ip_tc.rs b/msg-sim/src/ip_tc.rs new file mode 100644 index 0000000..543d0b7 --- /dev/null +++ b/msg-sim/src/ip_tc.rs @@ -0,0 +1,117 @@ +use std::{io, net::IpAddr, process::Command}; + +use crate::assert::assert_status; + +/// Take the arguments to run a command with `Command` +/// and add the prefix to run it in the namespace environment +#[inline] +fn add_namespace_prefix<'a>(namespace: &'a str, args: Vec<&'a str>) -> Vec<&'a str> { + let mut prefix_args = vec!["ip", "netns", "exec", namespace]; + prefix_args.extend(args); + prefix_args +} + +#[inline] +pub fn create_namespace(name: &str) -> io::Result<()> { + let status = Command::new("sudo") + .args(["ip", "netns", "add", &name]) + .status()?; + + assert_status(status, format!("Failed to create namespace {}", name)) +} + +/// Create Virtual Ethernet (veth) devices and link them +/// +/// Note: device name length can be max 15 chars long +#[inline] +pub fn create_veth_pair(name1: &str, name2: &str) -> io::Result<()> { + let status = Command::new("sudo") + .args([ + "ip", "link", "add", &name1, "type", "veth", "peer", "name", &name2, + ]) + .status()?; + assert_status( + status, + format!("Failed to create veth pair {}-{}", name1, name2), + ) +} + +#[inline] +pub fn move_device_to_namespace(name: &str, namespace: &str) -> io::Result<()> { + let status = Command::new("sudo") + .args(["ip", "link", "set", &name, "netns", &namespace]) + .status()?; + assert_status( + status, + format!("Failed to move device {} to namespace {}", name, namespace), + ) +} + +/// Generate a host IPv4 address from the namespace IPv4 address. +/// This is done by incrementing the last octet of the IP by 1. +#[inline] +pub fn gen_host_ip_address(namespace_ip_addr: &IpAddr) -> String { + let mut ip_host: Vec = namespace_ip_addr + .to_string() + .split('.') + .map(|octect| octect.parse::().unwrap()) + .collect(); + ip_host[3] += 1; + let ip_host = format!( + "{}.{}.{}.{}/24", + ip_host[0], ip_host[1], ip_host[2], ip_host[3] + ); + ip_host +} + +/// add IP address to device +#[inline] +pub fn add_ip_addr_to_device( + device: &str, + ip_addr: &str, + namespace: Option<&str>, +) -> io::Result<()> { + let mut args = vec!["ip", "addr", "add", ip_addr, "dev", device]; + if let Some(namespace) = namespace { + args = add_namespace_prefix(namespace, args) + }; + let status = Command::new("sudo").args(args).status()?; + assert_status( + status, + format!("Failed to add IP address {} to device {}", ip_addr, device), + ) +} + +#[inline] +pub fn spin_up_device(name: &str, namespace: Option<&str>) -> io::Result<()> { + let mut args = vec!["ip", "link", "set", "dev", name, "up"]; + if let Some(namespace) = namespace { + args = add_namespace_prefix(namespace, args) + }; + let status = Command::new("sudo").args(args).status()?; + assert_status(status, format!("Failed to spin up device {}", name)) +} + +/// Add the provided network emulation parameters for the device +/// +/// These parameters are appended to the following command: `tc qdisc add dev ` +#[inline] +pub fn add_network_emulation_parameters( + device_name: &str, + parameters: Vec<&str>, + namespace: Option<&str>, +) -> io::Result<()> { + let mut tc_command = vec!["tc", "qdisc", "add", "dev", &device_name]; + if let Some(namespace) = namespace { + tc_command = add_namespace_prefix(namespace, tc_command) + }; + + let err_message = format!( + "Failed to add network emulation parameters {:?} to device {}", + ¶meters, device_name + ); + + tc_command.extend(parameters); + let status = Command::new("sudo").args(tc_command).status()?; + assert_status(status, err_message) +} diff --git a/msg-sim/src/lib.rs b/msg-sim/src/lib.rs index 0e1d3d1..89f5c5c 100644 --- a/msg-sim/src/lib.rs +++ b/msg-sim/src/lib.rs @@ -11,13 +11,26 @@ pub mod dummynet; #[cfg(target_os = "macos")] use dummynet::{PacketFilter, Pipe}; +#[cfg(target_os = "linux")] +pub mod namespace; +#[cfg(target_os = "linux")] +use std::process::Command; + +pub mod assert; +pub mod ip_tc; + #[derive(Debug)] -#[allow(unused)] pub struct SimulationConfig { /// The latency of the connection. pub latency: Option, /// The bandwidth in Kbps. pub bw: Option, + /// The maximum burst size in kbit. + #[cfg(target_os = "linux")] + pub burst: Option, + /// The buffer size in bytes. + #[cfg(target_os = "linux")] + pub limit: Option, /// The packet loss rate in percent. pub plr: Option, /// The supported protocols. @@ -27,25 +40,35 @@ pub struct SimulationConfig { #[derive(Default)] pub struct Simulator { /// A map of active simulations. - active_sims: HashMap, + pub active_sims: HashMap, /// Simulation ID counter. - sim_id: usize, + pub active_sim_count: u8, + /// A unique simulator identifier. + pub id: u8, } impl Simulator { - pub fn new() -> Self { - Self { active_sims: HashMap::new(), sim_id: 1 } + pub fn new(id: u8) -> Self { + Self { active_sims: HashMap::new(), active_sim_count: 1, id } } /// Starts a new simulation on the given endpoint according to the config. - pub fn start(&mut self, endpoint: IpAddr, config: SimulationConfig) -> io::Result { - let id = self.sim_id; + /// + /// ### Linux + /// The simulation is done using network namespaces where multiple + /// IP addresses are needed. Make sure that the endpoint IP address is not alreay in use, + /// and that the "next one" is available. + /// + /// #### Example + /// If `endpoint` is 192.168.1.1, then both 192.168.1.1 and 192.168.1.2 will be used + pub fn start(&mut self, endpoint: IpAddr, config: SimulationConfig) -> io::Result { + let id = self.active_sim_count; - let mut simulation = Simulation::new(id, endpoint, config); + let mut simulation = Simulation::new(id, self.id, endpoint, config); simulation.start()?; - self.sim_id += 1; + self.active_sim_count += 1; self.active_sims.insert(endpoint, simulation); @@ -59,21 +82,23 @@ impl Simulator { } } -/// An active simulation. +/// An active simulation spawned by the simulator. #[allow(unused)] -struct Simulation { - id: usize, - endpoint: IpAddr, - config: SimulationConfig, +pub struct Simulation { + pub id: u8, + pub simulator_id: u8, + pub endpoint: IpAddr, + pub config: SimulationConfig, #[cfg(target_os = "macos")] active_pf: Option, } impl Simulation { - fn new(id: usize, endpoint: IpAddr, config: SimulationConfig) -> Self { + fn new(id: u8, simulator_id: u8, endpoint: IpAddr, config: SimulationConfig) -> Self { Self { id, + simulator_id, endpoint, config, #[cfg(target_os = "macos")] @@ -81,9 +106,98 @@ impl Simulation { } } + #[inline] + #[cfg(target_os = "linux")] + /// Get the namespace name used for the simulation. + pub fn namespace_name(&self) -> String { + format!("msg-{}-{}", self.simulator_id, self.id) + } + + #[inline] + #[cfg(target_os = "linux")] + /// Get the host veth device used name for the simulation. + pub fn veth_host_name(&self) -> String { + format!("vh-msg-{}-{}", self.simulator_id, self.id) + } + + #[inline] + #[cfg(target_os = "linux")] + /// Get the namespaced veth device name used for the simulation. + pub fn veth_namespace_name(&self) -> String { + format!("vn-msg-{}-{}", self.simulator_id, self.id) + } + /// Starts the simulation. #[cfg(target_os = "linux")] fn start(&mut self) -> io::Result<()> { + // Create network namespace + + let network_namespace = self.namespace_name(); + let veth_host = self.veth_host_name(); + let veth_namespace = self.veth_namespace_name(); + let ip_namespace = format!("{}/24", self.endpoint); + + ip_tc::create_namespace(&network_namespace)?; + ip_tc::create_veth_pair(&veth_host, &veth_namespace)?; + ip_tc::move_device_to_namespace(&veth_namespace, &network_namespace)?; + + let ip_host = ip_tc::gen_host_ip_address(&self.endpoint); + + ip_tc::add_ip_addr_to_device(&veth_host, &ip_host, None)?; + ip_tc::spin_up_device(&veth_host, None)?; + + ip_tc::add_ip_addr_to_device(&veth_namespace, &ip_namespace, Some(&network_namespace))?; + ip_tc::spin_up_device(&veth_namespace, Some(&network_namespace))?; + ip_tc::spin_up_device("lo", Some(&network_namespace))?; + + let delay = format!("{}ms", self.config.latency.unwrap_or(Duration::new(0, 0)).as_millis()); + + let loss = format!("{}%", self.config.plr.unwrap_or(0_f64)); + + // Add delay to the host veth device to match MacOS symmetric behaviour + // + // The behaviour is specified on the top-level ("root"), + // with a custom handle for identification + let mut args = vec!["root", "handle", "1:", "netem"]; + if self.config.latency.is_some() { + args.push("delay"); + args.push(&delay); + } + ip_tc::add_network_emulation_parameters(&veth_host, args, None)?; + + // Add network emulation parameters (delay, loss) on namespaced veth device + let mut args = vec!["root", "handle", "1:", "netem"]; + if self.config.latency.is_some() { + args.push("delay"); + args.push(&delay); + } + if (self.config.plr).is_some() { + args.push("loss"); + args.push(&loss); + } + ip_tc::add_network_emulation_parameters(&veth_namespace, args, Some(&network_namespace))?; + + // Add bandwidth paramteres on namespaced veth device + // + // The behaviour is specified on top of the root queue discipline, + // as parent. It uses "Hierarchical Token Bucket" (HBT) discipline + if let Some(bandwidth) = self.config.bw { + let bandwidth = format!("{}kbit", bandwidth); + let burst = format!("{}kbit", self.config.burst.unwrap_or(32)); + let limit = format!("{}", self.config.limit.unwrap_or(10_000)); + + let args = vec![ + "parent", "1:", "handle", "2:", "tbf", "rate", &bandwidth, "burst", &burst, + "limit", &limit, + ]; + + ip_tc::add_network_emulation_parameters(&veth_host, args.clone(), None)?; + ip_tc::add_network_emulation_parameters( + &veth_namespace, + args, + Some(&network_namespace), + )?; + } Ok(()) } @@ -122,7 +236,16 @@ impl Simulation { impl Drop for Simulation { #[cfg(target_os = "linux")] - fn drop(&mut self) {} + fn drop(&mut self) { + let veth_host = self.veth_host_name(); + let network_namespace = self.namespace_name(); + + // The only thing we have to do in the host to delete the veth device + let _ = Command::new("sudo").args(["ip", "link", "del", &veth_host]).status(); + // Deleting the network namespace where the simulated endpoint lives + // drops everything in there + let _ = Command::new("sudo").args(["ip", "netns", "del", &network_namespace]).status(); + } #[cfg(target_os = "macos")] fn drop(&mut self) { @@ -131,3 +254,35 @@ impl Drop for Simulation { } } } + +#[cfg(test)] +mod test { + #[cfg(target_os = "linux")] + #[test] + fn start_simulation() { + use std::{ + net::{IpAddr, Ipv4Addr}, + time::Duration, + }; + + use super::*; + + let mut simulator = Simulator::new(1); + + let addr = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)); + let config = SimulationConfig { + latency: Some(Duration::from_millis(2000)), + bw: Some(1_000), + burst: Some(32), + limit: None, + plr: Some(30_f64), + protocols: vec![Protocol::TCP], + }; + + let res = simulator.start(addr, config); + if let Err(e) = &res { + eprintln!("{}", e); + } + assert!(res.is_ok()); + } +} diff --git a/msg-sim/src/namespace.rs b/msg-sim/src/namespace.rs new file mode 100644 index 0000000..d26f85c --- /dev/null +++ b/msg-sim/src/namespace.rs @@ -0,0 +1,24 @@ +use std::io; +use std::pin::Pin; +use std::{fs::File, future::Future}; + +use nix::sched::{setns, CloneFlags}; + +/// Run the code in the provided function `f` in the network namespace `namespace` +pub async fn run_on_namespace( + namespace: &str, + f: impl FnOnce() -> Pin> + Send>>, +) -> io::Result { + let namespace_path = format!("/var/run/netns/{}", namespace); + + let ns_fd = File::open(namespace_path)?; + let host_ns_fd = File::open("/proc/1/ns/net")?; + + // Use setns to switch to the network namespace + setns(ns_fd, CloneFlags::CLONE_NEWNET)?; + let res = f().await?; + // Go back to the host network environment after calling `f` + setns(host_ns_fd, CloneFlags::CLONE_NEWNET)?; + + Ok(res) +} diff --git a/msg-socket/tests/it/pubsub.rs b/msg-socket/tests/it/pubsub.rs index 5d517c3..96761f3 100644 --- a/msg-socket/tests/it/pubsub.rs +++ b/msg-socket/tests/it/pubsub.rs @@ -1,268 +1,351 @@ -use std::{collections::HashSet, net::IpAddr, time::Duration}; +use std::{collections::HashSet, net::SocketAddr, time::Duration}; use bytes::Bytes; -use msg_sim::{Protocol, SimulationConfig, Simulator}; use rand::Rng; use tokio::{sync::mpsc, task::JoinSet}; use tokio_stream::StreamExt; use tracing::info; +use msg_sim::{Protocol, Simulation, SimulationConfig, Simulator}; use msg_socket::{PubSocket, SubSocket}; use msg_transport::{quic::Quic, tcp::Tcp, Address, Transport}; -const TOPIC: &str = "test"; - -fn init_simulation(addr: IpAddr, config: SimulationConfig) -> Simulator { - let mut simulator = Simulator::new(); - simulator.start(addr, config).unwrap(); - - simulator -} - -/// Single publisher, single subscriber -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[ignore] -async fn pubsub_channel() { - let _ = tracing_subscriber::fmt::try_init(); - - let addr = "127.0.0.1".parse().unwrap(); - - let mut simulator = init_simulation( - addr, - SimulationConfig { - latency: Some(Duration::from_millis(50)), - bw: None, - plr: None, - protocols: vec![Protocol::UDP, Protocol::TCP], - }, - ); - - let result = pubsub_channel_transport(build_tcp, "127.0.0.1:9879".parse().unwrap()).await; - - assert!(result.is_ok()); - - let result = pubsub_channel_transport(build_quic, "127.0.0.1:9879".parse().unwrap()).await; - - assert!(result.is_ok()); - - simulator.stop(addr); -} - -async fn pubsub_channel_transport( - new_transport: F, - addr: A, -) -> Result<(), Box> -where - F: Fn() -> T, - T: Transport + Send + Sync + Unpin + 'static, - A: Address, -{ - let mut publisher = PubSocket::new(new_transport()); - - let mut subscriber = SubSocket::new(new_transport()); - subscriber.connect_inner(addr.clone()).await?; - subscriber.subscribe(TOPIC).await?; - - inject_delay(400).await; - - publisher.try_bind(vec![addr]).await?; - - // Spawn a task to keep sending messages until the subscriber receives one (after connection - // process) - tokio::spawn(async move { - loop { - tokio::time::sleep(Duration::from_millis(500)).await; - publisher.publish(TOPIC, Bytes::from("WORLD")).await.unwrap(); - } - }); - - let msg = subscriber.next().await.unwrap(); - info!("Received message: {:?}", msg); - assert_eq!(TOPIC, msg.topic()); - assert_eq!("WORLD", msg.payload()); - - Ok(()) -} - -/// Single publisher, multiple subscribers -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[ignore] -async fn pubsub_fan_out() { - let _ = tracing_subscriber::fmt::try_init(); - - let addr = "127.0.0.1".parse().unwrap(); - - let mut simulator = init_simulation( - addr, - SimulationConfig { - latency: Some(Duration::from_millis(150)), - bw: None, - plr: None, - protocols: vec![Protocol::UDP, Protocol::TCP], - }, - ); - - let result = pubsub_fan_out_transport(build_tcp, 10, "127.0.0.1:9880".parse().unwrap()).await; - - assert!(result.is_ok()); - - let result = pubsub_fan_out_transport(build_quic, 10, "127.0.0.1:9880".parse().unwrap()).await; - - assert!(result.is_ok()); - - simulator.stop(addr); -} - -async fn pubsub_fan_out_transport< - F: Fn() -> T + Send + 'static + Copy, - T: Transport + Send + Sync + Unpin + 'static, - A: Address, ->( - new_transport: F, - subscibers: usize, - addr: A, -) -> Result<(), Box> { - let mut publisher = PubSocket::new(new_transport()); - - let mut sub_tasks = JoinSet::new(); - - for i in 0..subscibers { - let cloned = addr.clone(); - sub_tasks.spawn(async move { - let mut subscriber = SubSocket::new(new_transport()); - inject_delay((100 * (i + 1)) as u64).await; +#[cfg(target_os = "linux")] +use msg_sim::namepsace::run_on_namespace; - subscriber.connect_inner(cloned).await.unwrap(); - inject_delay((1000 / (i + 1)) as u64).await; - subscriber.subscribe(TOPIC).await.unwrap(); - - let msg = subscriber.next().await.unwrap(); - info!("Received message: {:?}", msg); - assert_eq!(TOPIC, msg.topic()); - assert_eq!("WORLD", msg.payload()); - }); - } - - inject_delay(400).await; - - publisher.try_bind(vec![addr]).await?; - - // Spawn a task to keep sending messages until the subscriber receives one (after connection - // process) - tokio::spawn(async move { - loop { - tokio::time::sleep(Duration::from_millis(500)).await; - publisher.publish(TOPIC, Bytes::from("WORLD")).await.unwrap(); - } - }); - - for _ in 0..subscibers { - sub_tasks.join_next().await.unwrap().unwrap(); - } - - Ok(()) -} - -/// Multiple publishers, single subscriber -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[ignore] -async fn pubsub_fan_in() { - let _ = tracing_subscriber::fmt::try_init(); - - let addr = "127.0.0.1".parse().unwrap(); - - let mut simulator = init_simulation( - addr, - SimulationConfig { - latency: Some(Duration::from_millis(150)), - bw: None, - plr: None, - protocols: vec![Protocol::UDP, Protocol::TCP], - }, - ); - - let result = pubsub_fan_in_transport(build_tcp, 20, "127.0.0.1:9881".parse().unwrap()).await; - - assert!(result.is_ok()); - - let result = pubsub_fan_in_transport(build_quic, 20, "127.0.0.1:9881".parse().unwrap()).await; - - assert!(result.is_ok()); - - simulator.stop(addr); -} - -async fn pubsub_fan_in_transport< - F: Fn() -> T + Send + 'static + Copy, - T: Transport + Send + Sync + Unpin + 'static, - A: Address, ->( - new_transport: F, - publishers: usize, - addr: A, -) -> Result<(), Box> { - let mut sub_tasks = JoinSet::new(); - - let (tx, mut rx) = mpsc::channel(publishers); - - for i in 0..publishers { - let tx = tx.clone(); - let addr = addr.clone(); - sub_tasks.spawn(async move { - let mut publisher = PubSocket::new(new_transport()); - inject_delay((100 * (i + 1)) as u64).await; - - publisher.try_bind(vec![addr]).await.unwrap(); - - let local_addr = publisher.local_addr().unwrap().clone(); - tx.send(local_addr).await.unwrap(); - - // Spawn a task to keep sending messages until the subscriber receives one (after - // connection process) - tokio::spawn(async move { - loop { - tokio::time::sleep(Duration::from_millis(500)).await; - publisher.publish(TOPIC, Bytes::from("WORLD")).await.unwrap(); - } - }); - }); - } - - drop(tx); - - let mut subscriber = SubSocket::new(new_transport()); - - let mut addrs = HashSet::with_capacity(publishers); - - while let Some(addr) = rx.recv().await { - addrs.insert(addr); - } - - for addr in addrs.clone() { - inject_delay(500).await; - subscriber.connect_inner(addr.clone()).await.unwrap(); - subscriber.subscribe(TOPIC).await.unwrap(); - } - - loop { - if addrs.is_empty() { - break; - } - - let msg = subscriber.next().await.unwrap(); - info!("Received message: {:?}", msg); - assert_eq!(TOPIC, msg.topic()); - assert_eq!("WORLD", msg.payload()); - - addrs.remove(msg.source()); - } - - for _ in 0..publishers { - sub_tasks.join_next().await.unwrap().unwrap(); - } +const TOPIC: &str = "test"; - Ok(()) -} +// TODO: fix these tests +///// Single publisher, single subscriber +//#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +//async fn pubsub_channel() { +// let _ = tracing_subscriber::fmt::try_init(); +// +// #[cfg(target_os = "linux")] +// let addr = "192.168.1.1".parse().unwrap(); +// +// #[cfg(target_os = "macos")] +// let addr = "127.0.0.2".parse().unwrap(); +// +// let mut simulator = Simulator::new(1); +// let result = simulator.start( +// addr, +// SimulationConfig { +// latency: Some(Duration::from_millis(500)), +// bw: None, +// #[cfg(target_os = "linux")] +// burst: None, +// #[cfg(target_os = "linux")] +// limit: None, +// plr: Some(20_f64), +// protocols: vec![Protocol::UDP, Protocol::TCP], +// }, +// ); +// +// assert!(result.is_ok()); +// +// let simulation = simulator.active_sims.get(&addr).unwrap(); +// +// let result = pubsub_channel_transport(build_tcp, simulation).await; +// +// assert!(result.is_ok()); +// +// let result = pubsub_channel_transport(build_quic, simulation).await; +// +// assert!(result.is_ok()); +// +// simulator.stop(addr); +//} +// +// async fn pubsub_channel_transport( +// new_transport: F, +// simulation: &Simulation, +// ) -> Result<(), Box> +// where +// F: Fn() -> T, +// T: Transport + Send + Sync + Unpin + 'static, +// A: Address, +// { +// let mut publisher = PubSocket::new(new_transport()); +// let socket_addr = SocketAddr::new(simulation.endpoint, 0); +// +// #[cfg(target_os = "linux")] +// let publisher = { +// run_on_namespace(&simulation.namespace_name(), || { +// Box::pin(async move { +// let _ = publisher.bind(socket_addr).await; +// Ok(publisher) +// }) +// }) +// .await? +// }; +// +// #[cfg(target_os = "macos")] +// publisher.try_bind(vec![socket_addr]).await?; +// +// let mut subscriber = SubSocket::new(new_transport()); +// subscriber.connect_inner(publisher.local_addr().unwrap()).await?; +// subscriber.subscribe(TOPIC).await?; +// +// inject_delay(400).await; +// +// publisher.try_bind(vec![addr]).await?; +// +// // Spawn a task to keep sending messages until the subscriber receives one (after connection +// // process) +// tokio::spawn(async move { +// loop { +// tokio::time::sleep(Duration::from_millis(500)).await; +// publisher.publish(TOPIC, Bytes::from("WORLD")).await.unwrap(); +// } +// }); +// +// let msg = subscriber.next().await.unwrap(); +// info!("Received message: {:?}", msg); +// assert_eq!(TOPIC, msg.topic()); +// assert_eq!("WORLD", msg.payload()); +// +// Ok(()) +// } +// +// /// Single publisher, multiple subscribers +// #[tokio::test(flavor = "multi_thread", worker_threads = 4)] +// async fn pubsub_fan_out() { +// let _ = tracing_subscriber::fmt::try_init(); +// +// #[cfg(target_os = "linux")] +// let addr = "192.168.2.1".parse().unwrap(); +// +// #[cfg(target_os = "macos")] +// let addr = "127.0.0.3".parse().unwrap(); +// +// let mut simulator = Simulator::new(2); +// let result = simulator.start( +// addr, +// SimulationConfig { +// latency: Some(Duration::from_millis(500)), +// bw: None, +// #[cfg(target_os = "linux")] +// burst: None, +// #[cfg(target_os = "linux")] +// limit: None, +// plr: Some(20_f64), +// protocols: vec![Protocol::UDP, Protocol::TCP], +// }, +// ); +// +// assert!(result.is_ok()); +// +// let simulation = simulator.active_sims.get(&addr).unwrap(); +// +// let result = pubsub_fan_out_transport(build_tcp, simulation, 20).await; +// +// assert!(result.is_ok()); +// +// let result = pubsub_fan_out_transport(build_quic, simulation, 20).await; +// +// assert!(result.is_ok()); +// +// simulator.stop(addr); +// } +// +// async fn pubsub_fan_out_transport< +// F: Fn() -> T + Send + 'static + Copy, +// T: Transport + Send + Sync + Unpin + 'static, +// A: Address, +// >( +// new_transport: F, +// simulation: &Simulation, +// subscibers: usize, +// addr: A, +// ) -> Result<(), Box> { +// let mut publisher = PubSocket::new(new_transport()); +// +// let mut sub_tasks = JoinSet::new(); +// +// let socket_addr = SocketAddr::new(simulation.endpoint, 0); +// +// #[cfg(target_os = "linux")] +// let publisher = { +// run_on_namespace(&simulation.namespace_name(), || { +// Box::pin(async move { +// let _ = publisher.bind(socket_addr).await; +// Ok(publisher) +// }) +// }) +// .await? +// }; +// +// #[cfg(target_os = "macos")] +// publisher.bind(socket_addr).await?; +// +// inject_delay(400).await; +// +// let sub_addr = publisher.local_addr().unwrap(); +// +// for i in 0..subscibers { +// sub_tasks.spawn(async move { +// let mut subscriber = SubSocket::new(new_transport()); +// inject_delay((100 * (i + 1)) as u64).await; +// +// subscriber.connect(sub_addr).await.unwrap(); +// inject_delay((1000 / (i + 1)) as u64).await; +// subscriber.subscribe(TOPIC).await.unwrap(); +// +// let msg = subscriber.next().await.unwrap(); +// info!("Received message: {:?}", msg); +// assert_eq!(TOPIC, msg.topic()); +// assert_eq!("WORLD", msg.payload()); +// }); +// } +// +// // Spawn a task to keep sending messages until the subscriber receives one (after connection +// // process) +// tokio::spawn(async move { +// loop { +// tokio::time::sleep(Duration::from_millis(500)).await; +// publisher.publish(TOPIC, Bytes::from("WORLD")).await.unwrap(); +// } +// }); +// +// for _ in 0..subscibers { +// sub_tasks.join_next().await.unwrap().unwrap(); +// } +// +// Ok(()) +// } +// +// /// Multiple publishers, single subscriber +// #[tokio::test(flavor = "multi_thread", worker_threads = 4)] +// async fn pubsub_fan_in() { +// let _ = tracing_subscriber::fmt::try_init(); +// +// #[cfg(target_os = "linux")] +// let addr = "192.168.3.1".parse().unwrap(); +// +// #[cfg(target_os = "macos")] +// let addr = "127.0.0.4".parse().unwrap(); +// +// let mut simulator = Simulator::new(3); +// let result = simulator.start( +// addr, +// SimulationConfig { +// latency: Some(Duration::from_millis(500)), +// bw: None, +// #[cfg(target_os = "linux")] +// burst: None, +// #[cfg(target_os = "linux")] +// limit: None, +// plr: Some(20_f64), +// protocols: vec![Protocol::UDP, Protocol::TCP], +// }, +// ); +// +// assert!(result.is_ok()); +// +// let simulation = simulator.active_sims.get(&addr).unwrap(); +// +// let result = pubsub_fan_in_transport(build_tcp, simulation, 5).await; +// +// assert!(result.is_ok()); +// +// let result = pubsub_fan_in_transport(build_quic, simulation, 5).await; +// +// assert!(result.is_ok()); +// +// simulator.stop(addr); +// } +// +// async fn pubsub_fan_in_transport< +// F: Fn() -> T + Send + 'static + Copy, +// T: Transport + Send + Sync + Unpin + 'static, +// A: Address, +// >( +// new_transport: F, +// simulation: &Simulation, +// publishers: usize, +// addr: A, +// ) -> Result<(), Box> { +// let mut sub_tasks = JoinSet::new(); +// +// let (tx, mut rx) = mpsc::channel(publishers); +// +// #[cfg(target_os = "linux")] +// let namespace = simulation.namespace_name(); +// let socket_addr = SocketAddr::new(simulation.endpoint, 0); +// +// for i in 0..publishers { +// let tx = tx.clone(); +// #[cfg(target_os = "linux")] +// let namespace = namespace.clone(); +// sub_tasks.spawn(async move { +// let mut publisher = PubSocket::new(new_transport()); +// inject_delay((100 * (i + 1)) as u64).await; +// +// #[cfg(target_os = "linux")] +// let publisher = { +// run_on_namespace(&namespace, || { +// Box::pin(async move { +// publisher.bind(socket_addr).await.unwrap(); +// Ok(publisher) +// }) +// }) +// .await +// .unwrap() +// }; +// +// #[cfg(target_os = "macos")] +// publisher.bind(socket_addr).await.unwrap(); +// +// let local_addr = publisher.local_addr().unwrap().clone(); +// tx.send(local_addr).await.unwrap(); +// +// // Spawn a task to keep sending messages until the subscriber receives one (after +// // connection process) +// tokio::spawn(async move { +// loop { +// tokio::time::sleep(Duration::from_millis(500)).await; +// publisher.publish(TOPIC, Bytes::from("WORLD")).await.unwrap(); +// } +// }); +// }); +// } +// +// drop(tx); +// +// let mut subscriber = SubSocket::new(new_transport()); +// +// let mut addrs = HashSet::with_capacity(publishers); +// +// while let Some(addr) = rx.recv().await { +// addrs.insert(addr); +// } +// +// for addr in addrs.clone() { +// inject_delay(500).await; +// subscriber.connect_inner(addr.clone()).await.unwrap(); +// subscriber.subscribe(TOPIC).await.unwrap(); +// } +// +// loop { +// if addrs.is_empty() { +// break; +// } +// +// let msg = subscriber.next().await.unwrap(); +// info!("Received message: {:?}", msg); +// assert_eq!(TOPIC, msg.topic()); +// assert_eq!("WORLD", msg.payload()); +// +// addrs.remove(msg.source()); +// } +// +// for _ in 0..publishers { +// sub_tasks.join_next().await.unwrap().unwrap(); +// } +// +// Ok(()) +// } fn build_tcp() -> Tcp { Tcp::default() diff --git a/msg/Cargo.toml b/msg/Cargo.toml index 8a2e005..4af3fdc 100644 --- a/msg/Cargo.toml +++ b/msg/Cargo.toml @@ -16,6 +16,8 @@ msg-transport.workspace = true msg-wire.workspace = true tokio.workspace = true +bytes.workspace = true +nix.workspace = true tokio-stream.workspace = true [dev-dependencies]