tower/hedge/
delay.rs

1use futures_util::ready;
2use pin_project_lite::pin_project;
3use std::time::Duration;
4use std::{
5    future::Future,
6    pin::Pin,
7    task::{Context, Poll},
8};
9use tower_service::Service;
10
11use crate::util::Oneshot;
12
13/// A policy which specifies how long each request should be delayed for.
14pub trait Policy<Request> {
15    fn delay(&self, req: &Request) -> Duration;
16}
17
18/// A middleware which delays sending the request to the underlying service
19/// for an amount of time specified by the policy.
20#[derive(Debug)]
21pub struct Delay<P, S> {
22    policy: P,
23    service: S,
24}
25
26pin_project! {
27    #[derive(Debug)]
28    pub struct ResponseFuture<Request, S>
29    where
30        S: Service<Request>,
31    {
32        service: Option<S>,
33        #[pin]
34        state: State<Request, Oneshot<S, Request>>,
35    }
36}
37
38pin_project! {
39    #[project = StateProj]
40    #[derive(Debug)]
41    enum State<Request, F> {
42        Delaying {
43            #[pin]
44            delay: tokio::time::Sleep,
45            req: Option<Request>,
46        },
47        Called {
48            #[pin]
49            fut: F,
50        },
51    }
52}
53
54impl<Request, F> State<Request, F> {
55    fn delaying(delay: tokio::time::Sleep, req: Option<Request>) -> Self {
56        Self::Delaying { delay, req }
57    }
58
59    fn called(fut: F) -> Self {
60        Self::Called { fut }
61    }
62}
63
64impl<P, S> Delay<P, S> {
65    pub const fn new<Request>(policy: P, service: S) -> Self
66    where
67        P: Policy<Request>,
68        S: Service<Request> + Clone,
69        S::Error: Into<crate::BoxError>,
70    {
71        Delay { policy, service }
72    }
73}
74
75impl<Request, P, S> Service<Request> for Delay<P, S>
76where
77    P: Policy<Request>,
78    S: Service<Request> + Clone,
79    S::Error: Into<crate::BoxError>,
80{
81    type Response = S::Response;
82    type Error = crate::BoxError;
83    type Future = ResponseFuture<Request, S>;
84
85    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86        // Calling self.service.poll_ready would reserve a slot for the delayed request,
87        // potentially well in advance of actually making it.  Instead, signal readiness here and
88        // treat the service as a Oneshot in the future.
89        Poll::Ready(Ok(()))
90    }
91
92    fn call(&mut self, request: Request) -> Self::Future {
93        let delay = self.policy.delay(&request);
94        ResponseFuture {
95            service: Some(self.service.clone()),
96            state: State::delaying(tokio::time::sleep(delay), Some(request)),
97        }
98    }
99}
100
101impl<Request, S, T, E> Future for ResponseFuture<Request, S>
102where
103    E: Into<crate::BoxError>,
104    S: Service<Request, Response = T, Error = E>,
105{
106    type Output = Result<T, crate::BoxError>;
107
108    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
109        let mut this = self.project();
110
111        loop {
112            match this.state.as_mut().project() {
113                StateProj::Delaying { delay, req } => {
114                    ready!(delay.poll(cx));
115                    let req = req.take().expect("Missing request in delay");
116                    let svc = this.service.take().expect("Missing service in delay");
117                    let fut = Oneshot::new(svc, req);
118                    this.state.set(State::called(fut));
119                }
120                StateProj::Called { fut } => {
121                    return fut.poll(cx).map_err(Into::into);
122                }
123            };
124        }
125    }
126}