Module codec
Adaptors from AsyncRead/AsyncWrite to Stream/Sink
Raw I/O objects work with byte sequences, but higher-level code usually wants to batch these into meaningful chunks, called "frames".
This module contains adapters to go from streams of bytes, AsyncRead and
AsyncWrite, to framed streams implementing Sink and Stream.
Framed streams are also known as transports.
Example encoding using LinesCodec
The following example demonstrates how to use a codec such as LinesCodec to
write framed data. FramedWrite can be used to achieve this. Data sent to
FramedWrite are first framed according to a specific codec, and then sent to
an implementor of AsyncWrite.
use SinkExt;
use LinesCodec;
use FramedWrite;
async
Example decoding using LinesCodec
The following example demonstrates how to use a codec such as LinesCodec to
read a stream of framed data. FramedRead can be used to achieve this. FramedRead
will keep reading from an AsyncRead implementor until a whole frame, according to a codec,
can be parsed.
use StreamExt;
use LinesCodec;
use FramedRead;
async
The Decoder trait
A Decoder is used together with FramedRead or Framed to turn an
AsyncRead into a Stream. The job of the decoder trait is to specify
how sequences of bytes are turned into a sequence of frames, and to
determine where the boundaries between frames are. The job of the
FramedRead is to repeatedly switch between reading more data from the IO
resource, and asking the decoder whether we have received enough data to
decode another frame of data.
The main method on the Decoder trait is the decode method. This method
takes as argument the data that has been read so far, and when it is called,
it will be in one of the following situations:
- The buffer contains less than a full frame.
- The buffer contains exactly a full frame.
- The buffer contains more than a full frame.
In the first situation, the decoder should return Ok(None).
In the second situation, the decoder should clear the provided buffer and
return Ok(Some(the_decoded_frame)).
In the third situation, the decoder should use a method such as split_to
or advance to modify the buffer such that the frame is removed from the
buffer, but any data in the buffer after that frame should still remain in
the buffer. The decoder should also return Ok(Some(the_decoded_frame)) in
this case.
Finally the decoder may return an error if the data is invalid in some way. The decoder should not return an error just because it has yet to receive a full frame.
It is guaranteed that, from one call to decode to another, the provided
buffer will contain the exact same data as before, except that if more data
has arrived through the IO resource, that data will have been appended to
the buffer. This means that reading frames from a FramedRead is
essentially equivalent to the following loop:
use tokio::io::AsyncReadExt;
# // This uses async_stream to create an example that compiles.
# fn foo() -> impl futures_core::Stream<Item = std::io::Result<bytes::BytesMut>> { async_stream::try_stream! {
# use tokio_util::codec::Decoder;
# let mut decoder = tokio_util::codec::BytesCodec::new();
# let io_resource = &mut &[0u8, 1, 2, 3][..];
let mut buf = bytes::BytesMut::new();
loop {
// The read_buf call will append to buf rather than overwrite existing data.
let len = io_resource.read_buf(&mut buf).await?;
if len == 0 {
while let Some(frame) = decoder.decode_eof(&mut buf)? {
yield frame;
}
break;
}
while let Some(frame) = decoder.decode(&mut buf)? {
yield frame;
}
}
# }}
The example above uses yield whenever the Stream produces an item.
Example decoder
As an example, consider a protocol that can be used to send strings where each frame is a four byte integer that contains the length of the frame, followed by that many bytes of string data. The decoder fails with an error if the string data is not valid utf-8 or too long.
Such a decoder can be written like this:
use Decoder;
use ;
const MAX: usize = 8 * 1024 * 1024;
The Encoder trait
An Encoder is used together with FramedWrite or Framed to turn
an AsyncWrite into a Sink. The job of the encoder trait is to
specify how frames are turned into a sequences of bytes. The job of the
FramedWrite is to take the resulting sequence of bytes and write it to the
IO resource.
The main method on the Encoder trait is the encode method. This method
takes an item that is being written, and a buffer to write the item to. The
buffer may already contain data, and in this case, the encoder should append
the new frame to the buffer rather than overwrite the existing data.
It is guaranteed that, from one call to encode to another, the provided
buffer will contain the exact same data as before, except that some of the
data may have been removed from the front of the buffer. Writing to a
FramedWrite is essentially equivalent to the following loop:
use tokio::io::AsyncWriteExt;
use bytes::Buf; // for advance
# use tokio_util::codec::Encoder;
# async fn next_frame() -> bytes::Bytes { bytes::Bytes::new() }
# async fn no_more_frames() { }
# #[tokio::main] async fn main() -> std::io::Result<()> {
# let mut io_resource = tokio::io::sink();
# let mut encoder = tokio_util::codec::BytesCodec::new();
const MAX: usize = 8192;
let mut buf = bytes::BytesMut::new();
loop {
tokio::select! {
num_written = io_resource.write(&buf), if !buf.is_empty() => {
buf.advance(num_written?);
},
frame = next_frame(), if buf.len() < MAX => {
encoder.encode(frame, &mut buf)?;
},
_ = no_more_frames() => {
io_resource.write_all(&buf).await?;
io_resource.shutdown().await?;
return Ok(());
},
}
}
# }
Here the next_frame method corresponds to any frames you write to the
FramedWrite. The no_more_frames method corresponds to closing the
FramedWrite with SinkExt::close.
Example encoder
As an example, consider a protocol that can be used to send strings where each frame is a four byte integer that contains the length of the frame, followed by that many bytes of string data. The encoder will fail if the string is too long.
Such an encoder can be written like this:
use Encoder;
use BytesMut;
const MAX: usize = 8 * 1024 * 1024;
Modules
- length_delimited Frame a stream of bytes based on a length prefix
Structs
-
AnyDelimiterCodec
A simple
DecoderandEncoderimplementation that splits up data into chunks based on any character in the given delimiter string. -
BytesCodec
A simple
DecoderandEncoderimplementation that just ships bytes around. -
Framed
A unified
StreamandSinkinterface to an underlying I/O object, using theEncoderandDecodertraits to encode and decode frames. -
FramedParts
FramedPartscontains an export of the data of a Framed transport. It can be used to construct a newFramedwith a different codec. It contains all current buffers and the inner transport. -
FramedRead
A
Streamof messages decoded from anAsyncRead. -
FramedWrite
A
Sinkof frames encoded to anAsyncWrite. - LengthDelimitedCodec A codec for frames delimited by a frame head specifying their lengths.
- LengthDelimitedCodecError An error when the number of bytes read is more than max frame length.
-
LinesCodec
A simple
DecoderandEncoderimplementation that splits up data into lines.
Enums
- AnyDelimiterCodecError An error occurred while encoding or decoding a chunk.
- LinesCodecError An error occurred while encoding or decoding a line.
Traits
- Decoder Decoding of frames via buffers.
-
Encoder
Trait of helper objects to write out messages as bytes, for use with
FramedWrite.