Skip to content

Commit c1f4e07

Browse files
authored
Merge pull request #1618 from microsoft/ab-cleanup-v12
demikernel: v12: series of patches with code improvements
2 parents 68c7ab9 + 3660086 commit c1f4e07

File tree

9 files changed

+73
-85
lines changed

9 files changed

+73
-85
lines changed

src/catnap/linux/active_socket.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,8 @@ impl ActiveSocketData {
9090
// Put the buffer back and try again later.
9191
self.send_queue.push_front(Outgoing { addr, buffer, result });
9292
} else {
93-
let cause = format!("failed to send on socket: {:?}", errno);
94-
error!("poll_send(): {}", cause);
95-
result.set(Some(Err(Fail::new(errno, &cause))));
93+
error!("poll_send(): failed on socket: {:?}", errno);
94+
result.set(Some(Err(Fail::new(errno, "send failed on socket"))));
9695
}
9796
},
9897
}
@@ -126,9 +125,8 @@ impl ActiveSocketData {
126125
Err(e) => {
127126
let errno = get_libc_err(e);
128127
if !DemiRuntime::should_retry(errno) {
129-
let cause = format!("failed to receive on socket: {:?}", errno);
130-
error!("poll_recv(): {}", cause);
131-
self.recv_queue.push(Err(Fail::new(errno, &cause)));
128+
error!("poll_recv(): failed on socket: {:?}", errno);
129+
self.recv_queue.push(Err(Fail::new(errno, "receive failed on socket")));
132130
}
133131
},
134132
}
@@ -153,21 +151,22 @@ impl ActiveSocketData {
153151
}
154152
}
155153

156-
/// Pops data from the socket. Blocks until some data is found but does not wait until the buf has reached [size].
154+
/// Blocks until some data is found but does not wait until the buf has reached [size].
157155
pub async fn pop(
158156
&mut self,
159157
size: usize,
160158
timeout: Option<Duration>,
161159
) -> Result<(Option<SocketAddr>, DemiBuffer), Fail> {
162160
let (addr, mut buffer) = self.recv_queue.pop(timeout).await??;
163-
// Figure out how much data we got.
164161
let bytes_read = min(buffer.len(), size);
162+
165163
// Trim the buffer and leave for next read if we got more than expected.
166164
if let Ok(remainder) = buffer.split_back(bytes_read) {
167165
if !remainder.is_empty() {
168-
self.push_front(remainder, addr.clone());
166+
self.push_front(remainder, addr);
169167
}
170168
}
169+
171170
Ok((addr, buffer))
172171
}
173172

src/catnap/linux/socket.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,25 +70,23 @@ impl SharedSocketData {
7070
self.set_socket_data(SocketData::Active(ActiveSocketData::new(socket)));
7171
}
7272

73-
/// Gets a reference to the actual Socket for reading the socket's metadata (mostly the raw file descriptor).
74-
pub fn get_socket<'a>(&'a self) -> &'a Socket {
73+
pub fn get_socket(&self) -> &Socket {
7574
let _self = self.as_ref();
7675
match _self {
7776
SocketData::Inactive(Some(socket)) => socket,
7877
SocketData::Active(data) => data.socket(),
7978
SocketData::Passive(data) => data.socket(),
80-
_ => panic!("Should have data"),
79+
_ => panic!("should have data"),
8180
}
8281
}
8382

84-
/// Gets a mutable reference to the actual Socket for I/O operations.
85-
pub fn get_mut_socket<'a>(&'a mut self) -> &'a mut Socket {
83+
pub fn get_socket_mut(&mut self) -> &mut Socket {
8684
let _self = self.as_mut();
8785
match _self {
8886
SocketData::Inactive(Some(socket)) => socket,
8987
SocketData::Active(data) => data.socket_mut(),
9088
SocketData::Passive(data) => data.socket_mut(),
91-
_ => panic!("Should have data"),
89+
_ => panic!("should have data"),
9290
}
9391
}
9492

src/catnap/linux/transport.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ impl SharedCatnapTransport {
203203

204204
/// Internal function to get the Socket from the metadata structure, given the socket descriptor.
205205
fn socket_from_sd(&mut self, sd: &SockDesc) -> &mut Socket {
206-
self.data_from_sd(sd).get_mut_socket()
206+
self.data_from_sd(sd).get_socket_mut()
207207
}
208208

209209
/// Internal function to get the metadata for the socket, given the socket descriptor.

src/demikernel/libos/network/libos.rs

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,8 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
7474
}
7575

