crossbeam_channel/flavors/
list.rs

1//! Unbounded channel implemented as a linked list.
2
3use std::cell::UnsafeCell;
4use std::marker::PhantomData;
5use std::mem::MaybeUninit;
6use std::ptr;
7use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
8use std::time::Instant;
9
10use crossbeam_utils::{Backoff, CachePadded};
11
12use crate::context::Context;
13use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
14use crate::select::{Operation, SelectHandle, Selected, Token};
15use crate::waker::SyncWaker;
16
17// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
18// following changes by @kleimkuhler:
19//
20// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
21// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101
22
// Flags stored in `Slot::state`; they are combined bitwise:
// * `WRITE` is set once a message has been written into the slot.
// * `READ` is set once the message has been read from the slot.
// * `DESTROY` is set while the owning block is being destroyed.
const WRITE: usize = 0b001;
const READ: usize = 0b010;
const DESTROY: usize = 0b100;

// Number of consecutive indices covered by one block (one "lap").
const LAP: usize = 32;
// Maximum number of messages stored in a block. It is one less than `LAP`: the last index of
// a lap has no slot and is used while the next block is being installed.
const BLOCK_CAP: usize = LAP - 1;
// Number of low bits of an index that are reserved for metadata.
const SHIFT: usize = 1;
// Metadata bit with two different meanings:
// * Set in the head index: the head block is not the last block of the list.
// * Set in the tail index: the channel is disconnected.
const MARK_BIT: usize = 1;
41
42/// A slot in a block.
43struct Slot<T> {
44    /// The message.
45    msg: UnsafeCell<MaybeUninit<T>>,
46
47    /// The state of the slot.
48    state: AtomicUsize,
49}
50
51impl<T> Slot<T> {
52    const UNINIT: Self = Self {
53        msg: UnsafeCell::new(MaybeUninit::uninit()),
54        state: AtomicUsize::new(0),
55    };
56
57    /// Waits until a message is written into the slot.
58    fn wait_write(&self) {
59        let backoff = Backoff::new();
60        while self.state.load(Ordering::Acquire) & WRITE == 0 {
61            backoff.snooze();
62        }
63    }
64}
65
66/// A block in a linked list.
67///
68/// Each block in the list can hold up to `BLOCK_CAP` messages.
69struct Block<T> {
70    /// The next block in the linked list.
71    next: AtomicPtr<Block<T>>,
72
73    /// Slots for messages.
74    slots: [Slot<T>; BLOCK_CAP],
75}
76
77impl<T> Block<T> {
78    /// Creates an empty block.
79    fn new() -> Block<T> {
80        Self {
81            next: AtomicPtr::new(ptr::null_mut()),
82            slots: [Slot::UNINIT; BLOCK_CAP],
83        }
84    }
85
86    /// Waits until the next pointer is set.
87    fn wait_next(&self) -> *mut Block<T> {
88        let backoff = Backoff::new();
89        loop {
90            let next = self.next.load(Ordering::Acquire);
91            if !next.is_null() {
92                return next;
93            }
94            backoff.snooze();
95        }
96    }
97
98    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
99    unsafe fn destroy(this: *mut Block<T>, start: usize) {
100        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
101        // begun destruction of the block.
102        for i in start..BLOCK_CAP - 1 {
103            let slot = (*this).slots.get_unchecked(i);
104
105            // Mark the `DESTROY` bit if a thread is still using the slot.
106            if slot.state.load(Ordering::Acquire) & READ == 0
107                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
108            {
109                // If a thread is still using the slot, it will continue destruction of the block.
110                return;
111            }
112        }
113
114        // No thread is using the block, now it is safe to destroy it.
115        drop(Box::from_raw(this));
116    }
117}
118
/// A position in a channel.
///
/// `Channel` keeps one of these for its head (receiving end) and one for its tail (sending
/// end).
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel.
    ///
    /// The lowest `SHIFT` bits hold metadata (`MARK_BIT`); the remaining bits are the
    /// message sequence number.
    index: AtomicUsize,

    /// The block in the linked list.
    ///
    /// Null until the first block has been allocated.
    block: AtomicPtr<Block<T>>,
}
128
/// Token used by the list flavor to carry a reserved slot between the two phases of an
/// operation.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// Type-erased pointer to the block holding the slot; null means "no slot".
    block: *const u8,

    /// Index of the slot within that block.
    offset: usize,
}

