crossbeam related
Credit is due to Dmitry Vyukov, who authored the original algorithm. The crossbeam crate implements it in Rust.
http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
https://github.com/crossbeam-rs/crossbeam/blob/master/crossbeam-queue/src/array_queue.rs
struct Slot<T> {
    /// The current stamp.
    /// If the stamp equals the tail, this node will be next written to. If it equals head + 1,
    /// this node will be next read from.
    stamp: AtomicUsize,
    /// The value in this slot.
    value: UnsafeCell<MaybeUninit<T>>,
}
pub struct ArrayQueue<T> {
    /// The head of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are popped from the head of the queue.
    head: CachePadded<AtomicUsize>,
    /// The tail of the queue.
    ///
    /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
    ///
    /// Elements are pushed into the tail of the queue.
    tail: CachePadded<AtomicUsize>,
    /// The buffer holding slots.
    buffer: Box<[Slot<T>]>,
    /// A stamp with the value of `{ lap: 1, index: 0 }`.
    one_lap: usize,
}
Two atomics, head and tail, track the positions of the reader and the writer in the array. The atomic stamp in each Slot tells whether its value has been written. Readers and writers both back off and spin when they observe an inconsistent stamp, so the stamp serves as a fence that prevents unsafe reads of the value.
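The lap/index packing described in the doc comments can be sketched as follows. This is a minimal illustration, assuming one_lap is the smallest power of two greater than the capacity; the Stamps helper and its method names are hypothetical and are not part of crossbeam's API.

```rust
/// Hypothetical helper illustrating how a stamp packs a lap and an index.
struct Stamps {
    cap: usize,
    /// Assumption: smallest power of two strictly greater than `cap`.
    one_lap: usize,
}

impl Stamps {
    fn new(cap: usize) -> Self {
        Stamps { cap, one_lap: (cap + 1).next_power_of_two() }
    }

    /// Lower bits: position inside the buffer.
    fn index(&self, stamp: usize) -> usize {
        stamp & (self.one_lap - 1)
    }

    /// Upper bits: how many times the cursor has wrapped, scaled by `one_lap`.
    fn lap(&self, stamp: usize) -> usize {
        stamp & !(self.one_lap - 1)
    }

    /// Next cursor value: step the index, or move to index 0 of the next lap.
    fn advance(&self, stamp: usize) -> usize {
        if self.index(stamp) + 1 < self.cap {
            stamp + 1
        } else {
            self.lap(stamp).wrapping_add(self.one_lap)
        }
    }
}
```

With a capacity of 3, one_lap is 4, so a cursor walks 0, 1, 2 and then jumps to 4, which is index 0 of the next lap.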
Only one reader can read from the queue at a time, and only one writer can write to it at a time. The performance ceiling of this algorithm is therefore the effective use of two cores; when more cores join the contention, they pay an extra cost in spinning and yielding.
https://github.com/crossbeam-rs/crossbeam/blob/master/crossbeam-channel/src/flavors/array.rs
In pseudocode:
'MAIN: loop {
    for some backoff retry {
        if start_send() {
            // won the contention for a slot position to write
            write_value;
            update_stamp;
            return;
        }
        backoff;
    }
    register_waker();
    // examine the channel-full condition
    if is_full() {
        park()
    } else {
        cancel_waker();
        continue 'MAIN;
    }
}
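The loop above can be sketched with std primitives only. This is not the crossbeam-channel implementation: the Chan type, its Mutex-based queue, the single-sender waker slot, and the retry count of 16 are all assumptions made to keep the example short. It only illustrates the order of steps: retry with backoff, register a waker, re-check fullness, then park.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread::{self, Thread};

/// Hypothetical single-sender channel; a Mutex stands in for the lock-free queue.
struct Chan {
    queue: Mutex<VecDeque<u32>>,
    cap: usize,
    send_waker: Mutex<Option<Thread>>,
}

impl Chan {
    fn new(cap: usize) -> Self {
        Chan { queue: Mutex::new(VecDeque::new()), cap, send_waker: Mutex::new(None) }
    }

    fn try_send(&self, v: u32) -> bool {
        let mut q = self.queue.lock().unwrap();
        if q.len() == self.cap {
            return false;
        }
        q.push_back(v);
        true
    }

    fn is_full(&self) -> bool {
        self.queue.lock().unwrap().len() == self.cap
    }

    fn send(&self, v: u32) {
        loop {
            // Bounded retries with a yield as the backoff step.
            for _ in 0..16 {
                if self.try_send(v) {
                    return;
                }
                thread::yield_now();
            }
            // Register first, then re-check, so a concurrent recv cannot be missed.
            *self.send_waker.lock().unwrap() = Some(thread::current());
            if self.is_full() {
                thread::park(); // a spurious or stale wakeup just retries the loop
            } else {
                let _ = self.send_waker.lock().unwrap().take(); // cancel the waker
            }
        }
    }

    fn try_recv(&self) -> Option<u32> {
        let v = self.queue.lock().unwrap().pop_front();
        if v.is_some() {
            // A slot was freed: wake the sender if it registered itself.
            if let Some(t) = self.send_waker.lock().unwrap().take() {
                t.unpark();
            }
        }
        v
    }
}

/// Pushes 0..100 through a channel of capacity 2 and returns the received sum.
fn run_demo() -> u32 {
    let chan = Arc::new(Chan::new(2));
    let rx = Arc::clone(&chan);
    let consumer = thread::spawn(move || {
        let (mut sum, mut got) = (0u32, 0);
        while got < 100 {
            match rx.try_recv() {
                Some(v) => { sum += v; got += 1; }
                None => thread::yield_now(),
            }
        }
        sum
    });
    for v in 0..100 {
        chan.send(v);
    }
    consumer.join().unwrap()
}
```

Registering the waker before the second fullness check is the point of the design: if the receiver frees a slot after registration, it finds the waker and unparks the sender; if it frees a slot before registration, the sender sees a non-full channel and retries.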
Note that:
- is_full() reads tail and then head, both with SeqCst; is_empty() reads head and then tail, also with SeqCst, i.e. in the reverse order of is_full(). SeqCst is necessary to avoid reading a stale value from an atomic, which might otherwise lead to a deadlock.
- Backoff is essential for performance, because the context-switch cost of thread park/unpark is higher than the cost of a yield.
- thread::park() may wake spuriously (just like task scheduling in an async runtime). The code checks the waker's status after waking; if it has not been notified, it sleeps again.
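The spurious-wakeup handling in the last point can be sketched as a loop around thread::park(). The notified flag and the function names here are hypothetical stand-ins for the waker state, not crossbeam's or crossfire's actual types.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Parks until `notified` is set; any wakeup without the flag goes back to sleep.
fn park_until(notified: &AtomicBool) {
    while !notified.load(Ordering::Acquire) {
        thread::park();
    }
}

/// Returns true once the waiter has observed the real notification.
fn demo_park() -> bool {
    let notified = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&notified);
    let waiter = thread::spawn(move || {
        park_until(&flag);
        true
    });
    // An unpark without the flag emulates a spurious wakeup.
    waiter.thread().unpark();
    // The real notification: set the flag first, then unpark.
    notified.store(true, Ordering::Release);
    waiter.thread().unpark();
    waiter.join().unwrap()
}
```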
You might ask: is_full() uses two separate atomic reads, so why is it sound as a stopping condition? Because a bounded channel has a limited number of slots. Although tail and head are not read at the same instant, once the channel is full the two "rendezvous" at the same point, and no writer can advance the pointer. The consumer side performs the reads in the reverse order, so the same reasoning applies to is_empty().
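The two checks can be put side by side to show the mirrored read order. This is a simplified sketch: the Cursors struct is hypothetical, and the real crossbeam-channel code carries additional state that is omitted here.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in holding only the fields the two checks need.
struct Cursors {
    head: AtomicUsize,
    tail: AtomicUsize,
    one_lap: usize,
}

impl Cursors {
    /// Sender side: read tail first, then head.
    fn is_full(&self) -> bool {
        let tail = self.tail.load(Ordering::SeqCst);
        let head = self.head.load(Ordering::SeqCst);
        // Full when tail is exactly one lap ahead of head.
        head.wrapping_add(self.one_lap) == tail
    }

    /// Receiver side: read head first, then tail (the reverse order).
    fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);
        tail == head
    }
}
```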
I have not looked into SegQueue yet; this section is left for the future.
Crossfire 2.1 has dropped the dependency on crossbeam-channel, because we do not use its notification layer. This removes the extra cost of iterating the waker registry on every try_send() / try_recv() call.
The code of ArrayQueue is copied within this library, under the path
There are some modifications to ArrayQueue:
1). Unused methods are removed (like force_push()).
2). The type T is changed to *const T in push(), in order to reduce the cost of moving the value when the channel is full.
The push() method is renamed to push_with_ptr().
crossbeam-channel has a similar idea. It also maintains a duplicated implementation of ArrayQueue, splitting try_send into two steps.
3). A new method try_push_oneshot() is added.
Its purpose is simply to combine is_full() and try_send() into one call. We use this method to double-check the full condition.
There are three possible results: full / send_ok / uncertain (needs more spinning).
There is a high chance of sending successfully if the channel is not full.
Note that it uses the same ordering as the original is_full():
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
if head.wrapping_add(self.one_lap) == tail {
    // full...
}
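The three results could be derived from a single look at the slot stamp and the two cursors, roughly as below. This is a hypothetical decision function, not the code of try_push_oneshot(): the Oneshot enum and classify() are invented for illustration, and a real implementation would still have to win a CAS on tail in the Ready case (a lost CAS would count as uncertain).

```rust
/// Hypothetical outcome of a single, non-spinning push attempt.
#[derive(Debug, PartialEq)]
enum Oneshot {
    /// The slot is writable; a real queue would now CAS tail and write.
    Ready,
    /// The queue is full.
    Full,
    /// Another thread is mid-operation; more spinning is needed.
    Uncertain,
}

/// `stamp` is the stamp of the slot that `tail` points at.
fn classify(stamp: usize, tail: usize, head: usize, one_lap: usize) -> Oneshot {
    if stamp == tail {
        Oneshot::Ready
    } else if stamp.wrapping_add(one_lap) == tail.wrapping_add(1) {
        // The slot still holds a value from the previous lap.
        if head.wrapping_add(one_lap) == tail {
            Oneshot::Full
        } else {
            // A reader has advanced head but not yet released the slot.
            Oneshot::Uncertain
        }
    } else {
        // The snapshot of tail is stale: another writer got ahead.
        Oneshot::Uncertain
    }
}
```

For a capacity of 3 (one_lap of 4), classify(0, 0, 0, 4) is Ready, classify(1, 4, 0, 4) is Full, and classify(1, 4, 1, 4) is Uncertain.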