Compare commits

...

10 Commits

8 changed files with 140 additions and 151 deletions

9
Cargo.lock generated
View File

@ -109,6 +109,12 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
name = "glob"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "hashbrown"
version = "0.12.3"
@ -295,9 +301,10 @@ dependencies = [
[[package]]
name = "runwhen"
version = "0.0.5"
version = "0.0.8"
dependencies = [
"clap",
"glob",
"humantime",
"notify",
]

View File

@ -1,6 +1,6 @@
[package]
name = "runwhen"
version = "0.0.6"
version = "0.0.8"
authors = ["Jeremy Wall <jeremy@marzhillstudios.com>"]
description = "Runs a command on user specified triggers."
repository = "https://github.com/zaphar/runwhen"
@ -11,6 +11,7 @@ license = "Apache-2.0"
[dependencies]
humantime = "2.1.0"
notify = "4.0.17"
glob = "0.3.1"
[dependencies.clap]
version = "3.2.17"

55
flake.lock generated
View File

@ -31,21 +31,6 @@
"type": "github"
}
},
"flake-utils_2": {
"locked": {
"lastModified": 1637014545,
"narHash": "sha256-26IZAc5yzlD9FlDT54io1oqG/bBoyka+FJk5guaX4x4=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "bba5dcc8e0b20ab664967ad83d24d64cb64ec4f4",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"naersk": {
"inputs": {
"nixpkgs": "nixpkgs"
@ -78,49 +63,11 @@
"type": "indirect"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1650222748,
"narHash": "sha256-AHh/goEfG5hlhIMVgGQwACbuv5Wit2ND9vrcB4QthJs=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "ba88a5afa6fff7710c17b5423ff9d721386c4164",
"type": "github"
},
"original": {
"owner": "NixOS",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-compat": "flake-compat",
"flake-utils": "flake-utils",
"naersk": "naersk",
"nixpkgs": "nixpkgs_2",
"rust-overlay": "rust-overlay"
}
},
"rust-overlay": {
"inputs": {
"flake-utils": "flake-utils_2",
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1650162887,
"narHash": "sha256-e23LlN7NQGxrsSWNNAjyvrWlZ3kwFSav9kXbayibKWc=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "26b570500cdd7a359526524e9abad341891122a6",
"type": "github"
},
"original": {
"owner": "oxalica",
"repo": "rust-overlay",
"type": "github"
"naersk": "naersk"
}
}
},

View File

