Trying out cancelable commands

This commit is contained in:
Jeremy Wall 2022-08-24 14:19:52 -04:00
parent 9829d0f38f
commit b58c94e9e6
8 changed files with 199 additions and 130 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

1
result Symbolic link
View File

@ -0,0 +1 @@
/nix/store/wllb7d3wx2wh2p4h8vj7y10p31f37jd8-runwhen-0.0.4

View File

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::fmt; use std::fmt;
use std::io;
use notify; use notify;
@ -37,3 +38,9 @@ impl From<notify::Error> for CommandError {
CommandError::new(format!("{}", e)) CommandError::new(format!("{}", e))
} }
} }
impl From<io::Error> for CommandError {
fn from(e: io::Error) -> CommandError {
CommandError::new(format!("IO: {}", e))
}
}

View File

@ -11,11 +11,10 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::process::{Child, Command, Stdio};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use std::process::{Command, Stdio};
use error::CommandError; use error::CommandError;
use traits::Process; use traits::Process;
@ -30,79 +29,147 @@ fn env_var_to_tuple(var: &str) -> (String, String) {
("".to_string(), "".to_string()) ("".to_string(), "".to_string())
} }
pub fn run_cmd(cmd: &str, env: &Option<Vec<&str>>) -> Result<i32, CommandError> { pub struct CancelableProcess {
let args = cmd cmd: String,
.split(' ') env: Option<Vec<String>>,
.filter(|s| !s.is_empty()) exec: Option<Command>,
.collect::<Vec<&str>>(); handle: Option<Child>,
if args.len() < 1 { }
return Err(CommandError::new("Empty command string passed in"));
} impl CancelableProcess {
let mut exec = Command::new(args[0]); pub fn new(cmd: &str, env: Option<Vec<String>>) -> Self {
if args.len() > 1 { Self {
exec.args(&args[1..]); cmd: cmd.to_string(),
} env,
exec.stdout(Stdio::inherit()); exec: None,
exec.stderr(Stdio::inherit()); handle: None,
if let &Some(ref env_vars) = env {
for var in env_vars {
let tpl = env_var_to_tuple(var);
exec.env(tpl.0, tpl.1);
} }
} }
return match exec.output() {
Ok(out) => match out.status.code() {
Some(val) => Ok(val),
None => Ok(0),
},
// TODO(jeremy): We should not swallow this error.
Err(_) => Err(CommandError::new("Error running command")),
};
}
fn is_cmd_success(cmd: &str, env: Option<Vec<&str>>) -> bool { fn create_command(cmd: &str, env: &Option<Vec<String>>) -> Result<Command, CommandError> {
match run_cmd(cmd, &env) { let args = cmd
Ok(code) => code == 0, .split(' ')
_ => false, .filter(|s| !s.is_empty())
.collect::<Vec<&str>>();
if args.len() < 1 {
return Err(CommandError::new("Empty command string passed in"));
}
let mut exec = Command::new(args[0]);
if args.len() > 1 {
exec.args(&args[1..]);
}
exec.stdout(Stdio::inherit());
exec.stderr(Stdio::inherit());
if let &Some(ref env_vars) = env {
for var in env_vars {
let tpl = env_var_to_tuple(var);
exec.env(tpl.0, tpl.1);
}
}
return Ok(exec);
}
pub fn block(&mut self) -> Result<i32, CommandError> {
if let Some(ref mut handle) = self.handle {
let code = handle.wait()?.code().unwrap_or(0);
self.exec = None;
self.handle = None;
Ok(code)
} else {
let mut exec = Self::create_command(&self.cmd, &self.env)?;
return match exec.output() {
Ok(out) => match out.status.code() {
Some(val) => Ok(val),
None => Ok(0),
},
// TODO(jeremy): We should not swallow this error.
Err(_) => Err(CommandError::new("Error running command")),
};
}
}
pub fn is_success(&mut self) -> bool {
match self.block() {
Ok(code) => code == 0,
_ => false,
}
}
pub fn check(&mut self) -> Result<Option<i32>, CommandError> {
Ok(match self.handle {
Some(ref mut h) => match h.try_wait()? {
Some(status) => Some(status.code().unwrap_or(0)),
None => Some(h.wait()?.code().unwrap_or(0)),
},
None => None,
})
}
pub fn spawn(&mut self) -> Result<(), CommandError> {
let mut exec = Self::create_command(&self.cmd, &self.env)?;
let handle = exec.spawn()?;
self.exec = Some(exec);
self.handle = Some(handle);
Ok(())
}
pub fn cancel(&mut self) -> Result<(), CommandError> {
if let Some(ref mut h) = self.handle {
h.kill()?;
}
self.exec = None;
self.handle = None;
Ok(())
}
pub fn reset(&mut self) -> Result<(), CommandError> {
self.cancel()?;
self.spawn()?;
Ok(())
} }
} }
pub struct ExecProcess<'a> { // TODO(jwall): Make these CancelableProcess instead.
test_cmd: &'a str, pub struct ExecProcess {
test_cmd: CancelableProcess,
negate: bool, negate: bool,
cmd: &'a str, cmd: CancelableProcess,
env: Option<Vec<&'a str>>,
poll: Duration, poll: Duration,
} }
impl<'a> ExecProcess<'a> { impl ExecProcess {
pub fn new( pub fn new(
test_cmd: &'a str, test_cmd: &str,
cmd: &'a str, cmd: &str,
negate: bool, negate: bool,
env: Option<Vec<&'a str>>, env: Option<Vec<String>>,
poll: Duration, poll: Duration,
) -> ExecProcess<'a> { ) -> ExecProcess {
let test_cmd = CancelableProcess::new(test_cmd, None);
let cmd = CancelableProcess::new(cmd, env);
ExecProcess { ExecProcess {
test_cmd: test_cmd, test_cmd,
negate: negate, negate,
cmd: cmd, cmd,
env: env, poll,
poll: poll, }
}
fn run_loop_step(&mut self) {
let test_result = self.test_cmd.is_success();
if (test_result && !self.negate) || (!test_result && self.negate) {
if let Err(err) = self.cmd.block() {
println!("{:?}", err)
}
} }
} }
} }
impl<'a> Process for ExecProcess<'a> { impl Process for ExecProcess {
fn run(&self) -> Result<(), CommandError> { fn run(&mut self) -> Result<(), CommandError> {
loop { loop {
// TODO(jwall): Should we set the environment the same as the other command? // TODO(jwall): Should we set the environment the same as the other command?
let test_result = is_cmd_success(self.test_cmd, None); self.run_loop_step();
if (test_result && !self.negate) || (!test_result && self.negate) {
if let Err(err) = run_cmd(self.cmd, &self.env) {
println!("{:?}", err)
}
}
thread::sleep(self.poll); thread::sleep(self.poll);
} }
} }

View File

@ -21,12 +21,12 @@ use notify::{watcher, RecursiveMode, Watcher};
use error::CommandError; use error::CommandError;
use events::WatchEventType; use events::WatchEventType;
use exec::run_cmd; use exec::CancelableProcess;
use traits::Process; use traits::Process;
pub struct FileProcess<'a> { pub struct FileProcess<'a> {
cmd: &'a str, cmd: &'a str,
env: Option<Vec<&'a str>>, env: Option<Vec<String>>,
files: Vec<&'a str>, files: Vec<&'a str>,
method: WatchEventType, method: WatchEventType,
poll: Duration, poll: Duration,
@ -35,17 +35,17 @@ pub struct FileProcess<'a> {
impl<'a> FileProcess<'a> { impl<'a> FileProcess<'a> {
pub fn new( pub fn new(
cmd: &'a str, cmd: &'a str,
env: Option<Vec<&'a str>>, env: Option<Vec<String>>,
file: Vec<&'a str>, file: Vec<&'a str>,
method: WatchEventType, method: WatchEventType,
poll: Duration, poll: Duration,
) -> FileProcess<'a> { ) -> FileProcess<'a> {
FileProcess { FileProcess {
cmd: cmd, cmd,
env: env, env,
method,
poll,
files: file, files: file,
method: method,
poll: poll,
} }
} }
} }
@ -53,7 +53,7 @@ impl<'a> FileProcess<'a> {
fn spawn_runner_thread( fn spawn_runner_thread(
lock: Arc<Mutex<bool>>, lock: Arc<Mutex<bool>>,
cmd: String, cmd: String,
env: Option<Vec<&str>>, env: Option<Vec<String>>,
poll: Duration, poll: Duration,
) { ) {
let copied_env = env.and_then(|v| { let copied_env = env.and_then(|v| {
@ -65,42 +65,46 @@ fn spawn_runner_thread(
) )
}); });
thread::spawn(move || { thread::spawn(move || {
let copied_env_refs: Option<Vec<&str>> = match copied_env { let mut exec = CancelableProcess::new(&cmd, copied_env);
Some(ref vec) => { exec.spawn().expect("Failed to start command");
let mut refs: Vec<&str> = Vec::new();
for s in vec.iter() {
refs.push(s);
}
Some(refs)
}
None => None,
};
loop { loop {
// Wait our requisit number of seconds // Wait our requisit number of seconds
thread::sleep(poll); thread::sleep(poll);
// Default to not running the command. // Default to not running the command.
match lock.lock() { if !run_loop_step(lock.clone(), &mut exec) {
Ok(mut signal) => { exec.reset().expect("Failed to start command");
if *signal {
// set signal to false so we won't trigger on the
// next loop iteration unless we recieved more events.
*signal = false;
// Run our command!
println!("exec: {}", cmd);
if let Err(err) = run_cmd(&cmd, &copied_env_refs) {
println!("{:?}", err)
}
}
}
Err(err) => {
println!("Unexpected error; {}", err);
return;
}
} }
} }
}); });
} }
fn run_loop_step(lock: Arc<Mutex<bool>>, exec: &mut CancelableProcess) -> bool {
match lock.lock() {
Ok(mut signal) => {
// We always want to check on our process each iteration of the loop.
if let Err(err) = exec.check() {
println!("{:?}", err);
return false;
}
if *signal {
// set signal to false so we won't trigger on the
// next loop iteration unless we recieved more events.
*signal = false;
// On a true signal we want to start or restart our process.
if let Err(err) = exec.reset() {
println!("{:?}", err);
return false;
}
}
return true;
}
Err(err) => {
println!("Unexpected error; {}", err);
return false;
}
}
}
fn wait_for_fs_events( fn wait_for_fs_events(
lock: Arc<Mutex<bool>>, lock: Arc<Mutex<bool>>,
method: WatchEventType, method: WatchEventType,
@ -153,7 +157,7 @@ fn wait_for_fs_events(
} }
impl<'a> Process for FileProcess<'a> { impl<'a> Process for FileProcess<'a> {
fn run(&self) -> Result<(), CommandError> { fn run(&mut self) -> Result<(), CommandError> {
// TODO(jeremy): Is this sufficent or do we want to ignore // TODO(jeremy): Is this sufficent or do we want to ignore
// any events that come in while the command is running? // any events that come in while the command is running?
let lock = Arc::new(Mutex::new(false)); let lock = Arc::new(Mutex::new(false));

View File

@ -72,12 +72,11 @@ fn main() {
if let Some(env_values) = app.values_of("env") { if let Some(env_values) = app.values_of("env") {
let mut env_vec = Vec::new(); let mut env_vec = Vec::new();
for v in env_values { for v in env_values {
env_vec.push(v); env_vec.push(v.to_string());
} }
maybe_env = Some(env_vec); maybe_env = Some(env_vec);
} }
let mut process: Option<Box<dyn Process>> = None; let mut proc: Box<dyn Process> = if let Some(matches) = app.subcommand_matches("watch") {
if let Some(matches) = app.subcommand_matches("watch") {
let file = match matches.values_of("file") { let file = match matches.values_of("file") {
Some(v) => v.collect(), Some(v) => v.collect(),
// The default is our current directory // The default is our current directory
@ -91,19 +90,16 @@ fn main() {
.get_one::<humantime::Duration>("poll") .get_one::<humantime::Duration>("poll")
.cloned() .cloned()
.unwrap_or(humantime::Duration::from_str("5s").unwrap()); .unwrap_or(humantime::Duration::from_str("5s").unwrap());
process = Some(Box::new(FileProcess::new( Box::new(FileProcess::new(cmd, maybe_env, file, method, duration))
cmd, maybe_env, file, method, duration,
)));
} else if let Some(matches) = app.subcommand_matches("timer") { } else if let Some(matches) = app.subcommand_matches("timer") {
// TODO(jwall): This should use cancelable commands.
// Unwrap because this flag is required. // Unwrap because this flag is required.
let duration = matches let duration = matches
.get_one::<humantime::Duration>("duration") .get_one::<humantime::Duration>("duration")
.expect("duration flag is required") .expect("duration flag is required")
.clone(); .clone();
let max_repeat = matches.get_one::<u32>("repeat").cloned(); let max_repeat = matches.get_one::<u32>("repeat").cloned();
process = Some(Box::new(TimerProcess::new( Box::new(TimerProcess::new(cmd, maybe_env, *duration, max_repeat))
cmd, maybe_env, *duration, max_repeat,
)));
} else if let Some(matches) = app.subcommand_matches("success") { } else if let Some(matches) = app.subcommand_matches("success") {
// unwrap because this is required. // unwrap because this is required.
let ifcmd = matches.value_of("ifcmd").expect("ifcmd flag is required"); let ifcmd = matches.value_of("ifcmd").expect("ifcmd flag is required");
@ -112,20 +108,15 @@ fn main() {
.get_one::<humantime::Duration>("poll") .get_one::<humantime::Duration>("poll")
.cloned() .cloned()
.unwrap_or(humantime::Duration::from_str("5s").unwrap()); .unwrap_or(humantime::Duration::from_str("5s").unwrap());
Some(Box::new(ExecProcess::new( Box::new(ExecProcess::new(ifcmd, cmd, negate, maybe_env, duration))
ifcmd, cmd, negate, maybe_env, duration, } else {
))); println!("You must specify a subcommand.");
} process::exit(1)
match process { };
Some(process) => match process.run() { match proc.run() {
Ok(_) => return, Ok(_) => return,
Err(err) => { Err(err) => {
println!("{0}", err); println!("{0}", err);
process::exit(1)
}
},
None => {
println!("You must specify a subcommand.");
process::exit(1) process::exit(1)
} }
} }

View File

@ -14,41 +14,40 @@
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use exec::CancelableProcess;
use error::CommandError; use error::CommandError;
use exec::run_cmd;
use traits::Process; use traits::Process;
pub struct TimerProcess<'a> { pub struct TimerProcess {
cmd: &'a str, cmd: CancelableProcess,
env: Option<Vec<&'a str>>,
poll_duration: Duration, poll_duration: Duration,
max_repeat: Option<u32>, max_repeat: Option<u32>,
} }
impl<'a> TimerProcess<'a> { impl TimerProcess {
pub fn new( pub fn new(
cmd: &'a str, cmd: &str,
env: Option<Vec<&'a str>>, env: Option<Vec<String>>,
poll_duration: Duration, poll_duration: Duration,
max_repeat: Option<u32>, max_repeat: Option<u32>,
) -> TimerProcess<'a> { ) -> TimerProcess {
let cmd = CancelableProcess::new(cmd, env);
TimerProcess { TimerProcess {
cmd: cmd, cmd,
env: env, poll_duration,
poll_duration: poll_duration, max_repeat,
max_repeat: max_repeat,
} }
} }
} }
impl<'a> Process for TimerProcess<'a> { impl Process for TimerProcess {
fn run(&self) -> Result<(), CommandError> { fn run(&mut self) -> Result<(), CommandError> {
let mut counter = 0; let mut counter = 0;
loop { loop {
if self.max_repeat.is_some() && counter >= self.max_repeat.unwrap() { if self.max_repeat.is_some() && counter >= self.max_repeat.unwrap() {
return Ok(()); return Ok(());
} }
if let Err(err) = run_cmd(self.cmd, &self.env) { if let Err(err) = self.cmd.block() {
println!("{:?}", err) println!("{:?}", err)
} }
thread::sleep(self.poll_duration); thread::sleep(self.poll_duration);

View File

@ -14,5 +14,5 @@
use error::CommandError; use error::CommandError;
pub trait Process { pub trait Process {
fn run(&self) -> Result<(), CommandError>; fn run(&mut self) -> Result<(), CommandError>;
} }