Finish Domain based state machine.

This commit is contained in:
Jeremy Wall 2021-01-30 22:30:29 -05:00
parent 8ca7122609
commit 57ff47b36f
3 changed files with 42 additions and 24 deletions

2
Cargo.lock generated
View File

@ -163,7 +163,7 @@ dependencies = [
[[package]] [[package]]
name = "icmp-socket" name = "icmp-socket"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/zaphar/icmp-socket.git?branch=master#37f3651995fc608cfe40bde9e89649ac5ea72df6" source = "git+https://github.com/zaphar/icmp-socket.git?branch=master#42e39a621f2754328c4628c5acf8ade640f8911d"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"socket2", "socket2",

View File

@ -26,7 +26,7 @@ use icmp_socket::{
IcmpSocket, IcmpSocket4, IcmpSocket6, Icmpv4Packet, Icmpv6Packet, IcmpSocket, IcmpSocket4, IcmpSocket6, Icmpv4Packet, Icmpv6Packet,
}; };
use log::{error, info}; use log::{error, info};
use prometheus::{CounterVec, IntGaugeVec}; use prometheus::{CounterVec, GaugeVec};
use socket2::{self, SockAddr}; use socket2::{self, SockAddr};
gflags::define! { gflags::define! {
@ -63,7 +63,7 @@ fn loop_impl<Sock, PH, EH>(
stop_signal: Arc<RwLock<bool>>, stop_signal: Arc<RwLock<bool>>,
) where ) where
PH: Fn(Sock::PacketType, socket2::SockAddr, Instant, u16) -> Option<()>, PH: Fn(Sock::PacketType, socket2::SockAddr, Instant, u16) -> Option<()>,
EH: Fn(std::io::Error) -> (), EH: Fn(std::io::Error, bool) -> (),
Sock: IcmpSocket, Sock: IcmpSocket,
Sock::AddrType: std::fmt::Display + Copy, Sock::AddrType: std::fmt::Display + Copy,
Sock::PacketType: WithEchoRequest<Packet = Sock::PacketType>, Sock::PacketType: WithEchoRequest<Packet = Sock::PacketType>,
@ -85,23 +85,27 @@ fn loop_impl<Sock, PH, EH>(
.unwrap(); .unwrap();
let send_time = Instant::now(); let send_time = Instant::now();
if let Err(e) = socket.send_to(dest, packet) { if let Err(e) = socket.send_to(dest, packet) {
err_handler(e); err_handler(e, true);
} else { } else {
loop { loop {
// Keep going until we get the packet we are looking for. // Keep going until we get the packet we are looking for.
match socket.rcv_from() { match socket.rcv_with_timeout(Duration::from_secs(1)) {
Err(e) => { Err(e) => {
err_handler(e); err_handler(e, false);
} }
Ok((resp, sock_addr)) => { Ok((resp, sock_addr)) => {
if packet_handler(resp, sock_addr, send_time, sequence).is_some() { if packet_handler(resp, sock_addr, send_time, sequence).is_some() {
sequence = sequence.wrapping_add(1);
break; break;
} }
} }
} }
// Give up after 3 seconds and send another packet.
if Instant::now() - send_time > Duration::from_secs(3) {
break;
}
} }
} }
sequence += 1;
std::thread::sleep(Duration::from_secs(3)); std::thread::sleep(Duration::from_secs(3));
} }
} }
@ -109,7 +113,7 @@ fn loop_impl<Sock, PH, EH>(
pub fn start_echo_loop( pub fn start_echo_loop(
domain_name: &str, domain_name: &str,
stop_signal: Arc<RwLock<bool>>, stop_signal: Arc<RwLock<bool>>,
ping_latency_guage: IntGaugeVec, ping_latency_guage: GaugeVec,
ping_counter: CounterVec, ping_counter: CounterVec,
) { ) {
let resolved = resolve_host_address(domain_name); let resolved = resolve_host_address(domain_name);
@ -121,14 +125,18 @@ pub fn start_echo_loop(
.parse::<IpAddr>() .parse::<IpAddr>()
.expect(&format!("Invalid IP Address {}", resolved)); .expect(&format!("Invalid IP Address {}", resolved));
let err_handler = |e: std::io::Error| { let err_handler = |e: std::io::Error, send: bool| {
ping_counter if send {
.with(&prometheus::labels! {"result" => "err", "domain" => domain_name}) error!(
.inc(); "ICMP: error sending to domain: {} and address: {} failed: {:?}, Trying again later",
error!( domain_name, &dest, e
"ICMP: error sending domain: {} and address: {} failed: {:?}, Trying again later", );
domain_name, &dest, e } else {
); error!(
"ICMP: error receiving for domain: {} and address: {} failed: {:?}, Trying again later",
domain_name, &dest, e
);
}
}; };
match dest { match dest {
IpAddr::V4(dest) => { IpAddr::V4(dest) => {
@ -191,7 +199,8 @@ pub fn start_echo_loop(
info!("ICMP: Discarding sequence {}", sequence); info!("ICMP: Discarding sequence {}", sequence);
return Some(()); return Some(());
} }
let elapsed = Instant::now().sub(send_time.clone()).as_millis(); let elapsed =
Instant::now().sub(send_time.clone()).as_micros() as f64 / 1000.00;
info!( info!(
"ICMP: Reply from {}: time={}ms, seq={}", "ICMP: Reply from {}: time={}ms, seq={}",
dest, elapsed, sequence, dest, elapsed, sequence,
@ -199,10 +208,10 @@ pub fn start_echo_loop(
ping_counter ping_counter
.with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name})
.inc(); .inc();
if elapsed != 0 { if elapsed as i32 != 0 {
ping_latency_guage ping_latency_guage
.with(&prometheus::labels! {"domain" => domain_name}) .with(&prometheus::labels! {"domain" => domain_name})
.set(elapsed as i64); .set(elapsed);
} }
} }
p => { p => {
@ -252,7 +261,16 @@ pub fn start_echo_loop(
sequence, sequence,
payload: _, payload: _,
} => { } => {
let elapsed = Instant::now().sub(send_time.clone()).as_millis(); if sequence != seq {
info!("ICMP: Discarding sequence {}", sequence);
return Some(());
}
let elapsed =
Instant::now().sub(send_time.clone()).as_micros() as f64 / 1000.00;
info!(
"ICMP: Reply from {}: time={}ms, seq={}",
dest, elapsed, sequence,
);
info!( info!(
"ICMP: Reply from {}: time={}ms, seq={}", "ICMP: Reply from {}: time={}ms, seq={}",
dest, elapsed, sequence, dest, elapsed, sequence,
@ -260,10 +278,10 @@ pub fn start_echo_loop(
ping_counter ping_counter
.with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name})
.inc(); .inc();
if elapsed != 0 { if elapsed as i32 != 0 {
ping_latency_guage ping_latency_guage
.with(&prometheus::labels! {"domain" => domain_name}) .with(&prometheus::labels! {"domain" => domain_name})
.set(elapsed as i64); .set(elapsed);
} }
} }
_ => { _ => {

View File

@ -20,7 +20,7 @@ use gflags;
use log::{debug, error, info}; use log::{debug, error, info};
use nursery::thread; use nursery::thread;
use nursery::{Nursery, Waitable}; use nursery::{Nursery, Waitable};
use prometheus; use prometheus::{self, GaugeVec};
use prometheus::{CounterVec, Encoder, IntGaugeVec, Opts, Registry, TextEncoder}; use prometheus::{CounterVec, Encoder, IntGaugeVec, Opts, Registry, TextEncoder};
use stderrlog; use stderrlog;
use tiny_http; use tiny_http;
@ -118,7 +118,7 @@ fn main() -> anyhow::Result<()> {
) )
.unwrap(); .unwrap();
let ping_latency_vec = let ping_latency_vec =
IntGaugeVec::new(Opts::new("ping_latency", "ICMP Ping latency"), &["domain"]).unwrap(); GaugeVec::new(Opts::new("ping_latency", "ICMP Ping latency"), &["domain"]).unwrap();
let ping_counter_vec = CounterVec::new( let ping_counter_vec = CounterVec::new(
Opts::new("ping_counter", "Ping Request Counter"), Opts::new("ping_counter", "Ping Request Counter"),
&["result", "domain"], &["result", "domain"],