Skip to content

Queue的mutability问题 #6

@fandahao17

Description

@fandahao17

我们的测试代码是这样使用Queue的:

        // 用new方法创建一个长度为10的queue。
        let q: Queue<u32> = Queue::new(10);
        // 两个任务共享所有权,所以需Arc包装。
        let mut shared_queue = Arc::new(q);
        let q1 = Arc::clone(&shared_queue);
        let q2 = Arc::clone(&shared_queue);

        // 发送数据的任务代码。
        let sender = move || {
            for i in 1..11 {
                // send方法的参数包括要发送的数据和ticks_to_wait
                q1.send(i, pdMS_TO_TICKS!(50)).unwrap();
            }
        };

        // 接收数据的任务代码。
        let receiver = move || {
            let mut sum = 0;
            loop {
                // receive方法的参数只有ticks_to_wait
                if let Ok(x) = q2.receive(pdMS_TO_TICKS!(300)) {
                    sum += x;
                } else {
                    // 若等待300ms仍未收到数据,则认为发送结束。
                    assert_eq!(sum, 55);
                    kernel::task_end_scheduler();
                }
            }
        };

这段代码是有问题的。Arc产生的引用q1和q2都是immutable的引用,然而send和receive方法都需要&mut self,所以编译不通过。

显然我们不能让两个任务共享mut引用,因为这是违背Rust的所有权的。我们可以通过RwLock来避免编译Error。RwLock是读写锁,它的作用是在运行时检查是否只有一个写引用或只有读引用。修改后,队列创建和读写的代码如下:

let mut shared_queue = Arc::new(RwLock::new(q));

// Sender 任务中
q1.write().unwrap().send(i, pdMS_TO_TICKS!(50)).unwrap();

// Receiver 任务中
q2.write().unwrap().receive(pdMS_TO_TICKS!(300)).unwrap();

然而,考虑以下情况:Receiver先执行,它用write()方法获得写权限,但是随后receive失败,该任务被阻塞(任务只是停住了,但写权限并未释放)。然后Sender执行,它试图用write()方法获得写权限,但RwLock不允许,遂失败。此时,Receiver在等待Sender传东西入队,Sender在等待Receiver释放队列的写权限,死锁发生了!(这正是我们的测试代码出现的情况)

所以不能简单的用RwLock封装。其实仔细思考,我们的Queue就是为了实现信号量,不应该用RwLock这种Rust语言自身提供的同步机制。对比freertos.rs,它的queue的receive和send方法都只需要&self,所以避免了这个问题。所以我们也需要把这些方法改成&self,因此我们要为队列实现Interior mutability

我参考这篇博文介绍的RwLock等的实现方式,试写了一下,可以编译:

use std::cell::UnsafeCell;

pub struct QueueIM<T>(UnsafeCell<QueueDefinition<T>>) where T: Default + Clone;
unsafe impl<T: Default + Clone> Send for QueueIM<T> {}
unsafe impl<T: Default + Clone> Sync for QueueIM<T> {}

impl <T>QueueIM<T> where T: Default + Clone{
    pub fn new(length: UBaseType) -> Self {
        QueueIM(UnsafeCell::new(QueueDefinition::new_type(length, QueueType::Base)))
    }

    pub fn send(&self,pvItemToQueue:T,xTicksToWait:TickType) -> (Result<(), QueueError>){
        unsafe {
            let inner = self.0.get();
            (*inner).queue_generic_send(pvItemToQueue,xTicksToWait,queueSEND_TO_BACK)
        }
    }

    pub fn receive(&self,xTicksToWait:TickType) -> Result<T, QueueError> {
        unsafe {
            let inner = self.0.get();
            (*inner).queue_generic_receive(xTicksToWait,false)
        }
    }
}

实现sendsync是为了让Queue可以在不同线程间共享。new、send和receive方法我都直接抄了Queue的对应方法。

尽管在这里使用了很多unsafe,但因为我们的Queue的receive和send方法内实现了同步阻塞机制,所以不会造成数据竞争,我们的代码还是安全的。通过interior mutability的方式,就可以让Queue作为一个immutable的引用在不同线程间愉快地共享了。

Queue,Mutex,Semaphore应该都需要这样处理,你们这样包装一下吧。

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions