crossbeam_channel/
select.rs

1//! Interface to the select mechanism.
2
3use std::fmt;
4use std::marker::PhantomData;
5use std::mem;
6use std::time::{Duration, Instant};
7
8use crossbeam_utils::Backoff;
9
10use crate::channel::{self, Receiver, Sender};
11use crate::context::Context;
12use crate::err::{ReadyTimeoutError, TryReadyError};
13use crate::err::{RecvError, SendError};
14use crate::err::{SelectTimeoutError, TrySelectError};
15use crate::flavors;
16use crate::utils;
17
18/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
19/// `read` or `write`.
20///
21/// Each field contains data associated with a specific channel flavor.
22// This is a private API that is used by the select macro.
23#[derive(Debug, Default)]
24pub struct Token {
25    pub(crate) at: flavors::at::AtToken,
26    pub(crate) array: flavors::array::ArrayToken,
27    pub(crate) list: flavors::list::ListToken,
28    #[allow(dead_code)]
29    pub(crate) never: flavors::never::NeverToken,
30    pub(crate) tick: flavors::tick::TickToken,
31    pub(crate) zero: flavors::zero::ZeroToken,
32}
33
34/// Identifier associated with an operation by a specific thread on a specific channel.
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub struct Operation(usize);
37
38impl Operation {
39    /// Creates an operation identifier from a mutable reference.
40    ///
41    /// This function essentially just turns the address of the reference into a number. The
42    /// reference should point to a variable that is specific to the thread and the operation,
43    /// and is alive for the entire duration of select or blocking operation.
44    #[inline]
45    pub fn hook<T>(r: &mut T) -> Operation {
46        let val = r as *mut T as usize;
47        // Make sure that the pointer address doesn't equal the numerical representation of
48        // `Selected::{Waiting, Aborted, Disconnected}`.
49        assert!(val > 2);
50        Operation(val)
51    }
52}
53
54/// Current state of a select or a blocking operation.
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum Selected {
57    /// Still waiting for an operation.
58    Waiting,
59
60    /// The attempt to block the current thread has been aborted.
61    Aborted,
62
63    /// An operation became ready because a channel is disconnected.
64    Disconnected,
65
66    /// An operation became ready because a message can be sent or received.
67    Operation(Operation),
68}
69
70impl From<usize> for Selected {
71    #[inline]
72    fn from(val: usize) -> Selected {
73        match val {
74            0 => Selected::Waiting,
75            1 => Selected::Aborted,
76            2 => Selected::Disconnected,
77            oper => Selected::Operation(Operation(oper)),
78        }
79    }
80}
81
82impl Into<usize> for Selected {
83    #[inline]
84    fn into(self) -> usize {
85        match self {
86            Selected::Waiting => 0,
87            Selected::Aborted => 1,
88            Selected::Disconnected => 2,
89            Selected::Operation(Operation(val)) => val,
90        }
91    }
92}
93
94/// A receiver or a sender that can participate in select.
95///
96/// This is a handle that assists select in executing an operation, registration, deciding on the
97/// appropriate deadline for blocking, etc.
98// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
99pub trait SelectHandle {
100    /// Attempts to select an operation and returns `true` on success.
101    fn try_select(&self, token: &mut Token) -> bool;
102
103    /// Returns a deadline for an operation, if there is one.
104    fn deadline(&self) -> Option<Instant>;
105
106    /// Registers an operation for execution and returns `true` if it is now ready.
107    fn register(&self, oper: Operation, cx: &Context) -> bool;
108
109    /// Unregisters an operation for execution.
110    fn unregister(&self, oper: Operation);
111
112    /// Attempts to select an operation the thread got woken up for and returns `true` on success.
113    fn accept(&self, token: &mut Token, cx: &Context) -> bool;
114
115    /// Returns `true` if an operation can be executed without blocking.
116    fn is_ready(&self) -> bool;
117
118    /// Registers an operation for readiness notification and returns `true` if it is now ready.
119    fn watch(&self, oper: Operation, cx: &Context) -> bool;
120
121    /// Unregisters an operation for readiness notification.
122    fn unwatch(&self, oper: Operation);
123}
124
125impl<T: SelectHandle> SelectHandle for &T {
126    fn try_select(&self, token: &mut Token) -> bool {
127        (**self).try_select(token)
128    }
129
130    fn deadline(&self) -> Option<Instant> {
131        (**self).deadline()
132    }
133
134    fn register(&self, oper: Operation, cx: &Context) -> bool {
135        (**self).register(oper, cx)
136    }
137
138    fn unregister(&self, oper: Operation) {
139        (**self).unregister(oper);
140    }
141
142    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
143        (**self).accept(token, cx)
144    }
145
146    fn is_ready(&self) -> bool {
147        (**self).is_ready()
148    }
149
150    fn watch(&self, oper: Operation, cx: &Context) -> bool {
151        (**self).watch(oper, cx)
152    }
153
154    fn unwatch(&self, oper: Operation) {
155        (**self).unwatch(oper)
156    }
157}
158
159/// Determines when a select operation should time out.
160#[derive(Clone, Copy, Eq, PartialEq)]
161enum Timeout {
162    /// No blocking.
163    Now,
164
165    /// Block forever.
166    Never,
167
168    /// Time out after the time instant.
169    At(Instant),
170}
171
172/// Runs until one of the operations is selected, potentially blocking the current thread.
173///
174/// Successful receive operations will have to be followed up by `channel::read()` and successful
175/// send operations by `channel::write()`.
176fn run_select(
177    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
178    timeout: Timeout,
179) -> Option<(Token, usize, *const u8)> {
180    if handles.is_empty() {
181        // Wait until the timeout and return.
182        match timeout {
183            Timeout::Now => return None,
184            Timeout::Never => {
185                utils::sleep_until(None);
186                unreachable!();
187            }
188            Timeout::At(when) => {
189                utils::sleep_until(Some(when));
190                return None;
191            }
192        }
193    }
194
195    // Shuffle the operations for fairness.
196    utils::shuffle(handles);
197
198    // Create a token, which serves as a temporary variable that gets initialized in this function
199    // and is later used by a call to `channel::read()` or `channel::write()` that completes the
200    // selected operation.
201    let mut token = Token::default();
202
203    // Try selecting one of the operations without blocking.
204    for &(handle, i, ptr) in handles.iter() {
205        if handle.try_select(&mut token) {
206            return Some((token, i, ptr));
207        }
208    }
209
210    loop {
211        // Prepare for blocking.
212        let res = Context::with(|cx| {
213            let mut sel = Selected::Waiting;
214            let mut registered_count = 0;
215            let mut index_ready = None;
216
217            if let Timeout::Now = timeout {
218                cx.try_select(Selected::Aborted).unwrap();
219            }
220
221            // Register all operations.
222            for (handle, i, _) in handles.iter_mut() {
223                registered_count += 1;
224
225                // If registration returns `false`, that means the operation has just become ready.
226                if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
227                    // Try aborting select.
228                    sel = match cx.try_select(Selected::Aborted) {
229                        Ok(()) => {
230                            index_ready = Some(*i);
231                            Selected::Aborted
232                        }
233                        Err(s) => s,
234                    };
235                    break;
236                }
237
238                // If another thread has already selected one of the operations, stop registration.
239                sel = cx.selected();
240                if sel != Selected::Waiting {
241                    break;
242                }
243            }
244
245            if sel == Selected::Waiting {
246                // Check with each operation for how long we're allowed to block, and compute the
247                // earliest deadline.
248                let mut deadline: Option<Instant> = match timeout {
249                    Timeout::Now => return None,
250                    Timeout::Never => None,
251                    Timeout::At(when) => Some(when),
252                };
253                for &(handle, _, _) in handles.iter() {
254                    if let Some(x) = handle.deadline() {
255                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
256                    }
257                }
258
259                // Block the current thread.
260                sel = cx.wait_until(deadline);
261            }
262
263            // Unregister all registered operations.
264            for (handle, _, _) in handles.iter_mut().take(registered_count) {
265                handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
266            }
267
268            match sel {
269                Selected::Waiting => unreachable!(),
270                Selected::Aborted => {
271                    // If an operation became ready during registration, try selecting it.
272                    if let Some(index_ready) = index_ready {
273                        for &(handle, i, ptr) in handles.iter() {
274                            if i == index_ready && handle.try_select(&mut token) {
275                                return Some((i, ptr));
276                            }
277                        }
278                    }
279                }
280                Selected::Disconnected => {}
281                Selected::Operation(_) => {
282                    // Find the selected operation.
283                    for (handle, i, ptr) in handles.iter_mut() {
284                        // Is this the selected operation?
285                        if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
286                        {
287                            // Try selecting this operation.
288                            if handle.accept(&mut token, cx) {
289                                return Some((*i, *ptr));
290                            }
291                        }
292                    }
293                }
294            }
295
296            None
297        });
298
299        // Return if an operation was selected.
300        if let Some((i, ptr)) = res {
301            return Some((token, i, ptr));
302        }
303
304        // Try selecting one of the operations without blocking.
305        for &(handle, i, ptr) in handles.iter() {
306            if handle.try_select(&mut token) {
307                return Some((token, i, ptr));
308            }
309        }
310
311        match timeout {
312            Timeout::Now => return None,
313            Timeout::Never => {}
314            Timeout::At(when) => {
315                if Instant::now() >= when {
316                    return None;
317                }
318            }
319        }
320    }
321}
322
323/// Runs until one of the operations becomes ready, potentially blocking the current thread.
324fn run_ready(
325    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
326    timeout: Timeout,
327) -> Option<usize> {
328    if handles.is_empty() {
329        // Wait until the timeout and return.
330        match timeout {
331            Timeout::Now => return None,
332            Timeout::Never => {
333                utils::sleep_until(None);
334                unreachable!();
335            }
336            Timeout::At(when) => {
337                utils::sleep_until(Some(when));
338                return None;
339            }
340        }
341    }
342
343    // Shuffle the operations for fairness.
344    utils::shuffle(handles);
345
346    loop {
347        let backoff = Backoff::new();
348        loop {
349            // Check operations for readiness.
350            for &(handle, i, _) in handles.iter() {
351                if handle.is_ready() {
352                    return Some(i);
353                }
354            }
355
356            if backoff.is_completed() {
357                break;
358            } else {
359                backoff.snooze();
360            }
361        }
362
363        // Check for timeout.
364        match timeout {
365            Timeout::Now => return None,
366            Timeout::Never => {}
367            Timeout::At(when) => {
368                if Instant::now() >= when {
369                    return None;
370                }
371            }
372        }
373
374        // Prepare for blocking.
375        let res = Context::with(|cx| {
376            let mut sel = Selected::Waiting;
377            let mut registered_count = 0;
378
379            // Begin watching all operations.
380            for (handle, _, _) in handles.iter_mut() {
381                registered_count += 1;
382                let oper = Operation::hook::<&dyn SelectHandle>(handle);
383
384                // If registration returns `false`, that means the operation has just become ready.
385                if handle.watch(oper, cx) {
386                    sel = match cx.try_select(Selected::Operation(oper)) {
387                        Ok(()) => Selected::Operation(oper),
388                        Err(s) => s,
389                    };
390                    break;
391                }
392
393                // If another thread has already chosen one of the operations, stop registration.
394                sel = cx.selected();
395                if sel != Selected::Waiting {
396                    break;
397                }
398            }
399
400            if sel == Selected::Waiting {
401                // Check with each operation for how long we're allowed to block, and compute the
402                // earliest deadline.
403                let mut deadline: Option<Instant> = match timeout {
404                    Timeout::Now => unreachable!(),
405                    Timeout::Never => None,
406                    Timeout::At(when) => Some(when),
407                };
408                for &(handle, _, _) in handles.iter() {
409                    if let Some(x) = handle.deadline() {
410                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
411                    }
412                }
413
414                // Block the current thread.
415                sel = cx.wait_until(deadline);
416            }
417
418            // Unwatch all operations.
419            for (handle, _, _) in handles.iter_mut().take(registered_count) {
420                handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
421            }
422
423            match sel {
424                Selected::Waiting => unreachable!(),
425                Selected::Aborted => {}
426                Selected::Disconnected => {}
427                Selected::Operation(_) => {
428                    for (handle, i, _) in handles.iter_mut() {
429                        let oper = Operation::hook::<&dyn SelectHandle>(handle);
430                        if sel == Selected::Operation(oper) {
431                            return Some(*i);
432                        }
433                    }
434                }
435            }
436
437            None
438        });
439
440        // Return if an operation became ready.
441        if res.is_some() {
442            return res;
443        }
444    }
445}
446
447/// Attempts to select one of the operations without blocking.
448// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
449#[inline]
450pub fn try_select<'a>(
451    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
452) -> Result<SelectedOperation<'a>, TrySelectError> {
453    match run_select(handles, Timeout::Now) {
454        None => Err(TrySelectError),
455        Some((token, index, ptr)) => Ok(SelectedOperation {
456            token,
457            index,
458            ptr,
459            _marker: PhantomData,
460        }),
461    }
462}
463
464/// Blocks until one of the operations becomes ready and selects it.
465// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
466#[inline]
467pub fn select<'a>(
468    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
469) -> SelectedOperation<'a> {
470    if handles.is_empty() {
471        panic!("no operations have been added to `Select`");
472    }
473
474    let (token, index, ptr) = run_select(handles, Timeout::Never).unwrap();
475    SelectedOperation {
476        token,
477        index,
478        ptr,
479        _marker: PhantomData,
480    }
481}
482
483/// Blocks for a limited time until one of the operations becomes ready and selects it.
484// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
485#[inline]
486pub fn select_timeout<'a>(
487    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
488    timeout: Duration,
489) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
490    match Instant::now().checked_add(timeout) {
491        Some(deadline) => select_deadline(handles, deadline),
492        None => Ok(select(handles)),
493    }
494}
495
496/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
497#[inline]
498pub(crate) fn select_deadline<'a>(
499    handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
500    deadline: Instant,
501) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
502    match run_select(handles, Timeout::At(deadline)) {
503        None => Err(SelectTimeoutError),
504        Some((token, index, ptr)) => Ok(SelectedOperation {
505            token,
506            index,
507            ptr,
508            _marker: PhantomData,
509        }),
510    }
511}
512
513/// Selects from a set of channel operations.
514///
515/// `Select` allows you to define a set of channel operations, wait until any one of them becomes
516/// ready, and finally execute it. If multiple operations are ready at the same time, a random one
517/// among them is selected.
518///
519/// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
520/// when it will simply return an error because the channel is disconnected.
521///
522/// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
523/// dynamically created list of channel operations.
524///
525/// [`select!`]: crate::select!
526///
527/// Once a list of operations has been built with `Select`, there are two different ways of
528/// proceeding:
529///
530/// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
531///   the returned selected operation has already begun and **must** be completed. If we don't
532///   complete it, a panic will occur.
533///
534/// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
535///   successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
536///   possible for another thread to make the operation not ready just before we try executing it,
537///   so it's wise to use a retry loop. However, note that these methods might return with success
538///   spuriously, so it's a good idea to always double check if the operation is really ready.
539///
540/// # Examples
541///
542/// Use [`select`] to receive a message from a list of receivers:
543///
544/// ```
545/// use crossbeam_channel::{Receiver, RecvError, Select};
546///
547/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
548///     // Build a list of operations.
549///     let mut sel = Select::new();
550///     for r in rs {
551///         sel.recv(r);
552///     }
553///
554///     // Complete the selected operation.
555///     let oper = sel.select();
556///     let index = oper.index();
557///     oper.recv(&rs[index])
558/// }
559/// ```
560///
561/// Use [`ready`] to receive a message from a list of receivers:
562///
563/// ```
564/// use crossbeam_channel::{Receiver, RecvError, Select};
565///
566/// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
567///     // Build a list of operations.
568///     let mut sel = Select::new();
569///     for r in rs {
570///         sel.recv(r);
571///     }
572///
573///     loop {
574///         // Wait until a receive operation becomes ready and try executing it.
575///         let index = sel.ready();
576///         let res = rs[index].try_recv();
577///
578///         // If the operation turns out not to be ready, retry.
579///         if let Err(e) = res {
580///             if e.is_empty() {
581///                 continue;
582///             }
583///         }
584///
585///         // Success!
586///         return res.map_err(|_| RecvError);
587///     }
588/// }
589/// ```
590///
591/// [`try_select`]: Select::try_select
592/// [`select`]: Select::select
593/// [`select_timeout`]: Select::select_timeout
594/// [`try_ready`]: Select::try_ready
595/// [`ready`]: Select::ready
596/// [`ready_timeout`]: Select::ready_timeout
597pub struct Select<'a> {
598    /// A list of senders and receivers participating in selection.
599    handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,
600
601    /// The next index to assign to an operation.
602    next_index: usize,
603}
604
605unsafe impl Send for Select<'_> {}
606unsafe impl Sync for Select<'_> {}
607
608impl<'a> Select<'a> {
609    /// Creates an empty list of channel operations for selection.
610    ///
611    /// # Examples
612    ///
613    /// ```
614    /// use crossbeam_channel::Select;
615    ///
616    /// let mut sel = Select::new();
617    ///
618    /// // The list of operations is empty, which means no operation can be selected.
619    /// assert!(sel.try_select().is_err());
620    /// ```
621    pub fn new() -> Select<'a> {
622        Select {
623            handles: Vec::with_capacity(4),
624            next_index: 0,
625        }
626    }
627
628    /// Adds a send operation.
629    ///
630    /// Returns the index of the added operation.
631    ///
632    /// # Examples
633    ///
634    /// ```
635    /// use crossbeam_channel::{unbounded, Select};
636    ///
637    /// let (s, r) = unbounded::<i32>();
638    ///
639    /// let mut sel = Select::new();
640    /// let index = sel.send(&s);
641    /// ```
642    pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
643        let i = self.next_index;
644        let ptr = s as *const Sender<_> as *const u8;
645        self.handles.push((s, i, ptr));
646        self.next_index += 1;
647        i
648    }
649
650    /// Adds a receive operation.
651    ///
652    /// Returns the index of the added operation.
653    ///
654    /// # Examples
655    ///
656    /// ```
657    /// use crossbeam_channel::{unbounded, Select};
658    ///
659    /// let (s, r) = unbounded::<i32>();
660    ///
661    /// let mut sel = Select::new();
662    /// let index = sel.recv(&r);
663    /// ```
664    pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
665        let i = self.next_index;
666        let ptr = r as *const Receiver<_> as *const u8;
667        self.handles.push((r, i, ptr));
668        self.next_index += 1;
669        i
670    }
671
672    /// Removes a previously added operation.
673    ///
674    /// This is useful when an operation is selected because the channel got disconnected and we
675    /// want to try again to select a different operation instead.
676    ///
677    /// If new operations are added after removing some, the indices of removed operations will not
678    /// be reused.
679    ///
680    /// # Panics
681    ///
682    /// An attempt to remove a non-existing or already removed operation will panic.
683    ///
684    /// # Examples
685    ///
686    /// ```
687    /// use crossbeam_channel::{unbounded, Select};
688    ///
689    /// let (s1, r1) = unbounded::<i32>();
690    /// let (_, r2) = unbounded::<i32>();
691    ///
692    /// let mut sel = Select::new();
693    /// let oper1 = sel.recv(&r1);
694    /// let oper2 = sel.recv(&r2);
695    ///
696    /// // Both operations are initially ready, so a random one will be executed.
697    /// let oper = sel.select();
698    /// assert_eq!(oper.index(), oper2);
699    /// assert!(oper.recv(&r2).is_err());
700    /// sel.remove(oper2);
701    ///
702    /// s1.send(10).unwrap();
703    ///
704    /// let oper = sel.select();
705    /// assert_eq!(oper.index(), oper1);
706    /// assert_eq!(oper.recv(&r1), Ok(10));
707    /// ```
708    pub fn remove(&mut self, index: usize) {
709        assert!(
710            index < self.next_index,
711            "index out of bounds; {} >= {}",
712            index,
713            self.next_index,
714        );
715
716        let i = self
717            .handles
718            .iter()
719            .enumerate()
720            .find(|(_, (_, i, _))| *i == index)
721            .expect("no operation with this index")
722            .0;
723
724        self.handles.swap_remove(i);
725    }
726
727    /// Attempts to select one of the operations without blocking.
728    ///
729    /// If an operation is ready, it is selected and returned. If multiple operations are ready at
730    /// the same time, a random one among them is selected. If none of the operations are ready, an
731    /// error is returned.
732    ///
733    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
734    /// even when it will simply return an error because the channel is disconnected.
735    ///
736    /// The selected operation must be completed with [`SelectedOperation::send`]
737    /// or [`SelectedOperation::recv`].
738    ///
739    /// # Examples
740    ///
741    /// ```
742    /// use crossbeam_channel::{unbounded, Select};
743    ///
744    /// let (s1, r1) = unbounded();
745    /// let (s2, r2) = unbounded();
746    ///
747    /// s1.send(10).unwrap();
748    /// s2.send(20).unwrap();
749    ///
750    /// let mut sel = Select::new();
751    /// let oper1 = sel.recv(&r1);
752    /// let oper2 = sel.recv(&r2);
753    ///
754    /// // Both operations are initially ready, so a random one will be executed.
755    /// let oper = sel.try_select();
756    /// match oper {
757    ///     Err(_) => panic!("both operations should be ready"),
758    ///     Ok(oper) => match oper.index() {
759    ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
760    ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
761    ///         _ => unreachable!(),
762    ///     }
763    /// }
764    /// ```
765    pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
766        try_select(&mut self.handles)
767    }
768
769    /// Blocks until one of the operations becomes ready and selects it.
770    ///
771    /// Once an operation becomes ready, it is selected and returned. If multiple operations are
772    /// ready at the same time, a random one among them is selected.
773    ///
774    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
775    /// even when it will simply return an error because the channel is disconnected.
776    ///
777    /// The selected operation must be completed with [`SelectedOperation::send`]
778    /// or [`SelectedOperation::recv`].
779    ///
780    /// # Panics
781    ///
782    /// Panics if no operations have been added to `Select`.
783    ///
784    /// # Examples
785    ///
786    /// ```
787    /// use std::thread;
788    /// use std::time::Duration;
789    /// use crossbeam_channel::{unbounded, Select};
790    ///
791    /// let (s1, r1) = unbounded();
792    /// let (s2, r2) = unbounded();
793    ///
794    /// thread::spawn(move || {
795    ///     thread::sleep(Duration::from_secs(1));
796    ///     s1.send(10).unwrap();
797    /// });
798    /// thread::spawn(move || s2.send(20).unwrap());
799    ///
800    /// let mut sel = Select::new();
801    /// let oper1 = sel.recv(&r1);
802    /// let oper2 = sel.recv(&r2);
803    ///
804    /// // The second operation will be selected because it becomes ready first.
805    /// let oper = sel.select();
806    /// match oper.index() {
807    ///     i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
808    ///     i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
809    ///     _ => unreachable!(),
810    /// }
811    /// ```
812    pub fn select(&mut self) -> SelectedOperation<'a> {
813        select(&mut self.handles)
814    }
815
816    /// Blocks for a limited time until one of the operations becomes ready and selects it.
817    ///
818    /// If an operation becomes ready, it is selected and returned. If multiple operations are
819    /// ready at the same time, a random one among them is selected. If none of the operations
820    /// become ready for the specified duration, an error is returned.
821    ///
822    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
823    /// even when it will simply return an error because the channel is disconnected.
824    ///
825    /// The selected operation must be completed with [`SelectedOperation::send`]
826    /// or [`SelectedOperation::recv`].
827    ///
828    /// # Examples
829    ///
830    /// ```
831    /// use std::thread;
832    /// use std::time::Duration;
833    /// use crossbeam_channel::{unbounded, Select};
834    ///
835    /// let (s1, r1) = unbounded();
836    /// let (s2, r2) = unbounded();
837    ///
838    /// thread::spawn(move || {
839    ///     thread::sleep(Duration::from_secs(1));
840    ///     s1.send(10).unwrap();
841    /// });
842    /// thread::spawn(move || s2.send(20).unwrap());
843    ///
844    /// let mut sel = Select::new();
845    /// let oper1 = sel.recv(&r1);
846    /// let oper2 = sel.recv(&r2);
847    ///
848    /// // The second operation will be selected because it becomes ready first.
849    /// let oper = sel.select_timeout(Duration::from_millis(500));
850    /// match oper {
851    ///     Err(_) => panic!("should not have timed out"),
852    ///     Ok(oper) => match oper.index() {
853    ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
854    ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
855    ///         _ => unreachable!(),
856    ///     }
857    /// }
858    /// ```
859    pub fn select_timeout(
860        &mut self,
861        timeout: Duration,
862    ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
863        select_timeout(&mut self.handles, timeout)
864    }
865
866    /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
867    ///
868    /// If an operation becomes ready, it is selected and returned. If multiple operations are
869    /// ready at the same time, a random one among them is selected. If none of the operations
870    /// become ready before the given deadline, an error is returned.
871    ///
872    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
873    /// even when it will simply return an error because the channel is disconnected.
874    ///
875    /// The selected operation must be completed with [`SelectedOperation::send`]
876    /// or [`SelectedOperation::recv`].
877    ///
878    /// # Examples
879    ///
880    /// ```
881    /// use std::thread;
882    /// use std::time::{Instant, Duration};
883    /// use crossbeam_channel::{unbounded, Select};
884    ///
885    /// let (s1, r1) = unbounded();
886    /// let (s2, r2) = unbounded();
887    ///
888    /// thread::spawn(move || {
889    ///     thread::sleep(Duration::from_secs(1));
890    ///     s1.send(10).unwrap();
891    /// });
892    /// thread::spawn(move || s2.send(20).unwrap());
893    ///
894    /// let mut sel = Select::new();
895    /// let oper1 = sel.recv(&r1);
896    /// let oper2 = sel.recv(&r2);
897    ///
898    /// let deadline = Instant::now() + Duration::from_millis(500);
899    ///
900    /// // The second operation will be selected because it becomes ready first.
901    /// let oper = sel.select_deadline(deadline);
902    /// match oper {
903    ///     Err(_) => panic!("should not have timed out"),
904    ///     Ok(oper) => match oper.index() {
905    ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
906    ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
907    ///         _ => unreachable!(),
908    ///     }
909    /// }
910    /// ```
911    pub fn select_deadline(
912        &mut self,
913        deadline: Instant,
914    ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
915        select_deadline(&mut self.handles, deadline)
916    }
917
918    /// Attempts to find a ready operation without blocking.
919    ///
920    /// If an operation is ready, its index is returned. If multiple operations are ready at the
921    /// same time, a random one among them is chosen. If none of the operations are ready, an error
922    /// is returned.
923    ///
924    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
925    /// even when it will simply return an error because the channel is disconnected.
926    ///
927    /// Note that this method might return with success spuriously, so it's a good idea to always
928    /// double check if the operation is really ready.
929    ///
930    /// # Examples
931    ///
932    /// ```
933    /// use crossbeam_channel::{unbounded, Select};
934    ///
935    /// let (s1, r1) = unbounded();
936    /// let (s2, r2) = unbounded();
937    ///
938    /// s1.send(10).unwrap();
939    /// s2.send(20).unwrap();
940    ///
941    /// let mut sel = Select::new();
942    /// let oper1 = sel.recv(&r1);
943    /// let oper2 = sel.recv(&r2);
944    ///
945    /// // Both operations are initially ready, so a random one will be chosen.
946    /// match sel.try_ready() {
947    ///     Err(_) => panic!("both operations should be ready"),
948    ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
949    ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
950    ///     Ok(_) => unreachable!(),
951    /// }
952    /// ```
953    pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
954        match run_ready(&mut self.handles, Timeout::Now) {
955            None => Err(TryReadyError),
956            Some(index) => Ok(index),
957        }
958    }
959
960    /// Blocks until one of the operations becomes ready.
961    ///
962    /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
963    /// the same time, a random one among them is chosen.
964    ///
965    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
966    /// even when it will simply return an error because the channel is disconnected.
967    ///
968    /// Note that this method might return with success spuriously, so it's a good idea to always
969    /// double check if the operation is really ready.
970    ///
971    /// # Panics
972    ///
973    /// Panics if no operations have been added to `Select`.
974    ///
975    /// # Examples
976    ///
977    /// ```
978    /// use std::thread;
979    /// use std::time::Duration;
980    /// use crossbeam_channel::{unbounded, Select};
981    ///
982    /// let (s1, r1) = unbounded();
983    /// let (s2, r2) = unbounded();
984    ///
985    /// thread::spawn(move || {
986    ///     thread::sleep(Duration::from_secs(1));
987    ///     s1.send(10).unwrap();
988    /// });
989    /// thread::spawn(move || s2.send(20).unwrap());
990    ///
991    /// let mut sel = Select::new();
992    /// let oper1 = sel.recv(&r1);
993    /// let oper2 = sel.recv(&r2);
994    ///
995    /// // The second operation will be selected because it becomes ready first.
996    /// match sel.ready() {
997    ///     i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
998    ///     i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
999    ///     _ => unreachable!(),
1000    /// }
1001    /// ```
1002    pub fn ready(&mut self) -> usize {
1003        if self.handles.is_empty() {
1004            panic!("no operations have been added to `Select`");
1005        }
1006
1007        run_ready(&mut self.handles, Timeout::Never).unwrap()
1008    }
1009
1010    /// Blocks for a limited time until one of the operations becomes ready.
1011    ///
1012    /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1013    /// the same time, a random one among them is chosen. If none of the operations become ready
1014    /// for the specified duration, an error is returned.
1015    ///
1016    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1017    /// even when it will simply return an error because the channel is disconnected.
1018    ///
1019    /// Note that this method might return with success spuriously, so it's a good idea to double
1020    /// check if the operation is really ready.
1021    ///
1022    /// # Examples
1023    ///
1024    /// ```
1025    /// use std::thread;
1026    /// use std::time::Duration;
1027    /// use crossbeam_channel::{unbounded, Select};
1028    ///
1029    /// let (s1, r1) = unbounded();
1030    /// let (s2, r2) = unbounded();
1031    ///
1032    /// thread::spawn(move || {
1033    ///     thread::sleep(Duration::from_secs(1));
1034    ///     s1.send(10).unwrap();
1035    /// });
1036    /// thread::spawn(move || s2.send(20).unwrap());
1037    ///
1038    /// let mut sel = Select::new();
1039    /// let oper1 = sel.recv(&r1);
1040    /// let oper2 = sel.recv(&r2);
1041    ///
1042    /// // The second operation will be selected because it becomes ready first.
1043    /// match sel.ready_timeout(Duration::from_millis(500)) {
1044    ///     Err(_) => panic!("should not have timed out"),
1045    ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1046    ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1047    ///     Ok(_) => unreachable!(),
1048    /// }
1049    /// ```
1050    pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1051        match Instant::now().checked_add(timeout) {
1052            Some(deadline) => self.ready_deadline(deadline),
1053            None => Ok(self.ready()),
1054        }
1055    }
1056
1057    /// Blocks until a given deadline, or until one of the operations becomes ready.
1058    ///
1059    /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1060    /// the same time, a random one among them is chosen. If none of the operations become ready
1061    /// before the deadline, an error is returned.
1062    ///
1063    /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1064    /// even when it will simply return an error because the channel is disconnected.
1065    ///
1066    /// Note that this method might return with success spuriously, so it's a good idea to double
1067    /// check if the operation is really ready.
1068    ///
1069    /// # Examples
1070    ///
1071    /// ```
1072    /// use std::thread;
1073    /// use std::time::{Duration, Instant};
1074    /// use crossbeam_channel::{unbounded, Select};
1075    ///
1076    /// let deadline = Instant::now() + Duration::from_millis(500);
1077    ///
1078    /// let (s1, r1) = unbounded();
1079    /// let (s2, r2) = unbounded();
1080    ///
1081    /// thread::spawn(move || {
1082    ///     thread::sleep(Duration::from_secs(1));
1083    ///     s1.send(10).unwrap();
1084    /// });
1085    /// thread::spawn(move || s2.send(20).unwrap());
1086    ///
1087    /// let mut sel = Select::new();
1088    /// let oper1 = sel.recv(&r1);
1089    /// let oper2 = sel.recv(&r2);
1090    ///
1091    /// // The second operation will be selected because it becomes ready first.
1092    /// match sel.ready_deadline(deadline) {
1093    ///     Err(_) => panic!("should not have timed out"),
1094    ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1095    ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1096    ///     Ok(_) => unreachable!(),
1097    /// }
1098    /// ```
1099    pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1100        match run_ready(&mut self.handles, Timeout::At(deadline)) {
1101            None => Err(ReadyTimeoutError),
1102            Some(index) => Ok(index),
1103        }
1104    }
1105}
1106
1107impl<'a> Clone for Select<'a> {
1108    fn clone(&self) -> Select<'a> {
1109        Select {
1110            handles: self.handles.clone(),
1111            next_index: self.next_index,
1112        }
1113    }
1114}
1115
1116impl<'a> Default for Select<'a> {
1117    fn default() -> Select<'a> {
1118        Select::new()
1119    }
1120}
1121
1122impl fmt::Debug for Select<'_> {
1123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1124        f.pad("Select { .. }")
1125    }
1126}
1127
1128/// A selected operation that needs to be completed.
1129///
1130/// To complete the operation, call [`send`] or [`recv`].
1131///
1132/// # Panics
1133///
1134/// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1135/// `SelectedOperation` is dropped without completion, a panic occurs.
1136///
1137/// [`send`]: SelectedOperation::send
1138/// [`recv`]: SelectedOperation::recv
1139#[must_use]
1140pub struct SelectedOperation<'a> {
1141    /// Token needed to complete the operation.
1142    token: Token,
1143
1144    /// The index of the selected operation.
1145    index: usize,
1146
1147    /// The address of the selected `Sender` or `Receiver`.
1148    ptr: *const u8,
1149
1150    /// Indicates that `Sender`s and `Receiver`s are borrowed.
1151    _marker: PhantomData<&'a ()>,
1152}
1153
1154impl SelectedOperation<'_> {
1155    /// Returns the index of the selected operation.
1156    ///
1157    /// # Examples
1158    ///
1159    /// ```
1160    /// use crossbeam_channel::{bounded, Select};
1161    ///
1162    /// let (s1, r1) = bounded::<()>(0);
1163    /// let (s2, r2) = bounded::<()>(0);
1164    /// let (s3, r3) = bounded::<()>(1);
1165    ///
1166    /// let mut sel = Select::new();
1167    /// let oper1 = sel.send(&s1);
1168    /// let oper2 = sel.recv(&r2);
1169    /// let oper3 = sel.send(&s3);
1170    ///
1171    /// // Only the last operation is ready.
1172    /// let oper = sel.select();
1173    /// assert_eq!(oper.index(), 2);
1174    /// assert_eq!(oper.index(), oper3);
1175    ///
1176    /// // Complete the operation.
1177    /// oper.send(&s3, ()).unwrap();
1178    /// ```
1179    pub fn index(&self) -> usize {
1180        self.index
1181    }
1182
1183    /// Completes the send operation.
1184    ///
1185    /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1186    /// when the operation was added.
1187    ///
1188    /// # Panics
1189    ///
1190    /// Panics if an incorrect [`Sender`] reference is passed.
1191    ///
1192    /// # Examples
1193    ///
1194    /// ```
1195    /// use crossbeam_channel::{bounded, Select, SendError};
1196    ///
1197    /// let (s, r) = bounded::<i32>(0);
1198    /// drop(r);
1199    ///
1200    /// let mut sel = Select::new();
1201    /// let oper1 = sel.send(&s);
1202    ///
1203    /// let oper = sel.select();
1204    /// assert_eq!(oper.index(), oper1);
1205    /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1206    /// ```
1207    pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1208        assert!(
1209            s as *const Sender<T> as *const u8 == self.ptr,
1210            "passed a sender that wasn't selected",
1211        );
1212        let res = unsafe { channel::write(s, &mut self.token, msg) };
1213        mem::forget(self);
1214        res.map_err(SendError)
1215    }
1216
1217    /// Completes the receive operation.
1218    ///
1219    /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1220    /// when the operation was added.
1221    ///
1222    /// # Panics
1223    ///
1224    /// Panics if an incorrect [`Receiver`] reference is passed.
1225    ///
1226    /// # Examples
1227    ///
1228    /// ```
1229    /// use crossbeam_channel::{bounded, Select, RecvError};
1230    ///
1231    /// let (s, r) = bounded::<i32>(0);
1232    /// drop(s);
1233    ///
1234    /// let mut sel = Select::new();
1235    /// let oper1 = sel.recv(&r);
1236    ///
1237    /// let oper = sel.select();
1238    /// assert_eq!(oper.index(), oper1);
1239    /// assert_eq!(oper.recv(&r), Err(RecvError));
1240    /// ```
1241    pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1242        assert!(
1243            r as *const Receiver<T> as *const u8 == self.ptr,
1244            "passed a receiver that wasn't selected",
1245        );
1246        let res = unsafe { channel::read(r, &mut self.token) };
1247        mem::forget(self);
1248        res.map_err(|_| RecvError)
1249    }
1250}
1251
1252impl fmt::Debug for SelectedOperation<'_> {
1253    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1254        f.pad("SelectedOperation { .. }")
1255    }
1256}
1257
1258impl Drop for SelectedOperation<'_> {
1259    fn drop(&mut self) {
1260        panic!("dropped `SelectedOperation` without completing the operation");
1261    }
1262}