add comments

This commit is contained in:
elia 2024-11-12 00:19:02 +01:00
parent 00634dea9d
commit 8df0000f56

View File

@ -1,35 +1,38 @@
// imports for concurrency, time, and web server
use std::sync::{ use std::sync::{
atomic::{AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
Arc, Arc,
}; };
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
use hyper::{header, Body, Request, Response, Server, StatusCode}; use hyper::{header, Body, Request, Response, Server, StatusCode};
use serde::Serialize; use serde::Serialize;
use sysinfo::System; use sysinfo::System;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::convert::Infallible; use std::convert::Infallible;
// represents a single data point with timestamp and value
#[derive(Clone, Serialize)] #[derive(Clone, Serialize)]
struct TimeSeriesPoint { struct TimeSeriesPoint {
timestamp: u64, timestamp: u64,
value: f64, value: f64,
} }
// collection of time series points
#[derive(Serialize)] #[derive(Serialize)]
struct TimeSeriesData { struct TimeSeriesData {
points: Vec<TimeSeriesPoint>, points: Vec<TimeSeriesPoint>,
} }
// manages time series data with age-based cleanup
struct TimeSeries { struct TimeSeries {
points: VecDeque<TimeSeriesPoint>, points: VecDeque<TimeSeriesPoint>,
max_age: Duration, max_age: Duration,
} }
impl TimeSeries { impl TimeSeries {
// creates new time series with specified max age
fn new(max_age: Duration) -> Self { fn new(max_age: Duration) -> Self {
Self { Self {
points: VecDeque::with_capacity(max_age.as_secs() as usize + 1), points: VecDeque::with_capacity(max_age.as_secs() as usize + 1),
@ -37,23 +40,28 @@ impl TimeSeries {
} }
} }
// adds new point and removes old ones
fn add_point(&mut self, value: f64) { fn add_point(&mut self, value: f64) {
let now = SystemTime::now() let now = SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
.unwrap() .unwrap()
.as_secs(); .as_secs();
let cutoff = now - self.max_age.as_secs(); let cutoff = now - self.max_age.as_secs();
// remove expired points
while self.points.front().map_or(false, |p| p.timestamp < cutoff) { while self.points.front().map_or(false, |p| p.timestamp < cutoff) {
self.points.pop_front(); self.points.pop_front();
} }
self.points.push_back(TimeSeriesPoint { timestamp: now, value }); self.points.push_back(TimeSeriesPoint { timestamp: now, value });
} }
// returns all points as vector
fn get_points(&self) -> Vec<TimeSeriesPoint> { fn get_points(&self) -> Vec<TimeSeriesPoint> {
self.points.iter().cloned().collect() self.points.iter().cloned().collect()
} }
} }
// stores metrics for requests and cpu usage
struct Metrics { struct Metrics {
requests_series: TimeSeries, requests_series: TimeSeries,
cpu_series: TimeSeries, cpu_series: TimeSeries,
@ -61,10 +69,10 @@ struct Metrics {
} }
impl Metrics { impl Metrics {
// initializes metrics tracking
fn new() -> Self { fn new() -> Self {
let mut sys = System::new_all(); let mut sys = System::new_all();
sys.refresh_cpu_all(); sys.refresh_cpu_all();
Self { Self {
requests_series: TimeSeries::new(Duration::from_secs(900)), requests_series: TimeSeries::new(Duration::from_secs(900)),
cpu_series: TimeSeries::new(Duration::from_secs(900)), cpu_series: TimeSeries::new(Duration::from_secs(900)),
@ -72,25 +80,30 @@ impl Metrics {
} }
} }
// adds requests per second data point
fn add_requests_per_second(&mut self, rps: f64) { fn add_requests_per_second(&mut self, rps: f64) {
self.requests_series.add_point(rps); self.requests_series.add_point(rps);
} }
// adds cpu usage data point
fn add_cpu_usage(&mut self, cpu_usage: f64) { fn add_cpu_usage(&mut self, cpu_usage: f64) {
self.cpu_series.add_point(cpu_usage); self.cpu_series.add_point(cpu_usage);
} }
// updates and returns current cpu usage
fn refresh_cpu(&mut self) -> f64 { fn refresh_cpu(&mut self) -> f64 {
self.sys.refresh_cpu_all(); self.sys.refresh_cpu_all();
self.sys.global_cpu_usage() as f64 self.sys.global_cpu_usage() as f64
} }
} }
// counts requests using multiple atomic counters
struct RequestCounter { struct RequestCounter {
shards: Vec<AtomicU64>, shards: Vec<AtomicU64>,
} }
impl RequestCounter { impl RequestCounter {
// creates counter with specified number of shards
fn new(num_shards: usize) -> Self { fn new(num_shards: usize) -> Self {
let shards = (0..num_shards) let shards = (0..num_shards)
.map(|_| AtomicU64::new(0)) .map(|_| AtomicU64::new(0))
@ -98,11 +111,13 @@ impl RequestCounter {
Self { shards } Self { shards }
} }
// increments random shard counter
fn increment(&self) { fn increment(&self) {
let idx = fastrand::usize(..self.shards.len()); let idx = fastrand::usize(..self.shards.len());
self.shards[idx].fetch_add(1, Ordering::Relaxed); self.shards[idx].fetch_add(1, Ordering::Relaxed);
} }
// resets all counters and returns sum
fn swap_and_sum(&self) -> u64 { fn swap_and_sum(&self) -> u64 {
self.shards self.shards
.iter() .iter()
@ -111,12 +126,14 @@ impl RequestCounter {
} }
} }
// handles http requests
async fn handle_request( async fn handle_request(
req: Request<Body>, req: Request<Body>,
request_counter: Arc<RequestCounter>, request_counter: Arc<RequestCounter>,
) -> Result<Response<Body>, Infallible> { ) -> Result<Response<Body>, Infallible> {
request_counter.increment(); request_counter.increment();
// route requests to appropriate handlers
match req.uri().path() { match req.uri().path() {
"/stats" => { "/stats" => {
let html = include_str!("stats.html"); let html = include_str!("stats.html");
@ -134,7 +151,6 @@ async fn handle_request(
points: metrics_read.requests_series.get_points(), points: metrics_read.requests_series.get_points(),
}; };
let json = serde_json::to_string(&data).unwrap(); let json = serde_json::to_string(&data).unwrap();
let response = Response::builder() let response = Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/json") .header(header::CONTENT_TYPE, "application/json")
@ -149,7 +165,6 @@ async fn handle_request(
points: metrics_read.cpu_series.get_points(), points: metrics_read.cpu_series.get_points(),
}; };
let json = serde_json::to_string(&data).unwrap(); let json = serde_json::to_string(&data).unwrap();
let response = Response::builder() let response = Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/json") .header(header::CONTENT_TYPE, "application/json")
@ -168,30 +183,30 @@ async fn handle_request(
} }
} }
// main entry point
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
// initialize shared state
let metrics = Arc::new(RwLock::new(Metrics::new())); let metrics = Arc::new(RwLock::new(Metrics::new()));
let request_counter = Arc::new(RequestCounter::new(256)); let request_counter = Arc::new(RequestCounter::new(256));
let metrics_clone = Arc::clone(&metrics); let metrics_clone = Arc::clone(&metrics);
let request_counter_clone = Arc::clone(&request_counter); let request_counter_clone = Arc::clone(&request_counter);
// spawn background metrics collection task
tokio::spawn(async move { tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1)); let mut interval = tokio::time::interval(Duration::from_secs(1));
loop { loop {
interval.tick().await; interval.tick().await;
let count = request_counter_clone.swap_and_sum(); let count = request_counter_clone.swap_and_sum();
let rps = count as f64; let rps = count as f64;
let mut metrics_write = metrics_clone.write().await; let mut metrics_write = metrics_clone.write().await;
let cpu_usage = metrics_write.refresh_cpu(); let cpu_usage = metrics_write.refresh_cpu();
metrics_write.add_requests_per_second(rps); metrics_write.add_requests_per_second(rps);
metrics_write.add_cpu_usage(cpu_usage); metrics_write.add_cpu_usage(cpu_usage);
} }
}); });
// setup http server
let make_svc = make_service_fn(move |_conn| { let make_svc = make_service_fn(move |_conn| {
let request_counter = Arc::clone(&request_counter); let request_counter = Arc::clone(&request_counter);
let metrics = Arc::clone(&metrics); let metrics = Arc::clone(&metrics);
@ -203,12 +218,13 @@ async fn main() {
} }
}); });
// start server
let addr = ([0, 0, 0, 0], 80).into(); let addr = ([0, 0, 0, 0], 80).into();
let server = Server::bind(&addr).serve(make_svc); let server = Server::bind(&addr).serve(make_svc);
println!("Server running on http://0.0.0.0:80"); println!("Server running on http://0.0.0.0:80");
println!("Metrics available at http://0.0.0.0:80/stats"); println!("Metrics available at http://0.0.0.0:80/stats");
if let Err(e) = server.await { if let Err(e) = server.await {
eprintln!("server error: {}", e); eprintln!("server error: {}", e);
} }
} }