From 57ff47b36fdb82d3c9bab310e484ca533d977949 Mon Sep 17 00:00:00 2001 From: Jeremy Wall Date: Sat, 30 Jan 2021 22:30:29 -0500 Subject: [PATCH] Finish Domain based state machine. --- Cargo.lock | 2 +- src/icmp.rs | 60 ++++++++++++++++++++++++++++++++++------------------- src/main.rs | 4 ++-- 3 files changed, 42 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f0f5a8c..f083b64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,7 +163,7 @@ dependencies = [ [[package]] name = "icmp-socket" version = "0.1.0" -source = "git+https://github.com/zaphar/icmp-socket.git?branch=master#37f3651995fc608cfe40bde9e89649ac5ea72df6" +source = "git+https://github.com/zaphar/icmp-socket.git?branch=master#42e39a621f2754328c4628c5acf8ade640f8911d" dependencies = [ "byteorder", "socket2", diff --git a/src/icmp.rs b/src/icmp.rs index a262ff7..a09667a 100644 --- a/src/icmp.rs +++ b/src/icmp.rs @@ -26,7 +26,7 @@ use icmp_socket::{ IcmpSocket, IcmpSocket4, IcmpSocket6, Icmpv4Packet, Icmpv6Packet, }; use log::{error, info}; -use prometheus::{CounterVec, IntGaugeVec}; +use prometheus::{CounterVec, GaugeVec}; use socket2::{self, SockAddr}; gflags::define! { @@ -63,7 +63,7 @@ fn loop_impl( stop_signal: Arc>, ) where PH: Fn(Sock::PacketType, socket2::SockAddr, Instant, u16) -> Option<()>, - EH: Fn(std::io::Error) -> (), + EH: Fn(std::io::Error, bool) -> (), Sock: IcmpSocket, Sock::AddrType: std::fmt::Display + Copy, Sock::PacketType: WithEchoRequest, @@ -85,23 +85,27 @@ fn loop_impl( .unwrap(); let send_time = Instant::now(); if let Err(e) = socket.send_to(dest, packet) { - err_handler(e); + err_handler(e, true); } else { loop { // Keep going until we get the packet we are looking for. - match socket.rcv_from() { + match socket.rcv_with_timeout(Duration::from_secs(1)) { Err(e) => { - err_handler(e); + err_handler(e, false); } Ok((resp, sock_addr)) => { if packet_handler(resp, sock_addr, send_time, sequence).is_some() { + sequence = sequence.wrapping_add(1); break; } } } + // Give up after 3 seconds and send another packet. + if Instant::now() - send_time > Duration::from_secs(3) { + break; + } } } - sequence += 1; std::thread::sleep(Duration::from_secs(3)); } } @@ -109,7 +113,7 @@ fn loop_impl( pub fn start_echo_loop( domain_name: &str, stop_signal: Arc>, - ping_latency_guage: IntGaugeVec, + ping_latency_guage: GaugeVec, ping_counter: CounterVec, ) { let resolved = resolve_host_address(domain_name); @@ -121,14 +125,18 @@ pub fn start_echo_loop( .parse::() .expect(&format!("Invalid IP Address {}", resolved)); - let err_handler = |e: std::io::Error| { - ping_counter - .with(&prometheus::labels! {"result" => "err", "domain" => domain_name}) - .inc(); - error!( - "ICMP: error sending domain: {} and address: {} failed: {:?}, Trying again later", - domain_name, &dest, e - ); + let err_handler = |e: std::io::Error, send: bool| { + if send { + error!( + "ICMP: error sending to domain: {} and address: {} failed: {:?}, Trying again later", + domain_name, &dest, e + ); + } else { + error!( + "ICMP: error receiving for domain: {} and address: {} failed: {:?}, Trying again later", + domain_name, &dest, e + ); + } }; match dest { IpAddr::V4(dest) => { @@ -191,7 +199,8 @@ pub fn start_echo_loop( info!("ICMP: Discarding sequence {}", sequence); return Some(()); } - let elapsed = Instant::now().sub(send_time.clone()).as_millis(); + let elapsed = + Instant::now().sub(send_time.clone()).as_micros() as f64 / 1000.00; info!( "ICMP: Reply from {}: time={}ms, seq={}", dest, elapsed, sequence, @@ -199,10 +208,10 @@ pub fn start_echo_loop( ping_counter .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) .inc(); - if elapsed != 0 { + if elapsed as i32 != 0 { ping_latency_guage .with(&prometheus::labels! {"domain" => domain_name}) - .set(elapsed as i64); + .set(elapsed); } } p => { @@ -252,7 +261,16 @@ pub fn start_echo_loop( sequence, payload: _, } => { - let elapsed = Instant::now().sub(send_time.clone()).as_millis(); + if sequence != seq { + info!("ICMP: Discarding sequence {}", sequence); + return Some(()); + } + let elapsed = + Instant::now().sub(send_time.clone()).as_micros() as f64 / 1000.00; + info!( + "ICMP: Reply from {}: time={}ms, seq={}", + dest, elapsed, sequence, + ); info!( "ICMP: Reply from {}: time={}ms, seq={}", dest, elapsed, sequence, @@ -260,10 +278,10 @@ pub fn start_echo_loop( ping_counter .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) .inc(); - if elapsed != 0 { + if elapsed as i32 != 0 { ping_latency_guage .with(&prometheus::labels! {"domain" => domain_name}) - .set(elapsed as i64); + .set(elapsed); } } _ => { diff --git a/src/main.rs b/src/main.rs index 3c3baf4..935400b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,7 +20,7 @@ use gflags; use log::{debug, error, info}; use nursery::thread; use nursery::{Nursery, Waitable}; -use prometheus; +use prometheus::{self, GaugeVec}; use prometheus::{CounterVec, Encoder, IntGaugeVec, Opts, Registry, TextEncoder}; use stderrlog; use tiny_http; @@ -118,7 +118,7 @@ fn main() -> anyhow::Result<()> { ) .unwrap(); let ping_latency_vec = - IntGaugeVec::new(Opts::new("ping_latency", "ICMP Ping latency"), &["domain"]).unwrap(); + GaugeVec::new(Opts::new("ping_latency", "ICMP Ping latency"), &["domain"]).unwrap(); let ping_counter_vec = CounterVec::new( Opts::new("ping_counter", "Ping Request Counter"), &["result", "domain"],