From 8df0000f56834a6be80e6663eab3fbfe6e60ca62 Mon Sep 17 00:00:00 2001 From: elia Date: Tue, 12 Nov 2024 00:19:02 +0100 Subject: [PATCH] add comments --- src/main.rs | 40 ++++++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/src/main.rs b/src/main.rs index 25eb74d..deed212 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,35 +1,38 @@ +// imports for concurrency, time, and web server use std::sync::{ atomic::{AtomicU64, Ordering}, Arc, }; use std::time::{Duration, SystemTime, UNIX_EPOCH}; - use hyper::service::{make_service_fn, service_fn}; use hyper::{header, Body, Request, Response, Server, StatusCode}; use serde::Serialize; use sysinfo::System; use tokio::sync::RwLock; - use std::collections::VecDeque; use std::convert::Infallible; +// represents a single data point with timestamp and value #[derive(Clone, Serialize)] struct TimeSeriesPoint { timestamp: u64, value: f64, } +// collection of time series points #[derive(Serialize)] struct TimeSeriesData { points: Vec, } +// manages time series data with age-based cleanup struct TimeSeries { points: VecDeque, max_age: Duration, } impl TimeSeries { + // creates new time series with specified max age fn new(max_age: Duration) -> Self { Self { points: VecDeque::with_capacity(max_age.as_secs() as usize + 1), @@ -37,23 +40,28 @@ impl TimeSeries { } } + // adds new point and removes old ones fn add_point(&mut self, value: f64) { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); let cutoff = now - self.max_age.as_secs(); + + // remove expired points while self.points.front().map_or(false, |p| p.timestamp < cutoff) { self.points.pop_front(); } self.points.push_back(TimeSeriesPoint { timestamp: now, value }); } + // returns all points as vector fn get_points(&self) -> Vec { self.points.iter().cloned().collect() } } +// stores metrics for requests and cpu usage struct Metrics { requests_series: TimeSeries, cpu_series: TimeSeries, @@ -61,10 +69,10 @@ struct Metrics { } impl Metrics { + // initializes metrics tracking fn new() -> Self { let mut sys = System::new_all(); sys.refresh_cpu_all(); - Self { requests_series: TimeSeries::new(Duration::from_secs(900)), cpu_series: TimeSeries::new(Duration::from_secs(900)), @@ -72,25 +80,30 @@ impl Metrics { } } + // adds requests per second data point fn add_requests_per_second(&mut self, rps: f64) { self.requests_series.add_point(rps); } + // adds cpu usage data point fn add_cpu_usage(&mut self, cpu_usage: f64) { self.cpu_series.add_point(cpu_usage); } + // updates and returns current cpu usage fn refresh_cpu(&mut self) -> f64 { self.sys.refresh_cpu_all(); self.sys.global_cpu_usage() as f64 } } +// counts requests using multiple atomic counters struct RequestCounter { shards: Vec, } impl RequestCounter { + // creates counter with specified number of shards fn new(num_shards: usize) -> Self { let shards = (0..num_shards) .map(|_| AtomicU64::new(0)) @@ -98,11 +111,13 @@ impl RequestCounter { Self { shards } } + // increments random shard counter fn increment(&self) { let idx = fastrand::usize(..self.shards.len()); self.shards[idx].fetch_add(1, Ordering::Relaxed); } + // resets all counters and returns sum fn swap_and_sum(&self) -> u64 { self.shards .iter() @@ -111,12 +126,14 @@ impl RequestCounter { } } +// handles http requests async fn handle_request( req: Request, request_counter: Arc, ) -> Result, Infallible> { request_counter.increment(); - + + // route requests to appropriate handlers match req.uri().path() { "/stats" => { let html = include_str!("stats.html"); @@ -134,7 +151,6 @@ async fn handle_request( points: metrics_read.requests_series.get_points(), }; let json = serde_json::to_string(&data).unwrap(); - let response = Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "application/json") @@ -149,7 +165,6 @@ async fn handle_request( points: metrics_read.cpu_series.get_points(), }; let json = serde_json::to_string(&data).unwrap(); - let response = Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "application/json") @@ -168,30 +183,30 @@ async fn handle_request( } } +// main entry point #[tokio::main] async fn main() { + // initialize shared state let metrics = Arc::new(RwLock::new(Metrics::new())); let request_counter = Arc::new(RequestCounter::new(256)); - let metrics_clone = Arc::clone(&metrics); let request_counter_clone = Arc::clone(&request_counter); + // spawn background metrics collection task tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(1)); loop { interval.tick().await; - let count = request_counter_clone.swap_and_sum(); let rps = count as f64; - let mut metrics_write = metrics_clone.write().await; let cpu_usage = metrics_write.refresh_cpu(); - metrics_write.add_requests_per_second(rps); metrics_write.add_cpu_usage(cpu_usage); } }); + // setup http server let make_svc = make_service_fn(move |_conn| { let request_counter = Arc::clone(&request_counter); let metrics = Arc::clone(&metrics); @@ -203,12 +218,13 @@ async fn main() { } }); + // start server let addr = ([0, 0, 0, 0], 80).into(); let server = Server::bind(&addr).serve(make_svc); println!("Server running on http://0.0.0.0:80"); println!("Metrics available at http://0.0.0.0:80/stats"); - + if let Err(e) = server.await { eprintln!("server error: {}", e); } -} +} \ No newline at end of file