feat: logql queries in dashboard definitions

Jeremy Wall 2024-02-27 18:13:26 -05:00
parent 16ff43f4e2
commit ec2394eaf7
9 changed files with 284 additions and 96 deletions

View File

@ -58,3 +58,16 @@
          query: 'node_memory_MemFree_bytes{job="nodestats"}'
          meta:
            name_format: "`${labels.instance}`"
- title: Log Test Dashboard 1
  span:
    end: now
    duration: 1h
    step_duration: 5min
  logs:
    - title: Systemd Service Logs
      query_type: Range
      yaxes:
        - anchor: "y" # This axis is y
      source: http://heimdall:3100
      query: |
        {job="systemd-journal"}

View File

@ -20,7 +20,7 @@ use serde_yaml;
use tracing::{debug, error};
use anyhow::Result;
use crate::query::{PromQueryConn, QueryType, PromQueryResult, to_samples};
use crate::query::{PromQueryConn, QueryType, QueryResult, LokiConn, prom_to_samples, loki_to_sample};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PlotMeta {
@ -73,7 +73,8 @@ pub struct GraphSpan {
#[derive(Deserialize)]
pub struct Dashboard {
pub title: String,
pub graphs: Vec<Graph>,
pub graphs: Option<Vec<Graph>>,
pub logs: Option<Vec<LogStream>>,
pub span: Option<GraphSpan>,
}
@ -92,6 +93,8 @@ pub enum Orientation {
Vertical,
}
// NOTE(zapher): These two structs look repetitive but we haven't hit the rule of three yet.
// If we do then it might be time to restructure them a bit.
#[derive(Deserialize)]
pub struct Graph {
pub title: String,
@ -103,11 +106,23 @@ pub struct Graph {
pub d3_tick_format: Option<String>,
}
pub async fn query_data(graph: &Graph, dash: &Dashboard, query_span: Option<GraphSpan>) -> Result<Vec<PromQueryResult>> {
#[derive(Deserialize)]
pub struct LogStream {
pub title: String,
pub legend_orientation: Option<Orientation>,
pub source: String,
pub yaxes: Vec<AxisDefinition>,
pub query: String,
pub span: Option<GraphSpan>,
pub limit: Option<usize>,
pub query_type: QueryType,
}
pub async fn prom_query_data(graph: &Graph, dash: &Dashboard, query_span: Option<GraphSpan>) -> Result<Vec<QueryResult>> {
let connections = graph.get_query_connections(&dash.span, &query_span);
let mut data = Vec::new();
for conn in connections {
data.push(to_samples(
data.push(prom_to_samples(
conn.get_results()
.await?
.data()
@ -118,6 +133,17 @@ pub async fn query_data(graph: &Graph, dash: &Dashboard, query_span: Option<Grap
Ok(data)
}
pub async fn loki_query_data(stream: &LogStream, dash: &Dashboard, query_span: Option<GraphSpan>) -> Result<QueryResult> {
let conn = stream.get_query_connection(&dash.span, &query_span);
let response = conn.get_results().await?;
if response.status == "success" {
Ok(loki_to_sample(response.data))
} else {
// TODO(jwall): Better error handling than this
panic!("Loki query status: {}", response.status)
}
}
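A hedged sketch of what the error path flagged in the TODO above could look like using the crate's existing anyhow dependency instead of a panic (illustrative only, not what this commit does):

use anyhow::{bail, Result};

// Hypothetical helper: surface a non-"success" Loki status as an error rather than panicking.
fn check_loki_status(status: &str) -> Result<()> {
    if status == "success" {
        Ok(())
    } else {
        bail!("Loki query returned status: {}", status)
    }
}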
fn duration_from_string(duration_string: &str) -> Option<Duration> {
match parse_duration::parse(duration_string) {
Ok(d) => match Duration::from_std(d) {
@ -178,7 +204,7 @@ impl Graph {
debug!(
query = plot.query,
source = plot.source,
"Getting query connection for graph"
"Getting query connection for graph",
);
let mut conn = PromQueryConn::new(&plot.source, &plot.query, self.query_type.clone(), plot.meta.clone());
// Query params take precedence over all other settings. Then graph settings take
@ -196,6 +222,34 @@ impl Graph {
}
}
impl LogStream {
pub fn get_query_connection<'conn, 'stream: 'conn>(
&'stream self,
graph_span: &'stream Option<GraphSpan>,
query_span: &'stream Option<GraphSpan>,
) -> LokiConn<'conn> {
debug!(
query = self.query,
source = self.source,
"Getting query connection for log streams",
);
let mut conn = LokiConn::new(&self.source, &self.query, self.query_type.clone());
// Query params take precedence over all other settings. Then graph settings take
// precedence and finally the dashboard settings take precedence
if let Some((end, duration, step_duration)) = graph_span_to_tuple(query_span) {
conn = conn.with_span(end, duration, step_duration);
} else if let Some((end, duration, step_duration)) = graph_span_to_tuple(&self.span) {
conn = conn.with_span(end, duration, step_duration);
} else if let Some((end, duration, step_duration)) = graph_span_to_tuple(graph_span) {
conn = conn.with_span(end, duration, step_duration);
}
if let Some(limit) = self.limit {
conn = conn.with_limit(limit);
}
conn
}
}
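The precedence chain described in the comment above (query parameters first, then the stream's own span, then the dashboard span) boils down to an Option fallback. A standalone sketch using simplified stand-in types, not the crate's GraphSpan:

#[derive(Clone, Copy, Debug, PartialEq)]
struct SpanSketch {
    end: i64,
    duration_secs: i64,
    step_secs: i64,
}

// Query-param span wins, then the per-stream span, then the dashboard default.
fn pick_span(
    query: Option<SpanSketch>,
    stream: Option<SpanSketch>,
    dash: Option<SpanSketch>,
) -> Option<SpanSketch> {
    query.or(stream).or(dash)
}

fn main() {
    let dash = Some(SpanSketch { end: 1_709_078_000, duration_secs: 3_600, step_secs: 300 });
    let stream = Some(SpanSketch { end: 1_709_078_000, duration_secs: 600, step_secs: 30 });
    assert_eq!(pick_span(None, stream, dash), stream);
}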
pub fn read_dashboard_list(path: &Path) -> anyhow::Result<Vec<Dashboard>> {
let f = std::fs::File::open(path)?;
Ok(serde_yaml::from_reader(f)?)

View File

@ -15,7 +15,7 @@ use std::path::PathBuf;
use anyhow;
use axum::{self, extract::State, routing::*, Router};
use clap::{self, Parser, ValueEnum};
use dashboard::{Dashboard, query_data};
use dashboard::{Dashboard, prom_query_data};
use tokio::net::TcpListener;
use tower_http::trace::TraceLayer;
use tracing::{error, info};
@ -49,12 +49,14 @@ struct Cli {
}
async fn validate(dash: &Dashboard) -> anyhow::Result<()> {
for graph in dash.graphs.iter() {
let data = query_data(graph, &dash, None).await;
if data.is_err() {
error!(err=?data, "Invalid dashboard query or queries");
if let Some(ref graphs) = dash.graphs {
for graph in graphs.iter() {
let data = prom_query_data(graph, &dash, None).await;
if data.is_err() {
error!(err=?data, "Invalid dashboard query or queries");
}
let _ = data?;
}
let _ = data?;
}
return Ok(());
}

View File

@ -17,11 +17,12 @@ use anyhow::Result;
use chrono::prelude::*;
use reqwest;
use serde::{Deserialize, Serialize};
use tracing::{debug, error};
use super::{QueryType, TimeSpan};
use super::{LogLine, QueryResult, QueryType, TimeSpan};
// TODO(jwall): Should I allow non-stream returns?
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
pub enum ResultType {
/// Returned by query endpoints
#[serde(rename = "vector")]
@ -34,27 +35,81 @@ pub enum ResultType {
Streams,
}
#[derive(Serialize, Deserialize)]
// Note that the value and values fields return a pair whose first item is a string but
// in actuality it will always be an f64 number.
#[derive(Serialize, Deserialize, Debug)]
pub struct LokiResult {
#[serde(alias = "metric")]
#[serde(alias = "stream")]
labels: Option<HashMap<String, String>>,
value: Option<(i64, String)>,
/// The only version that returns log lines
values: Option<Vec<(f64, String)>>,
labels: HashMap<String, String>,
/// Calculated Value returned by vector result types
value: Option<(String, String)>,
/// Stream of Log lines, Returned by matrix and stream result types
values: Option<Vec<(String, String)>>,
}
#[derive(Serialize, Deserialize)]
pub struct LokiResponse {
pub status: String,
pub data: LokiData,
}
#[derive(Serialize, Deserialize)]
pub struct LokiData {
#[serde(rename = "resultType")]
result_type: ResultType,
result: Vec<LokiResult>,
//stats: // TODO
}
#[derive(Serialize, Deserialize)]
pub struct LokiResponse {
status: String,
data: LokiData,
pub fn loki_to_sample(data: LokiData) -> QueryResult {
match data.result_type {
ResultType::Vector => {
let mut values = Vec::with_capacity(data.result.len());
for result in data.result {
if let Some(value) = result.value {
values.push((
result.labels,
LogLine {
timestamp: value.0.parse::<f64>().expect("Invalid f64 type"),
line: value.1,
},
));
} else {
error!(
?result,
"Invalid LokiResult: No value field when result type is {:?}",
data.result_type,
);
}
}
QueryResult::StreamInstant(values)
}
ResultType::Matrix | ResultType::Streams => {
let mut values = Vec::with_capacity(data.result.len());
for result in data.result {
if let Some(value) = result.values {
values.push((
result.labels,
value
.into_iter()
.map(|(timestamp, line)| LogLine {
timestamp: timestamp.parse::<f64>().expect("Invalid f64 type"),
line,
})
.collect(),
));
} else {
error!(
?result,
"Invalid LokiResult: No values field when result type is {:?}",
data.result_type,
);
}
}
QueryResult::Stream(values)
}
}
}
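For reference, the streams payload that loki_to_sample consumes looks roughly like the following, going by Loki's documented HTTP API (timestamps are nanosecond epoch strings). A self-contained sketch with stand-in types, assuming serde_json is available:

use std::collections::HashMap;
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct ResponseSketch {
    status: String,
    data: DataSketch,
}

#[derive(Deserialize, Debug)]
struct DataSketch {
    #[serde(rename = "resultType")]
    result_type: String,
    result: Vec<EntrySketch>,
}

#[derive(Deserialize, Debug)]
struct EntrySketch {
    // Loki labels the grouping "stream" for log queries and "metric" for vector results.
    #[serde(alias = "stream", alias = "metric")]
    labels: HashMap<String, String>,
    values: Vec<(String, String)>,
}

fn main() {
    let body = r#"{
      "status": "success",
      "data": {
        "resultType": "streams",
        "result": [
          { "stream": { "job": "systemd-journal" },
            "values": [["1709078000000000000", "Started Session 1 of user core."]] }
        ]
      }
    }"#;
    let resp: ResponseSketch = serde_json::from_str(body).expect("valid loki payload");
    assert_eq!(resp.status, "success");
    assert_eq!(resp.data.result[0].values[0].1, "Started Session 1 of user core.");
}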
pub struct LokiConn<'conn> {
@ -98,30 +153,38 @@ impl<'conn> LokiConn<'conn> {
self
}
pub async fn get_results(&self) -> Result<LokiResult> {
pub async fn get_results(&self) -> Result<LokiResponse> {
let url = match self.query_type {
QueryType::Scalar => format!("{}{}", self.url, SCALAR_API_PATH),
QueryType::Range => format!("{}{}", self.url, RANGE_API_PATH),
};
let client = reqwest::Client::new();
let mut req = client.get(url).query(&["query", self.query]);
let mut req = client.get(url).query(&[("query", self.query)]);
debug!(?req, "Building loki reqwest client");
if self.limit.is_some() {
req = req.query(&["limit", &self.limit.map(|u| u.to_string()).unwrap()]);
debug!(?req, "adding limit");
req = req.query(&[("limit", &self.limit.map(|u| u.to_string()).unwrap())]);
}
if let QueryType::Range = self.query_type {
let (start, end, step_resolution) = if let Some(span) = &self.span {
let start = span.end - span.duration;
(start.timestamp(), span.end.timestamp(), span.step_seconds as f64)
debug!(?req, "Configuring span query params");
let (since, end, step_resolution) = if let Some(span) = &self.span {
(
span.duration,
span.end.timestamp(),
span.step_seconds as f64,
)
} else {
let end = Utc::now();
let start = end - chrono::Duration::minutes(10);
(start.timestamp(), end.timestamp(), 30 as f64)
(chrono::Duration::minutes(10), end.timestamp(), 30 as f64)
};
req = req.query(&["end", &end.to_string()]);
req = req.query(&["since", &start.to_string()]);
req = req.query(&["step", &step_resolution.to_string()]);
req = req.query(&[
("end", &end.to_string()),
("since", &format!("{}s", since.num_seconds())),
("step", &step_resolution.to_string()),
]);
}
debug!(?req, "Sending request");
Ok(req.send().await?.json().await?)
}
}
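A minimal sketch of the since/end/step parameters the Range branch above attaches, using chrono only (no reqwest); the values are illustrative and the API path constants sit outside this hunk:

use chrono::{Duration, Utc};

// Mirrors the Range branch above: `since` carries the span duration as "<seconds>s",
// `end` a unix timestamp, and `step` the resolution in seconds.
fn range_params(end_ts: i64, duration: Duration, step_secs: f64) -> Vec<(String, String)> {
    vec![
        ("end".to_string(), end_ts.to_string()),
        ("since".to_string(), format!("{}s", duration.num_seconds())),
        ("step".to_string(), step_secs.to_string()),
    ]
}

fn main() {
    let params = range_params(Utc::now().timestamp(), Duration::minutes(10), 30.0);
    assert_eq!(params[1], ("since".to_string(), "600s".to_string()));
}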

View File

@ -11,9 +11,13 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::Deserialize;
use std::collections::HashMap;
use serde::{Serialize, Deserialize};
use chrono::prelude::*;
use crate::dashboard::PlotMeta;
mod loki;
mod prom;
@ -31,5 +35,62 @@ pub struct TimeSpan {
}
pub use prom::*;
#[derive(Serialize, Deserialize, Debug)]
pub struct DataPoint {
timestamp: f64,
value: f64,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct LogLine {
timestamp: f64,
line: String,
}
#[derive(Serialize, Deserialize)]
pub enum QueryResult {
Series(Vec<(HashMap<String, String>, PlotMeta, Vec<DataPoint>)>),
Scalar(Vec<(HashMap<String, String>, PlotMeta, DataPoint)>),
StreamInstant(Vec<(HashMap<String, String>, LogLine)>),
Stream(Vec<(HashMap<String, String>, Vec<LogLine>)>),
}
impl std::fmt::Debug for QueryResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
QueryResult::Series(v) => {
f.write_fmt(format_args!("Series trace count = {}", v.len()))?;
for (idx, (tags, meta, trace)) in v.iter().enumerate() {
f.write_fmt(format_args!(
"; {}: tags {:?} meta: {:?} datapoint count = {};",
idx,
tags,
meta,
trace.len()
))?;
}
}
QueryResult::Scalar(v) => {
f.write_fmt(format_args!("{} traces", v.len()))?;
}
QueryResult::StreamInstant(v) => {
f.write_fmt(format_args!("{} traces", v.len()))?;
}
QueryResult::Stream(v) => {
f.write_fmt(format_args!("stream trace count = {}", v.len()))?;
for (idx, (tags, trace)) in v.iter().enumerate() {
f.write_fmt(format_args!(
"; {}: tags {:?} line count = {}",
idx,
tags,
trace.len()
))?
}
}
}
Ok(())
}
}
pub use prom::*;
pub use loki::*;
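Because the frontend consumes QueryResult as JSON, it is worth noting the externally tagged shape serde's default enum representation produces for the new log variants; a stand-in sketch (assumes serde_json):

use std::collections::HashMap;
use serde::Serialize;

#[derive(Serialize)]
struct LogLineSketch {
    timestamp: f64,
    line: String,
}

// Externally tagged, matching serde's default for an enum like QueryResult above.
#[derive(Serialize)]
enum ResultSketch {
    Stream(Vec<(HashMap<String, String>, Vec<LogLineSketch>)>),
}

fn main() {
    let labels = HashMap::from([("job".to_string(), "systemd-journal".to_string())]);
    let payload = ResultSketch::Stream(vec![(
        labels,
        vec![LogLineSketch { timestamp: 1_709_078_000.0, line: "Started Session 1.".to_string() }],
    )]);
    // Roughly: {"Stream":[[{"job":"systemd-journal"},[{"timestamp":...,"line":"..."}]]]}
    println!("{}", serde_json::to_string(&payload).unwrap());
}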

View File

@ -18,12 +18,11 @@ use prometheus_http_query::{
response::{Data, PromqlResult},
Client,
};
use serde::{Deserialize, Serialize};
use tracing::debug;
use crate::dashboard::PlotMeta;
use super::{QueryType, TimeSpan};
use super::{QueryType, TimeSpan, QueryResult, DataPoint};
#[derive(Debug)]
pub struct PromQueryConn<'conn> {
@ -35,7 +34,12 @@ pub struct PromQueryConn<'conn> {
}
impl<'conn> PromQueryConn<'conn> {
pub fn new<'a: 'conn>(source: &'a str, query: &'a str, query_type: QueryType, meta: PlotMeta) -> Self {
pub fn new<'a: 'conn>(
source: &'a str,
query: &'a str,
query_type: QueryType,
meta: PlotMeta,
) -> Self {
Self {
source,
query,
@ -102,44 +106,9 @@ impl<'conn> PromQueryConn<'conn> {
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct DataPoint {
timestamp: f64,
value: f64,
}
#[derive(Serialize, Deserialize)]
pub enum PromQueryResult {
Series(Vec<(HashMap<String, String>, PlotMeta, Vec<DataPoint>)>),
Scalar(Vec<(HashMap<String, String>, PlotMeta, DataPoint)>),
}
impl std::fmt::Debug for PromQueryResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PromQueryResult::Series(v) => {
f.write_fmt(format_args!("Series trace count = {}", v.len()))?;
for (idx, (tags, meta, trace)) in v.iter().enumerate() {
f.write_fmt(format_args!(
"; {}: tags {:?} meta: {:?} datapoint count = {};",
idx,
tags,
meta,
trace.len()
))?;
}
}
PromQueryResult::Scalar(v) => {
f.write_fmt(format_args!("{} traces", v.len()))?;
}
}
Ok(())
}
}
pub fn to_samples(data: Data, meta: PlotMeta) -> PromQueryResult {
pub fn prom_to_samples(data: Data, meta: PlotMeta) -> QueryResult {
match data {
Data::Matrix(mut range) => PromQueryResult::Series(
Data::Matrix(mut range) => QueryResult::Series(
range
.drain(0..)
.map(|rv| {
@ -158,7 +127,7 @@ pub fn to_samples(data: Data, meta: PlotMeta) -> PromQueryResult {
})
.collect(),
),
Data::Vector(mut vector) => PromQueryResult::Scalar(
Data::Vector(mut vector) => QueryResult::Scalar(
vector
.drain(0..)
.map(|iv| {
@ -174,7 +143,7 @@ pub fn to_samples(data: Data, meta: PlotMeta) -> PromQueryResult {
})
.collect(),
),
Data::Scalar(sample) => PromQueryResult::Scalar(vec![(
Data::Scalar(sample) => QueryResult::Scalar(vec![(
HashMap::new(),
meta.clone(),
DataPoint {

View File

@ -25,8 +25,8 @@ use maud::{html, Markup};
use serde::{Serialize, Deserialize};
use tracing::debug;
use crate::dashboard::{Dashboard, Graph, GraphSpan, AxisDefinition, Orientation, query_data};
use crate::query::PromQueryResult;
use crate::dashboard::{Dashboard, Graph, GraphSpan, AxisDefinition, Orientation, prom_query_data, loki_query_data};
use crate::query::QueryResult;
type Config = State<Arc<Vec<Dashboard>>>;
@ -34,7 +34,21 @@ type Config = State<Arc<Vec<Dashboard>>>;
pub struct GraphPayload {
pub legend_orientation: Option<Orientation>,
pub yaxes: Vec<AxisDefinition>,
pub plots: Vec<PromQueryResult>,
pub plots: Vec<QueryResult>,
}
// TODO(jwall): Should this be a completely different payload?
pub async fn loki_query(
State(config): Config,
Path((dash_idx, loki_idx)): Path<(usize, usize)>,
Query(query): Query<HashMap<String, String>>,
) -> Json<GraphPayload> {
let dash = config.get(dash_idx).expect(&format!("No such dashboard index {}", dash_idx));
let log = dash.logs
.as_ref().expect("No logs in this dashboard")
.get(loki_idx).expect(&format!("No such log query {}", loki_idx));
let plots = vec![loki_query_data(log, dash, query_to_graph_span(query)).await.expect("Unable to get log query results")];
Json(GraphPayload{legend_orientation: None, yaxes: log.yaxes.clone(), plots})
}
pub async fn graph_query(
@ -43,11 +57,17 @@ pub async fn graph_query(
Query(query): Query<HashMap<String, String>>,
) -> Json<GraphPayload> {
debug!("Getting data for query");
let dash = config.get(dash_idx).expect("No such dashboard index");
let dash = config.get(dash_idx).expect(&format!("No such dashboard index {}", dash_idx));
let graph = dash
.graphs
.as_ref().expect("No graphs in this dashboard")
.get(graph_idx)
.expect(&format!("No such graph in dasboard {}", dash_idx));
let plots = prom_query_data(graph, dash, query_to_graph_span(query)).await.expect("Unable to get query results");
Json(GraphPayload{legend_orientation: graph.legend_orientation.clone(), yaxes: graph.yaxes.clone(), plots})
}
fn query_to_graph_span(query: HashMap<String, String>) -> Option<GraphSpan> {
let query_span = {
if query.contains_key("end")
&& query.contains_key("duration")
@ -62,15 +82,19 @@ pub async fn graph_query(
None
}
};
let plots = query_data(graph, dash, query_span).await.expect("Unable to get query results");
Json(GraphPayload{legend_orientation: graph.legend_orientation.clone(), yaxes: graph.yaxes.clone(), plots})
query_span
}
pub fn mk_api_routes(config: Arc<Vec<Dashboard>>) -> Router<Config> {
// Query routes
Router::new().route(
Router::new()
.route(
"/dash/:dash_idx/graph/:graph_idx",
get(graph_query).with_state(config),
get(graph_query).with_state(config.clone()),
)
.route(
"/dash/:dash_idx/log/:log_idx",
get(loki_query).with_state(config),
)
}
@ -82,9 +106,9 @@ pub fn graph_component(dash_idx: usize, graph_idx: usize, graph: &Graph) -> Mark
div {
h2 { (graph.title) " - " a href=(graph_embed_uri) { "embed url" } }
@if graph.d3_tick_format.is_some() {
timeseries-graph uri=(graph_data_uri) id=(graph_id) d3-tick-format=(graph.d3_tick_format.as_ref().unwrap()) { }
graph-plot uri=(graph_data_uri) id=(graph_id) d3-tick-format=(graph.d3_tick_format.as_ref().unwrap()) { }
} @else {
timeseries-graph uri=(graph_data_uri) id=(graph_id) { }
graph-plot uri=(graph_data_uri) id=(graph_id) { }
}
}
)
@ -96,8 +120,9 @@ pub async fn graph_ui(
) -> Markup {
let graph = config
.get(dash_idx)
.expect("No such dashboard")
.expect(&format!("No such dashboard {}", dash_idx))
.graphs
.as_ref().expect("No graphs in this dashboard")
.get(graph_idx)
.expect("No such graph");
graph_component(dash_idx, graph_idx, graph)
@ -109,9 +134,10 @@ pub async fn dash_ui(State(config): State<Config>, Path(dash_idx): Path<usize>)
}
fn dash_elements(config: State<Arc<Vec<Dashboard>>>, dash_idx: usize) -> maud::PreEscaped<String> {
let dash = config.get(dash_idx).expect("No such dashboard");
let dash = config.get(dash_idx).expect(&format!("No such dashboard {}", dash_idx));
let graph_iter = dash
.graphs
.as_ref().expect("No graphs in this dashboard")
.iter()
.enumerate()
.collect::<Vec<(usize, &Graph)>>();

View File

@ -16,7 +16,7 @@ function getCssVariableValue(variableName) {
return getComputedStyle(document.documentElement).getPropertyValue(variableName);
}
class TimeseriesGraph extends HTMLElement {
class GraphPlot extends HTMLElement {
#uri;
#width;
#height;
@ -94,7 +94,7 @@ class TimeseriesGraph extends HTMLElement {
this.stopInterval()
}
static elementName = "timeseries-graph";
static elementName = "graph-plot";
getTargetNode() {
return this.#targetNode;
@ -122,8 +122,8 @@ class TimeseriesGraph extends HTMLElement {
}
static registerElement() {
if (!customElements.get(TimeseriesGraph.elementName)) {
customElements.define(TimeseriesGraph.elementName, TimeseriesGraph);
if (!customElements.get(GraphPlot.elementName)) {
customElements.define(GraphPlot.elementName, GraphPlot);
}
}
@ -350,7 +350,7 @@ class TimeseriesGraph extends HTMLElement {
}
}
TimeseriesGraph.registerElement();
GraphPlot.registerElement();
class SpanSelector extends HTMLElement {
#targetNode = null;
@ -389,7 +389,7 @@ class SpanSelector extends HTMLElement {
}
updateGraphs() {
for (var node of document.getElementsByTagName(TimeseriesGraph.elementName)) {
for (var node of document.getElementsByTagName(GraphPlot.elementName)) {
node.setAttribute('end', this.#endInput.value);
node.setAttribute('duration', this.#durationInput.value);
node.setAttribute('step-duration', this.#stepDurationInput.value);

View File

@ -84,7 +84,7 @@ body * {
flex: 0 1 auto;
}
timeseries-graph {
graph-plot {
background-color: var(--paper-background-color);
border-radius: 4px;
display: flex;