Add src/main.rs

elia 2024-11-11 23:59:31 +01:00
parent fc110947c5
commit 878e5618fa

src/main.rs (new file, 214 lines added)

@@ -0,0 +1,214 @@
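//! Small self-contained metrics server: counts incoming requests with a
//! sharded atomic counter, samples CPU usage once a second via `sysinfo`,
//! keeps 15 minutes of history, and serves an embedded dashboard at /stats
//! plus JSON series at /data/requests and /data/cpu (hyper 0.14-style API).
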
use std::collections::VecDeque;
use std::convert::Infallible;
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use hyper::service::{make_service_fn, service_fn};
use hyper::{header, Body, Request, Response, Server, StatusCode};
use serde::Serialize;
use sysinfo::System;
use tokio::sync::RwLock;
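
/// A single sample: seconds since the Unix epoch plus the sampled value.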
#[derive(Clone, Serialize)]
struct TimeSeriesPoint {
    timestamp: u64,
    value: f64,
}
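
/// Response body for the /data/* endpoints, serialized to JSON.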
#[derive(Serialize)]
struct TimeSeriesData {
    points: Vec<TimeSeriesPoint>,
}
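
/// Fixed-duration rolling window; samples older than `max_age` are evicted
/// whenever a new point is appended.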
struct TimeSeries {
    points: VecDeque<TimeSeriesPoint>,
    max_age: Duration,
}

impl TimeSeries {
    fn new(max_age: Duration) -> Self {
        Self {
            points: VecDeque::with_capacity(max_age.as_secs() as usize + 1),
            max_age,
        }
    }

    fn add_point(&mut self, value: f64) {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
        let cutoff = now - self.max_age.as_secs();
        while self.points.front().map_or(false, |p| p.timestamp < cutoff) {
            self.points.pop_front();
        }
        self.points.push_back(TimeSeriesPoint { timestamp: now, value });
    }

    fn get_points(&self) -> Vec<TimeSeriesPoint> {
        self.points.iter().cloned().collect()
    }
}
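
/// Shared metrics state: 15-minute request-rate and CPU series plus the
/// `sysinfo` handle used for CPU sampling.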
struct Metrics {
    requests_series: TimeSeries,
    cpu_series: TimeSeries,
    sys: System,
}

impl Metrics {
    fn new() -> Self {
        let mut sys = System::new_all();
        sys.refresh_cpu_all();
        Self {
            requests_series: TimeSeries::new(Duration::from_secs(900)),
            cpu_series: TimeSeries::new(Duration::from_secs(900)),
            sys,
        }
    }

    fn add_requests_per_second(&mut self, rps: f64) {
        self.requests_series.add_point(rps);
    }

    fn add_cpu_usage(&mut self, cpu_usage: f64) {
        self.cpu_series.add_point(cpu_usage);
    }

    fn refresh_cpu(&mut self) -> f64 {
        self.sys.refresh_cpu_all();
        self.sys.global_cpu_usage() as f64
    }
}
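
/// Sharded request counter: `increment` bumps a randomly chosen shard to keep
/// contention low under concurrent load, and `swap_and_sum` atomically drains
/// every shard to produce the per-interval total.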
struct RequestCounter {
    shards: Vec<AtomicU64>,
}

impl RequestCounter {
    fn new(num_shards: usize) -> Self {
        let shards = (0..num_shards)
            .map(|_| AtomicU64::new(0))
            .collect::<Vec<_>>();
        Self { shards }
    }

    fn increment(&self) {
        let idx = fastrand::usize(..self.shards.len());
        self.shards[idx].fetch_add(1, Ordering::Relaxed);
    }

    fn swap_and_sum(&self) -> u64 {
        self.shards
            .iter()
            .map(|shard| shard.swap(0, Ordering::Relaxed))
            .sum()
    }
}
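
/// Routes each request (counting it first): the embedded dashboard at /stats,
/// the two JSON series under /data/, a plain-text response at /, and a 404
/// fallback. The dashboard HTML is embedded at compile time from a
/// `stats.html` file expected next to this source file.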
async fn handle_request(
    req: Request<Body>,
    request_counter: Arc<RequestCounter>,
) -> Result<Response<Body>, Infallible> {
    request_counter.increment();
    match req.uri().path() {
        // Serve the embedded dashboard page.
        "/stats" => {
            let html = include_str!("stats.html");
            let response = Response::builder()
                .status(StatusCode::OK)
                .header(header::CONTENT_TYPE, "text/html")
                .body(Body::from(html))
                .unwrap();
            Ok(response)
        }
        // Requests-per-second history as JSON.
        "/data/requests" => {
            let metrics = req.extensions().get::<Arc<RwLock<Metrics>>>().unwrap();
            let metrics_read = metrics.read().await;
            let data = TimeSeriesData {
                points: metrics_read.requests_series.get_points(),
            };
            let json = serde_json::to_string(&data).unwrap();
            let response = Response::builder()
                .status(StatusCode::OK)
                .header(header::CONTENT_TYPE, "application/json")
                .body(Body::from(json))
                .unwrap();
            Ok(response)
        }
        // CPU-usage history as JSON.
        "/data/cpu" => {
            let metrics = req.extensions().get::<Arc<RwLock<Metrics>>>().unwrap();
            let metrics_read = metrics.read().await;
            let data = TimeSeriesData {
                points: metrics_read.cpu_series.get_points(),
            };
            let json = serde_json::to_string(&data).unwrap();
            let response = Response::builder()
                .status(StatusCode::OK)
                .header(header::CONTENT_TYPE, "application/json")
                .body(Body::from(json))
                .unwrap();
            Ok(response)
        }
        "/" => Ok(Response::new(Body::from("test"))),
        // Anything else: plain 404.
        _ => {
            let response = Response::builder()
                .status(StatusCode::NOT_FOUND)
                .body(Body::from("404 Not Found"))
                .unwrap();
            Ok(response)
        }
    }
}

#[tokio::main]
async fn main() {
    let metrics = Arc::new(RwLock::new(Metrics::new()));
    let request_counter = Arc::new(RequestCounter::new(256));
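
    // Background sampler: once a second, drain the request counter into a
    // requests-per-second sample and record the current global CPU usage.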
    let metrics_clone = Arc::clone(&metrics);
    let request_counter_clone = Arc::clone(&request_counter);
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        loop {
            interval.tick().await;
            let count = request_counter_clone.swap_and_sum();
            let rps = count as f64;
            let mut metrics_write = metrics_clone.write().await;
            let cpu_usage = metrics_write.refresh_cpu();
            metrics_write.add_requests_per_second(rps);
            metrics_write.add_cpu_usage(cpu_usage);
        }
    });
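
    // Connection-level service factory: clone the shared handles per
    // connection and stash the metrics handle in each request's extensions
    // so handle_request can reach it without a global.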
    let make_svc = make_service_fn(move |_conn| {
        let request_counter = Arc::clone(&request_counter);
        let metrics = Arc::clone(&metrics);
        async move {
            Ok::<_, Infallible>(service_fn(move |mut req| {
                req.extensions_mut().insert(metrics.clone());
                handle_request(req, request_counter.clone())
            }))
        }
    });
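
    // Listen on all interfaces on port 80 (binding a port below 1024
    // typically requires elevated privileges).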
    let addr = ([0, 0, 0, 0], 80).into();
    let server = Server::bind(&addr).serve(make_svc);
    println!("Server running on http://0.0.0.0:80");
    println!("Metrics available at http://0.0.0.0:80/stats");

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}