durnitisp/src/main.rs

197 lines
6.9 KiB
Rust
Raw Normal View History

use std::convert::From;
2020-06-30 19:27:17 -05:00
use std::io;
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
use std::sync::Arc;
use std::time::SystemTime;
use gflags;
use nursery::thread;
use nursery::{Nursery, Waitable};
#[macro_use]
use prometheus;
use prometheus::{CounterVec, Encoder, IntGaugeVec, Opts, Registry, TextEncoder};
2020-06-30 19:27:17 -05:00
// Command-line flags.
//
// The doc comment inside each `gflags::define!` is the flag's help text. The
// rest of this file reads a flag through an upper-cased static, for example
// `DELAYSECS.flag` for `--delaySecs`.
gflags::define! {
    /// Print this help text.
    -h, --help = false
}

gflags::define! {
    /// Delay between lookup attempts in seconds.
    --delaySecs: u64 = 60
}

gflags::define! {
    /// Port to listen on for exporting variables prometheus style.
    --listenPort: u32 = 0
}

gflags::define! {
    /// Retry dns infinitely until we resolve.
    --dnsRetryInfinite = false
}

gflags::define! {
    /// Read timeout for the stun server udp receive
    --stunRecvTimeoutSecs: u64 = 5
}
/// The 20-byte datagram sent to every probed STUN server.
///
/// It is a bare Binding request header with no attributes. The trailing twelve
/// bytes are a fixed filler occupying the slot RFC 5389 reserves for the
/// transaction id.
const stun_payload: [u8; 20] = [
    0x00, 0x01, // Binding request
    0x00, 0x00, // Message length
    0x21, 0x12, 0xa4, 0x42, // magic
    0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c,
];
/// Ways a single STUN probe can fail.
#[derive(Debug)]
enum ConnectError {
    /// A socket operation timed out. Carries the wall-clock time at which the
    /// error was converted, so callers can compute how long they waited.
    Timeout(SystemTime),
    /// Any other I/O failure.
    Err(io::Error),
    /// The exchange finished without delivering data.
    Incomplete,
}

