// crossbeam_channel/flavors/array.rs
use std::cell::UnsafeCell;
12use std::mem::{self, MaybeUninit};
13use std::ptr;
14use std::sync::atomic::{self, AtomicUsize, Ordering};
15use std::time::Instant;
16
17use crossbeam_utils::{Backoff, CachePadded};
18
19use crate::context::Context;
20use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
21use crate::select::{Operation, SelectHandle, Selected, Token};
22use crate::waker::SyncWaker;
23
/// A slot in a channel.
struct Slot<T> {
    /// The current stamp.
    ///
    /// Encodes the lap/index state of this slot; producers and consumers compare it
    /// against the channel's head/tail stamps to decide whether the slot is free or full.
    stamp: AtomicUsize,

    /// The message in this slot.
    ///
    /// Only initialized between a `write` and the matching `read` (see the stamp protocol
    /// in `Channel::start_send` / `Channel::start_recv`).
    msg: UnsafeCell<MaybeUninit<T>>,
}
32
/// The token type for the array flavor.
#[derive(Debug)]
pub(crate) struct ArrayToken {
    /// Slot to read from or write to.
    ///
    /// A null pointer means the channel is disconnected (see `start_send`/`start_recv`).
    slot: *const u8,

    /// Stamp to store into the slot after reading or writing.
    stamp: usize,
}
42
43impl Default for ArrayToken {
44 #[inline]
45 fn default() -> Self {
46 ArrayToken {
47 slot: ptr::null(),
48 stamp: 0,
49 }
50 }
51}
52
/// Bounded channel based on a preallocated array.
pub(crate) struct Channel<T> {
    /// The head of the channel.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap,
    /// packed into a single `usize`. The lower bits represent the index, the upper bits the lap.
    /// The mark bit in the head is always zero.
    ///
    /// Messages are popped from the head of the channel.
    head: CachePadded<AtomicUsize>,

    /// The tail of the channel.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap,
    /// packed into a single `usize`. The lower bits represent the index, the upper bits the lap.
    /// The mark bit indicates that the channel is disconnected.
    ///
    /// Messages are pushed into the tail of the channel.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    buffer: Box<[Slot<T>]>,

    /// The channel capacity.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
    one_lap: usize,

    /// If this bit is set in the tail, that means the channel is disconnected.
    mark_bit: usize,

