crossbeam_channel/flavors/
zero.rs

1//! Zero-capacity channel.
2//!
3//! This kind of channel is also known as *rendezvous* channel.
4
5use std::cell::UnsafeCell;
6use std::marker::PhantomData;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Mutex;
9use std::time::Instant;
10use std::{fmt, ptr};
11
12use crossbeam_utils::Backoff;
13
14use crate::context::Context;
15use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
16use crate::select::{Operation, SelectHandle, Selected, Token};
17use crate::waker::Waker;
18
19/// A pointer to a packet.
20pub(crate) struct ZeroToken(*mut ());
21
22impl Default for ZeroToken {
23    fn default() -> Self {
24        Self(ptr::null_mut())
25    }
26}
27
28impl fmt::Debug for ZeroToken {
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        fmt::Debug::fmt(&(self.0 as usize), f)
31    }
32}
33
34/// A slot for passing one message from a sender to a receiver.
35struct Packet<T> {
36    /// Equals `true` if the packet is allocated on the stack.
37    on_stack: bool,
38
39    /// Equals `true` once the packet is ready for reading or writing.
40    ready: AtomicBool,
41
42    /// The message.
43    msg: UnsafeCell<Option<T>>,
44}
45
46impl<T> Packet<T> {
47    /// Creates an empty packet on the stack.
48    fn empty_on_stack() -> Packet<T> {
49        Packet {
50            on_stack: true,
51            ready: AtomicBool::new(false),
52            msg: UnsafeCell::new(None),
53        }
54    }
55
56    /// Creates an empty packet on the heap.
57    fn empty_on_heap() -> Box<Packet<T>> {
58        Box::new(Packet {
59            on_stack: false,
60            ready: AtomicBool::new(false),
61            msg: UnsafeCell::new(None),
62        })
63    }
64
65    /// Creates a packet on the stack, containing a message.
66    fn message_on_stack(msg: T) -> Packet<T> {
67        Packet {
68            on_stack: true,
69            ready: AtomicBool::new(false),
70            msg: UnsafeCell::new(Some(msg)),
71        }
72    }
73
74    /// Waits until the packet becomes ready for reading or writing.
75    fn wait_ready(&self) {
76        let backoff = Backoff::new();
77        while !self.ready.load(Ordering::Acquire) {
78            backoff.snooze();
79        }
80    }
81}
82
83/// Inner representation of a zero-capacity channel.
84struct Inner {
85    /// Senders waiting to pair up with a receive operation.
86    senders: Waker,
87
88    /// Receivers waiting to pair up with a send operation.
89    receivers: Waker,
90
91    /// Equals `true` when the channel is disconnected.
92    is_disconnected: bool,
93}
94
95/// Zero-capacity channel.
96pub(crate) struct Channel<T> {
97    /// Inner representation of the channel.
98    inner: Mutex<Inner>,
99
100    /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
101    _marker: PhantomData<T>,
102}
103
104impl<T> Channel<T> {
105    /// Constructs a new zero-capacity channel.
106    pub(crate) fn new() -> Self {
107        Channel {
108            inner: Mutex::new(Inner {
109                senders: Waker::new(),
110                receivers: Waker::new(),
111                is_disconnected: false,
112            }),
113            _marker: PhantomData,
114        }
115    }
116
117    /// Returns a receiver handle to the channel.
118    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
119        Receiver(self)
120    }
121
122    /// Returns a sender handle to the channel.
123    pub(crate) fn sender(&self) -> Sender<'_, T> {
124        Sender(self)
125    }
126
127    /// Attempts to reserve a slot for sending a message.
128    fn start_send(&self, token: &mut Token) -> bool {
129        let mut inner = self.inner.lock().unwrap();
130
131        // If there's a waiting receiver, pair up with it.
132        if let Some(operation) = inner.receivers.try_select() {
133            token.zero.0 = operation.packet;
134            true
135        } else if inner.is_disconnected {
136            token.zero.0 = ptr::null_mut();
137            true
138        } else {
139            false
140        }
141    }
142
143    /// Writes a message into the packet.
144    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
145        // If there is no packet, the channel is disconnected.
146        if token.zero.0.is_null() {
147            return Err(msg);
148        }
149
150        let packet = &*(token.zero.0 as *const Packet<T>);
151        packet.msg.get().write(Some(msg));
152        packet.ready.store(true, Ordering::Release);
153        Ok(())
154    }
155
156    /// Attempts to pair up with a sender.
157    fn start_recv(&self, token: &mut Token) -> bool {
158        let mut inner = self.inner.lock().unwrap();
159
160        // If there's a waiting sender, pair up with it.
161        if let Some(operation) = inner.senders.try_select() {
162            token.zero.0 = operation.packet;
163            true
164        } else if inner.is_disconnected {
165            token.zero.0 = ptr::null_mut();
166            true
167        } else {
168            false
169        }
170    }
171
172    /// Reads a message from the packet.
173    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
174        // If there is no packet, the channel is disconnected.
175        if token.zero.0.is_null() {
176            return Err(());
177        }
178
179        let packet = &*(token.zero.0 as *const Packet<T>);
180
181        if packet.on_stack {
182            // The message has been in the packet from the beginning, so there is no need to wait
183            // for it. However, after reading the message, we need to set `ready` to `true` in
184            // order to signal that the packet can be destroyed.
185            let msg = packet.msg.get().replace(None).unwrap();
186            packet.ready.store(true, Ordering::Release);
187            Ok(msg)
188        } else {
189            // Wait until the message becomes available, then read it and destroy the
190            // heap-allocated packet.
191            packet.wait_ready();
192            let msg = packet.msg.get().replace(None).unwrap();
193            drop(Box::from_raw(token.zero.0.cast::<Packet<T>>()));
194            Ok(msg)
195        }
196    }
197
198    /// Attempts to send a message into the channel.
199    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
200        let token = &mut Token::default();
201        let mut inner = self.inner.lock().unwrap();
202
203        // If there's a waiting receiver, pair up with it.
204        if let Some(operation) = inner.receivers.try_select() {
205            token.zero.0 = operation.packet;
206            drop(inner);
207            unsafe {
208                self.write(token, msg).ok().unwrap();
209            }
210            Ok(())
211        } else if inner.is_disconnected {
212            Err(TrySendError::Disconnected(msg))
213        } else {
214            Err(TrySendError::Full(msg))
215        }
216    }
217
218    /// Sends a message into the channel.
219    pub(crate) fn send(
220        &self,
221        msg: T,
222        deadline: Option<Instant>,
223    ) -> Result<(), SendTimeoutError<T>> {
224        let token = &mut Token::default();
225        let mut inner = self.inner.lock().unwrap();
226
227        // If there's a waiting receiver, pair up with it.
228        if let Some(operation) = inner.receivers.try_select() {
229            token.zero.0 = operation.packet;
230            drop(inner);
231            unsafe {
232                self.write(token, msg).ok().unwrap();
233            }
234            return Ok(());
235        }
236
237        if inner.is_disconnected {
238            return Err(SendTimeoutError::Disconnected(msg));
239        }
240
241        Context::with(|cx| {
242            // Prepare for blocking until a receiver wakes us up.
243            let oper = Operation::hook(token);
244            let mut packet = Packet::<T>::message_on_stack(msg);
245            inner
246                .senders
247                .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
248            inner.receivers.notify();
249            drop(inner);
250
251            // Block the current thread.
252            let sel = cx.wait_until(deadline);
253
254            match sel {
255                Selected::Waiting => unreachable!(),
256                Selected::Aborted => {
257                    self.inner.lock().unwrap().senders.unregister(oper).unwrap();
258                    let msg = unsafe { packet.msg.get().replace(None).unwrap() };
259                    Err(SendTimeoutError::Timeout(msg))
260                }
261                Selected::Disconnected => {
262                    self.inner.lock().unwrap().senders.unregister(oper).unwrap();
263                    let msg = unsafe { packet.msg.get().replace(None).unwrap() };
264                    Err(SendTimeoutError::Disconnected(msg))
265                }
266                Selected::Operation(_) => {
267                    // Wait until the message is read, then drop the packet.
268                    packet.wait_ready();
269                    Ok(())
270                }
271            }
272        })
273    }
274
275    /// Attempts to receive a message without blocking.
276    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
277        let token = &mut Token::default();
278        let mut inner = self.inner.lock().unwrap();
279
280        // If there's a waiting sender, pair up with it.
281        if let Some(operation) = inner.senders.try_select() {
282            token.zero.0 = operation.packet;
283            drop(inner);
284            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
285        } else if inner.is_disconnected {
286            Err(TryRecvError::Disconnected)
287        } else {
288            Err(TryRecvError::Empty)
289        }
290    }
291
292    /// Receives a message from the channel.
293    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
294        let token = &mut Token::default();
295        let mut inner = self.inner.lock().unwrap();
296
297        // If there's a waiting sender, pair up with it.
298        if let Some(operation) = inner.senders.try_select() {
299            token.zero.0 = operation.packet;
300            drop(inner);
301            unsafe {
302                return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
303            }
304        }
305
306        if inner.is_disconnected {
307            return Err(RecvTimeoutError::Disconnected);
308        }
309
310        Context::with(|cx| {
311            // Prepare for blocking until a sender wakes us up.
312            let oper = Operation::hook(token);
313            let mut packet = Packet::<T>::empty_on_stack();
314            inner.receivers.register_with_packet(
315                oper,
316                &mut packet as *mut Packet<T> as *mut (),
317                cx,
318            );
319            inner.senders.notify();
320            drop(inner);
321
322            // Block the current thread.
323            let sel = cx.wait_until(deadline);
324
325            match sel {
326                Selected::Waiting => unreachable!(),
327                Selected::Aborted => {
328                    self.inner
329                        .lock()
330                        .unwrap()
331                        .receivers
332                        .unregister(oper)
333                        .unwrap();
334                    Err(RecvTimeoutError::Timeout)
335                }
336                Selected::Disconnected => {
337                    self.inner
338                        .lock()
339                        .unwrap()
340                        .receivers
341                        .unregister(oper)
342                        .unwrap();
343                    Err(RecvTimeoutError::Disconnected)
344                }
345                Selected::Operation(_) => {
346                    // Wait until the message is provided, then read it.
347                    packet.wait_ready();
348                    unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
349                }
350            }
351        })
352    }
353
354    /// Disconnects the channel and wakes up all blocked senders and receivers.
355    ///
356    /// Returns `true` if this call disconnected the channel.
357    pub(crate) fn disconnect(&self) -> bool {
358        let mut inner = self.inner.lock().unwrap();
359
360        if !inner.is_disconnected {
361            inner.is_disconnected = true;
362            inner.senders.disconnect();
363            inner.receivers.disconnect();
364            true
365        } else {
366            false
367        }
368    }
369
370    /// Returns the current number of messages inside the channel.
371    pub(crate) fn len(&self) -> usize {
372        0
373    }
374
375    /// Returns the capacity of the channel.
376    pub(crate) fn capacity(&self) -> Option<usize> {
377        Some(0)
378    }
379
380    /// Returns `true` if the channel is empty.
381    pub(crate) fn is_empty(&self) -> bool {
382        true
383    }
384
385    /// Returns `true` if the channel is full.
386    pub(crate) fn is_full(&self) -> bool {
387        true
388    }
389}
390
391/// Receiver handle to a channel.
392pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
393
394/// Sender handle to a channel.
395pub(crate) struct Sender<'a, T>(&'a Channel<T>);
396
397impl<T> SelectHandle for Receiver<'_, T> {
398    fn try_select(&self, token: &mut Token) -> bool {
399        self.0.start_recv(token)
400    }
401
402    fn deadline(&self) -> Option<Instant> {
403        None
404    }
405
406    fn register(&self, oper: Operation, cx: &Context) -> bool {
407        let packet = Box::into_raw(Packet::<T>::empty_on_heap());
408
409        let mut inner = self.0.inner.lock().unwrap();
410        inner
411            .receivers
412            .register_with_packet(oper, packet.cast::<()>(), cx);
413        inner.senders.notify();
414        inner.senders.can_select() || inner.is_disconnected
415    }
416
417    fn unregister(&self, oper: Operation) {
418        if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) {
419            unsafe {
420                drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
421            }
422        }
423    }
424
425    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
426        token.zero.0 = cx.wait_packet();
427        true
428    }
429
430    fn is_ready(&self) -> bool {
431        let inner = self.0.inner.lock().unwrap();
432        inner.senders.can_select() || inner.is_disconnected
433    }
434
435    fn watch(&self, oper: Operation, cx: &Context) -> bool {
436        let mut inner = self.0.inner.lock().unwrap();
437        inner.receivers.watch(oper, cx);
438        inner.senders.can_select() || inner.is_disconnected
439    }
440
441    fn unwatch(&self, oper: Operation) {
442        let mut inner = self.0.inner.lock().unwrap();
443        inner.receivers.unwatch(oper);
444    }
445}
446
447impl<T> SelectHandle for Sender<'_, T> {
448    fn try_select(&self, token: &mut Token) -> bool {
449        self.0.start_send(token)
450    }
451
452    fn deadline(&self) -> Option<Instant> {
453        None
454    }
455
456    fn register(&self, oper: Operation, cx: &Context) -> bool {
457        let packet = Box::into_raw(Packet::<T>::empty_on_heap());
458
459        let mut inner = self.0.inner.lock().unwrap();
460        inner
461            .senders
462            .register_with_packet(oper, packet.cast::<()>(), cx);
463        inner.receivers.notify();
464        inner.receivers.can_select() || inner.is_disconnected
465    }
466
467    fn unregister(&self, oper: Operation) {
468        if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) {
469            unsafe {
470                drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
471            }
472        }
473    }
474
475    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
476        token.zero.0 = cx.wait_packet();
477        true
478    }
479
480    fn is_ready(&self) -> bool {
481        let inner = self.0.inner.lock().unwrap();
482        inner.receivers.can_select() || inner.is_disconnected
483    }
484
485    fn watch(&self, oper: Operation, cx: &Context) -> bool {
486        let mut inner = self.0.inner.lock().unwrap();
487        inner.senders.watch(oper, cx);
488        inner.receivers.can_select() || inner.is_disconnected
489    }
490
491    fn unwatch(&self, oper: Operation) {
492        let mut inner = self.0.inner.lock().unwrap();
493        inner.senders.unwatch(oper);
494    }
495}