impl Default for ListToken {
    /// Returns a token with a null block pointer and offset zero.
    #[inline]
    fn default() -> Self {
        Self {
            block: ptr::null::<u8>(),
            offset: 0,
        }
    }
}
148
/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
pub(crate) struct Channel<T> {
    /// The head of the channel.
    ///
    /// Receive operations advance this position.
    head: CachePadded<Position<T>>,

    /// The tail of the channel.
    ///
    /// Send operations advance this position; `MARK_BIT` in its index flags disconnection.
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}
169
170impl<T> Channel<T> {
171    /// Creates a new unbounded channel.
172    pub(crate) fn new() -> Self {
173        Channel {
174            head: CachePadded::new(Position {
175                block: AtomicPtr::new(ptr::null_mut()),
176                index: AtomicUsize::new(0),
177            }),
178            tail: CachePadded::new(Position {
179                block: AtomicPtr::new(ptr::null_mut()),
180                index: AtomicUsize::new(0),
181            }),
182            receivers: SyncWaker::new(),
183            _marker: PhantomData,
184        }
185    }
186
187    /// Returns a receiver handle to the channel.
188    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
189        Receiver(self)
190    }
191
192    /// Returns a sender handle to the channel.
193    pub(crate) fn sender(&self) -> Sender<'_, T> {
194        Sender(self)
195    }
196
197    /// Attempts to reserve a slot for sending a message.
198    fn start_send(&self, token: &mut Token) -> bool {
199        let backoff = Backoff::new();
200        let mut tail = self.tail.index.load(Ordering::Acquire);
201        let mut block = self.tail.block.load(Ordering::Acquire);
202        let mut next_block = None;
203
204        loop {
205            // Check if the channel is disconnected.
206            if tail & MARK_BIT != 0 {
207                token.list.block = ptr::null();
208                return true;
209            }
210
211            // Calculate the offset of the index into the block.
212            let offset = (tail >> SHIFT) % LAP;
213
214            // If we reached the end of the block, wait until the next one is installed.
215            if offset == BLOCK_CAP {
216                backoff.snooze();
217                tail = self.tail.index.load(Ordering::Acquire);
218                block = self.tail.block.load(Ordering::Acquire);
219                continue;
220            }
221
222            // If we're going to have to install the next block, allocate it in advance in order to
223            // make the wait for other threads as short as possible.
224            if offset + 1 == BLOCK_CAP && next_block.is_none() {
225                next_block = Some(Box::new(Block::<T>::new()));
226            }
227
228            // If this is the first message to be sent into the channel, we need to allocate the
229            // first block and install it.
230            if block.is_null() {
231                let new = Box::into_raw(Box::new(Block::<T>::new()));
232
233                if self
234                    .tail
235                    .block
236                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
237                    .is_ok()
238                {
239                    self.head.block.store(new, Ordering::Release);
240                    block = new;
241                } else {
242                    next_block = unsafe { Some(Box::from_raw(new)) };
243                    tail = self.tail.index.load(Ordering::Acquire);
244                    block = self.tail.block.load(Ordering::Acquire);
245                    continue;
246                }
247            }
248
249            let new_tail = tail + (1 << SHIFT);
250
251            // Try advancing the tail forward.
252            match self.tail.index.compare_exchange_weak(
253                tail,
254                new_tail,
255                Ordering::SeqCst,
256                Ordering::Acquire,
257            ) {
258                Ok(_) => unsafe {
259                    // If we've reached the end of the block, install the next one.
260                    if offset + 1 == BLOCK_CAP {
261                        let next_block = Box::into_raw(next_block.unwrap());
262                        self.tail.block.store(next_block, Ordering::Release);
263                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
264                        (*block).next.store(next_block, Ordering::Release);
265                    }
266
267                    token.list.block = block as *const u8;
268                    token.list.offset = offset;
269                    return true;
270                },
271                Err(t) => {
272                    tail = t;
273                    block = self.tail.block.load(Ordering::Acquire);
274                    backoff.spin();
275                }
276            }
277        }
278    }
279
280    /// Writes a message into the channel.
281    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
282        // If there is no slot, the channel is disconnected.
283        if token.list.block.is_null() {
284            return Err(msg);
285        }
286
287        // Write the message into the slot.
288        let block = token.list.block.cast::<Block<T>>();
289        let offset = token.list.offset;
290        let slot = (*block).slots.get_unchecked(offset);
291        slot.msg.get().write(MaybeUninit::new(msg));
292        slot.state.fetch_or(WRITE, Ordering::Release);
293
294        // Wake a sleeping receiver.
295        self.receivers.notify();
296        Ok(())
297    }
298
299    /// Attempts to reserve a slot for receiving a message.
300    fn start_recv(&self, token: &mut Token) -> bool {
301        let backoff = Backoff::new();
302        let mut head = self.head.index.load(Ordering::Acquire);
303        let mut block = self.head.block.load(Ordering::Acquire);
304
305        loop {
306            // Calculate the offset of the index into the block.
307            let offset = (head >> SHIFT) % LAP;
308
309            // If we reached the end of the block, wait until the next one is installed.
310            if offset == BLOCK_CAP {
311                backoff.snooze();
312                head = self.head.index.load(Ordering::Acquire);
313                block = self.head.block.load(Ordering::Acquire);
314                continue;
315            }
316
317            let mut new_head = head + (1 << SHIFT);
318
319            if new_head & MARK_BIT == 0 {
320                atomic::fence(Ordering::SeqCst);
321                let tail = self.tail.index.load(Ordering::Relaxed);
322
323                // If the tail equals the head, that means the channel is empty.
324                if head >> SHIFT == tail >> SHIFT {
325                    // If the channel is disconnected...
326                    if tail & MARK_BIT != 0 {
327                        // ...then receive an error.
328                        token.list.block = ptr::null();
329                        return true;
330                    } else {
331                        // Otherwise, the receive operation is not ready.
332                        return false;
333                    }
334                }
335
336                // If head and tail are not in the same block, set `MARK_BIT` in head.
337                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
338                    new_head |= MARK_BIT;
339                }
340            }
341
342            // The block can be null here only if the first message is being sent into the channel.
343            // In that case, just wait until it gets initialized.
344            if block.is_null() {
345                backoff.snooze();
346                head = self.head.index.load(Ordering::Acquire);
347                block = self.head.block.load(Ordering::Acquire);
348                continue;
349            }
350
351            // Try moving the head index forward.
352            match self.head.index.compare_exchange_weak(
353                head,
354                new_head,
355                Ordering::SeqCst,
356                Ordering::Acquire,
357            ) {
358                Ok(_) => unsafe {
359                    // If we've reached the end of the block, move to the next one.
360                    if offset + 1 == BLOCK_CAP {
361                        let next = (*block).wait_next();
362                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
363                        if !(*next).next.load(Ordering::Relaxed).is_null() {
364                            next_index |= MARK_BIT;
365                        }
366
367                        self.head.block.store(next, Ordering::Release);
368                        self.head.index.store(next_index, Ordering::Release);
369                    }
370
371                    token.list.block = block as *const u8;
372                    token.list.offset = offset;
373                    return true;
374                },
375                Err(h) => {
376                    head = h;
377                    block = self.head.block.load(Ordering::Acquire);
378                    backoff.spin();
379                }
380            }
381        }
382    }
383
384    /// Reads a message from the channel.
385    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
386        if token.list.block.is_null() {
387            // The channel is disconnected.
388            return Err(());
389        }
390
391        // Read the message.
392        let block = token.list.block as *mut Block<T>;
393        let offset = token.list.offset;
394        let slot = (*block).slots.get_unchecked(offset);
395        slot.wait_write();
396        let msg = slot.msg.get().read().assume_init();
397
398        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
399        // couldn't because we were busy reading from the slot.
400        if offset + 1 == BLOCK_CAP {
401            Block::destroy(block, 0);
402        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
403            Block::destroy(block, offset + 1);
404        }
405
406        Ok(msg)
407    }
408
409    /// Attempts to send a message into the channel.
410    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
411        self.send(msg, None).map_err(|err| match err {
412            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
413            SendTimeoutError::Timeout(_) => unreachable!(),
414        })
415    }
416
417    /// Sends a message into the channel.
418    pub(crate) fn send(
419        &self,
420        msg: T,
421        _deadline: Option<Instant>,
422    ) -> Result<(), SendTimeoutError<T>> {
423        let token = &mut Token::default();
424        assert!(self.start_send(token));
425        unsafe {
426            self.write(token, msg)
427                .map_err(SendTimeoutError::Disconnected)
428        }
429    }
430
431    /// Attempts to receive a message without blocking.
432    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
433        let token = &mut Token::default();
434
435        if self.start_recv(token) {
436            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
437        } else {
438            Err(TryRecvError::Empty)
439        }
440    }
441
442    /// Receives a message from the channel.
443    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
444        let token = &mut Token::default();
445        loop {
446            // Try receiving a message several times.
447            let backoff = Backoff::new();
448            loop {
449                if self.start_recv(token) {
450                    unsafe {
451                        return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
452                    }
453                }
454
455                if backoff.is_completed() {
456                    break;
457                } else {
458                    backoff.snooze();
459                }
460            }
461
462            if let Some(d) = deadline {
463                if Instant::now() >= d {
464                    return Err(RecvTimeoutError::Timeout);
465                }
466            }
467
468            // Prepare for blocking until a sender wakes us up.
469            Context::with(|cx| {
470                let oper = Operation::hook(token);
471                self.receivers.register(oper, cx);
472
473                // Has the channel become ready just now?
474                if !self.is_empty() || self.is_disconnected() {
475                    let _ = cx.try_select(Selected::Aborted);
476                }
477
478                // Block the current thread.
479                let sel = cx.wait_until(deadline);
480
481                match sel {
482                    Selected::Waiting => unreachable!(),
483                    Selected::Aborted | Selected::Disconnected => {
484                        self.receivers.unregister(oper).unwrap();
485                        // If the channel was disconnected, we still have to check for remaining
486                        // messages.
487                    }
488                    Selected::Operation(_) => {}
489                }
490            });
491        }
492    }
493
494    /// Returns the current number of messages inside the channel.
495    pub(crate) fn len(&self) -> usize {
496        loop {
497            // Load the tail index, then load the head index.
498            let mut tail = self.tail.index.load(Ordering::SeqCst);
499            let mut head = self.head.index.load(Ordering::SeqCst);
500
501            // If the tail index didn't change, we've got consistent indices to work with.
502            if self.tail.index.load(Ordering::SeqCst) == tail {
503                // Erase the lower bits.
504                tail &= !((1 << SHIFT) - 1);
505                head &= !((1 << SHIFT) - 1);
506
507                // Fix up indices if they fall onto block ends.
508                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
509                    tail = tail.wrapping_add(1 << SHIFT);
510                }
511                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
512                    head = head.wrapping_add(1 << SHIFT);
513                }
514
515                // Rotate indices so that head falls into the first block.
516                let lap = (head >> SHIFT) / LAP;
517                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
518                head = head.wrapping_sub((lap * LAP) << SHIFT);
519
520                // Remove the lower bits.
521                tail >>= SHIFT;
522                head >>= SHIFT;
523
524                // Return the difference minus the number of blocks between tail and head.
525                return tail - head - tail / LAP;
526            }
527        }
528    }
529
    /// Returns the capacity of the channel.
    ///
    /// Always `None`: this flavor is unbounded.
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }
534
535    /// Disconnects senders and wakes up all blocked receivers.
536    ///
537    /// Returns `true` if this call disconnected the channel.
538    pub(crate) fn disconnect_senders(&self) -> bool {
539        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
540
541        if tail & MARK_BIT == 0 {
542            self.receivers.disconnect();
543            true
544        } else {
545            false
546        }
547    }
548
549    /// Disconnects receivers.
550    ///
551    /// Returns `true` if this call disconnected the channel.
552    pub(crate) fn disconnect_receivers(&self) -> bool {
553        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
554
555        if tail & MARK_BIT == 0 {
556            // If receivers are dropped first, discard all messages to free
557            // memory eagerly.
558            self.discard_all_messages();
559            true
560        } else {
561            false
562        }
563    }
564
565    /// Discards all messages.
566    ///
567    /// This method should only be called when all receivers are dropped.
568    fn discard_all_messages(&self) {
569        let backoff = Backoff::new();
570        let mut tail = self.tail.index.load(Ordering::Acquire);
571        loop {
572            let offset = (tail >> SHIFT) % LAP;
573            if offset != BLOCK_CAP {
574                break;
575            }
576
577            // New updates to tail will be rejected by MARK_BIT and aborted unless it's
578            // at boundary. We need to wait for the updates take affect otherwise there
579            // can be memory leaks.
580            backoff.snooze();
581            tail = self.tail.index.load(Ordering::Acquire);
582        }
583
584        let mut head = self.head.index.load(Ordering::Acquire);
585        let mut block = self.head.block.load(Ordering::Acquire);
586
587        // If we're going to be dropping messages we need to synchronize with initialization
588        if head >> SHIFT != tail >> SHIFT {
589            // The block can be null here only if a sender is in the process of initializing the
590            // channel while another sender managed to send a message by inserting it into the
591            // semi-initialized channel and advanced the tail.
592            // In that case, just wait until it gets initialized.
593            while block.is_null() {
594                backoff.snooze();
595                block = self.head.block.load(Ordering::Acquire);
596            }
597        }
598        unsafe {
599            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
600            while head >> SHIFT != tail >> SHIFT {
601                let offset = (head >> SHIFT) % LAP;
602
603                if offset < BLOCK_CAP {
604                    // Drop the message in the slot.
605                    let slot = (*block).slots.get_unchecked(offset);
606                    slot.wait_write();
607                    (*slot.msg.get()).assume_init_drop();
608                } else {
609                    (*block).wait_next();
610                    // Deallocate the block and move to the next one.
611                    let next = (*block).next.load(Ordering::Acquire);
612                    drop(Box::from_raw(block));
613                    block = next;
614                }
615
616                head = head.wrapping_add(1 << SHIFT);
617            }
618
619            // Deallocate the last remaining block.
620            if !block.is_null() {
621                drop(Box::from_raw(block));
622            }
623        }
624        head &= !MARK_BIT;
625        self.head.block.store(ptr::null_mut(), Ordering::Release);
626        self.head.index.store(head, Ordering::Release);
627    }
628
629    /// Returns `true` if the channel is disconnected.
630    pub(crate) fn is_disconnected(&self) -> bool {
631        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
632    }
633
634    /// Returns `true` if the channel is empty.
635    pub(crate) fn is_empty(&self) -> bool {
636        let head = self.head.index.load(Ordering::SeqCst);
637        let tail = self.tail.index.load(Ordering::SeqCst);
638        head >> SHIFT == tail >> SHIFT
639    }
640
    /// Returns `true` if the channel is full.
    ///
    /// Always `false`: this flavor is unbounded.
    pub(crate) fn is_full(&self) -> bool {
        false
    }
