async_channel/lib.rs
1//! An async multi-producer multi-consumer channel, where each message can be received by only
2//! one of all existing consumers.
3//!
4//! There are two kinds of channels:
5//!
6//! 1. [Bounded][`bounded()`] channel with limited capacity.
7//! 2. [Unbounded][`unbounded()`] channel with unlimited capacity.
8//!
//! A channel has a [`Sender`] side and a [`Receiver`] side. Both sides are cloneable and can be
//! shared among multiple threads.
11//!
12//! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes closed. When a
13//! channel is closed, no more messages can be sent, but remaining messages can still be received.
14//!
15//! The channel can also be closed manually by calling [`Sender::close()`] or
16//! [`Receiver::close()`].
17//!
18//! # Examples
19//!
20//! ```
21//! # futures_lite::future::block_on(async {
22//! let (s, r) = async_channel::unbounded();
23//!
24//! assert_eq!(s.send("Hello").await, Ok(()));
25//! assert_eq!(r.recv().await, Ok("Hello"));
26//! # });
27//! ```
28
29#![cfg_attr(not(feature = "std"), no_std)]
30#![forbid(unsafe_code)]
31#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
32#![doc(
33 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
34)]
35#![doc(
36 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
37)]
38
39#[cfg(not(feature = "portable-atomic"))]
40extern crate alloc;
41
42use core::fmt;
43use core::future::Future;
44use core::marker::PhantomPinned;
45use core::pin::Pin;
46use core::task::{Context, Poll};
47
48#[cfg(not(feature = "portable-atomic"))]
49use alloc::sync::Arc;
50#[cfg(not(feature = "portable-atomic"))]
51use core::sync::atomic::{AtomicUsize, Ordering};
52
53#[cfg(feature = "portable-atomic")]
54use portable_atomic::{AtomicUsize, Ordering};
55#[cfg(feature = "portable-atomic")]
56use portable_atomic_util::Arc;
57
58use concurrent_queue::{ConcurrentQueue, ForcePushError, PopError, PushError};
59use event_listener_strategy::{
60 easy_wrapper,
61 event_listener::{Event, EventListener},
62 EventListenerFuture, Strategy,
63};
64use futures_core::ready;
65use futures_core::stream::Stream;
66use pin_project_lite::pin_project;
67
/// State shared by every handle (`Sender`, `Receiver` and their weak variants) of one channel.
struct Channel<T> {
    /// Inner message queue.
    queue: ConcurrentQueue<T>,

    /// Send operations waiting while the channel is full.
    send_ops: Event,

    /// Receive operations waiting while the channel is empty and not closed.
    recv_ops: Event,

    /// Stream operations waiting while the channel is empty and not closed.
    stream_ops: Event,

    /// Operations waiting for the channel to become closed; notified by `close`.
    closed_ops: Event,

    /// The number of currently active `Sender`s.
    sender_count: AtomicUsize,

