Struct Notify

struct Notify { ... }

Notifies a single task to wake up.

Notify provides a basic mechanism to notify a single task of an event. Notify itself does not carry any data. Instead, it is to be used to signal another task to perform an operation.

A Notify can be thought of as a Semaphore starting with 0 permits. The notified().await method waits for a permit to become available, and notify_one() sets a permit if there currently are no available permits.

The synchronization details of Notify are similar to thread::park and Thread::unpark from std. A Notify value contains a single permit. notified().await waits for the permit to be made available, consumes the permit, and resumes. notify_one() sets the permit, waking a pending task if there is one.

If notify_one() is called before notified().await, then the next call to notified().await will complete immediately, consuming the permit. Any subsequent calls to notified().await will wait for a new permit.

If notify_one() is called multiple times before notified().await, only a single permit is stored. The next call to notified().await will complete immediately, but the one after will wait for a new permit.

Examples

Basic usage.

use tokio::sync::Notify;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    let handle = tokio::spawn(async move {
        notify2.notified().await;
        println!("received notification");
    });

    println!("sending notification");
    notify.notify_one();

    // Wait for task to receive notification.
    handle.await.unwrap();
}

Unbounded multi-producer single-consumer (mpsc) channel.

No wakeups can be lost when using this channel because the call to notify_one() will store a permit in the Notify, which the following call to notified() will consume.

use tokio::sync::Notify;

use std::collections::VecDeque;
use std::sync::Mutex;

struct Channel<T> {
    values: Mutex<VecDeque<T>>,
    notify: Notify,
}

impl<T> Channel<T> {
    pub fn send(&self, value: T) {
        self.values.lock().unwrap()
            .push_back(value);

        // Notify the consumer a value is available
        self.notify.notify_one();
    }

    // This is a single-consumer channel, so several concurrent calls to
    // `recv` are not allowed.
    pub async fn recv(&self) -> T {
        loop {
            // Drain values
            if let Some(value) = self.values.lock().unwrap().pop_front() {
                return value;
            }

            // Wait for values to be available
            self.notify.notified().await;
        }
    }
}

Unbounded multi-producer multi-consumer (mpmc) channel.

The call to enable is important. Without it, two calls to recv running in parallel with two calls to send could interleave as follows:

  1. Both calls to try_recv return None.
  2. Both new elements are added to the queue.
  3. The notify_one method is called twice, adding only a single permit to the Notify.
  4. Both calls to recv reach the Notified future. One of them consumes the permit, and the other sleeps forever.

Because enable is called before try_recv, both Notified futures are already in the list of waiters when the notify_one calls in step three run. Those calls then remove the futures from the list and mark them notified instead of adding a permit to the Notify. This ensures that both futures are woken.

Notice that this failure can only happen if there are two concurrent calls to recv. This is why the mpsc example above does not require a call to enable.

use tokio::sync::Notify;

use std::collections::VecDeque;
use std::sync::Mutex;

struct Channel<T> {
    messages: Mutex<VecDeque<T>>,
    notify_on_sent: Notify,
}

impl<T> Channel<T> {
    pub fn send(&self, msg: T) {
        let mut locked_queue = self.messages.lock().unwrap();
        locked_queue.push_back(msg);
        drop(locked_queue);

        // Send a notification to one of the calls currently
        // waiting in a call to `recv`.
        self.notify_on_sent.notify_one();
    }

    pub fn try_recv(&self) -> Option<T> {
        let mut locked_queue = self.messages.lock().unwrap();
        locked_queue.pop_front()
    }

    pub async fn recv(&self) -> T {
        let future = self.notify_on_sent.notified();
        tokio::pin!(future);

        loop {
            // Make sure that no wakeup is lost if we get
            // `None` from `try_recv`.
            future.as_mut().enable();

            if let Some(msg) = self.try_recv() {
                return msg;
            }

            // Wait for a call to `notify_one`.
            //
            // This uses `.as_mut()` to avoid consuming the future,
            // which lets us call `Pin::set` below.
            future.as_mut().await;

            // Reset the future in case another call to
            // `try_recv` got the message before us.
            future.set(self.notify_on_sent.notified());
        }
    }
}

