From 2a52bba12a6a50e7ffa57f456e20e3aa602fe6a0 Mon Sep 17 00:00:00 2001 From: Jeremy Wall Date: Sun, 29 Jan 2023 20:16:26 -0500 Subject: [PATCH] Initial commit --- .gitignore | 2 + Cargo.toml | 15 +++++ src/lib.rs | 185 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 202 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 src/lib.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4fffb2f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +/Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..09b54ba --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "tower-trace-metrics" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +metrics = "0.20.1" +tracing = "0.1.35" +http = "0.2.8" + +[dependencies.tower-http] +version = "0.3.0" +features = ["trace"] \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..717e65d --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,185 @@ +// Copyright 2023 Jeremy Wall (Jeremy@marzhilsltudios.com) +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//! A [metrics] powered [TraceLayer] that works as [Tower](https://crates.io/crates/tower) middleware. +use http::{Request, Response}; +use metrics::{histogram, increment_counter, Label}; +use std::{ + marker::PhantomData, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, Mutex, + }, +}; +use tower_http::{ + classify::{ServerErrorsAsFailures, SharedClassifier}, + trace::{ + DefaultMakeSpan, DefaultOnEos, OnBodyChunk, OnFailure, OnRequest, OnResponse, TraceLayer, + }, +}; +use tracing; + +/// A Metrics Trace Layer using a [MetricsRecorder]. +/// +/// The layer will record 4 different metrics: +/// +/// * http_request_counter +/// * http_request_failure_counter +/// * http_request_size_bytes_hist +/// * http_request_request_time_micros_hist +/// +/// Each of the metrics are labled by host, method, and path +pub type MetricsTraceLayer = TraceLayer< + SharedClassifier, + DefaultMakeSpan, + MetricsRecorder, + MetricsRecorder, + MetricsRecorder, + DefaultOnEos, + MetricsRecorder, +>; + +/// Holds the state required for recording metrics on a given request. +pub struct MetricsRecorder +where + F: Fn(&B) -> u64, +{ + /// The labels for each metric we record on this request. + pub labels: Arc>>, + /// The accumulator for the number of bytes on this request. + pub size: Arc, + /// The mapper function to extract the size from a chunk in [OnBodyChunk], + pub chunk_len: Arc, + _phantom: PhantomData, +} + +impl Clone for MetricsRecorder +where + F: Fn(&B) -> u64, +{ + fn clone(&self) -> Self { + Self { + labels: self.labels.clone(), + size: self.size.clone(), + chunk_len: self.chunk_len.clone(), + _phantom: self._phantom.clone(), + } + } +} + +impl MetricsRecorder +where + F: Fn(&B) -> u64, +{ + /// Construct a new [MetricsRecorder] using the installed [metrics::Recorder]. + /// The function passed in is used to extract the size from the chunks in a + /// response for all [OnBodyChunk] calls. + pub fn new(f: F) -> Self { + Self { + labels: Arc::new(Mutex::new(Vec::new())), + size: Arc::new(AtomicU64::new(0)), + chunk_len: Arc::new(f), + _phantom: PhantomData, + } + } +} + +impl OnBodyChunk for MetricsRecorder +where + F: Fn(&B) -> u64, +{ + fn on_body_chunk(&mut self, chunk: &B, _latency: std::time::Duration, _span: &tracing::Span) { + let _ = self + .size + .fetch_add(self.chunk_len.as_ref()(chunk), Ordering::SeqCst); + } +} + +impl OnFailure for MetricsRecorder +where + F: Fn(&B) -> u64, +{ + fn on_failure( + &mut self, + _failure_classification: FailureClass, + _latency: std::time::Duration, + _span: &tracing::Span, + ) { + let labels = self.labels.lock().expect("Failed to unlock labels").clone(); + increment_counter!("http_request_failure_counter", labels); + } +} + +impl OnResponse for MetricsRecorder +where + F: Fn(&B) -> u64, +{ + fn on_response( + self, + _response: &Response, + latency: std::time::Duration, + _span: &tracing::Span, + ) { + let labels = self.labels.lock().expect("Failed to unlock labels").clone(); + histogram!( + "http_request_time_micros_hist", + latency.as_micros() as f64, + labels.clone() + ); + histogram!( + "http_request_size_bytes_hist", + self.size.as_ref().load(Ordering::SeqCst) as f64, + labels + ) + } +} + +fn make_request_lables(path: String, host: String, method: String) -> Vec