tower/hedge/
select.rs

1use pin_project_lite::pin_project;
2use std::{
3    future::Future,
4    pin::Pin,
5    task::{Context, Poll},
6};
7use tower_service::Service;
8
9/// A policy which decides which requests can be cloned and sent to the B
10/// service.
11pub trait Policy<Request> {
12    fn clone_request(&self, req: &Request) -> Option<Request>;
13}
14
15/// Select is a middleware which attempts to clone the request and sends the
16/// original request to the A service and, if the request was able to be cloned,
17/// the cloned request to the B service.  Both resulting futures will be polled
18/// and whichever future completes first will be used as the result.
19#[derive(Debug)]
20pub struct Select<P, A, B> {
21    policy: P,
22    a: A,
23    b: B,
24}
25
26pin_project! {
27    #[derive(Debug)]
28    pub struct ResponseFuture<AF, BF> {
29        #[pin]
30        a_fut: AF,
31        #[pin]
32        b_fut: Option<BF>,
33    }
34}
35
36impl<P, A, B> Select<P, A, B> {
37    pub const fn new<Request>(policy: P, a: A, b: B) -> Self
38    where
39        P: Policy<Request>,
40        A: Service<Request>,
41        A::Error: Into<crate::BoxError>,
42        B: Service<Request, Response = A::Response>,
43        B::Error: Into<crate::BoxError>,
44    {
45        Select { policy, a, b }
46    }
47}
48
49impl<P, A, B, Request> Service<Request> for Select<P, A, B>
50where
51    P: Policy<Request>,
52    A: Service<Request>,
53    A::Error: Into<crate::BoxError>,
54    B: Service<Request, Response = A::Response>,
55    B::Error: Into<crate::BoxError>,
56{
57    type Response = A::Response;
58    type Error = crate::BoxError;
59    type Future = ResponseFuture<A::Future, B::Future>;
60
61    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
62        match (self.a.poll_ready(cx), self.b.poll_ready(cx)) {
63            (Poll::Ready(Ok(())), Poll::Ready(Ok(()))) => Poll::Ready(Ok(())),
64            (Poll::Ready(Err(e)), _) => Poll::Ready(Err(e.into())),
65            (_, Poll::Ready(Err(e))) => Poll::Ready(Err(e.into())),
66            _ => Poll::Pending,
67        }
68    }
69
70    fn call(&mut self, request: Request) -> Self::Future {
71        let b_fut = if let Some(cloned_req) = self.policy.clone_request(&request) {
72            Some(self.b.call(cloned_req))
73        } else {
74            None
75        };
76        ResponseFuture {
77            a_fut: self.a.call(request),
78            b_fut,
79        }
80    }
81}
82
83impl<AF, BF, T, AE, BE> Future for ResponseFuture<AF, BF>
84where
85    AF: Future<Output = Result<T, AE>>,
86    AE: Into<crate::BoxError>,
87    BF: Future<Output = Result<T, BE>>,
88    BE: Into<crate::BoxError>,
89{
90    type Output = Result<T, crate::BoxError>;
91
92    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
93        let this = self.project();
94
95        if let Poll::Ready(r) = this.a_fut.poll(cx) {
96            return Poll::Ready(Ok(r.map_err(Into::into)?));
97        }
98        if let Some(b_fut) = this.b_fut.as_pin_mut() {
99            if let Poll::Ready(r) = b_fut.poll(cx) {
100                return Poll::Ready(Ok(r.map_err(Into::into)?));
101            }
102        }
103        Poll::Pending
104    }
105}