Refactor to make room for icmp stats

This commit is contained in:
Jeremy Wall 2020-12-24 13:24:55 -05:00
parent 47e7cf8f5a
commit 5946cd7f77
3 changed files with 176 additions and 130 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
target/
.vscode/

View File

@ -12,14 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::convert::From;
use std::io;
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
use std::sync::Arc;
use std::sync::RwLock;
use std::time::SystemTime;
use gflags;
use log::{debug, error, info};
use nursery::thread;
use nursery::{Nursery, Waitable};
use prometheus;
@ -27,85 +24,23 @@ use prometheus::{CounterVec, Encoder, IntGaugeVec, Opts, Registry, TextEncoder};
use stderrlog;
use tiny_http;
use log::{debug, error, info};
mod stun;
gflags::define! {
/// Print this help text.
-h, --help = false
}
gflags::define! {
/// Delay between lookup attempts in seconds.
--delaySecs: u64 = 60
}
gflags::define! {
/// Port to listen on for exporting variables prometheus style.
--listenHost = "0.0.0.0:8080"
}
gflags::define! {
/// Read timeout for the stun server udp receive
--stunRecvTimeoutSecs: u64 = 5
}
gflags::define! {
/// Enable debug logging
--debug = false
}
const STUN_PAYLOAD: [u8; 20] = [
0, 1, // Binding request
0, 0, // Message length
0x21, 0x12, 0xa4, 0x42, // magic
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
];
enum ConnectError {
Timeout(SystemTime),
Err(io::Error),
Incomplete,
}
impl From<io::Error> for ConnectError {
fn from(e: io::Error) -> ConnectError {
if let io::ErrorKind::TimedOut = e.kind() {
return ConnectError::Timeout(SystemTime::now());
} else {
return ConnectError::Err(e);
}
}
}
fn resolve_addrs(servers: &Vec<&str>) -> io::Result<Vec<SocketAddr>> {
let mut results = Vec::new();
for name in servers.iter().cloned() {
// TODO for resolution errors return a more valid error with the domain name.
match name.to_socket_addrs() {
Ok(addr) => results.extend(addr),
Err(e) => info!("Failed to resolve {} with error {}", name, e),
}
}
return Ok(results);
}
fn attempt_stun_connect(addr: SocketAddr) -> Result<SystemTime, ConnectError> {
// We let the OS choose the port by specifying 0
let local_socket = UdpSocket::bind("0.0.0.0:0")?;
local_socket.connect(addr)?;
local_socket.set_read_timeout(Some(std::time::Duration::from_secs(
STUNRECVTIMEOUTSECS.flag,
)))?;
let _sent = local_socket.send(&STUN_PAYLOAD)?;
// TODO what if we didn't send the whole packet?
let mut buf = [0 as u8; 1024];
let rcvd = local_socket.recv(&mut buf)?;
if rcvd == 0 {
return Err(ConnectError::Incomplete);
}
Ok(SystemTime::now())
}
fn main() -> anyhow::Result<()> {
let default_stun_servers: Vec<&'static str> = vec![
"stun.l.google.com:19302",
@ -173,7 +108,7 @@ fn main() -> anyhow::Result<()> {
.expect("Failed to register stun latency guage");
r.register(Box::new(stun_success_vec.clone()))
.expect("Failed to register stun success gauge");
let socket_addrs = resolve_addrs(&stun_servers).unwrap();
let socket_addrs = stun::resolve_addrs(&stun_servers).unwrap();
let stun_servers = Arc::new(stun_servers);
let mut parent = Nursery::new();
@ -228,72 +163,19 @@ fn main() -> anyhow::Result<()> {
let domain_name = *stun_servers_copy.get(i).unwrap();
let stop_signal = stop_signal.clone();
let connect_thread = thread::Pending::new(move || {
debug!("started thread for {}", domain_name);
loop {
{
// Limit the scope of this lock
if *stop_signal.read().unwrap() {
info!("Stopping thread for {}", domain_name);
return;
}
}
let now = SystemTime::now();
info!("Attempting to connect to {}", domain_name);
match attempt_stun_connect(s) {
Ok(finish_time) => {
info!("Success! connecting to {}", domain_name);
stun_counter_vec_copy
.with(&prometheus::labels! {"result" => "ok", "domain" => domain_name})
.inc();
stun_latency_vec_copy
.with(&prometheus::labels! {"domain" => domain_name})
// Technically this could be lossy but we'll chance it anyway.
.set(finish_time.duration_since(now).unwrap().as_millis() as i64);
stun_success_vec_copy
.with(&prometheus::labels! {"domain" => domain_name})
.set(1);
}
Err(ConnectError::Timeout(finish_time)) => {
info!(
"Stun connection to {} timedout after {} millis",
stun::start_listen_thread(
domain_name,
finish_time.duration_since(now).unwrap().as_millis()
);
stun_counter_vec_copy
.with(&prometheus::labels! {"result" => "timeout", "domain" => domain_name})
.inc();
stun_success_vec_copy
.with(&prometheus::labels! {"domain" => domain_name})
.set(0);
}
Err(ConnectError::Err(e)) => {
error!("Error connecting to {}: {}", domain_name, e);
stun_counter_vec_copy
.with(&prometheus::labels! {"result" => "err", "domain" => domain_name})
.inc();
stun_success_vec_copy
.with(&prometheus::labels! {"domain" => domain_name})
.set(0);
}
Err(ConnectError::Incomplete) => {
error!("Connection to {} was incomplete", domain_name);
stun_counter_vec_copy
.with(&prometheus::labels! {"result" => "incomplete", "domain" => domain_name})
.inc();
stun_success_vec_copy
.with(&prometheus::labels! {"domain" => domain_name})
.set(0);
}
}
// Then we wait for some period of time.
std::thread::sleep(std::time::Duration::from_secs(DELAYSECS.flag))
}
stop_signal,
s,
stun_counter_vec_copy,
stun_latency_vec_copy,
stun_success_vec_copy,
)
});
parent.schedule(Box::new(connect_thread));
// Spread the probe threads out so they're somewhat uniformly distributed.
std::thread::sleep(std::time::Duration::from_micros(
DELAYSECS.flag * 1000000 / (socket_addrs.len() as u64),
stun::delay_secs() * 1000000 / (socket_addrs.len() as u64),
))
}
// Blocks forever

162
src/stun.rs Normal file
View File

@ -0,0 +1,162 @@
// Copyright 2020 Jeremy Wall
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::convert::From;
use std::io;
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
use std::sync::Arc;
use std::sync::RwLock;
use std::time::SystemTime;
use log::{debug, error, info};
use prometheus::{CounterVec, IntGaugeVec};
use gflags;
gflags::define! {
/// Read timeout for the stun server udp receive
--stunRecvTimeoutSecs: u64 = 5
}
gflags::define! {
/// Delay between lookup attempts in seconds.
--delaySecs: u64 = 60
}
const STUN_PAYLOAD: [u8; 20] = [
0, 1, // Binding request
0, 0, // Message length
0x21, 0x12, 0xa4, 0x42, // magic
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
];
enum ConnectError {
Timeout(SystemTime),
Err(io::Error),
Incomplete,
}
impl From<io::Error> for ConnectError {
fn from(e: io::Error) -> ConnectError {
if let io::ErrorKind::TimedOut = e.kind() {
return ConnectError::Timeout(SystemTime::now());
} else {
return ConnectError::Err(e);
}
}
}
pub fn resolve_addrs(servers: &Vec<&str>) -> io::Result<Vec<SocketAddr>> {
let mut results = Vec::new();
for name in servers.iter().cloned() {
// TODO for resolution errors return a more valid error with the domain name.
match name.to_socket_addrs() {
Ok(addr) => results.extend(addr),
Err(e) => info!("Failed to resolve {} with error {}", name, e),
}
}
return Ok(results);
}
fn attempt_stun_connect(addr: SocketAddr) -> Result<SystemTime, ConnectError> {
// We let the OS choose the port by specifying 0
let local_socket = UdpSocket::bind("0.0.0.0:0")?;
local_socket.connect(addr)?;
local_socket.set_read_timeout(Some(std::time::Duration::from_secs(
STUNRECVTIMEOUTSECS.flag,
)))?;
let _sent = local_socket.send(&STUN_PAYLOAD)?;
// TODO what if we didn't send the whole packet?
let mut buf = [0 as u8; 1024];
let rcvd = local_socket.recv(&mut buf)?;
if rcvd == 0 {
return Err(ConnectError::Incomplete);
}
Ok(SystemTime::now())
}
pub fn start_listen_thread(
domain_name: &str,
stop_signal: Arc<RwLock<bool>>,
s: SocketAddr,
stun_counter_vec_copy: CounterVec,
stun_latency_vec_copy: IntGaugeVec,
stun_success_vec_copy: IntGaugeVec,
) {
debug!("started thread for {}", domain_name);
loop {
{
// Limit the scope of this lock
if *stop_signal.read().unwrap() {
info!("Stopping thread for {}", domain_name);
return;
}
}
let now = SystemTime::now();
info!("Attempting to connect to {}", domain_name);
match attempt_stun_connect(s) {
Ok(finish_time) => {
info!("Success! connecting to {}", domain_name);
stun_counter_vec_copy
.with(&prometheus::labels! {"result" => "ok", "domain" => domain_name})
.inc();
stun_latency_vec_copy
.with(&prometheus::labels! {"domain" => domain_name})
// Technically this could be lossy but we'll chance it anyway.
.set(finish_time.duration_since(now).unwrap().as_millis() as i64);
stun_success_vec_copy
.with(&prometheus::labels! {"domain" => domain_name})
.set(1);
}
Err(ConnectError::Timeout(finish_time)) => {
info!(
"Stun connection to {} timedout after {} millis",
domain_name,
finish_time.duration_since(now).unwrap().as_millis()
);
stun_counter_vec_copy
.with(&prometheus::labels! {"result" => "timeout", "domain" => domain_name})
.inc();
stun_success_vec_copy
.with(&prometheus::labels! {"domain" => domain_name})
.set(0);
}
Err(ConnectError::Err(e)) => {
error!("Error connecting to {}: {}", domain_name, e);
stun_counter_vec_copy
.with(&prometheus::labels! {"result" => "err", "domain" => domain_name})
.inc();
stun_success_vec_copy
.with(&prometheus::labels! {"domain" => domain_name})
.set(0);
}
Err(ConnectError::Incomplete) => {
error!("Connection to {} was incomplete", domain_name);
stun_counter_vec_copy
.with(&prometheus::labels! {"result" => "incomplete", "domain" => domain_name})
.inc();
stun_success_vec_copy
.with(&prometheus::labels! {"domain" => domain_name})
.set(0);
}
}
// Then we wait for some period of time.
std::thread::sleep(std::time::Duration::from_secs(DELAYSECS.flag))
}
}
pub fn delay_secs() -> u64 {
DELAYSECS.flag
}