From 16ff43f4e2de3d094e7b0b399d5e0d00b419d6d7 Mon Sep 17 00:00:00 2001 From: Jeremy Wall Date: Mon, 26 Feb 2024 19:13:05 -0500 Subject: [PATCH] feat: Loki logql query client --- Cargo.lock | 166 ++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/dashboard.rs | 8 +- src/query/loki.rs | 127 ++++++++++++++++++++++++ src/query/mod.rs | 35 +++++++ src/{query.rs => query/prom.rs} | 39 +++----- src/routes.rs | 4 +- 7 files changed, 349 insertions(+), 31 deletions(-) create mode 100644 src/query/loki.rs create mode 100644 src/query/mod.rs rename src/{query.rs => query/prom.rs} (88%) diff --git a/Cargo.lock b/Cargo.lock index 0971659..14ae837 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -429,12 +429,33 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fastrand" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" + [[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -579,6 +600,7 @@ dependencies = [ "maud", "parse_duration", "prometheus-http-query", + "reqwest", "serde", "serde_json", "serde_yaml", @@ -719,6 +741,19 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.28", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-util" version = "0.1.3" @@ -885,6 +920,24 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1002,6 +1055,50 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "openssl" +version = "0.10.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15c9d69dd87a29568d4d017cfe8ec518706046a05184e5aea92d0af890b803c8" +dependencies = [ + "bitflags 2.4.2", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + +[[package]] +name = "openssl-sys" +version = "0.9.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e1bf214306098e4832460f797824c05d25aacdf896f64a985fb0fd992454ae" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "overload" version = "0.1.1" @@ -1063,6 +1160,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb" + [[package]] name = "polling" version = "3.3.2" @@ -1183,10 +1286,12 @@ dependencies = [ "http-body 0.4.6", "hyper 0.14.28", "hyper-rustls", + "hyper-tls", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -1198,6 +1303,7 @@ dependencies = [ "sync_wrapper", "system-configuration", "tokio", + "tokio-native-tls", "tokio-rustls", "tower-service", "url", @@ -1284,6 +1390,15 @@ version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" +[[package]] +name = "schannel" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "sct" version = "0.7.1" @@ -1294,6 +1409,29 @@ dependencies = [ "untrusted", ] +[[package]] +name = "security-framework" +version = "2.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e932934257d3b408ed8f30db49d85ea163bfe74961f017f405b025af298f0c7a" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.196" @@ -1444,6 +1582,18 @@ dependencies = [ "libc", ] +[[package]] +name = "tempfile" +version = "3.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" +dependencies = [ + "cfg-if", + "fastrand", + "rustix", + "windows-sys 0.52.0", +] + [[package]] name = "thread_local" version = "1.1.7" @@ -1527,6 +1677,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" @@ -1717,6 +1877,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.4" diff --git a/Cargo.toml b/Cargo.toml index 743998a..b27c6f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,3 +23,4 @@ tokio = { version = "1.36.0", features = ["net", "rt", "rt-multi-thread"] } tower-http = { version = "0.5.1", features = ["trace"] } tracing = "0.1.40" tracing-subscriber = "0.3.18" +reqwest = { version = "0.11.24", features = ["rustls-tls"] } diff --git a/src/dashboard.rs b/src/dashboard.rs index 76a0f32..9b816a3 100644 --- a/src/dashboard.rs +++ b/src/dashboard.rs @@ -20,7 +20,7 @@ use serde_yaml; use tracing::{debug, error}; use anyhow::Result; -use crate::query::{QueryConn, QueryType, QueryResult, to_samples}; +use crate::query::{PromQueryConn, QueryType, PromQueryResult, to_samples}; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct PlotMeta { @@ -103,7 +103,7 @@ pub struct Graph { pub d3_tick_format: Option, } -pub async fn query_data(graph: &Graph, dash: &Dashboard, query_span: Option) -> Result> { +pub async fn query_data(graph: &Graph, dash: &Dashboard, query_span: Option) -> Result> { let connections = graph.get_query_connections(&dash.span, &query_span); let mut data = Vec::new(); for conn in connections { @@ -172,7 +172,7 @@ impl Graph { &'graph self, graph_span: &'graph Option, query_span: &'graph Option, - ) -> Vec> { + ) -> Vec> { let mut conns = Vec::new(); for plot in self.plots.iter() { debug!( @@ -180,7 +180,7 @@ impl Graph { source = plot.source, "Getting query connection for graph" ); - let mut conn = QueryConn::new(&plot.source, &plot.query, self.query_type.clone(), plot.meta.clone()); + let mut conn = PromQueryConn::new(&plot.source, &plot.query, self.query_type.clone(), plot.meta.clone()); // Query params take precendence over all other settings. Then graph settings take // precedences and finally the dashboard settings take precendence if let Some((end, duration, step_duration)) = graph_span_to_tuple(query_span) { diff --git a/src/query/loki.rs b/src/query/loki.rs new file mode 100644 index 0000000..aa18cea --- /dev/null +++ b/src/query/loki.rs @@ -0,0 +1,127 @@ +// Copyright 2024 Jeremy Wall +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use std::collections::HashMap; + +use anyhow::Result; +use chrono::prelude::*; +use reqwest; +use serde::{Deserialize, Serialize}; + +use super::{QueryType, TimeSpan}; + +// TODO(jwall): Should I allow non stream returns? +#[derive(Serialize, Deserialize)] +pub enum ResultType { + /// Returned by query endpoints + #[serde(rename = "vector")] + Vector, + /// Returned by query_range endpoints + #[serde(rename = "matrix")] + Matrix, + /// Returned by query and query_range endpoints + #[serde(rename = "streams")] + Streams, +} + +#[derive(Serialize, Deserialize)] +pub struct LokiResult { + #[serde(alias = "metric")] + #[serde(alias = "stream")] + labels: Option>, + value: Option<(i64, String)>, + /// The only version that returns log lines + values: Option>, +} + +#[derive(Serialize, Deserialize)] +pub struct LokiData { + result_type: ResultType, + result: Vec, + //stats: // TODO +} + +#[derive(Serialize, Deserialize)] +pub struct LokiResponse { + status: String, + data: LokiData, +} + +pub struct LokiConn<'conn> { + url: &'conn str, + query: &'conn str, + span: Option, + query_type: QueryType, + limit: Option, +} + +const SCALAR_API_PATH: &'static str = "/loki/api/v1/query"; +const RANGE_API_PATH: &'static str = "/loki/api/v1/query_range"; + +impl<'conn> LokiConn<'conn> { + pub fn new<'a: 'conn>(url: &'a str, query: &'a str, query_type: QueryType) -> Self { + Self { + url, + query, + query_type, + span: None, + limit: None, + } + } + + pub fn with_limit(mut self, limit: usize) -> Self { + self.limit = Some(limit); + self + } + + pub fn with_span( + mut self, + end: DateTime, + duration: chrono::Duration, + step: chrono::Duration, + ) -> Self { + self.span = Some(TimeSpan { + end, + duration, + step_seconds: step.num_seconds(), + }); + self + } + + pub async fn get_results(&self) -> Result { + let url = match self.query_type { + QueryType::Scalar => format!("{}{}", self.url, SCALAR_API_PATH), + QueryType::Range => format!("{}{}", self.url, RANGE_API_PATH), + }; + let client = reqwest::Client::new(); + let mut req = client.get(url).query(&["query", self.query]); + if self.limit.is_some() { + req = req.query(&["limit", &self.limit.map(|u| u.to_string()).unwrap()]); + } + if let QueryType::Range = self.query_type { + let (start, end, step_resolution) = if let Some(span) = &self.span { + let start = span.end - span.duration; + (start.timestamp(), span.end.timestamp(), span.step_seconds as f64) + } else { + let end = Utc::now(); + let start = end - chrono::Duration::minutes(10); + (start.timestamp(), end.timestamp(), 30 as f64) + }; + req = req.query(&["end", &end.to_string()]); + req = req.query(&["since", &start.to_string()]); + req = req.query(&["step", &step_resolution.to_string()]); + } + + Ok(req.send().await?.json().await?) + } +} diff --git a/src/query/mod.rs b/src/query/mod.rs new file mode 100644 index 0000000..b348f1d --- /dev/null +++ b/src/query/mod.rs @@ -0,0 +1,35 @@ +// Copyright 2023 Jeremy Wall +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use serde::Deserialize; +use chrono::prelude::*; + +mod loki; +mod prom; + +#[derive(Deserialize, Clone, Debug)] +pub enum QueryType { + Range, + Scalar, +} + +#[derive(Debug)] +pub struct TimeSpan { + pub end: DateTime, + pub duration: chrono::Duration, + pub step_seconds: i64, +} + + +pub use prom::*; + diff --git a/src/query.rs b/src/query/prom.rs similarity index 88% rename from src/query.rs rename to src/query/prom.rs index 695d94d..ba486bf 100644 --- a/src/query.rs +++ b/src/query/prom.rs @@ -1,6 +1,4 @@ -use std::collections::HashMap; - -// Copyright 2023 Jeremy Wall +// Copyright 2024 Jeremy Wall // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,6 +11,8 @@ use std::collections::HashMap; // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use chrono::prelude::*; use prometheus_http_query::{ response::{Data, PromqlResult}, @@ -23,21 +23,10 @@ use tracing::debug; use crate::dashboard::PlotMeta; -#[derive(Deserialize, Clone, Debug)] -pub enum QueryType { - Range, - Scalar, -} +use super::{QueryType, TimeSpan}; #[derive(Debug)] -pub struct TimeSpan { - pub end: DateTime, - pub duration: chrono::Duration, - pub step_seconds: i64, -} - -#[derive(Debug)] -pub struct QueryConn<'conn> { +pub struct PromQueryConn<'conn> { source: &'conn str, query: &'conn str, span: Option, @@ -45,7 +34,7 @@ pub struct QueryConn<'conn> { pub meta: PlotMeta, } -impl<'conn> QueryConn<'conn> { +impl<'conn> PromQueryConn<'conn> { pub fn new<'a: 'conn>(source: &'a str, query: &'a str, query_type: QueryType, meta: PlotMeta) -> Self { Self { source, @@ -120,15 +109,15 @@ pub struct DataPoint { } #[derive(Serialize, Deserialize)] -pub enum QueryResult { +pub enum PromQueryResult { Series(Vec<(HashMap, PlotMeta, Vec)>), Scalar(Vec<(HashMap, PlotMeta, DataPoint)>), } -impl std::fmt::Debug for QueryResult { +impl std::fmt::Debug for PromQueryResult { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - QueryResult::Series(v) => { + PromQueryResult::Series(v) => { f.write_fmt(format_args!("Series trace count = {}", v.len()))?; for (idx, (tags, meta, trace)) in v.iter().enumerate() { f.write_fmt(format_args!( @@ -140,7 +129,7 @@ impl std::fmt::Debug for QueryResult { ))?; } } - QueryResult::Scalar(v) => { + PromQueryResult::Scalar(v) => { f.write_fmt(format_args!("{} traces", v.len()))?; } } @@ -148,9 +137,9 @@ impl std::fmt::Debug for QueryResult { } } -pub fn to_samples(data: Data, meta: PlotMeta) -> QueryResult { +pub fn to_samples(data: Data, meta: PlotMeta) -> PromQueryResult { match data { - Data::Matrix(mut range) => QueryResult::Series( + Data::Matrix(mut range) => PromQueryResult::Series( range .drain(0..) .map(|rv| { @@ -169,7 +158,7 @@ pub fn to_samples(data: Data, meta: PlotMeta) -> QueryResult { }) .collect(), ), - Data::Vector(mut vector) => QueryResult::Scalar( + Data::Vector(mut vector) => PromQueryResult::Scalar( vector .drain(0..) .map(|iv| { @@ -185,7 +174,7 @@ pub fn to_samples(data: Data, meta: PlotMeta) -> QueryResult { }) .collect(), ), - Data::Scalar(sample) => QueryResult::Scalar(vec![( + Data::Scalar(sample) => PromQueryResult::Scalar(vec![( HashMap::new(), meta.clone(), DataPoint { diff --git a/src/routes.rs b/src/routes.rs index 3b36328..81cbd8f 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -26,7 +26,7 @@ use serde::{Serialize, Deserialize}; use tracing::debug; use crate::dashboard::{Dashboard, Graph, GraphSpan, AxisDefinition, Orientation, query_data}; -use crate::query::QueryResult; +use crate::query::PromQueryResult; type Config = State>>; @@ -34,7 +34,7 @@ type Config = State>>; pub struct GraphPayload { pub legend_orientation: Option, pub yaxes: Vec, - pub plots: Vec, + pub plots: Vec, } pub async fn graph_query(