Restarts work properly now

This commit is contained in:
Jeremy Wall 2023-08-14 18:56:34 -04:00
parent 5cfed8963f
commit 18c964138a
3 changed files with 69 additions and 78 deletions

View File

@ -97,6 +97,7 @@ impl CancelableProcess {
pub fn check(&mut self) -> Result<Option<i32>, CommandError> { pub fn check(&mut self) -> Result<Option<i32>, CommandError> {
Ok(match self.handle { Ok(match self.handle {
// TODO(jwall): This appears to block the thread despite the documenation. Figure out if this is fixable or not.
Some(ref mut h) => match h.try_wait()? { Some(ref mut h) => match h.try_wait()? {
Some(status) => Some(status.code().unwrap_or(0)), Some(status) => Some(status.code().unwrap_or(0)),
None => Some(h.wait()?.code().unwrap_or(0)), None => Some(h.wait()?.code().unwrap_or(0)),
@ -115,8 +116,9 @@ impl CancelableProcess {
pub fn cancel(&mut self) -> Result<(), CommandError> { pub fn cancel(&mut self) -> Result<(), CommandError> {
if let Some(ref mut h) = self.handle { if let Some(ref mut h) = self.handle {
h.kill()?; let _ = h.kill();
} }
self.exec = None; self.exec = None;
self.handle = None; self.handle = None;
Ok(()) Ok(())

View File

@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::path::Path; use std::path::Path;
use std::sync::mpsc::channel; use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
@ -29,7 +28,7 @@ pub struct FileProcess<'a> {
env: Option<Vec<String>>, env: Option<Vec<String>>,
files: Vec<&'a str>, files: Vec<&'a str>,
method: WatchEventType, method: WatchEventType,
poll: Duration, poll: Option<Duration>,
} }
impl<'a> FileProcess<'a> { impl<'a> FileProcess<'a> {
@ -38,7 +37,7 @@ impl<'a> FileProcess<'a> {
env: Option<Vec<String>>, env: Option<Vec<String>>,
file: Vec<&'a str>, file: Vec<&'a str>,
method: WatchEventType, method: WatchEventType,
poll: Duration, poll: Option<Duration>,
) -> FileProcess<'a> { ) -> FileProcess<'a> {
FileProcess { FileProcess {
cmd, cmd,
@ -50,11 +49,11 @@ impl<'a> FileProcess<'a> {
} }
} }
fn spawn_runner_thread( fn watch_for_change_events(
lock: Arc<Mutex<bool>>, ch: Receiver<()>,
cmd: String, cmd: String,
env: Option<Vec<String>>, env: Option<Vec<String>>,
poll: Duration, poll: Option<Duration>,
) { ) {
let copied_env = env.and_then(|v| { let copied_env = env.and_then(|v| {
Some( Some(
@ -64,56 +63,48 @@ fn spawn_runner_thread(
.collect::<Vec<String>>(), .collect::<Vec<String>>(),
) )
}); });
thread::spawn(move || { let mut exec = CancelableProcess::new(&cmd, copied_env);
let mut exec = CancelableProcess::new(&cmd, copied_env); println!("Spawning command");
exec.spawn().expect("Failed to start command"); exec.spawn().expect("Failed to start command");
loop { println!("Starting watch loop");
// Wait our requisit number of seconds loop {
thread::sleep(poll); // Wait our requisit number of seconds
// Default to not running the command. if let Some(poll) = poll {
if !run_loop_step(lock.clone(), &mut exec) { thread::sleep(dbg!(poll));
exec.reset().expect("Failed to start command");
}
} }
}); //if let Err(err) = exec.check() {
} // println!("Error running command! {}", err);
// println!("Continuing");
fn run_loop_step(lock: Arc<Mutex<bool>>, exec: &mut CancelableProcess) -> bool { //};
match lock.lock() { // Default to not running the command.
Ok(mut signal) => { if !run_loop_step(&ch, &mut exec) {
// We always want to check on our process each iteration of the loop. println!("Failed to start command");
if let Err(err) = exec.check() {
println!("{:?}", err);
return false;
}
if *signal {
// set signal to false so we won't trigger on the
// next loop iteration unless we recieved more events.
*signal = false;
// On a true signal we want to start or restart our process.
if let Err(err) = exec.reset() {
println!("{:?}", err);
return false;
}
}
return true;
}
Err(err) => {
println!("Unexpected error; {}", err);
return false;
} }
} }
} }
fn run_loop_step(ch: &Receiver<()>, exec: &mut CancelableProcess) -> bool {
let _ = ch.recv().unwrap();
// We always want to check on our process each iteration of the loop.
// set signal to false so we won't trigger on the
// next loop iteration unless we recieved more events.
// On a true signal we want to start or restart our process.
println!("Restarting process");
if let Err(err) = exec.reset() {
println!("{:?}", err);
return false;
}
return true;
}
fn wait_for_fs_events( fn wait_for_fs_events(
lock: Arc<Mutex<bool>>, ch: Sender<()>,
method: WatchEventType, method: WatchEventType,
files: &Vec<&str>, files: &Vec<&str>,
) -> Result<(), CommandError> { ) -> Result<(), CommandError> {
// Notify requires a channel for communication. // Notify requires a channel for communication.
let (tx, rx) = channel(); let (tx, rx) = channel();
let mut watcher = watcher(tx, Duration::from_secs(1))?; let mut watcher = watcher(tx, Duration::from_secs(1))?;
// TODO(jwall): Better error handling.
for file in files { for file in files {
// NOTE(jwall): this is necessary because notify::fsEventWatcher panics // NOTE(jwall): this is necessary because notify::fsEventWatcher panics
// if the path doesn't exist. :-( // if the path doesn't exist. :-(
@ -128,30 +119,24 @@ fn wait_for_fs_events(
loop { loop {
let evt: WatchEventType = match rx.recv() { let evt: WatchEventType = match rx.recv() {
Ok(event) => WatchEventType::from(event), Ok(event) => WatchEventType::from(event),
Err(_) => WatchEventType::Error, Err(e) => {
println!("Watch Error: {}", e);
WatchEventType::Error
}
}; };
match evt { match evt {
WatchEventType::Ignore => { WatchEventType::Ignore | WatchEventType::Error => {
// We ignore this one. // We ignore these.
} //println!("Event: Ignore");
WatchEventType::Error => {
// We log this one.
} }
WatchEventType::Touched => { WatchEventType::Touched => {
if method == WatchEventType::Touched { if method == WatchEventType::Touched {
let mut signal = lock.lock().unwrap(); ch.send(()).unwrap();
*signal = true;
} else {
println!("Ignoring touched event");
} }
} }
WatchEventType::Changed => match lock.lock() { WatchEventType::Changed => {
Ok(mut signal) => *signal = true, ch.send(()).unwrap();
Err(err) => { }
println!("Unexpected error; {}", err);
return Ok(());
}
},
} }
} }
} }
@ -160,13 +145,16 @@ impl<'a> Process for FileProcess<'a> {
fn run(&mut self) -> Result<(), CommandError> { fn run(&mut self) -> Result<(), CommandError> {
// TODO(jeremy): Is this sufficent or do we want to ignore // TODO(jeremy): Is this sufficent or do we want to ignore
// any events that come in while the command is running? // any events that come in while the command is running?
let lock = Arc::new(Mutex::new(false)); let (tx, rx) = channel();
spawn_runner_thread( thread::spawn({
lock.clone(), let cmd = self.cmd.to_string();
self.cmd.to_string(), let env = self.env.clone();
self.env.clone(), let poll = self.poll.clone();
self.poll, move || {
); watch_for_change_events(rx, cmd, env, poll);
wait_for_fs_events(lock, self.method.clone(), &self.files) }
});
wait_for_fs_events(tx, self.method.clone(), &self.files)?;
Ok(())
} }
} }

View File

@ -17,7 +17,7 @@ extern crate clap;
extern crate humantime; extern crate humantime;
extern crate notify; extern crate notify;
use std::{path::PathBuf, process, str::FromStr}; use std::{process, str::FromStr};
mod error; mod error;
mod events; mod events;
@ -44,8 +44,8 @@ fn do_flags() -> clap::ArgMatches {
clap::Command::new("watch") clap::Command::new("watch")
.about("Trigger that fires when a file or directory changes.") .about("Trigger that fires when a file or directory changes.")
.arg( .arg(
arg!(-f --file) arg!(-f --file).name("file")
.takes_value(true).value_parser(value_parser!(PathBuf)).help("File or directory to watch for changes"), .takes_value(true).help("File or directory to watch for changes"),
) )
.arg(arg!(--touch).name("filetouch").help("Use file or directory timestamps to monitor for changes.")) .arg(arg!(--touch).name("filetouch").help("Use file or directory timestamps to monitor for changes."))
.arg(arg!(--poll).value_parser(value_parser!(humantime::Duration)).help("Duration of time between polls"))) .arg(arg!(--poll).value_parser(value_parser!(humantime::Duration)).help("Duration of time between polls")))
@ -75,6 +75,7 @@ fn main() {
} }
maybe_env = Some(env_vec); maybe_env = Some(env_vec);
} }
let mut proc: Box<dyn Process> = if let Some(matches) = app.subcommand_matches("watch") { let mut proc: Box<dyn Process> = if let Some(matches) = app.subcommand_matches("watch") {
let file = match matches.values_of("file") { let file = match matches.values_of("file") {
Some(v) => v.collect(), Some(v) => v.collect(),
@ -85,10 +86,10 @@ fn main() {
if matches.is_present("filetouch") { if matches.is_present("filetouch") {
method = WatchEventType::Touched; method = WatchEventType::Touched;
} }
let duration = *matches let duration = match matches.get_one::<humantime::Duration>("poll") {
.get_one::<humantime::Duration>("poll") Some(d) => Some((*d).into()),
.cloned() None => None,
.unwrap_or(humantime::Duration::from_str("5s").unwrap()); };
Box::new(FileProcess::new(cmd, maybe_env, file, method, duration)) Box::new(FileProcess::new(cmd, maybe_env, file, method, duration))
} else if let Some(matches) = app.subcommand_matches("timer") { } else if let Some(matches) = app.subcommand_matches("timer") {
// TODO(jwall): This should use cancelable commands. // TODO(jwall): This should use cancelable commands.