Struct AsyncFd
struct AsyncFd<T: AsRawFd> { ... }
Associates an IO object backed by a Unix file descriptor with the tokio
reactor, allowing for readiness to be polled. The file descriptor must be of
a type that can be used with the OS polling facilities (e.g., poll, epoll,
kqueue), such as a network socket or pipe, and the file descriptor
must be in nonblocking mode.
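As a minimal sketch of what nonblocking mode means in practice, the snippet below uses only the standard library (no tokio) and a connected UnixStream pair as a stand-in for any pollable descriptor. The helper name read_nonblocking is hypothetical and exists only for illustration.

```rust
use std::io::{self, Read};
use std::os::unix::net::UnixStream;

/// Hypothetical helper: switches `stream` to nonblocking mode, then reads once.
fn read_nonblocking(stream: &mut UnixStream, buf: &mut [u8]) -> io::Result<usize> {
    stream.set_nonblocking(true)?;
    stream.read(buf)
}

fn main() -> io::Result<()> {
    // A connected socket pair; nothing has been written to `_tx` yet.
    let (mut rx, _tx) = UnixStream::pair()?;
    let mut buf = [0u8; 16];
    match read_nonblocking(&mut rx, &mut buf) {
        // In nonblocking mode an empty socket reports `WouldBlock`
        // instead of stalling the calling thread.
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
            println!("no data yet; a blocking socket would have stalled here");
        }
        other => println!("unexpected result: {other:?}"),
    }
    Ok(())
}
```

A descriptor left in blocking mode would park the thread inside read, which is why the mode has to be set before the descriptor is handed to AsyncFd.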
Creating an AsyncFd registers the file descriptor with the current tokio
reactor, allowing you to directly await the file descriptor being readable
or writable. Once registered, the file descriptor remains registered until
the AsyncFd is dropped.
The AsyncFd takes ownership of an arbitrary object to represent the IO
object. It is intended that the inner object will handle closing the file
descriptor when it is dropped, avoiding resource leaks and ensuring that the
AsyncFd can clean up the registration before closing the file descriptor.
The AsyncFd::into_inner function can be used to extract the inner object
to retake control from the tokio IO reactor. The OwnedFd type is often
used as the inner object, as it is the simplest type that closes the fd on
drop.
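The following sketch shows one way to obtain an OwnedFd from a standard-library socket before wrapping it. It uses only std; the helper name into_nonblocking_fd is an assumption made for this example, and the resulting value is what would then be passed to AsyncFd::new.

```rust
use std::io;
use std::os::fd::{AsRawFd, OwnedFd};
use std::os::unix::net::UnixStream;

/// Hypothetical helper: puts the socket in nonblocking mode and
/// converts it into a bare `OwnedFd`.
fn into_nonblocking_fd(stream: UnixStream) -> io::Result<OwnedFd> {
    stream.set_nonblocking(true)?;
    Ok(OwnedFd::from(stream))
}

fn main() -> io::Result<()> {
    let (a, _b) = UnixStream::pair()?;
    let raw_before = a.as_raw_fd();
    let fd = into_nonblocking_fd(a)?;
    // The conversion transfers ownership of the same descriptor;
    // nothing is duplicated.
    assert_eq!(fd.as_raw_fd(), raw_before);
    // Dropping the `OwnedFd` closes the descriptor.
    drop(fd);
    Ok(())
}
```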
The inner object is required to implement AsRawFd. This file descriptor
must not change while AsyncFd owns the inner object, i.e. the
AsRawFd::as_raw_fd method on the inner type must always return the same
file descriptor when called multiple times. Failure to uphold this results
in unspecified behavior in the IO driver, which may include breaking
notifications for other sockets/etc.
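To illustrate the stable-descriptor requirement, here is a sketch of a wrapper type whose AsRawFd implementation always delegates to the single socket it owns. The CountedSocket type and its bytes_sent field are hypothetical; only std is used.

```rust
use std::io;
use std::os::fd::{AsRawFd, RawFd};
use std::os::unix::net::UnixStream;

/// Hypothetical wrapper that keeps bookkeeping next to a socket.
struct CountedSocket {
    socket: UnixStream,
    bytes_sent: u64,
}

impl CountedSocket {
    fn new(socket: UnixStream) -> io::Result<Self> {
        socket.set_nonblocking(true)?;
        Ok(Self { socket, bytes_sent: 0 })
    }
}

impl AsRawFd for CountedSocket {
    fn as_raw_fd(&self) -> RawFd {
        // Always report the descriptor of the one socket owned by this
        // value. The socket is never swapped out, so the result is stable.
        self.socket.as_raw_fd()
    }
}

fn main() -> io::Result<()> {
    let (a, _b) = UnixStream::pair()?;
    let mut wrapped = CountedSocket::new(a)?;
    let first = wrapped.as_raw_fd();
    // Mutating unrelated state must not change the reported descriptor.
    wrapped.bytes_sent += 3;
    assert_eq!(wrapped.as_raw_fd(), first);
    println!("{} bytes recorded on fd {}", wrapped.bytes_sent, first);
    Ok(())
}
```

A type that replaced its inner socket after registration (for example on reconnect) would violate this contract; in that situation it is safer to build a new AsyncFd around the new socket.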
Polling for readiness is done by calling the async functions readable
and writable. These functions complete when the associated readiness
condition is observed. Any number of tasks can query the same AsyncFd in
parallel, on the same or different conditions.
On some platforms, the readiness detecting mechanism relies on
edge-triggered notifications. This means that the OS will only notify Tokio
when the file descriptor transitions from not-ready to ready. For this to
work you should first try to read or write and only poll for readiness
if that fails with an error of std::io::ErrorKind::WouldBlock.
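The "try first, wait only on WouldBlock" pattern can be sketched without tokio. The hypothetical drain function below reads from a nonblocking stream until the OS reports WouldBlock; that is the point at which an edge-triggered consumer would go back to waiting for readiness.

```rust
use std::io::{self, Read, Write};
use std::os::unix::net::UnixStream;

/// Hypothetical helper: reads everything currently buffered on a
/// nonblocking stream. Returns the bytes read and whether the peer
/// has closed the connection.
fn drain(stream: &mut UnixStream) -> io::Result<(Vec<u8>, bool)> {
    let mut out = Vec::new();
    let mut chunk = [0u8; 4];
    loop {
        match stream.read(&mut chunk) {
            // A zero-length read means end of stream.
            Ok(0) => return Ok((out, true)),
            Ok(n) => out.extend_from_slice(&chunk[..n]),
            // Nothing left to read right now: only at this point is it
            // correct to wait for the next readiness notification.
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => return Ok((out, false)),
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}

fn main() -> io::Result<()> {
    let (mut rx, mut tx) = UnixStream::pair()?;
    rx.set_nonblocking(true)?;
    tx.write_all(b"hello world")?;
    let (data, eof) = drain(&mut rx)?;
    println!("read {} bytes, eof = {eof}", data.len());
    Ok(())
}
```

Stopping after a single successful read would risk leaving data in the socket buffer with no further not-ready to ready transition to report it.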
Tokio internally tracks when it has received a ready notification. When
readiness-checking functions like readable and writable are called and
the readiness flag is set, these async functions complete
immediately. It is therefore critical that this
readiness flag is cleared when (and only when) the file descriptor ceases to be
ready. The AsyncFdReadyGuard returned from readiness-checking functions
serves this purpose: after calling a readiness-checking async function,
you must use the AsyncFdReadyGuard to tell tokio when the file
descriptor is no longer in a ready state.
Use with a poll-based API
In some cases it may be desirable to use AsyncFd from APIs similar to
TcpStream::poll_read_ready. The AsyncFd::poll_read_ready and
AsyncFd::poll_write_ready functions are provided for this purpose.
Because these functions don't create a future to hold their state, they have
the limitation that only one task can wait on each direction (read or write)
at a time.
Examples
This example shows how to make a std::net::TcpStream asynchronous using
AsyncFd. It implements the read/write operations both as an async fn
and using the IO traits AsyncRead and AsyncWrite.
use std::io::{self, Read, Write};
use std::net::TcpStream;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::io::unix::AsyncFd;
pub struct AsyncTcpStream {
    inner: AsyncFd<TcpStream>,
}
impl AsyncTcpStream {
    pub fn new(tcp: TcpStream) -> io::Result<Self> {
        tcp.set_nonblocking(true)?;
        Ok(Self {
            inner: AsyncFd::new(tcp)?,
        })
    }
    pub async fn read(&self, out: &mut [u8]) -> io::Result<usize> {
        loop {
            let mut guard = self.inner.readable().await?;
            match guard.try_io(|inner| inner.get_ref().read(out)) {
                Ok(result) => return result,
                Err(_would_block) => continue,
            }
        }
    }
    pub async fn write(&self, buf: &[u8]) -> io::Result<usize> {
        loop {
            let mut guard = self.inner.writable().await?;
            match guard.try_io(|inner| inner.get_ref().write(buf)) {
                Ok(result) => return result,
                Err(_would_block) => continue,
            }
        }
    }
}
impl AsyncRead for AsyncTcpStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>
    ) -> Poll<io::Result<()>> {
        loop {
            let mut guard = ready!(self.inner.poll_read_ready(cx))?;
            let unfilled = buf.initialize_unfilled();
            match guard.try_io(|inner| inner.get_ref().read(unfilled)) {
                Ok(Ok(len)) => {
                    buf.advance(len);
                    return Poll::Ready(Ok(()));
                },
                Ok(Err(err)) => return Poll::Ready(Err(err)),
                Err(_would_block) => continue,
            }
        }
    }
}
impl AsyncWrite for AsyncTcpStream {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8]
    ) -> Poll<io::Result<usize>> {
        loop {
            let mut guard = ready!(self.inner.poll_write_ready(cx))?;
            match guard.try_io(|inner| inner.get_ref().write(buf)) {
                Ok(result) => return Poll::Ready(result),
                Err(_would_block) => continue,
            }
        }
    }
    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        // tcp flush is a no-op
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<io::Result<()>> {
        self.inner.get_ref().shutdown(std::net::Shutdown::Write)?;
        Poll::Ready(Ok(()))
    }
}
Implementations
impl<T: AsRawFd> AsyncFd<T>
fn new(inner: T) -> Result<Self> where T: AsRawFd
Creates an AsyncFd backed by (and taking ownership of) an object implementing AsRawFd. The backing file descriptor is cached at the time of creation.
Only configures the Interest::READABLE and Interest::WRITABLE interests. For more control, use AsyncFd::with_interest.
This method must be called in the context of a tokio runtime.
Panics
This function panics if there is no current reactor set, or if the rt feature flag is not enabled.
fn with_interest(inner: T, interest: Interest) -> Result<Self> where T: AsRawFd
Creates an AsyncFd backed by (and taking ownership of) an object implementing AsRawFd, with a specific Interest. The backing file descriptor is cached at the time of creation.
Panics
This function panics if there is no current reactor set, or if the rt feature flag is not enabled.
fn try_new(inner: T) -> Result<Self, AsyncFdTryNewError<T>> where T: AsRawFd
Creates an AsyncFd backed by (and taking ownership of) an object implementing AsRawFd. The backing file descriptor is cached at the time of creation.
Only configures the Interest::READABLE and Interest::WRITABLE interests. For more control, use AsyncFd::try_with_interest.
This method must be called in the context of a tokio runtime.
In the case of failure, it returns an AsyncFdTryNewError that contains the original object passed to this function.
Panics
This function panics if there is no current reactor set, or if the rt feature flag is not enabled.
fn try_with_interest(inner: T, interest: Interest) -> Result<Self, AsyncFdTryNewError<T>> where T: AsRawFd
Creates an AsyncFd backed by (and taking ownership of) an object implementing AsRawFd, with a specific Interest. The backing file descriptor is cached at the time of creation.
In the case of failure, it returns an AsyncFdTryNewError that contains the original object passed to this function.
Panics
This function panics if there is no current reactor set, or if the rt feature flag is not enabled.
fn get_ref(self: &Self) -> &T
Returns a shared reference to the backing object of this AsyncFd.
fn get_mut(self: &mut Self) -> &mut T
Returns a mutable reference to the backing object of this AsyncFd.
fn into_inner(self: Self) -> T
Deregisters this file descriptor and returns ownership of the backing object.
fn poll_read_ready<'a>(self: &'a Self, cx: &mut Context<'_>) -> Poll<Result<AsyncFdReadyGuard<'a, T>>>
Polls for read readiness.
If the file descriptor is not currently ready for reading, this method will store a clone of the Waker from the provided Context. When the file descriptor becomes ready for reading, Waker::wake will be called.
Note that on multiple calls to poll_read_ready or poll_read_ready_mut, only the Waker from the Context passed to the most recent call is scheduled to receive a wakeup. (However, poll_write_ready retains a second, independent waker).
This method is intended for cases where creating and pinning a future via readable is not feasible. Where possible, using readable is preferred, as this supports polling from multiple tasks at once.
This method takes &self, so it is possible to call this method concurrently with other methods on this struct. This method only provides shared access to the inner IO resource when handling the AsyncFdReadyGuard.
fn poll_read_ready_mut<'a>(self: &'a mut Self, cx: &mut Context<'_>) -> Poll<Result<AsyncFdReadyMutGuard<'a, T>>>
Polls for read readiness.
If the file descriptor is not currently ready for reading, this method will store a clone of the Waker from the provided Context. When the file descriptor becomes ready for reading, Waker::wake will be called.
Note that on multiple calls to poll_read_ready or poll_read_ready_mut, only the Waker from the Context passed to the most recent call is scheduled to receive a wakeup. (However, poll_write_ready retains a second, independent waker).
This method is intended for cases where creating and pinning a future via readable is not feasible. Where possible, using readable is preferred, as this supports polling from multiple tasks at once.
This method takes &mut self, so it is possible to access the inner IO resource mutably when handling the AsyncFdReadyMutGuard.
fn poll_write_ready<'a>(self: &'a Self, cx: &mut Context<'_>) -> Poll<Result<AsyncFdReadyGuard<'a, T>>>
Polls for write readiness.
If the file descriptor is not currently ready for writing, this method will store a clone of the Waker from the provided Context. When the file descriptor becomes ready for writing, Waker::wake will be called.
Note that on multiple calls to poll_write_ready or poll_write_ready_mut, only the Waker from the Context passed to the most recent call is scheduled to receive a wakeup. (However, poll_read_ready retains a second, independent waker).
This method is intended for cases where creating and pinning a future via writable is not feasible. Where possible, using writable is preferred, as this supports polling from multiple tasks at once.
This method takes &self, so it is possible to call this method concurrently with other methods on this struct. This method only provides shared access to the inner IO resource when handling the AsyncFdReadyGuard.
fn poll_write_ready_mut<'a>(self: &'a mut Self, cx: &mut Context<'_>) -> Poll<Result<AsyncFdReadyMutGuard<'a, T>>>
Polls for write readiness.
If the file descriptor is not currently ready for writing, this method will store a clone of the Waker from the provided Context. When the file descriptor becomes ready for writing, Waker::wake will be called.
Note that on multiple calls to poll_write_ready or poll_write_ready_mut, only the Waker from the Context passed to the most recent call is scheduled to receive a wakeup. (However, poll_read_ready retains a second, independent waker).
This method is intended for cases where creating and pinning a future via writable is not feasible. Where possible, using writable is preferred, as this supports polling from multiple tasks at once.
This method takes &mut self, so it is possible to access the inner IO resource mutably when handling the AsyncFdReadyMutGuard.
async fn ready(self: &Self, interest: Interest) -> Result<AsyncFdReadyGuard<'_, T>>
Waits for any of the requested ready states, returning an
AsyncFdReadyGuard that must be dropped to resume polling for the requested ready states.
The function may complete without the file descriptor being ready. This is a false-positive and attempting an operation will return with io::ErrorKind::WouldBlock. The function can also return with an empty Ready set, so you should always check the returned value and possibly wait again if the requested states are not set.
When an IO operation does return io::ErrorKind::WouldBlock, the readiness must be cleared. When a combined interest is used, it is important to clear only the readiness that is actually observed to block. For instance when the combined interest Interest::READABLE | Interest::WRITABLE is used, and a read blocks, only read readiness should be cleared using the AsyncFdReadyGuard::clear_ready_matching method: guard.clear_ready_matching(Ready::READABLE). Also clearing the write readiness in this case would be incorrect. The AsyncFdReadyGuard::clear_ready method clears all readiness flags.
This method takes &self, so it is possible to call this method concurrently with other methods on this struct. This method only provides shared access to the inner IO resource when handling the AsyncFdReadyGuard.
Examples
Concurrently read and write to a std::net::TcpStream on the same task without splitting.
use std::error::Error;
use std::io;
use std::io::{Read, Write};
use std::net::TcpStream;
use tokio::io::unix::AsyncFd;
use tokio::io::{Interest, Ready};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let stream = TcpStream::connect("127.0.0.1:8080")?;
    stream.set_nonblocking(true)?;
    let stream = AsyncFd::new(stream)?;
    loop {
        let mut guard = stream
            .ready(Interest::READABLE | Interest::WRITABLE)
            .await?;
        if guard.ready().is_readable() {
            let mut data = vec![0; 1024];
            // Try to read data, this may still fail with `WouldBlock`
            // if the readiness event is a false positive.
            match stream.get_ref().read(&mut data) {
                Ok(n) => {
                    println!("read {} bytes", n);
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // a read has blocked, but a write might still succeed.
                    // clear only the read readiness.
                    guard.clear_ready_matching(Ready::READABLE);
                    continue;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
        if guard.ready().is_writable() {
            // Try to write data, this may still fail with `WouldBlock`
            // if the readiness event is a false positive.
            match stream.get_ref().write(b"hello world") {
                Ok(n) => {
                    println!("write {} bytes", n);
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // a write has blocked, but a read might still succeed.
                    // clear only the write readiness.
                    guard.clear_ready_matching(Ready::WRITABLE);
                    continue;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
    }
}
async fn ready_mut(self: &mut Self, interest: Interest) -> Result<AsyncFdReadyMutGuard<'_, T>>
Waits for any of the requested ready states, returning an
AsyncFdReadyMutGuard that must be dropped to resume polling for the requested ready states.
The function may complete without the file descriptor being ready. This is a false-positive and attempting an operation will return with io::ErrorKind::WouldBlock. The function can also return with an empty Ready set, so you should always check the returned value and possibly wait again if the requested states are not set.
When an IO operation does return io::ErrorKind::WouldBlock, the readiness must be cleared. When a combined interest is used, it is important to clear only the readiness that is actually observed to block. For instance when the combined interest Interest::READABLE | Interest::WRITABLE is used, and a read blocks, only read readiness should be cleared using the AsyncFdReadyMutGuard::clear_ready_matching method: guard.clear_ready_matching(Ready::READABLE). Also clearing the write readiness in this case would be incorrect. The AsyncFdReadyMutGuard::clear_ready method clears all readiness flags.
This method takes &mut self, so it is possible to access the inner IO resource mutably when handling the AsyncFdReadyMutGuard.
Examples
Concurrently read and write to a std::net::TcpStream on the same task without splitting.
use std::error::Error;
use std::io;
use std::io::{Read, Write};
use std::net::TcpStream;
use tokio::io::unix::AsyncFd;
use tokio::io::{Interest, Ready};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let stream = TcpStream::connect("127.0.0.1:8080")?;
    stream.set_nonblocking(true)?;
    let mut stream = AsyncFd::new(stream)?;
    loop {
        let mut guard = stream
            .ready_mut(Interest::READABLE | Interest::WRITABLE)
            .await?;
        if guard.ready().is_readable() {
            let mut data = vec![0; 1024];
            // Try to read data, this may still fail with `WouldBlock`
            // if the readiness event is a false positive.
            match guard.get_inner_mut().read(&mut data) {
                Ok(n) => {
                    println!("read {} bytes", n);
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // a read has blocked, but a write might still succeed.
                    // clear only the read readiness.
                    guard.clear_ready_matching(Ready::READABLE);
                    continue;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
        if guard.ready().is_writable() {
            // Try to write data, this may still fail with `WouldBlock`
            // if the readiness event is a false positive.
            match guard.get_inner_mut().write(b"hello world") {
                Ok(n) => {
                    println!("write {} bytes", n);
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // a write has blocked, but a read might still succeed.
                    // clear only the write readiness.
                    guard.clear_ready_matching(Ready::WRITABLE);
                    continue;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }
    }
}
async fn readable<'a>(self: &'a Self) -> Result<AsyncFdReadyGuard<'a, T>>
Waits for the file descriptor to become readable, returning an
AsyncFdReadyGuard that must be dropped to resume read-readiness polling.
This method takes &self, so it is possible to call this method concurrently with other methods on this struct. This method only provides shared access to the inner IO resource when handling the AsyncFdReadyGuard.
Cancel safety
This method is cancel safe. Once a readiness event occurs, the method will continue to return immediately until the readiness event is consumed by an attempt to read or write that fails with WouldBlock or Poll::Pending.
async fn readable_mut<'a>(self: &'a mut Self) -> Result<AsyncFdReadyMutGuard<'a, T>>
Waits for the file descriptor to become readable, returning an AsyncFdReadyMutGuard that must be dropped to resume read-readiness polling.
This method takes &mut self, so it is possible to access the inner IO resource mutably when handling the AsyncFdReadyMutGuard.
Cancel safety
This method is cancel safe. Once a readiness event occurs, the method will continue to return immediately until the readiness event is consumed by an attempt to read or write that fails with WouldBlock or Poll::Pending.
async fn writable<'a>(self: &'a Self) -> Result<AsyncFdReadyGuard<'a, T>>
Waits for the file descriptor to become writable, returning an AsyncFdReadyGuard that must be dropped to resume write-readiness polling.
This method takes &self, so it is possible to call this method concurrently with other methods on this struct. This method only provides shared access to the inner IO resource when handling the AsyncFdReadyGuard.
Cancel safety
This method is cancel safe. Once a readiness event occurs, the method will continue to return immediately until the readiness event is consumed by an attempt to read or write that fails with WouldBlock or Poll::Pending.
async fn writable_mut<'a>(self: &'a mut Self) -> Result<AsyncFdReadyMutGuard<'a, T>>
Waits for the file descriptor to become writable, returning an AsyncFdReadyMutGuard that must be dropped to resume write-readiness polling.
This method takes &mut self, so it is possible to access the inner IO resource mutably when handling the AsyncFdReadyMutGuard.
Cancel safety
This method is cancel safe. Once a readiness event occurs, the method will continue to return immediately until the readiness event is consumed by an attempt to read or write that fails with WouldBlock or Poll::Pending.
async fn async_io<R>(self: &Self, interest: Interest, f: impl FnMut(&T) -> Result<R>) -> Result<R>
Reads or writes from the file descriptor using a user-provided IO operation.
The async_io method is a convenience utility that waits for the file descriptor to become ready, and then executes the provided IO operation. Since file descriptors may be marked ready spuriously, the closure will be called repeatedly until it returns something other than a WouldBlock error. This is done using the following loop:
async fn async_io<R>(&self, mut f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
    loop {
        // or `readable` if called with the read interest.
        let mut guard = self.writable().await?;
        match guard.try_io(&mut f) {
            Ok(result) => return result,
            Err(_would_block) => continue,
        }
    }
}
The closure should only return a WouldBlock error if it has performed an IO operation on the file descriptor that failed due to the file descriptor not being ready. Returning a WouldBlock error in any other situation will incorrectly clear the readiness flag, which can cause the file descriptor to behave incorrectly.
The closure should not perform the IO operation using any of the methods defined on the Tokio AsyncFd type, as this will mess with the readiness flag and can cause the file descriptor to behave incorrectly.
This method is not intended to be used with combined interests. The closure should perform only one type of IO operation, so it should not require more than one ready state. This method may panic or sleep forever if it is called with a combined interest.
Examples
This example sends some bytes on the inner std::net::UdpSocket. The async_io method waits for readiness, and retries if the send operation does block. This example is equivalent to the one given for try_io.
use tokio::io::{Interest, unix::AsyncFd};
use std::io;
use std::net::UdpSocket;
#[tokio::main]
async fn main() -> io::Result<()> {
    let socket = UdpSocket::bind("0.0.0.0:8080")?;
    socket.set_nonblocking(true)?;
    let async_fd = AsyncFd::new(socket)?;
    let written = async_fd
        .async_io(Interest::WRITABLE, |inner| inner.send(&[1, 2]))
        .await?;
    println!("wrote {written} bytes");
    Ok(())
}
async fn async_io_mut<R>(self: &mut Self, interest: Interest, f: impl FnMut(&mut T) -> Result<R>) -> Result<R>
Reads or writes from the file descriptor using a user-provided IO operation.
The behavior is the same as async_io, except that the closure can mutate the inner value of the AsyncFd.
fn try_io<R>(self: &Self, interest: Interest, f: impl FnOnce(&T) -> Result<R>) -> Result<R>
Tries to read or write from the file descriptor using a user-provided IO operation.
If the file descriptor is ready, the provided closure is called. The closure should attempt to perform an IO operation on the file descriptor by manually calling the appropriate syscall. If the operation fails because the file descriptor is not actually ready, then the closure should return a WouldBlock error and the readiness flag is cleared. The return value of the closure is then returned by try_io.
If the file descriptor is not ready, then the closure is not called and a WouldBlock error is returned.
The closure should only return a WouldBlock error if it has performed an IO operation on the file descriptor that failed due to the file descriptor not being ready. Returning a WouldBlock error in any other situation will incorrectly clear the readiness flag, which can cause the file descriptor to behave incorrectly.
The closure should not perform the IO operation using any of the methods defined on the Tokio AsyncFd type, as this will mess with the readiness flag and can cause the file descriptor to behave incorrectly.
This method is not intended to be used with combined interests. The closure should perform only one type of IO operation, so it should not require more than one ready state. This method may panic or sleep forever if it is called with a combined interest.
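The rule that a closure may only report WouldBlock when an actual IO operation failed for that reason can be sketched with std alone. The send_datagram helper below is hypothetical, and a UnixDatagram pair is assumed in place of a UDP socket so the example needs no network. A function of this shape could serve as the body of a closure passed to try_io or async_io.

```rust
use std::io;
use std::os::unix::net::UnixDatagram;

/// Hypothetical closure body: validates input, then performs exactly
/// one IO operation on the socket.
fn send_datagram(socket: &UnixDatagram, payload: &[u8]) -> io::Result<usize> {
    if payload.is_empty() {
        // This failure has nothing to do with readiness, so it must not
        // be reported as `WouldBlock`.
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty payload"));
    }
    // Any `WouldBlock` returned from here comes straight from the syscall.
    socket.send(payload)
}

fn main() -> io::Result<()> {
    let (tx, _rx) = UnixDatagram::pair()?;
    tx.set_nonblocking(true)?;
    let sent = send_datagram(&tx, &[1, 2])?;
    println!("sent {sent} bytes");
    let err = send_datagram(&tx, &[]).unwrap_err();
    println!("rejected without touching the socket: {:?}", err.kind());
    Ok(())
}
```

Mapping the validation failure to WouldBlock instead would clear the readiness flag even though the socket was never found to be unready.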
fn try_io_mut<R>(self: &mut Self, interest: Interest, f: impl FnOnce(&mut T) -> Result<R>) -> Result<R>
Tries to read or write from the file descriptor using a user-provided IO operation.
The behavior is the same as try_io, except that the closure can mutate the inner value of the AsyncFd.
impl<T> Any for AsyncFd<T>
fn type_id(self: &Self) -> TypeId
impl<T> Borrow for AsyncFd<T>
fn borrow(self: &Self) -> &T
impl<T> BorrowMut for AsyncFd<T>
fn borrow_mut(self: &mut Self) -> &mut T
impl<T> Freeze for AsyncFd<T>
impl<T> From for AsyncFd<T>
fn from(t: T) -> T
Returns the argument unchanged.
impl<T> RefUnwindSafe for AsyncFd<T>
impl<T> Send for AsyncFd<T>
impl<T> Sync for AsyncFd<T>
impl<T> Unpin for AsyncFd<T>
impl<T> UnsafeUnpin for AsyncFd<T>
impl<T> UnwindSafe for AsyncFd<T>
impl<T, U> Into for AsyncFd<T>
fn into(self: Self) -> U
Calls U::from(self).
That is, this conversion is whatever the implementation of From<T> for U chooses to do.
impl<T, U> TryFrom for AsyncFd<T>
fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>
impl<T, U> TryInto for AsyncFd<T>
fn try_into(self: Self) -> Result<U, <U as TryFrom<T>>::Error>
impl<T: AsRawFd> AsFd for AsyncFd<T>
fn as_fd(self: &Self) -> BorrowedFd<'_>
impl<T: AsRawFd> AsRawFd for AsyncFd<T>
fn as_raw_fd(self: &Self) -> RawFd
impl<T: AsRawFd> Drop for AsyncFd<T>
fn drop(self: &mut Self)
impl<T: std::fmt::Debug + AsRawFd> Debug for AsyncFd<T>
fn fmt(self: &Self, f: &mut Formatter<'_>) -> Result