645}
646
647impl<T> Drop for Channel<T> {
648    fn drop(&mut self) {
649        let mut head = *self.head.index.get_mut();
650        let mut tail = *self.tail.index.get_mut();
651        let mut block = *self.head.block.get_mut();
652
653        // Erase the lower bits.
654        head &= !((1 << SHIFT) - 1);
655        tail &= !((1 << SHIFT) - 1);
656
657        unsafe {
658            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
659            while head != tail {
660                let offset = (head >> SHIFT) % LAP;
661
662                if offset < BLOCK_CAP {
663                    // Drop the message in the slot.
664                    let slot = (*block).slots.get_unchecked(offset);
665                    (*slot.msg.get()).assume_init_drop();
666                } else {
667                    // Deallocate the block and move to the next one.
668                    let next = *(*block).next.get_mut();
669                    drop(Box::from_raw(block));
670                    block = next;
671                }
672
673                head = head.wrapping_add(1 << SHIFT);
674            }
675
676            // Deallocate the last remaining block.
677            if !block.is_null() {
678                drop(Box::from_raw(block));
679            }
680        }
681    }
682}
683
/// Receiver handle to a channel.
///
/// Wraps a shared borrow of the `Channel`; created by `Channel::receiver`.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
686
/// Sender handle to a channel.
///
/// Wraps a shared borrow of the `Channel`; created by `Channel::sender`.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
689
690impl<T> SelectHandle for Receiver<'_, T> {
691    fn try_select(&self, token: &mut Token) -> bool {
692        self.0.start_recv(token)
693    }
694
695    fn deadline(&self) -> Option<Instant> {
696        None
697    }
698
699    fn register(&self, oper: Operation, cx: &Context) -> bool {
700        self.0.receivers.register(oper, cx);
701        self.is_ready()
702    }
703
704    fn unregister(&self, oper: Operation) {
705        self.0.receivers.unregister(oper);
706    }
707
708    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
709        self.try_select(token)
710    }
711
712    fn is_ready(&self) -> bool {
713        !self.0.is_empty() || self.0.is_disconnected()
714    }
715
716    fn watch(&self, oper: Operation, cx: &Context) -> bool {
717        self.0.receivers.watch(oper, cx);
718        self.is_ready()
719    }
720
721    fn unwatch(&self, oper: Operation) {
722        self.0.receivers.unwatch(oper);
723    }
724}
725
726impl<T> SelectHandle for Sender<'_, T> {
727    fn try_select(&self, token: &mut Token) -> bool {
728        self.0.start_send(token)
729    }
730
731    fn deadline(&self) -> Option<Instant> {
732        None
733    }
734
735    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
736        self.is_ready()
737    }
738
739    fn unregister(&self, _oper: Operation) {}
740
741    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
742        self.try_select(token)
743    }
744
745    fn is_ready(&self) -> bool {
746        true
747    }
748
749    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
750        self.is_ready()
751    }
752
753    fn unwatch(&self, _oper: Operation) {}
754}