refactor: Query Metrics vs Logs payloads

This commit is contained in:
Jeremy Wall 2024-03-21 19:59:01 -04:00
parent a2cfaff490
commit 471d159af7
7 changed files with 141 additions and 62 deletions

View File

@ -1,4 +1,3 @@
use std::collections::HashMap;
// Copyright 2023 Jeremy Wall // Copyright 2023 Jeremy Wall
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
@ -13,6 +12,7 @@ use std::collections::HashMap;
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::path::Path; use std::path::Path;
use std::collections::HashMap;
use anyhow::Result; use anyhow::Result;
use chrono::prelude::*; use chrono::prelude::*;
@ -21,8 +21,9 @@ use serde::{Deserialize, Serialize};
use serde_yaml; use serde_yaml;
use tracing::{debug, error}; use tracing::{debug, error};
use crate::query::LogQueryResult;
use crate::query::{ use crate::query::{
loki_to_sample, prom_to_samples, LokiConn, PromQueryConn, QueryResult, QueryType, loki_to_sample, prom_to_samples, LokiConn, PromQueryConn, MetricsQueryResult, QueryType,
}; };
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
@ -126,7 +127,7 @@ pub async fn prom_query_data<'a>(
dash: &Dashboard, dash: &Dashboard,
query_span: Option<GraphSpan>, query_span: Option<GraphSpan>,
filters: &Option<HashMap<&'a str, &'a str>>, filters: &Option<HashMap<&'a str, &'a str>>,
) -> Result<Vec<QueryResult>> { ) -> Result<Vec<MetricsQueryResult>> {
let connections = graph.get_query_connections(&dash.span, &query_span, filters); let connections = graph.get_query_connections(&dash.span, &query_span, filters);
let mut data = Vec::new(); let mut data = Vec::new();
for conn in connections { for conn in connections {
@ -142,7 +143,7 @@ pub async fn loki_query_data(
stream: &LogStream, stream: &LogStream,
dash: &Dashboard, dash: &Dashboard,
query_span: Option<GraphSpan>, query_span: Option<GraphSpan>,
) -> Result<QueryResult> { ) -> Result<LogQueryResult> {
let conn = stream.get_query_connection(&dash.span, &query_span); let conn = stream.get_query_connection(&dash.span, &query_span);
let response = conn.get_results().await?; let response = conn.get_results().await?;
if response.status == "success" { if response.status == "success" {

View File

@ -19,7 +19,7 @@ use reqwest;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tracing::{debug, error}; use tracing::{debug, error};
use super::{LogLine, QueryResult, QueryType, TimeSpan}; use super::{LogLine, LogQueryResult, QueryType, TimeSpan};
// TODO(jwall): Should I allow non stream returns? // TODO(jwall): Should I allow non stream returns?
#[derive(Serialize, Deserialize, Debug, PartialEq)] #[derive(Serialize, Deserialize, Debug, PartialEq)]
@ -62,7 +62,7 @@ pub struct LokiData {
//stats: // TODO //stats: // TODO
} }
pub fn loki_to_sample(data: LokiData) -> QueryResult { pub fn loki_to_sample(data: LokiData) -> LogQueryResult {
match data.result_type { match data.result_type {
ResultType::Vector => { ResultType::Vector => {
let mut values = Vec::with_capacity(data.result.len()); let mut values = Vec::with_capacity(data.result.len());
@ -83,7 +83,7 @@ pub fn loki_to_sample(data: LokiData) -> QueryResult {
); );
} }
} }
QueryResult::StreamInstant(values) LogQueryResult::StreamInstant(values)
} }
// Stream types are nanoseconds. // Matrix types are seconds // Stream types are nanoseconds. // Matrix types are seconds
ResultType::Matrix | ResultType::Streams => { ResultType::Matrix | ResultType::Streams => {
@ -109,7 +109,7 @@ pub fn loki_to_sample(data: LokiData) -> QueryResult {
); );
} }
} }
QueryResult::Stream(values) LogQueryResult::Stream(values)
} }
} }
} }

