From 7e89db36e9d805b58bafb309d5c9d186ebfe1813 Mon Sep 17 00:00:00 2001 From: Jeremy Wall Date: Tue, 30 Jun 2020 19:54:55 -0500 Subject: [PATCH] Mostly working data collection and exporting. --- Cargo.lock | 74 +++++++++++++++++++++++- Cargo.toml | 6 +- src/main.rs | 162 ++++++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 229 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 539b16b..f568257 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6,6 +6,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87b48bbc752e97f1b6d7f237c0fd50056f19417e30b10121d4065d1459270e1d" +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "ctor" version = "0.1.15" @@ -17,12 +23,20 @@ dependencies = [ ] [[package]] -name = "damnitisp" +name = "durnitisp" version = "0.1.0" dependencies = [ "gflags", + "nursery", + "prometheus", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "gflags" version = "0.3.5" @@ -79,6 +93,18 @@ dependencies = [ "syn", ] +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "nursery" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4bd2d4e0cd7c6bb256afbc59a5921c3ead56f05d7696c92e05b6978858b6fa5" + [[package]] name = "proc-macro2" version = "1.0.18" @@ -88,6 +114,26 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "prometheus" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd0ced56dee39a6e960c15c74dc48849d614586db2eaada6497477af7c7811cd" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "protobuf", + "spin", + "thiserror", +] + +[[package]] +name = "protobuf" +version = "2.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4951a8253c06334be9fe320bbcf73f14949fde62a0c8128d697eec1ff0fa8cd" + [[package]] name = "quote" version = "1.0.7" @@ -117,6 +163,12 @@ dependencies = [ "syn", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "syn" version = "1.0.33" @@ -128,6 +180,26 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "thiserror" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "unicode-xid" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index f5baf39..2875727 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "damnitisp" +name = "durnitisp" version = "0.1.0" authors = ["Jeremy Wall "] edition = "2018" @@ -7,4 +7,6 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -gflags = "^0.3" \ No newline at end of file +gflags = "^0.3" +nursery = "^0.0.1" +prometheus = "^0.9.0" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 1b3504e..7b95100 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,15 @@ -use gflags; - +use std::convert::From; use std::io; -use std::net::{SocketAddr, ToSocketAddrs}; +use std::net::{SocketAddr, ToSocketAddrs, UdpSocket}; +use std::sync::Arc; +use std::time::SystemTime; + +use gflags; +use nursery::thread; +use nursery::{Nursery, Waitable}; +#[macro_use] +use prometheus; +use prometheus::{CounterVec, Encoder, IntGaugeVec, Opts, Registry, TextEncoder}; gflags::define! { /// Print this help text. @@ -10,12 +18,12 @@ gflags::define! { gflags::define! { /// Delay between lookup attempts in seconds. - --delaySec: i32 = 60 + --delaySecs: u64 = 60 } gflags::define! { /// Port to listen on for exporting variables prometheus style. - --listenPort: i32 = 0 + --listenPort: u32 = 0 } gflags::define! { @@ -23,20 +31,64 @@ gflags::define! { --dnsRetryInfinite = false } -fn resolveAddrs(servers: Vec<&str>) -> io::Result> { +gflags::define! { + /// Read timeout for the stun server udp receive + --stunRecvTimeoutSecs: u64 = 5 +} + +const stun_payload: [u8; 20] = [ + 0, 1, // Binding request + 0, 0, // Message length + 0x21, 0x12, 0xa4, 0x42, // magic + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, +]; + +enum ConnectError { + Timeout(SystemTime), + Err(io::Error), + Incomplete, +} + +impl From for ConnectError { + fn from(e: io::Error) -> ConnectError { + if let io::ErrorKind::TimedOut = e.kind() { + return ConnectError::Timeout(SystemTime::now()); + } else { + return ConnectError::Err(e); + } + } +} + +fn resolve_addrs(servers: &Vec<&str>) -> io::Result> { let mut results = Vec::new(); - for name in servers { - eprintln!("Resolving {}", name); + for name in servers.iter().cloned() { + // TODO for resolution errors return a more valid error with the domain name. results.extend(name.to_socket_addrs()?); } return Ok(results); } +fn attempt_stun_connect(addr: SocketAddr) -> Result { + // We let the OS choose the port by specifying 0 + let local_socket = UdpSocket::bind("0.0.0.0:0")?; + local_socket.connect(addr)?; + local_socket.set_read_timeout(Some(std::time::Duration::from_secs( + STUNRECVTIMEOUTSECS.flag, + )))?; + let sent = local_socket.send(&stun_payload)?; + // TODO what if we didn't send the whole packet? + let mut buf = [0 as u8; 1024]; + let rcvd = local_socket.recv(&mut buf)?; + if rcvd == 0 { + return Err(ConnectError::Incomplete); + } + Ok(SystemTime::now()) +} + fn main() { let default_stun_servers: Vec<&'static str> = vec![ "stun.l.google.com:19302", "stun.ekiga.net:3478", - "stunserver.org:3478", "stun.xten.com:3478", "stun.softjoys.com:3478", "stun1.noc.ams-ix.net:3478", @@ -50,5 +102,95 @@ fn main() { if stun_servers.is_empty() { stun_servers = default_stun_servers; } - let socketAddrs = resolveAddrs(stun_servers).unwrap(); + let counter_opts = Opts::new( + "stun_attempt_counter", + "Counter for the good, bad, and total attempts to connect to stun server.", + ); + let gauge_opts = Opts::new( + "stun_attempt_latency_ms", + "Latency guage in millis per stun domain.", + ); + // Create a Registry and register metrics. + let r = Registry::new(); + //r.register(Box::new(counter.clone())).unwrap(); + let stun_counter_vec = CounterVec::new(counter_opts, &["result", "domain"]).unwrap(); + r.register(Box::new(stun_counter_vec.clone())) + .expect("Failed to register stun connection counter"); + let stun_latency_vec = IntGaugeVec::new(gauge_opts, &["domain"]).unwrap(); + r.register(Box::new(stun_latency_vec.clone())) + .expect("Failed to register stun latency guage"); + let socket_addrs = resolve_addrs(&stun_servers).unwrap(); + let stun_servers = Arc::new(stun_servers); + + // TODO We need some accounting here + // first we attempt connections to each server. + let mut parent = Nursery::new(); + for (i, s) in socket_addrs.iter().enumerate() { + let stun_servers_copy = stun_servers.clone(); + let stun_counter_vec_copy = stun_counter_vec.clone(); + let stun_latency_vec_copy = stun_latency_vec.clone(); + let s = s.clone(); + let now = SystemTime::now(); + let domain_name = *stun_servers_copy.get(i).unwrap(); + let connect_thread = thread::Pending::new(move || { + loop { + eprintln!("Attempting to connect to {}", domain_name); + match attempt_stun_connect(s) { + Ok(finish_time) => { + eprintln!("Success! connecting to {}", domain_name); + stun_counter_vec_copy + .with(&prometheus::labels! {"result" => "ok", "domain" => domain_name}) + .inc(); + stun_latency_vec_copy + .with(&prometheus::labels! {"domain" => domain_name}) + // Technically this could be lossy but we'll chance it anyway. + .set(finish_time.duration_since(now).unwrap().as_millis() as i64); + } + Err(ConnectError::Timeout(finish_time)) => { + eprintln!( + "Stun connection to {} timedout after {} millis", + domain_name, + finish_time.duration_since(now).unwrap().as_millis() + ); + stun_counter_vec_copy + .with(&prometheus::labels! {"result" => "timeout", "domain" => domain_name}) + .inc(); + } + Err(ConnectError::Err(e)) => { + eprintln!("Error connecting to {}: {}", domain_name, e); + stun_counter_vec_copy + .with(&prometheus::labels! {"result" => "err", "domain" => domain_name}) + .inc(); + } + Err(ConnectError::Incomplete) => { + eprintln!("Connection to {} was incomplete", domain_name); + stun_counter_vec_copy + .with(&prometheus::labels! {"result" => "incomplete", "domain" => domain_name}) + .inc(); + } + } + + // Then we wait for some period of time. + std::thread::sleep(std::time::Duration::from_secs(DELAYSECS.flag)) + } + }); + parent.schedule(Box::new(connect_thread)); + } + let render_thread = thread::Pending::new(move || { + loop { + let mut buffer = vec![]; + // Gather the metrics. + let encoder = TextEncoder::new(); + let metric_families = r.gather(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + // Output to the standard output. + println!("{}", String::from_utf8(buffer).unwrap()); + // Then we wait for some period of time. + std::thread::sleep(std::time::Duration::from_secs(DELAYSECS.flag)) + } + }); + parent.schedule(Box::new(render_thread)); + // Blocks forever + parent.wait(); }