diff --git a/src/icmp.rs b/src/icmp.rs index 3b449ff..aaea847 100644 --- a/src/icmp.rs +++ b/src/icmp.rs @@ -26,6 +26,7 @@ use icmp_socket::{ IcmpSocket, IcmpSocket4, IcmpSocket6, Icmpv4Packet, Icmpv6Packet, }; use log::{debug, error, info}; +use nursery::{thread, Nursery}; use prometheus::{CounterVec, GaugeVec}; gflags::define! { @@ -34,8 +35,8 @@ gflags::define! { } gflags::define! { - /// The timeout for ping requests. - --pingTimeout: u64 = 2048 + /// The timeout for ping requests in seconds. + --pingTimeout: u64 = 3 } gflags::define! { @@ -60,9 +61,9 @@ fn resolve_host_address(host: &str) -> String { } struct State { - sequence: u16, destinations: HashMap, // domain, address - time_tracker: HashMap, + // TODO(jwall): This should be a time tracker by both identifier and sequence + time_tracker: HashMap, u16)>, latency_guage: GaugeVec, ping_counter: CounterVec, } @@ -87,6 +88,7 @@ impl<'a> PacketHandler for &'a mut State { } fn handle_pkt(&mut self, pkt: Icmpv6Packet) -> bool { + debug!("ICMP: handling packet {:?}", pkt); match pkt.message { Icmpv6Message::Unreachable { _unused, @@ -106,8 +108,8 @@ impl<'a> PacketHandler for &'a mut State { }) => { if let Some((domain_name, _addr)) = self.destinations.get(&identifier) { self.ping_counter - .with(&prometheus::labels! {"result" => "unreachable", "domain" => domain_name}) - .inc(); + .with(&prometheus::labels! {"result" => "unreachable", "domain" => domain_name}) + .inc(); return true; } } @@ -158,28 +160,40 @@ impl<'a> PacketHandler for &'a mut State { payload: _, } => { if let Some((domain_name, dest)) = self.destinations.get(&identifier) { - if self.sequence != sequence { - error!("ICMP: Discarding sequence {}", sequence); - return false; - } - let elapsed = if let Some(send_time) = self.time_tracker.get(&identifier) { - Instant::now().sub(send_time.clone()).as_micros() as f64 / 1000.00 + if let Some((Some(send_time), expected_sequence)) = + self.time_tracker.get(&identifier) + { + let elapsed = + Instant::now().sub(send_time.clone()).as_millis() as f64 / 1000.00; + // We make a copy here to avoid the borrow above sticking around for too long. + let expected_sequence = *expected_sequence; + if sequence != expected_sequence { + error!( + "ICMP: Discarding unexpected sequence sequence={} expected={}", + sequence, expected_sequence + ); + self.time_tracker + .insert(identifier, (None, expected_sequence.wrapping_add(1))); + return false; + } + info!( + "ICMP: Reply from {}({}): time={}ms, seq={}", + domain_name, dest, elapsed, sequence, + ); + self.ping_counter + .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) + .inc(); + if elapsed as i32 != 0 { + self.latency_guage + .with(&prometheus::labels! {"domain" => domain_name.as_str()}) + .set(elapsed); + } + self.time_tracker + .insert(identifier, (None, expected_sequence.wrapping_add(1))); + return true; } else { return false; }; - info!( - "ICMP: Reply from {}({}): time={}ms, seq={}", - domain_name, dest, elapsed, sequence, - ); - self.ping_counter - .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) - .inc(); - if elapsed as i32 != 0 { - self.latency_guage - .with(&prometheus::labels! {"domain" => domain_name.as_str()}) - .set(elapsed); - } - return true; } else { info!("ICMP: Discarding wrong identifier {}", identifier); } @@ -198,59 +212,45 @@ impl<'a> PacketHandler for &'a mut State { } fn handle_pkt(&mut self, pkt: Icmpv4Packet) -> bool { + debug!("ICMP: handling packet {:?}", pkt); match pkt.message { - Icmpv4Message::ParameterProblem { - pointer: _, - padding: _, - header: _, - } => { - self.ping_counter - .with(&prometheus::labels! {"result" => "parameter_problem", "domain" => "unknown"}) - .inc(); - } - Icmpv4Message::Unreachable { padding: _, header } => { - let dest_addr = Ipv4Addr::new(header[16], header[17], header[18], header[19]); - info!("ICMP: Destination Unreachable response from {}", dest_addr,); - self.ping_counter - .with(&prometheus::labels! {"result" => "unreachable", "domain" => "unknown"}) - .inc(); - } - Icmpv4Message::TimeExceeded { padding: _, header } => { - let dest_addr = Ipv4Addr::new(header[16], header[17], header[18], header[19]); - info!("ICMP: Timeout for {}", dest_addr); - self.ping_counter - .with(&prometheus::labels! {"result" => "timeout", "domain" => "unknown"}) - .inc(); - } Icmpv4Message::EchoReply { identifier, sequence, payload: _, } => { if let Some((domain_name, dest)) = self.destinations.get(&identifier) { - let elapsed = if let Some(send_time) = self.time_tracker.get(&identifier) { - Instant::now().sub(send_time.clone()).as_micros() as f64 / 1000.00 + if let Some((Some(send_time), expected_sequence)) = + self.time_tracker.get(&identifier) + { + let elapsed = + Instant::now().sub(send_time.clone()).as_micros() as f64 / 1000.00; + let expected_sequence = *expected_sequence; + if expected_sequence != sequence { + error!( + "ICMP: Discarding unexpected sequence sequence={} expected={}", + sequence, expected_sequence + ); + self.time_tracker + .insert(identifier, (None, expected_sequence.wrapping_add(1))); + return false; + } + info!( + "ICMP: Reply from {}({}): time={}ms, seq={}", + domain_name, dest, elapsed, sequence, + ); + self.ping_counter + .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) + .inc(); + self.latency_guage + .with(&prometheus::labels! {"domain" => domain_name.as_str()}) + .set(elapsed); + self.time_tracker + .insert(identifier, (None, expected_sequence.wrapping_add(1))); + return true; } else { return false; }; - if self.sequence != sequence { - error!( - "ICMP: Discarding sequence {}, expected sequence {}", - sequence, self.sequence - ); - return false; - } - info!( - "ICMP: Reply from {}({}): time={}ms, seq={}", - domain_name, dest, elapsed, sequence, - ); - self.ping_counter - .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) - .inc(); - self.latency_guage - .with(&prometheus::labels! {"domain" => domain_name.as_str()}) - .set(elapsed); - return true; } else { info!("ICMP: Discarding wrong identifier {}", identifier); } @@ -288,11 +288,47 @@ where Sock::PacketType: WithEchoRequest, { fn send_all(&mut self, state: &mut State) -> std::io::Result<()> { - self.sock.set_timeout(self.timeout)?; let destinations = state.destinations.clone(); + info!("ICMP: Attempting to send packets for all domains"); for (identifier, (domain_name, dest)) in destinations.into_iter() { - debug!("ICMP: sending echo request to {}({})", domain_name, dest); - match self.send_to_destination(dest, identifier, state.sequence) { + let previous_tracker = state.time_tracker.get(&identifier); + let sequence = if previous_tracker.is_some() { + let (send_status, sequence) = previous_tracker.unwrap(); + if let Some(send_time) = send_status { + // We haven't recieved the previous packet response yet so don't send unless we've waited + // for timeout length of time. + let elapsed = Instant::now() - *send_time; + if elapsed > Duration::from_secs(PINGTIMEOUT.flag) { + info!( + "ICMP: Dropped packet detected for domain_name={} send_time={:?} elapsed={:?} sequence={}", + domain_name, send_time, elapsed, sequence + ); + state.ping_counter + .with(&prometheus::labels! {"result" => "dropped", "domain" => &domain_name}) + .inc(); + sequence.wrapping_add(1) + } else { + debug!( + "ICMP: Waiting for timeout before sending next packet domain_name={} sequence={}", + domain_name, sequence + ); + continue; + } + } else { + *sequence + } + } else { + debug!( + "ICMP: Initializing sequence for first send domain_name={} sequence=0", + domain_name + ); + 0 + }; + info!( + "ICMP: sending echo request to {}({}) sequence={}", + domain_name, dest, sequence + ); + match self.send_to_destination(dest, identifier, sequence) { Err(e) => { state .ping_counter @@ -304,10 +340,13 @@ where ); } Ok(send_time) => { - state.time_tracker.insert(identifier, send_time); + state + .time_tracker + .insert(identifier, (Some(send_time), sequence)); } } } + debug!("ICMP: finished sending for domains"); Ok(()) } @@ -334,37 +373,82 @@ where } fn recv_all>(&mut self, mut handler: H) { - let expected_len = handler.get_mut_state().time_tracker.len(); - for _ in 0..expected_len { - loop { - // Receive loop - match self.recv_pkt() { - Ok(pkt) => { - if handler.handle_pkt(pkt) { - // break out of the recv loop - break; - } - } - Err(e) => { - error!("Error receiving packet: {:?}", e); - handler - .get_mut_state() - .ping_counter - .with(&prometheus::labels! {"result" => "err", "domain" => "unknown"}) - .inc(); + if handler.get_mut_state().destinations.is_empty() { + debug!("Nothing to send to so skipping for this socket"); + return; + }; + if handler + .get_mut_state() + .time_tracker + .values() + .find(|item| item.0.is_some()) + .is_none() + { + // nothing has been sent yet so no need to try to recv packets + debug!("Nothing to recieve for so skipping for this socket"); + return; + } + self.sock + .set_timeout(self.timeout) + .expect("Unable to set timout for recieves on socket."); + let loop_start_time = Instant::now(); + loop { + // Receive loop + debug!("ICMP: Attempting to recieve packets on socket"); + match self.recv_pkt() { + Ok(pkt) => { + if handler.handle_pkt(pkt) { + // break out of the recv loop + debug!("ICMP: Recieved Packet"); + return; } } + Err(e) => { + error!("ICMP: Error receiving packet: {:?}", e); + handler + .get_mut_state() + .ping_counter + .with(&prometheus::labels! {"result" => "err", "domain" => "unknown"}) + .inc(); + return; + } + } + if (Instant::now() - loop_start_time) > Duration::from_secs(PINGTIMEOUT.flag) { + info!("ICMP: Timing out on recieve loop"); + return; } } - let mut state = handler.get_mut_state(); - state.sequence = state.sequence.wrapping_add(1); } } -pub fn start_echo_loop( +struct Multi { + v4_state: State, + v6_state: State, + v4_pinger: PingerImpl, + v6_pinger: PingerImpl, +} + +impl Multi { + fn send_all(&mut self) { + self.v4_pinger + .send_all(&mut self.v4_state) + .expect("Error sending packets on socket"); + self.v6_pinger + .send_all(&mut self.v6_state) + .expect("Error sending packets on socket"); + } + + fn recv_all(&mut self) { + self.v4_pinger.recv_all(&mut self.v4_state); + self.v6_pinger.recv_all(&mut self.v6_state); + } +} + +pub fn schedule_echo_server( domain_names: &Vec<&str>, ping_latency_guage: GaugeVec, ping_counter: CounterVec, + parent: &mut Nursery, ) { let resolved: Vec<(String, IpAddr)> = domain_names .iter() @@ -396,8 +480,7 @@ pub fn start_echo_loop( v4_destinations.insert(v4_id_counter, target.clone()); v4_id_counter += 1; } - let mut v4_state = State { - sequence: 0, + let v4_state = State { destinations: v4_destinations, time_tracker: HashMap::new(), latency_guage: ping_latency_guage.clone(), @@ -410,30 +493,45 @@ pub fn start_echo_loop( v6_destinations.insert(v6_id_counter, target.clone()); v6_id_counter += 1; } - let mut v4_pinger = PingerImpl { + let v4_pinger = PingerImpl { sock: IcmpSocket4::new().expect("Failed to open Icmpv4 Socket"), - timeout: Duration::from_secs(1), + timeout: Duration::from_millis(10), }; - let mut v6_state = State { - sequence: 0, + let v6_state = State { destinations: v6_destinations, time_tracker: HashMap::new(), latency_guage: ping_latency_guage, ping_counter, }; - let mut v6_pinger = PingerImpl { + let v6_pinger = PingerImpl { sock: IcmpSocket6::new().expect("Failed to open Icmpv6 Socket"), - timeout: Duration::from_secs(1), + timeout: Duration::from_millis(10), }; - loop { - v4_pinger - .send_all(&mut v4_state) - .expect("Error sending packets on socket"); - v6_pinger - .send_all(&mut v6_state) - .expect("Error sending packets on socket"); - v4_pinger.recv_all(&mut v4_state); - v6_pinger.recv_all(&mut v6_state); - std::thread::sleep(Duration::from_secs(PINGDELAY.flag)) - } + let multi = std::sync::Arc::new(std::sync::Mutex::new(Multi { + v4_pinger, + v6_pinger, + v4_state, + v6_state, + })); + let send_multi = multi.clone(); + let send_thread = thread::Pending::new(move || { + info!("ICMP: Starrting send thread"); + loop { + { + send_multi.lock().unwrap().send_all(); + } + std::thread::sleep(Duration::from_secs(1)); + } + }); + let recv_thread = thread::Pending::new(move || { + info!("ICMP: Starrting recv thread"); + loop { + { + multi.lock().unwrap().recv_all(); + } + std::thread::sleep(Duration::from_millis(5)); + } + }); + parent.schedule(Box::new(send_thread)); + parent.schedule(Box::new(recv_thread)); } diff --git a/src/main.rs b/src/main.rs index a4895de..d60c4fb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -55,7 +55,7 @@ gflags::define! { fn main() -> anyhow::Result<()> { gflags::parse(); - let stun_servers: Vec<&str> = dbg!(STUNHOSTS.flag).split(",").collect(); + let stun_servers: Vec<&str> = STUNHOSTS.flag.split(",").collect(); if HELP.flag { println!("durnitisp "); @@ -78,9 +78,8 @@ fn main() -> anyhow::Result<()> { .timestamp(stderrlog::Timestamp::Millisecond) .init()?; - let ping_hosts: Vec<&str> = dbg!(PINGHOSTS.flag).split(",").collect(); + let ping_hosts: Vec<&str> = PINGHOSTS.flag.split(",").collect(); - dbg!(&ping_hosts); // Create a Registry and register metrics. let r = Registry::new(); let stun_counter_vec = CounterVec::new( @@ -167,10 +166,7 @@ fn main() -> anyhow::Result<()> { { let ping_latency_vec = ping_latency_vec.clone(); let ping_counter_vec = ping_counter_vec.clone(); - let ping_thread = thread::Pending::new(move || { - icmp::start_echo_loop(&ping_hosts, ping_latency_vec, ping_counter_vec); - }); - parent.schedule(Box::new(ping_thread)); + icmp::schedule_echo_server(&ping_hosts, ping_latency_vec, ping_counter_vec, &mut parent); } // Then we attempt to start connections to each stun server. for (i, s) in stun_socket_addrs.iter().enumerate() {