crossbeam_deque/
deque.rs

1use std::cell::{Cell, UnsafeCell};
2use std::cmp;
3use std::fmt;
4use std::iter::FromIterator;
5use std::marker::PhantomData;
6use std::mem::{self, ManuallyDrop, MaybeUninit};
7use std::ptr;
8use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
9use std::sync::Arc;
10
11use crate::epoch::{self, Atomic, Owned};
12use crate::utils::{Backoff, CachePadded};
13
14// Minimum buffer capacity.
15const MIN_CAP: usize = 64;
16// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
17const MAX_BATCH: usize = 32;
18// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
19// deallocated as soon as possible.
20const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
21
22/// A buffer that holds tasks in a worker queue.
23///
24/// This is just a pointer to the buffer and its length - dropping an instance of this struct will
25/// *not* deallocate the buffer.
26struct Buffer<T> {
27    /// Pointer to the allocated memory.
28    ptr: *mut T,
29
30    /// Capacity of the buffer. Always a power of two.
31    cap: usize,
32}
33
34unsafe impl<T> Send for Buffer<T> {}
35
36impl<T> Buffer<T> {
37    /// Allocates a new buffer with the specified capacity.
38    fn alloc(cap: usize) -> Buffer<T> {
39        debug_assert_eq!(cap, cap.next_power_of_two());
40
41        let mut v = ManuallyDrop::new(Vec::with_capacity(cap));
42        let ptr = v.as_mut_ptr();
43
44        Buffer { ptr, cap }
45    }
46
47    /// Deallocates the buffer.
48    unsafe fn dealloc(self) {
49        drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
50    }
51
52    /// Returns a pointer to the task at the specified `index`.
53    unsafe fn at(&self, index: isize) -> *mut T {
54        // `self.cap` is always a power of two.
55        // We do all the loads at `MaybeUninit` because we might realize, after loading, that we
56        // don't actually have the right to access this memory.
57        self.ptr.offset(index & (self.cap - 1) as isize)
58    }
59
60    /// Writes `task` into the specified `index`.
61    ///
62    /// This method might be concurrently called with another `read` at the same index, which is
63    /// technically speaking a data race and therefore UB. We should use an atomic store here, but
64    /// that would be more expensive and difficult to implement generically for all types `T`.
65    /// Hence, as a hack, we use a volatile write instead.
66    unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
67        ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task)
68    }
69
70    /// Reads a task from the specified `index`.
71    ///
72    /// This method might be concurrently called with another `write` at the same index, which is
73    /// technically speaking a data race and therefore UB. We should use an atomic load here, but
74    /// that would be more expensive and difficult to implement generically for all types `T`.
75    /// Hence, as a hack, we use a volatile load instead.
76    unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
77        ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>())
78    }
79}
80
81impl<T> Clone for Buffer<T> {
82    fn clone(&self) -> Buffer<T> {
83        *self
84    }
85}
86
87impl<T> Copy for Buffer<T> {}
88
89/// Internal queue data shared between the worker and stealers.
90///
91/// The implementation is based on the following work:
92///
93/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
94/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
95///    PPoPP 2013.][weak-mem]
96/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
97///    atomics. OOPSLA 2013.][checker]
98///
99/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
100/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
101/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
102struct Inner<T> {
103    /// The front index.
104    front: AtomicIsize,
105
106    /// The back index.
107    back: AtomicIsize,
108
109    /// The underlying buffer.
110    buffer: CachePadded<Atomic<Buffer<T>>>,
111}
112
113impl<T> Drop for Inner<T> {
114    fn drop(&mut self) {
115        // Load the back index, front index, and buffer.
116        let b = *self.back.get_mut();
117        let f = *self.front.get_mut();
118
119        unsafe {
120            let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
121
122            // Go through the buffer from front to back and drop all tasks in the queue.
123            let mut i = f;
124            while i != b {
125                buffer.deref().at(i).drop_in_place();
126                i = i.wrapping_add(1);
127            }
128
129            // Free the memory allocated by the buffer.
130            buffer.into_owned().into_box().dealloc();
131        }
132    }
133}
134
135/// Worker queue flavor: FIFO or LIFO.
136#[derive(Clone, Copy, Debug, Eq, PartialEq)]
137enum Flavor {
138    /// The first-in first-out flavor.
139    Fifo,
140
141    /// The last-in first-out flavor.
142    Lifo,
143}
144
145/// A worker queue.
146///
147/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
148/// tasks from it. Task schedulers typically create a single worker queue per thread.
149///
150/// # Examples
151///
152/// A FIFO worker:
153///
154/// ```
155/// use crossbeam_deque::{Steal, Worker};
156///
157/// let w = Worker::new_fifo();
158/// let s = w.stealer();
159///
160/// w.push(1);
161/// w.push(2);
162/// w.push(3);
163///
164/// assert_eq!(s.steal(), Steal::Success(1));
165/// assert_eq!(w.pop(), Some(2));
166/// assert_eq!(w.pop(), Some(3));
167/// ```
168///
169/// A LIFO worker:
170///
171/// ```
172/// use crossbeam_deque::{Steal, Worker};
173///
174/// let w = Worker::new_lifo();
175/// let s = w.stealer();
176///
177/// w.push(1);
178/// w.push(2);
179/// w.push(3);
180///
181/// assert_eq!(s.steal(), Steal::Success(1));
182/// assert_eq!(w.pop(), Some(3));
183/// assert_eq!(w.pop(), Some(2));
184/// ```
185pub struct Worker<T> {
186    /// A reference to the inner representation of the queue.
187    inner: Arc<CachePadded<Inner<T>>>,
188
189    /// A copy of `inner.buffer` for quick access.
190    buffer: Cell<Buffer<T>>,
191
192    /// The flavor of the queue.
193    flavor: Flavor,
194
195    /// Indicates that the worker cannot be shared among threads.
196    _marker: PhantomData<*mut ()>, // !Send + !Sync
197}
198
199unsafe impl<T: Send> Send for Worker<T> {}
200
201impl<T> Worker<T> {
202    /// Creates a FIFO worker queue.
203    ///
204    /// Tasks are pushed and popped from opposite ends.
205    ///
206    /// # Examples
207    ///
208    /// ```
209    /// use crossbeam_deque::Worker;
210    ///
211    /// let w = Worker::<i32>::new_fifo();
212    /// ```
213    pub fn new_fifo() -> Worker<T> {
214        let buffer = Buffer::alloc(MIN_CAP);
215
216        let inner = Arc::new(CachePadded::new(Inner {
217            front: AtomicIsize::new(0),
218            back: AtomicIsize::new(0),
219            buffer: CachePadded::new(Atomic::new(buffer)),
220        }));
221
222        Worker {
223            inner,
224            buffer: Cell::new(buffer),
225            flavor: Flavor::Fifo,
226            _marker: PhantomData,
227        }
228    }
229
230    /// Creates a LIFO worker queue.
231    ///
232    /// Tasks are pushed and popped from the same end.
233    ///
234    /// # Examples
235    ///
236    /// ```
237    /// use crossbeam_deque::Worker;
238    ///
239    /// let w = Worker::<i32>::new_lifo();
240    /// ```
241    pub fn new_lifo() -> Worker<T> {
242        let buffer = Buffer::alloc(MIN_CAP);
243
244        let inner = Arc::new(CachePadded::new(Inner {
245            front: AtomicIsize::new(0),
246            back: AtomicIsize::new(0),
247            buffer: CachePadded::new(Atomic::new(buffer)),
248        }));
249
250        Worker {
251            inner,
252            buffer: Cell::new(buffer),
253            flavor: Flavor::Lifo,
254            _marker: PhantomData,
255        }
256    }
257
258    /// Creates a stealer for this queue.
259    ///
260    /// The returned stealer can be shared among threads and cloned.
261    ///
262    /// # Examples
263    ///
264    /// ```
265    /// use crossbeam_deque::Worker;
266    ///
267    /// let w = Worker::<i32>::new_lifo();
268    /// let s = w.stealer();
269    /// ```
270    pub fn stealer(&self) -> Stealer<T> {
271        Stealer {
272            inner: self.inner.clone(),
273            flavor: self.flavor,
274        }
275    }
276
277    /// Resizes the internal buffer to the new capacity of `new_cap`.
278    #[cold]
279    unsafe fn resize(&self, new_cap: usize) {
280        // Load the back index, front index, and buffer.
281        let b = self.inner.back.load(Ordering::Relaxed);
282        let f = self.inner.front.load(Ordering::Relaxed);
283        let buffer = self.buffer.get();
284
285        // Allocate a new buffer and copy data from the old buffer to the new one.
286        let new = Buffer::alloc(new_cap);
287        let mut i = f;
288        while i != b {
289            ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
290            i = i.wrapping_add(1);
291        }
292
293        let guard = &epoch::pin();
294
295        // Replace the old buffer with the new one.
296        self.buffer.replace(new);
297        let old =
298            self.inner
299                .buffer
300                .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
301
302        // Destroy the old buffer later.
303        guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
304
305        // If the buffer is very large, then flush the thread-local garbage in order to deallocate
306        // it as soon as possible.
307        if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
308            guard.flush();
309        }
310    }
311
312    /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
313    /// buffer.
314    fn reserve(&self, reserve_cap: usize) {
315        if reserve_cap > 0 {
316            // Compute the current length.
317            let b = self.inner.back.load(Ordering::Relaxed);
318            let f = self.inner.front.load(Ordering::SeqCst);
319            let len = b.wrapping_sub(f) as usize;
320
321            // The current capacity.
322            let cap = self.buffer.get().cap;
323
324            // Is there enough capacity to push `reserve_cap` tasks?
325            if cap - len < reserve_cap {
326                // Keep doubling the capacity as much as is needed.
327                let mut new_cap = cap * 2;
328                while new_cap - len < reserve_cap {
329                    new_cap *= 2;
330                }
331
332                // Resize the buffer.
333                unsafe {
334                    self.resize(new_cap);
335                }
336            }
337        }
338    }
339
340    /// Returns `true` if the queue is empty.
341    ///
342    /// ```
343    /// use crossbeam_deque::Worker;
344    ///
345    /// let w = Worker::new_lifo();
346    ///
347    /// assert!(w.is_empty());
348    /// w.push(1);
349    /// assert!(!w.is_empty());
350    /// ```
351    pub fn is_empty(&self) -> bool {
352        let b = self.inner.back.load(Ordering::Relaxed);
353        let f = self.inner.front.load(Ordering::SeqCst);
354        b.wrapping_sub(f) <= 0
355    }
356
357    /// Returns the number of tasks in the deque.
358    ///
359    /// ```
360    /// use crossbeam_deque::Worker;
361    ///
362    /// let w = Worker::new_lifo();
363    ///
364    /// assert_eq!(w.len(), 0);
365    /// w.push(1);
366    /// assert_eq!(w.len(), 1);
367    /// w.push(1);
368    /// assert_eq!(w.len(), 2);
369    /// ```
370    pub fn len(&self) -> usize {
371        let b = self.inner.back.load(Ordering::Relaxed);
372        let f = self.inner.front.load(Ordering::SeqCst);
373        b.wrapping_sub(f).max(0) as usize
374    }
375
376    /// Pushes a task into the queue.
377    ///
378    /// # Examples
379    ///
380    /// ```
381    /// use crossbeam_deque::Worker;
382    ///
383    /// let w = Worker::new_lifo();
384    /// w.push(1);
385    /// w.push(2);
386    /// ```
387    pub fn push(&self, task: T) {
388        // Load the back index, front index, and buffer.
389        let b = self.inner.back.load(Ordering::Relaxed);
390        let f = self.inner.front.load(Ordering::Acquire);
391        let mut buffer = self.buffer.get();
392
393        // Calculate the length of the queue.
394        let len = b.wrapping_sub(f);
395
396        // Is the queue full?
397        if len >= buffer.cap as isize {
398            // Yes. Grow the underlying buffer.
399            unsafe {
400                self.resize(2 * buffer.cap);
401            }
402            buffer = self.buffer.get();
403        }
404
405        // Write `task` into the slot.
406        unsafe {
407            buffer.write(b, MaybeUninit::new(task));
408        }
409
410        atomic::fence(Ordering::Release);
411
412        // Increment the back index.
413        //
414        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
415        // races because it doesn't understand fences.
416        self.inner.back.store(b.wrapping_add(1), Ordering::Release);
417    }
418
419    /// Pops a task from the queue.
420    ///
421    /// # Examples
422    ///
423    /// ```
424    /// use crossbeam_deque::Worker;
425    ///
426    /// let w = Worker::new_fifo();
427    /// w.push(1);
428    /// w.push(2);
429    ///
430    /// assert_eq!(w.pop(), Some(1));
431    /// assert_eq!(w.pop(), Some(2));
432    /// assert_eq!(w.pop(), None);
433    /// ```
434    pub fn pop(&self) -> Option<T> {
435        // Load the back and front index.
436        let b = self.inner.back.load(Ordering::Relaxed);
437        let f = self.inner.front.load(Ordering::Relaxed);
438
439        // Calculate the length of the queue.
440        let len = b.wrapping_sub(f);
441
442        // Is the queue empty?
443        if len <= 0 {
444            return None;
445        }
446
447        match self.flavor {
448            // Pop from the front of the queue.
449            Flavor::Fifo => {
450                // Try incrementing the front index to pop the task.
451                let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
452                let new_f = f.wrapping_add(1);
453
454                if b.wrapping_sub(new_f) < 0 {
455                    self.inner.front.store(f, Ordering::Relaxed);
456                    return None;
457                }
458
459                unsafe {
460                    // Read the popped task.
461                    let buffer = self.buffer.get();
462                    let task = buffer.read(f).assume_init();
463
464                    // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
465                    if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
466                        self.resize(buffer.cap / 2);
467                    }
468
469                    Some(task)
470                }
471            }
472
473            // Pop from the back of the queue.
474            Flavor::Lifo => {
475                // Decrement the back index.
476                let b = b.wrapping_sub(1);
477                self.inner.back.store(b, Ordering::Relaxed);
478
479                atomic::fence(Ordering::SeqCst);
480
481                // Load the front index.
482                let f = self.inner.front.load(Ordering::Relaxed);
483
484                // Compute the length after the back index was decremented.
485                let len = b.wrapping_sub(f);
486
487                if len < 0 {
488                    // The queue is empty. Restore the back index to the original task.
489                    self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
490                    None
491                } else {
492                    // Read the task to be popped.
493                    let buffer = self.buffer.get();
494                    let mut task = unsafe { Some(buffer.read(b)) };
495
496                    // Are we popping the last task from the queue?
497                    if len == 0 {
498                        // Try incrementing the front index.
499                        if self
500                            .inner
501                            .front
502                            .compare_exchange(
503                                f,
504                                f.wrapping_add(1),
505                                Ordering::SeqCst,
506                                Ordering::Relaxed,
507                            )
508                            .is_err()
509                        {
510                            // Failed. We didn't pop anything. Reset to `None`.
511                            task.take();
512                        }
513
514                        // Restore the back index to the original task.
515                        self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
516                    } else {
517                        // Shrink the buffer if `len` is less than one fourth of the capacity.
518                        if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
519                            unsafe {
520                                self.resize(buffer.cap / 2);
521                            }
522                        }
523                    }
524
525                    task.map(|t| unsafe { t.assume_init() })
526                }
527            }
528        }
529    }
530}
531
532impl<T> fmt::Debug for Worker<T> {
533    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
534        f.pad("Worker { .. }")
535    }
536}
537
538/// A stealer handle of a worker queue.
539///
540/// Stealers can be shared among threads.
541///
542/// Task schedulers typically have a single worker queue per worker thread.
543///
544/// # Examples
545///
546/// ```
547/// use crossbeam_deque::{Steal, Worker};
548///
549/// let w = Worker::new_lifo();
550/// w.push(1);
551/// w.push(2);
552///
553/// let s = w.stealer();
554/// assert_eq!(s.steal(), Steal::Success(1));
555/// assert_eq!(s.steal(), Steal::Success(2));
556/// assert_eq!(s.steal(), Steal::Empty);
557/// ```
558pub struct Stealer<T> {
559    /// A reference to the inner representation of the queue.
560    inner: Arc<CachePadded<Inner<T>>>,
561
562    /// The flavor of the queue.
563    flavor: Flavor,
564}
565
566unsafe impl<T: Send> Send for Stealer<T> {}
567unsafe impl<T: Send> Sync for Stealer<T> {}
568
569impl<T> Stealer<T> {
570    /// Returns `true` if the queue is empty.
571    ///
572    /// ```
573    /// use crossbeam_deque::Worker;
574    ///
575    /// let w = Worker::new_lifo();
576    /// let s = w.stealer();
577    ///
578    /// assert!(s.is_empty());
579    /// w.push(1);
580    /// assert!(!s.is_empty());
581    /// ```
582    pub fn is_empty(&self) -> bool {
583        let f = self.inner.front.load(Ordering::Acquire);
584        atomic::fence(Ordering::SeqCst);
585        let b = self.inner.back.load(Ordering::Acquire);
586        b.wrapping_sub(f) <= 0
587    }
588
589    /// Returns the number of tasks in the deque.
590    ///
591    /// ```
592    /// use crossbeam_deque::Worker;
593    ///
594    /// let w = Worker::new_lifo();
595    /// let s = w.stealer();
596    ///
597    /// assert_eq!(s.len(), 0);
598    /// w.push(1);
599    /// assert_eq!(s.len(), 1);
600    /// w.push(2);
601    /// assert_eq!(s.len(), 2);
602    /// ```
603    pub fn len(&self) -> usize {
604        let f = self.inner.front.load(Ordering::Acquire);
605        atomic::fence(Ordering::SeqCst);
606        let b = self.inner.back.load(Ordering::Acquire);
607        b.wrapping_sub(f).max(0) as usize
608    }
609
610    /// Steals a task from the queue.
611    ///
612    /// # Examples
613    ///
614    /// ```
615    /// use crossbeam_deque::{Steal, Worker};
616    ///
617    /// let w = Worker::new_lifo();
618    /// w.push(1);
619    /// w.push(2);
620    ///
621    /// let s = w.stealer();
622    /// assert_eq!(s.steal(), Steal::Success(1));
623    /// assert_eq!(s.steal(), Steal::Success(2));
624    /// ```
625    pub fn steal(&self) -> Steal<T> {
626        // Load the front index.
627        let f = self.inner.front.load(Ordering::Acquire);
628
629        // A SeqCst fence is needed here.
630        //
631        // If the current thread is already pinned (reentrantly), we must manually issue the
632        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
633        // have to.
634        if epoch::is_pinned() {
635            atomic::fence(Ordering::SeqCst);
636        }
637
638        let guard = &epoch::pin();
639
640        // Load the back index.
641        let b = self.inner.back.load(Ordering::Acquire);
642
643        // Is the queue empty?
644        if b.wrapping_sub(f) <= 0 {
645            return Steal::Empty;
646        }
647
648        // Load the buffer and read the task at the front.
649        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
650        let task = unsafe { buffer.deref().read(f) };
651
652        // Try incrementing the front index to steal the task.
653        // If the buffer has been swapped or the increment fails, we retry.
654        if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
655            || self
656                .inner
657                .front
658                .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
659                .is_err()
660        {
661            // We didn't steal this task, forget it.
662            return Steal::Retry;
663        }
664
665        // Return the stolen task.
666        Steal::Success(unsafe { task.assume_init() })
667    }
668
669    /// Steals a batch of tasks and pushes them into another worker.
670    ///
671    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
672    /// steal around half of the tasks in the queue, but also not more than some constant limit.
673    ///
674    /// # Examples
675    ///
676    /// ```
677    /// use crossbeam_deque::Worker;
678    ///
679    /// let w1 = Worker::new_fifo();
680    /// w1.push(1);
681    /// w1.push(2);
682    /// w1.push(3);
683    /// w1.push(4);
684    ///
685    /// let s = w1.stealer();
686    /// let w2 = Worker::new_fifo();
687    ///
688    /// let _ = s.steal_batch(&w2);
689    /// assert_eq!(w2.pop(), Some(1));
690    /// assert_eq!(w2.pop(), Some(2));
691    /// ```
692    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
693        self.steal_batch_with_limit(dest, MAX_BATCH)
694    }
695
696    /// Steals no more than `limit` of tasks and pushes them into another worker.
697    ///
698    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
699    /// steal around half of the tasks in the queue, but also not more than the given limit.
700    ///
701    /// # Examples
702    ///
703    /// ```
704    /// use crossbeam_deque::Worker;
705    ///
706    /// let w1 = Worker::new_fifo();
707    /// w1.push(1);
708    /// w1.push(2);
709    /// w1.push(3);
710    /// w1.push(4);
711    /// w1.push(5);
712    /// w1.push(6);
713    ///
714    /// let s = w1.stealer();
715    /// let w2 = Worker::new_fifo();
716    ///
717    /// let _ = s.steal_batch_with_limit(&w2, 2);
718    /// assert_eq!(w2.pop(), Some(1));
719    /// assert_eq!(w2.pop(), Some(2));
720    /// assert_eq!(w2.pop(), None);
721    ///
722    /// w1.push(7);
723    /// w1.push(8);
724    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
725    /// // half of the elements are currently popped, but the number of popped elements is considered
726    /// // an implementation detail that may be changed in the future.
727    /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX);
728    /// assert_eq!(w2.len(), 3);
729    /// ```
730    pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
731        assert!(limit > 0);
732        if Arc::ptr_eq(&self.inner, &dest.inner) {
733            if dest.is_empty() {
734                return Steal::Empty;
735            } else {
736                return Steal::Success(());
737            }
738        }
739
740        // Load the front index.
741        let mut f = self.inner.front.load(Ordering::Acquire);
742
743        // A SeqCst fence is needed here.
744        //
745        // If the current thread is already pinned (reentrantly), we must manually issue the
746        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
747        // have to.
748        if epoch::is_pinned() {
749            atomic::fence(Ordering::SeqCst);
750        }
751
752        let guard = &epoch::pin();
753
754        // Load the back index.
755        let b = self.inner.back.load(Ordering::Acquire);
756
757        // Is the queue empty?
758        let len = b.wrapping_sub(f);
759        if len <= 0 {
760            return Steal::Empty;
761        }
762
763        // Reserve capacity for the stolen batch.
764        let batch_size = cmp::min((len as usize + 1) / 2, limit);
765        dest.reserve(batch_size);
766        let mut batch_size = batch_size as isize;
767
768        // Get the destination buffer and back index.
769        let dest_buffer = dest.buffer.get();
770        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
771
772        // Load the buffer.
773        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
774
775        match self.flavor {
776            // Steal a batch of tasks from the front at once.
777            Flavor::Fifo => {
778                // Copy the batch from the source to the destination buffer.
779                match dest.flavor {
780                    Flavor::Fifo => {
781                        for i in 0..batch_size {
782                            unsafe {
783                                let task = buffer.deref().read(f.wrapping_add(i));
784                                dest_buffer.write(dest_b.wrapping_add(i), task);
785                            }
786                        }
787                    }
788                    Flavor::Lifo => {
789                        for i in 0..batch_size {
790                            unsafe {
791                                let task = buffer.deref().read(f.wrapping_add(i));
792                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
793                            }
794                        }
795                    }
796                }
797
798                // Try incrementing the front index to steal the batch.
799                // If the buffer has been swapped or the increment fails, we retry.
800                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
801                    || self
802                        .inner
803                        .front
804                        .compare_exchange(
805                            f,
806                            f.wrapping_add(batch_size),
807                            Ordering::SeqCst,
808                            Ordering::Relaxed,
809                        )
810                        .is_err()
811                {
812                    return Steal::Retry;
813                }
814
815                dest_b = dest_b.wrapping_add(batch_size);
816            }
817
818            // Steal a batch of tasks from the front one by one.
819            Flavor::Lifo => {
820                // This loop may modify the batch_size, which triggers a clippy lint warning.
821                // Use a new variable to avoid the warning, and to make it clear we aren't
822                // modifying the loop exit condition during iteration.
823                let original_batch_size = batch_size;
824
825                for i in 0..original_batch_size {
826                    // If this is not the first steal, check whether the queue is empty.
827                    if i > 0 {
828                        // We've already got the current front index. Now execute the fence to
829                        // synchronize with other threads.
830                        atomic::fence(Ordering::SeqCst);
831
832                        // Load the back index.
833                        let b = self.inner.back.load(Ordering::Acquire);
834
835                        // Is the queue empty?
836                        if b.wrapping_sub(f) <= 0 {
837                            batch_size = i;
838                            break;
839                        }
840                    }
841
842                    // Read the task at the front.
843                    let task = unsafe { buffer.deref().read(f) };
844
845                    // Try incrementing the front index to steal the task.
846                    // If the buffer has been swapped or the increment fails, we retry.
847                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
848                        || self
849                            .inner
850                            .front
851                            .compare_exchange(
852                                f,
853                                f.wrapping_add(1),
854                                Ordering::SeqCst,
855                                Ordering::Relaxed,
856                            )
857                            .is_err()
858                    {
859                        // We didn't steal this task, forget it and break from the loop.
860                        batch_size = i;
861                        break;
862                    }
863
864                    // Write the stolen task into the destination buffer.
865                    unsafe {
866                        dest_buffer.write(dest_b, task);
867                    }
868
869                    // Move the source front index and the destination back index one step forward.
870                    f = f.wrapping_add(1);
871                    dest_b = dest_b.wrapping_add(1);
872                }
873
874                // If we didn't steal anything, the operation needs to be retried.
875                if batch_size == 0 {
876                    return Steal::Retry;
877                }
878
879                // If stealing into a FIFO queue, stolen tasks need to be reversed.
880                if dest.flavor == Flavor::Fifo {
881                    for i in 0..batch_size / 2 {
882                        unsafe {
883                            let i1 = dest_b.wrapping_sub(batch_size - i);
884                            let i2 = dest_b.wrapping_sub(i + 1);
885                            let t1 = dest_buffer.read(i1);
886                            let t2 = dest_buffer.read(i2);
887                            dest_buffer.write(i1, t2);
888                            dest_buffer.write(i2, t1);
889                        }
890                    }
891                }
892            }
893        }
894
895        atomic::fence(Ordering::Release);
896
897        // Update the back index in the destination queue.
898        //
899        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
900        // races because it doesn't understand fences.
901        dest.inner.back.store(dest_b, Ordering::Release);
902
903        // Return with success.
904        Steal::Success(())
905    }
906
907    /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
908    ///
909    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
910    /// steal around half of the tasks in the queue, but also not more than some constant limit.
911    ///
912    /// # Examples
913    ///
914    /// ```
915    /// use crossbeam_deque::{Steal, Worker};
916    ///
917    /// let w1 = Worker::new_fifo();
918    /// w1.push(1);
919    /// w1.push(2);
920    /// w1.push(3);
921    /// w1.push(4);
922    ///
923    /// let s = w1.stealer();
924    /// let w2 = Worker::new_fifo();
925    ///
926    /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
927    /// assert_eq!(w2.pop(), Some(2));
928    /// ```
929    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
930        self.steal_batch_with_limit_and_pop(dest, MAX_BATCH)
931    }
932
933    /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from
934    /// that worker.
935    ///
936    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
937    /// steal around half of the tasks in the queue, but also not more than the given limit.
938    ///
939    /// # Examples
940    ///
941    /// ```
942    /// use crossbeam_deque::{Steal, Worker};
943    ///
944    /// let w1 = Worker::new_fifo();
945    /// w1.push(1);
946    /// w1.push(2);
947    /// w1.push(3);
948    /// w1.push(4);
949    /// w1.push(5);
950    /// w1.push(6);
951    ///
952    /// let s = w1.stealer();
953    /// let w2 = Worker::new_fifo();
954    ///
955    /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1));
956    /// assert_eq!(w2.pop(), Some(2));
957    /// assert_eq!(w2.pop(), None);
958    ///
959    /// w1.push(7);
960    /// w1.push(8);
961    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
962    /// // half of the elements are currently popped, but the number of popped elements is considered
963    /// // an implementation detail that may be changed in the future.
964    /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3));
965    /// assert_eq!(w2.pop(), Some(4));
966    /// assert_eq!(w2.pop(), Some(5));
967    /// assert_eq!(w2.pop(), None);
968    /// ```
969    pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
970        assert!(limit > 0);
971        if Arc::ptr_eq(&self.inner, &dest.inner) {
972            match dest.pop() {
973                None => return Steal::Empty,
974                Some(task) => return Steal::Success(task),
975            }
976        }
977
978        // Load the front index.
979        let mut f = self.inner.front.load(Ordering::Acquire);
980
981        // A SeqCst fence is needed here.
982        //
983        // If the current thread is already pinned (reentrantly), we must manually issue the
984        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
985        // have to.
986        if epoch::is_pinned() {
987            atomic::fence(Ordering::SeqCst);
988        }
989
990        let guard = &epoch::pin();
991
992        // Load the back index.
993        let b = self.inner.back.load(Ordering::Acquire);
994
995        // Is the queue empty?
996        let len = b.wrapping_sub(f);
997        if len <= 0 {
998            return Steal::Empty;
999        }
1000
1001        // Reserve capacity for the stolen batch.
1002        let batch_size = cmp::min((len as usize - 1) / 2, limit - 1);
1003        dest.reserve(batch_size);
1004        let mut batch_size = batch_size as isize;
1005
1006        // Get the destination buffer and back index.
1007        let dest_buffer = dest.buffer.get();
1008        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
1009
1010        // Load the buffer
1011        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
1012
1013        // Read the task at the front.
1014        let mut task = unsafe { buffer.deref().read(f) };
1015
1016        match self.flavor {
1017            // Steal a batch of tasks from the front at once.
1018            Flavor::Fifo => {
1019                // Copy the batch from the source to the destination buffer.
1020                match dest.flavor {
1021                    Flavor::Fifo => {
1022                        for i in 0..batch_size {
1023                            unsafe {
1024                                let task = buffer.deref().read(f.wrapping_add(i + 1));
1025                                dest_buffer.write(dest_b.wrapping_add(i), task);
1026                            }
1027                        }
1028                    }
1029                    Flavor::Lifo => {
1030                        for i in 0..batch_size {
1031                            unsafe {
1032                                let task = buffer.deref().read(f.wrapping_add(i + 1));
1033                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
1034                            }
1035                        }
1036                    }
1037                }
1038
1039                // Try incrementing the front index to steal the task.
1040                // If the buffer has been swapped or the increment fails, we retry.
1041                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1042                    || self
1043                        .inner
1044                        .front
1045                        .compare_exchange(
1046                            f,
1047                            f.wrapping_add(batch_size + 1),
1048                            Ordering::SeqCst,
1049                            Ordering::Relaxed,
1050                        )
1051                        .is_err()
1052                {
1053                    // We didn't steal this task, forget it.
1054                    return Steal::Retry;
1055                }
1056
1057                dest_b = dest_b.wrapping_add(batch_size);
1058            }
1059
1060            // Steal a batch of tasks from the front one by one.
1061            Flavor::Lifo => {
1062                // Try incrementing the front index to steal the task.
1063                if self
1064                    .inner
1065                    .front
1066                    .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
1067                    .is_err()
1068                {
1069                    // We didn't steal this task, forget it.
1070                    return Steal::Retry;
1071                }
1072
1073                // Move the front index one step forward.
1074                f = f.wrapping_add(1);
1075
1076                // Repeat the same procedure for the batch steals.
1077                //
1078                // This loop may modify the batch_size, which triggers a clippy lint warning.
1079                // Use a new variable to avoid the warning, and to make it clear we aren't
1080                // modifying the loop exit condition during iteration.
1081                let original_batch_size = batch_size;
1082                for i in 0..original_batch_size {
1083                    // We've already got the current front index. Now execute the fence to
1084                    // synchronize with other threads.
1085                    atomic::fence(Ordering::SeqCst);
1086
1087                    // Load the back index.
1088                    let b = self.inner.back.load(Ordering::Acquire);
1089
1090                    // Is the queue empty?
1091                    if b.wrapping_sub(f) <= 0 {
1092                        batch_size = i;
1093                        break;
1094                    }
1095
1096                    // Read the task at the front.
1097                    let tmp = unsafe { buffer.deref().read(f) };
1098
1099                    // Try incrementing the front index to steal the task.
1100                    // If the buffer has been swapped or the increment fails, we retry.
1101                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1102                        || self
1103                            .inner
1104                            .front
1105                            .compare_exchange(
1106                                f,
1107                                f.wrapping_add(1),
1108                                Ordering::SeqCst,
1109                                Ordering::Relaxed,
1110                            )
1111                            .is_err()
1112                    {
1113                        // We didn't steal this task, forget it and break from the loop.
1114                        batch_size = i;
1115                        break;
1116                    }
1117
1118                    // Write the previously stolen task into the destination buffer.
1119                    unsafe {
1120                        dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1121                    }
1122
1123                    // Move the source front index and the destination back index one step forward.
1124                    f = f.wrapping_add(1);
1125                    dest_b = dest_b.wrapping_add(1);
1126                }
1127
1128                // If stealing into a FIFO queue, stolen tasks need to be reversed.
1129                if dest.flavor == Flavor::Fifo {
1130                    for i in 0..batch_size / 2 {
1131                        unsafe {
1132                            let i1 = dest_b.wrapping_sub(batch_size - i);
1133                            let i2 = dest_b.wrapping_sub(i + 1);
1134                            let t1 = dest_buffer.read(i1);
1135                            let t2 = dest_buffer.read(i2);
1136                            dest_buffer.write(i1, t2);
1137                            dest_buffer.write(i2, t1);
1138                        }
1139                    }
1140                }
1141            }
1142        }
1143
1144        atomic::fence(Ordering::Release);
1145
1146        // Update the back index in the destination queue.
1147        //
1148        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1149        // races because it doesn't understand fences.
1150        dest.inner.back.store(dest_b, Ordering::Release);
1151
1152        // Return with success.
1153        Steal::Success(unsafe { task.assume_init() })
1154    }
1155}
1156
1157impl<T> Clone for Stealer<T> {
1158    fn clone(&self) -> Stealer<T> {
1159        Stealer {
1160            inner: self.inner.clone(),
1161            flavor: self.flavor,
1162        }
1163    }
1164}
1165
1166impl<T> fmt::Debug for Stealer<T> {
1167    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1168        f.pad("Stealer { .. }")
1169    }
1170}
1171
1172// Bits indicating the state of a slot:
1173// * If a task has been written into the slot, `WRITE` is set.
1174// * If a task has been read from the slot, `READ` is set.
1175// * If the block is being destroyed, `DESTROY` is set.
1176const WRITE: usize = 1;
1177const READ: usize = 2;
1178const DESTROY: usize = 4;
1179
1180// Each block covers one "lap" of indices.
1181const LAP: usize = 64;
1182// The maximum number of values a block can hold.
1183const BLOCK_CAP: usize = LAP - 1;
1184// How many lower bits are reserved for metadata.
1185const SHIFT: usize = 1;
1186// Indicates that the block is not the last one.
1187const HAS_NEXT: usize = 1;
1188
1189/// A slot in a block.
1190struct Slot<T> {
1191    /// The task.
1192    task: UnsafeCell<MaybeUninit<T>>,
1193
1194    /// The state of the slot.
1195    state: AtomicUsize,
1196}
1197
1198impl<T> Slot<T> {
1199    const UNINIT: Self = Self {
1200        task: UnsafeCell::new(MaybeUninit::uninit()),
1201        state: AtomicUsize::new(0),
1202    };
1203
1204    /// Waits until a task is written into the slot.
1205    fn wait_write(&self) {
1206        let backoff = Backoff::new();
1207        while self.state.load(Ordering::Acquire) & WRITE == 0 {
1208            backoff.snooze();
1209        }
1210    }
1211}
1212
1213/// A block in a linked list.
1214///
1215/// Each block in the list can hold up to `BLOCK_CAP` values.
1216struct Block<T> {
1217    /// The next block in the linked list.
1218    next: AtomicPtr<Block<T>>,
1219
1220    /// Slots for values.
1221    slots: [Slot<T>; BLOCK_CAP],
1222}
1223
1224impl<T> Block<T> {
1225    /// Creates an empty block that starts at `start_index`.
1226    fn new() -> Block<T> {
1227        Self {
1228            next: AtomicPtr::new(ptr::null_mut()),
1229            slots: [Slot::UNINIT; BLOCK_CAP],
1230        }
1231    }
1232
1233    /// Waits until the next pointer is set.
1234    fn wait_next(&self) -> *mut Block<T> {
1235        let backoff = Backoff::new();
1236        loop {
1237            let next = self.next.load(Ordering::Acquire);
1238            if !next.is_null() {
1239                return next;
1240            }
1241            backoff.snooze();
1242        }
1243    }
1244
1245    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
1246    unsafe fn destroy(this: *mut Block<T>, count: usize) {
1247        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1248        // begun destruction of the block.
1249        for i in (0..count).rev() {
1250            let slot = (*this).slots.get_unchecked(i);
1251
1252            // Mark the `DESTROY` bit if a thread is still using the slot.
1253            if slot.state.load(Ordering::Acquire) & READ == 0
1254                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1255            {
1256                // If a thread is still using the slot, it will continue destruction of the block.
1257                return;
1258            }
1259        }
1260
1261        // No thread is using the block, now it is safe to destroy it.
1262        drop(Box::from_raw(this));
1263    }
1264}
1265
1266/// A position in a queue.
1267struct Position<T> {
1268    /// The index in the queue.
1269    index: AtomicUsize,
1270
1271    /// The block in the linked list.
1272    block: AtomicPtr<Block<T>>,
1273}
1274
1275/// An injector queue.
1276///
1277/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1278/// a single injector queue, which is the entry point for new tasks.
1279///
1280/// # Examples
1281///
1282/// ```
1283/// use crossbeam_deque::{Injector, Steal};
1284///
1285/// let q = Injector::new();
1286/// q.push(1);
1287/// q.push(2);
1288///
1289/// assert_eq!(q.steal(), Steal::Success(1));
1290/// assert_eq!(q.steal(), Steal::Success(2));
1291/// assert_eq!(q.steal(), Steal::Empty);
1292/// ```
1293pub struct Injector<T> {
1294    /// The head of the queue.
1295    head: CachePadded<Position<T>>,
1296
1297    /// The tail of the queue.
1298    tail: CachePadded<Position<T>>,
1299
1300    /// Indicates that dropping a `Injector<T>` may drop values of type `T`.
1301    _marker: PhantomData<T>,
1302}
1303
1304unsafe impl<T: Send> Send for Injector<T> {}
1305unsafe impl<T: Send> Sync for Injector<T> {}
1306
1307impl<T> Default for Injector<T> {
1308    fn default() -> Self {
1309        let block = Box::into_raw(Box::new(Block::<T>::new()));
1310        Self {
1311            head: CachePadded::new(Position {
1312                block: AtomicPtr::new(block),
1313                index: AtomicUsize::new(0),
1314            }),
1315            tail: CachePadded::new(Position {
1316                block: AtomicPtr::new(block),
1317                index: AtomicUsize::new(0),
1318            }),
1319            _marker: PhantomData,
1320        }
1321    }
1322}
1323
1324impl<T> Injector<T> {
1325    /// Creates a new injector queue.
1326    ///
1327    /// # Examples
1328    ///
1329    /// ```
1330    /// use crossbeam_deque::Injector;
1331    ///
1332    /// let q = Injector::<i32>::new();
1333    /// ```
1334    pub fn new() -> Injector<T> {
1335        Self::default()
1336    }
1337
1338    /// Pushes a task into the queue.
1339    ///
1340    /// # Examples
1341    ///
1342    /// ```
1343    /// use crossbeam_deque::Injector;
1344    ///
1345    /// let w = Injector::new();
1346    /// w.push(1);
1347    /// w.push(2);
1348    /// ```
1349    pub fn push(&self, task: T) {
1350        let backoff = Backoff::new();
1351        let mut tail = self.tail.index.load(Ordering::Acquire);
1352        let mut block = self.tail.block.load(Ordering::Acquire);
1353        let mut next_block = None;
1354
1355        loop {
1356            // Calculate the offset of the index into the block.
1357            let offset = (tail >> SHIFT) % LAP;
1358
1359            // If we reached the end of the block, wait until the next one is installed.
1360            if offset == BLOCK_CAP {
1361                backoff.snooze();
1362                tail = self.tail.index.load(Ordering::Acquire);
1363                block = self.tail.block.load(Ordering::Acquire);
1364                continue;
1365            }
1366
1367            // If we're going to have to install the next block, allocate it in advance in order to
1368            // make the wait for other threads as short as possible.
1369            if offset + 1 == BLOCK_CAP && next_block.is_none() {
1370                next_block = Some(Box::new(Block::<T>::new()));
1371            }
1372
1373            let new_tail = tail + (1 << SHIFT);
1374
1375            // Try advancing the tail forward.
1376            match self.tail.index.compare_exchange_weak(
1377                tail,
1378                new_tail,
1379                Ordering::SeqCst,
1380                Ordering::Acquire,
1381            ) {
1382                Ok(_) => unsafe {
1383                    // If we've reached the end of the block, install the next one.
1384                    if offset + 1 == BLOCK_CAP {
1385                        let next_block = Box::into_raw(next_block.unwrap());
1386                        let next_index = new_tail.wrapping_add(1 << SHIFT);
1387
1388                        self.tail.block.store(next_block, Ordering::Release);
1389                        self.tail.index.store(next_index, Ordering::Release);
1390                        (*block).next.store(next_block, Ordering::Release);
1391                    }
1392
1393                    // Write the task into the slot.
1394                    let slot = (*block).slots.get_unchecked(offset);
1395                    slot.task.get().write(MaybeUninit::new(task));
1396                    slot.state.fetch_or(WRITE, Ordering::Release);
1397
1398                    return;
1399                },
1400                Err(t) => {
1401                    tail = t;
1402                    block = self.tail.block.load(Ordering::Acquire);
1403                    backoff.spin();
1404                }
1405            }
1406        }
1407    }
1408
1409    /// Steals a task from the queue.
1410    ///
1411    /// # Examples
1412    ///
1413    /// ```
1414    /// use crossbeam_deque::{Injector, Steal};
1415    ///
1416    /// let q = Injector::new();
1417    /// q.push(1);
1418    /// q.push(2);
1419    ///
1420    /// assert_eq!(q.steal(), Steal::Success(1));
1421    /// assert_eq!(q.steal(), Steal::Success(2));
1422    /// assert_eq!(q.steal(), Steal::Empty);
1423    /// ```
1424    pub fn steal(&self) -> Steal<T> {
1425        let mut head;
1426        let mut block;
1427        let mut offset;
1428
1429        let backoff = Backoff::new();
1430        loop {
1431            head = self.head.index.load(Ordering::Acquire);
1432            block = self.head.block.load(Ordering::Acquire);
1433
1434            // Calculate the offset of the index into the block.
1435            offset = (head >> SHIFT) % LAP;
1436
1437            // If we reached the end of the block, wait until the next one is installed.
1438            if offset == BLOCK_CAP {
1439                backoff.snooze();
1440            } else {
1441                break;
1442            }
1443        }
1444
1445        let mut new_head = head + (1 << SHIFT);
1446
1447        if new_head & HAS_NEXT == 0 {
1448            atomic::fence(Ordering::SeqCst);
1449            let tail = self.tail.index.load(Ordering::Relaxed);
1450
1451            // If the tail equals the head, that means the queue is empty.
1452            if head >> SHIFT == tail >> SHIFT {
1453                return Steal::Empty;
1454            }
1455
1456            // If head and tail are not in the same block, set `HAS_NEXT` in head.
1457            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1458                new_head |= HAS_NEXT;
1459            }
1460        }
1461
1462        // Try moving the head index forward.
1463        if self
1464            .head
1465            .index
1466            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1467            .is_err()
1468        {
1469            return Steal::Retry;
1470        }
1471
1472        unsafe {
1473            // If we've reached the end of the block, move to the next one.
1474            if offset + 1 == BLOCK_CAP {
1475                let next = (*block).wait_next();
1476                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1477                if !(*next).next.load(Ordering::Relaxed).is_null() {
1478                    next_index |= HAS_NEXT;
1479                }
1480
1481                self.head.block.store(next, Ordering::Release);
1482                self.head.index.store(next_index, Ordering::Release);
1483            }
1484
1485            // Read the task.
1486            let slot = (*block).slots.get_unchecked(offset);
1487            slot.wait_write();
1488            let task = slot.task.get().read().assume_init();
1489
1490            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1491            // but couldn't because we were busy reading from the slot.
1492            if (offset + 1 == BLOCK_CAP)
1493                || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
1494            {
1495                Block::destroy(block, offset);
1496            }
1497
1498            Steal::Success(task)
1499        }
1500    }
1501
1502    /// Steals a batch of tasks and pushes them into a worker.
1503    ///
1504    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1505    /// steal around half of the tasks in the queue, but also not more than some constant limit.
1506    ///
1507    /// # Examples
1508    ///
1509    /// ```
1510    /// use crossbeam_deque::{Injector, Worker};
1511    ///
1512    /// let q = Injector::new();
1513    /// q.push(1);
1514    /// q.push(2);
1515    /// q.push(3);
1516    /// q.push(4);
1517    ///
1518    /// let w = Worker::new_fifo();
1519    /// let _ = q.steal_batch(&w);
1520    /// assert_eq!(w.pop(), Some(1));
1521    /// assert_eq!(w.pop(), Some(2));
1522    /// ```
1523    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1524        self.steal_batch_with_limit(dest, MAX_BATCH)
1525    }
1526
1527    /// Steals no more than of tasks and pushes them into a worker.
1528    ///
1529    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1530    /// steal around half of the tasks in the queue, but also not more than some constant limit.
1531    ///
1532    /// # Examples
1533    ///
1534    /// ```
1535    /// use crossbeam_deque::{Injector, Worker};
1536    ///
1537    /// let q = Injector::new();
1538    /// q.push(1);
1539    /// q.push(2);
1540    /// q.push(3);
1541    /// q.push(4);
1542    /// q.push(5);
1543    /// q.push(6);
1544    ///
1545    /// let w = Worker::new_fifo();
1546    /// let _ = q.steal_batch_with_limit(&w, 2);
1547    /// assert_eq!(w.pop(), Some(1));
1548    /// assert_eq!(w.pop(), Some(2));
1549    /// assert_eq!(w.pop(), None);
1550    ///
1551    /// q.push(7);
1552    /// q.push(8);
1553    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1554    /// // half of the elements are currently popped, but the number of popped elements is considered
1555    /// // an implementation detail that may be changed in the future.
1556    /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX);
1557    /// assert_eq!(w.len(), 3);
1558    /// ```
1559    pub fn steal_batch_with_limit(&self, dest: &Worker<T>, limit: usize) -> Steal<()> {
1560        assert!(limit > 0);
1561        let mut head;
1562        let mut block;
1563        let mut offset;
1564
1565        let backoff = Backoff::new();
1566        loop {
1567            head = self.head.index.load(Ordering::Acquire);
1568            block = self.head.block.load(Ordering::Acquire);
1569
1570            // Calculate the offset of the index into the block.
1571            offset = (head >> SHIFT) % LAP;
1572
1573            // If we reached the end of the block, wait until the next one is installed.
1574            if offset == BLOCK_CAP {
1575                backoff.snooze();
1576            } else {
1577                break;
1578            }
1579        }
1580
1581        let mut new_head = head;
1582        let advance;
1583
1584        if new_head & HAS_NEXT == 0 {
1585            atomic::fence(Ordering::SeqCst);
1586            let tail = self.tail.index.load(Ordering::Relaxed);
1587
1588            // If the tail equals the head, that means the queue is empty.
1589            if head >> SHIFT == tail >> SHIFT {
1590                return Steal::Empty;
1591            }
1592
1593            // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1594            // the right batch size to steal.
1595            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1596                new_head |= HAS_NEXT;
1597                // We can steal all tasks till the end of the block.
1598                advance = (BLOCK_CAP - offset).min(limit);
1599            } else {
1600                let len = (tail - head) >> SHIFT;
1601                // Steal half of the available tasks.
1602                advance = ((len + 1) / 2).min(limit);
1603            }
1604        } else {
1605            // We can steal all tasks till the end of the block.
1606            advance = (BLOCK_CAP - offset).min(limit);
1607        }
1608
1609        new_head += advance << SHIFT;
1610        let new_offset = offset + advance;
1611
1612        // Try moving the head index forward.
1613        if self
1614            .head
1615            .index
1616            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1617            .is_err()
1618        {
1619            return Steal::Retry;
1620        }
1621
1622        // Reserve capacity for the stolen batch.
1623        let batch_size = new_offset - offset;
1624        dest.reserve(batch_size);
1625
1626        // Get the destination buffer and back index.
1627        let dest_buffer = dest.buffer.get();
1628        let dest_b = dest.inner.back.load(Ordering::Relaxed);
1629
1630        unsafe {
1631            // If we've reached the end of the block, move to the next one.
1632            if new_offset == BLOCK_CAP {
1633                let next = (*block).wait_next();
1634                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1635                if !(*next).next.load(Ordering::Relaxed).is_null() {
1636                    next_index |= HAS_NEXT;
1637                }
1638
1639                self.head.block.store(next, Ordering::Release);
1640                self.head.index.store(next_index, Ordering::Release);
1641            }
1642
1643            // Copy values from the injector into the destination queue.
1644            match dest.flavor {
1645                Flavor::Fifo => {
1646                    for i in 0..batch_size {
1647                        // Read the task.
1648                        let slot = (*block).slots.get_unchecked(offset + i);
1649                        slot.wait_write();
1650                        let task = slot.task.get().read();
1651
1652                        // Write it into the destination queue.
1653                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1654                    }
1655                }
1656
1657                Flavor::Lifo => {
1658                    for i in 0..batch_size {
1659                        // Read the task.
1660                        let slot = (*block).slots.get_unchecked(offset + i);
1661                        slot.wait_write();
1662                        let task = slot.task.get().read();
1663
1664                        // Write it into the destination queue.
1665                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1666                    }
1667                }
1668            }
1669
1670            atomic::fence(Ordering::Release);
1671
1672            // Update the back index in the destination queue.
1673            //
1674            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1675            // data races because it doesn't understand fences.
1676            dest.inner
1677                .back
1678                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1679
1680            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1681            // but couldn't because we were busy reading from the slot.
1682            if new_offset == BLOCK_CAP {
1683                Block::destroy(block, offset);
1684            } else {
1685                for i in offset..new_offset {
1686                    let slot = (*block).slots.get_unchecked(i);
1687
1688                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1689                        Block::destroy(block, offset);
1690                        break;
1691                    }
1692                }
1693            }
1694
1695            Steal::Success(())
1696        }
1697    }
1698
1699    /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1700    ///
1701    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1702    /// steal around half of the tasks in the queue, but also not more than some constant limit.
1703    ///
1704    /// # Examples
1705    ///
1706    /// ```
1707    /// use crossbeam_deque::{Injector, Steal, Worker};
1708    ///
1709    /// let q = Injector::new();
1710    /// q.push(1);
1711    /// q.push(2);
1712    /// q.push(3);
1713    /// q.push(4);
1714    ///
1715    /// let w = Worker::new_fifo();
1716    /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1717    /// assert_eq!(w.pop(), Some(2));
1718    /// ```
1719    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1720        // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly
1721        // better, but we may change it in the future to be compatible with the same method in Stealer.
1722        self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1)
1723    }
1724
1725    /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker.
1726    ///
1727    /// How many tasks exactly will be stolen is not specified. That said, this method will try to
1728    /// steal around half of the tasks in the queue, but also not more than the given limit.
1729    ///
1730    /// # Examples
1731    ///
1732    /// ```
1733    /// use crossbeam_deque::{Injector, Steal, Worker};
1734    ///
1735    /// let q = Injector::new();
1736    /// q.push(1);
1737    /// q.push(2);
1738    /// q.push(3);
1739    /// q.push(4);
1740    /// q.push(5);
1741    /// q.push(6);
1742    ///
1743    /// let w = Worker::new_fifo();
1744    /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1));
1745    /// assert_eq!(w.pop(), Some(2));
1746    /// assert_eq!(w.pop(), None);
1747    ///
1748    /// q.push(7);
1749    /// // Setting a large limit does not guarantee that all elements will be popped. In this case,
1750    /// // half of the elements are currently popped, but the number of popped elements is considered
1751    /// // an implementation detail that may be changed in the future.
1752    /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3));
1753    /// assert_eq!(w.pop(), Some(4));
1754    /// assert_eq!(w.pop(), Some(5));
1755    /// assert_eq!(w.pop(), None);
1756    /// ```
1757    pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker<T>, limit: usize) -> Steal<T> {
1758        assert!(limit > 0);
1759        let mut head;
1760        let mut block;
1761        let mut offset;
1762
1763        let backoff = Backoff::new();
1764        loop {
1765            head = self.head.index.load(Ordering::Acquire);
1766            block = self.head.block.load(Ordering::Acquire);
1767
1768            // Calculate the offset of the index into the block.
1769            offset = (head >> SHIFT) % LAP;
1770
1771            // If we reached the end of the block, wait until the next one is installed.
1772            if offset == BLOCK_CAP {
1773                backoff.snooze();
1774            } else {
1775                break;
1776            }
1777        }
1778
1779        let mut new_head = head;
1780        let advance;
1781
1782        if new_head & HAS_NEXT == 0 {
1783            atomic::fence(Ordering::SeqCst);
1784            let tail = self.tail.index.load(Ordering::Relaxed);
1785
1786            // If the tail equals the head, that means the queue is empty.
1787            if head >> SHIFT == tail >> SHIFT {
1788                return Steal::Empty;
1789            }
1790
1791            // If head and tail are not in the same block, set `HAS_NEXT` in head.
1792            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1793                new_head |= HAS_NEXT;
1794                // We can steal all tasks till the end of the block.
1795                advance = (BLOCK_CAP - offset).min(limit);
1796            } else {
1797                let len = (tail - head) >> SHIFT;
1798                // Steal half of the available tasks.
1799                advance = ((len + 1) / 2).min(limit);
1800            }
1801        } else {
1802            // We can steal all tasks till the end of the block.
1803            advance = (BLOCK_CAP - offset).min(limit);
1804        }
1805
1806        new_head += advance << SHIFT;
1807        let new_offset = offset + advance;
1808
1809        // Try moving the head index forward.
1810        if self
1811            .head
1812            .index
1813            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1814            .is_err()
1815        {
1816            return Steal::Retry;
1817        }
1818
1819        // Reserve capacity for the stolen batch.
1820        let batch_size = new_offset - offset - 1;
1821        dest.reserve(batch_size);
1822
1823        // Get the destination buffer and back index.
1824        let dest_buffer = dest.buffer.get();
1825        let dest_b = dest.inner.back.load(Ordering::Relaxed);
1826
1827        unsafe {
1828            // If we've reached the end of the block, move to the next one.
1829            if new_offset == BLOCK_CAP {
1830                let next = (*block).wait_next();
1831                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1832                if !(*next).next.load(Ordering::Relaxed).is_null() {
1833                    next_index |= HAS_NEXT;
1834                }
1835
1836                self.head.block.store(next, Ordering::Release);
1837                self.head.index.store(next_index, Ordering::Release);
1838            }
1839
1840            // Read the task.
1841            let slot = (*block).slots.get_unchecked(offset);
1842            slot.wait_write();
1843            let task = slot.task.get().read();
1844
1845            match dest.flavor {
1846                Flavor::Fifo => {
1847                    // Copy values from the injector into the destination queue.
1848                    for i in 0..batch_size {
1849                        // Read the task.
1850                        let slot = (*block).slots.get_unchecked(offset + i + 1);
1851                        slot.wait_write();
1852                        let task = slot.task.get().read();
1853
1854                        // Write it into the destination queue.
1855                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1856                    }
1857                }
1858
1859                Flavor::Lifo => {
1860                    // Copy values from the injector into the destination queue.
1861                    for i in 0..batch_size {
1862                        // Read the task.
1863                        let slot = (*block).slots.get_unchecked(offset + i + 1);
1864                        slot.wait_write();
1865                        let task = slot.task.get().read();
1866
1867                        // Write it into the destination queue.
1868                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1869                    }
1870                }
1871            }
1872
1873            atomic::fence(Ordering::Release);
1874
1875            // Update the back index in the destination queue.
1876            //
1877            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1878            // data races because it doesn't understand fences.
1879            dest.inner
1880                .back
1881                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1882
1883            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1884            // but couldn't because we were busy reading from the slot.
1885            if new_offset == BLOCK_CAP {
1886                Block::destroy(block, offset);
1887            } else {
1888                for i in offset..new_offset {
1889                    let slot = (*block).slots.get_unchecked(i);
1890
1891                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1892                        Block::destroy(block, offset);
1893                        break;
1894                    }
1895                }
1896            }
1897
1898            Steal::Success(task.assume_init())
1899        }
1900    }
1901
1902    /// Returns `true` if the queue is empty.
1903    ///
1904    /// # Examples
1905    ///
1906    /// ```
1907    /// use crossbeam_deque::Injector;
1908    ///
1909    /// let q = Injector::new();
1910    ///
1911    /// assert!(q.is_empty());
1912    /// q.push(1);
1913    /// assert!(!q.is_empty());
1914    /// ```
1915    pub fn is_empty(&self) -> bool {
1916        let head = self.head.index.load(Ordering::SeqCst);
1917        let tail = self.tail.index.load(Ordering::SeqCst);
1918        head >> SHIFT == tail >> SHIFT
1919    }
1920
1921    /// Returns the number of tasks in the queue.
1922    ///
1923    /// # Examples
1924    ///
1925    /// ```
1926    /// use crossbeam_deque::Injector;
1927    ///
1928    /// let q = Injector::new();
1929    ///
1930    /// assert_eq!(q.len(), 0);
1931    /// q.push(1);
1932    /// assert_eq!(q.len(), 1);
1933    /// q.push(1);
1934    /// assert_eq!(q.len(), 2);
1935    /// ```
1936    pub fn len(&self) -> usize {
1937        loop {
1938            // Load the tail index, then load the head index.
1939            let mut tail = self.tail.index.load(Ordering::SeqCst);
1940            let mut head = self.head.index.load(Ordering::SeqCst);
1941
1942            // If the tail index didn't change, we've got consistent indices to work with.
1943            if self.tail.index.load(Ordering::SeqCst) == tail {
1944                // Erase the lower bits.
1945                tail &= !((1 << SHIFT) - 1);
1946                head &= !((1 << SHIFT) - 1);
1947
1948                // Fix up indices if they fall onto block ends.
1949                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
1950                    tail = tail.wrapping_add(1 << SHIFT);
1951                }
1952                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
1953                    head = head.wrapping_add(1 << SHIFT);
1954                }
1955
1956                // Rotate indices so that head falls into the first block.
1957                let lap = (head >> SHIFT) / LAP;
1958                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
1959                head = head.wrapping_sub((lap * LAP) << SHIFT);
1960
1961                // Remove the lower bits.
1962                tail >>= SHIFT;
1963                head >>= SHIFT;
1964
1965                // Return the difference minus the number of blocks between tail and head.
1966                return tail - head - tail / LAP;
1967            }
1968        }
1969    }
1970}
1971
1972impl<T> Drop for Injector<T> {
1973    fn drop(&mut self) {
1974        let mut head = *self.head.index.get_mut();
1975        let mut tail = *self.tail.index.get_mut();
1976        let mut block = *self.head.block.get_mut();
1977
1978        // Erase the lower bits.
1979        head &= !((1 << SHIFT) - 1);
1980        tail &= !((1 << SHIFT) - 1);
1981
1982        unsafe {
1983            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
1984            while head != tail {
1985                let offset = (head >> SHIFT) % LAP;
1986
1987                if offset < BLOCK_CAP {
1988                    // Drop the task in the slot.
1989                    let slot = (*block).slots.get_unchecked(offset);
1990                    let p = &mut *slot.task.get();
1991                    p.as_mut_ptr().drop_in_place();
1992                } else {
1993                    // Deallocate the block and move to the next one.
1994                    let next = *(*block).next.get_mut();
1995                    drop(Box::from_raw(block));
1996                    block = next;
1997                }
1998
1999                head = head.wrapping_add(1 << SHIFT);
2000            }
2001
2002            // Deallocate the last remaining block.
2003            drop(Box::from_raw(block));
2004        }
2005    }
2006}
2007
2008impl<T> fmt::Debug for Injector<T> {
2009    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2010        f.pad("Worker { .. }")
2011    }
2012}
2013
2014/// Possible outcomes of a steal operation.
2015///
2016/// # Examples
2017///
2018/// There are lots of ways to chain results of steal operations together:
2019///
2020/// ```
2021/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
2022///
2023/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
2024///
2025/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
2026/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
2027/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
2028///
2029/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
2030/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
2031/// ```
2032#[must_use]
2033#[derive(PartialEq, Eq, Copy, Clone)]
2034pub enum Steal<T> {
2035    /// The queue was empty at the time of stealing.
2036    Empty,
2037
2038    /// At least one task was successfully stolen.
2039    Success(T),
2040
2041    /// The steal operation needs to be retried.
2042    Retry,
2043}
2044
2045impl<T> Steal<T> {
2046    /// Returns `true` if the queue was empty at the time of stealing.
2047    ///
2048    /// # Examples
2049    ///
2050    /// ```
2051    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2052    ///
2053    /// assert!(!Success(7).is_empty());
2054    /// assert!(!Retry::<i32>.is_empty());
2055    ///
2056    /// assert!(Empty::<i32>.is_empty());
2057    /// ```
2058    pub fn is_empty(&self) -> bool {
2059        match self {
2060            Steal::Empty => true,
2061            _ => false,
2062        }
2063    }
2064
2065    /// Returns `true` if at least one task was stolen.
2066    ///
2067    /// # Examples
2068    ///
2069    /// ```
2070    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2071    ///
2072    /// assert!(!Empty::<i32>.is_success());
2073    /// assert!(!Retry::<i32>.is_success());
2074    ///
2075    /// assert!(Success(7).is_success());
2076    /// ```
2077    pub fn is_success(&self) -> bool {
2078        match self {
2079            Steal::Success(_) => true,
2080            _ => false,
2081        }
2082    }
2083
2084    /// Returns `true` if the steal operation needs to be retried.
2085    ///
2086    /// # Examples
2087    ///
2088    /// ```
2089    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2090    ///
2091    /// assert!(!Empty::<i32>.is_retry());
2092    /// assert!(!Success(7).is_retry());
2093    ///
2094    /// assert!(Retry::<i32>.is_retry());
2095    /// ```
2096    pub fn is_retry(&self) -> bool {
2097        match self {
2098            Steal::Retry => true,
2099            _ => false,
2100        }
2101    }
2102
2103    /// Returns the result of the operation, if successful.
2104    ///
2105    /// # Examples
2106    ///
2107    /// ```
2108    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2109    ///
2110    /// assert_eq!(Empty::<i32>.success(), None);
2111    /// assert_eq!(Retry::<i32>.success(), None);
2112    ///
2113    /// assert_eq!(Success(7).success(), Some(7));
2114    /// ```
2115    pub fn success(self) -> Option<T> {
2116        match self {
2117            Steal::Success(res) => Some(res),
2118            _ => None,
2119        }
2120    }
2121
2122    /// If no task was stolen, attempts another steal operation.
2123    ///
2124    /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
2125    ///
2126    /// * If the second steal resulted in `Success`, it is returned.
2127    /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
2128    /// * If both resulted in `None`, then `None` is returned.
2129    ///
2130    /// # Examples
2131    ///
2132    /// ```
2133    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
2134    ///
2135    /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
2136    /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
2137    ///
2138    /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
2139    /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
2140    ///
2141    /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
2142    /// ```
2143    pub fn or_else<F>(self, f: F) -> Steal<T>
2144    where
2145        F: FnOnce() -> Steal<T>,
2146    {
2147        match self {
2148            Steal::Empty => f(),
2149            Steal::Success(_) => self,
2150            Steal::Retry => {
2151                if let Steal::Success(res) = f() {
2152                    Steal::Success(res)
2153                } else {
2154                    Steal::Retry
2155                }
2156            }
2157        }
2158    }
2159}
2160
2161impl<T> fmt::Debug for Steal<T> {
2162    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2163        match self {
2164            Steal::Empty => f.pad("Empty"),
2165            Steal::Success(_) => f.pad("Success(..)"),
2166            Steal::Retry => f.pad("Retry"),
2167        }
2168    }
2169}
2170
2171impl<T> FromIterator<Steal<T>> for Steal<T> {
2172    /// Consumes items until a `Success` is found and returns it.
2173    ///
2174    /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
2175    /// Otherwise, `Empty` is returned.
2176    fn from_iter<I>(iter: I) -> Steal<T>
2177    where
2178        I: IntoIterator<Item = Steal<T>>,
2179    {
2180        let mut retry = false;
2181        for s in iter {
2182            match &s {
2183                Steal::Empty => {}
2184                Steal::Success(_) => return s,
2185                Steal::Retry => retry = true,
2186            }
2187        }
2188
2189        if retry {
2190            Steal::Retry
2191        } else {
2192            Steal::Empty
2193        }
2194    }
2195}