crossbeam_channel/flavors/list.rs

//! Unbounded channel implemented as a linked list.

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;

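// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.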
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;

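/// A slot in a block.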
struct Slot<T> {
    /// The message.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    const UNINIT: Self = Self {
        msg: UnsafeCell::new(MaybeUninit::uninit()),
        state: AtomicUsize::new(0),
    };

    /// Waits until a message is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

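/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.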
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        Self {
            next: AtomicPtr::new(ptr::null_mut()),
            slots: [Slot::UNINIT; BLOCK_CAP],
        }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

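/// A position in a channel.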
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

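/// The token type for the list flavor.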
#[derive(Debug)]
pub(crate) struct ListToken {
    /// The block of slots.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}

impl Default for ListToken {
    #[inline]
    fn default() -> Self {
        ListToken {
            block: ptr::null(),
            offset: 0,
        }
    }
}

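/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator
/// and improve cache efficiency.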
pub(crate) struct Channel<T> {
    /// The head of the channel.
    head: CachePadded<Position<T>>,

    /// The tail of the channel.
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}

impl<T> Channel<T> {
    /// Creates a new unbounded channel.
    pub(crate) fn new() -> Self {
        Channel {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }

    /// Returns a receiver handle to the channel.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }

    /// Returns a sender handle to the channel.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }

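    /// Attempts to reserve a slot for sending a message.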
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order
            // to make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

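    /// Writes a message into the channel.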
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block.cast::<Block<T>>();
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.msg.get().write(MaybeUninit::new(msg));
        slot.state.fetch_or(WRITE, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }

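    /// Attempts to reserve a slot for receiving a message.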
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the
            // channel. In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

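    /// Reads a message from the channel.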
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.wait_write();
        let msg = slot.msg.get().read().assume_init();

        // Destroy the block if we've reached the end, or if another thread wanted to destroy it
        // but couldn't because we were busy reading from the slot.
        if offset + 1 == BLOCK_CAP {
            Block::destroy(block, 0);
        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
            Block::destroy(block, offset + 1);
        }

        Ok(msg)
    }

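    /// Attempts to send a message into the channel.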
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.send(msg, None).map_err(|err| match err {
            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
            SendTimeoutError::Timeout(_) => unreachable!(),
        })
    }

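    /// Sends a message into the channel.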
    pub(crate) fn send(
        &self,
        msg: T,
        _deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        assert!(self.start_send(token));
        unsafe {
            self.write(token, msg)
                .map_err(SendTimeoutError::Disconnected)
        }
    }

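    /// Attempts to receive a message without blocking.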
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

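    /// Receives a message from the channel.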
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message several times.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    unsafe {
                        return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                    }
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare for blocking until a sender wakes us up.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

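    /// Returns the current number of messages inside the channel.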
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }

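    /// Returns the capacity of the channel.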
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }

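    /// Disconnects senders and wakes up all blocked receivers.
    ///
    /// Returns `true` if this call disconnected the channel.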
    pub(crate) fn disconnect_senders(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

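    /// Disconnects receivers.
    ///
    /// Returns `true` if this call disconnected the channel.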
    pub(crate) fn disconnect_receivers(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            // If receivers are dropped first, discard all messages to free memory eagerly.
            self.discard_all_messages();
            true
        } else {
            false
        }
    }

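    /// Discards all messages.
    ///
    /// This method should only be called when all receivers are dropped.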
    fn discard_all_messages(&self) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        loop {
            let offset = (tail >> SHIFT) % LAP;
            if offset != BLOCK_CAP {
                break;
            }

            // New updates to tail will be rejected by MARK_BIT and aborted unless it's the end
            // of the block, in which case we have to wait for the next block to be installed.
            backoff.snooze();
            tail = self.tail.index.load(Ordering::Acquire);
        }

        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        // If there are any messages to discard, the block must be initialized first.
        if head >> SHIFT != tail >> SHIFT {
            // The block can be null here only if a sender is in the process of initializing the
            // channel while another sender managed to send a message by inserting it into the
            // semi-initialized channel and advanced the tail. In that case, just wait until it
            // gets initialized.
            while block.is_null() {
                backoff.snooze();
                block = self.head.block.load(Ordering::Acquire);
            }
        }
        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head >> SHIFT != tail >> SHIFT {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    (*slot.msg.get()).assume_init_drop();
                } else {
                    // Deallocate the block and move to the next one.
                    (*block).wait_next();
                    let next = (*block).next.load(Ordering::Acquire);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
        head &= !MARK_BIT;
        self.head.block.store(ptr::null_mut(), Ordering::Release);
        self.head.index.store(head, Ordering::Release);
    }

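    /// Returns `true` if the channel is disconnected.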
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }

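    /// Returns `true` if the channel is empty.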
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

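    /// Returns `true` if the channel is full (it never is, since the channel is unbounded).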
    pub(crate) fn is_full(&self) -> bool {
        false
    }
}

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    (*slot.msg.get()).assume_init_drop();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

/// Receiver handle to a channel.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);

/// Sender handle to a channel.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);

impl<T> SelectHandle for Receiver<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}

impl<T> SelectHandle for Sender<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unregister(&self, _oper: Operation) {}

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        // Sends into an unbounded channel never block, so the sender is always ready.
        true
    }

    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unwatch(&self, _oper: Operation) {}
}