    /// The number of currently active `Receiver`s.
    receiver_count: AtomicUsize,
}
90
91impl<T> Channel<T> {
92 /// Closes the channel and notifies all blocked operations.
93 ///
94 /// Returns `true` if this call has closed the channel and it was not closed already.
95 fn close(&self) -> bool {
96 if self.queue.close() {
97 // Notify all send operations.
98 self.send_ops.notify(usize::MAX);
99
100 // Notify all receive and stream operations.
101 self.recv_ops.notify(usize::MAX);
102 self.stream_ops.notify(usize::MAX);
103 self.closed_ops.notify(usize::MAX);
104
105 true
106 } else {
107 false
108 }
109 }
110}
111
112/// Creates a bounded channel.
113///
114/// The created channel has space to hold at most `cap` messages at a time.
115///
116/// # Panics
117///
118/// Capacity must be a positive number. If `cap` is zero, this function will panic.
119///
120/// # Examples
121///
122/// ```
123/// # futures_lite::future::block_on(async {
124/// use async_channel::{bounded, TryRecvError, TrySendError};
125///
126/// let (s, r) = bounded(1);
127///
128/// assert_eq!(s.send(10).await, Ok(()));
129/// assert_eq!(s.try_send(20), Err(TrySendError::Full(20)));
130///
131/// assert_eq!(r.recv().await, Ok(10));
132/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
133/// # });
134/// ```
135pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
136 assert!(cap > 0, "capacity cannot be zero");
137
138 let channel = Arc::new(Channel {
139 queue: ConcurrentQueue::bounded(cap),
140 send_ops: Event::new(),
141 recv_ops: Event::new(),
142 stream_ops: Event::new(),
143 closed_ops: Event::new(),
144 sender_count: AtomicUsize::new(1),
145 receiver_count: AtomicUsize::new(1),
146 });
147
148 let s = Sender {
149 channel: channel.clone(),
150 };
151 let r = Receiver {
152 listener: None,
153 channel,
154 _pin: PhantomPinned,
155 };
156 (s, r)
157}
158
159/// Creates an unbounded channel.
160///
161/// The created channel can hold an unlimited number of messages.
162///
163/// # Examples
164///
165/// ```
166/// # futures_lite::future::block_on(async {
167/// use async_channel::{unbounded, TryRecvError};
168///
169/// let (s, r) = unbounded();
170///
171/// assert_eq!(s.send(10).await, Ok(()));
172/// assert_eq!(s.send(20).await, Ok(()));
173///
174/// assert_eq!(r.recv().await, Ok(10));
175/// assert_eq!(r.recv().await, Ok(20));
176/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
177/// # });
178/// ```
179pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
180 let channel = Arc::new(Channel {
181 queue: ConcurrentQueue::unbounded(),
182 send_ops: Event::new(),
183 recv_ops: Event::new(),
184 stream_ops: Event::new(),
185 closed_ops: Event::new(),
186 sender_count: AtomicUsize::new(1),
187 receiver_count: AtomicUsize::new(1),
188 });
189
190 let s = Sender {
191 channel: channel.clone(),
192 };
193 let r = Receiver {
194 listener: None,
195 channel,
196 _pin: PhantomPinned,
197 };
198 (s, r)
199}
200
201/// The sending side of a channel.
202///
203/// Senders can be cloned and shared among threads. When all senders associated with a channel are
204/// dropped, the channel becomes closed.
205///
206/// The channel can also be closed manually by calling [`Sender::close()`].
pub struct Sender<T> {
    /// Inner channel state, shared with every other handle of the same channel.
    channel: Arc<Channel<T>>,
}
211
212impl<T> Sender<T> {
213 /// Attempts to send a message into the channel.
214 ///
215 /// If the channel is full or closed, this method returns an error.
216 ///
217 /// # Examples
218 ///
219 /// ```
220 /// use async_channel::{bounded, TrySendError};
221 ///
222 /// let (s, r) = bounded(1);
223 ///
224 /// assert_eq!(s.try_send(1), Ok(()));
225 /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
226 ///
227 /// drop(r);
228 /// assert_eq!(s.try_send(3), Err(TrySendError::Closed(3)));
229 /// ```
230 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
231 match self.channel.queue.push(msg) {
232 Ok(()) => {
233 // Notify a blocked receive operation. If the notified operation gets canceled,
234 // it will notify another blocked receive operation.
235 self.channel.recv_ops.notify_additional(1);
236
237 // Notify all blocked streams.
238 self.channel.stream_ops.notify(usize::MAX);
239
240 Ok(())
241 }
242 Err(PushError::Full(msg)) => Err(TrySendError::Full(msg)),
243 Err(PushError::Closed(msg)) => Err(TrySendError::Closed(msg)),
244 }
245 }
246
247 /// Sends a message into the channel.
248 ///
249 /// If the channel is full, this method waits until there is space for a message.
250 ///
251 /// If the channel is closed, this method returns an error.
252 ///
253 /// # Examples
254 ///
255 /// ```
256 /// # futures_lite::future::block_on(async {
257 /// use async_channel::{unbounded, SendError};
258 ///
259 /// let (s, r) = unbounded();
260 ///
261 /// assert_eq!(s.send(1).await, Ok(()));
262 /// drop(r);
263 /// assert_eq!(s.send(2).await, Err(SendError(2)));
264 /// # });
265 /// ```
266 pub fn send(&self, msg: T) -> Send<'_, T> {
267 Send::_new(SendInner {
268 sender: self,
269 msg: Some(msg),
270 listener: None,
271 _pin: PhantomPinned,
272 })
273 }
274
275 /// Completes when all receivers have dropped.
276 ///
277 /// This allows the producers to get notified when interest in the produced values is canceled and immediately stop doing work.
278 ///
279 /// # Examples
280 ///
281 /// ```
282 /// # futures_lite::future::block_on(async {
283 /// use async_channel::{unbounded, SendError};
284 ///
285 /// let (s, r) = unbounded::<i32>();
286 /// drop(r);
287 /// s.closed().await;
288 /// # });
289 /// ```
290 pub fn closed(&self) -> Closed<'_, T> {
291 Closed::_new(ClosedInner {
292 sender: self,
293 listener: None,
294 _pin: PhantomPinned,
295 })
296 }
297
298 /// Sends a message into this channel using the blocking strategy.
299 ///
300 /// If the channel is full, this method will block until there is room.
301 /// If the channel is closed, this method returns an error.
302 ///
303 /// # Blocking
304 ///
305 /// Rather than using asynchronous waiting, like the [`send`](Self::send) method,
306 /// this method will block the current thread until the message is sent.
307 ///
308 /// This method should not be used in an asynchronous context. It is intended
309 /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
310 /// Calling this method in an asynchronous context may result in deadlocks.
311 ///
312 /// # Examples
313 ///
314 /// ```
315 /// use async_channel::{unbounded, SendError};
316 ///
317 /// let (s, r) = unbounded();
318 ///
319 /// assert_eq!(s.send_blocking(1), Ok(()));
320 /// drop(r);
321 /// assert_eq!(s.send_blocking(2), Err(SendError(2)));
322 /// ```
323 #[cfg(all(feature = "std", not(target_family = "wasm")))]
324 pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
325 self.send(msg).wait()
326 }
327
328 /// Forcefully push a message into this channel.
329 ///
330 /// If the channel is full, this method will replace an existing message in the
331 /// channel and return it as `Ok(Some(value))`. If the channel is closed, this
332 /// method will return an error.
333 ///
334 /// # Examples
335 ///
336 /// ```
337 /// # futures_lite::future::block_on(async {
338 /// use async_channel::{bounded, SendError};
339 ///
340 /// let (s, r) = bounded(3);
341 ///
342 /// assert_eq!(s.send(1).await, Ok(()));
343 /// assert_eq!(s.send(2).await, Ok(()));
344 /// assert_eq!(s.force_send(3), Ok(None));
345 /// assert_eq!(s.force_send(4), Ok(Some(1)));
346 ///
347 /// assert_eq!(r.recv().await, Ok(2));
348 /// assert_eq!(r.recv().await, Ok(3));
349 /// assert_eq!(r.recv().await, Ok(4));
350 /// # });
351 /// ```
352 pub fn force_send(&self, msg: T) -> Result<Option<T>, SendError<T>> {
353 match self.channel.queue.force_push(msg) {
354 Ok(backlog) => {
355 // Notify a blocked receive operation. If the notified operation gets canceled,
356 // it will notify another blocked receive operation.
357 self.channel.recv_ops.notify_additional(1);
358
359 // Notify all blocked streams.
360 self.channel.stream_ops.notify(usize::MAX);
361
362 Ok(backlog)
363 }
364
365 Err(ForcePushError(reject)) => Err(SendError(reject)),
366 }
367 }
368
369 /// Closes the channel.
370 ///
371 /// Returns `true` if this call has closed the channel and it was not closed already.
372 ///
373 /// The remaining messages can still be received.
374 ///
375 /// # Examples
376 ///
377 /// ```
378 /// # futures_lite::future::block_on(async {
379 /// use async_channel::{unbounded, RecvError};
380 ///
381 /// let (s, r) = unbounded();
382 /// assert_eq!(s.send(1).await, Ok(()));
383 /// assert!(s.close());
384 ///
385 /// assert_eq!(r.recv().await, Ok(1));
386 /// assert_eq!(r.recv().await, Err(RecvError));
387 /// # });
388 /// ```
389 pub fn close(&self) -> bool {
390 self.channel.close()
391 }
392
393 /// Returns `true` if the channel is closed.
394 ///
395 /// # Examples
396 ///
397 /// ```
398 /// # futures_lite::future::block_on(async {
399 /// use async_channel::{unbounded, RecvError};
400 ///
401 /// let (s, r) = unbounded::<()>();
402 /// assert!(!s.is_closed());
403 ///
404 /// drop(r);
405 /// assert!(s.is_closed());
406 /// # });
407 /// ```
408 pub fn is_closed(&self) -> bool {
409 self.channel.queue.is_closed()
410 }
411
412 /// Returns `true` if the channel is empty.
413 ///
414 /// # Examples
415 ///
416 /// ```
417 /// # futures_lite::future::block_on(async {
418 /// use async_channel::unbounded;
419 ///
420 /// let (s, r) = unbounded();
421 ///
422 /// assert!(s.is_empty());
423 /// s.send(1).await;
424 /// assert!(!s.is_empty());
425 /// # });
426 /// ```
427 pub fn is_empty(&self) -> bool {
428 self.channel.queue.is_empty()
429 }
430
431 /// Returns `true` if the channel is full.
432 ///
433 /// Unbounded channels are never full.
434 ///
435 /// # Examples
436 ///
437 /// ```
438 /// # futures_lite::future::block_on(async {
439 /// use async_channel::bounded;
440 ///
441 /// let (s, r) = bounded(1);
442 ///
443 /// assert!(!s.is_full());
444 /// s.send(1).await;
445 /// assert!(s.is_full());
446 /// # });
447 /// ```
448 pub fn is_full(&self) -> bool {
449 self.channel.queue.is_full()
450 }
451
452 /// Returns the number of messages in the channel.
453 ///
454 /// # Examples
455 ///
456 /// ```
457 /// # futures_lite::future::block_on(async {
458 /// use async_channel::unbounded;
459 ///
460 /// let (s, r) = unbounded();
461 /// assert_eq!(s.len(), 0);
462 ///
463 /// s.send(1).await;
464 /// s.send(2).await;
465 /// assert_eq!(s.len(), 2);
466 /// # });
467 /// ```
468 pub fn len(&self) -> usize {
469 self.channel.queue.len()
470 }
471
472 /// Returns the channel capacity if it's bounded.
473 ///
474 /// # Examples
475 ///
476 /// ```
477 /// use async_channel::{bounded, unbounded};
478 ///
479 /// let (s, r) = bounded::<i32>(5);
480 /// assert_eq!(s.capacity(), Some(5));
481 ///
482 /// let (s, r) = unbounded::<i32>();
483 /// assert_eq!(s.capacity(), None);
484 /// ```
485 pub fn capacity(&self) -> Option<usize> {
486 self.channel.queue.capacity()
487 }
488
489 /// Returns the number of receivers for the channel.
490 ///
491 /// # Examples
492 ///
493 /// ```
494 /// # futures_lite::future::block_on(async {
495 /// use async_channel::unbounded;
496 ///
497 /// let (s, r) = unbounded::<()>();
498 /// assert_eq!(s.receiver_count(), 1);
499 ///
500 /// let r2 = r.clone();
501 /// assert_eq!(s.receiver_count(), 2);
502 /// # });
503 /// ```
504 pub fn receiver_count(&self) -> usize {
505 self.channel.receiver_count.load(Ordering::SeqCst)
506 }
507
508 /// Returns the number of senders for the channel.
509 ///
510 /// # Examples
511 ///
512 /// ```
513 /// # futures_lite::future::block_on(async {
514 /// use async_channel::unbounded;
515 ///
516 /// let (s, r) = unbounded::<()>();
517 /// assert_eq!(s.sender_count(), 1);
518 ///
519 /// let s2 = s.clone();
520 /// assert_eq!(s.sender_count(), 2);
521 /// # });
522 /// ```
523 pub fn sender_count(&self) -> usize {
524 self.channel.sender_count.load(Ordering::SeqCst)
525 }
526
527 /// Downgrade the sender to a weak reference.
528 pub fn downgrade(&self) -> WeakSender<T> {
529 WeakSender {
530 channel: self.channel.clone(),
531 }
532 }
533
534 /// Returns whether the senders belong to the same channel.
535 ///
536 /// # Examples
537 ///
538 /// ```
539 /// # futures_lite::future::block_on(async {
540 /// use async_channel::unbounded;
541 ///
542 /// let (s, r) = unbounded::<()>();
543 /// let s2 = s.clone();
544 ///
545 /// assert!(s.same_channel(&s2));
546 /// # });
547 /// ```
548 pub fn same_channel(&self, other: &Sender<T>) -> bool {
549 Arc::ptr_eq(&self.channel, &other.channel)
550 }
551}
552
553impl<T> Drop for Sender<T> {
554 fn drop(&mut self) {
555 // Decrement the sender count and close the channel if it drops down to zero.
556 if self.channel.sender_count.fetch_sub(1, Ordering::AcqRel) == 1 {
557 self.channel.close();
558 }
559 }
560}
561
562impl<T> fmt::Debug for Sender<T> {
563 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
564 write!(f, "Sender {{ .. }}")
565 }
566}
567
568impl<T> Clone for Sender<T> {
569 fn clone(&self) -> Sender<T> {
570 let count = self.channel.sender_count.fetch_add(1, Ordering::Relaxed);
571
572 // Make sure the count never overflows, even if lots of sender clones are leaked.
573 if count > usize::MAX / 2 {
574 abort();
575 }
576
577 Sender {
578 channel: self.channel.clone(),
579 }
580 }
581}
582
pin_project! {
    /// The receiving side of a channel.
    ///
    /// Receivers can be cloned and shared among threads. When all receivers associated with a channel
    /// are dropped, the channel becomes closed.
    ///
    /// The channel can also be closed manually by calling [`Receiver::close()`].
    ///
    /// Receivers implement the [`Stream`] trait.
    pub struct Receiver<T> {
        // Inner channel state, shared with every other handle of the same channel.
        channel: Arc<Channel<T>>,

        // Listens for a send or close event to unblock this stream. Only the `Stream`
        // implementation sets it; every constructor starts with `None`.
        listener: Option<EventListener>,

        // Keeping this type `!Unpin` enables future optimizations.
        #[pin]
        _pin: PhantomPinned
    }

    impl<T> PinnedDrop for Receiver<T> {
        fn drop(this: Pin<&mut Self>) {
            let this = this.project();

            // Decrement the receiver count and close the channel if it drops down to zero.
            // `fetch_sub` returns the previous value, so `1` means this was the last receiver.
            if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
                this.channel.close();
            }
        }
    }
}
615
616impl<T> Receiver<T> {
617 /// Attempts to receive a message from the channel.
618 ///
619 /// If the channel is empty, or empty and closed, this method returns an error.
620 ///
621 /// # Examples
622 ///
623 /// ```
624 /// # futures_lite::future::block_on(async {
625 /// use async_channel::{unbounded, TryRecvError};
626 ///
627 /// let (s, r) = unbounded();
628 /// assert_eq!(s.send(1).await, Ok(()));
629 ///
630 /// assert_eq!(r.try_recv(), Ok(1));
631 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
632 ///
633 /// drop(s);
634 /// assert_eq!(r.try_recv(), Err(TryRecvError::Closed));
635 /// # });
636 /// ```
637 pub fn try_recv(&self) -> Result<T, TryRecvError> {
638 match self.channel.queue.pop() {
639 Ok(msg) => {
640 // Notify a blocked send operation. If the notified operation gets canceled, it
641 // will notify another blocked send operation.
642 self.channel.send_ops.notify_additional(1);
643
644 Ok(msg)
645 }
646 Err(PopError::Empty) => Err(TryRecvError::Empty),
647 Err(PopError::Closed) => Err(TryRecvError::Closed),
648 }
649 }
650
651 /// Receives a message from the channel.
652 ///
653 /// If the channel is empty, this method waits until there is a message.
654 ///
655 /// If the channel is closed, this method receives a message or returns an error if there are
656 /// no more messages.
657 ///
658 /// # Examples
659 ///
660 /// ```
661 /// # futures_lite::future::block_on(async {
662 /// use async_channel::{unbounded, RecvError};
663 ///
664 /// let (s, r) = unbounded();
665 ///
666 /// assert_eq!(s.send(1).await, Ok(()));
667 /// drop(s);
668 ///
669 /// assert_eq!(r.recv().await, Ok(1));
670 /// assert_eq!(r.recv().await, Err(RecvError));
671 /// # });
672 /// ```
673 pub fn recv(&self) -> Recv<'_, T> {
674 Recv::_new(RecvInner {
675 receiver: self,
676 listener: None,
677 _pin: PhantomPinned,
678 })
679 }
680
681 /// Receives a message from the channel using the blocking strategy.
682 ///
683 /// If the channel is empty, this method waits until there is a message.
684 /// If the channel is closed, this method receives a message or returns an error if there are
685 /// no more messages.
686 ///
687 /// # Blocking
688 ///
689 /// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method,
690 /// this method will block the current thread until the message is received.
691 ///
692 /// This method should not be used in an asynchronous context. It is intended
693 /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
694 /// Calling this method in an asynchronous context may result in deadlocks.
695 ///
696 /// # Examples
697 ///
698 /// ```
699 /// use async_channel::{unbounded, RecvError};
700 ///
701 /// let (s, r) = unbounded();
702 ///
703 /// assert_eq!(s.send_blocking(1), Ok(()));
704 /// drop(s);
705 ///
706 /// assert_eq!(r.recv_blocking(), Ok(1));
707 /// assert_eq!(r.recv_blocking(), Err(RecvError));
708 /// ```
709 #[cfg(all(feature = "std", not(target_family = "wasm")))]
710 pub fn recv_blocking(&self) -> Result<T, RecvError> {
711 self.recv().wait()
712 }
713
714 /// Closes the channel.
715 ///
716 /// Returns `true` if this call has closed the channel and it was not closed already.
717 ///
718 /// The remaining messages can still be received.
719 ///
720 /// # Examples
721 ///
722 /// ```
723 /// # futures_lite::future::block_on(async {
724 /// use async_channel::{unbounded, RecvError};
725 ///
726 /// let (s, r) = unbounded();
727 /// assert_eq!(s.send(1).await, Ok(()));
728 ///
729 /// assert!(r.close());
730 /// assert_eq!(r.recv().await, Ok(1));
731 /// assert_eq!(r.recv().await, Err(RecvError));
732 /// # });
733 /// ```
734 pub fn close(&self) -> bool {
735 self.channel.close()
736 }
737
738 /// Returns `true` if the channel is closed.
739 ///
740 /// # Examples
741 ///
742 /// ```
743 /// # futures_lite::future::block_on(async {
744 /// use async_channel::{unbounded, RecvError};
745 ///
746 /// let (s, r) = unbounded::<()>();
747 /// assert!(!r.is_closed());
748 ///
749 /// drop(s);
750 /// assert!(r.is_closed());
751 /// # });
752 /// ```
753 pub fn is_closed(&self) -> bool {
754 self.channel.queue.is_closed()
755 }
756
757 /// Returns `true` if the channel is empty.
758 ///
759 /// # Examples
760 ///
761 /// ```
762 /// # futures_lite::future::block_on(async {
763 /// use async_channel::unbounded;
764 ///
765 /// let (s, r) = unbounded();
766 ///
/// assert!(r.is_empty());
/// s.send(1).await;
/// assert!(!r.is_empty());
770 /// # });
771 /// ```
772 pub fn is_empty(&self) -> bool {
773 self.channel.queue.is_empty()
774 }
775
776 /// Returns `true` if the channel is full.
777 ///
778 /// Unbounded channels are never full.
779 ///
780 /// # Examples
781 ///
782 /// ```
783 /// # futures_lite::future::block_on(async {
784 /// use async_channel::bounded;
785 ///
786 /// let (s, r) = bounded(1);
787 ///
788 /// assert!(!r.is_full());
789 /// s.send(1).await;
790 /// assert!(r.is_full());
791 /// # });
792 /// ```
793 pub fn is_full(&self) -> bool {
794 self.channel.queue.is_full()
795 }
796
797 /// Returns the number of messages in the channel.
798 ///
799 /// # Examples
800 ///
801 /// ```
802 /// # futures_lite::future::block_on(async {
803 /// use async_channel::unbounded;
804 ///
805 /// let (s, r) = unbounded();
806 /// assert_eq!(r.len(), 0);
807 ///
808 /// s.send(1).await;
809 /// s.send(2).await;
810 /// assert_eq!(r.len(), 2);
811 /// # });
812 /// ```
813 pub fn len(&self) -> usize {
814 self.channel.queue.len()
815 }
816
817 /// Returns the channel capacity if it's bounded.
818 ///
819 /// # Examples
820 ///
821 /// ```
822 /// use async_channel::{bounded, unbounded};
823 ///
824 /// let (s, r) = bounded::<i32>(5);
825 /// assert_eq!(r.capacity(), Some(5));
826 ///
827 /// let (s, r) = unbounded::<i32>();
828 /// assert_eq!(r.capacity(), None);
829 /// ```
830 pub fn capacity(&self) -> Option<usize> {
831 self.channel.queue.capacity()
832 }
833
834 /// Returns the number of receivers for the channel.
835 ///
836 /// # Examples
837 ///
838 /// ```
839 /// # futures_lite::future::block_on(async {
840 /// use async_channel::unbounded;
841 ///
842 /// let (s, r) = unbounded::<()>();
843 /// assert_eq!(r.receiver_count(), 1);
844 ///
845 /// let r2 = r.clone();
846 /// assert_eq!(r.receiver_count(), 2);
847 /// # });
848 /// ```
849 pub fn receiver_count(&self) -> usize {
850 self.channel.receiver_count.load(Ordering::SeqCst)
851 }
852
853 /// Returns the number of senders for the channel.
854 ///
855 /// # Examples
856 ///
857 /// ```
858 /// # futures_lite::future::block_on(async {
859 /// use async_channel::unbounded;
860 ///
861 /// let (s, r) = unbounded::<()>();
862 /// assert_eq!(r.sender_count(), 1);
863 ///
864 /// let s2 = s.clone();
865 /// assert_eq!(r.sender_count(), 2);
866 /// # });
867 /// ```
868 pub fn sender_count(&self) -> usize {
869 self.channel.sender_count.load(Ordering::SeqCst)
870 }
871
872 /// Downgrade the receiver to a weak reference.
873 pub fn downgrade(&self) -> WeakReceiver<T> {
874 WeakReceiver {
875 channel: self.channel.clone(),
876 }
877 }
878
879 /// Returns whether the receivers belong to the same channel.
880 ///
881 /// # Examples
882 ///
883 /// ```
884 /// # futures_lite::future::block_on(async {
885 /// use async_channel::unbounded;
886 ///
887 /// let (s, r) = unbounded::<()>();
888 /// let r2 = r.clone();
889 ///
890 /// assert!(r.same_channel(&r2));
891 /// # });
892 /// ```
893 pub fn same_channel(&self, other: &Receiver<T>) -> bool {
894 Arc::ptr_eq(&self.channel, &other.channel)
895 }
896}
897
898impl<T> fmt::Debug for Receiver<T> {
899 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
900 write!(f, "Receiver {{ .. }}")
901 }
902}
903
904impl<T> Clone for Receiver<T> {
905 fn clone(&self) -> Receiver<T> {
906 let count = self.channel.receiver_count.fetch_add(1, Ordering::Relaxed);
907
908 // Make sure the count never overflows, even if lots of receiver clones are leaked.
909 if count > usize::MAX / 2 {
910 abort();
911 }
912
913 Receiver {
914 channel: self.channel.clone(),
915 listener: None,
916 _pin: PhantomPinned,
917 }
918 }
919}
920
impl<T> Stream for Receiver<T> {
    type Item = T;