@ -2,31 +2,24 @@
description = "runwhen";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs";
flake-utils.url = "github:numtide/flake-utils";
rust-overlay = {
url = "github:oxalica/rust-overlay";
inputs.nixpkgs.follows = "nixpkgs";
};
naersk.url = "github:nix-community/naersk";
flake-compat = {
url = github:edolstra/flake-compat;
url = "github:edolstra/flake-compat";
flake = false;
};
};
outputs = {self, nixpkgs, flake-utils, rust-overlay, naersk, flake-compat}:
outputs = {self, flake-utils, naersk, flake-compat}:
flake-utils.lib.eachDefaultSystem (system:
let
overlays = [ rust-overlay.overlay ];
pkgs = import nixpkgs { inherit system overlays; };
naersk-lib = naersk.lib."${system}";
in
{
defaultPackage = with pkgs;
naersk-lib.buildPackage rec {
inherit flake-compat;
defaultPackage = naersk-lib.buildPackage rec {
pname = "runwhen";
version = "0.0.6";
version = "0.0.8";
src = ./.;
cargoBuildOptions = opts: opts ++ ["-p" "${pname}" ];
};

View File

@ -21,6 +21,19 @@ pub enum WatchEventType {
Ignore,
}
pub fn get_file(evt: &DebouncedEvent) -> Option<&std::path::PathBuf> {
match evt {
DebouncedEvent::NoticeWrite(b)
| DebouncedEvent::NoticeRemove(b)
| DebouncedEvent::Create(b)
| DebouncedEvent::Write(b)
| DebouncedEvent::Chmod(b)
| DebouncedEvent::Remove(b)
| DebouncedEvent::Rename(b, _) => Some(b),
DebouncedEvent::Error(_, _) | DebouncedEvent::Rescan => None,
}
}
impl From<DebouncedEvent> for WatchEventType {
fn from(e: DebouncedEvent) -> WatchEventType {
match e {

View File

@ -95,8 +95,11 @@ impl CancelableProcess {
}
}
// NOTE(jwall): We want to actually use this some time when we figure out if it can be made to not block or not.
#[allow(dead_code)]
pub fn check(&mut self) -> Result<Option<i32>, CommandError> {
Ok(match self.handle {
// TODO(jwall): This appears to block the thread despite the documenation. Figure out if this is fixable or not.
Some(ref mut h) => match h.try_wait()? {
Some(status) => Some(status.code().unwrap_or(0)),
None => Some(h.wait()?.code().unwrap_or(0)),
@ -115,8 +118,9 @@ impl CancelableProcess {
pub fn cancel(&mut self) -> Result<(), CommandError> {
if let Some(ref mut h) = self.handle {
h.kill()?;
let _ = h.kill();
}
self.exec = None;
self.handle = None;
Ok(())

View File

@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;
use std::time::Duration;
use std::time::{Duration, Instant};
use glob;
use notify::{watcher, RecursiveMode, Watcher};
use error::CommandError;
@ -28,8 +28,9 @@ pub struct FileProcess<'a> {
cmd: &'a str,
env: Option<Vec<String>>,
files: Vec<&'a str>,
exclude: Option<Vec<&'a str>>,
method: WatchEventType,
poll: Duration,
poll: Option<Duration>,
}
impl<'a> FileProcess<'a> {
@ -37,24 +38,26 @@ impl<'a> FileProcess<'a> {
cmd: &'a str,
env: Option<Vec<String>>,
file: Vec<&'a str>,
exclude: Option<Vec<&'a str>>,
method: WatchEventType,
poll: Duration,
poll: Option<Duration>,
) -> FileProcess<'a> {
FileProcess {
cmd,
env,
method,
poll,
exclude,
files: file,
}
}
}
fn spawn_runner_thread(
lock: Arc<Mutex<bool>>,
fn watch_for_change_events(
ch: Receiver<()>,
cmd: String,
env: Option<Vec<String>>,
poll: Duration,
poll: Option<Duration>,
) {
let copied_env = env.and_then(|v| {
Some(
@ -64,56 +67,52 @@ fn spawn_runner_thread(
.collect::<Vec<String>>(),
)
});
thread::spawn(move || {
let mut exec = CancelableProcess::new(&cmd, copied_env);
exec.spawn().expect("Failed to start command");
let mut exec = CancelableProcess::new(&cmd, copied_env);
println!("Spawning command");
exec.spawn().expect("Failed to start command");
println!("Starting watch loop");
run_loop_step(&mut exec);
println!("Waiting for first change event");
if let Some(poll) = poll {
let mut poll_time = Instant::now();
loop {
// Wait our requisit number of seconds
thread::sleep(poll);
// Default to not running the command.
if !run_loop_step(lock.clone(), &mut exec) {
exec.reset().expect("Failed to start command");
let _ = ch.recv().expect("Channel was closed!!!");
let elapsed = Instant::now().duration_since(poll_time);
poll_time = Instant::now();
if elapsed >= poll {
run_loop_step(&mut exec);
continue;
}
}
});
}
fn run_loop_step(lock: Arc<Mutex<bool>>, exec: &mut CancelableProcess) -> bool {
match lock.lock() {
Ok(mut signal) => {
// We always want to check on our process each iteration of the loop.
if let Err(err) = exec.check() {
println!("{:?}", err);
return false;
}
if *signal {
// set signal to false so we won't trigger on the
// next loop iteration unless we recieved more events.
*signal = false;
// On a true signal we want to start or restart our process.
if let Err(err) = exec.reset() {
println!("{:?}", err);
return false;
}
}
return true;
}
Err(err) => {
println!("Unexpected error; {}", err);
return false;
} else {
loop {
let _ = ch.recv().expect("Channel was closed!!!");
run_loop_step(&mut exec);
}
}
}
fn run_loop_step(exec: &mut CancelableProcess) {
// We always want to check on our process each iteration of the loop.
// set signal to false so we won't trigger on the
// next loop iteration unless we recieved more events.
// On a true signal we want to start or restart our process.
println!("Restarting process");
if let Err(err) = exec.reset() {
println!("Failed to start command");
println!("{:?}", err);
}
}
fn wait_for_fs_events(
lock: Arc<Mutex<bool>>,
ch: Sender<()>,
method: WatchEventType,
files: &Vec<&str>,
excluded: &Option<Vec<&str>>,
) -> Result<(), CommandError> {
// Notify requires a channel for communication.
let (tx, rx) = channel();
let mut watcher = watcher(tx, Duration::from_secs(1))?;
// TODO(jwall): Better error handling.
for file in files {
// NOTE(jwall): this is necessary because notify::fsEventWatcher panics
// if the path doesn't exist. :-(
@ -125,33 +124,43 @@ fn wait_for_fs_events(
watcher.watch(*file, RecursiveMode::Recursive)?;
println!("Watching {:?}", *file);
}
let mut patterns = Vec::new();
if let Some(exclude) = excluded {
for ef in exclude.iter() {
patterns.push(glob::Pattern::new(*ef).expect("Invalid path pattern"));
}
}
loop {
let evt: WatchEventType = match rx.recv() {
Ok(event) => WatchEventType::from(event),
Err(_) => WatchEventType::Error,
Ok(event) => {
// TODO(jwall): Filter this based on the exclude pattern
if let Some(f) = crate::events::get_file(&event) {
for pat in patterns.iter() {
if pat.matches_path(&f) {
continue;
}
}
}
WatchEventType::from(event)
}
Err(e) => {
println!("Watch Error: {}", e);
WatchEventType::Error
}
};
match evt {
WatchEventType::Ignore => {
// We ignore this one.
}
WatchEventType::Error => {
// We log this one.
WatchEventType::Ignore | WatchEventType::Error => {
// We ignore these.
//println!("Event: Ignore");
}
WatchEventType::Touched => {
if method == WatchEventType::Touched {
let mut signal = lock.lock().unwrap();
*signal = true;
} else {
println!("Ignoring touched event");
ch.send(()).unwrap();
}
}
WatchEventType::Changed => match lock.lock() {
Ok(mut signal) => *signal = true,
Err(err) => {
println!("Unexpected error; {}", err);
return Ok(());
}
},
WatchEventType::Changed => {
ch.send(()).unwrap();
}
}
}
}
@ -160,13 +169,16 @@ impl<'a> Process for FileProcess<'a> {
fn run(&mut self) -> Result<(), CommandError> {
// TODO(jeremy): Is this sufficent or do we want to ignore
// any events that come in while the command is running?
let lock = Arc::new(Mutex::new(false));
spawn_runner_thread(
lock.clone(),
self.cmd.to_string(),
self.env.clone(),
self.poll,
);
wait_for_fs_events(lock, self.method.clone(), &self.files)
let (tx, rx) = channel();
thread::spawn({
let cmd = self.cmd.to_string();
let env = self.env.clone();
let poll = self.poll.clone();
move || {
watch_for_change_events(rx, cmd, env, poll);
}
});
wait_for_fs_events(tx, self.method.clone(), &self.files, &self.exclude)?;
Ok(())
}
}

View File

@ -14,10 +14,11 @@
// runwhen - A utility that runs commands on user defined triggers.
#[macro_use]
extern crate clap;
extern crate glob;
extern crate humantime;
extern crate notify;
use std::{path::PathBuf, process, str::FromStr};
use std::{process, str::FromStr};
mod error;
mod events;
@ -44,17 +45,20 @@ fn do_flags() -> clap::ArgMatches {
clap::Command::new("watch")
.about("Trigger that fires when a file or directory changes.")
.arg(
arg!(-f --file)
.name("filetouch")
.value_parser(value_parser!(PathBuf)).help("File or directory to watch for changes"),
arg!(-f --file ...).name("file")
.takes_value(true).help("File or directory to watch for changes"),
)
.arg(arg!(--touch).help("Use file or directory timestamps to monitor for changes."))
.arg(arg!(--poll).value_parser(value_parser!(humantime::Duration)).help("Duration of time between polls")))
.arg(
arg!(-e --exclude ...).name("exclude")
.takes_value(true).help("path names to skip when watching. Specified in unix glob format."),
)
.arg(arg!(--touch).name("filetouch").help("Use file or directory timestamps to monitor for changes."))
.arg(arg!(--poll).name("poll").takes_value(true).value_parser(value_parser!(humantime::Duration)).help("Duration of time between polls")))
.subcommand(
clap::Command::new("timer")
.about("Run command on a timer")
.arg(arg!(-t --duration).takes_value(true).value_parser(value_parser!(humantime::Duration)).help("Duration between runs"))
.arg(arg!(-n --repeat).value_parser(value_parser!(u32))).about("Number of times to run before finishing"))
.arg(arg!(-n --repeat).value_parser(value_parser!(u32)).help("Number of times to run before finishing")))
.subcommand(
clap::Command::new("success")
.about("Run a command when a test command succeeds")
@ -76,6 +80,7 @@ fn main() {
}
maybe_env = Some(env_vec);
}
let mut proc: Box<dyn Process> = if let Some(matches) = app.subcommand_matches("watch") {
let file = match matches.values_of("file") {
Some(v) => v.collect(),
@ -86,11 +91,18 @@ fn main() {
if matches.is_present("filetouch") {
method = WatchEventType::Touched;
}
let duration = *matches
.get_one::<humantime::Duration>("poll")
.cloned()
.unwrap_or(humantime::Duration::from_str("5s").unwrap());
Box::new(FileProcess::new(cmd, maybe_env, file, method, duration))
let duration = match matches.get_one::<humantime::Duration>("poll") {
Some(d) => Some((*d).into()),
None => None,
};
let exclude = match matches.values_of("exclude") {
Some(vr) => Some(vr.collect()),
None => None,
};
println!("Enforcing a poll time of {:?}", duration);
Box::new(FileProcess::new(
cmd, maybe_env, file, exclude, method, duration,
))
} else if let Some(matches) = app.subcommand_matches("timer") {
// TODO(jwall): This should use cancelable commands.
// Unwrap because this flag is required.