From 41f74fd637cd89aea051d8f6f45cf62ef4240806 Mon Sep 17 00:00:00 2001 From: Jeremy Wall Date: Tue, 5 Jan 2021 20:33:11 -0500 Subject: [PATCH] Allow configuring the use of IPv6 and correctly handle unreachable errors --- src/icmp.rs | 87 +++++++++++++++++++++++++++++++++-------------------- src/main.rs | 5 +-- src/util.rs | 24 ++++++++++++++- 3 files changed, 80 insertions(+), 36 deletions(-) diff --git a/src/icmp.rs b/src/icmp.rs index 8c053f5..88d1256 100644 --- a/src/icmp.rs +++ b/src/icmp.rs @@ -11,12 +11,15 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::{Arc, RwLock}; +use std::time::Duration; + +use crate::util; + use ekko::{Ekko, EkkoResponse}; use gflags; use log::{error, info}; use prometheus::{CounterVec, IntGaugeVec}; -use std::sync::{Arc, RwLock}; -use std::time::Duration; gflags::define! { /// The size in bytes of the ping requests. @@ -44,8 +47,9 @@ pub fn start_echo_loop( ping_latency_guage: IntGaugeVec, ping_counter: CounterVec, ) { - info!("Pinging {}", domain_name); - let mut sender = Ekko::with_target(domain_name).unwrap(); + let resolved = format!("{}", util::resolve_hosts(&vec![domain_name]).unwrap().first().unwrap().unwrap()); + info!("Attempting to ping domain {} at address: {}", domain_name, resolved); + let mut sender = Ekko::with_target(&resolved).unwrap(); loop { { // Limit the scope of this lock @@ -54,35 +58,52 @@ pub fn start_echo_loop( return; } } - let response = sender - .send_with_timeout(MAXHOPS.flag, Some(Duration::from_millis(PINGTIMEOUT.flag))) - .unwrap(); - match response { - EkkoResponse::DestinationResponse(r) => { - info!( - "ICMP: Reply from {}: time={}ms", - r.address.unwrap(), - r.elapsed.as_millis(), - ); - ping_counter - .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) - .inc(); - ping_latency_guage - .with(&prometheus::labels! {"domain" => domain_name}) - .set(r.elapsed.as_millis() as i64); - } - EkkoResponse::ExceededResponse(_) => { - ping_counter - .with(&prometheus::labels! {"result" => "timedout", "domain" => domain_name}) - .inc(); - } - _ => { - ping_counter - .with(&prometheus::labels! {"result" => "err", "domain" => domain_name}) - .inc(); - error!("{:?}", response); - } - } + match sender + .send_with_timeout(MAXHOPS.flag, Some(Duration::from_millis(PINGTIMEOUT.flag))) { + Ok(r) => match r { + EkkoResponse::DestinationResponse(r) => { + info!( + "ICMP: Reply from {}: time={}ms", + r.address.unwrap(), + r.elapsed.as_millis(), + ); + ping_counter + .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) + .inc(); + ping_latency_guage + .with(&prometheus::labels! {"domain" => domain_name}) + .set(r.elapsed.as_millis() as i64); + } + EkkoResponse::UnreachableResponse((_, ref _code)) => { + // If we got unreachable we need to set up a new sender. + error!("{:?}", r); + info!("Restarting our sender"); + ping_counter + .with(&prometheus::labels! {"result" => "unreachable", "domain" => domain_name}) + .inc(); + let mut new_sender = Ekko::with_target(&resolved).unwrap(); + std::mem::swap(&mut sender, &mut new_sender); + + } + EkkoResponse::ExceededResponse(_) => { + ping_counter + .with(&prometheus::labels! {"result" => "timeout", "domain" => domain_name}) + .inc(); + } + _ => { + ping_counter + .with(&prometheus::labels! {"result" => "err", "domain" => domain_name}) + .inc(); + error!("{:?}", r); + } + }, + Err(e) => { + ping_counter + .with(&prometheus::labels! {"result" => "err", "domain" => domain_name}) + .inc(); + error!("Ping send to {} failed: {:?}, Trying again later", domain_name, e); + } + }; std::thread::sleep(Duration::from_secs(3)); } } diff --git a/src/main.rs b/src/main.rs index aba99f9..a5fecc3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ use std::sync::Arc; use std::sync::RwLock; +use std::convert::Into; use gflags; use log::{debug, error, info}; @@ -133,7 +134,7 @@ fn main() -> anyhow::Result<()> { .expect("Failed to register ping latency guage"); r.register(Box::new(ping_counter_vec.clone())) .expect("Failed to register ping counter"); - let stun_socket_addrs = util::resolve_addrs(&stun_servers).unwrap(); + let stun_socket_addrs = util::resolve_socket_addrs(&stun_servers).unwrap(); let stun_servers = Arc::new(stun_servers); let ping_hosts = Arc::new(ping_hosts); @@ -202,7 +203,7 @@ fn main() -> anyhow::Result<()> { stun::start_listen_thread( domain_name, stop_signal, - s, + s.into(), stun_counter_vec_copy, stun_latency_vec_copy, stun_success_vec_copy, diff --git a/src/util.rs b/src/util.rs index 3db6cb4..5bdfd53 100644 --- a/src/util.rs +++ b/src/util.rs @@ -13,11 +13,33 @@ // limitations under the License. use std::io; +use std::net::IpAddr; use std::net::{SocketAddr, ToSocketAddrs}; use log::info; +use resolve::config::DnsConfig; +use resolve::resolver::{DnsResolver}; +use gflags; -pub fn resolve_addrs<'a>(servers: &'a Vec<&str>) -> io::Result>> { +gflags::define! { + /// Allow IPv6 addresses for domain name lookups. + --allowIpv6: bool = false +} + +pub fn resolve_hosts<'a>(servers: &'a Vec<&str>) -> io::Result>> { + let mut results = Vec::new(); + let mut config = DnsConfig::load_default()?; + config.use_inet6 = ALLOWIPV6.flag; + let resolver = DnsResolver::new(config)?; + for name in servers.iter().cloned() { + // TODO for resolution errors return a more valid error with the domain name. + let mut iter = resolver.resolve_host(name)?; + results.push(iter.next()); + } + return Ok(results); +} + +pub fn resolve_socket_addrs<'a>(servers: &'a Vec<&str>) -> io::Result>> { let mut results = Vec::new(); for name in servers.iter().cloned() { // TODO for resolution errors return a more valid error with the domain name.