Higher resolution ping timings

This commit is contained in:
Jeremy Wall 2023-02-27 15:19:16 -05:00
parent b45b29e22d
commit 390b61ee86
2 changed files with 121 additions and 143 deletions

View File

@ -14,7 +14,7 @@
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::ops::Sub;
use std::{
collections::HashMap,
collections::{BTreeMap, HashMap},
time::{Duration, Instant},
};
@ -63,11 +63,77 @@ fn resolve_host_address(host: &str) -> String {
struct State<AddrType> {
destinations: HashMap<u16, (String, AddrType)>, // domain, address
// TODO(jwall): This should be a time tracker by both identifier and sequence
time_tracker: HashMap<u16, (Option<Instant>, u16)>,
time_tracker: BTreeMap<u16, BTreeMap<u16, Instant>>,
destination_counter: BTreeMap<u16, u16>,
latency_guage: GaugeVec,
ping_counter: CounterVec,
}
impl<AddrType: std::fmt::Display> State<AddrType> {
fn handle_echo_reply(&mut self, identifier: u16, sequence: u16) -> bool {
if let Some((domain_name, dest)) = self.destinations.get(&identifier) {
let time_tracker = self.time_tracker.get_mut(&identifier);
if let Some(Some(send_time)) = time_tracker.as_ref().map(|m| m.get(&sequence)) {
let elapsed = Instant::now().sub(send_time.clone()).as_micros() as f64 / 1000.00;
// We make a copy here to avoid the borrow above sticking around for too long.
info!(
domain=domain_name,
%dest,
time = elapsed,
seq = sequence,
"Reply",
);
self.ping_counter
.with(&prometheus::labels! {"result" => "ok", "domain" => domain_name})
.inc();
if elapsed as i32 != 0 {
self.latency_guage
.with(&prometheus::labels! {"domain" => domain_name.as_str()})
.set(elapsed);
}
self.time_tracker
.get_mut(&identifier)
.and_then(|m| m.remove(&sequence));
return true;
} else {
error!(sequence, "Discarding unexpected sequence",);
};
// Check all the other sequences to see if they have expired timeouts yet.
// Record timeout for the expired sequences.
// Remove the timeouts for the expired sequences.
let expired_sequences = self.time_tracker.get(&identifier).map(|m| {
let mut for_delete = Vec::with_capacity(m.len());
let m = m.clone();
{
for (k, send_time) in m.iter() {
if Instant::now().sub(*send_time) >= Duration::from_secs(PINGTIMEOUT.flag) {
info!(
domain=domain_name,
%dest,
seq = sequence,
"Dropped"
);
self.ping_counter
.with(&prometheus::labels! {"result" => "timeout", "domain" => domain_name})
.inc();
for_delete.push(*k);
}
}
}
for_delete
});
for k in expired_sequences.unwrap_or_default() {
self.time_tracker
.get_mut(&identifier)
.and_then(|m| m.remove(&k));
}
} else {
warn!(identifier, "Discarding wrong identifier");
}
return false;
}
}
struct PingerImpl<Sock: IcmpSocket> {
sock: Sock,
}
@ -159,48 +225,7 @@ impl<'a> PacketHandler<Icmpv6Packet, Ipv6Addr> for &'a mut State<Ipv6Addr> {
sequence,
payload: _,
} => {
if let Some((domain_name, dest)) = self.destinations.get(&identifier) {
if let Some((Some(send_time), expected_sequence)) =
self.time_tracker.get(&identifier)
{
let elapsed =
Instant::now().sub(send_time.clone()).as_millis() as f64 / 1000.00;
// We make a copy here to avoid the borrow above sticking around for too long.
let expected_sequence = *expected_sequence;
if sequence != expected_sequence {
error!(
sequence,
expected = expected_sequence,
"Discarding unexpected sequence",
);
self.time_tracker
.insert(identifier, (None, expected_sequence.wrapping_add(1)));
return false;
}
info!(
domain=domain_name,
%dest,
time = elapsed,
seq = sequence,
"Reply",
);
self.ping_counter
.with(&prometheus::labels! {"result" => "ok", "domain" => domain_name})
.inc();
if elapsed as i32 != 0 {
self.latency_guage
.with(&prometheus::labels! {"domain" => domain_name.as_str()})
.set(elapsed);
}
self.time_tracker
.insert(identifier, (None, expected_sequence.wrapping_add(1)));
return true;
} else {
return false;
};
} else {
warn!(identifier, "Discarding wrong identifier");
}
return self.handle_echo_reply(identifier, sequence);
}
_ => {
// We ignore the rest.
@ -224,39 +249,7 @@ impl<'a> PacketHandler<Icmpv4Packet, Ipv4Addr> for &'a mut State<Ipv4Addr> {
sequence,
payload: _,
} => {
if let Some((domain_name, dest)) = self.destinations.get(&identifier) {
if let Some((Some(send_time), expected_sequence)) =
self.time_tracker.get(&identifier)
{
let elapsed =
Instant::now().sub(send_time.clone()).as_micros() as f64 / 1000.00;
let expected_sequence = *expected_sequence;
if expected_sequence != sequence {
error!(
sequence,
expected = expected_sequence,
"Discarding unexpected sequence",
);
self.time_tracker
.insert(identifier, (None, expected_sequence.wrapping_add(1)));
return false;
}
info!(domain=domain_name, %dest, time = elapsed, seq = sequence, "Reply",);
self.ping_counter
.with(&prometheus::labels! {"result" => "ok", "domain" => domain_name})
.inc();
self.latency_guage
.with(&prometheus::labels! {"domain" => domain_name.as_str()})
.set(elapsed);
self.time_tracker
.insert(identifier, (None, expected_sequence.wrapping_add(1)));
return true;
} else {
return false;
};
} else {
info!(identifier, "Discarding wrong identifier");
}
return self.handle_echo_reply(identifier, sequence);
}
_ => {
// We ignore the rest.
@ -282,57 +275,15 @@ where
fn recv_pkt(&mut self) -> std::io::Result<PacketType>;
fn recv_all<H: PacketHandler<PacketType, AddrType>>(&mut self, handler: H);
}
impl<Sock> Pinger<Sock::AddrType, Sock::PacketType> for PingerImpl<Sock>
where
Sock: IcmpSocket,
Sock::AddrType: std::fmt::Display + Copy,
Sock::PacketType: WithEchoRequest<Packet = Sock::PacketType>,
{
#[instrument(skip(self, state))]
fn send_all(&mut self, state: &mut State<Sock::AddrType>) -> std::io::Result<()> {
let destinations = state.destinations.clone();
debug!("Attempting to send packets for all domains");
for (identifier, (domain_name, dest)) in destinations.into_iter() {
let previous_tracker = state.time_tracker.get(&identifier);
let sequence = if previous_tracker.is_some() {
let (send_status, sequence) = previous_tracker.unwrap();
if let Some(send_time) = send_status {
// We haven't recieved the previous packet response yet so don't send unless we've waited
// for timeout length of time.
let elapsed = Instant::now() - *send_time;
if elapsed > Duration::from_secs(PINGTIMEOUT.flag) {
info!(
domain = domain_name,
?send_time,
?elapsed,
seq = sequence,
"Dropped packet detected",
);
state.ping_counter
.with(&prometheus::labels! {"result" => "dropped", "domain" => &domain_name})
.inc();
sequence.wrapping_add(1)
} else {
debug!(
domain = domain_name,
seq = sequence,
"Waiting for timeout before sending next packet",
);
continue;
}
} else {
*sequence
}
} else {
debug!(
domain = domain_name,
seq = 0 as u16,
"Initializing sequence for first send",
);
0
};
fn send_pkt(
&mut self,
state: &mut State<AddrType>,
identifier: u16,
dest: AddrType,
domain_name: &String,
) -> std::io::Result<()> {
let sequence = *state.destination_counter.entry(identifier).or_insert(0);
debug!(
domain=domain_name, %dest, sequence,
"Sending echo request",
@ -351,9 +302,31 @@ where
Ok(send_time) => {
state
.time_tracker
.insert(identifier, (Some(send_time), sequence));
.entry(identifier)
.or_insert_with(|| BTreeMap::new())
.insert(sequence, send_time);
}
}
state
.destination_counter
.get_mut(&identifier)
.map(|v| *v = v.wrapping_add(1));
Ok(())
}
}
impl<Sock> Pinger<Sock::AddrType, Sock::PacketType> for PingerImpl<Sock>
where
Sock: IcmpSocket,
Sock::AddrType: std::fmt::Display + Copy,
Sock::PacketType: WithEchoRequest<Packet = Sock::PacketType>,
{
#[instrument(skip_all)]
fn send_all(&mut self, state: &mut State<Sock::AddrType>) -> std::io::Result<()> {
let destinations = state.destinations.clone();
debug!("Attempting to send packets for all domains");
for (identifier, (domain_name, dest)) in destinations.into_iter() {
self.send_pkt(state, identifier, dest, &domain_name)?;
}
debug!("Finished sending for domains");
Ok(())
@ -391,11 +364,11 @@ where
.get_mut_state()
.time_tracker
.values()
.find(|item| item.0.is_some())
.find(|item| !item.is_empty())
.is_none()
{
// nothing has been sent yet so no need to try to recv packets
debug!("Nothing to recieve for so skipping for this socket");
debug!("Nothing to recieve so skipping for this socket");
return;
}
self.sock.set_timeout(None);
@ -495,7 +468,8 @@ pub fn schedule_echo_server(
}
let v4_state = State {
destinations: v4_destinations,
time_tracker: HashMap::new(),
time_tracker: BTreeMap::new(),
destination_counter: BTreeMap::new(),
latency_guage: ping_latency_guage.clone(),
ping_counter: ping_counter.clone(),
};
@ -515,7 +489,8 @@ pub fn schedule_echo_server(
};
let v6_state = State {
destinations: v6_destinations,
time_tracker: HashMap::new(),
time_tracker: BTreeMap::new(),
destination_counter: BTreeMap::new(),
latency_guage: ping_latency_guage,
ping_counter,
};

View File

@ -116,8 +116,11 @@ fn main() -> anyhow::Result<()> {
&["domain"],
)
.unwrap();
let ping_latency_vec =
GaugeVec::new(Opts::new("ping_latency", "ICMP Ping latency"), &["domain"]).unwrap();
let ping_latency_vec = GaugeVec::new(
Opts::new("ping_latency", "ICMP Ping latency in milliseconds"),
&["domain"],
)
.unwrap();
let ping_counter_vec = CounterVec::new(
Opts::new("ping_counter", "Ping Request Counter"),
&["result", "domain"],