crossbeam_channel/
waker.rs

//! Waking mechanism for threads blocked on channel operations.
2
3use std::ptr;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Mutex;
6use std::thread::{self, ThreadId};
7
8use crate::context::Context;
9use crate::select::{Operation, Selected};
10
11/// Represents a thread blocked on a specific channel operation.
12pub(crate) struct Entry {
13    /// The operation.
14    pub(crate) oper: Operation,
15
16    /// Optional packet.
17    pub(crate) packet: *mut (),
18
19    /// Context associated with the thread owning this operation.
20    pub(crate) cx: Context,
21}
22
23/// A queue of threads blocked on channel operations.
24///
25/// This data structure is used by threads to register blocking operations and get woken up once
26/// an operation becomes ready.
27pub(crate) struct Waker {
28    /// A list of select operations.
29    selectors: Vec<Entry>,
30
31    /// A list of operations waiting to be ready.
32    observers: Vec<Entry>,
33}
34
35impl Waker {
36    /// Creates a new `Waker`.
37    #[inline]
38    pub(crate) fn new() -> Self {
39        Waker {
40            selectors: Vec::new(),
41            observers: Vec::new(),
42        }
43    }
44
45    /// Registers a select operation.
46    #[inline]
47    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
48        self.register_with_packet(oper, ptr::null_mut(), cx);
49    }
50
51    /// Registers a select operation and a packet.
52    #[inline]
53    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: *mut (), cx: &Context) {
54        self.selectors.push(Entry {
55            oper,
56            packet,
57            cx: cx.clone(),
58        });
59    }
60
61    /// Unregisters a select operation.
62    #[inline]
63    pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
64        if let Some((i, _)) = self
65            .selectors
66            .iter()
67            .enumerate()
68            .find(|&(_, entry)| entry.oper == oper)
69        {
70            let entry = self.selectors.remove(i);
71            Some(entry)
72        } else {
73            None
74        }
75    }
76
77    /// Attempts to find another thread's entry, select the operation, and wake it up.
78    #[inline]
79    pub(crate) fn try_select(&mut self) -> Option<Entry> {
80        if self.selectors.is_empty() {
81            None
82        } else {
83            let thread_id = current_thread_id();
84
85            self.selectors
86                .iter()
87                .position(|selector| {
88                    // Does the entry belong to a different thread?
89                    selector.cx.thread_id() != thread_id
90                        && selector // Try selecting this operation.
91                            .cx
92                            .try_select(Selected::Operation(selector.oper))
93                            .is_ok()
94                        && {
95                            // Provide the packet.
96                            selector.cx.store_packet(selector.packet);
97                            // Wake the thread up.
98                            selector.cx.unpark();
99                            true
100                        }
101                })
102                // Remove the entry from the queue to keep it clean and improve
103                // performance.
104                .map(|pos| self.selectors.remove(pos))
105        }
106    }
107
108    /// Returns `true` if there is an entry which can be selected by the current thread.
109    #[inline]
110    pub(crate) fn can_select(&self) -> bool {
111        if self.selectors.is_empty() {
112            false
113        } else {
114            let thread_id = current_thread_id();
115
116            self.selectors.iter().any(|entry| {
117                entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
118            })
119        }
120    }
121
122    /// Registers an operation waiting to be ready.
123    #[inline]
124    pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
125        self.observers.push(Entry {
126            oper,
127            packet: ptr::null_mut(),
128            cx: cx.clone(),
129        });
130    }
131
132    /// Unregisters an operation waiting to be ready.
133    #[inline]
134    pub(crate) fn unwatch(&mut self, oper: Operation) {
135        self.observers.retain(|e| e.oper != oper);
136    }
137
138    /// Notifies all operations waiting to be ready.
139    #[inline]
140    pub(crate) fn notify(&mut self) {
141        for entry in self.observers.drain(..) {
142            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
143                entry.cx.unpark();
144            }
145        }
146    }
147
148    /// Notifies all registered operations that the channel is disconnected.
149    #[inline]
150    pub(crate) fn disconnect(&mut self) {
151        for entry in self.selectors.iter() {
152            if entry.cx.try_select(Selected::Disconnected).is_ok() {
153                // Wake the thread up.
154                //
155                // Here we don't remove the entry from the queue. Registered threads must
156                // unregister from the waker by themselves. They might also want to recover the
157                // packet value and destroy it, if necessary.
158                entry.cx.unpark();
159            }
160        }
161
162        self.notify();
163    }
164}
165
166impl Drop for Waker {
167    #[inline]
168    fn drop(&mut self) {
169        debug_assert_eq!(self.selectors.len(), 0);
170        debug_assert_eq!(self.observers.len(), 0);
171    }
172}
173
174/// A waker that can be shared among threads without locking.
175///
176/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
177pub(crate) struct SyncWaker {
178    /// The inner `Waker`.
179    inner: Mutex<Waker>,
180
181    /// `true` if the waker is empty.
182    is_empty: AtomicBool,
183}
184
185impl SyncWaker {
186    /// Creates a new `SyncWaker`.
187    #[inline]
188    pub(crate) fn new() -> Self {
189        SyncWaker {
190            inner: Mutex::new(Waker::new()),
191            is_empty: AtomicBool::new(true),
192        }
193    }
194
195    /// Registers the current thread with an operation.
196    #[inline]
197    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
198        let mut inner = self.inner.lock().unwrap();
199        inner.register(oper, cx);
200        self.is_empty.store(
201            inner.selectors.is_empty() && inner.observers.is_empty(),
202            Ordering::SeqCst,
203        );
204    }
205
206    /// Unregisters an operation previously registered by the current thread.
207    #[inline]
208    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
209        let mut inner = self.inner.lock().unwrap();
210        let entry = inner.unregister(oper);
211        self.is_empty.store(
212            inner.selectors.is_empty() && inner.observers.is_empty(),
213            Ordering::SeqCst,
214        );
215        entry
216    }
217
218    /// Attempts to find one thread (not the current one), select its operation, and wake it up.
219    #[inline]
220    pub(crate) fn notify(&self) {
221        if !self.is_empty.load(Ordering::SeqCst) {
222            let mut inner = self.inner.lock().unwrap();
223            if !self.is_empty.load(Ordering::SeqCst) {
224                inner.try_select();
225                inner.notify();
226                self.is_empty.store(
227                    inner.selectors.is_empty() && inner.observers.is_empty(),
228                    Ordering::SeqCst,
229                );
230            }
231        }
232    }
233
234    /// Registers an operation waiting to be ready.
235    #[inline]
236    pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
237        let mut inner = self.inner.lock().unwrap();
238        inner.watch(oper, cx);
239        self.is_empty.store(
240            inner.selectors.is_empty() && inner.observers.is_empty(),
241            Ordering::SeqCst,
242        );
243    }
244
245    /// Unregisters an operation waiting to be ready.
246    #[inline]
247    pub(crate) fn unwatch(&self, oper: Operation) {
248        let mut inner = self.inner.lock().unwrap();
249        inner.unwatch(oper);
250        self.is_empty.store(
251            inner.selectors.is_empty() && inner.observers.is_empty(),
252            Ordering::SeqCst,
253        );
254    }
255
256    /// Notifies all threads that the channel is disconnected.
257    #[inline]
258    pub(crate) fn disconnect(&self) {
259        let mut inner = self.inner.lock().unwrap();
260        inner.disconnect();
261        self.is_empty.store(
262            inner.selectors.is_empty() && inner.observers.is_empty(),
263            Ordering::SeqCst,
264        );
265    }
266}
267
268impl Drop for SyncWaker {
269    #[inline]
270    fn drop(&mut self) {
271        debug_assert!(self.is_empty.load(Ordering::SeqCst));
272    }
273}
274
/// Returns the id of the current thread.
///
/// The id is cached in a thread-local. If the thread-local cannot be accessed (`try_with`
/// returns an error), the id is fetched directly from `thread::current()` instead.
#[inline]
fn current_thread_id() -> ThreadId {
    thread_local! {
        /// Cached thread-local id.
        static THREAD_ID: ThreadId = thread::current().id();
    }

    THREAD_ID
        .try_with(|id| *id)
        .unwrap_or_else(|_| thread::current().id())
}