diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..64ee209 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +target/ +.vscode/ \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 2ee1b18..9d0361d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,14 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::convert::From; -use std::io; -use std::net::{SocketAddr, ToSocketAddrs, UdpSocket}; use std::sync::Arc; use std::sync::RwLock; -use std::time::SystemTime; use gflags; +use log::{debug, error, info}; use nursery::thread; use nursery::{Nursery, Waitable}; use prometheus; @@ -27,85 +24,23 @@ use prometheus::{CounterVec, Encoder, IntGaugeVec, Opts, Registry, TextEncoder}; use stderrlog; use tiny_http; -use log::{debug, error, info}; +mod stun; gflags::define! { /// Print this help text. -h, --help = false } -gflags::define! { - /// Delay between lookup attempts in seconds. - --delaySecs: u64 = 60 -} - gflags::define! { /// Port to listen on for exporting variables prometheus style. --listenHost = "0.0.0.0:8080" } -gflags::define! { - /// Read timeout for the stun server udp receive - --stunRecvTimeoutSecs: u64 = 5 -} - gflags::define! { /// Enable debug logging --debug = false } -const STUN_PAYLOAD: [u8; 20] = [ - 0, 1, // Binding request - 0, 0, // Message length - 0x21, 0x12, 0xa4, 0x42, // magic - 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, -]; - -enum ConnectError { - Timeout(SystemTime), - Err(io::Error), - Incomplete, -} - -impl From for ConnectError { - fn from(e: io::Error) -> ConnectError { - if let io::ErrorKind::TimedOut = e.kind() { - return ConnectError::Timeout(SystemTime::now()); - } else { - return ConnectError::Err(e); - } - } -} - -fn resolve_addrs(servers: &Vec<&str>) -> io::Result> { - let mut results = Vec::new(); - for name in servers.iter().cloned() { - // TODO for resolution errors return a more valid error with the domain name. - match name.to_socket_addrs() { - Ok(addr) => results.extend(addr), - Err(e) => info!("Failed to resolve {} with error {}", name, e), - } - } - return Ok(results); -} - -fn attempt_stun_connect(addr: SocketAddr) -> Result { - // We let the OS choose the port by specifying 0 - let local_socket = UdpSocket::bind("0.0.0.0:0")?; - local_socket.connect(addr)?; - local_socket.set_read_timeout(Some(std::time::Duration::from_secs( - STUNRECVTIMEOUTSECS.flag, - )))?; - let _sent = local_socket.send(&STUN_PAYLOAD)?; - // TODO what if we didn't send the whole packet? - let mut buf = [0 as u8; 1024]; - let rcvd = local_socket.recv(&mut buf)?; - if rcvd == 0 { - return Err(ConnectError::Incomplete); - } - Ok(SystemTime::now()) -} - fn main() -> anyhow::Result<()> { let default_stun_servers: Vec<&'static str> = vec![ "stun.l.google.com:19302", @@ -173,7 +108,7 @@ fn main() -> anyhow::Result<()> { .expect("Failed to register stun latency guage"); r.register(Box::new(stun_success_vec.clone())) .expect("Failed to register stun success gauge"); - let socket_addrs = resolve_addrs(&stun_servers).unwrap(); + let socket_addrs = stun::resolve_addrs(&stun_servers).unwrap(); let stun_servers = Arc::new(stun_servers); let mut parent = Nursery::new(); @@ -228,72 +163,19 @@ fn main() -> anyhow::Result<()> { let domain_name = *stun_servers_copy.get(i).unwrap(); let stop_signal = stop_signal.clone(); let connect_thread = thread::Pending::new(move || { - debug!("started thread for {}", domain_name); - loop { - { - // Limit the scope of this lock - if *stop_signal.read().unwrap() { - info!("Stopping thread for {}", domain_name); - return; - } - } - let now = SystemTime::now(); - info!("Attempting to connect to {}", domain_name); - match attempt_stun_connect(s) { - Ok(finish_time) => { - info!("Success! connecting to {}", domain_name); - stun_counter_vec_copy - .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) - .inc(); - stun_latency_vec_copy - .with(&prometheus::labels! {"domain" => domain_name}) - // Technically this could be lossy but we'll chance it anyway. - .set(finish_time.duration_since(now).unwrap().as_millis() as i64); - stun_success_vec_copy - .with(&prometheus::labels! {"domain" => domain_name}) - .set(1); - } - Err(ConnectError::Timeout(finish_time)) => { - info!( - "Stun connection to {} timedout after {} millis", - domain_name, - finish_time.duration_since(now).unwrap().as_millis() - ); - stun_counter_vec_copy - .with(&prometheus::labels! {"result" => "timeout", "domain" => domain_name}) - .inc(); - stun_success_vec_copy - .with(&prometheus::labels! {"domain" => domain_name}) - .set(0); - } - Err(ConnectError::Err(e)) => { - error!("Error connecting to {}: {}", domain_name, e); - stun_counter_vec_copy - .with(&prometheus::labels! {"result" => "err", "domain" => domain_name}) - .inc(); - stun_success_vec_copy - .with(&prometheus::labels! {"domain" => domain_name}) - .set(0); - } - Err(ConnectError::Incomplete) => { - error!("Connection to {} was incomplete", domain_name); - stun_counter_vec_copy - .with(&prometheus::labels! {"result" => "incomplete", "domain" => domain_name}) - .inc(); - stun_success_vec_copy - .with(&prometheus::labels! {"domain" => domain_name}) - .set(0); - } - } - - // Then we wait for some period of time. - std::thread::sleep(std::time::Duration::from_secs(DELAYSECS.flag)) - } + stun::start_listen_thread( + domain_name, + stop_signal, + s, + stun_counter_vec_copy, + stun_latency_vec_copy, + stun_success_vec_copy, + ) }); parent.schedule(Box::new(connect_thread)); // Spread the probe threads out so they're somewhat uniformly distributed. std::thread::sleep(std::time::Duration::from_micros( - DELAYSECS.flag * 1000000 / (socket_addrs.len() as u64), + stun::delay_secs() * 1000000 / (socket_addrs.len() as u64), )) } // Blocks forever diff --git a/src/stun.rs b/src/stun.rs new file mode 100644 index 0000000..8f4cc6d --- /dev/null +++ b/src/stun.rs @@ -0,0 +1,162 @@ +// Copyright 2020 Jeremy Wall +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::convert::From; +use std::io; +use std::net::{SocketAddr, ToSocketAddrs, UdpSocket}; +use std::sync::Arc; +use std::sync::RwLock; +use std::time::SystemTime; + +use log::{debug, error, info}; +use prometheus::{CounterVec, IntGaugeVec}; + +use gflags; + +gflags::define! { + /// Read timeout for the stun server udp receive + --stunRecvTimeoutSecs: u64 = 5 +} + +gflags::define! { + /// Delay between lookup attempts in seconds. + --delaySecs: u64 = 60 +} + +const STUN_PAYLOAD: [u8; 20] = [ + 0, 1, // Binding request + 0, 0, // Message length + 0x21, 0x12, 0xa4, 0x42, // magic + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, +]; + +enum ConnectError { + Timeout(SystemTime), + Err(io::Error), + Incomplete, +} + +impl From for ConnectError { + fn from(e: io::Error) -> ConnectError { + if let io::ErrorKind::TimedOut = e.kind() { + return ConnectError::Timeout(SystemTime::now()); + } else { + return ConnectError::Err(e); + } + } +} + +pub fn resolve_addrs(servers: &Vec<&str>) -> io::Result> { + let mut results = Vec::new(); + for name in servers.iter().cloned() { + // TODO for resolution errors return a more valid error with the domain name. + match name.to_socket_addrs() { + Ok(addr) => results.extend(addr), + Err(e) => info!("Failed to resolve {} with error {}", name, e), + } + } + return Ok(results); +} + +fn attempt_stun_connect(addr: SocketAddr) -> Result { + // We let the OS choose the port by specifying 0 + let local_socket = UdpSocket::bind("0.0.0.0:0")?; + local_socket.connect(addr)?; + local_socket.set_read_timeout(Some(std::time::Duration::from_secs( + STUNRECVTIMEOUTSECS.flag, + )))?; + let _sent = local_socket.send(&STUN_PAYLOAD)?; + // TODO what if we didn't send the whole packet? + let mut buf = [0 as u8; 1024]; + let rcvd = local_socket.recv(&mut buf)?; + if rcvd == 0 { + return Err(ConnectError::Incomplete); + } + Ok(SystemTime::now()) +} + +pub fn start_listen_thread( + domain_name: &str, + stop_signal: Arc>, + s: SocketAddr, + stun_counter_vec_copy: CounterVec, + stun_latency_vec_copy: IntGaugeVec, + stun_success_vec_copy: IntGaugeVec, +) { + debug!("started thread for {}", domain_name); + loop { + { + // Limit the scope of this lock + if *stop_signal.read().unwrap() { + info!("Stopping thread for {}", domain_name); + return; + } + } + let now = SystemTime::now(); + info!("Attempting to connect to {}", domain_name); + match attempt_stun_connect(s) { + Ok(finish_time) => { + info!("Success! connecting to {}", domain_name); + stun_counter_vec_copy + .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) + .inc(); + stun_latency_vec_copy + .with(&prometheus::labels! {"domain" => domain_name}) + // Technically this could be lossy but we'll chance it anyway. + .set(finish_time.duration_since(now).unwrap().as_millis() as i64); + stun_success_vec_copy + .with(&prometheus::labels! {"domain" => domain_name}) + .set(1); + } + Err(ConnectError::Timeout(finish_time)) => { + info!( + "Stun connection to {} timedout after {} millis", + domain_name, + finish_time.duration_since(now).unwrap().as_millis() + ); + stun_counter_vec_copy + .with(&prometheus::labels! {"result" => "timeout", "domain" => domain_name}) + .inc(); + stun_success_vec_copy + .with(&prometheus::labels! {"domain" => domain_name}) + .set(0); + } + Err(ConnectError::Err(e)) => { + error!("Error connecting to {}: {}", domain_name, e); + stun_counter_vec_copy + .with(&prometheus::labels! {"result" => "err", "domain" => domain_name}) + .inc(); + stun_success_vec_copy + .with(&prometheus::labels! {"domain" => domain_name}) + .set(0); + } + Err(ConnectError::Incomplete) => { + error!("Connection to {} was incomplete", domain_name); + stun_counter_vec_copy + .with(&prometheus::labels! {"result" => "incomplete", "domain" => domain_name}) + .inc(); + stun_success_vec_copy + .with(&prometheus::labels! {"domain" => domain_name}) + .set(0); + } + } + + // Then we wait for some period of time. + std::thread::sleep(std::time::Duration::from_secs(DELAYSECS.flag)) + } +} + +pub fn delay_secs() -> u64 { + DELAYSECS.flag +}