Skip to content

Conversation

onur-ozkan
Copy link
Collaborator

@onur-ozkan onur-ozkan commented Jun 11, 2025

Right now, in order to release the recv lock in my current project, I have to do this:

async fn message_loop(self: Arc<Self>) {
    loop {
        let recv_future = smol::unblock({
            let _self = self.clone();
            move || _self.connection.recv()
        });

        match recv_future.timeout(Duration::from_millis(200)).await {
            None => {
                simple_log::warning!("Timed out while waiting for message!");
                Timer::after(Duration::from_millis(200)).await;
            },
            // Rest of the work
        }
    }
}

which is problematic because:

  1. It requires a lot of clones.
  2. I have to convert recv() into a Future.
  3. I have to wrap it with a timeout.
  4. Points 2 and 3 require additional dependencies.
  5. This approach is bad for performance on low-power devices.

Instead, I would prefer to simply do this:

async fn message_loop(&self) {
    loop {
        match self.connection.try_recv() {
            None => {
                // Prevent busy-looping
                Timer::after(Duration::from_millis(50)).await;
            },
            // Rest of the work
        }
    }
}

which solves all the issues mentioned above.

This patch implements try_recv for the MavConnection trait, which allows users (like me) to have more control over their message receiving implementations. I looked at AsyncMavConnection, but as far as I can see it's non-blocking anyway. I am not sure why but it doesn't seem consistent with the MavConnection trait.

@joaoantoniocardoso
Copy link
Collaborator

Hi, I see that the Option is very simple and solves your case here, but masking parsing errors as None might narrow its use. For instance, if someone is trying to build a small router, parsing errors might be instrumental.

So maybe we could have something around

enum TryRecvError {
    Empty,
    Disconnected,
    Parse(ParserError),
}

What do you think?

@onur-ozkan
Copy link
Collaborator Author

Hi, I see that the Option is very simple and solves your case here, but masking parsing errors as None might narrow its use. For instance, if someone is trying to build a small router, parsing errors might be instrumental.

So maybe we could have something around

enum TryRecvError {
    Empty,
    Disconnected,
    Parse(ParserError),
}

What do you think?

We can return the inner Err directly without creating another error type. This patch should work for everyone (I think):

diff --git a/mavlink-core/src/connection/direct_serial.rs b/mavlink-core/src/connection/direct_serial.rs
index bb0fefe..f9f2a7a 100644
--- a/mavlink-core/src/connection/direct_serial.rs
+++ b/mavlink-core/src/connection/direct_serial.rs
@@ -52,7 +52,7 @@ impl<M: Message> MavConnection<M> for SerialConnection {
         }
     }
 