Implementations

impl Notify

fn new() -> Notify

Create a new Notify, initialized without a permit.

Examples

use tokio::sync::Notify;

let notify = Notify::new();

const fn const_new() -> Notify

Create a new Notify, initialized without a permit.

When using the tracing unstable feature, a Notify created with const_new will not be instrumented. As such, it will not be visible in tokio-console. Instead, Notify::new should be used to create an instrumented object if that is needed.

Examples

use tokio::sync::Notify;

static NOTIFY: Notify = Notify::const_new();

fn notified(self: &Self) -> Notified<'_>

Wait for a notification.

Equivalent to:

async fn notified(&self);

Each Notify value holds a single permit. If a permit is available from an earlier call to notify_one(), then notified().await will complete immediately, consuming that permit. Otherwise, notified().await waits for a permit to be made available by the next call to notify_one().

The Notified future is not guaranteed to receive wakeups from calls to notify_one() if it has not yet been polled. See the documentation for Notified::enable() for more details.

The Notified future is guaranteed to receive wakeups from notify_waiters() as soon as it has been created, even if it has not yet been polled.

Cancel safety

This method uses a queue to fairly distribute notifications in the order they were requested. Cancelling a call to notified makes you lose your place in the queue.

Examples

use tokio::sync::Notify;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    tokio::spawn(async move {
        notify2.notified().await;
        println!("received notification");
    });

    println!("sending notification");
    notify.notify_one();
}

fn notify_one(self: &Self)

Notifies the first waiting task.

If a task is currently waiting, that task is notified. Otherwise, a permit is stored in this Notify value, and the next call to notified().await will complete immediately, consuming the permit made available by this call to notify_one().

At most one permit may be stored by Notify. Many sequential calls to notify_one will result in a single permit being stored. The next call to notified().await will complete immediately, but the one after that will wait.

Examples

use tokio::sync::Notify;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    tokio::spawn(async move {
        notify2.notified().await;
        println!("received notification");
    });

    println!("sending notification");
    notify.notify_one();
}

fn notify_last(self: &Self)

Notifies the last waiting task.

This function behaves similarly to notify_one. The only difference is that it wakes the most recently added waiter instead of the oldest waiter.

Check the notify_one() documentation for more info and examples.

fn notify_waiters(self: &Self)

Notifies all waiting tasks.

All tasks that are currently waiting are notified. Unlike with notify_one(), no permit is stored to be used by the next call to notified().await. The purpose of this method is to notify all already registered waiters. A task registers for notification by acquiring an instance of the Notified future through a call to notified().

Examples

use tokio::sync::Notify;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let notify2 = notify.clone();

    let notified1 = notify.notified();
    let notified2 = notify.notified();

    tokio::spawn(async move {
        println!("sending notifications");
        notify2.notify_waiters();
    });

    notified1.await;
    notified2.await;
    println!("received notifications");
}

impl Debug for Notify

fn fmt(self: &Self, f: &mut Formatter<'_>) -> Result

impl Default for Notify

fn default() -> Notify

impl Freeze for Notify

impl RefUnwindSafe for Notify

impl Send for Notify

impl Sync for Notify

impl Unpin for Notify

impl UnsafeUnpin for Notify

impl UnwindSafe for Notify

impl<T> Any for T where T: 'static + ?Sized

fn type_id(self: &Self) -> TypeId

impl<T> Borrow<T> for T where T: ?Sized

fn borrow(self: &Self) -> &T

impl<T> BorrowMut<T> for T where T: ?Sized

fn borrow_mut(self: &mut Self) -> &mut T

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T where U: From<T>

fn into(self: Self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T where U: Into<T>

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

impl<T, U> TryInto<U> for T where U: TryFrom<T>

fn try_into(self: Self) -> Result<U, <U as TryFrom<T>>::Error>