1use futures_util::ready;
2use pin_project_lite::pin_project;
3use std::time::Duration;
4use std::{
5 future::Future,
6 pin::Pin,
7 task::{Context, Poll},
8};
9use tokio::time::Instant;
10use tower_service::Service;
11
/// A sink for latency measurements.
///
/// Implementors decide what to do with each measurement (aggregate it, export
/// it, log it, ...). In this file the only caller is [`ResponseFuture`], which
/// invokes [`Record::record`] once when the wrapped future resolves to `Ok`.
pub trait Record {
    /// Accepts a single measurement.
    ///
    /// `latency` is the elapsed time being reported by the caller.
    fn record(&mut self, latency: Duration);
}
18
/// Wrapper that pairs an inner service with a latency recorder.
///
/// `R` is the recorder type and `S` is the wrapped service type. No bounds are
/// placed on the struct itself; the `impl` blocks state what they require.
#[derive(Debug, Clone)]
pub struct Latency<R, S> {
    // Recorder; cloned into every response future created by `call`.
    rec: R,
    // The wrapped service that requests are forwarded to.
    service: S,
}
26
27pin_project! {
28 #[derive(Debug)]
29 pub struct ResponseFuture<R, F> {
30 start: Instant,
31 rec: R,
32 #[pin]
33 inner: F,
34 }
35}
36
37impl<S, R> Latency<R, S>
38where
39 R: Record + Clone,
40{
41 pub const fn new<Request>(rec: R, service: S) -> Self
42 where
43 S: Service<Request>,
44 S::Error: Into<crate::BoxError>,
45 {
46 Latency { rec, service }
47 }
48}
49
50impl<S, R, Request> Service<Request> for Latency<R, S>
51where
52 S: Service<Request>,
53 S::Error: Into<crate::BoxError>,
54 R: Record + Clone,
55{
56 type Response = S::Response;
57 type Error = crate::BoxError;
58 type Future = ResponseFuture<R, S::Future>;
59
60 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61 self.service.poll_ready(cx).map_err(Into::into)
62 }
63
64 fn call(&mut self, request: Request) -> Self::Future {
65 ResponseFuture {
66 start: Instant::now(),
67 rec: self.rec.clone(),
68 inner: self.service.call(request),
69 }
70 }
71}
72
73impl<R, F, T, E> Future for ResponseFuture<R, F>
74where
75 R: Record,
76 F: Future<Output = Result<T, E>>,
77 E: Into<crate::BoxError>,
78{
79 type Output = Result<T, crate::BoxError>;
80
81 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
82 let this = self.project();
83
84 let rsp = ready!(this.inner.poll(cx)).map_err(Into::into)?;
85 let duration = Instant::now().saturating_duration_since(*this.start);
86 this.rec.record(duration);
87 Poll::Ready(Ok(rsp))
88 }
89}