diff --git a/examples/example_dashboards.yaml b/examples/example_dashboards.yaml index 464218a..ff8da83 100644 --- a/examples/example_dashboards.yaml +++ b/examples/example_dashboards.yaml @@ -58,3 +58,16 @@ query: 'node_memory_MemFree_bytes{job="nodestats"}' meta: name_format: "`${labels.instance}`" +- title: Log Test Dashboard 1 + span: + end: now + duration: 1h + step_duration: 5min + logs: + - title: Systemd Service Logs + query_type: Range + yaxes: + - anchor: "y" # This axis is y + source: http://heimdall:3100 + query: | + {job="systemd-journal"} diff --git a/src/dashboard.rs b/src/dashboard.rs index 9b816a3..35eb634 100644 --- a/src/dashboard.rs +++ b/src/dashboard.rs @@ -20,7 +20,7 @@ use serde_yaml; use tracing::{debug, error}; use anyhow::Result; -use crate::query::{PromQueryConn, QueryType, PromQueryResult, to_samples}; +use crate::query::{PromQueryConn, QueryType, QueryResult, LokiConn, prom_to_samples, loki_to_sample}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct PlotMeta { @@ -73,7 +73,8 @@ pub struct GraphSpan { #[derive(Deserialize)] pub struct Dashboard { pub title: String, - pub graphs: Vec, + pub graphs: Option>, + pub logs: Option>, pub span: Option, } @@ -92,6 +93,8 @@ pub enum Orientation { Vertical, } +// NOTE(zapher): These two structs look repetitive but we haven't hit the rule of three yet. +// If we do then it might be time to restructure them a bit. #[derive(Deserialize)] pub struct Graph { pub title: String, @@ -103,11 +106,23 @@ pub struct Graph { pub d3_tick_format: Option, } -pub async fn query_data(graph: &Graph, dash: &Dashboard, query_span: Option) -> Result> { +#[derive(Deserialize)] +pub struct LogStream { + pub title: String, + pub legend_orientation: Option, + pub source: String, + pub yaxes: Vec, + pub query: String, + pub span: Option, + pub limit: Option, + pub query_type: QueryType, +} + +pub async fn prom_query_data(graph: &Graph, dash: &Dashboard, query_span: Option) -> Result> { let connections = graph.get_query_connections(&dash.span, &query_span); let mut data = Vec::new(); for conn in connections { - data.push(to_samples( + data.push(prom_to_samples( conn.get_results() .await? .data() @@ -118,6 +133,17 @@ pub async fn query_data(graph: &Graph, dash: &Dashboard, query_span: Option) -> Result { + let conn = stream.get_query_connection(&dash.span, &query_span); + let response = conn.get_results().await?; + if response.status == "success" { + Ok(loki_to_sample(response.data)) + } else { + // TODO(jwall): Better error handling than this + panic!("Loki query status: {}", response.status) + } +} + fn duration_from_string(duration_string: &str) -> Option { match parse_duration::parse(duration_string) { Ok(d) => match Duration::from_std(d) { @@ -178,7 +204,7 @@ impl Graph { debug!( query = plot.query, source = plot.source, - "Getting query connection for graph" + "Getting query connection for graph", ); let mut conn = PromQueryConn::new(&plot.source, &plot.query, self.query_type.clone(), plot.meta.clone()); // Query params take precendence over all other settings. Then graph settings take @@ -196,6 +222,34 @@ impl Graph { } } +impl LogStream { + pub fn get_query_connection<'conn, 'stream: 'conn>( + &'stream self, + graph_span: &'stream Option, + query_span: &'stream Option, + ) -> LokiConn<'conn> { + debug!( + query = self.query, + source = self.source, + "Getting query connection for log streams", + ); + let mut conn = LokiConn::new(&self.source, &self.query, self.query_type.clone()); + // Query params take precendence over all other settings. Then graph settings take + // precedences and finally the dashboard settings take precendence + if let Some((end, duration, step_duration)) = graph_span_to_tuple(query_span) { + conn = conn.with_span(end, duration, step_duration); + } else if let Some((end, duration, step_duration)) = graph_span_to_tuple(&self.span) { + conn = conn.with_span(end, duration, step_duration); + } else if let Some((end, duration, step_duration)) = graph_span_to_tuple(graph_span) { + conn = conn.with_span(end, duration, step_duration); + } + if let Some(limit) = self.limit { + conn = conn.with_limit(limit); + } + conn + } +} + pub fn read_dashboard_list(path: &Path) -> anyhow::Result> { let f = std::fs::File::open(path)?; Ok(serde_yaml::from_reader(f)?) diff --git a/src/main.rs b/src/main.rs index fac843b..829c7a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use std::path::PathBuf; use anyhow; use axum::{self, extract::State, routing::*, Router}; use clap::{self, Parser, ValueEnum}; -use dashboard::{Dashboard, query_data}; +use dashboard::{Dashboard, prom_query_data}; use tokio::net::TcpListener; use tower_http::trace::TraceLayer; use tracing::{error, info}; @@ -49,12 +49,14 @@ struct Cli { } async fn validate(dash: &Dashboard) -> anyhow::Result<()> { - for graph in dash.graphs.iter() { - let data = query_data(graph, &dash, None).await; - if data.is_err() { - error!(err=?data, "Invalid dashboard query or queries"); + if let Some(ref graphs) = dash.graphs { + for graph in graphs.iter() { + let data = prom_query_data(graph, &dash, None).await; + if data.is_err() { + error!(err=?data, "Invalid dashboard query or queries"); + } + let _ = data?; } - let _ = data?; } return Ok(()); } diff --git a/src/query/loki.rs b/src/query/loki.rs index aa18cea..8152593 100644 --- a/src/query/loki.rs +++ b/src/query/loki.rs @@ -17,11 +17,12 @@ use anyhow::Result; use chrono::prelude::*; use reqwest; use serde::{Deserialize, Serialize}; +use tracing::{debug, error}; -use super::{QueryType, TimeSpan}; +use super::{LogLine, QueryResult, QueryType, TimeSpan}; // TODO(jwall): Should I allow non stream returns? -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub enum ResultType { /// Returned by query endpoints #[serde(rename = "vector")] @@ -34,27 +35,81 @@ pub enum ResultType { Streams, } -#[derive(Serialize, Deserialize)] +// Note that the value and volue types return a pair where the first item is a string but +// will in actuality always be an f64 number. +#[derive(Serialize, Deserialize, Debug)] pub struct LokiResult { #[serde(alias = "metric")] #[serde(alias = "stream")] - labels: Option>, - value: Option<(i64, String)>, - /// The only version that returns log lines - values: Option>, + labels: HashMap, + /// Calculated Value returned by vector result types + value: Option<(String, String)>, + /// Stream of Log lines, Returned by matrix and stream result types + values: Option>, +} + +#[derive(Serialize, Deserialize)] +pub struct LokiResponse { + pub status: String, + pub data: LokiData, } #[derive(Serialize, Deserialize)] pub struct LokiData { + #[serde(rename = "resultType")] result_type: ResultType, result: Vec, //stats: // TODO } -#[derive(Serialize, Deserialize)] -pub struct LokiResponse { - status: String, - data: LokiData, +pub fn loki_to_sample(data: LokiData) -> QueryResult { + match data.result_type { + ResultType::Vector => { + let mut values = Vec::with_capacity(data.result.len()); + for result in data.result { + if let Some(value) = result.value { + values.push(( + result.labels, + LogLine { + timestamp: value.0.parse::().expect("Invalid f64 type"), + line: value.1, + }, + )); + } else { + error!( + ?result, + "Invalid LokiResult: No value field when result type is {:?}", + data.result_type, + ); + } + } + QueryResult::StreamInstant(values) + } + ResultType::Matrix | ResultType::Streams => { + let mut values = Vec::with_capacity(data.result.len()); + for result in data.result { + if let Some(value) = result.values { + values.push(( + result.labels, + value + .into_iter() + .map(|(timestamp, line)| LogLine { + timestamp: timestamp.parse::().expect("Invalid f64 type"), + line, + }) + .collect(), + )); + } else { + error!( + ?result, + "Invalid LokiResult: No values field when result type is {:?}", + data.result_type, + ); + } + } + QueryResult::Stream(values) + } + } } pub struct LokiConn<'conn> { @@ -98,30 +153,38 @@ impl<'conn> LokiConn<'conn> { self } - pub async fn get_results(&self) -> Result { + pub async fn get_results(&self) -> Result { let url = match self.query_type { QueryType::Scalar => format!("{}{}", self.url, SCALAR_API_PATH), QueryType::Range => format!("{}{}", self.url, RANGE_API_PATH), }; let client = reqwest::Client::new(); - let mut req = client.get(url).query(&["query", self.query]); + let mut req = client.get(url).query(&[("query", self.query)]); + debug!(?req, "Building loki reqwest client"); if self.limit.is_some() { - req = req.query(&["limit", &self.limit.map(|u| u.to_string()).unwrap()]); + debug!(?req, "adding limit"); + req = req.query(&[("limit", &self.limit.map(|u| u.to_string()).unwrap())]); } if let QueryType::Range = self.query_type { - let (start, end, step_resolution) = if let Some(span) = &self.span { - let start = span.end - span.duration; - (start.timestamp(), span.end.timestamp(), span.step_seconds as f64) + debug!(?req, "Configuring span query params"); + let (since, end, step_resolution) = if let Some(span) = &self.span { + ( + span.duration, + span.end.timestamp(), + span.step_seconds as f64, + ) } else { let end = Utc::now(); - let start = end - chrono::Duration::minutes(10); - (start.timestamp(), end.timestamp(), 30 as f64) + (chrono::Duration::minutes(10), end.timestamp(), 30 as f64) }; - req = req.query(&["end", &end.to_string()]); - req = req.query(&["since", &start.to_string()]); - req = req.query(&["step", &step_resolution.to_string()]); + req = req.query(&[ + ("end", &end.to_string()), + ("since", &format!("{}s", since.num_seconds())), + ("step", &step_resolution.to_string()), + ]); } + debug!(?req, "Sending request"); Ok(req.send().await?.json().await?) } } diff --git a/src/query/mod.rs b/src/query/mod.rs index b348f1d..59d2036 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -11,9 +11,13 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -use serde::Deserialize; +use std::collections::HashMap; + +use serde::{Serialize, Deserialize}; use chrono::prelude::*; +use crate::dashboard::PlotMeta; + mod loki; mod prom; @@ -31,5 +35,62 @@ pub struct TimeSpan { } -pub use prom::*; +#[derive(Serialize, Deserialize, Debug)] +pub struct DataPoint { + timestamp: f64, + value: f64, +} +#[derive(Serialize, Deserialize, Debug)] +pub struct LogLine { + timestamp: f64, + line: String, +} + +#[derive(Serialize, Deserialize)] +pub enum QueryResult { + Series(Vec<(HashMap, PlotMeta, Vec)>), + Scalar(Vec<(HashMap, PlotMeta, DataPoint)>), + StreamInstant(Vec<(HashMap, LogLine)>), + Stream(Vec<(HashMap, Vec)>), +} + +impl std::fmt::Debug for QueryResult { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + QueryResult::Series(v) => { + f.write_fmt(format_args!("Series trace count = {}", v.len()))?; + for (idx, (tags, meta, trace)) in v.iter().enumerate() { + f.write_fmt(format_args!( + "; {}: tags {:?} meta: {:?} datapoint count = {};", + idx, + tags, + meta, + trace.len() + ))?; + } + } + QueryResult::Scalar(v) => { + f.write_fmt(format_args!("{} traces", v.len()))?; + } + QueryResult::StreamInstant(v) => { + f.write_fmt(format_args!("{} traces", v.len()))?; + } + QueryResult::Stream(v) => { + f.write_fmt(format_args!("stream trace count = {}", v.len()))?; + for (idx, (tags, trace)) in v.iter().enumerate() { + f.write_fmt(format_args!( + "; {}: tags {:?} line count = {}", + idx, + tags, + trace.len() + ))? + } + } + } + Ok(()) + } +} + +pub use prom::*; +pub use loki::*; diff --git a/src/query/prom.rs b/src/query/prom.rs index ba486bf..fd0c84d 100644 --- a/src/query/prom.rs +++ b/src/query/prom.rs @@ -18,12 +18,11 @@ use prometheus_http_query::{ response::{Data, PromqlResult}, Client, }; -use serde::{Deserialize, Serialize}; use tracing::debug; use crate::dashboard::PlotMeta; -use super::{QueryType, TimeSpan}; +use super::{QueryType, TimeSpan, QueryResult, DataPoint}; #[derive(Debug)] pub struct PromQueryConn<'conn> { @@ -35,7 +34,12 @@ pub struct PromQueryConn<'conn> { } impl<'conn> PromQueryConn<'conn> { - pub fn new<'a: 'conn>(source: &'a str, query: &'a str, query_type: QueryType, meta: PlotMeta) -> Self { + pub fn new<'a: 'conn>( + source: &'a str, + query: &'a str, + query_type: QueryType, + meta: PlotMeta, + ) -> Self { Self { source, query, @@ -102,44 +106,9 @@ impl<'conn> PromQueryConn<'conn> { } } -#[derive(Serialize, Deserialize, Debug)] -pub struct DataPoint { - timestamp: f64, - value: f64, -} - -#[derive(Serialize, Deserialize)] -pub enum PromQueryResult { - Series(Vec<(HashMap, PlotMeta, Vec)>), - Scalar(Vec<(HashMap, PlotMeta, DataPoint)>), -} - -impl std::fmt::Debug for PromQueryResult { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - PromQueryResult::Series(v) => { - f.write_fmt(format_args!("Series trace count = {}", v.len()))?; - for (idx, (tags, meta, trace)) in v.iter().enumerate() { - f.write_fmt(format_args!( - "; {}: tags {:?} meta: {:?} datapoint count = {};", - idx, - tags, - meta, - trace.len() - ))?; - } - } - PromQueryResult::Scalar(v) => { - f.write_fmt(format_args!("{} traces", v.len()))?; - } - } - Ok(()) - } -} - -pub fn to_samples(data: Data, meta: PlotMeta) -> PromQueryResult { +pub fn prom_to_samples(data: Data, meta: PlotMeta) -> QueryResult { match data { - Data::Matrix(mut range) => PromQueryResult::Series( + Data::Matrix(mut range) => QueryResult::Series( range .drain(0..) .map(|rv| { @@ -158,7 +127,7 @@ pub fn to_samples(data: Data, meta: PlotMeta) -> PromQueryResult { }) .collect(), ), - Data::Vector(mut vector) => PromQueryResult::Scalar( + Data::Vector(mut vector) => QueryResult::Scalar( vector .drain(0..) .map(|iv| { @@ -174,7 +143,7 @@ pub fn to_samples(data: Data, meta: PlotMeta) -> PromQueryResult { }) .collect(), ), - Data::Scalar(sample) => PromQueryResult::Scalar(vec![( + Data::Scalar(sample) => QueryResult::Scalar(vec![( HashMap::new(), meta.clone(), DataPoint { diff --git a/src/routes.rs b/src/routes.rs index 81cbd8f..b3f3f6f 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -25,8 +25,8 @@ use maud::{html, Markup}; use serde::{Serialize, Deserialize}; use tracing::debug; -use crate::dashboard::{Dashboard, Graph, GraphSpan, AxisDefinition, Orientation, query_data}; -use crate::query::PromQueryResult; +use crate::dashboard::{Dashboard, Graph, GraphSpan, AxisDefinition, Orientation, prom_query_data, loki_query_data}; +use crate::query::QueryResult; type Config = State>>; @@ -34,7 +34,21 @@ type Config = State>>; pub struct GraphPayload { pub legend_orientation: Option, pub yaxes: Vec, - pub plots: Vec, + pub plots: Vec, +} + +// TODO(jwall): Should this be a completely different payload? +pub async fn loki_query( + State(config): Config, + Path((dash_idx, loki_idx)): Path<(usize, usize)>, + Query(query): Query>, +) -> Json { + let dash = config.get(dash_idx).expect(&format!("No such dashboard index {}", dash_idx)); + let log = dash.logs + .as_ref().expect("No logs in this dashboard") + .get(loki_idx).expect(&format!("No such log query {}", loki_idx)); + let plots = vec![loki_query_data(log, dash, query_to_graph_span(query)).await.expect("Unable to get log query results")]; + Json(GraphPayload{legend_orientation: None, yaxes: log.yaxes.clone(), plots}) } pub async fn graph_query( @@ -43,11 +57,17 @@ pub async fn graph_query( Query(query): Query>, ) -> Json { debug!("Getting data for query"); - let dash = config.get(dash_idx).expect("No such dashboard index"); + let dash = config.get(dash_idx).expect(&format!("No such dashboard index {}", dash_idx)); let graph = dash .graphs + .as_ref().expect("No graphs in this dashboard") .get(graph_idx) .expect(&format!("No such graph in dasboard {}", dash_idx)); + let plots = prom_query_data(graph, dash, query_to_graph_span(query)).await.expect("Unable to get query results"); + Json(GraphPayload{legend_orientation: graph.legend_orientation.clone(), yaxes: graph.yaxes.clone(), plots}) +} + +fn query_to_graph_span(query: HashMap) -> Option { let query_span = { if query.contains_key("end") && query.contains_key("duration") @@ -62,15 +82,19 @@ pub async fn graph_query( None } }; - let plots = query_data(graph, dash, query_span).await.expect("Unable to get query results"); - Json(GraphPayload{legend_orientation: graph.legend_orientation.clone(), yaxes: graph.yaxes.clone(), plots}) + query_span } pub fn mk_api_routes(config: Arc>) -> Router { // Query routes - Router::new().route( + Router::new() + .route( "/dash/:dash_idx/graph/:graph_idx", - get(graph_query).with_state(config), + get(graph_query).with_state(config.clone()), + ) + .route( + "/dash/:dash_idx/log/:log_idx", + get(loki_query).with_state(config), ) } @@ -82,9 +106,9 @@ pub fn graph_component(dash_idx: usize, graph_idx: usize, graph: &Graph) -> Mark div { h2 { (graph.title) " - " a href=(graph_embed_uri) { "embed url" } } @if graph.d3_tick_format.is_some() { - timeseries-graph uri=(graph_data_uri) id=(graph_id) d3-tick-format=(graph.d3_tick_format.as_ref().unwrap()) { } + graph-plot uri=(graph_data_uri) id=(graph_id) d3-tick-format=(graph.d3_tick_format.as_ref().unwrap()) { } } @else { - timeseries-graph uri=(graph_data_uri) id=(graph_id) { } + graph-plot uri=(graph_data_uri) id=(graph_id) { } } } ) @@ -96,8 +120,9 @@ pub async fn graph_ui( ) -> Markup { let graph = config .get(dash_idx) - .expect("No such dashboard") + .expect(&format!("No such dashboard {}", dash_idx)) .graphs + .as_ref().expect("No graphs in this dashboard") .get(graph_idx) .expect("No such graph"); graph_component(dash_idx, graph_idx, graph) @@ -109,9 +134,10 @@ pub async fn dash_ui(State(config): State, Path(dash_idx): Path) } fn dash_elements(config: State>>, dash_idx: usize) -> maud::PreEscaped { - let dash = config.get(dash_idx).expect("No such dashboard"); + let dash = config.get(dash_idx).expect(&format!("No such dashboard {}", dash_idx)); let graph_iter = dash .graphs + .as_ref().expect("No graphs in this dashboard") .iter() .enumerate() .collect::>(); diff --git a/static/lib.js b/static/lib.js index 739afa4..48c90ad 100644 --- a/static/lib.js +++ b/static/lib.js @@ -16,7 +16,7 @@ function getCssVariableValue(variableName) { return getComputedStyle(document.documentElement).getPropertyValue(variableName); } -class TimeseriesGraph extends HTMLElement { +class GraphPlot extends HTMLElement { #uri; #width; #height; @@ -94,7 +94,7 @@ class TimeseriesGraph extends HTMLElement { this.stopInterval() } - static elementName = "timeseries-graph"; + static elementName = "graph-plot"; getTargetNode() { return this.#targetNode; @@ -122,8 +122,8 @@ class TimeseriesGraph extends HTMLElement { } static registerElement() { - if (!customElements.get(TimeseriesGraph.elementName)) { - customElements.define(TimeseriesGraph.elementName, TimeseriesGraph); + if (!customElements.get(GraphPlot.elementName)) { + customElements.define(GraphPlot.elementName, GraphPlot); } } @@ -350,7 +350,7 @@ class TimeseriesGraph extends HTMLElement { } } -TimeseriesGraph.registerElement(); +GraphPlot.registerElement(); class SpanSelector extends HTMLElement { #targetNode = null; @@ -389,7 +389,7 @@ class SpanSelector extends HTMLElement { } updateGraphs() { - for (var node of document.getElementsByTagName(TimeseriesGraph.elementName)) { + for (var node of document.getElementsByTagName(GraphPlot.elementName)) { node.setAttribute('end', this.#endInput.value); node.setAttribute('duration', this.#durationInput.value); node.setAttribute('step-duration', this.#stepDurationInput.value); diff --git a/static/site.css b/static/site.css index b7e6922..f8cf0c5 100644 --- a/static/site.css +++ b/static/site.css @@ -84,7 +84,7 @@ body * { flex: 0 1 auto; } -timeseries-graph { +graph-plot { background-color: var(--paper-background-color); border-radius: 4px; display: flex;