use std::collections::{HashMap, HashSet, VecDeque}; use std::os::unix::fs::MetadataExt; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher}; use tokio::sync::{broadcast, RwLock}; use tracing::{debug, info, warn}; use furumi_common::proto::{AttrResponse, ChangeEvent, ChangeKind, DirEntry}; /// How many directory levels to pre-walk on startup. const INITIAL_DEPTH: u32 = 3; /// Broadcast channel capacity — clients that fall behind lose events and rely on TTL. const BROADCAST_CAPACITY: usize = 256; // ── Types ───────────────────────────────────────────────────────── /// One directory in the snapshot: its entries and the attr for each child. /// `children` and `child_attrs` are parallel slices (same index = same file). /// `dir_attr` is the attr of the directory itself. #[derive(Clone)] pub struct SnapDir { pub children: Vec, pub child_attrs: Vec, pub dir_attr: AttrResponse, } // ── WatchedTree ────────────────────────────────────────────────── /// Maintains an in-memory snapshot of the directory tree and broadcasts /// change events to connected clients via inotify. pub struct WatchedTree { /// Virtual-path → (entries + attrs). snapshot: Arc>>, change_tx: broadcast::Sender, /// Kept alive to continue watching. Shared with the event handler so it can /// add new watches when directories are created at runtime. _watcher: Arc>, } impl WatchedTree { pub async fn new(root: PathBuf) -> anyhow::Result { let snapshot: Arc>> = Arc::new(RwLock::new(HashMap::new())); let (change_tx, _) = broadcast::channel(BROADCAST_CAPACITY); info!("WatchedTree: walking '{}' (depth {})…", root.display(), INITIAL_DEPTH); let t = std::time::Instant::now(); let watched_dirs = walk_tree(&root, INITIAL_DEPTH, &snapshot).await; let snap_len = snapshot.read().await.len(); let total_entries: usize = snapshot.read().await.values().map(|d| d.children.len()).sum(); info!( "WatchedTree: snapshot ready — {} dirs, {} entries, {} watches, took {:.1}s", snap_len, total_entries, watched_dirs.len(), t.elapsed().as_secs_f32(), ); // Bridge notify's sync callback → async tokio task. let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel::>(); let watcher = Arc::new(Mutex::new(RecommendedWatcher::new( move |res| { let _ = notify_tx.send(res); }, Config::default(), )?)); // Add one non-recursive inotify watch per directory in the snapshot. { let mut w = watcher.lock().unwrap(); for dir_abs in &watched_dirs { if let Err(e) = w.watch(dir_abs, RecursiveMode::NonRecursive) { warn!("watch failed for {:?}: {}", dir_abs, e); } } } let snapshot_bg = Arc::clone(&snapshot); let root_bg = root.clone(); let tx_bg = change_tx.clone(); let watcher_bg = Arc::clone(&watcher); tokio::spawn(async move { while let Some(res) = notify_rx.recv().await { match res { Ok(event) => { handle_fs_event(event, &root_bg, &snapshot_bg, &tx_bg, &watcher_bg).await } Err(e) => warn!("notify error: {}", e), } } }); Ok(Self { snapshot, change_tx, _watcher: watcher, }) } /// Returns all snapshot entries within `depth` levels of `base`. pub async fn get_snapshot(&self, base: &str, depth: u32) -> Vec<(String, SnapDir)> { let snap = self.snapshot.read().await; snap.iter() .filter(|(path, _)| path_depth_from(base, path).map_or(false, |d| d <= depth)) .map(|(path, snap_dir)| (path.clone(), snap_dir.clone())) .collect() } pub fn subscribe(&self) -> broadcast::Receiver { self.change_tx.subscribe() } } // ── Helpers ────────────────────────────────────────────────────── fn metadata_to_attr(meta: &std::fs::Metadata) -> AttrResponse { AttrResponse { size: meta.len(), mode: meta.mode(), mtime: meta.mtime() as u64, } } /// BFS walk: reads filesystem, stores entries + attrs in snapshot. /// Returns the list of absolute paths walked (for inotify setup). async fn walk_tree( root: &Path, max_depth: u32, snapshot: &Arc>>, ) -> Vec { let mut walked: Vec = Vec::new(); let mut queue: VecDeque<(PathBuf, String, u32)> = VecDeque::new(); queue.push_back((root.to_path_buf(), "/".to_string(), 0)); while let Some((abs_path, virt_path, depth)) = queue.pop_front() { // Stat the directory itself. let dir_attr = match std::fs::metadata(&abs_path) { Ok(m) => metadata_to_attr(&m), Err(e) => { warn!("walk_tree: cannot stat {:?}: {}", abs_path, e); continue; } }; let mut dir = match tokio::fs::read_dir(&abs_path).await { Ok(d) => d, Err(e) => { warn!("walk_tree: cannot read {:?}: {}", abs_path, e); continue; } }; let mut children: Vec = Vec::new(); let mut child_attrs: Vec = Vec::new(); while let Ok(Some(entry)) = dir.next_entry().await { let Ok(ft) = entry.file_type().await else { continue }; let type_val = if ft.is_dir() { 4 } else if ft.is_file() { 8 } else { continue }; let name = entry.file_name().to_string_lossy().into_owned(); // Stat the child. let attr = match entry.metadata().await { Ok(m) => metadata_to_attr(&m), Err(_) => AttrResponse { size: 0, mode: 0, mtime: 0 }, }; // Skip hidden directories to avoid exploding the watch list. if ft.is_dir() && name.starts_with('.') { continue; } if ft.is_dir() && depth < max_depth { let child_virt = child_virt_path(&virt_path, &name); queue.push_back((entry.path(), child_virt, depth + 1)); } children.push(DirEntry { name, r#type: type_val }); child_attrs.push(attr); } snapshot.write().await.insert(virt_path, SnapDir { children, child_attrs, dir_attr }); walked.push(abs_path); } walked } /// Re-reads a single directory and updates its snapshot entry. async fn refresh_dir( abs_path: &Path, virt_path: &str, snapshot: &Arc>>, ) { let dir_attr = match std::fs::metadata(abs_path) { Ok(m) => metadata_to_attr(&m), Err(_) => { snapshot.write().await.remove(virt_path); return; } }; let mut dir = match tokio::fs::read_dir(abs_path).await { Ok(d) => d, Err(_) => { snapshot.write().await.remove(virt_path); return; } }; let mut children: Vec = Vec::new(); let mut child_attrs: Vec = Vec::new(); while let Ok(Some(entry)) = dir.next_entry().await { let Ok(ft) = entry.file_type().await else { continue }; let type_val = if ft.is_dir() { 4 } else if ft.is_file() { 8 } else { continue }; let attr = match entry.metadata().await { Ok(m) => metadata_to_attr(&m), Err(_) => AttrResponse { size: 0, mode: 0, mtime: 0 }, }; children.push(DirEntry { name: entry.file_name().to_string_lossy().into_owned(), r#type: type_val, }); child_attrs.push(attr); } snapshot.write().await.insert(virt_path.to_string(), SnapDir { children, child_attrs, dir_attr }); } async fn handle_fs_event( event: notify::Event, root: &Path, snapshot: &Arc>>, tx: &broadcast::Sender, watcher: &Arc>, ) { use notify::EventKind; let proto_kind = match &event.kind { EventKind::Create(_) => ChangeKind::Created, EventKind::Remove(_) => ChangeKind::Deleted, EventKind::Modify(_) => ChangeKind::Modified, _ => return, }; let kind_i32 = proto_kind as i32; if matches!(event.kind, EventKind::Create(_)) { for path in &event.paths { if path.is_dir() && !path.file_name().map_or(false, |n| n.to_string_lossy().starts_with('.')) { let mut w = watcher.lock().unwrap(); if let Err(e) = w.watch(path, RecursiveMode::NonRecursive) { warn!("failed to add watch for new dir {:?}: {}", path, e); } } } } let mut parents: HashSet = HashSet::new(); for path in &event.paths { if let Some(parent) = path.parent() { if parent.starts_with(root) { parents.insert(parent.to_path_buf()); } } } for parent_abs in parents { let virt = abs_to_virt(root, &parent_abs); debug!("snapshot refresh: {}", virt); refresh_dir(&parent_abs, &virt, snapshot).await; let _ = tx.send(ChangeEvent { path: virt, kind: kind_i32 }); } } fn abs_to_virt(root: &Path, abs: &Path) -> String { match abs.strip_prefix(root) { Ok(rel) if rel.as_os_str().is_empty() => "/".to_string(), Ok(rel) => format!("/{}", rel.to_string_lossy()), Err(_) => "/".to_string(), } } fn child_virt_path(parent: &str, name: &str) -> String { if parent == "/" { format!("/{}", name) } else { format!("{}/{}", parent, name) } } fn path_depth_from(base: &str, path: &str) -> Option { if base == "/" { if path == "/" { return Some(0); } let trimmed = path.trim_start_matches('/'); Some(trimmed.matches('/').count() as u32 + 1) } else { if path == base { return Some(0); } let prefix = format!("{}/", base); path.strip_prefix(prefix.as_str()) .map(|rest| rest.matches('/').count() as u32 + 1) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_path_depth_from_root() { assert_eq!(path_depth_from("/", "/"), Some(0)); assert_eq!(path_depth_from("/", "/a"), Some(1)); assert_eq!(path_depth_from("/", "/a/b"), Some(2)); assert_eq!(path_depth_from("/", "/a/b/c"), Some(3)); } #[test] fn test_path_depth_from_subdir() { assert_eq!(path_depth_from("/movies", "/movies"), Some(0)); assert_eq!(path_depth_from("/movies", "/movies/action"), Some(1)); assert_eq!(path_depth_from("/movies", "/movies/action/marvel"), Some(2)); assert_eq!(path_depth_from("/movies", "/music"), None); assert_eq!(path_depth_from("/movies", "/movies-extra"), None); assert_eq!(path_depth_from("/movies", "/"), None); } }