diff --git a/src/exec.rs b/src/exec.rs index 624f256..2d75e4f 100644 --- a/src/exec.rs +++ b/src/exec.rs @@ -97,6 +97,7 @@ impl CancelableProcess { pub fn check(&mut self) -> Result, CommandError> { Ok(match self.handle { + // TODO(jwall): This appears to block the thread despite the documenation. Figure out if this is fixable or not. Some(ref mut h) => match h.try_wait()? { Some(status) => Some(status.code().unwrap_or(0)), None => Some(h.wait()?.code().unwrap_or(0)), @@ -115,8 +116,9 @@ impl CancelableProcess { pub fn cancel(&mut self) -> Result<(), CommandError> { if let Some(ref mut h) = self.handle { - h.kill()?; + let _ = h.kill(); } + self.exec = None; self.handle = None; Ok(()) diff --git a/src/file.rs b/src/file.rs index 0f071e0..98cddbd 100644 --- a/src/file.rs +++ b/src/file.rs @@ -12,8 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. use std::path::Path; -use std::sync::mpsc::channel; -use std::sync::{Arc, Mutex}; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::thread; use std::time::Duration; @@ -29,7 +28,7 @@ pub struct FileProcess<'a> { env: Option>, files: Vec<&'a str>, method: WatchEventType, - poll: Duration, + poll: Option, } impl<'a> FileProcess<'a> { @@ -38,7 +37,7 @@ impl<'a> FileProcess<'a> { env: Option>, file: Vec<&'a str>, method: WatchEventType, - poll: Duration, + poll: Option, ) -> FileProcess<'a> { FileProcess { cmd, @@ -50,11 +49,11 @@ impl<'a> FileProcess<'a> { } } -fn spawn_runner_thread( - lock: Arc>, +fn watch_for_change_events( + ch: Receiver<()>, cmd: String, env: Option>, - poll: Duration, + poll: Option, ) { let copied_env = env.and_then(|v| { Some( @@ -64,56 +63,48 @@ fn spawn_runner_thread( .collect::>(), ) }); - thread::spawn(move || { - let mut exec = CancelableProcess::new(&cmd, copied_env); - exec.spawn().expect("Failed to start command"); - loop { - // Wait our requisit number of seconds - thread::sleep(poll); - // Default to not running the command. - if !run_loop_step(lock.clone(), &mut exec) { - exec.reset().expect("Failed to start command"); - } + let mut exec = CancelableProcess::new(&cmd, copied_env); + println!("Spawning command"); + exec.spawn().expect("Failed to start command"); + println!("Starting watch loop"); + loop { + // Wait our requisit number of seconds + if let Some(poll) = poll { + thread::sleep(dbg!(poll)); } - }); -} - -fn run_loop_step(lock: Arc>, exec: &mut CancelableProcess) -> bool { - match lock.lock() { - Ok(mut signal) => { - // We always want to check on our process each iteration of the loop. - if let Err(err) = exec.check() { - println!("{:?}", err); - return false; - } - if *signal { - // set signal to false so we won't trigger on the - // next loop iteration unless we recieved more events. - *signal = false; - // On a true signal we want to start or restart our process. - if let Err(err) = exec.reset() { - println!("{:?}", err); - return false; - } - } - return true; - } - Err(err) => { - println!("Unexpected error; {}", err); - return false; + //if let Err(err) = exec.check() { + // println!("Error running command! {}", err); + // println!("Continuing"); + //}; + // Default to not running the command. + if !run_loop_step(&ch, &mut exec) { + println!("Failed to start command"); } } } +fn run_loop_step(ch: &Receiver<()>, exec: &mut CancelableProcess) -> bool { + let _ = ch.recv().unwrap(); + // We always want to check on our process each iteration of the loop. + // set signal to false so we won't trigger on the + // next loop iteration unless we recieved more events. + // On a true signal we want to start or restart our process. + println!("Restarting process"); + if let Err(err) = exec.reset() { + println!("{:?}", err); + return false; + } + return true; +} + fn wait_for_fs_events( - lock: Arc>, + ch: Sender<()>, method: WatchEventType, files: &Vec<&str>, ) -> Result<(), CommandError> { // Notify requires a channel for communication. let (tx, rx) = channel(); let mut watcher = watcher(tx, Duration::from_secs(1))?; - // TODO(jwall): Better error handling. for file in files { // NOTE(jwall): this is necessary because notify::fsEventWatcher panics // if the path doesn't exist. :-( @@ -128,30 +119,24 @@ fn wait_for_fs_events( loop { let evt: WatchEventType = match rx.recv() { Ok(event) => WatchEventType::from(event), - Err(_) => WatchEventType::Error, + Err(e) => { + println!("Watch Error: {}", e); + WatchEventType::Error + } }; match evt { - WatchEventType::Ignore => { - // We ignore this one. - } - WatchEventType::Error => { - // We log this one. + WatchEventType::Ignore | WatchEventType::Error => { + // We ignore these. + //println!("Event: Ignore"); } WatchEventType::Touched => { if method == WatchEventType::Touched { - let mut signal = lock.lock().unwrap(); - *signal = true; - } else { - println!("Ignoring touched event"); + ch.send(()).unwrap(); } } - WatchEventType::Changed => match lock.lock() { - Ok(mut signal) => *signal = true, - Err(err) => { - println!("Unexpected error; {}", err); - return Ok(()); - } - }, + WatchEventType::Changed => { + ch.send(()).unwrap(); + } } } } @@ -160,13 +145,16 @@ impl<'a> Process for FileProcess<'a> { fn run(&mut self) -> Result<(), CommandError> { // TODO(jeremy): Is this sufficent or do we want to ignore // any events that come in while the command is running? - let lock = Arc::new(Mutex::new(false)); - spawn_runner_thread( - lock.clone(), - self.cmd.to_string(), - self.env.clone(), - self.poll, - ); - wait_for_fs_events(lock, self.method.clone(), &self.files) + let (tx, rx) = channel(); + thread::spawn({ + let cmd = self.cmd.to_string(); + let env = self.env.clone(); + let poll = self.poll.clone(); + move || { + watch_for_change_events(rx, cmd, env, poll); + } + }); + wait_for_fs_events(tx, self.method.clone(), &self.files)?; + Ok(()) } } diff --git a/src/main.rs b/src/main.rs index ed103d9..52cb647 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ extern crate clap; extern crate humantime; extern crate notify; -use std::{path::PathBuf, process, str::FromStr}; +use std::{process, str::FromStr}; mod error; mod events; @@ -44,8 +44,8 @@ fn do_flags() -> clap::ArgMatches { clap::Command::new("watch") .about("Trigger that fires when a file or directory changes.") .arg( - arg!(-f --file) - .takes_value(true).value_parser(value_parser!(PathBuf)).help("File or directory to watch for changes"), + arg!(-f --file).name("file") + .takes_value(true).help("File or directory to watch for changes"), ) .arg(arg!(--touch).name("filetouch").help("Use file or directory timestamps to monitor for changes.")) .arg(arg!(--poll).value_parser(value_parser!(humantime::Duration)).help("Duration of time between polls"))) @@ -75,6 +75,7 @@ fn main() { } maybe_env = Some(env_vec); } + let mut proc: Box = if let Some(matches) = app.subcommand_matches("watch") { let file = match matches.values_of("file") { Some(v) => v.collect(), @@ -85,10 +86,10 @@ fn main() { if matches.is_present("filetouch") { method = WatchEventType::Touched; } - let duration = *matches - .get_one::("poll") - .cloned() - .unwrap_or(humantime::Duration::from_str("5s").unwrap()); + let duration = match matches.get_one::("poll") { + Some(d) => Some((*d).into()), + None => None, + }; Box::new(FileProcess::new(cmd, maybe_env, file, method, duration)) } else if let Some(matches) = app.subcommand_matches("timer") { // TODO(jwall): This should use cancelable commands.