Added prefetch

This commit is contained in:
2026-03-13 18:00:43 +00:00
parent eaf1f549b8
commit 078698154e

View File

@@ -22,8 +22,9 @@ pub struct WatchedTree {
/// Virtual-path → directory entries, e.g. "/" → [...], "/movies" → [...]. /// Virtual-path → directory entries, e.g. "/" → [...], "/movies" → [...].
snapshot: Arc<RwLock<HashMap<String, Vec<DirEntry>>>>, snapshot: Arc<RwLock<HashMap<String, Vec<DirEntry>>>>,
change_tx: broadcast::Sender<ChangeEvent>, change_tx: broadcast::Sender<ChangeEvent>,
/// Kept alive to continue watching; never accessed after construction. /// Kept alive to continue watching. Shared with the event handler so it can
_watcher: Mutex<RecommendedWatcher>, /// add new watches when directories are created at runtime.
_watcher: Arc<Mutex<RecommendedWatcher>>,
} }
impl WatchedTree { impl WatchedTree {
@@ -32,33 +33,47 @@ impl WatchedTree {
Arc::new(RwLock::new(HashMap::new())); Arc::new(RwLock::new(HashMap::new()));
let (change_tx, _) = broadcast::channel(BROADCAST_CAPACITY); let (change_tx, _) = broadcast::channel(BROADCAST_CAPACITY);
// Build the initial snapshot synchronously before accepting requests. // Build snapshot, collect the absolute paths that were walked so we can watch them.
walk_tree(&root, INITIAL_DEPTH, &snapshot).await; let watched_dirs = walk_tree(&root, INITIAL_DEPTH, &snapshot).await;
info!( info!(
"WatchedTree: snapshot ready ({} directories, depth {})", "WatchedTree: snapshot ready ({} directories, {} inotify watches)",
snapshot.read().await.len(), snapshot.read().await.len(),
INITIAL_DEPTH watched_dirs.len(),
); );
// Bridge notify's sync callback → async tokio task. // Bridge notify's sync callback → async tokio task.
let (notify_tx, mut notify_rx) = let (notify_tx, mut notify_rx) =
tokio::sync::mpsc::unbounded_channel::<notify::Result<notify::Event>>(); tokio::sync::mpsc::unbounded_channel::<notify::Result<notify::Event>>();
let mut watcher = RecommendedWatcher::new( let watcher = Arc::new(Mutex::new(RecommendedWatcher::new(
move |res| { move |res| {
let _ = notify_tx.send(res); let _ = notify_tx.send(res);
}, },
Config::default(), Config::default(),
)?; )?));
watcher.watch(&root, RecursiveMode::Recursive)?;
// Add one non-recursive inotify watch per directory in the snapshot.
// Non-recursive avoids blowing through the kernel's inotify watch limit
// on large trees (default: 8192 watches).
{
let mut w = watcher.lock().unwrap();
for dir_abs in &watched_dirs {
if let Err(e) = w.watch(dir_abs, RecursiveMode::NonRecursive) {
warn!("watch failed for {:?}: {}", dir_abs, e);
}
}
}
// Process FS events asynchronously. // Process FS events asynchronously.
let snapshot_bg = Arc::clone(&snapshot); let snapshot_bg = Arc::clone(&snapshot);
let root_bg = root.clone(); let root_bg = root.clone();
let tx_bg = change_tx.clone(); let tx_bg = change_tx.clone();
let watcher_bg = Arc::clone(&watcher);
tokio::spawn(async move { tokio::spawn(async move {
while let Some(res) = notify_rx.recv().await { while let Some(res) = notify_rx.recv().await {
match res { match res {
Ok(event) => handle_fs_event(event, &root_bg, &snapshot_bg, &tx_bg).await, Ok(event) => {
handle_fs_event(event, &root_bg, &snapshot_bg, &tx_bg, &watcher_bg).await
}
Err(e) => warn!("notify error: {}", e), Err(e) => warn!("notify error: {}", e),
} }
} }
@@ -67,7 +82,7 @@ impl WatchedTree {
Ok(Self { Ok(Self {
snapshot, snapshot,
change_tx, change_tx,
_watcher: Mutex::new(watcher), _watcher: watcher,
}) })
} }
@@ -75,9 +90,7 @@ impl WatchedTree {
pub async fn get_snapshot(&self, base: &str, depth: u32) -> Vec<(String, Vec<DirEntry>)> { pub async fn get_snapshot(&self, base: &str, depth: u32) -> Vec<(String, Vec<DirEntry>)> {
let snap = self.snapshot.read().await; let snap = self.snapshot.read().await;
snap.iter() snap.iter()
.filter(|(path, _)| { .filter(|(path, _)| path_depth_from(base, path).map_or(false, |d| d <= depth))
path_depth_from(base, path).map_or(false, |d| d <= depth)
})
.map(|(path, entries)| (path.clone(), entries.clone())) .map(|(path, entries)| (path.clone(), entries.clone()))
.collect() .collect()
} }
@@ -90,11 +103,15 @@ impl WatchedTree {
// ── Helpers ────────────────────────────────────────────────────── // ── Helpers ──────────────────────────────────────────────────────
/// Iterative BFS walk: reads the real filesystem and populates the snapshot. /// Iterative BFS walk: reads the real filesystem and populates the snapshot.
/// Skips hidden directories (names starting with '.') to avoid walking
/// .cargo, .local, .config etc. in home directories.
/// Returns the list of absolute directory paths that were walked (for inotify setup).
async fn walk_tree( async fn walk_tree(
root: &Path, root: &Path,
max_depth: u32, max_depth: u32,
snapshot: &Arc<RwLock<HashMap<String, Vec<DirEntry>>>>, snapshot: &Arc<RwLock<HashMap<String, Vec<DirEntry>>>>,
) { ) -> Vec<PathBuf> {
let mut walked: Vec<PathBuf> = Vec::new();
// Queue of (abs_path, virt_path, depth_from_root) // Queue of (abs_path, virt_path, depth_from_root)
let mut queue: VecDeque<(PathBuf, String, u32)> = VecDeque::new(); let mut queue: VecDeque<(PathBuf, String, u32)> = VecDeque::new();
queue.push_back((root.to_path_buf(), "/".to_string(), 0)); queue.push_back((root.to_path_buf(), "/".to_string(), 0));
@@ -119,10 +136,17 @@ async fn walk_tree(
} else if ft.is_file() { } else if ft.is_file() {
8 8
} else { } else {
continue continue;
}; };
let name = entry.file_name().to_string_lossy().into_owned(); let name = entry.file_name().to_string_lossy().into_owned();
// Skip hidden directories — they typically hold tooling caches
// (.cargo, .local, .config, .rustup…) that are irrelevant for
// serving media and would blow through the inotify watch limit.
if ft.is_dir() && name.starts_with('.') {
continue;
}
if ft.is_dir() && depth < max_depth { if ft.is_dir() && depth < max_depth {
let child_virt = child_virt_path(&virt_path, &name); let child_virt = child_virt_path(&virt_path, &name);
queue.push_back((entry.path(), child_virt, depth + 1)); queue.push_back((entry.path(), child_virt, depth + 1));
@@ -131,7 +155,10 @@ async fn walk_tree(
} }
snapshot.write().await.insert(virt_path, entries); snapshot.write().await.insert(virt_path, entries);
walked.push(abs_path);
} }
walked
} }
/// Re-reads a single directory and updates its snapshot entry. /// Re-reads a single directory and updates its snapshot entry.
@@ -168,6 +195,7 @@ async fn handle_fs_event(
root: &Path, root: &Path,
snapshot: &Arc<RwLock<HashMap<String, Vec<DirEntry>>>>, snapshot: &Arc<RwLock<HashMap<String, Vec<DirEntry>>>>,
tx: &broadcast::Sender<ChangeEvent>, tx: &broadcast::Sender<ChangeEvent>,
watcher: &Arc<Mutex<RecommendedWatcher>>,
) { ) {
use notify::EventKind; use notify::EventKind;
@@ -179,6 +207,19 @@ async fn handle_fs_event(
}; };
let kind_i32 = proto_kind as i32; let kind_i32 = proto_kind as i32;
// If a new directory appeared, start watching it immediately so we don't
// miss events inside it (non-recursive mode requires explicit per-dir watches).
if matches!(event.kind, EventKind::Create(_)) {
for path in &event.paths {
if path.is_dir() && !path.file_name().map_or(false, |n| n.to_string_lossy().starts_with('.')) {
let mut w = watcher.lock().unwrap();
if let Err(e) = w.watch(path, RecursiveMode::NonRecursive) {
warn!("failed to add watch for new dir {:?}: {}", path, e);
}
}
}
}
// Collect unique parent directories that need refreshing. // Collect unique parent directories that need refreshing.
let mut parents: HashSet<PathBuf> = HashSet::new(); let mut parents: HashSet<PathBuf> = HashSet::new();
for path in &event.paths { for path in &event.paths {