crossbeam_channel/flavors/
zero.rs1use std::cell::UnsafeCell;
6use std::marker::PhantomData;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::Mutex;
9use std::time::Instant;
10use std::{fmt, ptr};
11
12use crossbeam_utils::Backoff;
13
14use crate::context::Context;
15use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
16use crate::select::{Operation, SelectHandle, Selected, Token};
17use crate::waker::Waker;
18
/// A type-erased pointer to a packet.
///
/// The pointer is null until an operation stores a packet address in it.
pub(crate) struct ZeroToken(*mut ());

impl Default for ZeroToken {
    /// Creates a token that holds a null pointer.
    fn default() -> Self {
        ZeroToken(ptr::null_mut())
    }
}

impl fmt::Debug for ZeroToken {
    /// Formats the token as the integer address of the pointer it holds.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let address = self.0 as usize;
        fmt::Debug::fmt(&address, f)
    }
}
33
34struct Packet<T> {
36 on_stack: bool,
38
39 ready: AtomicBool,
41
42 msg: UnsafeCell<Option<T>>,
44}
45
46impl<T> Packet<T> {
47 fn empty_on_stack() -> Packet<T> {
49 Packet {
50 on_stack: true,
51 ready: AtomicBool::new(false),
52 msg: UnsafeCell::new(None),
53 }
54 }
55
56 fn empty_on_heap() -> Box<Packet<T>> {
58 Box::new(Packet {
59 on_stack: false,
60 ready: AtomicBool::new(false),
61 msg: UnsafeCell::new(None),
62 })
63 }
64
65 fn message_on_stack(msg: T) -> Packet<T> {
67 Packet {
68 on_stack: true,
69 ready: AtomicBool::new(false),
70 msg: UnsafeCell::new(Some(msg)),
71 }
72 }
73
74 fn wait_ready(&self) {
76 let backoff = Backoff::new();
77 while !self.ready.load(Ordering::Acquire) {
78 backoff.snooze();
79 }
80 }
81}
82
/// The mutable state of a zero-capacity channel, protected by the channel's mutex.
struct Inner {
    /// Send operations registered and waiting to be paired with a receive operation.
    senders: Waker,

    /// Receive operations registered and waiting to be paired with a send operation.
    receivers: Waker,

    /// `true` once the channel has been disconnected.
    is_disconnected: bool,
}
94
/// Zero-capacity channel.
pub(crate) struct Channel<T> {
    /// Registered senders and receivers plus the disconnection flag.
    inner: Mutex<Inner>,