impl From<io::Error> for ConnectError {
    /// Classifies an I/O error as a timeout or a generic failure.
    ///
    /// Both `TimedOut` and `WouldBlock` count as timeouts: the kind reported
    /// when a socket read timeout expires is platform dependent (Unix
    /// typically yields `WouldBlock`, Windows `TimedOut`).
    fn from(e: io::Error) -> ConnectError {
        match e.kind() {
            io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => {
                ConnectError::Timeout(SystemTime::now())
            }
            _ => ConnectError::Err(e),
        }
    }
}
/// Resolves every `host:port` entry in `servers` to its socket addresses.
///
/// Results are flattened in input order. A single name may contribute more
/// than one address, so the output can be longer than `servers`.
///
/// # Errors
///
/// Stops at the first entry that fails to resolve. The returned error keeps
/// the original `io::ErrorKind` and names the offending server in its message.
fn resolve_addrs(servers: &[&str]) -> io::Result<Vec<SocketAddr>> {
    let mut results = Vec::new();
    for &name in servers {
        let addrs = name.to_socket_addrs().map_err(|e| {
            io::Error::new(e.kind(), format!("failed to resolve {}: {}", name, e))
        })?;
        results.extend(addrs);
    }
    Ok(results)
}
/// Performs one request/response exchange with the STUN server at `addr`.
///
/// Sends `stun_payload` from a freshly bound UDP socket and waits up to
/// `--stunRecvTimeoutSecs` seconds for any datagram in reply. The reply's
/// contents are not inspected.
///
/// # Returns
///
/// The wall-clock time at which the reply was received.
///
/// # Errors
///
/// * `ConnectError::Incomplete` if the request was only partially sent or the
///   reply was empty.
/// * Any socket failure, converted through `From<io::Error> for ConnectError`.
fn attempt_stun_connect(addr: SocketAddr) -> Result<SystemTime, ConnectError> {
    // Port 0 lets the OS choose the local port. The wildcard address has to be
    // in the same family as the target, otherwise `connect` fails for IPv6.
    let local_addr = if addr.is_ipv6() {
        "[::]:0"
    } else {
        "0.0.0.0:0"
    };
    let local_socket = UdpSocket::bind(local_addr)?;
    local_socket.connect(addr)?;
    local_socket.set_read_timeout(Some(std::time::Duration::from_secs(
        STUNRECVTIMEOUTSECS.flag,
    )))?;

    // A short send means the server never saw a complete request.
    let sent = local_socket.send(&stun_payload)?;
    if sent != stun_payload.len() {
        return Err(ConnectError::Incomplete);
    }

    let mut buf = [0u8; 1024];
    let rcvd = local_socket.recv(&mut buf)?;
    if rcvd == 0 {
        return Err(ConnectError::Incomplete);
    }
    Ok(SystemTime::now())
}
2020-06-30 19:27:17 -05:00
fn main() {
let default_stun_servers: Vec<&'static str> = vec![
"stun.l.google.com:19302",
"stun.ekiga.net:3478",
"stun.xten.com:3478",
"stun.softjoys.com:3478",
"stun1.noc.ams-ix.net:3478",
];
let mut stun_servers = gflags::parse();
if HELP.flag {
// TODO print better help than this.
gflags::print_help_and_exit(0);
}
if stun_servers.is_empty() {
stun_servers = default_stun_servers;
}
let counter_opts = Opts::new(
"stun_attempt_counter",
"Counter for the good, bad, and total attempts to connect to stun server.",
);
let gauge_opts = Opts::new(
"stun_attempt_latency_ms",
"Latency guage in millis per stun domain.",
);
// Create a Registry and register metrics.
let r = Registry::new();
//r.register(Box::new(counter.clone())).unwrap();
let stun_counter_vec = CounterVec::new(counter_opts, &["result", "domain"]).unwrap();
r.register(Box::new(stun_counter_vec.clone()))
.expect("Failed to register stun connection counter");
let stun_latency_vec = IntGaugeVec::new(gauge_opts, &["domain"]).unwrap();
r.register(Box::new(stun_latency_vec.clone()))
.expect("Failed to register stun latency guage");
let socket_addrs = resolve_addrs(&stun_servers).unwrap();
let stun_servers = Arc::new(stun_servers);
// TODO We need some accounting here
// first we attempt connections to each server.
let mut parent = Nursery::new();
for (i, s) in socket_addrs.iter().enumerate() {
let stun_servers_copy = stun_servers.clone();
let stun_counter_vec_copy = stun_counter_vec.clone();
let stun_latency_vec_copy = stun_latency_vec.clone();
let s = s.clone();
let now = SystemTime::now();
let domain_name = *stun_servers_copy.get(i).unwrap();
let connect_thread = thread::Pending::new(move || {
loop {
eprintln!("Attempting to connect to {}", domain_name);
match attempt_stun_connect(s) {
Ok(finish_time) => {
eprintln!("Success! connecting to {}", domain_name);
stun_counter_vec_copy
.with(&prometheus::labels! {"result" => "ok", "domain" => domain_name})
.inc();
stun_latency_vec_copy
.with(&prometheus::labels! {"domain" => domain_name})
// Technically this could be lossy but we'll chance it anyway.
.set(finish_time.duration_since(now).unwrap().as_millis() as i64);
}
Err(ConnectError::Timeout(finish_time)) => {
eprintln!(
"Stun connection to {} timedout after {} millis",
domain_name,
finish_time.duration_since(now).unwrap().as_millis()
);
stun_counter_vec_copy
.with(&prometheus::labels! {"result" => "timeout", "domain" => domain_name})
.inc();
}
Err(ConnectError::Err(e)) => {
eprintln!("Error connecting to {}: {}", domain_name, e);
stun_counter_vec_copy
.with(&prometheus::labels! {"result" => "err", "domain" => domain_name})
.inc();
}
Err(ConnectError::Incomplete) => {
eprintln!("Connection to {} was incomplete", domain_name);
stun_counter_vec_copy
.with(&prometheus::labels! {"result" => "incomplete", "domain" => domain_name})
.inc();
}
}
// Then we wait for some period of time.
std::thread::sleep(std::time::Duration::from_secs(DELAYSECS.flag))
}
});
parent.schedule(Box::new(connect_thread));
}
let render_thread = thread::Pending::new(move || {
loop {
let mut buffer = vec![];
// Gather the metrics.
let encoder = TextEncoder::new();
let metric_families = r.gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
// Output to the standard output.
println!("{}", String::from_utf8(buffer).unwrap());
// Then we wait for some period of time.
std::thread::sleep(std::time::Duration::from_secs(DELAYSECS.flag))
}
});
parent.schedule(Box::new(render_thread));
// Blocks forever
parent.wait();
2020-06-30 19:27:17 -05:00
}