tower/hedge/
rotating_histogram.rs

1use hdrhistogram::Histogram;
2use std::time::Duration;
3use tokio::time::Instant;
4use tracing::trace;
5
6/// This represents a "rotating" histogram which stores two histogram, one which
7/// should be read and one which should be written to.  Every period, the read
8/// histogram is discarded and replaced by the write histogram.  The idea here
9/// is that the read histogram should always contain a full period (the previous
10/// period) of write operations.
11#[derive(Debug)]
12pub struct RotatingHistogram {
13    read: Histogram<u64>,
14    write: Histogram<u64>,
15    last_rotation: Instant,
16    period: Duration,
17}
18
19impl RotatingHistogram {
20    pub fn new(period: Duration) -> RotatingHistogram {
21        RotatingHistogram {
22            // Use an auto-resizing histogram to avoid choosing
23            // a maximum latency bound for all users.
24            read: Histogram::<u64>::new(3).expect("Invalid histogram params"),
25            write: Histogram::<u64>::new(3).expect("Invalid histogram params"),
26            last_rotation: Instant::now(),
27            period,
28        }
29    }
30
31    pub fn read(&mut self) -> &mut Histogram<u64> {
32        self.maybe_rotate();
33        &mut self.read
34    }
35
36    pub fn write(&mut self) -> &mut Histogram<u64> {
37        self.maybe_rotate();
38        &mut self.write
39    }
40
41    fn maybe_rotate(&mut self) {
42        let delta = Instant::now().saturating_duration_since(self.last_rotation);
43        // TODO: replace with delta.duration_div when it becomes stable.
44        let rotations = (nanos(delta) / nanos(self.period)) as u32;
45        if rotations >= 2 {
46            trace!("Time since last rotation is {:?}.  clearing!", delta);
47            self.clear();
48        } else if rotations == 1 {
49            trace!("Time since last rotation is {:?}. rotating!", delta);
50            self.rotate();
51        }
52        self.last_rotation += self.period * rotations;
53    }
54
55    fn rotate(&mut self) {
56        std::mem::swap(&mut self.read, &mut self.write);
57        trace!("Rotated {:?} points into read", self.read.len());
58        self.write.clear();
59    }
60
61    fn clear(&mut self) {
62        self.read.clear();
63        self.write.clear();
64    }
65}
66
67const NANOS_PER_SEC: u64 = 1_000_000_000;
68fn nanos(duration: Duration) -> u64 {
69    duration
70        .as_secs()
71        .saturating_mul(NANOS_PER_SEC)
72        .saturating_add(u64::from(duration.subsec_nanos()))
73}