feat: Loki logql query client

This commit is contained in:
Jeremy Wall 2024-02-26 19:13:05 -05:00
parent 782cca41a0
commit 16ff43f4e2
7 changed files with 349 additions and 31 deletions

166
Cargo.lock generated
View File

@ -429,12 +429,33 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "fastrand"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.2.1"
@ -579,6 +600,7 @@ dependencies = [
"maud",
"parse_duration",
"prometheus-http-query",
"reqwest",
"serde",
"serde_json",
"serde_yaml",
@ -719,6 +741,19 @@ dependencies = [
"tokio-rustls",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper 0.14.28",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "hyper-util"
version = "0.1.3"
@ -885,6 +920,24 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "native-tls"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@ -1002,6 +1055,50 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "openssl"
version = "0.10.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15c9d69dd87a29568d4d017cfe8ec518706046a05184e5aea92d0af890b803c8"
dependencies = [
"bitflags 2.4.2",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22e1bf214306098e4832460f797824c05d25aacdf896f64a985fb0fd992454ae"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "overload"
version = "0.1.1"
@ -1063,6 +1160,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2900ede94e305130c13ddd391e0ab7cbaeb783945ae07a279c268cb05109c6cb"
[[package]]
name = "polling"
version = "3.3.2"
@ -1183,10 +1286,12 @@ dependencies = [
"http-body 0.4.6",
"hyper 0.14.28",
"hyper-rustls",
"hyper-tls",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
@ -1198,6 +1303,7 @@ dependencies = [
"sync_wrapper",
"system-configuration",
"tokio",
"tokio-native-tls",
"tokio-rustls",
"tower-service",
"url",
@ -1284,6 +1390,15 @@ version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
[[package]]
name = "schannel"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534"
dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "sct"
version = "0.7.1"
@ -1294,6 +1409,29 @@ dependencies = [
"untrusted",
]
[[package]]
name = "security-framework"
version = "2.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de"
dependencies = [
"bitflags 1.3.2",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "2.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e932934257d3b408ed8f30db49d85ea163bfe74961f017f405b025af298f0c7a"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "serde"
version = "1.0.196"
@ -1444,6 +1582,18 @@ dependencies = [
"libc",
]
[[package]]
name = "tempfile"
version = "3.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67"
dependencies = [
"cfg-if",
"fastrand",
"rustix",
"windows-sys 0.52.0",
]
[[package]]
name = "thread_local"
version = "1.1.7"
@ -1527,6 +1677,16 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.24.1"
@ -1717,6 +1877,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.4"

View File

@ -23,3 +23,4 @@ tokio = { version = "1.36.0", features = ["net", "rt", "rt-multi-thread"] }
tower-http = { version = "0.5.1", features = ["trace"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
reqwest = { version = "0.11.24", features = ["rustls-tls"] }

View File

@ -20,7 +20,7 @@ use serde_yaml;
use tracing::{debug, error};
use anyhow::Result;
use crate::query::{QueryConn, QueryType, QueryResult, to_samples};
use crate::query::{PromQueryConn, QueryType, PromQueryResult, to_samples};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PlotMeta {
@ -103,7 +103,7 @@ pub struct Graph {
pub d3_tick_format: Option<String>,
}
pub async fn query_data(graph: &Graph, dash: &Dashboard, query_span: Option<GraphSpan>) -> Result<Vec<QueryResult>> {
pub async fn query_data(graph: &Graph, dash: &Dashboard, query_span: Option<GraphSpan>) -> Result<Vec<PromQueryResult>> {
let connections = graph.get_query_connections(&dash.span, &query_span);
let mut data = Vec::new();
for conn in connections {
@ -172,7 +172,7 @@ impl Graph {
&'graph self,
graph_span: &'graph Option<GraphSpan>,
query_span: &'graph Option<GraphSpan>,
) -> Vec<QueryConn<'conn>> {
) -> Vec<PromQueryConn<'conn>> {
let mut conns = Vec::new();
for plot in self.plots.iter() {
debug!(
@ -180,7 +180,7 @@ impl Graph {
source = plot.source,
"Getting query connection for graph"
);
let mut conn = QueryConn::new(&plot.source, &plot.query, self.query_type.clone(), plot.meta.clone());
let mut conn = PromQueryConn::new(&plot.source, &plot.query, self.query_type.clone(), plot.meta.clone());
// Query params take precendence over all other settings. Then graph settings take
// precedences and finally the dashboard settings take precendence
if let Some((end, duration, step_duration)) = graph_span_to_tuple(query_span) {

127
src/query/loki.rs Normal file
View File

@ -0,0 +1,127 @@
// Copyright 2024 Jeremy Wall
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use anyhow::Result;
use chrono::prelude::*;
use reqwest;
use serde::{Deserialize, Serialize};
use super::{QueryType, TimeSpan};
// TODO(jwall): Should I allow non stream returns?
#[derive(Serialize, Deserialize)]
pub enum ResultType {
/// Returned by query endpoints
#[serde(rename = "vector")]
Vector,
/// Returned by query_range endpoints
#[serde(rename = "matrix")]
Matrix,
/// Returned by query and query_range endpoints
#[serde(rename = "streams")]
Streams,
}
#[derive(Serialize, Deserialize)]
pub struct LokiResult {
#[serde(alias = "metric")]
#[serde(alias = "stream")]
labels: Option<HashMap<String, String>>,
value: Option<(i64, String)>,
/// The only version that returns log lines
values: Option<Vec<(f64, String)>>,
}
#[derive(Serialize, Deserialize)]
pub struct LokiData {
result_type: ResultType,
result: Vec<LokiResult>,
//stats: // TODO
}
#[derive(Serialize, Deserialize)]
pub struct LokiResponse {
status: String,
data: LokiData,
}
pub struct LokiConn<'conn> {
url: &'conn str,
query: &'conn str,
span: Option<TimeSpan>,
query_type: QueryType,
limit: Option<usize>,
}
const SCALAR_API_PATH: &'static str = "/loki/api/v1/query";
const RANGE_API_PATH: &'static str = "/loki/api/v1/query_range";
impl<'conn> LokiConn<'conn> {
pub fn new<'a: 'conn>(url: &'a str, query: &'a str, query_type: QueryType) -> Self {
Self {
url,
query,
query_type,
span: None,
limit: None,
}
}
pub fn with_limit(mut self, limit: usize) -> Self {
self.limit = Some(limit);
self
}
pub fn with_span(
mut self,
end: DateTime<Utc>,
duration: chrono::Duration,
step: chrono::Duration,
) -> Self {
self.span = Some(TimeSpan {
end,
duration,
step_seconds: step.num_seconds(),
});
self
}
pub async fn get_results(&self) -> Result<LokiResult> {
let url = match self.query_type {
QueryType::Scalar => format!("{}{}", self.url, SCALAR_API_PATH),
QueryType::Range => format!("{}{}", self.url, RANGE_API_PATH),
};
let client = reqwest::Client::new();
let mut req = client.get(url).query(&["query", self.query]);
if self.limit.is_some() {
req = req.query(&["limit", &self.limit.map(|u| u.to_string()).unwrap()]);
}
if let QueryType::Range = self.query_type {
let (start, end, step_resolution) = if let Some(span) = &self.span {
let start = span.end - span.duration;
(start.timestamp(), span.end.timestamp(), span.step_seconds as f64)
} else {
let end = Utc::now();
let start = end - chrono::Duration::minutes(10);
(start.timestamp(), end.timestamp(), 30 as f64)
};
req = req.query(&["end", &end.to_string()]);
req = req.query(&["since", &start.to_string()]);
req = req.query(&["step", &step_resolution.to_string()]);
}
Ok(req.send().await?.json().await?)
}
}

35
src/query/mod.rs Normal file
View File

@ -0,0 +1,35 @@
// Copyright 2023 Jeremy Wall
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::Deserialize;
use chrono::prelude::*;
mod loki;
mod prom;
#[derive(Deserialize, Clone, Debug)]
pub enum QueryType {
Range,
Scalar,
}
#[derive(Debug)]
pub struct TimeSpan {
pub end: DateTime<Utc>,
pub duration: chrono::Duration,
pub step_seconds: i64,
}
pub use prom::*;

View File

@ -1,6 +1,4 @@
use std::collections::HashMap;
// Copyright 2023 Jeremy Wall
// Copyright 2024 Jeremy Wall
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -13,6 +11,8 @@ use std::collections::HashMap;
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use chrono::prelude::*;
use prometheus_http_query::{
response::{Data, PromqlResult},
@ -23,21 +23,10 @@ use tracing::debug;
use crate::dashboard::PlotMeta;
#[derive(Deserialize, Clone, Debug)]
pub enum QueryType {
Range,
Scalar,
}
use super::{QueryType, TimeSpan};
#[derive(Debug)]
pub struct TimeSpan {
pub end: DateTime<Utc>,
pub duration: chrono::Duration,
pub step_seconds: i64,
}
#[derive(Debug)]
pub struct QueryConn<'conn> {
pub struct PromQueryConn<'conn> {
source: &'conn str,
query: &'conn str,
span: Option<TimeSpan>,
@ -45,7 +34,7 @@ pub struct QueryConn<'conn> {
pub meta: PlotMeta,
}
impl<'conn> QueryConn<'conn> {
impl<'conn> PromQueryConn<'conn> {
pub fn new<'a: 'conn>(source: &'a str, query: &'a str, query_type: QueryType, meta: PlotMeta) -> Self {
Self {
source,
@ -120,15 +109,15 @@ pub struct DataPoint {
}
#[derive(Serialize, Deserialize)]
pub enum QueryResult {
pub enum PromQueryResult {
Series(Vec<(HashMap<String, String>, PlotMeta, Vec<DataPoint>)>),
Scalar(Vec<(HashMap<String, String>, PlotMeta, DataPoint)>),
}
impl std::fmt::Debug for QueryResult {
impl std::fmt::Debug for PromQueryResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
QueryResult::Series(v) => {
PromQueryResult::Series(v) => {
f.write_fmt(format_args!("Series trace count = {}", v.len()))?;
for (idx, (tags, meta, trace)) in v.iter().enumerate() {
f.write_fmt(format_args!(
@ -140,7 +129,7 @@ impl std::fmt::Debug for QueryResult {
))?;
}
}
QueryResult::Scalar(v) => {
PromQueryResult::Scalar(v) => {
f.write_fmt(format_args!("{} traces", v.len()))?;
}
}
@ -148,9 +137,9 @@ impl std::fmt::Debug for QueryResult {
}
}
pub fn to_samples(data: Data, meta: PlotMeta) -> QueryResult {
pub fn to_samples(data: Data, meta: PlotMeta) -> PromQueryResult {
match data {
Data::Matrix(mut range) => QueryResult::Series(
Data::Matrix(mut range) => PromQueryResult::Series(
range
.drain(0..)
.map(|rv| {
@ -169,7 +158,7 @@ pub fn to_samples(data: Data, meta: PlotMeta) -> QueryResult {
})
.collect(),
),
Data::Vector(mut vector) => QueryResult::Scalar(
Data::Vector(mut vector) => PromQueryResult::Scalar(
vector
.drain(0..)
.map(|iv| {
@ -185,7 +174,7 @@ pub fn to_samples(data: Data, meta: PlotMeta) -> QueryResult {
})
.collect(),
),
Data::Scalar(sample) => QueryResult::Scalar(vec![(
Data::Scalar(sample) => PromQueryResult::Scalar(vec![(
HashMap::new(),
meta.clone(),
DataPoint {

View File

@ -26,7 +26,7 @@ use serde::{Serialize, Deserialize};
use tracing::debug;
use crate::dashboard::{Dashboard, Graph, GraphSpan, AxisDefinition, Orientation, query_data};
use crate::query::QueryResult;
use crate::query::PromQueryResult;
type Config = State<Arc<Vec<Dashboard>>>;
@ -34,7 +34,7 @@ type Config = State<Arc<Vec<Dashboard>>>;
pub struct GraphPayload {
pub legend_orientation: Option<Orientation>,
pub yaxes: Vec<AxisDefinition>,
pub plots: Vec<QueryResult>,
pub plots: Vec<PromQueryResult>,
}
pub async fn graph_query(