Struct Semaphore
struct Semaphore { ... }
Counting semaphore performing asynchronous permit acquisition.
A semaphore maintains a set of permits. Permits are used to synchronize access to a shared resource. A semaphore differs from a mutex in that it can allow more than one concurrent caller to access the shared resource at a time.
When acquire is called and the semaphore has remaining permits, the
function immediately returns a permit. However, if no remaining permits are
available, acquire (asynchronously) waits until an outstanding permit is
dropped. At this point, the freed permit is assigned to the caller.
This Semaphore is fair, which means that permits are given out in the order
they were requested. This fairness is also applied when acquire_many gets
involved, so if a call to acquire_many at the front of the queue requests
more permits than currently available, this can prevent a call to acquire
from completing, even if the semaphore has enough permits to complete the call
to acquire.
To use the Semaphore in a poll function, you can use the PollSemaphore
utility.
Examples
Basic usage:
use ;
async
Limit the number of simultaneously opened files in your program
Most operating systems have limits on the number of open file handles. Even in systems without explicit limits, resource constraints implicitly set an upper bound on the number of open files. If your program attempts to open a large number of files and exceeds this limit, it will result in an error.
This example uses a Semaphore with 100 permits. By acquiring a permit from the Semaphore before accessing a file, you ensure that your program opens no more than 100 files at a time. When trying to open the 101st file, the program will wait until a permit becomes available before proceeding to open another file.
use std::io::Result;
use tokio::fs::File;
use tokio::sync::Semaphore;
use tokio::io::AsyncWriteExt;
static PERMITS: Semaphore = Semaphore::const_new(100);
async
Limit the number of outgoing requests being sent at the same time
In some scenarios, it might be required to limit the number of outgoing requests being sent in parallel. This could be due to limits of a consumed API or the network resources of the system the application is running on.
This example uses an Arc<Semaphore> with 10 permits. Each task spawned is
given a reference to the semaphore by cloning the Arc<Semaphore>. Before
a task sends a request, it must acquire a permit from the semaphore by
calling Semaphore::acquire. This ensures that at most 10 requests are
sent in parallel at any given time. After a task has sent a request, it
drops the permit to allow other tasks to send requests.
use std::sync::Arc;
use tokio::sync::Semaphore;
async
# async
Limit the number of incoming requests being handled at the same time
Similar to limiting the number of simultaneously opened files, network handles are a limited resource. Allowing an unbounded number of requests to be processed could result in a denial-of-service, among many other issues.
This example uses an Arc<Semaphore> instead of a global variable.
To limit the number of requests that can be processed at the same time,
we acquire a permit for each task before spawning it. Once the permit is acquired,
a new task is spawned; once the task finishes, the permit is dropped inside
the task to allow others to spawn. Permits must be acquired via
Semaphore::acquire_owned to be movable across the task boundary.
(This is needed because our semaphore is not a global variable; if it were, acquire would be enough.)
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> std::io::Result<()> {
    let semaphore = Arc::new(Semaphore::new(3));
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    loop {
        // Acquire permit before accepting the next socket.
        //
        // We use `acquire_owned` so that we can move `permit` into
        // other tasks.
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        let (mut socket, _) = listener.accept().await?;
        tokio::spawn(async move {
            // Do work using the socket.
            handle_connection(&mut socket).await;
            // Drop socket while the permit is still live.
            drop(socket);
            // Drop the permit, so more tasks can be created.
            drop(permit);
        });
    }
}
async fn handle_connection(_socket: &mut tokio::net::TcpStream) {
    // Do work
}
Prevent tests from running in parallel
By default, Rust runs tests in the same file in parallel. However, in some cases, running two tests in parallel may lead to problems. For example, this can happen when tests use the same database.
Consider the following scenario:
test_insert: Inserts a key-value pair into the database, then retrieves the value using the same key to verify the insertion.
test_update: Inserts a key, then updates the key to a new value and verifies that the value has been accurately updated.
test_others: A third test that doesn't modify the database state. It can run in parallel with the other tests.
In this example, test_insert and test_update need to run in sequence to
work, but it doesn't matter which test runs first. We can leverage a
semaphore with a single permit to address this challenge.
# use Mutex;
# use BTreeMap;
#
#
use tokio::sync::Semaphore;
// Initialize a static semaphore with only one permit, which is used to
// prevent test_insert and test_update from running in parallel.
static PERMIT: Semaphore = Semaphore::const_new(1);
// Initialize the database that will be used by the subsequent tests.
static DB: Database = setup;
# async
async
# async
async
# async
async
#
# async
Rate limiting using a token bucket
This example showcases the add_permits and SemaphorePermit::forget methods.
Many applications and systems have constraints on the rate at which certain operations should occur. Exceeding this rate can result in suboptimal performance or even errors.
This example implements rate limiting using a token bucket. A token bucket is a form of rate limiting that doesn't kick in immediately, to allow for short bursts of incoming requests that arrive at the same time.
With a token bucket, each incoming request consumes a token, and the tokens are refilled at a certain rate that defines the rate limit. When a burst of requests arrives, tokens are immediately given out until the bucket is empty. Once the bucket is empty, requests will have to wait for new tokens to be added.
Unlike the example that limits how many requests can be handled at the same time, we do not add tokens back when we finish handling a request. Instead, tokens are added only by a timer task.
Note that this implementation is suboptimal when the duration is small, because it consumes a lot of CPU by constantly looping and sleeping.
use std::sync::Arc;
use tokio::sync::Semaphore;
use ;
# async
#
async
Implementations
impl Semaphore
fn new(permits: usize) -> Self
Creates a new semaphore with the initial number of permits.
Panics if permits exceeds Semaphore::MAX_PERMITS.
const fn const_new(permits: usize) -> Self
Creates a new semaphore with the initial number of permits.
When using the tracing unstable feature, a Semaphore created with const_new will not be instrumented. As such, it will not be visible in tokio-console. Instead, Semaphore::new should be used to create an instrumented object if that is needed.
Examples
use tokio::sync::Semaphore;
static SEM: Semaphore = const_new;
fn available_permits(self: &Self) -> usize
Returns the current number of available permits.
fn add_permits(self: &Self, n: usize)
Adds n new permits to the semaphore.
The maximum number of permits is Semaphore::MAX_PERMITS, and this function will panic if the limit is exceeded.
fn forget_permits(self: &Self, n: usize) -> usize
Decreases the semaphore's permits by at most n.
If there are insufficient permits and it's not possible to reduce by n, returns the number of permits that were actually reduced.
async fn acquire(self: &Self) -> Result<SemaphorePermit<'_>, AcquireError>
Acquires a permit from the semaphore.
If the semaphore has been closed, this returns an AcquireError. Otherwise, this returns a SemaphorePermit representing the acquired permit.
Cancel safety
This method uses a queue to fairly distribute permits in the order they were requested. Cancelling a call to acquire makes you lose your place in the queue.
Examples
use tokio::sync::Semaphore;
async
async fn acquire_many(self: &Self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError>
Acquires n permits from the semaphore.
If the semaphore has been closed, this returns an AcquireError. Otherwise, this returns a SemaphorePermit representing the acquired permits.
Cancel safety
This method uses a queue to fairly distribute permits in the order they were requested. Cancelling a call to acquire_many makes you lose your place in the queue.
Examples
use tokio::sync::Semaphore;
async
fn try_acquire(self: &Self) -> Result<SemaphorePermit<'_>, TryAcquireError>
Tries to acquire a permit from the semaphore.
If the semaphore has been closed, this returns TryAcquireError::Closed; if there are no permits left, it returns TryAcquireError::NoPermits. Otherwise, this returns a SemaphorePermit representing the acquired permit.
Examples
use ;
#
fn try_acquire_many(self: &Self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError>
Tries to acquire n permits from the semaphore.
If the semaphore has been closed, this returns TryAcquireError::Closed; if there are not enough permits left, it returns TryAcquireError::NoPermits. Otherwise, this returns a SemaphorePermit representing the acquired permits.
Examples
use ;
#
async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError>
Acquires a permit from the semaphore.
The semaphore must be wrapped in an Arc to call this method. If the semaphore has been closed, this returns an AcquireError. Otherwise, this returns an OwnedSemaphorePermit representing the acquired permit.
Cancel safety
This method uses a queue to fairly distribute permits in the order they were requested. Cancelling a call to acquire_owned makes you lose your place in the queue.
Examples
use std::sync::Arc;
use tokio::sync::Semaphore;
async
async fn acquire_many_owned(self: Arc<Self>, n: u32) -> Result<OwnedSemaphorePermit, AcquireError>
Acquires n permits from the semaphore.
The semaphore must be wrapped in an Arc to call this method. If the semaphore has been closed, this returns an AcquireError. Otherwise, this returns an OwnedSemaphorePermit representing the acquired permits.
Cancel safety
This method uses a queue to fairly distribute permits in the order they were requested. Cancelling a call to acquire_many_owned makes you lose your place in the queue.
Examples
use std::sync::Arc;
use tokio::sync::Semaphore;
async
fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError>
Tries to acquire a permit from the semaphore.
The semaphore must be wrapped in an Arc to call this method. If the semaphore has been closed, this returns TryAcquireError::Closed; if there are no permits left, it returns TryAcquireError::NoPermits. Otherwise, this returns an OwnedSemaphorePermit representing the acquired permit.
Examples
use std::sync::Arc;
use ;
#
fn try_acquire_many_owned(self: Arc<Self>, n: u32) -> Result<OwnedSemaphorePermit, TryAcquireError>
Tries to acquire n permits from the semaphore.
The semaphore must be wrapped in an Arc to call this method. If the semaphore has been closed, this returns TryAcquireError::Closed; if there are not enough permits left, it returns TryAcquireError::NoPermits. Otherwise, this returns an OwnedSemaphorePermit representing the acquired permits.
Examples
use std::sync::Arc;
use ;
#
fn close(self: &Self)
Closes the semaphore.
This prevents the semaphore from issuing new permits and notifies all pending waiters.
Examples
use tokio::sync::Semaphore;
use std::sync::Arc;
use tokio::sync::TryAcquireError;
async
fn is_closed(self: &Self) -> bool
Returns true if the semaphore is closed.
impl Debug for Semaphore
fn fmt(self: &Self, f: &mut Formatter<'_>) -> Result
impl Freeze for Semaphore
impl RefUnwindSafe for Semaphore
impl Send for Semaphore
impl Sync for Semaphore
impl Unpin for Semaphore
impl UnsafeUnpin for Semaphore
impl UnwindSafe for Semaphore
impl<T> Any for Semaphore
fn type_id(self: &Self) -> TypeId
impl<T> Borrow for Semaphore
fn borrow(self: &Self) -> &T
impl<T> BorrowMut for Semaphore
fn borrow_mut(self: &mut Self) -> &mut T
impl<T> From for Semaphore
fn from(t: T) -> T
Returns the argument unchanged.
impl<T, U> Into for Semaphore
fn into(self: Self) -> U
Calls U::from(self).
That is, this conversion is whatever the implementation of From<T> for U chooses to do.
impl<T, U> TryFrom for Semaphore
fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>
impl<T, U> TryInto for Semaphore
fn try_into(self: Self) -> Result<U, <U as TryFrom<T>>::Error>