    /// Polls for the next message.
    ///
    /// Yields `Some(msg)` when a message is available, `None` once `try_recv` reports the
    /// channel as closed, and `Pending` while a registered listener has not been notified.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // If this stream is listening for events, first wait for a notification.
            // `ready!` returns `Poll::Pending` from this function while the listener is pending.
            {
                let this = self.as_mut().project();
                if let Some(listener) = this.listener.as_mut() {
                    ready!(Pin::new(listener).poll(cx));
                    *this.listener = None;
                }
            }

            loop {
                // Attempt to receive a message.
                match self.try_recv() {
                    Ok(msg) => {
                        // The stream is not blocked on an event - drop the listener.
                        let this = self.as_mut().project();
                        *this.listener = None;
                        return Poll::Ready(Some(msg));
                    }
                    Err(TryRecvError::Closed) => {
                        // The stream is not blocked on an event - drop the listener.
                        let this = self.as_mut().project();
                        *this.listener = None;
                        return Poll::Ready(None);
                    }
                    Err(TryRecvError::Empty) => {}
                }

                // Receiving failed - now start listening for notifications or wait for one.
                let this = self.as_mut().project();
                if this.listener.is_some() {
                    // Go back to the outer loop to wait for a notification.
                    break;
                } else {
                    // Register a listener, then loop to retry `try_recv` once more before
                    // waiting - presumably so a message sent between the failed attempt and
                    // the registration is not missed.
                    *this.listener = Some(this.channel.stream_ops.listen());
                }
            }
        }
    }
}
965
966impl<T> futures_core::stream::FusedStream for Receiver<T> {
967 fn is_terminated(&self) -> bool {
968 self.channel.queue.is_closed() && self.channel.queue.is_empty()
969 }
970}
971
972/// A [`Sender`] that does not prevent the channel from being closed.
973///
974/// This is created through the [`Sender::downgrade`] method. In order to use it, it needs
975/// to be upgraded into a [`Sender`] through the `upgrade` method.
pub struct WeakSender<T> {
    // Shared channel state. Holding it does not count towards `sender_count`.
    channel: Arc<Channel<T>>,
}
979
980impl<T> WeakSender<T> {
981 /// Upgrade the [`WeakSender`] into a [`Sender`].
982 pub fn upgrade(&self) -> Option<Sender<T>> {
983 if self.channel.queue.is_closed() {
984 None
985 } else {
986 match self.channel.sender_count.fetch_update(
987 Ordering::Relaxed,
988 Ordering::Relaxed,
989 |count| if count == 0 { None } else { Some(count + 1) },
990 ) {
991 Err(_) => None,
992 Ok(new_value) if new_value > usize::MAX / 2 => {
993 // Make sure the count never overflows, even if lots of sender clones are leaked.
994 abort();
995 }
996 Ok(_) => Some(Sender {
997 channel: self.channel.clone(),
998 }),
999 }
1000 }
1001 }
1002}
1003
1004impl<T> Clone for WeakSender<T> {
1005 fn clone(&self) -> Self {
1006 WeakSender {
1007 channel: self.channel.clone(),
1008 }
1009 }
1010}
1011
1012impl<T> fmt::Debug for WeakSender<T> {
1013 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1014 write!(f, "WeakSender {{ .. }}")
1015 }
1016}
1017
1018/// A [`Receiver`] that does not prevent the channel from being closed.
1019///
1020/// This is created through the [`Receiver::downgrade`] method. In order to use it, it needs
1021/// to be upgraded into a [`Receiver`] through the `upgrade` method.
pub struct WeakReceiver<T> {
    // Shared channel state. Holding it does not count towards `receiver_count`.
    channel: Arc<Channel<T>>,
}
1025
1026impl<T> WeakReceiver<T> {
1027 /// Upgrade the [`WeakReceiver`] into a [`Receiver`].
1028 pub fn upgrade(&self) -> Option<Receiver<T>> {
1029 if self.channel.queue.is_closed() {
1030 None
1031 } else {
1032 match self.channel.receiver_count.fetch_update(
1033 Ordering::Relaxed,
1034 Ordering::Relaxed,
1035 |count| if count == 0 { None } else { Some(count + 1) },
1036 ) {
1037 Err(_) => None,
1038 Ok(new_value) if new_value > usize::MAX / 2 => {
1039 // Make sure the count never overflows, even if lots of receiver clones are leaked.
1040 abort();
1041 }
1042 Ok(_) => Some(Receiver {
1043 channel: self.channel.clone(),
1044 listener: None,
1045 _pin: PhantomPinned,
1046 }),
1047 }
1048 }
1049 }
1050}
1051
1052impl<T> Clone for WeakReceiver<T> {
1053 fn clone(&self) -> Self {
1054 WeakReceiver {
1055 channel: self.channel.clone(),
1056 }
1057 }
1058}
1059
1060impl<T> fmt::Debug for WeakReceiver<T> {
1061 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1062 write!(f, "WeakReceiver {{ .. }}")
1063 }
1064}
1065
1066/// An error returned from [`Sender::send()`].
1067///
1068/// Received because the channel is closed.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Consumes the error and hands back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        let SendError(msg) = self;
        msg
    }
}

