use std::collections::{HashMap, HashSet, VecDeque};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
use tokio::sync::{broadcast, RwLock};
use tracing::{debug, info, warn};

use furumi_common::proto::{ChangeEvent, ChangeKind, DirEntry};

/// How many directory levels to pre-walk on startup.
const INITIAL_DEPTH: u32 = 3;

/// Broadcast channel capacity — clients that fall behind lose events and rely on TTL.
const BROADCAST_CAPACITY: usize = 256;

// ── WatchedTree ──────────────────────────────────────────────────

/// Maintains an in-memory snapshot of the directory tree and broadcasts
/// change events to connected clients via inotify.
pub struct WatchedTree {
    /// Virtual-path → directory entries, e.g. "/" → [...], "/movies" → [...].
    snapshot: Arc<RwLock<HashMap<String, Vec<DirEntry>>>>,
    /// Sending half of the change-event broadcast; receivers come from [`WatchedTree::subscribe`].
    change_tx: broadcast::Sender<ChangeEvent>,
    /// Kept alive to continue watching. Shared with the event handler so it can
    /// add new watches when directories are created at runtime.
    _watcher: Arc<Mutex<RecommendedWatcher>>,
}

impl WatchedTree {
    /// Walks `root` up to [`INITIAL_DEPTH`] levels, installs one non-recursive
    /// watch per walked directory and spawns the task that applies filesystem
    /// events to the snapshot.
    ///
    /// Must be called from within a tokio runtime (it uses `tokio::spawn`).
    ///
    /// # Errors
    ///
    /// Returns an error if the filesystem watcher cannot be created. A failure
    /// to watch an individual directory is only logged.
    pub async fn new(root: PathBuf) -> anyhow::Result<Self> {
        let snapshot: Arc<RwLock<HashMap<String, Vec<DirEntry>>>> =
            Arc::new(RwLock::new(HashMap::new()));
        let (change_tx, _) = broadcast::channel(BROADCAST_CAPACITY);

        // Build snapshot, collect the absolute paths that were walked so we can watch them.
        info!("WatchedTree: walking '{}' (depth {})…", root.display(), INITIAL_DEPTH);
        let t = std::time::Instant::now();
        let watched_dirs = walk_tree(&root, INITIAL_DEPTH, &snapshot).await;
        let snap_len = snapshot.read().await.len();
        info!(
            "WatchedTree: snapshot ready — {} directories, {} inotify watches, took {:.1}s",
            snap_len,
            watched_dirs.len(),
            t.elapsed().as_secs_f32(),
        );

        // Bridge notify's sync callback → async tokio task.
        let (notify_tx, mut notify_rx) =
            tokio::sync::mpsc::unbounded_channel::<notify::Result<notify::Event>>();
        let mut raw_watcher = RecommendedWatcher::new(
            move |res| {
                let _ = notify_tx.send(res);
            },
            Config::default(),
        )?;

        // Add one non-recursive inotify watch per directory in the snapshot.
        // Non-recursive avoids blowing through the kernel's inotify watch limit
        // on large trees (default: 8192 watches).
        // The watcher is not shared yet, so no lock is needed here.
        for dir_abs in &watched_dirs {
            if let Err(e) = raw_watcher.watch(dir_abs, RecursiveMode::NonRecursive) {
                warn!("watch failed for {:?}: {}", dir_abs, e);
            }
        }
        let watcher = Arc::new(Mutex::new(raw_watcher));

        // Process FS events asynchronously.
        let snapshot_bg = Arc::clone(&snapshot);
        let root_bg = root.clone();
        let tx_bg = change_tx.clone();
        // The task only holds a weak handle. The watcher's callback owns
        // `notify_tx`, so a strong handle here would keep the channel open
        // forever and neither the task nor the watcher could ever shut down
        // after the `WatchedTree` is dropped.
        let watcher_bg = Arc::downgrade(&watcher);
        tokio::spawn(async move {
            while let Some(res) = notify_rx.recv().await {
                let event = match res {
                    Ok(event) => event,
                    Err(e) => {
                        warn!("notify error: {}", e);
                        continue;
                    }
                };
                // Owner is gone: stop processing; the channel closes right after.
                let Some(watcher) = watcher_bg.upgrade() else {
                    break;
                };
                handle_fs_event(event, &root_bg, &snapshot_bg, &tx_bg, &watcher).await;
            }
        });

        Ok(Self {
            snapshot,
            change_tx,
            _watcher: watcher,
        })
    }

    /// Returns all snapshot entries whose virtual path is within `depth` levels of `base`.
    ///
    /// Each item is `(virtual_path, entries_of_that_directory)`; the order is
    /// unspecified because it follows hash-map iteration.
    pub async fn get_snapshot(&self, base: &str, depth: u32) -> Vec<(String, Vec<DirEntry>)> {
        let snap = self.snapshot.read().await;
        snap.iter()
            .filter(|(path, _)| path_depth_from(base, path).map_or(false, |d| d <= depth))
            .map(|(path, entries)| (path.clone(), entries.clone()))
            .collect()
    }

    /// Creates a new receiver for change events broadcast after this call.
    pub fn subscribe(&self) -> broadcast::Receiver<ChangeEvent> {
        self.change_tx.subscribe()
    }
}

// ── Helpers ──────────────────────────────────────────────────────

