Fix a bunch of stuff for multi destination pings.

This commit is contained in:
Jeremy Wall 2021-02-26 14:23:36 -05:00
parent 278ef8da04
commit 0835ea3ade
2 changed files with 213 additions and 119 deletions

View File

@ -26,6 +26,7 @@ use icmp_socket::{
IcmpSocket, IcmpSocket4, IcmpSocket6, Icmpv4Packet, Icmpv6Packet,
};
use log::{debug, error, info};
use nursery::{thread, Nursery};
use prometheus::{CounterVec, GaugeVec};
gflags::define! {
@ -34,8 +35,8 @@ gflags::define! {
}
gflags::define! {
/// The timeout for ping requests.
--pingTimeout: u64 = 2048
/// The timeout for ping requests in seconds.
--pingTimeout: u64 = 3
}
gflags::define! {
@ -60,9 +61,9 @@ fn resolve_host_address(host: &str) -> String {
}
struct State<AddrType> {
sequence: u16,
destinations: HashMap<u16, (String, AddrType)>, // domain, address
time_tracker: HashMap<u16, Instant>,
// TODO(jwall): This should be a time tracker by both identifier and sequence
time_tracker: HashMap<u16, (Option<Instant>, u16)>,
latency_guage: GaugeVec,
ping_counter: CounterVec,
}
@ -87,6 +88,7 @@ impl<'a> PacketHandler<Icmpv6Packet, Ipv6Addr> for &'a mut State<Ipv6Addr> {
}
fn handle_pkt(&mut self, pkt: Icmpv6Packet) -> bool {
debug!("ICMP: handling packet {:?}", pkt);
match pkt.message {
Icmpv6Message::Unreachable {
_unused,
@ -106,8 +108,8 @@ impl<'a> PacketHandler<Icmpv6Packet, Ipv6Addr> for &'a mut State<Ipv6Addr> {
}) => {
if let Some((domain_name, _addr)) = self.destinations.get(&identifier) {
self.ping_counter
.with(&prometheus::labels! {"result" => "unreachable", "domain" => domain_name})
.inc();
.with(&prometheus::labels! {"result" => "unreachable", "domain" => domain_name})
.inc();
return true;
}
}
@ -158,28 +160,40 @@ impl<'a> PacketHandler<Icmpv6Packet, Ipv6Addr> for &'a mut State<Ipv6Addr> {
payload: _,
} => {
if let Some((domain_name, dest)) = self.destinations.get(&identifier) {
if self.sequence != sequence {
error!("ICMP: Discarding sequence {}", sequence);
return false;
}
let elapsed = if let Some(send_time) = self.time_tracker.get(&identifier) {
Instant::now().sub(send_time.clone()).as_micros() as f64 / 1000.00
if let Some((Some(send_time), expected_sequence)) =
self.time_tracker.get(&identifier)
{
let elapsed =
Instant::now().sub(send_time.clone()).as_millis() as f64 / 1000.00;
// We make a copy here to avoid the borrow above sticking around for too long.
let expected_sequence = *expected_sequence;
if sequence != expected_sequence {
error!(
"ICMP: Discarding unexpected sequence sequence={} expected={}",
sequence, expected_sequence
);
self.time_tracker
.insert(identifier, (None, expected_sequence.wrapping_add(1)));
return false;
}
info!(
"ICMP: Reply from {}({}): time={}ms, seq={}",
domain_name, dest, elapsed, sequence,
);
self.ping_counter
.with(&prometheus::labels! {"result" => "ok", "domain" => domain_name})
.inc();
if elapsed as i32 != 0 {
self.latency_guage
.with(&prometheus::labels! {"domain" => domain_name.as_str()})
.set(elapsed);
}
self.time_tracker
.insert(identifier, (None, expected_sequence.wrapping_add(1)));
return true;
} else {
return false;
};
info!(
"ICMP: Reply from {}({}): time={}ms, seq={}",
domain_name, dest, elapsed, sequence,
);
self.ping_counter
.with(&prometheus::labels! {"result" => "ok", "domain" => domain_name})
.inc();
if elapsed as i32 != 0 {
self.latency_guage
.with(&prometheus::labels! {"domain" => domain_name.as_str()})
.set(elapsed);
}
return true;
} else {
info!("ICMP: Discarding wrong identifier {}", identifier);
}
@ -198,59 +212,45 @@ impl<'a> PacketHandler<Icmpv4Packet, Ipv4Addr> for &'a mut State<Ipv4Addr> {
}
fn handle_pkt(&mut self, pkt: Icmpv4Packet) -> bool {
debug!("ICMP: handling packet {:?}", pkt);
match pkt.message {
Icmpv4Message::ParameterProblem {
pointer: _,
padding: _,
header: _,
} => {
self.ping_counter
.with(&prometheus::labels! {"result" => "parameter_problem", "domain" => "unknown"})
.inc();
}
Icmpv4Message::Unreachable { padding: _, header } => {
let dest_addr = Ipv4Addr::new(header[16], header[17], header[18], header[19]);
info!("ICMP: Destination Unreachable response from {}", dest_addr,);
self.ping_counter
.with(&prometheus::labels! {"result" => "unreachable", "domain" => "unknown"})
.inc();
}
Icmpv4Message::TimeExceeded { padding: _, header } => {
let dest_addr = Ipv4Addr::new(header[16], header[17], header[18], header[19]);
info!("ICMP: Timeout for {}", dest_addr);
self.ping_counter
.with(&prometheus::labels! {"result" => "timeout", "domain" => "unknown"})
.inc();
}
Icmpv4Message::EchoReply {
identifier,
sequence,
payload: _,
} => {
if let Some((domain_name, dest)) = self.destinations.get(&identifier) {
let elapsed = if let Some(send_time) = self.time_tracker.get(&identifier) {
Instant::now().sub(send_time.clone()).as_micros() as f64 / 1000.00
if let Some((Some(send_time), expected_sequence)) =
self.time_tracker.get(&identifier)
{
let elapsed =
Instant::now().sub(send_time.clone()).as_micros() as f64 / 1000.00;
let expected_sequence = *expected_sequence;
if expected_sequence != sequence {
error!(
"ICMP: Discarding unexpected sequence sequence={} expected={}",
sequence, expected_sequence
);
self.time_tracker
.insert(identifier, (None, expected_sequence.wrapping_add(1)));
return false;
}
info!(
"ICMP: Reply from {}({}): time={}ms, seq={}",
domain_name, dest, elapsed, sequence,
);
self.ping_counter
.with(&prometheus::labels! {"result" => "ok", "domain" => domain_name})
.inc();
self.latency_guage
.with(&prometheus::labels! {"domain" => domain_name.as_str()})
.set(elapsed);
self.time_tracker
.insert(identifier, (None, expected_sequence.wrapping_add(1)));
return true;
} else {
return false;
};
if self.sequence != sequence {
error!(
"ICMP: Discarding sequence {}, expected sequence {}",
sequence, self.sequence
);
return false;
}
info!(
"ICMP: Reply from {}({}): time={}ms, seq={}",
domain_name, dest, elapsed, sequence,
);
self.ping_counter
.with(&prometheus::labels! {"result" => "ok", "domain" => domain_name})
.inc();
self.latency_guage
.with(&prometheus::labels! {"domain" => domain_name.as_str()})
.set(elapsed);
return true;
} else {
info!("ICMP: Discarding wrong identifier {}", identifier);
}
@ -288,11 +288,47 @@ where
Sock::PacketType: WithEchoRequest<Packet = Sock::PacketType>,
{
fn send_all(&mut self, state: &mut State<Sock::AddrType>) -> std::io::Result<()> {
self.sock.set_timeout(self.timeout)?;
let destinations = state.destinations.clone();
info!("ICMP: Attempting to send packets for all domains");
for (identifier, (domain_name, dest)) in destinations.into_iter() {
debug!("ICMP: sending echo request to {}({})", domain_name, dest);
match self.send_to_destination(dest, identifier, state.sequence) {
let previous_tracker = state.time_tracker.get(&identifier);
let sequence = if previous_tracker.is_some() {
let (send_status, sequence) = previous_tracker.unwrap();
if let Some(send_time) = send_status {
// We haven't recieved the previous packet response yet so don't send unless we've waited
// for timeout length of time.
let elapsed = Instant::now() - *send_time;
if elapsed > Duration::from_secs(PINGTIMEOUT.flag) {
info!(
"ICMP: Dropped packet detected for domain_name={} send_time={:?} elapsed={:?} sequence={}",
domain_name, send_time, elapsed, sequence
);
state.ping_counter
.with(&prometheus::labels! {"result" => "dropped", "domain" => &domain_name})
.inc();
sequence.wrapping_add(1)
} else {
debug!(
"ICMP: Waiting for timeout before sending next packet domain_name={} sequence={}",
domain_name, sequence
);
continue;
}
} else {
*sequence
}
} else {
debug!(
"ICMP: Initializing sequence for first send domain_name={} sequence=0",
domain_name
);
0
};
info!(
"ICMP: sending echo request to {}({}) sequence={}",
domain_name, dest, sequence
);
match self.send_to_destination(dest, identifier, sequence) {
Err(e) => {
state
.ping_counter
@ -304,10 +340,13 @@ where
);
}
Ok(send_time) => {
state.time_tracker.insert(identifier, send_time);
state
.time_tracker
.insert(identifier, (Some(send_time), sequence));
}
}
}
debug!("ICMP: finished sending for domains");
Ok(())
}
@ -334,37 +373,82 @@ where
}
fn recv_all<H: PacketHandler<Sock::PacketType, Sock::AddrType>>(&mut self, mut handler: H) {
let expected_len = handler.get_mut_state().time_tracker.len();
for _ in 0..expected_len {
loop {
// Receive loop
match self.recv_pkt() {
Ok(pkt) => {
if handler.handle_pkt(pkt) {
// break out of the recv loop
break;
}
}
Err(e) => {
error!("Error receiving packet: {:?}", e);
handler
.get_mut_state()
.ping_counter
.with(&prometheus::labels! {"result" => "err", "domain" => "unknown"})
.inc();
if handler.get_mut_state().destinations.is_empty() {
debug!("Nothing to send to so skipping for this socket");
return;
};
if handler
.get_mut_state()
.time_tracker
.values()
.find(|item| item.0.is_some())
.is_none()
{
// nothing has been sent yet so no need to try to recv packets
debug!("Nothing to recieve for so skipping for this socket");
return;
}
self.sock
.set_timeout(self.timeout)
.expect("Unable to set timout for recieves on socket.");
let loop_start_time = Instant::now();
loop {
// Receive loop
debug!("ICMP: Attempting to recieve packets on socket");
match self.recv_pkt() {
Ok(pkt) => {
if handler.handle_pkt(pkt) {
// break out of the recv loop
debug!("ICMP: Recieved Packet");
return;
}
}
Err(e) => {
error!("ICMP: Error receiving packet: {:?}", e);
handler
.get_mut_state()
.ping_counter
.with(&prometheus::labels! {"result" => "err", "domain" => "unknown"})
.inc();
return;
}
}
if (Instant::now() - loop_start_time) > Duration::from_secs(PINGTIMEOUT.flag) {
info!("ICMP: Timing out on recieve loop");
return;
}
}
let mut state = handler.get_mut_state();
state.sequence = state.sequence.wrapping_add(1);
}
}
pub fn start_echo_loop(
struct Multi {
v4_state: State<Ipv4Addr>,
v6_state: State<Ipv6Addr>,
v4_pinger: PingerImpl<IcmpSocket4>,
v6_pinger: PingerImpl<IcmpSocket6>,
}
impl Multi {
fn send_all(&mut self) {
self.v4_pinger
.send_all(&mut self.v4_state)
.expect("Error sending packets on socket");
self.v6_pinger
.send_all(&mut self.v6_state)
.expect("Error sending packets on socket");
}
fn recv_all(&mut self) {
self.v4_pinger.recv_all(&mut self.v4_state);
self.v6_pinger.recv_all(&mut self.v6_state);
}
}
pub fn schedule_echo_server(
domain_names: &Vec<&str>,
ping_latency_guage: GaugeVec,
ping_counter: CounterVec,
parent: &mut Nursery,
) {
let resolved: Vec<(String, IpAddr)> = domain_names
.iter()
@ -396,8 +480,7 @@ pub fn start_echo_loop(
v4_destinations.insert(v4_id_counter, target.clone());
v4_id_counter += 1;
}
let mut v4_state = State {
sequence: 0,
let v4_state = State {
destinations: v4_destinations,
time_tracker: HashMap::new(),
latency_guage: ping_latency_guage.clone(),
@ -410,30 +493,45 @@ pub fn start_echo_loop(
v6_destinations.insert(v6_id_counter, target.clone());
v6_id_counter += 1;
}
let mut v4_pinger = PingerImpl {
let v4_pinger = PingerImpl {
sock: IcmpSocket4::new().expect("Failed to open Icmpv4 Socket"),
timeout: Duration::from_secs(1),
timeout: Duration::from_millis(10),
};
let mut v6_state = State {
sequence: 0,
let v6_state = State {
destinations: v6_destinations,
time_tracker: HashMap::new(),
latency_guage: ping_latency_guage,
ping_counter,
};
let mut v6_pinger = PingerImpl {
let v6_pinger = PingerImpl {
sock: IcmpSocket6::new().expect("Failed to open Icmpv6 Socket"),
timeout: Duration::from_secs(1),
timeout: Duration::from_millis(10),
};
loop {
v4_pinger
.send_all(&mut v4_state)
.expect("Error sending packets on socket");
v6_pinger
.send_all(&mut v6_state)
.expect("Error sending packets on socket");
v4_pinger.recv_all(&mut v4_state);
v6_pinger.recv_all(&mut v6_state);
std::thread::sleep(Duration::from_secs(PINGDELAY.flag))
}
let multi = std::sync::Arc::new(std::sync::Mutex::new(Multi {
v4_pinger,
v6_pinger,
v4_state,
v6_state,
}));
let send_multi = multi.clone();
let send_thread = thread::Pending::new(move || {
info!("ICMP: Starrting send thread");
loop {
{
send_multi.lock().unwrap().send_all();
}
std::thread::sleep(Duration::from_secs(1));
}
});
let recv_thread = thread::Pending::new(move || {
info!("ICMP: Starrting recv thread");
loop {
{
multi.lock().unwrap().recv_all();
}
std::thread::sleep(Duration::from_millis(5));
}
});
parent.schedule(Box::new(send_thread));
parent.schedule(Box::new(recv_thread));
}

View File

@ -55,7 +55,7 @@ gflags::define! {
fn main() -> anyhow::Result<()> {
gflags::parse();
let stun_servers: Vec<&str> = dbg!(STUNHOSTS.flag).split(",").collect();
let stun_servers: Vec<&str> = STUNHOSTS.flag.split(",").collect();
if HELP.flag {
println!("durnitisp <options> <list of hostname:port>");
@ -78,9 +78,8 @@ fn main() -> anyhow::Result<()> {
.timestamp(stderrlog::Timestamp::Millisecond)
.init()?;
let ping_hosts: Vec<&str> = dbg!(PINGHOSTS.flag).split(",").collect();
let ping_hosts: Vec<&str> = PINGHOSTS.flag.split(",").collect();
dbg!(&ping_hosts);
// Create a Registry and register metrics.
let r = Registry::new();
let stun_counter_vec = CounterVec::new(
@ -167,10 +166,7 @@ fn main() -> anyhow::Result<()> {
{
let ping_latency_vec = ping_latency_vec.clone();
let ping_counter_vec = ping_counter_vec.clone();
let ping_thread = thread::Pending::new(move || {
icmp::start_echo_loop(&ping_hosts, ping_latency_vec, ping_counter_vec);
});
parent.schedule(Box::new(ping_thread));
icmp::schedule_echo_server(&ping_hosts, ping_latency_vec, ping_counter_vec, &mut parent);
}
// Then we attempt to start connections to each stun server.
for (i, s) in stun_socket_addrs.iter().enumerate() {