    /// Ties the channel to its message type `T` without storing a value of that type.
    _marker: PhantomData<T>,
}
103
104impl<T> Channel<T> {
105 pub(crate) fn new() -> Self {
107 Channel {
108 inner: Mutex::new(Inner {
109 senders: Waker::new(),
110 receivers: Waker::new(),
111 is_disconnected: false,
112 }),
113 _marker: PhantomData,
114 }
115 }
116
117 pub(crate) fn receiver(&self) -> Receiver<'_, T> {
119 Receiver(self)
120 }
121
122 pub(crate) fn sender(&self) -> Sender<'_, T> {
124 Sender(self)
125 }
126
127 fn start_send(&self, token: &mut Token) -> bool {
129 let mut inner = self.inner.lock().unwrap();
130
131 if let Some(operation) = inner.receivers.try_select() {
133 token.zero.0 = operation.packet;
134 true
135 } else if inner.is_disconnected {
136 token.zero.0 = ptr::null_mut();
137 true
138 } else {
139 false
140 }
141 }
142
143 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
145 if token.zero.0.is_null() {
147 return Err(msg);
148 }
149
150 let packet = &*(token.zero.0 as *const Packet<T>);
151 packet.msg.get().write(Some(msg));
152 packet.ready.store(true, Ordering::Release);
153 Ok(())
154 }
155
156 fn start_recv(&self, token: &mut Token) -> bool {
158 let mut inner = self.inner.lock().unwrap();
159
160 if let Some(operation) = inner.senders.try_select() {
162 token.zero.0 = operation.packet;
163 true
164 } else if inner.is_disconnected {
165 token.zero.0 = ptr::null_mut();
166 true
167 } else {
168 false
169 }
170 }
171
172 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
174 if token.zero.0.is_null() {
176 return Err(());
177 }
178
179 let packet = &*(token.zero.0 as *const Packet<T>);
180
181 if packet.on_stack {
182 let msg = packet.msg.get().replace(None).unwrap();
186 packet.ready.store(true, Ordering::Release);
187 Ok(msg)
188 } else {
189 packet.wait_ready();
192 let msg = packet.msg.get().replace(None).unwrap();
193 drop(Box::from_raw(token.zero.0.cast::<Packet<T>>()));
194 Ok(msg)
195 }
196 }
197
198 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
200 let token = &mut Token::default();
201 let mut inner = self.inner.lock().unwrap();
202
203 if let Some(operation) = inner.receivers.try_select() {
205 token.zero.0 = operation.packet;
206 drop(inner);
207 unsafe {
208 self.write(token, msg).ok().unwrap();
209 }
210 Ok(())
211 } else if inner.is_disconnected {
212 Err(TrySendError::Disconnected(msg))
213 } else {
214 Err(TrySendError::Full(msg))
215 }
216 }
217
218 pub(crate) fn send(
220 &self,
221 msg: T,
222 deadline: Option<Instant>,
223 ) -> Result<(), SendTimeoutError<T>> {
224 let token = &mut Token::default();
225 let mut inner = self.inner.lock().unwrap();
226
227 if let Some(operation) = inner.receivers.try_select() {
229 token.zero.0 = operation.packet;
230 drop(inner);
231 unsafe {
232 self.write(token, msg).ok().unwrap();
233 }
234 return Ok(());
235 }
236
237 if inner.is_disconnected {
238 return Err(SendTimeoutError::Disconnected(msg));
239 }
240
241 Context::with(|cx| {
242 let oper = Operation::hook(token);
244 let mut packet = Packet::<T>::message_on_stack(msg);
245 inner
246 .senders
247 .register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
248 inner.receivers.notify();
249 drop(inner);
250
251 let sel = cx.wait_until(deadline);
253
254 match sel {
255 Selected::Waiting => unreachable!(),
256 Selected::Aborted => {
257 self.inner.lock().unwrap().senders.unregister(oper).unwrap();
258 let msg = unsafe { packet.msg.get().replace(None).unwrap() };
259 Err(SendTimeoutError::Timeout(msg))
260 }
261 Selected::Disconnected => {
262 self.inner.lock().unwrap().senders.unregister(oper).unwrap();
263 let msg = unsafe { packet.msg.get().replace(None).unwrap() };
264 Err(SendTimeoutError::Disconnected(msg))
265 }
266 Selected::Operation(_) => {
267 packet.wait_ready();
269 Ok(())
270 }
271 }
272 })
273 }
274
275 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
277 let token = &mut Token::default();
278 let mut inner = self.inner.lock().unwrap();
279
280 if let Some(operation) = inner.senders.try_select() {
282 token.zero.0 = operation.packet;
283 drop(inner);
284 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
285 } else if inner.is_disconnected {
286 Err(TryRecvError::Disconnected)
287 } else {
288 Err(TryRecvError::Empty)
289 }
290 }
291
292 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
294 let token = &mut Token::default();
295 let mut inner = self.inner.lock().unwrap();
296
297 if let Some(operation) = inner.senders.try_select() {
299 token.zero.0 = operation.packet;
300 drop(inner);
301 unsafe {
302 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
303 }
304 }
305
306 if inner.is_disconnected {
307 return Err(RecvTimeoutError::Disconnected);
308 }
309
310 Context::with(|cx| {
311 let oper = Operation::hook(token);
313 let mut packet = Packet::<T>::empty_on_stack();
314 inner.receivers.register_with_packet(
315 oper,
316 &mut packet as *mut Packet<T> as *mut (),
317 cx,
318 );
319 inner.senders.notify();
320 drop(inner);
321
322 let sel = cx.wait_until(deadline);
324
325 match sel {
326 Selected::Waiting => unreachable!(),
327 Selected::Aborted => {
328 self.inner
329 .lock()
330 .unwrap()
331 .receivers
332 .unregister(oper)
333 .unwrap();
334 Err(RecvTimeoutError::Timeout)
335 }
336 Selected::Disconnected => {
337 self.inner
338 .lock()
339 .unwrap()
340 .receivers
341 .unregister(oper)
342 .unwrap();
343 Err(RecvTimeoutError::Disconnected)
344 }
345 Selected::Operation(_) => {
346 packet.wait_ready();
348 unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
349 }
350 }
351 })
352 }
353
354 pub(crate) fn disconnect(&self) -> bool {
358 let mut inner = self.inner.lock().unwrap();
359
360 if !inner.is_disconnected {
361 inner.is_disconnected = true;
362 inner.senders.disconnect();
363 inner.receivers.disconnect();
364 true
365 } else {
366 false
367 }
368 }
369
370 pub(crate) fn len(&self) -> usize {
372 0
373 }
374
375 pub(crate) fn capacity(&self) -> Option<usize> {
377 Some(0)
378 }
379
380 pub(crate) fn is_empty(&self) -> bool {
382 true
383 }
384
385 pub(crate) fn is_full(&self) -> bool {
387 true
388 }
389}
390
/// Receiver handle to a zero-capacity channel; borrows the channel it receives from.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
393
/// Sender handle to a zero-capacity channel; borrows the channel it sends into.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
396
397impl<T> SelectHandle for Receiver<'_, T> {
398 fn try_select(&self, token: &mut Token) -> bool {
399 self.0.start_recv(token)
400 }
401
402 fn deadline(&self) -> Option<Instant> {
403 None
404 }
405
406 fn register(&self, oper: Operation, cx: &Context) -> bool {
407 let packet = Box::into_raw(Packet::<T>::empty_on_heap());
408
409 let mut inner = self.0.inner.lock().unwrap();
410 inner
411 .receivers
412 .register_with_packet(oper, packet.cast::<()>(), cx);
413 inner.senders.notify();
414 inner.senders.can_select() || inner.is_disconnected
415 }
416
417 fn unregister(&self, oper: Operation) {
418 if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) {
419 unsafe {
420 drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
421 }
422 }
423 }
424
425 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
426 token.zero.0 = cx.wait_packet();
427 true
428 }
429
430 fn is_ready(&self) -> bool {
431 let inner = self.0.inner.lock().unwrap();
432 inner.senders.can_select() || inner.is_disconnected
433 }
434
435 fn watch(&self, oper: Operation, cx: &Context) -> bool {
436 let mut inner = self.0.inner.lock().unwrap();
437 inner.receivers.watch(oper, cx);
438 inner.senders.can_select() || inner.is_disconnected
439 }
440
441 fn unwatch(&self, oper: Operation) {
442 let mut inner = self.0.inner.lock().unwrap();
443 inner.receivers.unwatch(oper);
444 }
445}
446
447impl<T> SelectHandle for Sender<'_, T> {
448 fn try_select(&self, token: &mut Token) -> bool {
449 self.0.start_send(token)
450 }
451
452 fn deadline(&self) -> Option<Instant> {
453 None
454 }
455
456 fn register(&self, oper: Operation, cx: &Context) -> bool {
457 let packet = Box::into_raw(Packet::<T>::empty_on_heap());
458
459 let mut inner = self.0.inner.lock().unwrap();
460 inner
461 .senders
462 .register_with_packet(oper, packet.cast::<()>(), cx);
463 inner.receivers.notify();
464 inner.receivers.can_select() || inner.is_disconnected
465 }
466
467 fn unregister(&self, oper: Operation) {
468 if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) {
469 unsafe {
470 drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
471 }
472 }
473 }
474
475 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
476 token.zero.0 = cx.wait_packet();
477 true
478 }
479
480 fn is_ready(&self) -> bool {
481 let inner = self.0.inner.lock().unwrap();
482 inner.receivers.can_select() || inner.is_disconnected
483 }
484
485 fn watch(&self, oper: Operation, cx: &Context) -> bool {
486 let mut inner = self.0.inner.lock().unwrap();
487 inner.senders.watch(oper, cx);
488 inner.receivers.can_select() || inner.is_disconnected
489 }
490
491 fn unwatch(&self, oper: Operation) {
492 let mut inner = self.0.inner.lock().unwrap();
493 inner.senders.unwatch(oper);
494 }
495}