Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/async-tokio/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub async fn main() {
// Much like the synchronous connection, you connect using the `connect_to_socket` function.
// It has the same parameters - the first one is the name of the socket, and the second is whether
// the socket is global or not.
let mut connection = AsyncConnection::connect_to_socket(NAME, false)
let connection = AsyncConnection::connect_to_socket(NAME, false)
.await
.expect("Connection worked");

Expand Down
4 changes: 2 additions & 2 deletions examples/async-tokio/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub async fn main() {

// You begin by setting up a listener like so, much like the synchronous example
//
let mut listener =
let listener =
AsyncListener::listen_as_socket(NAME, false).expect("Couldn't listen! That's sad.");

// However, here things change!
Expand Down Expand Up @@ -40,7 +40,7 @@ pub async fn main() {
}

// This is the code that handles the connections to the listener
async fn handle_connection(mut connection: AsyncConnection) {
async fn handle_connection(connection: AsyncConnection) {
// This code is effectively the same as the synchronous example.
// You send messages using the `send` method,
connection
Expand Down
2 changes: 1 addition & 1 deletion examples/sync/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub fn main() {
// Internally, gipc resolves the name to some location deterministically.
// The second parameter here specifies whether or not to resolve the socket globally (i.e. when the listening process exists for the entire system).
// In our case, it doesn't, so we set that parameter to false.
let mut connection =
let connection =
Connection::connect_to_socket(NAME, false).expect("Connection should connect properly");

// Once we've successfully connected, we can use its two main methods: `send` and `receive`.
Expand Down
4 changes: 2 additions & 2 deletions examples/sync/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ pub fn main() {
println!("[listener] Listening to socket {}", NAME);
// You begin by setting up a listener, much like you do a connection (see the client example).
// The only difference is that you cannot begin sending and receiving messages yet.
let mut listener =
let listener =
Listener::listen_as_socket(NAME, false).expect("Couldn't listen! That's sad.");

// And then you just accept incoming connections!
// This is a bad implementation, however - this can only handle one connection at a time!
// See the async example for a better example on how a listener should work.
while let Ok(mut connection) = listener.accept() {
while let Ok(connection) = listener.accept() {
// Once you've accepted the connection, everything works precisely like it does on
// the client-side - this is because gipc uses the same type to represent a connection
// from a client to a listener as it does for the listener to a client.
Expand Down
111 changes: 66 additions & 45 deletions src/connection/async_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,26 @@ use futures_io::{AsyncRead, AsyncWrite};
use interprocess::local_socket::tokio::{LocalSocketListener, LocalSocketStream};
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::Mutex;

/// Listeners allow you to wait until new [`Connection`s](Connection) can be established.
pub struct Listener {
inner: Mutex<ListenerInner>
}
struct ListenerInner {
internal: Box<dyn ListenerImpl>,
closed: bool,
}

impl Listener {
/// Creates a new listener based on a specified [`ListenerImpl`].
/// Generally, you won't call this directly unless you're extending gipc.
pub const fn new(internal: Box<dyn ListenerImpl>) -> Self {
pub fn new(internal: Box<dyn ListenerImpl>) -> Self {
Self {
internal,
closed: false,
inner: Mutex::new(ListenerInner {
internal,
closed: false,
})
}
}
/// Listens to a socket on the local machine with a name based on `name`.
Expand All @@ -46,42 +52,70 @@ impl Listener {
}

/// Accept a new connection.
pub async fn accept(&mut self) -> Result<Connection> {
if self.closed {
pub async fn accept(&self) -> Result<Connection> {
let mut lock = self.inner.lock().await;
if lock.closed {
return Err(Error::Closed(false));
}
self.internal.accept().await
lock.internal.accept().await
}

/// Closes this listener, returning any error that occurred whilst closing it.
/// After calling this function, all other methods will immediately return [`Error::Closed(false)`](Error::Closed).
pub async fn close(&mut self) -> Result<()> {
if self.closed {
pub async fn close(&self) -> Result<()> {
let mut lock = self.inner.lock().await;
if lock.closed {
return Err(Error::Closed(false));
}
self.closed = true; // we set it to closed either way
self.internal.close().await
lock.closed = true; // we set it to closed either way
lock.internal.close().await
}

/// Check if this listener is closed.
pub fn is_closed(&self) -> bool {
self.closed
tokio::task::block_in_place(|| {
self.inner.blocking_lock().closed
})
}
}

/// Connections represent a two-way bidirectional stream that you can send and receive messages through.
pub struct Connection {
inner: Mutex<ConnectionInner>
}
struct ConnectionInner {
internal: Box<dyn ConnectionImpl>,
closed: bool,
}
impl ConnectionInner {
async fn _send<T>(&mut self, message: Message<T>) -> Result<()>
where
T: Serialize,
{
message.write_to_async(&mut self.internal).await
}
async fn _receive<T>(&mut self) -> Result<Message<T>>
where
T: DeserializeOwned,
{
Message::<T>::read_from_async(&mut self.internal).await
}

async fn _close(&mut self) {
self.internal.close().await;
self.closed = true;
}
}

impl Connection {
/// Creates a new connection based on a specified [`ConnectionImpl`].
/// Generally, you won't call this directly unless you're extending gipc.
pub const fn new(internal: Box<dyn ConnectionImpl>) -> Self {
pub fn new(internal: Box<dyn ConnectionImpl>) -> Self {
Self {
internal,
closed: false,
inner: Mutex::new(ConnectionInner{
internal,
closed: false,
})
}
}
/// Connects to a socket using a name based on `name`.
Expand All @@ -94,53 +128,42 @@ impl Connection {
Ok(Self::new(Box::new(bound)))
}

async fn _send<T>(&mut self, message: Message<T>) -> Result<()>
where
T: Serialize,
{
message.write_to_async(&mut self.internal).await
}
async fn _receive<T>(&mut self) -> Result<Message<T>>
where
T: DeserializeOwned,
{
Message::<T>::read_from_async(&mut self.internal).await
}

/// Send a message through this connection.
/// Will immediately fail with [`Error::Closed(false)`](Error::Closed) if this connection is already closed.
pub async fn send<T>(&mut self, message_data: T) -> Result<()>
pub async fn send<T>(&self, message_data: T) -> Result<()>
where
T: Serialize,
{
if self.closed {
let mut lock = self.inner.lock().await;
if lock.closed {
return Err(Error::Closed(false));
}
let message = Message::Data(message_data);
self._send(message).await
lock._send(message).await
}
/// Receive a message from this connection.
/// Will immediately fail with [`Error::Closed(false)`](Error::Closed) if this connection is already closed,
/// or fail with [`Error::Closed(true)`](Error::Closed) if this connection was closed whilst trying to read the message.
pub async fn receive<T>(&mut self) -> Result<T>
pub async fn receive<T>(&self) -> Result<T>
where
T: DeserializeOwned,
{
if self.closed {
let mut lock = self.inner.lock().await;
if lock.closed {
return Err(Error::Closed(false));
}
let message = self._receive().await?;
let message = lock._receive().await?;
match message {
Message::ClosingConnection => {
self._close().await;
lock._close().await;
Err(Error::Closed(true))
}
Message::Data(data) => Ok(data),
}
}

/// Shorthand for calling [`send`] and [`receive`] after one another.
pub async fn send_and_receive<A, B>(&mut self, data: &A) -> Result<B>
pub async fn send_and_receive<A, B>(&self, data: &A) -> Result<B>
where
A: Serialize,
B: DeserializeOwned,
Expand All @@ -149,25 +172,23 @@ impl Connection {
self.receive().await
}

async fn _close(&mut self) {
self.internal.close().await;
self.closed = true;
}

/// Closes this connection if it isn't already closed.
/// This operation can never fail.
pub async fn close(&mut self) {
if self.closed {
pub async fn close(&self) {
let mut lock = self.inner.lock().await;
if lock.closed {
return;
}
// ignore the results of this - it doesn't matter since we're closing it either way
let _ = self._send::<()>(Message::ClosingConnection);
self._close().await;
let _ = lock._send::<()>(Message::ClosingConnection);
lock._close().await;
}

/// Check if this connection is closed.
pub fn is_closed(&self) -> bool {
self.closed
tokio::task::block_in_place(|| {
self.inner.blocking_lock().closed
})
}
}

Expand Down
Loading