Struct CallAll

struct CallAll<Svc, S> { ... }
where
    Svc: Service<<S as Stream>::Item>,
    S: Stream

This is a Stream of responses resulting from calling the wrapped Service for each request received on the wrapped Stream.

# use std::task::{Poll, Context};
# use std::cell::Cell;
# use std::error::Error;
# use std::rc::Rc;
#
use futures::future::{ready, Ready};
use futures::StreamExt;
use futures::channel::mpsc;
use tower_service::Service;
use tower::util::ServiceExt;

// First, we need to have a Service to process our requests.
// A toy service: every request is a string, and the response is the leading
// character of that string (or the empty string for an empty request).
#[derive(Debug, Eq, PartialEq)]
struct FirstLetter;

impl Service<&'static str> for FirstLetter {
    type Response = &'static str;
    type Error = Box<dyn Error + Send + Sync>;
    type Future = Ready<Result<Self::Response, Self::Error>>;

    // This service has no internal capacity limits, so it reports readiness
    // immediately and never needs the task context.
    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    // Responds with a sub-slice of the request holding only its first character.
    fn call(&mut self, req: &'static str) -> Self::Future {
        // Cut at the start of the second character rather than at byte offset 1:
        // a fixed byte offset would panic on an empty request or when the first
        // character is encoded in more than one byte.
        let end = req.char_indices().nth(1).map_or(req.len(), |(idx, _)| idx);
        ready(Ok(&req[..end]))
    }
}

#[tokio::main]
async fn main() {
    // Next, we need a Stream of requests.
    let (reqs, rx) = mpsc::unbounded();
    // `call_all` combines the service with the request stream and gives us back
    // the stream of responses.
    let mut rsps = FirstLetter.call_all(rx);

    // Now, let's send a few requests and then check that we get the corresponding responses.
    reqs.unbounded_send("one").unwrap();
    reqs.unbounded_send("two").unwrap();
    reqs.unbounded_send("three").unwrap();
    // Drop the sender so the request stream ends once the queued requests have
    // been consumed; otherwise the loop below would keep waiting for more.
    drop(reqs);

    // We then loop over the response `Stream` that we get back from call_all.
    let mut i = 0usize;
    while let Some(rsp) = rsps.next().await {
        // Each response is a Result (we could also have used TryStream::try_next)
        match (i + 1, rsp.unwrap()) {
            (1, "o") | (2, "t") | (3, "t") => {}
            (n, letter) => {
                unreachable!("{}. response was '{}'", n, letter);
            }
        }
        i += 1;
    }
    // Make sure the loop did not pass vacuously: one response per request.
    assert_eq!(i, 3);

    // And at the end, we can get the Service back when there are no more requests.
    assert_eq!(rsps.into_inner(), FirstLetter);
}

Implementations

impl<Svc, S> CallAll<Svc, S>

fn new(service: Svc, stream: S) -> CallAll<Svc, S>

Create new CallAll combinator.

Each request yielded by stream is passed to service, and the resulting responses are yielded in the same order by the implementation of Stream for CallAll.

fn into_inner(self: Self) -> Svc

Extract the wrapped Service.

Panics

Panics if take_service was already called.

fn take_service(self: Pin<&mut Self>) -> Svc

Extract the wrapped Service.

This CallAll can no longer be used after this function has been called.

Panics

Panics if take_service was already called.

fn unordered(self: Self) -> CallAllUnordered<Svc, S>

Return responses as they are ready, regardless of the initial order.

This function must be called before the stream is polled.

Panics

Panics if the stream has already been polled.

impl<'__pin, Svc, S> Unpin for CallAll<Svc, S>

impl<K, S, E, D> Discover for CallAll<Svc, S>

fn poll_discover(self: Pin<&mut D>, cx: &mut Context<'_>) -> Poll<Option<Result<<D as TryStream>::Ok, <D as TryStream>::Error>>>

impl<S> TryStreamExt for CallAll<Svc, S>

impl<S, T, E> TryStream for CallAll<Svc, S>

fn try_poll_next(self: Pin<&mut S>, cx: &mut Context<'_>) -> Poll<Option<Result<<S as TryStream>::Ok, <S as TryStream>::Error>>>

impl<Svc, S> Debug for CallAll<Svc, S>

fn fmt(self: &Self, f: &mut Formatter<'_>) -> Result

impl<Svc, S> Freeze for CallAll<Svc, S>

impl<Svc, S> RefUnwindSafe for CallAll<Svc, S>

impl<Svc, S> Send for CallAll<Svc, S>

impl<Svc, S> Stream for CallAll<Svc, S>

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<<Self as Stream>::Item>>

impl<Svc, S> Sync for CallAll<Svc, S>

impl<Svc, S> UnsafeUnpin for CallAll<Svc, S>

impl<Svc, S> UnwindSafe for CallAll<Svc, S>

impl<T> Any for CallAll<Svc, S>

fn type_id(self: &Self) -> TypeId

impl<T> Borrow for CallAll<Svc, S>

fn borrow(self: &Self) -> &T

impl<T> BorrowMut for CallAll<Svc, S>

fn borrow_mut(self: &mut Self) -> &mut T

impl<T> From for CallAll<Svc, S>

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for CallAll<Svc, S>

impl<T> StreamExt for CallAll<Svc, S>

impl<T> WithSubscriber for CallAll<Svc, S>

impl<T, U> Into for CallAll<Svc, S>

fn into(self: Self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of [From]<T> for U chooses to do.

impl<T, U> TryFrom for CallAll<Svc, S>

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

impl<T, U> TryInto for CallAll<Svc, S>

fn try_into(self: Self) -> Result<U, <U as TryFrom<T>>::Error>