diff --git a/Cargo.lock b/Cargo.lock index 76f20b8..e70e733 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -35,12 +35,30 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" +[[package]] +name = "bitflags" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" + +[[package]] +name = "byteorder" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" + [[package]] name = "cfg-if" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + [[package]] name = "chrono" version = "0.4.12" @@ -76,7 +94,9 @@ dependencies = [ "gflags", "log", "nursery", + "packet", "prometheus", + "socket2", "stderrlog", "tiny_http", ] @@ -121,6 +141,15 @@ dependencies = [ "syn", ] +[[package]] +name = "hwaddr" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e414433a9e4338f4e87fa29d0670c883a5e73e7955c45f4a49130c0aa992c85b" +dependencies = [ + "phf", +] + [[package]] name = "idna" version = "0.2.0" @@ -178,7 +207,7 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", ] [[package]] @@ -218,12 +247,42 @@ version = "0.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4bd2d4e0cd7c6bb256afbc59a5921c3ead56f05d7696c92e05b6978858b6fa5" +[[package]] +name = "packet" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c136c7ad0619ed4f88894aecf66ad86c80683e7b5d707996e6a3a7e0e3916944" +dependencies = [ + "bitflags", + "byteorder", + "hwaddr", + "thiserror", +] + [[package]] name = "percent-encoding" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "phf" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dfb61232e34fcb633f43d12c58f83c1df82962dcdfa565a4e866ffc17dafe12" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c00cf8b9eafe68dde5e9eaa2cef8ee84a9336a47d566ec55ca16589633b65af7" +dependencies = [ + "siphasher", +] + [[package]] name = "proc-macro2" version = "1.0.18" @@ -239,7 +298,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd0ced56dee39a6e960c15c74dc48849d614586db2eaada6497477af7c7811cd" dependencies = [ - "cfg-if", + "cfg-if 0.1.10", "fnv", "lazy_static 1.4.0", "protobuf", @@ -297,6 +356,23 @@ dependencies = [ "syn", ] +[[package]] +name = "siphasher" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa8f3741c7372e75519bd9346068370c9cdaabcc1f9599cbcf2a2719352286b7" + +[[package]] +name = "socket2" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "winapi", +] + [[package]] name = "spin" version = "0.5.2" diff --git a/Cargo.toml b/Cargo.toml index ea50ee1..4b80073 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,4 +13,6 @@ log = "0.4" nursery = "^0.0.1" prometheus = "^0.9.0" stderrlog = "0.4" -tiny_http = "^0.7.0" \ No newline at end of file +tiny_http = "^0.7.0" +packet = "^0.1.4" +socket2 = "^0.3.19" \ No newline at end of file diff --git a/src/icmp.rs b/src/icmp.rs new file mode 100644 index 0000000..cfc1832 --- /dev/null +++ b/src/icmp.rs @@ -0,0 +1,125 @@ +// Copyright 2020 Jeremy Wall +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use std::convert::Into; +use std::net::{IpAddr, SocketAddr}; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; + +use gflags; +use log::{debug, error, info}; +use packet::icmp::echo::{Builder, Packet}; +use packet::Builder as PBuilder; +use socket2::Domain; +use socket2::Protocol; +use socket2::SockAddr; +use socket2::Socket; + +gflags::define! { + // The size in bytes of the ping requests. + --pingPayload = "durnitisp ping test" +} + +fn make_echo_packet(ident: u16) -> Packet> { + let buffer = Builder::default() + .request() + .unwrap() + .identifier(ident) + .unwrap() + .sequence(0) + .unwrap() + .payload(PINGPAYLOAD.flag.as_bytes()) + .unwrap() + .build() + .unwrap(); + Packet::unchecked(buffer) +} + +pub fn start_echo_loop( + domain_name: &str, + stop_signal: Arc>, + addr: IpAddr, + ident: u16, +) { + info!("Starting ping of {}", domain_name); + // First we construct our icmp transport + // TODO(jwall): Timeouts. + // TODO(jwall): Handle out of order packets. + let (domain, protocol) = match addr { + IpAddr::V4(_) => (Domain::ipv4(), Protocol::icmpv4()), + IpAddr::V6(_) => (Domain::ipv6(), Protocol::icmpv6()), + }; + // Construct a socket to send the ICMP request on. + // socket type: Ip, Datagram, ICMP + let addr: SocketAddr = (addr, 0).into(); + let addr: SockAddr = addr.into(); + let socket = match Socket::new(domain, socket2::Type::raw(), Some(protocol)) { + Ok(s) => s, + Err(e) => { + error!("Unable to create socket for icmp request:\n {}", e); + return; + } + }; + + socket + .set_read_timeout(Some(Duration::from_millis(2048))) + .unwrap(); + // then we start our loop + let mut n = 0; + let mut pkt = make_echo_packet(ident); + loop { + { + // Limit the scope of this lock + if *stop_signal.read().unwrap() { + info!("Stopping ping thread for {}", domain_name); + return; + } + } + pkt.set_sequence(n).unwrap(); + // TODO(jwall): Count the errors? + // construct echo packet + let time_of_send = Instant::now(); + // send echo packet + let pkt_buf: &[u8] = pkt.as_ref(); + debug!("Sending echo request for {}", domain_name); + let sent = socket.send_to(pkt_buf, &addr).unwrap(); + if pkt_buf.len() != sent { + error!("Failed to send a complete icmp packet!"); + continue; + } + // // Wait for echo response + debug!("Waiting for echo reply from {}", domain_name); + let mut buf = vec![0; sent]; + let _rcv_size = match socket.recv(&mut buf) { + Ok(sz) => sz, + Err(e) => { + if let std::io::ErrorKind::TimedOut = e.kind() { + error!("icmp echo request timed out to {}", domain_name); + continue; + } + error!("Error recieving on icmp socket! {:?}", e); + return; + } + }; + let echo = Packet::new(&buf).unwrap(); + if echo.sequence() == n { + let round_trip_time = Instant::now().checked_duration_since(time_of_send).unwrap(); + // record this time + info!("Sequence # {} {}ms", n, round_trip_time.as_millis()); + } else { + error!("Got the wrong sequence number {}", echo.sequence()); + } + // Increment our sequence number + n += 1; + } +} diff --git a/src/main.rs b/src/main.rs index 9d0361d..371cf86 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,7 +24,9 @@ use prometheus::{CounterVec, Encoder, IntGaugeVec, Opts, Registry, TextEncoder}; use stderrlog; use tiny_http; +mod icmp; mod stun; +mod util; gflags::define! { /// Print this help text. @@ -58,6 +60,8 @@ fn main() -> anyhow::Result<()> { ]; let mut stun_servers = gflags::parse(); + let default_ping_hosts: Vec<&'static str> = vec!["google.com"]; + if HELP.flag { println!("durnitisp "); println!(""); @@ -82,6 +86,8 @@ fn main() -> anyhow::Result<()> { if stun_servers.is_empty() { stun_servers = default_stun_servers; } + // FIXME(jwall): allow them to override ping hosts + let ping_hosts = default_ping_hosts; let counter_opts = Opts::new( "stun_attempt_counter", "Counter for the good, bad, and total attempts to connect to stun server.", @@ -108,8 +114,10 @@ fn main() -> anyhow::Result<()> { .expect("Failed to register stun latency guage"); r.register(Box::new(stun_success_vec.clone())) .expect("Failed to register stun success gauge"); - let socket_addrs = stun::resolve_addrs(&stun_servers).unwrap(); + let stun_socket_addrs = util::resolve_addrs(&stun_servers).unwrap(); let stun_servers = Arc::new(stun_servers); + let ping_addrs = util::resolve_ip_addrs(&ping_hosts).unwrap(); + let ping_hosts = Arc::new(ping_hosts); let mut parent = Nursery::new(); // First we start the render thread. @@ -153,30 +161,44 @@ fn main() -> anyhow::Result<()> { }); parent.adopt(Box::new(render_thread)); } - // Then we attempt connections to each server. - for (i, s) in socket_addrs.iter().enumerate() { + for (i, addr) in ping_addrs.iter().cloned().enumerate() { + // TODO(Prometheus stats) + let ping_hosts_copy = ping_hosts.clone(); + if let Some(addr) = dbg!(addr) { + let domain_name = *ping_hosts_copy.get(i).unwrap(); + debug!("Pinging {}", domain_name); + let stop_signal = stop_signal.clone(); + let ping_thread = thread::Pending::new(move || { + icmp::start_echo_loop(domain_name, stop_signal, addr, i as u16); + }); + parent.schedule(Box::new(ping_thread)); + } + } + // Then we attempt to start connections to each stun server. + for (i, s) in stun_socket_addrs.iter().enumerate() { let stun_servers_copy = stun_servers.clone(); let stun_counter_vec_copy = stun_counter_vec.clone(); let stun_latency_vec_copy = stun_latency_vec.clone(); let stun_success_vec_copy = stun_success_vec.clone(); - let s = s.clone(); - let domain_name = *stun_servers_copy.get(i).unwrap(); - let stop_signal = stop_signal.clone(); - let connect_thread = thread::Pending::new(move || { - stun::start_listen_thread( - domain_name, - stop_signal, - s, - stun_counter_vec_copy, - stun_latency_vec_copy, - stun_success_vec_copy, - ) - }); - parent.schedule(Box::new(connect_thread)); - // Spread the probe threads out so they're somewhat uniformly distributed. - std::thread::sleep(std::time::Duration::from_micros( - stun::delay_secs() * 1000000 / (socket_addrs.len() as u64), - )) + if let Some(s) = s.clone() { + let domain_name = *stun_servers_copy.get(i).unwrap(); + let stop_signal = stop_signal.clone(); + let connect_thread = thread::Pending::new(move || { + stun::start_listen_thread( + domain_name, + stop_signal, + s, + stun_counter_vec_copy, + stun_latency_vec_copy, + stun_success_vec_copy, + ) + }); + parent.schedule(Box::new(connect_thread)); + // Spread the probe threads out so they're somewhat uniformly distributed. + std::thread::sleep(std::time::Duration::from_micros( + stun::delay_secs() * 1000000 / (stun_socket_addrs.len() as u64), + )) + }; } // Blocks forever parent.wait(); diff --git a/src/stun.rs b/src/stun.rs index 8f4cc6d..343aa78 100644 --- a/src/stun.rs +++ b/src/stun.rs @@ -12,18 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use gflags; +use log::{debug, error, info}; +use prometheus::{CounterVec, IntGaugeVec}; use std::convert::From; use std::io; -use std::net::{SocketAddr, ToSocketAddrs, UdpSocket}; +use std::net::{SocketAddr, UdpSocket}; use std::sync::Arc; use std::sync::RwLock; use std::time::SystemTime; -use log::{debug, error, info}; -use prometheus::{CounterVec, IntGaugeVec}; - -use gflags; - gflags::define! { /// Read timeout for the stun server udp receive --stunRecvTimeoutSecs: u64 = 5 @@ -57,18 +55,6 @@ impl From for ConnectError { } } -pub fn resolve_addrs(servers: &Vec<&str>) -> io::Result> { - let mut results = Vec::new(); - for name in servers.iter().cloned() { - // TODO for resolution errors return a more valid error with the domain name. - match name.to_socket_addrs() { - Ok(addr) => results.extend(addr), - Err(e) => info!("Failed to resolve {} with error {}", name, e), - } - } - return Ok(results); -} - fn attempt_stun_connect(addr: SocketAddr) -> Result { // We let the OS choose the port by specifying 0 let local_socket = UdpSocket::bind("0.0.0.0:0")?; @@ -99,7 +85,7 @@ pub fn start_listen_thread( { // Limit the scope of this lock if *stop_signal.read().unwrap() { - info!("Stopping thread for {}", domain_name); + info!("Stopping stun thread for {}", domain_name); return; } } diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..dac4eb7 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,51 @@ +// Copyright 2020 Jeremy Wall +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; +use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; + +use log::info; + +pub fn resolve_addrs<'a>(servers: &'a Vec<&str>) -> io::Result>> { + let mut results = Vec::new(); + for name in servers.iter().cloned() { + // TODO for resolution errors return a more valid error with the domain name. + match name.to_socket_addrs() { + Ok(addr) => results.push(addr.into_iter().next()), + Err(e) => { + info!("Failed to resolve {} with error {}", name, e); + results.push(None); + } + } + } + return Ok(results); +} + +pub fn resolve_ip_addrs(hosts: &Vec<&str>) -> io::Result>> { + let mut results = Vec::with_capacity(hosts.len()); + // NOTE(jwall): This is a silly hack due to the fact that the proper way + // to do host lookups in the Rust stdlib has not settled yet. + // TODO(jwall): Do this in a less hacky method once host lookups + // are settled properly. + for host in hosts.iter().cloned() { + match format!("{}:8080", host).to_socket_addrs() { + Ok(addr) => results.push(addr.into_iter().next().map(|a| a.ip())), + Err(e) => { + info!("Failed to resolve {} with error {}", host, e); + results.push(None); + } + } + } + Ok(results) +}