Skip to content

Commit 71eba94

Browse files
feat(dataplane): make datplane consume memfd
With this commit the dataplane now consumes and acts on the memfd created by dataplane init from the previous commit. Co-authored-by: Fredi Raspal <[email protected]> Signed-off-by: Daniel Noland <[email protected]>
1 parent 03488ce commit 71eba94

File tree

8 files changed

+194
-189
lines changed

8 files changed

+194
-189
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ FROM $BASE AS dataplane
33
ARG ARTIFACT
44
ARG ARTIFACT_CLI
55
COPY --link --chown=0:0 "${ARTIFACT}" /dataplane
6+
COPY --link --chown=0:0 "${ARTIFACT}" /dataplane-init
67
COPY --link --chown=0:0 "${ARTIFACT_CLI}" /dataplane-cli
78
WORKDIR /
8-
ENTRYPOINT ["/dataplane"]
9+
ENTRYPOINT ["/dataplane-init"]

args/src/lib.rs

Lines changed: 71 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -100,20 +100,26 @@ use std::os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd, RawFd};
100100
use std::path::PathBuf;
101101
use std::str::FromStr;
102102

103-
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
104-
pub enum PortArg {
105-
PCI(PciAddress), // DPDK driver
106-
KERNEL(InterfaceName), // kernel driver
107-
}
108-
109-
#[derive(Debug, Clone, serde::Serialize)]
103+
#[derive(
104+
CheckBytes,
105+
Clone,
106+
Debug,
107+
Eq,
108+
PartialEq,
109+
rkyv::Archive,
110+
rkyv::Deserialize,
111+
rkyv::Serialize,
112+
serde::Deserialize,
113+
serde::Serialize,
114+
)]
115+
#[rkyv(attr(derive(PartialEq, Eq, Debug)))]
110116
#[allow(unused)]
111117
pub struct InterfaceArg {
112118
pub interface: InterfaceName,
113-
pub port: Option<PortArg>,
119+
pub port: NetworkDeviceDescription,
114120
}
115121

