From 58f865e2e85771ae671103b6e05a1cb0016cc3c2 Mon Sep 17 00:00:00 2001 From: tecc Date: Sat, 10 Feb 2024 18:18:22 +0100 Subject: [PATCH] refactor(no-mut)!: Remove mut from Listener and Connection functions --- examples/async-tokio/client.rs | 2 +- examples/async-tokio/listener.rs | 4 +- examples/sync/client.rs | 2 +- examples/sync/listener.rs | 4 +- src/connection/async_tokio.rs | 111 ++++++++++++++++++------------- src/connection/sync.rs | 106 +++++++++++++++++------------ src/error.rs | 10 +++ 7 files changed, 144 insertions(+), 95 deletions(-) diff --git a/examples/async-tokio/client.rs b/examples/async-tokio/client.rs index 35272af..f53926e 100644 --- a/examples/async-tokio/client.rs +++ b/examples/async-tokio/client.rs @@ -8,7 +8,7 @@ pub async fn main() { // Much like the synchronous connection, you connect using the `connect_to_socket` function. // It has the same parameters - the first one is the name of the socket, and the second is whether // the socket is global or not. - let mut connection = AsyncConnection::connect_to_socket(NAME, false) + let connection = AsyncConnection::connect_to_socket(NAME, false) .await .expect("Connection worked"); diff --git a/examples/async-tokio/listener.rs b/examples/async-tokio/listener.rs index d25b0af..6913bd2 100644 --- a/examples/async-tokio/listener.rs +++ b/examples/async-tokio/listener.rs @@ -8,7 +8,7 @@ pub async fn main() { // You begin by setting up a listener like so, much like the synchronous example // - let mut listener = + let listener = AsyncListener::listen_as_socket(NAME, false).expect("Couldn't listen! That's sad."); // However, here things change! @@ -40,7 +40,7 @@ pub async fn main() { } // This is the code that handles the connections to the listener -async fn handle_connection(mut connection: AsyncConnection) { +async fn handle_connection(connection: AsyncConnection) { // This code is effectively the same as the synchronous example. // You send messages using the `send` method, connection diff --git a/examples/sync/client.rs b/examples/sync/client.rs index 4d084cc..b20accc 100644 --- a/examples/sync/client.rs +++ b/examples/sync/client.rs @@ -8,7 +8,7 @@ pub fn main() { // Internally, gipc resolves the name to some location deterministically. // The second parameter here specifies whether or not to resolve the socket globally (i.e. when the listening process exists for the entire system). // In our case, it doesn't, so we set that parameter to false. - let mut connection = + let connection = Connection::connect_to_socket(NAME, false).expect("Connection should connect properly"); // Once we've successfully connected, we can use its two main methods: `send` and `receive`. diff --git a/examples/sync/listener.rs b/examples/sync/listener.rs index d393929..6cc8612 100644 --- a/examples/sync/listener.rs +++ b/examples/sync/listener.rs @@ -5,13 +5,13 @@ pub fn main() { println!("[listener] Listening to socket {}", NAME); // You begin by setting up a listener, much like you do a connection (see the client example). // The only difference is that you cannot begin sending and receiving messages yet. - let mut listener = + let listener = Listener::listen_as_socket(NAME, false).expect("Couldn't listen! That's sad."); // And then you just accept incoming connections! // This is a bad implementation, however - this can only handle one connection at a time! // See the async example for a better example on how a listener should work. - while let Ok(mut connection) = listener.accept() { + while let Ok(connection) = listener.accept() { // Once you've accepted the connection, everything works precisely like it does on // the client-side - this is because gipc uses the same type to represent a connection // from a client to a listener as it does for the listener to a client. diff --git a/src/connection/async_tokio.rs b/src/connection/async_tokio.rs index e5ec4fb..5684707 100644 --- a/src/connection/async_tokio.rs +++ b/src/connection/async_tokio.rs @@ -19,9 +19,13 @@ use futures_io::{AsyncRead, AsyncWrite}; use interprocess::local_socket::tokio::{LocalSocketListener, LocalSocketStream}; use serde::de::DeserializeOwned; use serde::Serialize; +use tokio::sync::Mutex; /// Listeners allow you to wait until new [`Connection`s](Connection) can be established. pub struct Listener { + inner: Mutex +} +struct ListenerInner { internal: Box, closed: bool, } @@ -29,10 +33,12 @@ pub struct Listener { impl Listener { /// Creates a new listener based on a specified [`ListenerImpl`]. /// Generally, you won't call this directly unless you're extending gipc. - pub const fn new(internal: Box) -> Self { + pub fn new(internal: Box) -> Self { Self { - internal, - closed: false, + inner: Mutex::new(ListenerInner { + internal, + closed: false, + }) } } /// Listens to a socket on the local machine with a name based on `name`. @@ -46,42 +52,70 @@ impl Listener { } /// Accept a new connection. - pub async fn accept(&mut self) -> Result { - if self.closed { + pub async fn accept(&self) -> Result { + let mut lock = self.inner.lock().await; + if lock.closed { return Err(Error::Closed(false)); } - self.internal.accept().await + lock.internal.accept().await } /// Closes this listener, returning any error that occurred whilst closing it. /// After calling this function, all other methods will immediately return [`Error::Closed(false)`](Error::Closed). - pub async fn close(&mut self) -> Result<()> { - if self.closed { + pub async fn close(&self) -> Result<()> { + let mut lock = self.inner.lock().await; + if lock.closed { return Err(Error::Closed(false)); } - self.closed = true; // we set it to closed either way - self.internal.close().await + lock.closed = true; // we set it to closed either way + lock.internal.close().await } /// Check if this listener is closed. pub fn is_closed(&self) -> bool { - self.closed + tokio::task::block_in_place(|| { + self.inner.blocking_lock().closed + }) } } /// Connections represent a two-way bidirectional stream that you can send and receive messages through. pub struct Connection { + inner: Mutex +} +struct ConnectionInner { internal: Box, closed: bool, } +impl ConnectionInner { + async fn _send(&mut self, message: Message) -> Result<()> + where + T: Serialize, + { + message.write_to_async(&mut self.internal).await + } + async fn _receive(&mut self) -> Result> + where + T: DeserializeOwned, + { + Message::::read_from_async(&mut self.internal).await + } + + async fn _close(&mut self) { + self.internal.close().await; + self.closed = true; + } +} impl Connection { /// Creates a new connection based on a specified [`ConnectionImpl`]. /// Generally, you won't call this directly unless you're extending gipc. - pub const fn new(internal: Box) -> Self { + pub fn new(internal: Box) -> Self { Self { - internal, - closed: false, + inner: Mutex::new(ConnectionInner{ + internal, + closed: false, + }) } } /// Connects to a socket using a name based on `name`. @@ -94,45 +128,34 @@ impl Connection { Ok(Self::new(Box::new(bound))) } - async fn _send(&mut self, message: Message) -> Result<()> - where - T: Serialize, - { - message.write_to_async(&mut self.internal).await - } - async fn _receive(&mut self) -> Result> - where - T: DeserializeOwned, - { - Message::::read_from_async(&mut self.internal).await - } - /// Send a message through this connection. /// Will immediately fail with [`Error::Closed(false)`](Error::Closed) if this connection is already closed. - pub async fn send(&mut self, message_data: T) -> Result<()> + pub async fn send(&self, message_data: T) -> Result<()> where T: Serialize, { - if self.closed { + let mut lock = self.inner.lock().await; + if lock.closed { return Err(Error::Closed(false)); } let message = Message::Data(message_data); - self._send(message).await + lock._send(message).await } /// Receive a message from this connection. /// Will immediately fail with [`Error::Closed(false)`](Error::Closed) if this connection is already closed, /// or fail with [`Error::Closed(true)`](Error::Closed) if this connection was closed whilst trying to read the message. - pub async fn receive(&mut self) -> Result + pub async fn receive(&self) -> Result where T: DeserializeOwned, { - if self.closed { + let mut lock = self.inner.lock().await; + if lock.closed { return Err(Error::Closed(false)); } - let message = self._receive().await?; + let message = lock._receive().await?; match message { Message::ClosingConnection => { - self._close().await; + lock._close().await; Err(Error::Closed(true)) } Message::Data(data) => Ok(data), @@ -140,7 +163,7 @@ impl Connection { } /// Shorthand for calling [`send`] and [`receive`] after one another. - pub async fn send_and_receive(&mut self, data: &A) -> Result + pub async fn send_and_receive(&self, data: &A) -> Result where A: Serialize, B: DeserializeOwned, @@ -149,25 +172,23 @@ impl Connection { self.receive().await } - async fn _close(&mut self) { - self.internal.close().await; - self.closed = true; - } - /// Closes this connection if it isn't already closed. /// This operation can never fail. - pub async fn close(&mut self) { - if self.closed { + pub async fn close(&self) { + let mut lock = self.inner.lock().await; + if lock.closed { return; } // ignore the results of this - it doesn't matter since we're closing it either way - let _ = self._send::<()>(Message::ClosingConnection); - self._close().await; + let _ = lock._send::<()>(Message::ClosingConnection); + lock._close().await; } /// Check if this connection is closed. pub fn is_closed(&self) -> bool { - self.closed + tokio::task::block_in_place(|| { + self.inner.blocking_lock().closed + }) } } diff --git a/src/connection/sync.rs b/src/connection/sync.rs index 5c0583f..eef45b2 100644 --- a/src/connection/sync.rs +++ b/src/connection/sync.rs @@ -17,11 +17,15 @@ use interprocess::local_socket::{LocalSocketListener, LocalSocketStream}; use serde::de::DeserializeOwned; use serde::Serialize; use std::io::{Read, Write}; +use std::sync::{Mutex}; /// Listeners allow you to wait until new [`Connection`s](Connection) can be established. pub struct Listener { + inner: Mutex +} +struct ListenerInner { internal: Box, - closed: bool, + closed: bool } impl Listener { @@ -29,8 +33,10 @@ impl Listener { /// Generally, you won't call this directly unless you're extending gipc. pub const fn new(internal: Box) -> Self { Self { - internal, - closed: false, + inner: Mutex::new(ListenerInner { + internal, + closed: false, + }) } } @@ -45,25 +51,27 @@ impl Listener { } /// Accept a new connection. - pub fn accept(&mut self) -> Result { - if self.closed { + pub fn accept(&self) -> Result { + let mut guard = self.inner.lock()?; + if guard.closed { return Err(Error::Closed(false)); } - self.internal.accept() + guard.internal.accept() } /// Closes this listener, returning any error that occurred whilst closing it. /// After calling this function, all other methods will immediately return [`Error::Closed(false)`](Error::Closed) if called. - pub fn close(&mut self) -> Result<()> { - if self.closed { + pub fn close(&self) -> Result<()> { + let mut guard = self.inner.lock()?; + if guard.closed { return Err(Error::Closed(false)); } - self.closed = true; // we set it to closed either way - self.internal.close() + guard.closed = true; // we set it to closed either way + guard.internal.close() } /// Check if this listener is closed. pub fn is_closed(&self) -> bool { - self.closed + self.inner.lock().map(|v| v.closed).unwrap_or(true) } } @@ -75,17 +83,41 @@ impl Drop for Listener { /// Connections represent a two-way bidirectional stream that you can send and receive messages through. pub struct Connection { + inner: Mutex +} +struct ConnectionInner { internal: Box, closed: bool, } +impl ConnectionInner { + fn _send(&mut self, message: Message) -> Result<()> + where + T: Serialize, + { + message.write_to(&mut self.internal) + } + fn _receive(&mut self) -> Result> + where + T: DeserializeOwned, + { + Message::::read_from(&mut self.internal) + } + fn _close(&mut self) { + self.internal.close(); + self.closed = true; + } +} + impl Connection { /// Creates a new connection based on a specified [`ConnectionImpl`]. /// Generally, you won't call this directly unless you're extending gipc. pub const fn new(internal: Box) -> Self { Self { - internal, - closed: false, + inner: Mutex::new(ConnectionInner { + internal, + closed: false, + }) } } /// Connects to a socket using a name based on `name`. @@ -98,52 +130,41 @@ impl Connection { Ok(Self::new(Box::new(bound))) } - fn _send(&mut self, message: Message) -> Result<()> - where - T: Serialize, - { - message.write_to(&mut self.internal) - } - fn _receive(&mut self) -> Result> - where - T: DeserializeOwned, - { - Message::::read_from(&mut self.internal) - } - /// Send a message through this connection. /// Will immediately fail with [`Error::Closed(false)`] if this connection is already closed. - pub fn send(&mut self, message_data: &T) -> Result<()> + pub fn send(&self, message_data: &T) -> Result<()> where T: Serialize, { - if self.closed { + let mut inner = self.inner.lock()?; + if inner.closed { return Err(Error::Closed(false)); } let message = Message::Data(message_data); - self._send(message) + inner._send(message) } /// Receive a message from this connection. /// Will immediately fail with [`Error::Closed(false)`] if this connection is already closed, /// or fail with [`Error::Closed(true)`] if this connection was closed whilst trying to read the message. - pub fn receive(&mut self) -> Result + pub fn receive(&self) -> Result where T: DeserializeOwned, { - if self.closed { + let mut inner = self.inner.lock()?; + if inner.closed { return Err(Error::Closed(false)); } - let message = self._receive()?; + let message = inner._receive()?; match message { Message::ClosingConnection => { - self._close(); + inner._close(); Err(Error::Closed(true)) } Message::Data(data) => Ok(data), } } /// Shorthand for calling [`send`](Self::send) and [`receive`](Self::receive) after one another. - pub fn send_and_receive(&mut self, data: &A) -> Result + pub fn send_and_receive(&self, data: &A) -> Result where A: Serialize, B: DeserializeOwned, @@ -152,25 +173,22 @@ impl Connection { self.receive() } - fn _close(&mut self) { - self.internal.close(); - self.closed = true; - } - /// Closes this connection if it isn't already closed. /// This operation can never fail. - pub fn close(&mut self) { - if self.closed { + pub fn close(&self) { + let Ok(mut inner) = self.inner.lock() else { return }; + + if inner.closed { return; } // ignore the results of this - it doesn't matter since we're closing it either way - let _ = self._send::<()>(Message::ClosingConnection); - self._close(); + let _ = inner._send::<()>(Message::ClosingConnection); + inner._close(); } /// Check if this connection is closed. pub fn is_closed(&self) -> bool { - self.closed + self.inner.lock().map(|v| v.closed).unwrap_or(true) } } diff --git a/src/error.rs b/src/error.rs index 59ca28b..7fb93a0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,7 @@ //! A small module containing the [`Error`] and [`Result`] type. +use std::sync::{Mutex, PoisonError}; + /// Error type for this library. Any error this library produces uses this to represent it. #[derive(thiserror::Error, Debug)] pub enum Error { @@ -25,6 +27,9 @@ pub enum Error { #[from] tokio::task::JoinError, ), + /// Indicates that a Mutex or RwLock was poisoned. + #[error("mutex error: {0}")] + Poison(String), /// Indicates that something is closed. #[error("{}", if *.0 { "was closed by operation" } else { "already closed" })] Closed( @@ -32,5 +37,10 @@ pub enum Error { bool, ), } +impl From> for Error { + fn from(value: PoisonError) -> Self { + Self::Poison(value.to_string()) + } +} /// Result type for this library. Shorthand for [`std::result::Result`]. pub type Result = std::result::Result;