diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..25eb74d --- /dev/null +++ b/src/main.rs @@ -0,0 +1,214 @@ +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use hyper::service::{make_service_fn, service_fn}; +use hyper::{header, Body, Request, Response, Server, StatusCode}; +use serde::Serialize; +use sysinfo::System; +use tokio::sync::RwLock; + +use std::collections::VecDeque; +use std::convert::Infallible; + +#[derive(Clone, Serialize)] +struct TimeSeriesPoint { + timestamp: u64, + value: f64, +} + +#[derive(Serialize)] +struct TimeSeriesData { + points: Vec, +} + +struct TimeSeries { + points: VecDeque, + max_age: Duration, +} + +impl TimeSeries { + fn new(max_age: Duration) -> Self { + Self { + points: VecDeque::with_capacity(max_age.as_secs() as usize + 1), + max_age, + } + } + + fn add_point(&mut self, value: f64) { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + let cutoff = now - self.max_age.as_secs(); + while self.points.front().map_or(false, |p| p.timestamp < cutoff) { + self.points.pop_front(); + } + self.points.push_back(TimeSeriesPoint { timestamp: now, value }); + } + + fn get_points(&self) -> Vec { + self.points.iter().cloned().collect() + } +} + +struct Metrics { + requests_series: TimeSeries, + cpu_series: TimeSeries, + sys: System, +} + +impl Metrics { + fn new() -> Self { + let mut sys = System::new_all(); + sys.refresh_cpu_all(); + + Self { + requests_series: TimeSeries::new(Duration::from_secs(900)), + cpu_series: TimeSeries::new(Duration::from_secs(900)), + sys, + } + } + + fn add_requests_per_second(&mut self, rps: f64) { + self.requests_series.add_point(rps); + } + + fn add_cpu_usage(&mut self, cpu_usage: f64) { + self.cpu_series.add_point(cpu_usage); + } + + fn refresh_cpu(&mut self) -> f64 { + self.sys.refresh_cpu_all(); + self.sys.global_cpu_usage() as f64 + } +} + +struct RequestCounter { + shards: Vec, +} + +impl RequestCounter { + fn new(num_shards: usize) -> Self { + let shards = (0..num_shards) + .map(|_| AtomicU64::new(0)) + .collect::>(); + Self { shards } + } + + fn increment(&self) { + let idx = fastrand::usize(..self.shards.len()); + self.shards[idx].fetch_add(1, Ordering::Relaxed); + } + + fn swap_and_sum(&self) -> u64 { + self.shards + .iter() + .map(|shard| shard.swap(0, Ordering::Relaxed)) + .sum() + } +} + +async fn handle_request( + req: Request, + request_counter: Arc, +) -> Result, Infallible> { + request_counter.increment(); + + match req.uri().path() { + "/stats" => { + let html = include_str!("stats.html"); + let response = Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "text/html") + .body(Body::from(html)) + .unwrap(); + Ok(response) + } + "/data/requests" => { + let metrics = req.extensions().get::>>().unwrap(); + let metrics_read = metrics.read().await; + let data = TimeSeriesData { + points: metrics_read.requests_series.get_points(), + }; + let json = serde_json::to_string(&data).unwrap(); + + let response = Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from(json)) + .unwrap(); + Ok(response) + } + "/data/cpu" => { + let metrics = req.extensions().get::>>().unwrap(); + let metrics_read = metrics.read().await; + let data = TimeSeriesData { + points: metrics_read.cpu_series.get_points(), + }; + let json = serde_json::to_string(&data).unwrap(); + + let response = Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from(json)) + .unwrap(); + Ok(response) + } + "/" => Ok(Response::new(Body::from("test"))), + _ => { + let response = Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("404 Not Found")) + .unwrap(); + Ok(response) + } + } +} + +#[tokio::main] +async fn main() { + let metrics = Arc::new(RwLock::new(Metrics::new())); + let request_counter = Arc::new(RequestCounter::new(256)); + + let metrics_clone = Arc::clone(&metrics); + let request_counter_clone = Arc::clone(&request_counter); + + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + loop { + interval.tick().await; + + let count = request_counter_clone.swap_and_sum(); + let rps = count as f64; + + let mut metrics_write = metrics_clone.write().await; + let cpu_usage = metrics_write.refresh_cpu(); + + metrics_write.add_requests_per_second(rps); + metrics_write.add_cpu_usage(cpu_usage); + } + }); + + let make_svc = make_service_fn(move |_conn| { + let request_counter = Arc::clone(&request_counter); + let metrics = Arc::clone(&metrics); + async move { + Ok::<_, Infallible>(service_fn(move |mut req| { + req.extensions_mut().insert(metrics.clone()); + handle_request(req, request_counter.clone()) + })) + } + }); + + let addr = ([0, 0, 0, 0], 80).into(); + let server = Server::bind(&addr).serve(make_svc); + println!("Server running on http://0.0.0.0:80"); + println!("Metrics available at http://0.0.0.0:80/stats"); + + if let Err(e) = server.await { + eprintln!("server error: {}", e); + } +}