tower/hedge/
latency.rs

1use futures_util::ready;
2use pin_project_lite::pin_project;
3use std::time::Duration;
4use std::{
5    future::Future,
6    pin::Pin,
7    task::{Context, Poll},
8};
9use tokio::time::Instant;
10use tower_service::Service;
11
12/// Record is the interface for accepting request latency measurements.  When
13/// a request completes, record is called with the elapsed duration between
14/// when the service was called and when the future completed.
15pub trait Record {
16    fn record(&mut self, latency: Duration);
17}
18
19/// Latency is a middleware that measures request latency and records it to the
20/// provided Record instance.
21#[derive(Clone, Debug)]
22pub struct Latency<R, S> {
23    rec: R,
24    service: S,
25}
26
27pin_project! {
28    #[derive(Debug)]
29    pub struct ResponseFuture<R, F> {
30        start: Instant,
31        rec: R,
32        #[pin]
33        inner: F,
34    }
35}
36
37impl<S, R> Latency<R, S>
38where
39    R: Record + Clone,
40{
41    pub const fn new<Request>(rec: R, service: S) -> Self
42    where
43        S: Service<Request>,
44        S::Error: Into<crate::BoxError>,
45    {
46        Latency { rec, service }
47    }
48}
49
50impl<S, R, Request> Service<Request> for Latency<R, S>
51where
52    S: Service<Request>,
53    S::Error: Into<crate::BoxError>,
54    R: Record + Clone,
55{
56    type Response = S::Response;
57    type Error = crate::BoxError;
58    type Future = ResponseFuture<R, S::Future>;
59
60    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61        self.service.poll_ready(cx).map_err(Into::into)
62    }
63
64    fn call(&mut self, request: Request) -> Self::Future {
65        ResponseFuture {
66            start: Instant::now(),
67            rec: self.rec.clone(),
68            inner: self.service.call(request),
69        }
70    }
71}
72
73impl<R, F, T, E> Future for ResponseFuture<R, F>
74where
75    R: Record,
76    F: Future<Output = Result<T, E>>,
77    E: Into<crate::BoxError>,
78{
79    type Output = Result<T, crate::BoxError>;
80
81    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
82        let this = self.project();
83
84        let rsp = ready!(this.inner.poll(cx)).map_err(Into::into)?;
85        let duration = Instant::now().saturating_duration_since(*this.start);
86        this.rec.record(duration);
87        Poll::Ready(Ok(rsp))
88    }
89}