    /// Senders waiting while the channel is full.
    senders: SyncWaker,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,
}
91
impl<T> Channel<T> {
    /// Creates a bounded channel of capacity `cap`.
    ///
    /// # Panics
    ///
    /// Panics if `cap` is zero.
    pub(crate) fn with_capacity(cap: usize) -> Self {
        assert!(cap > 0, "capacity must be positive");

        // Compute constants `mark_bit` and `one_lap`. The mark bit is the lowest power of
        // two strictly greater than `cap`, so indices and the mark bit never collide.
        let mark_bit = (cap + 1).next_power_of_two();
        let one_lap = mark_bit * 2;

        // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
        let head = 0;
        // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
        let tail = 0;

        // Allocate a buffer of `cap` slots, each with its stamp initialized to its own
        // index, i.e. `{ lap: 0, mark: 0, index: i }` (meaning "empty, lap 0").
        let buffer: Box<[Slot<T>]> = (0..cap)
            .map(|i| {
                Slot {
                    stamp: AtomicUsize::new(i),
                    msg: UnsafeCell::new(MaybeUninit::uninit()),
                }
            })
            .collect();

        Channel {
            buffer,
            cap,
            one_lap,
            mark_bit,
            head: CachePadded::new(AtomicUsize::new(head)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
            senders: SyncWaker::new(),
            receivers: SyncWaker::new(),
        }
    }

    /// Returns a receiver handle to the channel.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }

    /// Returns a sender handle to the channel.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }

    /// Attempts to reserve a slot for sending a message.
    ///
    /// On success returns `true` and fills `token` with the reserved slot (or a null slot
    /// if the channel is disconnected). Returns `false` if the channel is full.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Check if the channel is disconnected.
            if tail & self.mark_bit != 0 {
                token.array.slot = ptr::null();
                token.array.stamp = 0;
                return true;
            }

            // Deconstruct the tail into its index and lap components.
            let index = tail & (self.mark_bit - 1);
            let lap = tail & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            debug_assert!(index < self.buffer.len());
            let slot = unsafe { self.buffer.get_unchecked(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, the slot is empty and we may attempt to push.
            if tail == stamp {
                let new_tail = if index + 1 < self.cap {
                    // Same lap, incremented index:
                    // `{ lap: lap, mark: 0, index: index + 1 }`.
                    tail + 1
                } else {
                    // One lap forward, index wraps around to zero:
                    // `{ lap: lap + 1, mark: 0, index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the tail.
                match self.tail.compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Prepare the token for the follow-up call to `write`.
                        token.array.slot = slot as *const Slot<T> as *const u8;
                        token.array.stamp = tail + 1;
                        return true;
                    }
                    Err(t) => {
                        tail = t;
                        backoff.spin();
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
                // The slot's stamp lags one lap behind: the slot may still hold an unread
                // message from the previous lap, meaning the channel might be full.
                atomic::fence(Ordering::SeqCst);
                let head = self.head.load(Ordering::Relaxed);

                // If the head lags one lap behind the tail as well...
                if head.wrapping_add(self.one_lap) == tail {
                    // ...then the channel is full.
                    return false;
                }

                backoff.spin();
                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }

    /// Writes a message into the channel.
    ///
    /// Returns `Err(msg)` if the channel is disconnected (the token holds a null slot).
    ///
    /// # Safety
    ///
    /// `token` must have been initialized by a successful `start_send` on this channel.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.array.slot.is_null() {
            return Err(msg);
        }

        let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();

        // Write the message into the slot, then publish it by updating the stamp.
        slot.msg.get().write(MaybeUninit::new(msg));
        slot.stamp.store(token.array.stamp, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }

    /// Attempts to reserve a slot for receiving a message.
    ///
    /// On success returns `true` and fills `token` with the reserved slot (or a null slot
    /// if the channel is disconnected and empty). Returns `false` if the channel is empty.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head into its index and lap components.
            let index = head & (self.mark_bit - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            debug_assert!(index < self.buffer.len());
            let slot = unsafe { self.buffer.get_unchecked(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, the slot holds a message and we may
            // attempt to pop it.
            if head + 1 == stamp {
                let new = if index + 1 < self.cap {
                    // Same lap, incremented index:
                    // `{ lap: lap, mark: 0, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero:
                    // `{ lap: lap + 1, mark: 0, index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head.
                match self.head.compare_exchange_weak(
                    head,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Prepare the token for the follow-up call to `read`.
                        token.array.slot = slot as *const Slot<T> as *const u8;
                        token.array.stamp = head.wrapping_add(self.one_lap);
                        return true;
                    }
                    Err(h) => {
                        head = h;
                        backoff.spin();
                    }
                }
            } else if stamp == head {
                // The slot is empty: the channel might be empty.
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if (tail & !self.mark_bit) == head {
                    // If the channel is disconnected...
                    if tail & self.mark_bit != 0 {
                        // ...then receive an error (null slot signals disconnection).
                        token.array.slot = ptr::null();
                        token.array.stamp = 0;
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                backoff.spin();
                head = self.head.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.snooze();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }

    /// Reads a message from the channel.
    ///
    /// Returns `Err(())` if the channel is disconnected and empty (null slot in the token).
    ///
    /// # Safety
    ///
    /// `token` must have been initialized by a successful `start_recv` on this channel.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        // If there is no slot, the channel is disconnected.
        if token.array.slot.is_null() {
            return Err(());
        }

        let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();

        // Read the message from the slot, then mark the slot free for the next lap by
        // updating the stamp.
        let msg = slot.msg.get().read().assume_init();
        slot.stamp.store(token.array.stamp, Ordering::Release);

        // Wake a sleeping sender.
        self.senders.notify();
        Ok(msg)
    }

    /// Attempts to send a message into the channel without blocking.
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        let token = &mut Token::default();
        if self.start_send(token) {
            unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
        } else {
            Err(TrySendError::Full(msg))
        }
    }

    /// Sends a message into the channel, blocking until there is room or `deadline` passes.
    pub(crate) fn send(
        &self,
        msg: T,
        deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        loop {
            // Try sending a message several times with exponential backoff.
            let backoff = Backoff::new();
            loop {
                if self.start_send(token) {
                    let res = unsafe { self.write(token, msg) };
                    return res.map_err(SendTimeoutError::Disconnected);
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(SendTimeoutError::Timeout(msg));
                }
            }

            Context::with(|cx| {
                // Prepare for blocking until a receiver wakes us up.
                let oper = Operation::hook(token);
                self.senders.register(oper, cx);

                // Has the channel become ready (or disconnected) just now? If so, abort
                // the blocking attempt to avoid a lost wakeup.
                if !self.is_full() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        // We were not selected by a receiver; remove our registration
                        // before retrying the outer loop.
                        self.senders.unregister(oper).unwrap();
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

    /// Attempts to receive a message without blocking.
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel, blocking until one arrives or `deadline` passes.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message several times with exponential backoff.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    let res = unsafe { self.read(token) };
                    return res.map_err(|_| RecvTimeoutError::Disconnected);
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            Context::with(|cx| {
                // Prepare for blocking until a sender wakes us up.
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready (or disconnected) just now? If so, abort
                // the blocking attempt to avoid a lost wakeup.
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        // We were not selected by a sender; remove our registration before
                        // retrying the outer loop (a disconnected channel may still hold
                        // messages, which the retry will drain).
                        self.receivers.unregister(oper).unwrap();
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

    /// Returns the current number of messages inside the channel.
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail, then load the head.
            let tail = self.tail.load(Ordering::SeqCst);
            let head = self.head.load(Ordering::SeqCst);

            // If the tail didn't change between the two loads, we have a consistent
            // snapshot and can compute the length from the two indices.
            if self.tail.load(Ordering::SeqCst) == tail {
                let hix = head & (self.mark_bit - 1);
                let tix = tail & (self.mark_bit - 1);

                return if hix < tix {
                    tix - hix
                } else if hix > tix {
                    self.cap - hix + tix
                } else if (tail & !self.mark_bit) == head {
                    // Same index and same lap: empty.
                    0
                } else {
                    // Same index, different lap: full.
                    self.cap
                };
            }
        }
    }

    /// Returns the capacity of the channel.
    pub(crate) fn capacity(&self) -> Option<usize> {
        Some(self.cap)
    }

    /// Disconnects the channel and wakes up all blocked senders and receivers.
    ///
    /// Returns `true` if this call disconnected the channel (i.e. it was not already
    /// disconnected).
    pub(crate) fn disconnect(&self) -> bool {
        let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);

        if tail & self.mark_bit == 0 {
            self.senders.disconnect();
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Returns `true` if the channel is disconnected.
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
    }

    /// Returns `true` if the channel is empty.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);

        // Is the tail equal to the head?
        //
        // Note: if the head changes just before we load the tail, there was a moment when
        // the channel was not empty, so it is safe to just return `false`.
        (tail & !self.mark_bit) == head
    }

    /// Returns `true` if the channel is full.
    pub(crate) fn is_full(&self) -> bool {
        let tail = self.tail.load(Ordering::SeqCst);
        let head = self.head.load(Ordering::SeqCst);

        // Is the head lagging one lap behind the tail?
        //
        // Note: if the tail changes just before we load the head, there was a moment when
        // the channel was not full, so it is safe to just return `false`.
        head.wrapping_add(self.one_lap) == tail & !self.mark_bit
    }
}
520
impl<T> Drop for Channel<T> {
    /// Drops all messages still buffered in the channel.
    fn drop(&mut self) {
        // Skip the whole scan when `T` has no destructor.
        if mem::needs_drop::<T>() {
            // `&mut self` gives exclusive access, so plain reads of the atomics suffice.
            let head = *self.head.get_mut();
            let tail = *self.tail.get_mut();

            // Indices of the head and tail within the buffer.
            let hix = head & (self.mark_bit - 1);
            let tix = tail & (self.mark_bit - 1);

            // Number of messages currently held (same computation as `len`).
            let len = if hix < tix {
                tix - hix
            } else if hix > tix {
                self.cap - hix + tix
            } else if (tail & !self.mark_bit) == head {
                // Same index and same lap: empty.
                0
            } else {
                // Same index, different lap: full.
                self.cap
            };

            // Loop over all slots that hold a message and drop them.
            for i in 0..len {
                // Compute the index of the next slot holding a message, wrapping around
                // the end of the buffer.
                let index = if hix + i < self.cap {
                    hix + i
                } else {
                    hix + i - self.cap
                };

                unsafe {
                    debug_assert!(index < self.buffer.len());
                    let slot = self.buffer.get_unchecked_mut(index);
                    // SAFETY: slots in [head, head + len) hold initialized messages.
                    (*slot.msg.get()).assume_init_drop();
                }
            }
        }
    }
}
559
/// Receiver handle to a channel.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
562
/// Sender handle to a channel.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
565
impl<T> SelectHandle for Receiver<'_, T> {
    // Attempt to reserve a message for receiving without blocking.
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    // Receive operations have no internal deadline of their own.
    fn deadline(&self) -> Option<Instant> {
        None
    }

    // Register this operation with the channel's receiver waker, then re-check
    // readiness to avoid a lost wakeup.
    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    // Ready when there is a message to receive, or the channel is disconnected
    // (in which case the receive resolves to an error).
    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}
601
impl<T> SelectHandle for Sender<'_, T> {
    // Attempt to reserve a slot for sending without blocking.
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    // Send operations have no internal deadline of their own.
    fn deadline(&self) -> Option<Instant> {
        None
    }

    // Register this operation with the channel's sender waker, then re-check
    // readiness to avoid a lost wakeup.
    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.senders.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.senders.unregister(oper);
    }

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    // Ready when there is room to send, or the channel is disconnected
    // (in which case the send resolves to an error).
    fn is_ready(&self) -> bool {
        !self.0.is_full() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.senders.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.senders.unwatch(oper);
    }
}