7676
if (typ != Type::STREAM) && (typ != Type::DGRAM) {
77-
let cause: String = format!("socket type not supported (type={:?})", typ);
78-
error!("socket(): {}", cause);
79-
return Err(Fail::new(libc::ENOTSUP, &cause));
77+
error!("socket(): socket type not supported (type={:?})", typ);
78+
return Err(Fail::new(libc::ENOTSUP, "socket type not supported"));
8079
}
8180

8281
let queue: SharedNetworkQueue<T> = SharedNetworkQueue::new(domain, typ, &mut self.transport)?;
@@ -109,24 +108,22 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
109108
// We only support the wildcard address for UDP sockets.
110109
// FIXME: https://github.com/demikernel/demikernel/issues/189
111110
if *socket_addrv4.ip() == Ipv4Addr::UNSPECIFIED && self.get_shared_queue(&qd)?.qtype() != QType::UdpSocket {
112-
let cause: String = format!("cannot bind to wildcard address (qd={:?})", qd);
113-
error!("bind(): {}", cause);
114-
return Err(Fail::new(libc::ENOTSUP, &cause));
111+
error!("bind(): cannot bind to wildcard address (qd={:?})", qd);
112+
return Err(Fail::new(libc::ENOTSUP, "cannot bind to wildcard address"));
115113
}
116114

117115
// We only support the wildcard address for UDP sockets.
118116
// FIXME: https://github.com/demikernel/demikernel/issues/582
119117
if socket_addr.port() == 0 && self.get_shared_queue(&qd)?.qtype() != QType::UdpSocket {
120-
let cause: String = format!("cannot bind to port 0 (qd={:?})", qd);
121-
error!("bind(): {}", cause);
122-
return Err(Fail::new(libc::ENOTSUP, &cause));
118+
error!("bind(): cannot bind to port 0 (qd={:?})", qd);
119+
return Err(Fail::new(libc::ENOTSUP, "cannot bind to port 0"));
123120
}
124121

125122
if self.runtime.is_addr_in_use(socket_addrv4) {
126-
let cause: String = format!("address is already bound to a socket (qd={:?}", qd);
127-
error!("bind(): {}", &cause);
128-
return Err(Fail::new(libc::EADDRINUSE, &cause));
123+
error!("bind(): address is already bound to a socket (qd={:?}", qd);
124+
return Err(Fail::new(libc::EADDRINUSE, "address is already bound to a socket"));
129125
}
126+
130127
self.get_shared_queue(&qd)?.bind(socket_addr)?;
131128
// Insert into address to queue descriptor table.
132129
self.runtime
@@ -142,12 +139,10 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
142139

143140
// We use this API for testing, so we must check again.
144141
if !((backlog > 0) && (backlog <= SOMAXCONN as usize)) {
145-
let cause: String = format!("invalid backlog length: {:?}", backlog);
146-
warn!("{}", cause);
147-
return Err(Fail::new(libc::EINVAL, &cause));
142+
warn!("invalid backlog length: {:?}", backlog);
143+
return Err(Fail::new(libc::EINVAL, "invalid backlog length"));
148144
}
149145

150-
// Issue listen operation.
151146
self.get_shared_queue(&qd)?.listen(backlog)
152147
}
153148

