Allow configuring the use of IPv6 and correctly handle unreachable errors

This commit is contained in:
Jeremy Wall 2021-01-05 20:33:11 -05:00
parent d8c4d4d61a
commit 41f74fd637
3 changed files with 80 additions and 36 deletions

View File

@ -11,12 +11,15 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::{Arc, RwLock};
use std::time::Duration;
use crate::util;
use ekko::{Ekko, EkkoResponse}; use ekko::{Ekko, EkkoResponse};
use gflags; use gflags;
use log::{error, info}; use log::{error, info};
use prometheus::{CounterVec, IntGaugeVec}; use prometheus::{CounterVec, IntGaugeVec};
use std::sync::{Arc, RwLock};
use std::time::Duration;
gflags::define! { gflags::define! {
/// The size in bytes of the ping requests. /// The size in bytes of the ping requests.
@ -44,8 +47,9 @@ pub fn start_echo_loop(
ping_latency_guage: IntGaugeVec, ping_latency_guage: IntGaugeVec,
ping_counter: CounterVec, ping_counter: CounterVec,
) { ) {
info!("Pinging {}", domain_name); let resolved = format!("{}", util::resolve_hosts(&vec![domain_name]).unwrap().first().unwrap().unwrap());
let mut sender = Ekko::with_target(domain_name).unwrap(); info!("Attempting to ping domain {} at address: {}", domain_name, resolved);
let mut sender = Ekko::with_target(&resolved).unwrap();
loop { loop {
{ {
// Limit the scope of this lock // Limit the scope of this lock
@ -54,35 +58,52 @@ pub fn start_echo_loop(
return; return;
} }
} }
let response = sender match sender
.send_with_timeout(MAXHOPS.flag, Some(Duration::from_millis(PINGTIMEOUT.flag))) .send_with_timeout(MAXHOPS.flag, Some(Duration::from_millis(PINGTIMEOUT.flag))) {
.unwrap(); Ok(r) => match r {
match response { EkkoResponse::DestinationResponse(r) => {
EkkoResponse::DestinationResponse(r) => { info!(
info!( "ICMP: Reply from {}: time={}ms",
"ICMP: Reply from {}: time={}ms", r.address.unwrap(),
r.address.unwrap(), r.elapsed.as_millis(),
r.elapsed.as_millis(), );
); ping_counter
ping_counter .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name})
.with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) .inc();
.inc(); ping_latency_guage
ping_latency_guage .with(&prometheus::labels! {"domain" => domain_name})
.with(&prometheus::labels! {"domain" => domain_name}) .set(r.elapsed.as_millis() as i64);
.set(r.elapsed.as_millis() as i64); }
} EkkoResponse::UnreachableResponse((_, ref _code)) => {
EkkoResponse::ExceededResponse(_) => { // If we got unreachable we need to set up a new sender.
ping_counter error!("{:?}", r);
.with(&prometheus::labels! {"result" => "timedout", "domain" => domain_name}) info!("Restarting our sender");
.inc(); ping_counter
} .with(&prometheus::labels! {"result" => "unreachable", "domain" => domain_name})
_ => { .inc();
ping_counter let mut new_sender = Ekko::with_target(&resolved).unwrap();
.with(&prometheus::labels! {"result" => "err", "domain" => domain_name}) std::mem::swap(&mut sender, &mut new_sender);
.inc();
error!("{:?}", response); }
} EkkoResponse::ExceededResponse(_) => {
} ping_counter
.with(&prometheus::labels! {"result" => "timeout", "domain" => domain_name})
.inc();
}
_ => {
ping_counter
.with(&prometheus::labels! {"result" => "err", "domain" => domain_name})
.inc();
error!("{:?}", r);
}
},
Err(e) => {
ping_counter
.with(&prometheus::labels! {"result" => "err", "domain" => domain_name})
.inc();
error!("Ping send to {} failed: {:?}, Trying again later", domain_name, e);
}
};
std::thread::sleep(Duration::from_secs(3)); std::thread::sleep(Duration::from_secs(3));
} }
} }

View File

@ -14,6 +14,7 @@
use std::sync::Arc; use std::sync::Arc;
use std::sync::RwLock; use std::sync::RwLock;
use std::convert::Into;
use gflags; use gflags;
use log::{debug, error, info}; use log::{debug, error, info};
@ -133,7 +134,7 @@ fn main() -> anyhow::Result<()> {
.expect("Failed to register ping latency guage"); .expect("Failed to register ping latency guage");
r.register(Box::new(ping_counter_vec.clone())) r.register(Box::new(ping_counter_vec.clone()))
.expect("Failed to register ping counter"); .expect("Failed to register ping counter");
let stun_socket_addrs = util::resolve_addrs(&stun_servers).unwrap(); let stun_socket_addrs = util::resolve_socket_addrs(&stun_servers).unwrap();
let stun_servers = Arc::new(stun_servers); let stun_servers = Arc::new(stun_servers);
let ping_hosts = Arc::new(ping_hosts); let ping_hosts = Arc::new(ping_hosts);
@ -202,7 +203,7 @@ fn main() -> anyhow::Result<()> {
stun::start_listen_thread( stun::start_listen_thread(
domain_name, domain_name,
stop_signal, stop_signal,
s, s.into(),
stun_counter_vec_copy, stun_counter_vec_copy,
stun_latency_vec_copy, stun_latency_vec_copy,
stun_success_vec_copy, stun_success_vec_copy,

View File

@ -13,11 +13,33 @@
// limitations under the License. // limitations under the License.
use std::io; use std::io;
use std::net::IpAddr;
use std::net::{SocketAddr, ToSocketAddrs}; use std::net::{SocketAddr, ToSocketAddrs};
use log::info; use log::info;
use resolve::config::DnsConfig;
use resolve::resolver::{DnsResolver};
use gflags;
pub fn resolve_addrs<'a>(servers: &'a Vec<&str>) -> io::Result<Vec<Option<SocketAddr>>> { gflags::define! {
/// Allow IPv6 addresses for domain name lookups.
--allowIpv6: bool = false
}
pub fn resolve_hosts<'a>(servers: &'a Vec<&str>) -> io::Result<Vec<Option<IpAddr>>> {
let mut results = Vec::new();
let mut config = DnsConfig::load_default()?;
config.use_inet6 = ALLOWIPV6.flag;
let resolver = DnsResolver::new(config)?;
for name in servers.iter().cloned() {
// TODO for resolution errors return a more valid error with the domain name.
let mut iter = resolver.resolve_host(name)?;
results.push(iter.next());
}
return Ok(results);
}
pub fn resolve_socket_addrs<'a>(servers: &'a Vec<&str>) -> io::Result<Vec<Option<SocketAddr>>> {
let mut results = Vec::new(); let mut results = Vec::new();
for name in servers.iter().cloned() { for name in servers.iter().cloned() {
// TODO for resolution errors return a more valid error with the domain name. // TODO for resolution errors return a more valid error with the domain name.