diff --git a/Cargo.toml b/Cargo.toml index d06a194..8369688 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,9 +11,10 @@ anyhow = "1.0.79" async-io = "2.3.1" axum = { version = "0.7.4", features = [ "http2" ] } axum-macros = "0.4.1" -chrono = { version = "0.4.33", features = ["alloc", "std", "now"] } +chrono = { version = "0.4.33", features = ["alloc", "std", "now", "serde"] } clap = { version = "4.4.18", features = ["derive"] } maud = { version = "0.26.0", features = ["axum"] } +parse_duration = "2.1.1" prometheus-http-query = "0.8.2" serde = { version = "1.0.196", features = ["derive"] } serde_json = "1.0.113" diff --git a/examples/example_dashboards.yaml b/examples/example_dashboards.yaml index 8e07dd9..90d9e36 100644 --- a/examples/example_dashboards.yaml +++ b/examples/example_dashboards.yaml @@ -5,6 +5,10 @@ source: http://heimdall:9001 query: 'sum by (instance)(irate(node_cpu_seconds_total{mode="system",job="nodestats"}[5m])) * 100' query_type: Range + span: + start: 2024-02-10T00:00:00.00Z + duration: 2d + step_duration: 1min name_label: instance - title: Test Dasbboard 2 graphs: diff --git a/src/dashboard.rs b/src/dashboard.rs index 71ad3e2..a7a7d55 100644 --- a/src/dashboard.rs +++ b/src/dashboard.rs @@ -13,9 +13,10 @@ // limitations under the License. 
use std::path::Path; +use chrono::prelude::*; use serde::Deserialize; use serde_yaml; -use tracing::debug; +use tracing::{debug, error}; use crate::query::{QueryConn, QueryType}; @@ -25,19 +26,69 @@ pub struct Dashboard { pub graphs: Vec<Graph>, } +#[derive(Deserialize)] +pub struct GraphSpan { + pub start: DateTime<Utc>, + pub duration: String, + pub step_duration: String, +} + #[derive(Deserialize)] pub struct Graph { pub title: String, pub source: String, pub query: String, + // serialized with https://datatracker.ietf.org/doc/html/rfc3339 + pub span: Option<GraphSpan>, pub name_label: String, pub query_type: QueryType, } +fn duration_from_string(duration: &str) -> Option<chrono::Duration> { + match parse_duration::parse(duration) { + Ok(d) => match chrono::Duration::from_std(d) { + Ok(d) => Some(d), + Err(e) => { + error!(err = ?e, "specified Duration is out of bounds"); + return None; + } + }, + Err(e) => { + error!( + err = ?e, + "Failed to parse duration" + ); + return None; + } + } +} + impl Graph { pub fn get_query_connection<'conn, 'graph: 'conn>(&'graph self) -> QueryConn<'conn> { - debug!(query=self.query, source=self.source, "Getting query connection for graph"); - QueryConn::new(&self.source, &self.query, self.query_type.clone()) + debug!( + query = self.query, + source = self.source, + "Getting query connection for graph" + ); + let mut conn = QueryConn::new(&self.source, &self.query, self.query_type.clone()); + if let Some(span) = &self.span { + let duration = match duration_from_string(&span.duration) { + Some(d) => d, + None => { + error!("Invalid query duration not assigning span to graph query"); + return conn; + } + }; + let step_duration = match duration_from_string(&span.step_duration) { + Some(d) => d, + None => { + error!("Invalid query step resolution not assigning span to graph query"); + return conn; + } + }; + conn = conn.with_span(span.start.clone(), duration, step_duration); + } + conn + } } diff --git a/src/query.rs b/src/query.rs index abc5479..b7e3aa1 100644 ---
a/src/query.rs +++ b/src/query.rs @@ -13,10 +13,13 @@ // limitations under the License. use std::collections::HashMap; -use prometheus_http_query::{Client, response::{PromqlResult, Data}}; -use serde::{Serialize, Deserialize}; -use tracing::debug; use chrono::prelude::*; +use prometheus_http_query::{ + response::{Data, PromqlResult}, + Client, +}; +use serde::{Deserialize, Serialize}; +use tracing::debug; #[derive(Deserialize, Clone)] pub enum QueryType { @@ -24,10 +27,17 @@ pub enum QueryType { Scalar, } +pub struct TimeSpan { + pub start: DateTime<Utc>, + pub duration: chrono::Duration, + pub step_seconds: i64, +} + pub struct QueryConn<'conn> { source: &'conn str, query: &'conn str, - query_type: QueryType, + span: Option<TimeSpan>, + query_type: QueryType, } impl<'conn> QueryConn<'conn> { @@ -36,17 +46,36 @@ impl<'conn> QueryConn<'conn> { source, query, query_type, + span: None, } } + pub fn with_span(mut self, start: DateTime<Utc>, duration: chrono::Duration, step: chrono::Duration) -> Self { + self.span = Some(TimeSpan { start, duration, step_seconds: step.num_seconds() , }); + self + } + pub async fn get_results(&self) -> anyhow::Result<PromqlResult> { debug!("Getting results for query"); let client = Client::try_from(self.source)?; - let end = Utc::now().timestamp(); - let start = end - (60 * 10); - let step_resolution = 10 as f64; + let (end, start, step_resolution) = if let Some(TimeSpan { + start: st, + duration: du, + step_seconds: step_secs, + }) = self.span + { + ((st + du).timestamp(), st.timestamp(), step_secs as f64) + } else { + let end = Utc::now().timestamp(); + let start = end - (60 * 10); + (end, start, 30 as f64) + }; + debug!(start, end, step_resolution, "Running Query with range values"); match self.query_type { - QueryType::Range => Ok(client.query_range(self.query, start, end, step_resolution).get().await?), + QueryType::Range => Ok(client + .query_range(self.query, start, end, step_resolution) + .get() + .await?), QueryType::Scalar =>
Ok(client.query(self.query).get().await?), } } @@ -58,7 +87,6 @@ pub struct DataPoint { value: f64, } - #[derive(Serialize, Deserialize)] pub enum QueryResult { Series(Vec<(HashMap<String, String>, Vec<DataPoint>)>), @@ -67,22 +95,45 @@ pub enum QueryResult { pub fn to_samples(data: Data) -> QueryResult { match data { - Data::Matrix(mut range) => { - QueryResult::Series(range.drain(0..).map(|rv| { - let (metric, mut samples) = rv.into_inner(); - (metric, samples.drain(0..).map(|s| { - DataPoint { timestamp: s.timestamp(), value: s.value() } - }).collect()) - }).collect()) - } - Data::Vector(mut vector) => { - QueryResult::Scalar(vector.drain(0..).map(|iv| { - let (metric, sample) = iv.into_inner(); - (metric, DataPoint { timestamp: sample.timestamp(), value: sample.value() }) - }).collect()) - } - Data::Scalar(sample) => { - QueryResult::Scalar(vec![(HashMap::new(), DataPoint { timestamp: sample.timestamp(), value: sample.value() })]) - } + Data::Matrix(mut range) => QueryResult::Series( + range + .drain(0..) + .map(|rv| { + let (metric, mut samples) = rv.into_inner(); + ( + metric, + samples + .drain(0..) + .map(|s| DataPoint { + timestamp: s.timestamp(), + value: s.value(), + }) + .collect(), + ) + }) + .collect(), + ), + Data::Vector(mut vector) => QueryResult::Scalar( + vector + .drain(0..) + .map(|iv| { + let (metric, sample) = iv.into_inner(); + ( + metric, + DataPoint { + timestamp: sample.timestamp(), + value: sample.value(), + }, + ) + }) + .collect(), + ), + Data::Scalar(sample) => QueryResult::Scalar(vec![( + HashMap::new(), + DataPoint { + timestamp: sample.timestamp(), + value: sample.value(), + }, + )]), } }