116-
impl FromStr for PortArg {
122+
impl FromStr for NetworkDeviceDescription {
117123
type Err = String;
118124
fn from_str(input: &str) -> Result<Self, Self::Err> {
119125
let (disc, value) = input
@@ -123,12 +129,12 @@ impl FromStr for PortArg {
123129
match disc {
124130
"pci" => {
125131
let pciaddr = PciAddress::try_from(value).map_err(|e| e.to_string())?;
126-
Ok(PortArg::PCI(pciaddr))
132+
Ok(NetworkDeviceDescription::Pci(pciaddr))
127133
}
128134
"kernel" => {
129135
let kernelif = InterfaceName::try_from(value)
130136
.map_err(|e| format!("Bad kernel interface name: {e}"))?;
131-
Ok(PortArg::KERNEL(kernelif))
137+
Ok(NetworkDeviceDescription::Kernel(kernelif))
132138
}
133139
_ => Err(format!(
134140
"Unknown discriminant '{disc}': allowed values are pci|kernel"
@@ -141,21 +147,12 @@ impl FromStr for InterfaceArg {
141147
type Err = String;
142148
fn from_str(input: &str) -> Result<Self, Self::Err> {
143149
if let Some((first, second)) = input.split_once('=') {
144-
let interface = InterfaceName::try_from(first)
145-
.map_err(|e| format!("Bad interface name: {e}"))?;
146-
147-
let port = PortArg::from_str(second)?;
148-
Ok(InterfaceArg {
149-
interface,
150-
port: Some(port),
151-
})
150+
let interface =
151+
InterfaceName::try_from(first).map_err(|e| format!("Bad interface name: {e}"))?;
152+
let port = NetworkDeviceDescription::from_str(second)?;
153+
Ok(InterfaceArg { interface, port })
152154
} else {
153-
let interface = InterfaceName::try_from(input)
154-
.map_err(|e| format!("Bad interface name: {e}"))?;
155-
Ok(InterfaceArg {
156-
interface,
157-
port: None,
158-
})
155+
Err(format!("invalid interface argument: {input}"))
159156
}
160157
}
161158
}
@@ -503,7 +500,7 @@ impl Display for NetworkDeviceDescription {
503500
#[rkyv(attr(derive(Debug, PartialEq, Eq)))]
504501
pub struct DpdkDriverConfigSection {
505502
/// Network devices to use with DPDK (identified by PCI address)
506-
pub use_nics: Vec<NetworkDeviceDescription>,
503+
pub interfaces: Vec<InterfaceArg>,
507504
/// DPDK EAL (Environment Abstraction Layer) initialization arguments
508505
pub eal_args: Vec<String>,
509506
}
@@ -545,7 +542,7 @@ pub struct DpdkDriverConfigSection {
545542
#[rkyv(attr(derive(PartialEq, Eq, Debug)))]
546543
pub struct KernelDriverConfigSection {
547544
/// Kernel network interfaces to manage
548-
pub interfaces: Vec<InterfaceName>,
545+
pub interfaces: Vec<InterfaceArg>,
549546
}
550547

551548
/// Configuration for the dataplane's command-line interface (CLI).
@@ -734,6 +731,8 @@ pub struct ConfigServerSection {
734731
pub struct LaunchConfiguration {
735732
/// Dynamic configuration server settings
736733
pub config_server: ConfigServerSection,
734+
/// Number of dataplane worker threads / cores
735+
pub dataplane_workers: usize,
737736
/// Packet processing driver configuration
738737
pub driver: DriverConfigSection,
739738
/// CLI server configuration
@@ -744,6 +743,32 @@ pub struct LaunchConfiguration {
744743
pub tracing: TracingConfigSection,
745744
/// Metrics collection configuration
746745
pub metrics: MetricsConfigSection,
746+
/// Profileing collection configuration
747+
pub profiling: ProfilingConfigSection,
748+
}
749+
750+
#[derive(
751+
Debug, Clone, PartialEq, Eq, serde::Serialize, rkyv::Serialize, rkyv::Deserialize, rkyv::Archive,
752+
)]
753+
#[rkyv(attr(derive(PartialEq, Eq, Debug)))]
754+
pub struct ProfilingConfigSection {
755+
/// The URL of the pryroscope url
756+
pub pyroscope_url: Option<String>,
757+
/// Frequency with which we collect stack traces
758+
pub frequency: u32,
759+
}
760+
761+
impl ProfilingConfigSection {
762+
pub const DEFAULT_FREQUENCY: u32 = 100;
763+
}
764+
765+
impl Default for ProfilingConfigSection {
766+
fn default() -> Self {
767+
Self {
768+
pyroscope_url: None,
769+
frequency: Self::DEFAULT_FREQUENCY,
770+
}
771+
}
747772
}
748773

749774
impl LaunchConfiguration {
@@ -1191,26 +1216,19 @@ impl TryFrom<CmdArgs> for LaunchConfiguration {
11911216
type Error = InvalidCmdArguments;
11921217

11931218
fn try_from(value: CmdArgs) -> Result<Self, InvalidCmdArguments> {
1194-
let use_nics: Vec<_> = value
1195-
.interfaces()
1196-
.map(|x| match x.port {
1197-
Some(PortArg::KERNEL(name)) => NetworkDeviceDescription::Kernel(name),
1198-
Some(PortArg::PCI(address)) => NetworkDeviceDescription::Pci(address),
1199-
None => todo!(), // I am not clear what this case means
1200-
})
1201-
.collect();
12021219
Ok(LaunchConfiguration {
12031220
config_server: ConfigServerSection {
12041221
address: value
12051222
.grpc_address()
12061223
.map_err(InvalidCmdArguments::InvalidGrpcAddress)?,
12071224
},
1225+
dataplane_workers: value.num_workers.into(),
12081226
driver: match &value.driver {
12091227
Some(driver) if driver == "dpdk" => {
12101228
// TODO: adjust command line to specify lcore usage more flexibly in next PR
1211-
let eal_args = use_nics
1212-
.iter()
1213-
.map(|nic| match nic {
1229+
let eal_args = value
1230+
.interfaces()
1231+
.map(|nic| match nic.port {
12141232
NetworkDeviceDescription::Pci(pci_address) => {
12151233
Ok(["--allow".to_string(), format!("{pci_address}")])
12161234
}
@@ -1224,23 +1242,15 @@ impl TryFrom<CmdArgs> for LaunchConfiguration {
12241242
.into_iter()
12251243
.flatten()
12261244
.collect();
1227-
DriverConfigSection::Dpdk(DpdkDriverConfigSection { use_nics, eal_args })
1245+
1246+
DriverConfigSection::Dpdk(DpdkDriverConfigSection {
1247+
interfaces: value.interfaces().collect(),
1248+
eal_args,
1249+
})
12281250
}
12291251
Some(driver) if driver == "kernel" => {
12301252
DriverConfigSection::Kernel(KernelDriverConfigSection {
1231-
interfaces: use_nics
1232-
.iter()
1233-
.map(|nic| match nic {
1234-
NetworkDeviceDescription::Pci(address) => {
1235-
Err(InvalidCmdArguments::UnsupportedByDriver(
1236-
UnsupportedByDriver::Kernel(*address),
1237-
))
1238-
}
1239-
NetworkDeviceDescription::Kernel(interface) => {
1240-
Ok(interface.clone())
1241-
}
1242-
})
1243-
.collect::<Result<_, _>>()?,
1253+
interfaces: value.interfaces().collect(),
12441254
})
12451255
}
12461256
Some(other) => Err(InvalidCmdArguments::InvalidDriver(other.clone()))?,
@@ -1271,6 +1281,10 @@ impl TryFrom<CmdArgs> for LaunchConfiguration {
12711281
metrics: MetricsConfigSection {
12721282
address: value.metrics_address(),
12731283
},
1284+
profiling: ProfilingConfigSection {
1285+
pyroscope_url: value.pyroscope_url().map(std::string::ToString::to_string),
1286+
frequency: ProfilingConfigSection::DEFAULT_FREQUENCY,
1287+
},
12741288
})
12751289
}
12761290
}
@@ -1561,6 +1575,7 @@ impl CmdArgs {
15611575
self.metrics_address
15621576
}
15631577

1578+
#[must_use]
15641579
pub fn pyroscope_url(&self) -> Option<&url::Url> {
15651580
self.pyroscope_url.as_ref()
15661581
}
@@ -1575,7 +1590,7 @@ mod tests {
15751590
use hardware::pci::function::Function;
15761591
use net::interface::InterfaceName;
15771592

1578-
use crate::{InterfaceArg, PortArg};
1593+
use crate::{InterfaceArg, NetworkDeviceDescription};
15791594
use std::str::FromStr;
15801595

15811596
#[test]
@@ -1585,29 +1600,22 @@ mod tests {
15851600
assert_eq!(spec.interface.as_ref(), "GbEth1.9000");
15861601
assert_eq!(
15871602
spec.port,
1588-
Some(PortArg::PCI(PciAddress::new(
1603+
NetworkDeviceDescription::Pci(PciAddress::new(
15891604
Domain::from(0),
15901605
Bus::new(2),
15911606
Device::try_from(1).unwrap(),
15921607
Function::try_from(7).unwrap()
1593-
)))
1608+
))
15941609
);
15951610

15961611
// interface + port as kernel interface
15971612
let spec = InterfaceArg::from_str("[email protected]").unwrap();
15981613
assert_eq!(spec.interface.as_ref(), "GbEth1.9000");
15991614
assert_eq!(
16001615
spec.port,
1601-
Some(PortArg::KERNEL(
1602-
InterfaceName::try_from("enp2s1.100").unwrap()
1603-
))
1616+
NetworkDeviceDescription::Kernel(InterfaceName::try_from("enp2s1.100").unwrap())
16041617
);
16051618

1606-
// interface only (backwards compatibility)
1607-
let spec = InterfaceArg::from_str("GbEth1.9000").unwrap();
1608-
assert_eq!(spec.interface.as_ref(), "GbEth1.9000");
1609-
assert!(spec.port.is_none());
1610-
16111619
// bad pci address
16121620
assert!(InterfaceArg::from_str("GbEth1.9000=pci@0000:02:01").is_err());
16131621

dataplane/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pkt-io = { workspace = true }
3434
pkt-meta = { workspace = true }
3535
pyroscope = { workspace = true }
3636
pyroscope_pprofrs = { workspace = true }
37+
rkyv = { workspace = true, features = [] }
3738
routing = { workspace = true }
3839
serde = { workspace = true, features = ["derive"] }
3940
stats = { workspace = true }

dataplane/src/drivers/dpdk.rs

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use dpdk::queue::tx::{TxQueueConfig, TxQueueIndex};
1414
use dpdk::{dev, eal, socket};
1515
use tracing::{debug, error, info, trace, warn};
1616

17-
use crate::CmdArgs;
1817
use net::buffer::PacketBufferMut;
1918
use net::packet::Packet;
2019
use pipeline::sample_nfs::Passthrough;
@@ -96,34 +95,42 @@ fn start_rte_workers(devices: &[Dev], setup_pipeline: &(impl Sync + Fn() -> DynP
9695
info!("Starting RTE Worker on {lcore_id:?}");
9796
WorkerThread::launch(lcore_id, move || {
9897
let mut pipeline = setup_pipeline();
99-
let rx_queue = devices[0]
100-
.rx_queue(RxQueueIndex(u16::try_from(i).unwrap()))
101-
.unwrap();
102-
let tx_queue = devices[0]
103-
.tx_queue(TxQueueIndex(u16::try_from(i).unwrap()))
104-
.unwrap();
98+
let queues: Vec<_> = devices
99+
.iter()
100+
.map(|device| {
101+
let rx_queue = device
102+
.rx_queue(RxQueueIndex(u16::try_from(i).unwrap()))
103+
.unwrap();
104+
let tx_queue = device
105+
.tx_queue(TxQueueIndex(u16::try_from(i).unwrap()))
106+
.unwrap();
107+
(rx_queue, tx_queue)
108+
})
109+
.collect();
105110
loop {
106-
let mbufs = rx_queue.receive();
107-
let pkts = mbufs.filter_map(|mbuf| match Packet::new(mbuf) {
108-
Ok(pkt) => {
109-
debug!("packet: {pkt:?}");
110-
Some(pkt)
111-
}
112-
Err(e) => {
113-
trace!("Failed to parse packet: {e:?}");
114-
None
115-
}
116-
});
111+
for (rx_queue, tx_queue) in &queues {
112+
let mbufs = rx_queue.receive();
113+
let pkts = mbufs.filter_map(|mbuf| match Packet::new(mbuf) {
114+
Ok(pkt) => {
115+
debug!("packet: {pkt:?}");
116+
Some(pkt)
117+
}
118+
Err(e) => {
119+
trace!("Failed to parse packet: {e:?}");
120+
None
121+
}
122+
});
117123

118-
let pkts_out = pipeline.process(pkts);
119-
let buffers = pkts_out.filter_map(|pkt| match pkt.serialize() {
120-
Ok(buf) => Some(buf),
121-
Err(e) => {
122-
error!("{e:?}");
123-
None
124-
}
125-
});
126-
tx_queue.transmit(buffers);
124+
let pkts_out = pipeline.process(pkts);
125+
let buffers = pkts_out.filter_map(|pkt| match pkt.serialize() {
126+
Ok(buf) => Some(buf),
127+
Err(e) => {
128+
error!("{e:?}");
129+
None
130+
}
131+
});
132+
tx_queue.transmit(buffers);
133+
}
127134
}
128135
});
129136
});
@@ -135,9 +142,10 @@ impl DriverDpdk {
135142
pub fn start(
136143
args: impl IntoIterator<Item = impl AsRef<str>>,
137144
setup_pipeline: &(impl Sync + Fn() -> DynPipeline<Mbuf>),
138-
) {
139-
let eal = init_eal(args);
145+
) -> (Eal, Vec<Dev>) {
146+
let eal = eal::init(args);
140147
let devices = init_devices(&eal);
141148
start_rte_workers(&devices, setup_pipeline);
149+
(eal, devices)
142150
}
143151
}

0 commit comments

Comments
 (0)