#[cfg(feature = "std")]
impl<T> std::error::Error for SendError<T> {}

// The payload is deliberately hidden so that `T` does not have to implement `Debug`.
impl<T> fmt::Debug for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("SendError(..)")
    }
}

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending into a closed channel")
    }
}
1093
1094/// An error returned from [`Sender::try_send()`].
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The channel is full but not closed.
    Full(T),

    /// The channel is closed.
    Closed(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error and hands back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        match self {
            TrySendError::Full(msg) | TrySendError::Closed(msg) => msg,
        }
    }

    /// Returns `true` if the channel is full but not closed.
    pub fn is_full(&self) -> bool {
        matches!(self, TrySendError::Full(_))
    }

    /// Returns `true` if the channel is closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, TrySendError::Closed(_))
    }
}

#[cfg(feature = "std")]
impl<T> std::error::Error for TrySendError<T> {}

// The payload is deliberately hidden so that `T` does not have to implement `Debug`.
impl<T> fmt::Debug for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            TrySendError::Full(_) => "Full(..)",
            TrySendError::Closed(_) => "Closed(..)",
        };
        f.write_str(label)
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            TrySendError::Full(_) => "sending into a full channel",
            TrySendError::Closed(_) => "sending into a closed channel",
        };
        f.write_str(text)
    }
}
1150
/// An error returned from [`Receiver::recv()`].
///
/// Received because the channel is empty and closed.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub struct RecvError;

