mirror of
				https://github.com/house-of-vanity/rexec.git
				synced 2025-10-23 09:49:08 +00:00 
			
		
		
		
	Improved live logging
This commit is contained in:
		
							
								
								
									
										7
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										7
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @@ -266,6 +266,12 @@ version = "1.0.0" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "d2a61bcce320b4968c46542aabe3d14e4d40ad6a6089eeced644326e73f6279a" | ||||
|  | ||||
| [[package]] | ||||
| name = "lazy_static" | ||||
| version = "1.5.0" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" | ||||
|  | ||||
| [[package]] | ||||
| name = "libc" | ||||
| version = "0.2.172" | ||||
| @@ -404,6 +410,7 @@ dependencies = [ | ||||
|  "env_logger", | ||||
|  "itertools", | ||||
|  "lazy-st", | ||||
|  "lazy_static", | ||||
|  "log", | ||||
|  "question", | ||||
|  "rayon", | ||||
|   | ||||
| @@ -1,6 +1,6 @@ | ||||
| [package] | ||||
| name = "rexec" | ||||
| version = "1.4.0" | ||||
| version = "1.5.0" | ||||
| readme = "https://github.com/house-of-vanity/rexec#readme" | ||||
| edition = "2021" | ||||
| description = "Parallel SSH executor" | ||||
| @@ -25,3 +25,4 @@ colored = "3" | ||||
| itertools = "0.14" | ||||
| brace-expand = "0.1.0" | ||||
| question = "0.2.2" | ||||
| lazy_static = "1.5.0" | ||||
|   | ||||
							
								
								
									
										234
									
								
								src/main.rs
									
									
									
									
									
								
							
							
						
						
									
										234
									
								
								src/main.rs
									
									
									
									
									
								
							| @@ -13,11 +13,17 @@ use colored::*; | ||||
| use dns_lookup::lookup_host; | ||||
| use env_logger::Env; | ||||
| use itertools::Itertools; | ||||
| use lazy_static::lazy_static; | ||||
| use log::{error, info, warn}; | ||||
| use question::{Answer, Question}; | ||||
| use rayon::prelude::*; | ||||
| use regex::Regex; | ||||
|  | ||||
| // Global state to track the currently open block | ||||
| lazy_static! { | ||||
|     static ref CURRENT_BLOCK: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None)); | ||||
| } | ||||
|  | ||||
| // Define command-line arguments using the clap library | ||||
| #[derive(Parser, Debug)] | ||||
| #[command(author = "AB ab@hexor.ru", version, about = "Parallel SSH executor in Rust", long_about = None)] | ||||
| @@ -76,69 +82,69 @@ struct Host { | ||||
| } | ||||
|  | ||||
| /// Find common domain suffix across all hostnames to simplify output display | ||||
| ///  | ||||
| /// | ||||
| /// This function analyzes all hostnames to identify a common domain suffix | ||||
| /// which can be shortened during display to improve readability. | ||||
| ///  | ||||
| /// | ||||
| /// # Arguments | ||||
| /// * `hostnames` - A slice of strings containing all server hostnames | ||||
| ///  | ||||
| /// | ||||
| /// # Returns | ||||
| /// * `Option<String>` - The common suffix if found, or None | ||||
| fn find_common_suffix(hostnames: &[String]) -> Option<String> { | ||||
|     if hostnames.is_empty() { | ||||
|         return None; | ||||
|     } | ||||
|      | ||||
|  | ||||
|     // Don't truncate if only one host | ||||
|     if hostnames.len() == 1 { | ||||
|         return None; | ||||
|     } | ||||
|      | ||||
|  | ||||
|     let first = &hostnames[0]; | ||||
|      | ||||
|  | ||||
|     // Start with assumption that the entire first hostname is the common suffix | ||||
|     let mut common = first.clone(); | ||||
|      | ||||
|  | ||||
|     // Iterate through remaining hostnames, reducing the common part | ||||
|     for hostname in hostnames.iter().skip(1) { | ||||
|         // Exit early if no common part remains | ||||
|         if common.is_empty() { | ||||
|             return None; | ||||
|         } | ||||
|          | ||||
|  | ||||
|         // Find common suffix with current hostname | ||||
|         let mut new_common = String::new(); | ||||
|          | ||||
|  | ||||
|         // Search for common suffix by comparing characters from right to left | ||||
|         let mut common_chars = common.chars().rev(); | ||||
|         let mut hostname_chars = hostname.chars().rev(); | ||||
|          | ||||
|  | ||||
|         loop { | ||||
|             match (common_chars.next(), hostname_chars.next()) { | ||||
|                 (Some(c1), Some(c2)) if c1 == c2 => new_common.insert(0, c1), | ||||
|                 _ => break, | ||||
|             } | ||||
|         } | ||||
|          | ||||
|  | ||||
|         common = new_common; | ||||
|     } | ||||
|      | ||||
|  | ||||
|     // Ensure the common part is a valid domain suffix (starts with a dot) | ||||
|     if common.is_empty() || !common.starts_with('.') { | ||||
|         return None; | ||||
|     } | ||||
|      | ||||
|  | ||||
|     // Return the identified common suffix | ||||
|     Some(common) | ||||
| } | ||||
|  | ||||
| /// Shorten hostname by removing the common suffix and replacing with an asterisk | ||||
| ///  | ||||
| /// | ||||
| /// # Arguments | ||||
| /// * `hostname` - The original hostname | ||||
| /// * `common_suffix` - Optional common suffix to remove | ||||
| ///  | ||||
| /// | ||||
| /// # Returns | ||||
| /// * `String` - Shortened hostname or original if no common suffix | ||||
| fn shorten_hostname(hostname: &str, common_suffix: &Option<String>) -> String { | ||||
| @@ -146,13 +152,13 @@ fn shorten_hostname(hostname: &str, common_suffix: &Option<String>) -> String { | ||||
|         Some(suffix) if hostname.ends_with(suffix) => { | ||||
|             let short_name = hostname[..hostname.len() - suffix.len()].to_string(); | ||||
|             format!("{}{}", short_name, "*") | ||||
|         }, | ||||
|         } | ||||
|         _ => hostname.to_string(), | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Read and parse the SSH known_hosts file to extract server names | ||||
| ///  | ||||
| /// | ||||
| /// # Returns | ||||
| /// * `Vec<Host>` - List of hosts found in the known_hosts file | ||||
| fn read_known_hosts() -> Vec<Host> { | ||||
| @@ -174,11 +180,11 @@ fn read_known_hosts() -> Vec<Host> { | ||||
| } | ||||
|  | ||||
| /// Expand a numeric range in the format [start:end] to a list of strings | ||||
| ///  | ||||
| /// | ||||
| /// # Arguments | ||||
| /// * `start` - Starting number (inclusive) | ||||
| /// * `end` - Ending number (inclusive) | ||||
| ///  | ||||
| /// | ||||
| /// # Returns | ||||
| /// * `Vec<String>` - List of numbers as strings | ||||
| fn expand_range(start: i32, end: i32) -> Vec<String> { | ||||
| @@ -186,10 +192,10 @@ fn expand_range(start: i32, end: i32) -> Vec<String> { | ||||
| } | ||||
|  | ||||
| /// Expand a comma-separated list in the format {item1,item2,item3} to a list of strings | ||||
| ///  | ||||
| /// | ||||
| /// # Arguments | ||||
| /// * `list` - Comma-separated string to expand | ||||
| ///  | ||||
| /// | ||||
| /// # Returns | ||||
| /// * `Vec<String>` - List of expanded items | ||||
| fn expand_list(list: &str) -> Vec<String> { | ||||
| @@ -197,14 +203,14 @@ fn expand_list(list: &str) -> Vec<String> { | ||||
| } | ||||
|  | ||||
| /// Expand a server pattern string with range and list notation into individual hostnames | ||||
| ///  | ||||
| /// | ||||
| /// Supports two expansion types: | ||||
| /// - Range expansion: server-[1:5] → server-1, server-2, server-3, server-4, server-5 | ||||
| /// - List expansion: server-{prod,dev} → server-prod, server-dev | ||||
| ///  | ||||
| /// | ||||
| /// # Arguments | ||||
| /// * `s` - Pattern string to expand | ||||
| ///  | ||||
| /// | ||||
| /// # Returns | ||||
| /// * `Vec<Host>` - List of expanded Host objects | ||||
| fn expand_string(s: &str) -> Vec<Host> { | ||||
| @@ -268,90 +274,130 @@ fn expand_string(s: &str) -> Vec<Host> { | ||||
| } | ||||
|  | ||||
| /// Execute a command on a single host using the system SSH client | ||||
| ///  | ||||
| /// | ||||
| /// This function runs an SSH command using the system's SSH client, | ||||
| /// capturing and displaying output in real-time with proper formatting. | ||||
| ///  | ||||
| /// | ||||
| /// # Arguments | ||||
| /// * `hostname` - Target server hostname | ||||
| /// * `username` - SSH username | ||||
| /// * `command` - Command to execute | ||||
| /// * `common_suffix` - Optional common suffix for hostname display formatting | ||||
| /// * `code_only` - Whether to display only exit codes | ||||
| ///  | ||||
| /// | ||||
| /// # Returns | ||||
| /// * `Result<i32, String>` - Exit code on success or error message | ||||
| fn execute_ssh_command(hostname: &str, username: &str, command: &str, common_suffix: &Option<String>, code_only: bool) -> Result<i32, String> { | ||||
| fn execute_ssh_command( | ||||
|     hostname: &str, | ||||
|     username: &str, | ||||
|     command: &str, | ||||
|     common_suffix: &Option<String>, | ||||
|     code_only: bool, | ||||
| ) -> Result<i32, String> { | ||||
|     let display_name = shorten_hostname(hostname, common_suffix); | ||||
|      | ||||
|     // Display execution start message with shortened hostname | ||||
|     println!("\n{} - STARTED", display_name.yellow().bold()); | ||||
|      | ||||
|  | ||||
|     // Build the SSH command with appropriate options | ||||
|     let mut ssh_cmd = Command::new("ssh"); | ||||
|     ssh_cmd.arg("-o").arg("StrictHostKeyChecking=no") | ||||
|          .arg("-o").arg("BatchMode=yes") | ||||
|          .arg(format!("{}@{}", username, hostname)) | ||||
|          .arg(command) | ||||
|          .stdout(Stdio::piped()) | ||||
|          .stderr(Stdio::piped()); | ||||
|      | ||||
|     ssh_cmd | ||||
|         .arg("-o") | ||||
|         .arg("StrictHostKeyChecking=no") | ||||
|         .arg("-o") | ||||
|         .arg("BatchMode=yes") | ||||
|         .arg(format!("{}@{}", username, hostname)) | ||||
|         .arg(command) | ||||
|         .stdout(Stdio::piped()) | ||||
|         .stderr(Stdio::piped()); | ||||
|  | ||||
|     // Execute the command | ||||
|     let mut child = match ssh_cmd.spawn() { | ||||
|         Ok(child) => child, | ||||
|         Err(e) => return Err(format!("Failed to start SSH process: {}", e)), | ||||
|     }; | ||||
|      | ||||
|  | ||||
|     // Function to handle output lines with proper block management | ||||
|     let handle_output = |line: String, display_name: &str, code_only: bool| { | ||||
|         if !code_only { | ||||
|             let mut current_block = CURRENT_BLOCK.lock().unwrap(); | ||||
|  | ||||
|             // Check if we need to close the previous block and open a new one | ||||
|             match current_block.as_ref() { | ||||
|                 Some(open_host) if open_host != display_name => { | ||||
|                     // Close the previous block | ||||
|                     println!("└ {} ┘", open_host.yellow()); | ||||
|                     // Open new block | ||||
|                     println!("┌ {} ┐", display_name.yellow()); | ||||
|                     *current_block = Some(display_name.to_string()); | ||||
|                 } | ||||
|                 None => { | ||||
|                     // Open new block | ||||
|                     println!("┌ {} ┐", display_name.yellow()); | ||||
|                     *current_block = Some(display_name.to_string()); | ||||
|                 } | ||||
|                 Some(_) => { | ||||
|                     // Same host, continue with current block | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             // Print the log line | ||||
|             println!("│ {} │ {}", display_name.yellow(), line); | ||||
|         } | ||||
|     }; | ||||
|  | ||||
|     // Capture and display stdout in real-time using a dedicated thread | ||||
|     let stdout = child.stdout.take().unwrap(); | ||||
|     let display_name_stdout = display_name.clone(); | ||||
|     let code_only_stdout = code_only; | ||||
|     let stdout_thread = thread::spawn(move || { | ||||
|         let reader = BufReader::new(stdout); | ||||
|         let prefix = format!("{}", "║".green()); | ||||
|          | ||||
|  | ||||
|         for line in reader.lines() { | ||||
|             match line { | ||||
|                 Ok(line) => { | ||||
|                     if !code_only_stdout { | ||||
|                         println!("{} {} {} {}", prefix, display_name_stdout.yellow(), prefix, line); | ||||
|                     } | ||||
|                 }, | ||||
|                     handle_output(line, &display_name_stdout, code_only_stdout); | ||||
|                 } | ||||
|                 Err(_) => break, | ||||
|             } | ||||
|         } | ||||
|     }); | ||||
|      | ||||
|  | ||||
|     // Capture and display stderr in real-time using a dedicated thread | ||||
|     let stderr = child.stderr.take().unwrap(); | ||||
|     let display_name_stderr = display_name.clone(); | ||||
|     let code_only_stderr = code_only; | ||||
|     let stderr_thread = thread::spawn(move || { | ||||
|         let reader = BufReader::new(stderr); | ||||
|         let prefix = format!("{}", "║".red()); | ||||
|          | ||||
|  | ||||
|         for line in reader.lines() { | ||||
|             match line { | ||||
|                 Ok(line) => { | ||||
|                     if !code_only_stderr { | ||||
|                         println!("{} {} {} {}", prefix, display_name_stderr.yellow(), prefix, line); | ||||
|                     } | ||||
|                 }, | ||||
|                     handle_output(line, &display_name_stderr, code_only_stderr); | ||||
|                 } | ||||
|                 Err(_) => break, | ||||
|             } | ||||
|         } | ||||
|     }); | ||||
|      | ||||
|  | ||||
|     // Wait for command to complete | ||||
|     let status = match child.wait() { | ||||
|         Ok(status) => status, | ||||
|         Err(e) => return Err(format!("Failed to wait for SSH process: {}", e)), | ||||
|     }; | ||||
|      | ||||
|  | ||||
|     // Wait for stdout and stderr threads to complete | ||||
|     stdout_thread.join().unwrap(); | ||||
|     stderr_thread.join().unwrap(); | ||||
|      | ||||
|  | ||||
|     // Close the block if this host was the last one to output | ||||
|     if !code_only { | ||||
|         let mut current_block = CURRENT_BLOCK.lock().unwrap(); | ||||
|         if let Some(open_host) = current_block.as_ref() { | ||||
|             if open_host == &display_name { | ||||
|                 println!("└ {} ┘", display_name.yellow()); | ||||
|                 *current_block = None; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     // Format exit code with color (green for success, red for failure) | ||||
|     let exit_code = status.code().unwrap_or(-1); | ||||
|     let code_string = if exit_code == 0 { | ||||
| @@ -359,10 +405,12 @@ fn execute_ssh_command(hostname: &str, username: &str, command: &str, common_suf | ||||
|     } else { | ||||
|         format!("{}", exit_code.to_string().red()) | ||||
|     }; | ||||
|      | ||||
|     // Display completion message | ||||
|     println!("{} - COMPLETED (Exit code: [{}])", display_name.yellow().bold(), code_string); | ||||
|      | ||||
|  | ||||
|     // For code-only mode, just show hostname and exit code | ||||
|     if code_only { | ||||
|         println!("{}: [{}]", display_name.yellow(), code_string); | ||||
|     } | ||||
|  | ||||
|     Ok(exit_code) | ||||
| } | ||||
|  | ||||
| @@ -373,7 +421,7 @@ fn main() { | ||||
|         .format_timestamp(None) | ||||
|         .format_target(false) | ||||
|         .init(); | ||||
|      | ||||
|  | ||||
|     // Parse command-line arguments | ||||
|     let args = Args::parse(); | ||||
|  | ||||
| @@ -430,13 +478,14 @@ fn main() { | ||||
|     } | ||||
|  | ||||
|     info!("Matched hosts:"); | ||||
|      | ||||
|  | ||||
|     // Perform DNS resolution for all hosts in parallel | ||||
|     // Results are stored with original indices to maintain order | ||||
|     let resolved_ips_with_indices = Arc::new(Mutex::new(Vec::<(String, IpAddr, usize)>::new())); | ||||
|  | ||||
|     host_with_indices.par_iter().for_each(|(host, idx)| { | ||||
|         match lookup_host(&host.name) { | ||||
|     host_with_indices | ||||
|         .par_iter() | ||||
|         .for_each(|(host, idx)| match lookup_host(&host.name) { | ||||
|             Ok(ips) if !ips.is_empty() => { | ||||
|                 let ip = ips[0]; | ||||
|                 let mut results = resolved_ips_with_indices.lock().unwrap(); | ||||
| @@ -444,19 +493,26 @@ fn main() { | ||||
|             } | ||||
|             Ok(_) => { | ||||
|                 let mut results = resolved_ips_with_indices.lock().unwrap(); | ||||
|                 results.push((host.name.clone(), IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), *idx)); | ||||
|                 results.push(( | ||||
|                     host.name.clone(), | ||||
|                     IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), | ||||
|                     *idx, | ||||
|                 )); | ||||
|             } | ||||
|             Err(_) => { | ||||
|                 let mut results = resolved_ips_with_indices.lock().unwrap(); | ||||
|                 results.push((host.name.clone(), IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), *idx)); | ||||
|                 results.push(( | ||||
|                     host.name.clone(), | ||||
|                     IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), | ||||
|                     *idx, | ||||
|                 )); | ||||
|             } | ||||
|         } | ||||
|     }); | ||||
|         }); | ||||
|  | ||||
|     // Sort hosts by original index to maintain consistent display order | ||||
|     let mut resolved_hosts = resolved_ips_with_indices.lock().unwrap().clone(); | ||||
|     resolved_hosts.sort_by_key(|(_, _, idx)| *idx); | ||||
|      | ||||
|  | ||||
|     // Display all matched hosts with their resolved IPs | ||||
|     for (hostname, ip, _) in &resolved_hosts { | ||||
|         if ip.is_unspecified() { | ||||
| @@ -477,16 +533,22 @@ fn main() { | ||||
|         error!("No valid hosts to connect to"); | ||||
|         process::exit(1); | ||||
|     } | ||||
|      | ||||
|  | ||||
|     // Find common domain suffix to optimize display | ||||
|     let hostnames: Vec<String> = valid_hosts.iter().map(|(hostname, _, _)| hostname.clone()).collect(); | ||||
|     let hostnames: Vec<String> = valid_hosts | ||||
|         .iter() | ||||
|         .map(|(hostname, _, _)| hostname.clone()) | ||||
|         .collect(); | ||||
|     let common_suffix = find_common_suffix(&hostnames); | ||||
|      | ||||
|  | ||||
|     // Inform user about display optimization if common suffix found | ||||
|     if let Some(suffix) = &common_suffix { | ||||
|         info!("Common domain suffix found: '{}' (will be displayed as '*')", suffix); | ||||
|         info!( | ||||
|             "Common domain suffix found: '{}' (will be displayed as '*')", | ||||
|             suffix | ||||
|         ); | ||||
|     } | ||||
|      | ||||
|  | ||||
|     // Ask for confirmation before proceeding (unless --noconfirm is specified) | ||||
|     if !args.noconfirm | ||||
|         && match Question::new(&*format!( | ||||
| @@ -509,37 +571,43 @@ fn main() { | ||||
|     // Execute commands using system SSH client | ||||
|     let batch_size = args.parallel as usize; | ||||
|     let mut processed = 0; | ||||
|      | ||||
|  | ||||
|     while processed < valid_hosts.len() { | ||||
|         let end = std::cmp::min(processed + batch_size, valid_hosts.len()); | ||||
|         let batch = &valid_hosts[processed..end]; | ||||
|          | ||||
|  | ||||
|         // Create a thread for each host in the current batch | ||||
|         let mut handles = Vec::new(); | ||||
|          | ||||
|  | ||||
|         for (hostname, _, _) in batch { | ||||
|             let hostname = hostname.clone(); | ||||
|             let username = args.username.clone(); | ||||
|             let command = args.command.clone(); | ||||
|             let common_suffix_clone = common_suffix.clone(); | ||||
|             let code_only = args.code; | ||||
|              | ||||
|  | ||||
|             // Execute SSH command in a separate thread | ||||
|             let handle = thread::spawn(move || { | ||||
|                 match execute_ssh_command(&hostname, &username, &command, &common_suffix_clone, code_only) { | ||||
|                 match execute_ssh_command( | ||||
|                     &hostname, | ||||
|                     &username, | ||||
|                     &command, | ||||
|                     &common_suffix_clone, | ||||
|                     code_only, | ||||
|                 ) { | ||||
|                     Ok(_) => (), | ||||
|                     Err(e) => error!("Error executing command on {}: {}", hostname, e), | ||||
|                 } | ||||
|             }); | ||||
|              | ||||
|  | ||||
|             handles.push(handle); | ||||
|         } | ||||
|          | ||||
|  | ||||
|         // Wait for all threads in this batch to complete | ||||
|         for handle in handles { | ||||
|             handle.join().unwrap(); | ||||
|         } | ||||
|          | ||||
|  | ||||
|         processed = end; | ||||
|     } | ||||
| } | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user