From 078698154e753da0755eeb50c1f944ce961307c8 Mon Sep 17 00:00:00 2001 From: AB-UK Date: Fri, 13 Mar 2026 18:00:43 +0000 Subject: [PATCH] Added prefetch --- furumi-server/src/tree.rs | 73 ++++++++++++++++++++++++++++++--------- 1 file changed, 57 insertions(+), 16 deletions(-) diff --git a/furumi-server/src/tree.rs b/furumi-server/src/tree.rs index 8c89ef1..382db16 100644 --- a/furumi-server/src/tree.rs +++ b/furumi-server/src/tree.rs @@ -22,8 +22,9 @@ pub struct WatchedTree { /// Virtual-path → directory entries, e.g. "/" → [...], "/movies" → [...]. snapshot: Arc>>>, change_tx: broadcast::Sender, - /// Kept alive to continue watching; never accessed after construction. - _watcher: Mutex, + /// Kept alive to continue watching. Shared with the event handler so it can + /// add new watches when directories are created at runtime. + _watcher: Arc>, } impl WatchedTree { @@ -32,33 +33,47 @@ impl WatchedTree { Arc::new(RwLock::new(HashMap::new())); let (change_tx, _) = broadcast::channel(BROADCAST_CAPACITY); - // Build the initial snapshot synchronously before accepting requests. - walk_tree(&root, INITIAL_DEPTH, &snapshot).await; + // Build snapshot, collect the absolute paths that were walked so we can watch them. + let watched_dirs = walk_tree(&root, INITIAL_DEPTH, &snapshot).await; info!( - "WatchedTree: snapshot ready ({} directories, depth {})", + "WatchedTree: snapshot ready ({} directories, {} inotify watches)", snapshot.read().await.len(), - INITIAL_DEPTH + watched_dirs.len(), ); // Bridge notify's sync callback → async tokio task. let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel::>(); - let mut watcher = RecommendedWatcher::new( + let watcher = Arc::new(Mutex::new(RecommendedWatcher::new( move |res| { let _ = notify_tx.send(res); }, Config::default(), - )?; - watcher.watch(&root, RecursiveMode::Recursive)?; + )?)); + + // Add one non-recursive inotify watch per directory in the snapshot. + // Non-recursive avoids blowing through the kernel's inotify watch limit + // on large trees (default: 8192 watches). + { + let mut w = watcher.lock().unwrap(); + for dir_abs in &watched_dirs { + if let Err(e) = w.watch(dir_abs, RecursiveMode::NonRecursive) { + warn!("watch failed for {:?}: {}", dir_abs, e); + } + } + } // Process FS events asynchronously. let snapshot_bg = Arc::clone(&snapshot); let root_bg = root.clone(); let tx_bg = change_tx.clone(); + let watcher_bg = Arc::clone(&watcher); tokio::spawn(async move { while let Some(res) = notify_rx.recv().await { match res { - Ok(event) => handle_fs_event(event, &root_bg, &snapshot_bg, &tx_bg).await, + Ok(event) => { + handle_fs_event(event, &root_bg, &snapshot_bg, &tx_bg, &watcher_bg).await + } Err(e) => warn!("notify error: {}", e), } } @@ -67,7 +82,7 @@ impl WatchedTree { Ok(Self { snapshot, change_tx, - _watcher: Mutex::new(watcher), + _watcher: watcher, }) } @@ -75,9 +90,7 @@ impl WatchedTree { pub async fn get_snapshot(&self, base: &str, depth: u32) -> Vec<(String, Vec)> { let snap = self.snapshot.read().await; snap.iter() - .filter(|(path, _)| { - path_depth_from(base, path).map_or(false, |d| d <= depth) - }) + .filter(|(path, _)| path_depth_from(base, path).map_or(false, |d| d <= depth)) .map(|(path, entries)| (path.clone(), entries.clone())) .collect() } @@ -90,11 +103,15 @@ impl WatchedTree { // ── Helpers ────────────────────────────────────────────────────── /// Iterative BFS walk: reads the real filesystem and populates the snapshot. +/// Skips hidden directories (names starting with '.') to avoid walking +/// .cargo, .local, .config etc. in home directories. +/// Returns the list of absolute directory paths that were walked (for inotify setup). async fn walk_tree( root: &Path, max_depth: u32, snapshot: &Arc>>>, -) { +) -> Vec { + let mut walked: Vec = Vec::new(); // Queue of (abs_path, virt_path, depth_from_root) let mut queue: VecDeque<(PathBuf, String, u32)> = VecDeque::new(); queue.push_back((root.to_path_buf(), "/".to_string(), 0)); @@ -119,10 +136,17 @@ async fn walk_tree( } else if ft.is_file() { 8 } else { - continue + continue; }; let name = entry.file_name().to_string_lossy().into_owned(); + // Skip hidden directories — they typically hold tooling caches + // (.cargo, .local, .config, .rustup…) that are irrelevant for + // serving media and would blow through the inotify watch limit. + if ft.is_dir() && name.starts_with('.') { + continue; + } + if ft.is_dir() && depth < max_depth { let child_virt = child_virt_path(&virt_path, &name); queue.push_back((entry.path(), child_virt, depth + 1)); @@ -131,7 +155,10 @@ async fn walk_tree( } snapshot.write().await.insert(virt_path, entries); + walked.push(abs_path); } + + walked } /// Re-reads a single directory and updates its snapshot entry. @@ -168,6 +195,7 @@ async fn handle_fs_event( root: &Path, snapshot: &Arc>>>, tx: &broadcast::Sender, + watcher: &Arc>, ) { use notify::EventKind; @@ -179,6 +207,19 @@ async fn handle_fs_event( }; let kind_i32 = proto_kind as i32; + // If a new directory appeared, start watching it immediately so we don't + // miss events inside it (non-recursive mode requires explicit per-dir watches). + if matches!(event.kind, EventKind::Create(_)) { + for path in &event.paths { + if path.is_dir() && !path.file_name().map_or(false, |n| n.to_string_lossy().starts_with('.')) { + let mut w = watcher.lock().unwrap(); + if let Err(e) = w.watch(path, RecursiveMode::NonRecursive) { + warn!("failed to add watch for new dir {:?}: {}", path, e); + } + } + } + } + // Collect unique parent directories that need refreshing. let mut parents: HashSet = HashSet::new(); for path in &event.paths {