/// Iterative BFS walk: reads the real filesystem and populates the snapshot.
/// Skips hidden directories (names starting with '.') to avoid walking
/// .cargo, .local, .config etc. in home directories.
/// Returns the list of absolute directory paths that were walked (for inotify setup).
async fn walk_tree( root: &Path, max_depth: u32, snapshot: &Arc>>>, ) -> Vec { let mut walked: Vec = Vec::new(); // Queue of (abs_path, virt_path, depth_from_root) let mut queue: VecDeque<(PathBuf, String, u32)> = VecDeque::new(); queue.push_back((root.to_path_buf(), "/".to_string(), 0)); while let Some((abs_path, virt_path, depth)) = queue.pop_front() { let mut dir = match tokio::fs::read_dir(&abs_path).await { Ok(d) => d, Err(e) => { warn!("walk_tree: cannot read {:?}: {}", abs_path, e); continue; } }; let mut entries = Vec::new(); while let Ok(Some(entry)) = dir.next_entry().await { let Ok(ft) = entry.file_type().await else { continue; }; let type_val = if ft.is_dir() { 4 } else if ft.is_file() { 8 } else { continue; }; let name = entry.file_name().to_string_lossy().into_owned(); // Skip hidden directories — they typically hold tooling caches // (.cargo, .local, .config, .rustup…) that are irrelevant for // serving media and would blow through the inotify watch limit. if ft.is_dir() && name.starts_with('.') { continue; } if ft.is_dir() && depth < max_depth { let child_virt = child_virt_path(&virt_path, &name); queue.push_back((entry.path(), child_virt, depth + 1)); } entries.push(DirEntry { name, r#type: type_val }); } snapshot.write().await.insert(virt_path, entries); walked.push(abs_path); } walked } /// Re-reads a single directory and updates its snapshot entry. async fn refresh_dir( abs_path: &Path, virt_path: &str, snapshot: &Arc>>>, ) { let mut dir = match tokio::fs::read_dir(abs_path).await { Ok(d) => d, Err(_) => { // Directory was deleted — remove it from the snapshot. 
snapshot.write().await.remove(virt_path); return; } }; let mut entries = Vec::new(); while let Ok(Some(entry)) = dir.next_entry().await { let Ok(ft) = entry.file_type().await else { continue; }; let type_val = if ft.is_dir() { 4 } else if ft.is_file() { 8 } else { continue }; entries.push(DirEntry { name: entry.file_name().to_string_lossy().into_owned(), r#type: type_val, }); } snapshot.write().await.insert(virt_path.to_string(), entries); } async fn handle_fs_event( event: notify::Event, root: &Path, snapshot: &Arc>>>, tx: &broadcast::Sender, watcher: &Arc>, ) { use notify::EventKind; let proto_kind = match &event.kind { EventKind::Create(_) => ChangeKind::Created, EventKind::Remove(_) => ChangeKind::Deleted, EventKind::Modify(_) => ChangeKind::Modified, _ => return, }; let kind_i32 = proto_kind as i32; // If a new directory appeared, start watching it immediately so we don't // miss events inside it (non-recursive mode requires explicit per-dir watches). if matches!(event.kind, EventKind::Create(_)) { for path in &event.paths { if path.is_dir() && !path.file_name().map_or(false, |n| n.to_string_lossy().starts_with('.')) { let mut w = watcher.lock().unwrap(); if let Err(e) = w.watch(path, RecursiveMode::NonRecursive) { warn!("failed to add watch for new dir {:?}: {}", path, e); } } } } // Collect unique parent directories that need refreshing. 
let mut parents: HashSet = HashSet::new(); for path in &event.paths { if let Some(parent) = path.parent() { if parent.starts_with(root) { parents.insert(parent.to_path_buf()); } } } for parent_abs in parents { let virt = abs_to_virt(root, &parent_abs); debug!("snapshot refresh: {}", virt); refresh_dir(&parent_abs, &virt, snapshot).await; let _ = tx.send(ChangeEvent { path: virt, kind: kind_i32 }); } } fn abs_to_virt(root: &Path, abs: &Path) -> String { match abs.strip_prefix(root) { Ok(rel) if rel.as_os_str().is_empty() => "/".to_string(), Ok(rel) => format!("/{}", rel.to_string_lossy()), Err(_) => "/".to_string(), } } fn child_virt_path(parent: &str, name: &str) -> String { if parent == "/" { format!("/{}", name) } else { format!("{}/{}", parent, name) } } /// Returns how many levels `path` is below `base`, or `None` if `path` is not under `base`. /// /// Examples (base="/"): "/" → 0, "/a" → 1, "/a/b" → 2 /// Examples (base="/a"): "/a" → 0, "/a/b" → 1, "/other" → None fn path_depth_from(base: &str, path: &str) -> Option { if base == "/" { if path == "/" { return Some(0); } let trimmed = path.trim_start_matches('/'); Some(trimmed.matches('/').count() as u32 + 1) } else { if path == base { return Some(0); } let prefix = format!("{}/", base); path.strip_prefix(prefix.as_str()) .map(|rest| rest.matches('/').count() as u32 + 1) } } // ── Tests ──────────────────────────────────────────────────────── #[cfg(test)] mod tests { use super::*; #[test] fn test_path_depth_from_root() { assert_eq!(path_depth_from("/", "/"), Some(0)); assert_eq!(path_depth_from("/", "/a"), Some(1)); assert_eq!(path_depth_from("/", "/a/b"), Some(2)); assert_eq!(path_depth_from("/", "/a/b/c"), Some(3)); } #[test] fn test_path_depth_from_subdir() { assert_eq!(path_depth_from("/movies", "/movies"), Some(0)); assert_eq!(path_depth_from("/movies", "/movies/action"), Some(1)); assert_eq!(path_depth_from("/movies", "/movies/action/marvel"), Some(2)); assert_eq!(path_depth_from("/movies", "/music"), 
None); assert_eq!(path_depth_from("/movies", "/movies-extra"), None); assert_eq!(path_depth_from("/movies", "/"), None); } }