View File

@ -47,17 +47,21 @@ pub struct LogLine {
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub enum QueryResult { pub enum MetricsQueryResult {
Series(Vec<(HashMap<String, String>, PlotMeta, Vec<DataPoint>)>), Series(Vec<(HashMap<String, String>, PlotMeta, Vec<DataPoint>)>),
Scalar(Vec<(HashMap<String, String>, PlotMeta, DataPoint)>), Scalar(Vec<(HashMap<String, String>, PlotMeta, DataPoint)>),
}
#[derive(Serialize, Deserialize)]
pub enum LogQueryResult {
StreamInstant(Vec<(HashMap<String, String>, LogLine)>), StreamInstant(Vec<(HashMap<String, String>, LogLine)>),
Stream(Vec<(HashMap<String, String>, Vec<LogLine>)>), Stream(Vec<(HashMap<String, String>, Vec<LogLine>)>),
} }
impl std::fmt::Debug for QueryResult { impl std::fmt::Debug for MetricsQueryResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
QueryResult::Series(v) => { MetricsQueryResult::Series(v) => {
f.write_fmt(format_args!("Series trace count = {}", v.len()))?; f.write_fmt(format_args!("Series trace count = {}", v.len()))?;
for (idx, (tags, meta, trace)) in v.iter().enumerate() { for (idx, (tags, meta, trace)) in v.iter().enumerate() {
f.write_fmt(format_args!( f.write_fmt(format_args!(
@ -69,13 +73,21 @@ impl std::fmt::Debug for QueryResult {
))?; ))?;
} }
} }
QueryResult::Scalar(v) => { MetricsQueryResult::Scalar(v) => {
f.write_fmt(format_args!("{} traces", v.len()))?; f.write_fmt(format_args!("{} traces", v.len()))?;
} }
QueryResult::StreamInstant(v) => { }
Ok(())
}
}
impl std::fmt::Debug for LogQueryResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LogQueryResult::StreamInstant(v) => {
f.write_fmt(format_args!("{} traces", v.len()))?; f.write_fmt(format_args!("{} traces", v.len()))?;
} }
QueryResult::Stream(v) => { LogQueryResult::Stream(v) => {
f.write_fmt(format_args!("stream trace count = {}", v.len()))?; f.write_fmt(format_args!("stream trace count = {}", v.len()))?;
for (idx, (tags, trace)) in v.iter().enumerate() { for (idx, (tags, trace)) in v.iter().enumerate() {
f.write_fmt(format_args!( f.write_fmt(format_args!(
@ -90,6 +102,5 @@ impl std::fmt::Debug for QueryResult {
Ok(()) Ok(())
} }
} }
pub use loki::*; pub use loki::*;
pub use prom::*; pub use prom::*;

View File

@ -22,7 +22,7 @@ use tracing::debug;
use crate::dashboard::PlotMeta; use crate::dashboard::PlotMeta;
use super::{DataPoint, QueryResult, QueryType, TimeSpan}; use super::{DataPoint, MetricsQueryResult, QueryType, TimeSpan};
pub const FILTER_PLACEHOLDER: &'static str = "FILTERS"; pub const FILTER_PLACEHOLDER: &'static str = "FILTERS";
pub const FILTER_COMMA_PLACEHOLDER: &'static str = ",FILTERS"; pub const FILTER_COMMA_PLACEHOLDER: &'static str = ",FILTERS";
@ -159,9 +159,9 @@ impl<'conn> PromQueryConn<'conn> {
} }
} }
pub fn prom_to_samples(data: Data, meta: PlotMeta) -> QueryResult { pub fn prom_to_samples(data: Data, meta: PlotMeta) -> MetricsQueryResult {
match data { match data {
Data::Matrix(mut range) => QueryResult::Series( Data::Matrix(mut range) => MetricsQueryResult::Series(
range range
.drain(0..) .drain(0..)
.map(|rv| { .map(|rv| {
@ -180,7 +180,7 @@ pub fn prom_to_samples(data: Data, meta: PlotMeta) -> QueryResult {
}) })
.collect(), .collect(),
), ),
Data::Vector(mut vector) => QueryResult::Scalar( Data::Vector(mut vector) => MetricsQueryResult::Scalar(
vector vector
.drain(0..) .drain(0..)
.map(|iv| { .map(|iv| {
@ -196,7 +196,7 @@ pub fn prom_to_samples(data: Data, meta: PlotMeta) -> QueryResult {
}) })
.collect(), .collect(),
), ),
Data::Scalar(sample) => QueryResult::Scalar(vec![( Data::Scalar(sample) => MetricsQueryResult::Scalar(vec![(
HashMap::new(), HashMap::new(),
meta.clone(), meta.clone(),
DataPoint { DataPoint {

View File

@ -28,15 +28,26 @@ use tracing::debug;
use crate::dashboard::{ use crate::dashboard::{
loki_query_data, prom_query_data, AxisDefinition, Dashboard, Graph, GraphSpan, Orientation, LogStream, loki_query_data, prom_query_data, AxisDefinition, Dashboard, Graph, GraphSpan, Orientation, LogStream,
}; };
use crate::query::{self, QueryResult}; use crate::query::{self, MetricsQueryResult, LogQueryResult};
type Config = State<Arc<Vec<Dashboard>>>; type Config = State<Arc<Vec<Dashboard>>>;
#[derive(Serialize, Deserialize)]
pub enum QueryPayload {
Metrics(GraphPayload),
Logs(LogsPayload),
}
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct GraphPayload { pub struct GraphPayload {
pub legend_orientation: Option<Orientation>, pub legend_orientation: Option<Orientation>,
pub yaxes: Vec<AxisDefinition>, pub yaxes: Vec<AxisDefinition>,
pub plots: Vec<QueryResult>, pub plots: Vec<MetricsQueryResult>,
}
#[derive(Serialize, Deserialize)]
pub struct LogsPayload {
pub lines: LogQueryResult,
} }
// TODO(jwall): Should this be a completely different payload? // TODO(jwall): Should this be a completely different payload?
@ -44,7 +55,7 @@ pub async fn loki_query(
State(config): Config, State(config): Config,
Path((dash_idx, loki_idx)): Path<(usize, usize)>, Path((dash_idx, loki_idx)): Path<(usize, usize)>,
Query(query): Query<HashMap<String, String>>, Query(query): Query<HashMap<String, String>>,
) -> Json<GraphPayload> { ) -> Json<QueryPayload> {
let dash = config let dash = config
.get(dash_idx) .get(dash_idx)
.expect(&format!("No such dashboard index {}", dash_idx)); .expect(&format!("No such dashboard index {}", dash_idx));
@ -54,21 +65,19 @@ pub async fn loki_query(
.expect("No logs in this dashboard") .expect("No logs in this dashboard")
.get(loki_idx) .get(loki_idx)
.expect(&format!("No such log query {}", loki_idx)); .expect(&format!("No such log query {}", loki_idx));
let plots = vec![loki_query_data(log, dash, query_to_graph_span(&query)) let lines = loki_query_data(log, dash, query_to_graph_span(&query))
.await .await
.expect("Unable to get log query results")]; .expect("Unable to get log query results");
Json(GraphPayload { Json(QueryPayload::Logs(LogsPayload {
legend_orientation: None, lines,
yaxes: log.yaxes.clone(), }))
plots,
})
} }
pub async fn graph_query( pub async fn graph_query(
State(config): Config, State(config): Config,
Path((dash_idx, graph_idx)): Path<(usize, usize)>, Path((dash_idx, graph_idx)): Path<(usize, usize)>,
Query(query): Query<HashMap<String, String>>, Query(query): Query<HashMap<String, String>>,
) -> Json<GraphPayload> { ) -> Json<QueryPayload> {
debug!("Getting data for query"); debug!("Getting data for query");
let dash = config let dash = config
.get(dash_idx) .get(dash_idx)
@ -83,11 +92,11 @@ pub async fn graph_query(
let plots = prom_query_data(graph, dash, query_to_graph_span(&query), &filters) let plots = prom_query_data(graph, dash, query_to_graph_span(&query), &filters)
.await .await
.expect("Unable to get query results"); .expect("Unable to get query results");
Json(GraphPayload { Json(QueryPayload::Metrics(GraphPayload {
legend_orientation: graph.legend_orientation.clone(), legend_orientation: graph.legend_orientation.clone(),
yaxes: graph.yaxes.clone(), yaxes: graph.yaxes.clone(),
plots, plots,
}) }))
} }
fn query_to_filterset<'v, 'a: 'v>(query: &'a HashMap<String, String>) -> Option<HashMap<&'v str, &'v str>> { fn query_to_filterset<'v, 'a: 'v>(query: &'a HashMap<String, String>) -> Option<HashMap<&'v str, &'v str>> {

View File

@ -17,6 +17,11 @@
* @type {object} * @type {object}
* @property {Array=} Series * @property {Array=} Series
* @property {Array=} Scalar * @property {Array=} Scalar
*/
/**
* @typedef LogLineList
* @type {object}
* @property {Array=} StreamInstant - Timestamps are in seconds * @property {Array=} StreamInstant - Timestamps are in seconds
* @property {Array=} Stream - Timestamps are in nanoseconds * @property {Array=} Stream - Timestamps are in nanoseconds
*/ */
@ -29,6 +34,11 @@
* @property {Array<PlotList>} plots * @property {Array<PlotList>} plots
*/ */
/**
* @typedef QueryPayload
* @type {{Metrics: QueryData, Logs: {lines: LogLineList}}}
*/
/** /**
* @typedef HeaderOrCell * @typedef HeaderOrCell
* @type {object} * @type {object}

View File

@ -195,7 +195,7 @@ export class GraphPlot extends HTMLElement {
self.stopInterval() self.stopInterval()
self.fetchData().then((data) => { self.fetchData().then((data) => {
if (!updateOnly) { if (!updateOnly) {
self.getLabelsForData(data); self.getLabelsForData(data.Metrics || data.Logs.Lines);
self.buildFilterMenu(); self.buildFilterMenu();
} }
self.updateGraph(data).then(() => { self.updateGraph(data).then(() => {
@ -240,7 +240,7 @@ export class GraphPlot extends HTMLElement {
/** /**
* Returns the data from an api call. * Returns the data from an api call.
* *
* @return {Promise<QueryData>} * @return {Promise<QueryPayload>}
*/ */
async fetchData() { async fetchData() {
// TODO(zaphar): Can we do some massaging on these // TODO(zaphar): Can we do some massaging on these
@ -362,7 +362,7 @@ export class GraphPlot extends HTMLElement {
} }
/** /**
* @param {QueryData} graph * @param {QueryData|LogLineList} graph
*/ */
getLabelsForData(graph) { getLabelsForData(graph) {
const data = graph.plots; const data = graph.plots;
@ -385,6 +385,9 @@ export class GraphPlot extends HTMLElement {
this.populateFilterData(labels); this.populateFilterData(labels);
} }
} }
if (subplot.StreamInstant) {
// TODO(zaphar): Handle this?
}
} }
} }
@ -465,6 +468,8 @@ export class GraphPlot extends HTMLElement {
/** /**
* @param {Array} stream * @param {Array} stream
*
* @returns {{dates: Array<string>, meta: Array<string>, lines: Array<string>}}
*/ */
buildStreamPlot(stream) { buildStreamPlot(stream) {
const dateColumn = []; const dateColumn = [];
@ -491,19 +496,86 @@ export class GraphPlot extends HTMLElement {
logColumn.push(ansiToHtml(line.line)); logColumn.push(ansiToHtml(line.line));
} }
} }
return [dateColumn, metaColumn, logColumn]; return { dates: dateColumn, meta: metaColumn, lines: logColumn };
} }
/** /**
* Update the graph with new data. * Update the graph with new data.
* *
* @param {?QueryData=} maybeGraph * @param {?QueryPayload=} maybeGraph
*/ */
async updateGraph(maybeGraph) { async updateGraph(maybeGraph) {
var graph = maybeGraph; var graph = maybeGraph;
if (!graph) { if (!graph) {
graph = await this.fetchData(); graph = await this.fetchData();
} }
if (graph.Metrics) {
this.updateMetricsGraph(graph.Metrics);
} else if (graph.Logs) {
this.updateLogsView(graph.Logs.lines);
} else {
}
}
/**
* Update the logs view with new data.
*
* @param {?LogLineList=} logLineList
*/
updateLogsView(logLineList) {
var layout = {
displayModeBar: false,
responsive: true,
plot_bgcolor: getCssVariableValue('--plot-background-color').trim(),
paper_bgcolor: getCssVariableValue('--paper-background-color').trim(),
font: {
color: getCssVariableValue('--text-color').trim()
},
xaxis: {
gridcolor: getCssVariableValue("--grid-line-color")
},
legend: {
orientation: 'v'
}
};
var traces = [];
if (logLineList.Stream) {
// TODO(jwall): It's possible that this should actually be a separate custom
// element.
const trace = /** @type TableTrace */ ({
type: "table",
columnwidth: [15, 20, 70],
header: {
align: "left",
values: ["Timestamp", "Labels", "Log"],
fill: { color: layout.xaxis.paper_bgcolor },
font: { color: getCssVariableValue('--text-color').trim() }
},
cells: {
align: "left",
values: [],
fill: { color: layout.plot_bgcolor }
},
});
const columns = this.buildStreamPlot(logLineList.Stream);
trace.cells.values.push(columns.dates);
trace.cells.values.push(columns.meta);
trace.cells.values.push(columns.lines);
traces.push(trace);
} else if (logLineList.StreamInstant) {
// TODO(zaphar): Handle this?
}
// https://plotly.com/javascript/plotlyjs-function-reference/#plotlyreact
// @ts-ignore
Plotly.react(this.getTargetNode(), traces, layout, null);
}
/**
* Update the metrics graph with new data.
*
* @param {?QueryData=} graph
*/
updateMetricsGraph(graph) {
var data = graph.plots; var data = graph.plots;
var yaxes = graph.yaxes; var yaxes = graph.yaxes;
var layout = { var layout = {
@ -550,30 +622,6 @@ export class GraphPlot extends HTMLElement {
traces.push(trace); traces.push(trace);
} }
} }
} else if (subplot.Stream) {
// TODO(jwall): It would be nice if scroll behavior would handle replots better.
// TODO(jwall): It's possible that this should actually be a separate custom
// element.
const trace = /** @type TableTrace */({
type: "table",
columnwidth: [15, 20, 70],
header: {
align: "left",
values: ["Timestamp", "Labels", "Log"],
fill: { color: layout.xaxis.paper_bgcolor },
font: { color: getCssVariableValue('--text-color').trim() }
},
cells: {
align: "left",
values: [],
fill: { color: layout.plot_bgcolor }
},
});
const [dateColumn, metaColumn, logColumn] = this.buildStreamPlot(subplot.Stream);
trace.cells.values.push(dateColumn);
trace.cells.values.push(metaColumn);
trace.cells.values.push(logColumn);
traces.push(trace);
} }
} }
// https://plotly.com/javascript/plotlyjs-function-reference/#plotlyreact // https://plotly.com/javascript/plotlyjs-function-reference/#plotlyreact