-    fn try_recv(&self) -> Result<Option<(MavHeader, M)>, MessageReadError> {
+    fn try_recv(&self) -> Result<(MavHeader, M), MessageReadError> {
         let mut port = self.port.lock().unwrap();
         let version = ReadVersion::from_conn_cfg::<_, M>(self);
 
@@ -63,13 +63,7 @@ impl<M: Message> MavConnection<M> for SerialConnection {
         let result =
             read_versioned_msg_signed(port.deref_mut(), version, self.signing_data.as_ref());
 
-        match result {
-            Ok(msg) => Ok(Some(msg)),
-            Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
-                Err(MessageReadError::Io(e))
-            }
-            Err(_) => Ok(None),
-        }
+        result
     }
 
     fn send(&self, header: &MavHeader, data: &M) -> Result<usize, MessageWriteError> {
diff --git a/mavlink-core/src/connection/file.rs b/mavlink-core/src/connection/file.rs
index 61adb81..055fd2e 100644
--- a/mavlink-core/src/connection/file.rs
+++ b/mavlink-core/src/connection/file.rs
@@ -64,7 +64,7 @@ impl<M: Message> MavConnection<M> for FileConnection {
         }
     }
 
-    fn try_recv(&self) -> Result<Option<(MavHeader, M)>, crate::error::MessageReadError> {
+    fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
         let mut file = self.file.lock().unwrap();
         let version = ReadVersion::from_conn_cfg::<_, M>(self);
 
@@ -74,13 +74,7 @@ impl<M: Message> MavConnection<M> for FileConnection {
         let result =
             read_versioned_msg_signed(file.deref_mut(), version, self.signing_data.as_ref());
 
-        match result {
-            Ok(msg) => Ok(Some(msg)),
-            Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
-                Err(MessageReadError::Io(e))
-            }
-            Err(_) => Ok(None),
-        }
+        result
     }
 
     fn send(&self, _header: &MavHeader, _data: &M) -> Result<usize, MessageWriteError> {
diff --git a/mavlink-core/src/connection/mod.rs b/mavlink-core/src/connection/mod.rs
index 1ac4da5..d6435cd 100644
--- a/mavlink-core/src/connection/mod.rs
+++ b/mavlink-core/src/connection/mod.rs
@@ -27,7 +27,7 @@ pub trait MavConnection<M: Message> {
     /// Try to receive a MAVLink message.
     ///
     /// Non-blocking variant of `recv()`, should return `None` immediately if no message is available.
-    fn try_recv(&self) -> Result<Option<(MavHeader, M)>, crate::error::MessageReadError>;
+    fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError>;
 
     /// Send a MAVLink message
     fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError>;
diff --git a/mavlink-core/src/connection/tcp.rs b/mavlink-core/src/connection/tcp.rs
index 76d535b..d1dc4df 100644
--- a/mavlink-core/src/connection/tcp.rs
+++ b/mavlink-core/src/connection/tcp.rs
@@ -96,9 +96,8 @@ impl<M: Message> MavConnection<M> for TcpConnection {
         result
     }
 
-    fn try_recv(&self) -> Result<Option<(MavHeader, M)>, crate::error::MessageReadError> {
-        // `TcpConnection::recv` doesn't seem to block?
-        self.recv().map(Some)
+    fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
+        self.recv()
     }
 
     fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {
diff --git a/mavlink-core/src/connection/udp.rs b/mavlink-core/src/connection/udp.rs
index b808b2f..aab993d 100644
--- a/mavlink-core/src/connection/udp.rs
+++ b/mavlink-core/src/connection/udp.rs
@@ -102,7 +102,7 @@ impl<M: Message> MavConnection<M> for UdpConnection {
         }
     }
 
-    fn try_recv(&self) -> Result<Option<(MavHeader, M)>, crate::error::MessageReadError> {
+    fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
         let mut reader = self.reader.lock().unwrap();
         let version = ReadVersion::from_conn_cfg::<_, M>(self);
 
@@ -118,7 +118,7 @@ impl<M: Message> MavConnection<M> for UdpConnection {
             }
         }
 
-        Ok(result.ok())
+        result
     }
 
     fn send(&self, header: &MavHeader, data: &M) -> Result<usize, crate::error::MessageWriteError> {

What do you think?

@joaoantoniocardoso
Copy link
Collaborator

joaoantoniocardoso commented Jun 19, 2025

That is fine, too. By doing that, we are leaving the responsibility of the semantics to the user -- he will need to figure out what error to map to his reconnection logic, for example.

Just be sure to update the trait comments :)

@onur-ozkan
Copy link
Collaborator Author

Thank you, I will update the comments tomorrow or the day after.

@onur-ozkan
Copy link
Collaborator Author

Thank you, I will update the comments tomorrow or the day after.

Done :)

@patrickelectric
Copy link
Member

Hi @onur-ozkan for the sake of organization, can you squash 03df830 with 7215b73 ? Since you are fixing a commit that is already in the PR.

@onur-ozkan
Copy link
Collaborator Author

Hi @onur-ozkan for the sake of organization, can you squash 03df830 with 7215b73 ? Since you are fixing a commit that is already in the PR.

Sure, done.

Aren't we squash commits on merge already btw? Why is that necessary?

@patrickelectric
Copy link
Member

Hi @onur-ozkan for the sake of organization, can you squash 03df830 with 7215b73 ? Since you are fixing a commit that is already in the PR.

Sure, done.

Aren't we squash commits on merge already btw? Why is that necessary?

We mostly do Rebase workflow over Merge workflow for better organization of the timeline.

But, we can also merge on our end.

Thanks!

@patrickelectric patrickelectric merged commit eb818f6 into mavlink:master Jun 20, 2025
45 checks passed
@onur-ozkan onur-ozkan deleted the try-recv branch June 20, 2025 13:24
This was referenced Jun 21, 2025
@rickyjames35
Copy link

    /// Try to receive a MAVLink message.
    ///
    /// Non-blocking variant of `recv()`, returns immediately with a `MessageReadError`
    /// if there is an error or no message is available.
    fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError>;

Anyone else notice that with udpin the try_recv method will just block until it receives a message? The socket was never setup with set_nonblocking so it just blocks.

@onur-ozkan
Copy link
Collaborator Author

onur-ozkan commented Sep 12, 2025

    /// Try to receive a MAVLink message.
    ///
    /// Non-blocking variant of `recv()`, returns immediately with a `MessageReadError`
    /// if there is an error or no message is available.
    fn try_recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError>;

Anyone else notice that with udpin the try_recv method will just block until it receives a message? The socket was never setup with set_nonblocking so it just blocks.

We should probably have a timeout logic for protocols that are blocking. I will create an issue to track it.

NVM. I realized we can switch socket state during try_recv. Can you test #396 by any chance?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants