From 390b61ee8687cfa938c8f7fb7f4379d040a85d1a Mon Sep 17 00:00:00 2001 From: Jeremy Wall Date: Mon, 27 Feb 2023 15:19:16 -0500 Subject: [PATCH] Higher resolution ping timings --- src/icmp.rs | 257 ++++++++++++++++++++++++---------------------------- src/main.rs | 7 +- 2 files changed, 121 insertions(+), 143 deletions(-) diff --git a/src/icmp.rs b/src/icmp.rs index ec4dc37..2b58d20 100644 --- a/src/icmp.rs +++ b/src/icmp.rs @@ -14,7 +14,7 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::ops::Sub; use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, time::{Duration, Instant}, }; @@ -63,11 +63,77 @@ fn resolve_host_address(host: &str) -> String { struct State { destinations: HashMap, // domain, address // TODO(jwall): This should be a time tracker by both identifier and sequence - time_tracker: HashMap, u16)>, + time_tracker: BTreeMap>, + destination_counter: BTreeMap, latency_guage: GaugeVec, ping_counter: CounterVec, } +impl State { + fn handle_echo_reply(&mut self, identifier: u16, sequence: u16) -> bool { + if let Some((domain_name, dest)) = self.destinations.get(&identifier) { + let time_tracker = self.time_tracker.get_mut(&identifier); + if let Some(Some(send_time)) = time_tracker.as_ref().map(|m| m.get(&sequence)) { + let elapsed = Instant::now().sub(send_time.clone()).as_micros() as f64 / 1000.00; + // We make a copy here to avoid the borrow above sticking around for too long. + info!( + domain=domain_name, + %dest, + time = elapsed, + seq = sequence, + "Reply", + ); + self.ping_counter + .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) + .inc(); + if elapsed as i32 != 0 { + self.latency_guage + .with(&prometheus::labels! {"domain" => domain_name.as_str()}) + .set(elapsed); + } + self.time_tracker + .get_mut(&identifier) + .and_then(|m| m.remove(&sequence)); + return true; + } else { + error!(sequence, "Discarding unexpected sequence",); + }; + // Check all the other sequences to see if they have expired timeouts yet. + // Record timeout for the expired sequences. + // Remove the timeouts for the expired sequences. + let expired_sequences = self.time_tracker.get(&identifier).map(|m| { + let mut for_delete = Vec::with_capacity(m.len()); + let m = m.clone(); + { + for (k, send_time) in m.iter() { + if Instant::now().sub(*send_time) >= Duration::from_secs(PINGTIMEOUT.flag) { + info!( + domain=domain_name, + %dest, + seq = sequence, + "Dropped" + ); + self.ping_counter + .with(&prometheus::labels! {"result" => "timeout", "domain" => domain_name}) + .inc(); + for_delete.push(*k); + } + } + } + for_delete + }); + for k in expired_sequences.unwrap_or_default() { + self.time_tracker + .get_mut(&identifier) + .and_then(|m| m.remove(&k)); + } + } else { + warn!(identifier, "Discarding wrong identifier"); + } + return false; + } +} + struct PingerImpl { sock: Sock, } @@ -159,48 +225,7 @@ impl<'a> PacketHandler for &'a mut State { sequence, payload: _, } => { - if let Some((domain_name, dest)) = self.destinations.get(&identifier) { - if let Some((Some(send_time), expected_sequence)) = - self.time_tracker.get(&identifier) - { - let elapsed = - Instant::now().sub(send_time.clone()).as_millis() as f64 / 1000.00; - // We make a copy here to avoid the borrow above sticking around for too long. - let expected_sequence = *expected_sequence; - if sequence != expected_sequence { - error!( - sequence, - expected = expected_sequence, - "Discarding unexpected sequence", - ); - self.time_tracker - .insert(identifier, (None, expected_sequence.wrapping_add(1))); - return false; - } - info!( - domain=domain_name, - %dest, - time = elapsed, - seq = sequence, - "Reply", - ); - self.ping_counter - .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) - .inc(); - if elapsed as i32 != 0 { - self.latency_guage - .with(&prometheus::labels! {"domain" => domain_name.as_str()}) - .set(elapsed); - } - self.time_tracker - .insert(identifier, (None, expected_sequence.wrapping_add(1))); - return true; - } else { - return false; - }; - } else { - warn!(identifier, "Discarding wrong identifier"); - } + return self.handle_echo_reply(identifier, sequence); } _ => { // We ignore the rest. @@ -224,39 +249,7 @@ impl<'a> PacketHandler for &'a mut State { sequence, payload: _, } => { - if let Some((domain_name, dest)) = self.destinations.get(&identifier) { - if let Some((Some(send_time), expected_sequence)) = - self.time_tracker.get(&identifier) - { - let elapsed = - Instant::now().sub(send_time.clone()).as_micros() as f64 / 1000.00; - let expected_sequence = *expected_sequence; - if expected_sequence != sequence { - error!( - sequence, - expected = expected_sequence, - "Discarding unexpected sequence", - ); - self.time_tracker - .insert(identifier, (None, expected_sequence.wrapping_add(1))); - return false; - } - info!(domain=domain_name, %dest, time = elapsed, seq = sequence, "Reply",); - self.ping_counter - .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) - .inc(); - self.latency_guage - .with(&prometheus::labels! {"domain" => domain_name.as_str()}) - .set(elapsed); - self.time_tracker - .insert(identifier, (None, expected_sequence.wrapping_add(1))); - return true; - } else { - return false; - }; - } else { - info!(identifier, "Discarding wrong identifier"); - } + return self.handle_echo_reply(identifier, sequence); } _ => { // We ignore the rest. @@ -282,6 +275,44 @@ where fn recv_pkt(&mut self) -> std::io::Result; fn recv_all>(&mut self, handler: H); + + fn send_pkt( + &mut self, + state: &mut State, + identifier: u16, + dest: AddrType, + domain_name: &String, + ) -> std::io::Result<()> { + let sequence = *state.destination_counter.entry(identifier).or_insert(0); + debug!( + domain=domain_name, %dest, sequence, + "Sending echo request", + ); + match self.send_to_destination(dest, identifier, sequence) { + Err(e) => { + state + .ping_counter + .with(&prometheus::labels! {"result" => "err", "type" => "send"}) + .inc(); + error!( + domain=domain_name, %dest, err=?e, + "Error sending. Trying again later", + ); + } + Ok(send_time) => { + state + .time_tracker + .entry(identifier) + .or_insert_with(|| BTreeMap::new()) + .insert(sequence, send_time); + } + } + state + .destination_counter + .get_mut(&identifier) + .map(|v| *v = v.wrapping_add(1)); + Ok(()) + } } impl Pinger for PingerImpl @@ -290,70 +321,12 @@ where Sock::AddrType: std::fmt::Display + Copy, Sock::PacketType: WithEchoRequest, { - #[instrument(skip(self, state))] + #[instrument(skip_all)] fn send_all(&mut self, state: &mut State) -> std::io::Result<()> { let destinations = state.destinations.clone(); debug!("Attempting to send packets for all domains"); for (identifier, (domain_name, dest)) in destinations.into_iter() { - let previous_tracker = state.time_tracker.get(&identifier); - let sequence = if previous_tracker.is_some() { - let (send_status, sequence) = previous_tracker.unwrap(); - if let Some(send_time) = send_status { - // We haven't recieved the previous packet response yet so don't send unless we've waited - // for timeout length of time. - let elapsed = Instant::now() - *send_time; - if elapsed > Duration::from_secs(PINGTIMEOUT.flag) { - info!( - domain = domain_name, - ?send_time, - ?elapsed, - seq = sequence, - "Dropped packet detected", - ); - state.ping_counter - .with(&prometheus::labels! {"result" => "dropped", "domain" => &domain_name}) - .inc(); - sequence.wrapping_add(1) - } else { - debug!( - domain = domain_name, - seq = sequence, - "Waiting for timeout before sending next packet", - ); - continue; - } - } else { - *sequence - } - } else { - debug!( - domain = domain_name, - seq = 0 as u16, - "Initializing sequence for first send", - ); - 0 - }; - debug!( - domain=domain_name, %dest, sequence, - "Sending echo request", - ); - match self.send_to_destination(dest, identifier, sequence) { - Err(e) => { - state - .ping_counter - .with(&prometheus::labels! {"result" => "err", "type" => "send"}) - .inc(); - error!( - domain=domain_name, %dest, err=?e, - "Error sending. Trying again later", - ); - } - Ok(send_time) => { - state - .time_tracker - .insert(identifier, (Some(send_time), sequence)); - } - } + self.send_pkt(state, identifier, dest, &domain_name)?; } debug!("Finished sending for domains"); Ok(()) @@ -391,11 +364,11 @@ where .get_mut_state() .time_tracker .values() - .find(|item| item.0.is_some()) + .find(|item| !item.is_empty()) .is_none() { // nothing has been sent yet so no need to try to recv packets - debug!("Nothing to recieve for so skipping for this socket"); + debug!("Nothing to recieve so skipping for this socket"); return; } self.sock.set_timeout(None); @@ -495,7 +468,8 @@ pub fn schedule_echo_server( } let v4_state = State { destinations: v4_destinations, - time_tracker: HashMap::new(), + time_tracker: BTreeMap::new(), + destination_counter: BTreeMap::new(), latency_guage: ping_latency_guage.clone(), ping_counter: ping_counter.clone(), }; @@ -515,7 +489,8 @@ pub fn schedule_echo_server( }; let v6_state = State { destinations: v6_destinations, - time_tracker: HashMap::new(), + time_tracker: BTreeMap::new(), + destination_counter: BTreeMap::new(), latency_guage: ping_latency_guage, ping_counter, }; diff --git a/src/main.rs b/src/main.rs index ca41e28..a29a6da 100644 --- a/src/main.rs +++ b/src/main.rs @@ -116,8 +116,11 @@ fn main() -> anyhow::Result<()> { &["domain"], ) .unwrap(); - let ping_latency_vec = - GaugeVec::new(Opts::new("ping_latency", "ICMP Ping latency"), &["domain"]).unwrap(); + let ping_latency_vec = GaugeVec::new( + Opts::new("ping_latency", "ICMP Ping latency in milliseconds"), + &["domain"], + ) + .unwrap(); let ping_counter_vec = CounterVec::new( Opts::new("ping_counter", "Ping Request Counter"), &["result", "domain"],