Improve the metrics layer so we use an installed recorder

Jeremy Wall 2023-01-29 19:55:43 -05:00
parent e02fcc82e1
commit ab0937c3b2
2 changed files with 98 additions and 84 deletions
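
In short: instead of building a PrometheusRecorder by hand and threading an Arc of it through every trace hook, the layer now records through whatever global recorder has been installed, using the metrics crate's increment_counter! and histogram! macros. A minimal sketch of the resulting startup wiring, assuming the make_layer signature introduced below (crate::metrics is this project's own module, as in the router change in the second file):

use axum::{body::Bytes, Router};
use metrics_exporter_prometheus::PrometheusBuilder;

fn app() -> Router {
    // Installing the recorder registers it as the process-global `metrics`
    // recorder, so the macros inside the trace layer can find it.
    let _handle = PrometheusBuilder::new()
        .install_recorder()
        .expect("Failed to install Prometheus Recorder");

    // The closure tells the layer how to measure one body chunk; axum's
    // chunk type is `Bytes`.
    Router::new().layer(crate::metrics::make_layer(|b: &Bytes| b.len() as u64))
}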

View File

@@ -11,78 +11,99 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-use std::sync::{
-    atomic::{AtomicU64, Ordering},
-    Arc, Mutex,
+//! A [metrics] powered [TraceLayer] that works with any [Tower](https://crates.io/crates/tower) middleware.
+use axum::http::{Request, Response};
+use metrics::{histogram, increment_counter, Label};
+use std::{
+    marker::PhantomData,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc, Mutex,
+    },
 };
-use axum::{body::Bytes, http::Request, http::Response};
-use metrics::{Key, Label, Recorder};
-use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusRecorder};
 use tower_http::{
     classify::{ServerErrorsAsFailures, SharedClassifier},
     trace::{
-        DefaultMakeSpan, DefaultOnEos, OnBodyChunk, OnEos, OnFailure, OnRequest, OnResponse,
-        TraceLayer,
+        DefaultMakeSpan, DefaultOnEos, OnBodyChunk, OnFailure, OnRequest, OnResponse, TraceLayer,
     },
 };
-use tracing;
 
-// We want to track request count, request latency, request size minimally.
-
-pub type MetricsTraceLayer = TraceLayer<
+/// A Metrics Trace Layer using a [MetricsRecorder].
+///
+/// The layer will record 4 different metrics:
+///
+/// * http_request_counter
+/// * http_request_failure_counter
+/// * http_request_size_bytes_hist
+/// * http_request_time_micros_hist
+///
+/// Each of the metrics is labeled by host, method, and path.
+pub type MetricsTraceLayer<B, F> = TraceLayer<
     SharedClassifier<ServerErrorsAsFailures>,
     DefaultMakeSpan,
-    MetricsRecorder,
-    MetricsRecorder,
-    MetricsRecorder,
+    MetricsRecorder<B, F>,
+    MetricsRecorder<B, F>,
+    MetricsRecorder<B, F>,
     DefaultOnEos,
-    MetricsRecorder,
+    MetricsRecorder<B, F>,
 >;
 
-pub fn get_recorder() -> PrometheusRecorder {
-    let builder = PrometheusBuilder::new();
-    builder.build_recorder()
-}
-
-#[derive(Clone)]
-pub struct MetricsRecorder {
-    rec: Arc<PrometheusRecorder>,
+/// Holds the state required for recording metrics on a given request.
+pub struct MetricsRecorder<B, F>
+where
+    F: Fn(&B) -> u64,
+{
     labels: Arc<Mutex<Vec<Label>>>,
     size: Arc<AtomicU64>,
+    chunk_len: Arc<F>,
+    _phantom: PhantomData<B>,
 }
 
-impl MetricsRecorder {
-    pub fn new_with_rec(rec: Arc<PrometheusRecorder>) -> Self {
+impl<B, F> Clone for MetricsRecorder<B, F>
+where
+    F: Fn(&B) -> u64,
+{
+    fn clone(&self) -> Self {
         Self {
-            rec,
-            labels: Arc::new(Mutex::new(Vec::new())),
-            size: Arc::new(AtomicU64::new(0)),
+            labels: self.labels.clone(),
+            size: self.size.clone(),
+            chunk_len: self.chunk_len.clone(),
+            _phantom: self._phantom.clone(),
         }
     }
 }
 
-impl OnBodyChunk<Bytes> for MetricsRecorder {
-    fn on_body_chunk(
-        &mut self,
-        chunk: &Bytes,
-        _latency: std::time::Duration,
-        _span: &tracing::Span,
-    ) {
-        let _ = self.size.fetch_add(chunk.len() as u64, Ordering::SeqCst);
+impl<B, F> MetricsRecorder<B, F>
+where
+    F: Fn(&B) -> u64,
+{
+    /// Construct a new [MetricsRecorder] using the installed [metrics::Recorder].
+    pub fn new(f: F) -> Self {
+        Self {
+            labels: Arc::new(Mutex::new(Vec::new())),
+            size: Arc::new(AtomicU64::new(0)),
+            chunk_len: Arc::new(f),
+            _phantom: PhantomData,
+        }
     }
 }
 
-impl OnEos for MetricsRecorder {
-    fn on_eos(
-        self,
-        _trailers: Option<&axum::http::HeaderMap>,
-        _stream_duration: std::time::Duration,
-        _span: &tracing::Span,
-    ) {
+impl<B, F> OnBodyChunk<B> for MetricsRecorder<B, F>
+where
+    F: Fn(&B) -> u64,
+{
+    fn on_body_chunk(&mut self, chunk: &B, _latency: std::time::Duration, _span: &tracing::Span) {
+        let _ = self
+            .size
+            .fetch_add(self.chunk_len.as_ref()(chunk), Ordering::SeqCst);
     }
 }
 
-impl<FailureClass> OnFailure<FailureClass> for MetricsRecorder {
+impl<B, FailureClass, F> OnFailure<FailureClass> for MetricsRecorder<B, F>
+where
+    F: Fn(&B) -> u64,
+{
     fn on_failure(
         &mut self,
         _failure_classification: FailureClass,
@@ -90,30 +111,31 @@ impl<FailureClass> OnFailure<FailureClass> for MetricsRecorder {
         _span: &tracing::Span,
     ) {
         let labels = self.labels.lock().expect("Failed to unlock labels").clone();
-        self.rec
-            .as_ref()
-            .register_histogram(&Key::from_parts("http_request_failure_counter", labels));
+        increment_counter!("http_request_failure_counter", labels);
     }
 }
 
-impl<B> OnResponse<B> for MetricsRecorder {
+impl<B, RB, F> OnResponse<RB> for MetricsRecorder<B, F>
+where
+    F: Fn(&B) -> u64,
+{
     fn on_response(
         self,
-        _response: &Response<B>,
+        _response: &Response<RB>,
         latency: std::time::Duration,
         _span: &tracing::Span,
     ) {
         let labels = self.labels.lock().expect("Failed to unlock labels").clone();
-        self.rec
-            .as_ref()
-            .register_histogram(&Key::from_parts("http_request_time_micros", labels.clone()))
-            // If we somehow end up having requests overflow from u128 into f64 then we have
-            // much bigger problems than this cast.
-            .record(latency.as_micros() as f64);
-        self.rec
-            .as_ref()
-            .register_histogram(&Key::from_parts("http_request_size_bytes", labels))
-            .record(self.size.as_ref().load(Ordering::SeqCst) as f64);
+        histogram!(
+            "http_request_time_micros_hist",
+            latency.as_micros() as f64,
+            labels.clone()
+        );
+        histogram!(
+            "http_request_size_bytes_hist",
+            self.size.as_ref().load(Ordering::SeqCst) as f64,
+            labels
+        )
     }
 }
@@ -125,9 +147,11 @@ fn make_request_lables(path: String, host: String, method: String) -> Vec<Label>
     ]
 }
 
-impl<B> OnRequest<B> for MetricsRecorder {
-    fn on_request(&mut self, request: &Request<B>, _span: &tracing::Span) {
-        let rec = self.rec.as_ref();
+impl<B, RB, F> OnRequest<RB> for MetricsRecorder<B, F>
+where
+    F: Fn(&B) -> u64,
+{
+    fn on_request(&mut self, request: &Request<RB>, _span: &tracing::Span) {
         let path = request.uri().path().to_lowercase();
         let host = request.uri().host().unwrap_or("").to_lowercase();
         let method = request.method().to_string();
@@ -135,13 +159,16 @@ impl<B> OnRequest<B> for MetricsRecorder {
         let labels = make_request_lables(path, host, method);
         let mut labels_lock = self.labels.lock().expect("Failed to unlock labels");
         (*labels_lock.as_mut()) = labels.clone();
-        rec.register_counter(&Key::from_parts("http_request_counter", labels.clone()))
-            .increment(1);
+        increment_counter!("http_request_counter", labels);
     }
 }
 
-pub fn make_trace_layer(rec: Arc<PrometheusRecorder>) -> MetricsTraceLayer {
-    let metrics_recorder = MetricsRecorder::new_with_rec(rec);
+/// Construct a [TraceLayer] that will use an installed [metrics::Recorder] to record metrics per request.
+pub fn make_layer<B, F>(f: F) -> MetricsTraceLayer<B, F>
+where
+    F: Fn(&B) -> u64,
+{
+    let metrics_recorder = MetricsRecorder::new(f);
     let layer = TraceLayer::new_for_http()
         .on_body_chunk(metrics_recorder.clone())
         .on_request(metrics_recorder.clone())
@@ -149,17 +176,3 @@ pub fn make_trace_layer(rec: Arc<PrometheusRecorder>) -> MetricsTraceLayer {
         .on_failure(metrics_recorder.clone());
     layer
 }
-
-#[cfg(test)]
-mod test {
-    use super::*;
-
-    #[test]
-    fn test_construction() {
-        let metrics_recorder = MetricsRecorder::new_with_rec(std::sync::Arc::new(get_recorder()));
-        let _trace_layer = TraceLayer::new_for_http()
-            .on_body_chunk(metrics_recorder.clone())
-            .on_request(metrics_recorder.clone())
-            .on_response(metrics_recorder.clone())
-            .on_failure(metrics_recorder.clone());
-    }
-}
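
Note that the commit deletes the construction test along with get_recorder(), and nothing replaces it. If it were kept, the equivalent test under the new API would need only the chunk-measuring closure rather than a recorder instance. A sketch, not part of this commit:

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_construction() {
        // No recorder to thread through anymore: construction takes only the
        // closure that measures a body chunk.
        let metrics_recorder = MetricsRecorder::new(|b: &axum::body::Bytes| b.len() as u64);
        let _trace_layer = TraceLayer::new_for_http()
            .on_body_chunk(metrics_recorder.clone())
            .on_request(metrics_recorder.clone())
            .on_response(metrics_recorder.clone())
            .on_failure(metrics_recorder.clone());
    }
}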

View File

@@ -546,9 +546,10 @@ fn mk_v2_routes() -> Router {
 #[instrument(fields(recipe_dir=?recipe_dir_path), skip_all)]
 pub async fn make_router(recipe_dir_path: PathBuf, store_path: PathBuf) -> Router {
-    let recorder = std::sync::Arc::new(metrics::get_recorder());
-    let handle = recorder.handle();
-    let metrics_trace_layer = metrics::make_trace_layer(recorder);
+    let handle = metrics_exporter_prometheus::PrometheusBuilder::new()
+        .install_recorder()
+        .expect("Failed to install Prometheus Recorder");
+    let metrics_trace_layer = metrics::make_layer(|b: &axum::body::Bytes| b.len() as u64);
     let store = Arc::new(storage::file_store::AsyncFileStore::new(
         recipe_dir_path.clone(),
     ));
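
The handle returned by install_recorder() is a PrometheusHandle; its consumer is outside the shown context, but the usual pattern is to expose it on a scrape endpoint via handle.render(). Hypothetical wiring, assuming an axum route elsewhere in make_router:

use axum::{routing::get, Router};
use metrics_exporter_prometheus::PrometheusHandle;

// `render()` serializes the current metrics in the Prometheus text format.
fn with_metrics_endpoint(router: Router, handle: PrometheusHandle) -> Router {
    router.route("/metrics", get(move || async move { handle.render() }))
}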