#[cfg(feature = "std")]
impl std::error::Error for RecvError {}

impl fmt::Display for RecvError {
    /// Writes a fixed description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("receiving from an empty and closed channel")
    }
}
1165
/// An error returned from [`Receiver::try_recv()`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
    /// The channel is empty but not closed.
    Empty,

    /// The channel is empty and closed.
    Closed,
}

impl TryRecvError {
    /// Returns `true` if the channel is empty but not closed.
    pub fn is_empty(&self) -> bool {
        matches!(self, Self::Empty)
    }

    /// Returns `true` if the channel is empty and closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, Self::Closed)
    }
}

#[cfg(feature = "std")]
impl std::error::Error for TryRecvError {}

impl fmt::Display for TryRecvError {
    /// Writes a fixed description matching the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            Self::Empty => "receiving from an empty channel",
            Self::Closed => "receiving from an empty and closed channel",
        };
        f.write_str(description)
    }
}
1205
1206easy_wrapper! {
1207 /// A future returned by [`Sender::send()`].
1208 #[derive(Debug)]
1209 #[must_use = "futures do nothing unless you `.await` or poll them"]
1210 pub struct Send<'a, T>(SendInner<'a, T> => Result<(), SendError<T>>);
1211 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1212 pub(crate) wait();
1213}
1214
1215pin_project! {
1216 #[derive(Debug)]
1217 #[project(!Unpin)]
1218 struct SendInner<'a, T> {
1219 // Reference to the original sender.
1220 sender: &'a Sender<T>,
1221
1222 // The message to send.
1223 msg: Option<T>,
1224
1225 // Listener waiting on the channel.
1226 listener: Option<EventListener>,
1227
1228 // Keeping this type `!Unpin` enables future optimizations.
1229 #[pin]
1230 _pin: PhantomPinned
1231 }
1232}
1233
1234impl<T> EventListenerFuture for SendInner<'_, T> {
1235 type Output = Result<(), SendError<T>>;
1236
1237 /// Run this future with the given `Strategy`.
1238 fn poll_with_strategy<'x, S: Strategy<'x>>(
1239 self: Pin<&mut Self>,
1240 strategy: &mut S,
1241 context: &mut S::Context,
1242 ) -> Poll<Result<(), SendError<T>>> {
1243 let this = self.project();
1244
1245 loop {
1246 let msg = this.msg.take().unwrap();
1247 // Attempt to send a message.
1248 match this.sender.try_send(msg) {
1249 Ok(()) => return Poll::Ready(Ok(())),
1250 Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
1251 Err(TrySendError::Full(m)) => *this.msg = Some(m),
1252 }
1253
1254 // Sending failed - now start listening for notifications or wait for one.
1255 if this.listener.is_some() {
1256 // Poll using the given strategy
1257 ready!(S::poll(strategy, &mut *this.listener, context));
1258 } else {
1259 *this.listener = Some(this.sender.channel.send_ops.listen());
1260 }
1261 }
1262 }
1263}
1264
1265easy_wrapper! {
1266 /// A future returned by [`Receiver::recv()`].
1267 #[derive(Debug)]
1268 #[must_use = "futures do nothing unless you `.await` or poll them"]
1269 pub struct Recv<'a, T>(RecvInner<'a, T> => Result<T, RecvError>);
1270 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1271 pub(crate) wait();
1272}
1273
1274pin_project! {
1275 #[derive(Debug)]
1276 #[project(!Unpin)]
1277 struct RecvInner<'a, T> {
1278 // Reference to the receiver.
1279 receiver: &'a Receiver<T>,
1280
1281 // Listener waiting on the channel.
1282 listener: Option<EventListener>,
1283
1284 // Keeping this type `!Unpin` enables future optimizations.
1285 #[pin]
1286 _pin: PhantomPinned
1287 }
1288}
1289
1290impl<T> EventListenerFuture for RecvInner<'_, T> {
1291 type Output = Result<T, RecvError>;
1292
1293 /// Run this future with the given `Strategy`.
1294 fn poll_with_strategy<'x, S: Strategy<'x>>(
1295 self: Pin<&mut Self>,
1296 strategy: &mut S,
1297 cx: &mut S::Context,
1298 ) -> Poll<Result<T, RecvError>> {
1299 let this = self.project();
1300
1301 loop {
1302 // Attempt to receive a message.
1303 match this.receiver.try_recv() {
1304 Ok(msg) => return Poll::Ready(Ok(msg)),
1305 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
1306 Err(TryRecvError::Empty) => {}
1307 }
1308
1309 // Receiving failed - now start listening for notifications or wait for one.
1310 if this.listener.is_some() {
1311 // Poll using the given strategy
1312 ready!(S::poll(strategy, &mut *this.listener, cx));
1313 } else {
1314 *this.listener = Some(this.receiver.channel.recv_ops.listen());
1315 }
1316 }
1317 }
1318}
1319
1320easy_wrapper! {
1321 /// A future returned by [`Sender::closed()`].
1322 #[derive(Debug)]
1323 #[must_use = "futures do nothing unless you `.await` or poll them"]
1324 pub struct Closed<'a, T>(ClosedInner<'a, T> => ());
1325 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1326 pub(crate) wait();
1327}
1328
1329pin_project! {
1330 #[derive(Debug)]
1331 #[project(!Unpin)]
1332 struct ClosedInner<'a, T> {
1333 // Reference to the sender.
1334 sender: &'a Sender<T>,
1335
1336 // Listener waiting on the channel.
1337 listener: Option<EventListener>,
1338
1339 // Keeping this type `!Unpin` enables future optimizations.
1340 #[pin]
1341 _pin: PhantomPinned
1342 }
1343}
1344
1345impl<'a, T> EventListenerFuture for ClosedInner<'a, T> {
1346 type Output = ();
1347
1348 /// Run this future with the given `Strategy`.
1349 fn poll_with_strategy<'x, S: Strategy<'x>>(
1350 self: Pin<&mut Self>,
1351 strategy: &mut S,
1352 cx: &mut S::Context,
1353 ) -> Poll<()> {
1354 let this = self.project();
1355
1356 loop {
1357 // Check if the channel is closed.
1358 if this.sender.is_closed() {
1359 return Poll::Ready(());
1360 }
1361
1362 // Not closed - now start listening for notifications or wait for one.
1363 if this.listener.is_some() {
1364 // Poll using the given strategy
1365 ready!(S::poll(strategy, &mut *this.listener, cx));
1366 } else {
1367 *this.listener = Some(this.sender.channel.closed_ops.listen());
1368 }
1369 }
1370 }
1371}
1372
1373#[cfg(feature = "std")]
1374use std::process::abort;
1375
/// Stand-in for `std::process::abort` when the `std` feature is disabled.
///
/// Never returns. It panics while a guard whose destructor also panics is alive, so a
/// second panic is raised while the first one is being handled; as the panic message
/// says, this is meant to bring the program down rather than unwind normally.
#[cfg(not(feature = "std"))]
fn abort() -> ! {
    /// Guard that panics when dropped.
    struct Bomb;

    impl Drop for Bomb {
        fn drop(&mut self) {
            panic!("Panic while panicking to abort");
        }
    }

    // Must stay alive until the panic below so that its destructor runs afterwards.
    let _armed = Bomb;
    panic!("Panic while panicking to abort")
}