@@ -303,23 +298,23 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
303298
/// coroutine that asynchronously runs the push and any synchronous multi-queue functionality before the push
304299
/// begins.
305300
pub fn push(&mut self, qd: QDesc, sga: &demi_sgarray_t) -> Result<QToken, Fail> {
306-
let bufs = clone_sgarray(sga)?;
307-
if bufs.is_empty() {
308-
let cause = "zero-length list of buffers";
309-
warn!("push(): {}", cause);
310-
return Err(Fail::new(libc::EINVAL, &cause));
301+
let buffers = clone_sgarray(sga)?;
302+
303+
if buffers.is_empty() {
304+
warn!("push(): buffers cannot be empty");
305+
return Err(Fail::new(libc::EINVAL, "buffers cannot be empty"));
311306
}
312-
for buf in bufs.iter() {
313-
if buf.is_empty() {
314-
let cause = "zero-length buffer";
315-
warn!("push(): {}", cause);
316-
return Err(Fail::new(libc::EINVAL, &cause));
307+
308+
for buffer in buffers.iter() {
309+
if buffer.is_empty() {
310+
warn!("push(): empty buffer");
311+
return Err(Fail::new(libc::EINVAL, "empty buffer"));
317312
};
318313
}
319314

320315
let mut queue: SharedNetworkQueue<T> = self.get_shared_queue(&qd)?;
321316
let coroutine_constructor = || -> Result<QToken, Fail> {
322-
let coroutine = Box::pin(self.clone().push_coroutine(qd, bufs, None).fuse());
317+
let coroutine = Box::pin(self.clone().push_coroutine(qd, buffers, None).fuse());
323318
self.runtime
324319
.clone()
325320
.schedule_coroutine("ioc::network::libos::push", coroutine)
@@ -360,14 +355,14 @@ impl<T: NetworkTransport> SharedNetworkLibOS<T> {
360355
pub fn pushto(&mut self, qd: QDesc, sga: &demi_sgarray_t, remote: SocketAddr) -> Result<QToken, Fail> {
361356
trace!("pushto() qd={:?}", qd);
362357

363-
let bufs: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN> = clone_sgarray(sga)?;
364-
if bufs.is_empty() {
365-
return Err(Fail::new(libc::EINVAL, "zero buffers to send"));
358+
let buffers: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN> = clone_sgarray(sga)?;
359+
if buffers.is_empty() {
360+
return Err(Fail::new(libc::EINVAL, "buffers cannot be empty"));
366361
}
367362

368363
let mut queue: SharedNetworkQueue<T> = self.get_shared_queue(&qd)?;
369364
let coroutine_constructor = || -> Result<QToken, Fail> {
370-
let coroutine = Box::pin(self.clone().push_coroutine(qd, bufs, Some(remote)).fuse());
365+
let coroutine = Box::pin(self.clone().push_coroutine(qd, buffers, Some(remote)).fuse());
371366
self.runtime
372367
.clone()
373368
.schedule_coroutine("ioc::network::libos::pushto", coroutine)

src/demikernel/libos/network/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ impl NetworkLibOSWrapper {
120120

121121
/// Marks a socket as a passive one.
122122
pub fn listen(&mut self, sockqd: QDesc, mut backlog: usize) -> Result<(), Fail> {
123-
// Truncate backlog length.
123+
// Limit backlog length.
124124
if backlog > SOMAXCONN as usize {
125125
debug!(
126126
"listen(): backlog length is too large, truncating (qd={:?}, backlog={:?})",

src/demikernel/libos/network/queue.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,8 @@ impl<T: NetworkTransport> SharedNetworkQueue<T> {
7474

7575
pub fn set_socket_option(&mut self, option: SocketOption) -> Result<(), Fail> {
7676
if self.state_machine.ensure_not_closing().is_err() {
77-
let cause = "cannot set socket-level options when socket is closing";
78-
warn!("set_socket_option(): {}", cause);
79-
return Err(Fail::new(libc::EBUSY, cause));
77+
warn!("set_socket_option(): cannot set socket options when closing");
78+
return Err(Fail::new(libc::EBUSY, "cannot set socket options when closing"));
8079
}
8180

8281
self.transport.clone().set_socket_option(&mut self.socket, option)

src/runtime/memory/mod.rs

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -37,66 +37,65 @@ pub trait DemiMemoryAllocator {
3737
}
3838
}
3939

40-
/// Converts a list of DemiBuffers into a scatter-gather array.
41-
pub fn into_sgarray(bufs: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN>) -> Result<demi_sgarray_t, Fail> {
42-
// Check the sizes before allocating anything.
43-
if bufs.is_empty() {
44-
let cause = "cannot allocate a zero element scatter-gather array";
45-
error!("into_sgarray(): {}", cause);
46-
return Err(Fail::new(libc::EINVAL, &cause));
40+
pub fn into_sgarray(buffers: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN>) -> Result<demi_sgarray_t, Fail> {
41+
if buffers.is_empty() {
42+
error!("into_sgarray(): buffers is empty");
43+
return Err(Fail::new(libc::EINVAL, "buffers is empty"));
4744
}
48-
if bufs.len() > DEMI_SGARRAY_MAXLEN {
49-
let cause = format!("cannot allocate a {} element scatter-gather array", bufs.len());
50-
error!("into_sgarray(): {}", cause);
51-
return Err(Fail::new(libc::EINVAL, &cause));
45+
46+
if buffers.len() > DEMI_SGARRAY_MAXLEN {
47+
error!(
48+
"into_sgarray(): too many buffers: {}, max: {}",
49+
buffers.len(),
50+
DEMI_SGARRAY_MAXLEN
51+
);
52+
return Err(Fail::new(libc::EINVAL, "too many buffers"));
5253
}
5354

54-
// Create a scatter-gather segment to expose the DemiBuffers to the user.
55-
let mut sga: demi_sgarray_t = demi_sgarray_t::default();
56-
sga.num_segments = bufs.len() as u32;
55+
let mut sga: demi_sgarray_t = demi_sgarray_t {
56+
num_segments: buffers.len() as u32,
57+
..Default::default()
58+
};
5759

58-
for (i, buf) in bufs.into_iter().enumerate() {
59-
sga.segments[i].data_buf_ptr = buf.as_ptr() as *mut c_void;
60-
sga.segments[i].data_len_bytes = buf.len() as u32;
61-
sga.segments[i].reserved_metadata_ptr = buf.into_raw().as_ptr() as *mut c_void;
60+
for (i, buffer) in buffers.into_iter().enumerate() {
61+
sga.segments[i].data_buf_ptr = buffer.as_ptr() as *mut c_void;
62+
sga.segments[i].data_len_bytes = buffer.len() as u32;
63+
sga.segments[i].reserved_metadata_ptr = buffer.into_raw().as_ptr() as *mut c_void;
6264
}
6365

64-
// Create and return a new scatter-gather array (which inherits the DemiBuffer's reference).
6566
Ok(sga)
6667
}
6768

68-
/// Allocates a scatter-gather array.
6969
pub fn sgaalloc<M: DemiMemoryAllocator>(size: usize, mem_alloc: &M) -> Result<demi_sgarray_t, Fail> {
70-
// Check the sizes before allocating anything.
71-
// We can't allocate a zero-sized buffer.
7270
if size == 0 {
73-
let cause = "cannot allocate a zero-sized buffer";
74-
error!("sgaalloc(): {}", cause);
75-
return Err(Fail::new(libc::EINVAL, cause));
71+
error!("sgaalloc(): cannot allocate zero-sized buffer");
72+
return Err(Fail::new(libc::EINVAL, "cannot allocate zero-sized buffer"));
7673
}
7774

7875
// First allocate the underlying DemiBuffer.
7976
if size > mem_alloc.max_buffer_size_bytes() * DEMI_SGARRAY_MAXLEN {
8077
return Err(Fail::new(libc::EINVAL, "size too large for a single demi_sgaseg_t"));
8178
}
79+
8280
// Calculate the number of DemiBuffers to allocate.
8381
let max_buffer_size_bytes: usize = mem_alloc.max_buffer_size_bytes();
8482
let remainder: usize = size % max_buffer_size_bytes;
8583
let len: usize = (size - remainder) / max_buffer_size_bytes;
8684
let mut bufs: ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN> = ArrayVec::new();
85+
8786
for _ in 0..len {
8887
bufs.push(mem_alloc.allocate_demi_buffer(max_buffer_size_bytes)?);
8988
}
89+
9090
// If there is any remaining length, allocate a partial buffer.
9191
if remainder > 0 {
9292
bufs.push(mem_alloc.allocate_demi_buffer(remainder)?);
9393
}
94+
9495
into_sgarray(bufs)
9596
}
9697

97-
/// Releases a scatter-gather array.
9898
pub fn sgafree(sga: demi_sgarray_t) -> Result<(), Fail> {
99-
// Check arguments.
10099
if sga.num_segments > DEMI_SGARRAY_MAXLEN as u32 {
101100
return Err(Fail::new(libc::EINVAL, "demi_sgarray_t has invalid segment count"));
102101
}
@@ -105,13 +104,13 @@ pub fn sgafree(sga: demi_sgarray_t) -> Result<(), Fail> {
105104
let buf: DemiBuffer = convert_sgaseg_to_demi_buffer(&sga.segments[i])?;
106105
drop(buf);
107106
}
107+
108108
Ok(())
109109
}
110110

111-
/// Clones a scatter-gather array. The [sga_buf] field must point to the first DemiBuffer in the chain and the elements
112-
/// of [segments] must be the rest of the chain.
111+
/// The [sga_buf] field must point to the first DemiBuffer in the chain and the elements of [segments] must be the rest
112+
/// of the chain.
113113
pub fn clone_sgarray(sga: &demi_sgarray_t) -> Result<ArrayVec<DemiBuffer, DEMI_SGARRAY_MAXLEN>, Fail> {
114-
// Check arguments.
115114
if sga.num_segments > DEMI_SGARRAY_MAXLEN as u32 || sga.num_segments == 0 {
116115
return Err(Fail::new(libc::EINVAL, "demi_sgarray_t has invalid segment count"));
117116
}

src/runtime/scheduler/page/page_ref.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ impl WakerPageRef {
6666

6767
unsafe {
6868
let base_ptr: *mut u8 = self.0.as_ptr().cast();
69-
let ptr = NonNull::new_unchecked(base_ptr.add(ix));
70-
ptr
69+
NonNull::new_unchecked(base_ptr.add(ix))
7170
}
7271
}
7372
}

src/runtime/scheduler/scheduler.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,7 @@ mod tests {
147147

148148
impl DummyCoroutine {
149149
pub fn new(val: usize) -> Self {
150-
let f = Self { val };
151-
f
150+
Self { val }
152151
}
153152
}
154153
impl Future for DummyCoroutine {

0 commit comments

Comments
 (0)