From 28fd5403df2b5f3ad439792d233e846668aed06d Mon Sep 17 00:00:00 2001 From: AB-UK Date: Fri, 13 Mar 2026 18:35:26 +0000 Subject: [PATCH] Added prefetch --- furumi-client-core/src/client.rs | 33 ++++-- furumi-common/proto/virtualfs.proto | 4 + furumi-server/src/server.rs | 11 +- furumi-server/src/tree.rs | 154 ++++++++++++++++------------ 4 files changed, 126 insertions(+), 76 deletions(-) diff --git a/furumi-client-core/src/client.rs b/furumi-client-core/src/client.rs index 15a8677..8a96e2e 100644 --- a/furumi-client-core/src/client.rs +++ b/furumi-client-core/src/client.rs @@ -233,7 +233,7 @@ impl FurumiClient { }); } - /// Fetches the server's pre-built directory snapshot and populates `dir_cache`. + /// Fetches the server's pre-built directory snapshot and populates both caches. /// Returns the number of directories loaded. async fn load_snapshot(&self, path: &str, depth: u32) -> Result { debug!("snapshot: requesting path={} depth={}", path, depth); @@ -243,18 +243,37 @@ impl FurumiClient { depth, }); let mut stream = client.get_snapshot(req).await?.into_inner(); - let mut count = 0; + let mut dirs = 0; + let mut attrs_warmed = 0; while let Some(entry) = stream.next().await { let entry = entry?; - let n = entry.children.len(); - trace!("snapshot: got dir '{}' ({} entries)", entry.path, n); + trace!("snapshot: got dir '{}' ({} entries)", entry.path, entry.children.len()); + + // Warm attr_cache for the directory itself. + if let Some(dir_attr) = entry.dir_attr { + self.attr_cache.insert(entry.path.clone(), dir_attr).await; + attrs_warmed += 1; + } + + // Warm attr_cache for each child (parallel slice: children[i] ↔ child_attrs[i]). + for (child, attr) in entry.children.iter().zip(entry.child_attrs.iter()) { + let child_path = if entry.path == "/" { + format!("/{}", child.name) + } else { + format!("{}/{}", entry.path, child.name) + }; + self.attr_cache.insert(child_path, attr.clone()).await; + attrs_warmed += 1; + } + + // Populate dir_cache. self.dir_cache .insert(entry.path, Arc::new(entry.children)) .await; - count += 1; + dirs += 1; } - debug!("snapshot: inserted {} dirs into dir_cache", count); - Ok(count) + debug!("snapshot: {} dirs → dir_cache, {} attrs → attr_cache", dirs, attrs_warmed); + Ok(dirs) } /// Subscribes to the server's live change events and invalidates `dir_cache` entries. diff --git a/furumi-common/proto/virtualfs.proto b/furumi-common/proto/virtualfs.proto index d2ea232..c808083 100644 --- a/furumi-common/proto/virtualfs.proto +++ b/furumi-common/proto/virtualfs.proto @@ -39,9 +39,13 @@ message SnapshotRequest { } // One directory's contents within a snapshot response. +// child_attrs is parallel to children: child_attrs[i] is the AttrResponse for children[i]. +// dir_attr is the AttrResponse for the directory itself (path). message SnapshotEntry { string path = 1; repeated DirEntry children = 2; + repeated AttrResponse child_attrs = 3; + AttrResponse dir_attr = 4; } // Subscribe to live filesystem change notifications (no parameters needed). diff --git a/furumi-server/src/server.rs b/furumi-server/src/server.rs index 7f37a12..5175bfa 100644 --- a/furumi-server/src/server.rs +++ b/furumi-server/src/server.rs @@ -139,15 +139,20 @@ impl RemoteFileSystem for RemoteFileSystemImpl { let virt_path = sanitized_to_virt(&safe_path); let entries = self.tree.get_snapshot(&virt_path, req.depth).await; - let total_entries: usize = entries.iter().map(|(_, v)| v.len()).sum(); + let total_entries: usize = entries.iter().map(|(_, d)| d.children.len()).sum(); tracing::debug!( "GetSnapshot: path='{}' depth={} → {} dirs, {} total entries", virt_path, req.depth, entries.len(), total_entries ); let stream = async_stream::try_stream! { - for (path, children) in entries { - yield SnapshotEntry { path, children }; + for (path, snap_dir) in entries { + yield SnapshotEntry { + path, + children: snap_dir.children, + child_attrs: snap_dir.child_attrs, + dir_attr: Some(snap_dir.dir_attr), + }; } }; Ok(Response::new(Box::pin(stream) as Self::GetSnapshotStream)) diff --git a/furumi-server/src/tree.rs b/furumi-server/src/tree.rs index 4b7149d..a264563 100644 --- a/furumi-server/src/tree.rs +++ b/furumi-server/src/tree.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet, VecDeque}; +use std::os::unix::fs::MetadataExt; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; @@ -6,7 +7,7 @@ use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher}; use tokio::sync::{broadcast, RwLock}; use tracing::{debug, info, warn}; -use furumi_common::proto::{ChangeEvent, ChangeKind, DirEntry}; +use furumi_common::proto::{AttrResponse, ChangeEvent, ChangeKind, DirEntry}; /// How many directory levels to pre-walk on startup. const INITIAL_DEPTH: u32 = 3; @@ -14,13 +15,25 @@ const INITIAL_DEPTH: u32 = 3; /// Broadcast channel capacity — clients that fall behind lose events and rely on TTL. const BROADCAST_CAPACITY: usize = 256; +// ── Types ───────────────────────────────────────────────────────── + +/// One directory in the snapshot: its entries and the attr for each child. +/// `children` and `child_attrs` are parallel slices (same index = same file). +/// `dir_attr` is the attr of the directory itself. +#[derive(Clone)] +pub struct SnapDir { + pub children: Vec, + pub child_attrs: Vec, + pub dir_attr: AttrResponse, +} + // ── WatchedTree ────────────────────────────────────────────────── /// Maintains an in-memory snapshot of the directory tree and broadcasts /// change events to connected clients via inotify. pub struct WatchedTree { - /// Virtual-path → directory entries, e.g. "/" → [...], "/movies" → [...]. - snapshot: Arc>>>, + /// Virtual-path → (entries + attrs). + snapshot: Arc>>, change_tx: broadcast::Sender, /// Kept alive to continue watching. Shared with the event handler so it can /// add new watches when directories are created at runtime. @@ -29,18 +42,19 @@ pub struct WatchedTree { impl WatchedTree { pub async fn new(root: PathBuf) -> anyhow::Result { - let snapshot: Arc>>> = + let snapshot: Arc>> = Arc::new(RwLock::new(HashMap::new())); let (change_tx, _) = broadcast::channel(BROADCAST_CAPACITY); - // Build snapshot, collect the absolute paths that were walked so we can watch them. info!("WatchedTree: walking '{}' (depth {})…", root.display(), INITIAL_DEPTH); let t = std::time::Instant::now(); let watched_dirs = walk_tree(&root, INITIAL_DEPTH, &snapshot).await; let snap_len = snapshot.read().await.len(); + let total_entries: usize = snapshot.read().await.values().map(|d| d.children.len()).sum(); info!( - "WatchedTree: snapshot ready — {} directories, {} inotify watches, took {:.1}s", + "WatchedTree: snapshot ready — {} dirs, {} entries, {} watches, took {:.1}s", snap_len, + total_entries, watched_dirs.len(), t.elapsed().as_secs_f32(), ); @@ -56,8 +70,6 @@ impl WatchedTree { )?)); // Add one non-recursive inotify watch per directory in the snapshot. - // Non-recursive avoids blowing through the kernel's inotify watch limit - // on large trees (default: 8192 watches). { let mut w = watcher.lock().unwrap(); for dir_abs in &watched_dirs { @@ -67,7 +79,6 @@ impl WatchedTree { } } - // Process FS events asynchronously. let snapshot_bg = Arc::clone(&snapshot); let root_bg = root.clone(); let tx_bg = change_tx.clone(); @@ -90,12 +101,12 @@ impl WatchedTree { }) } - /// Returns all snapshot entries whose virtual path is within `depth` levels of `base`. - pub async fn get_snapshot(&self, base: &str, depth: u32) -> Vec<(String, Vec)> { + /// Returns all snapshot entries within `depth` levels of `base`. + pub async fn get_snapshot(&self, base: &str, depth: u32) -> Vec<(String, SnapDir)> { let snap = self.snapshot.read().await; snap.iter() .filter(|(path, _)| path_depth_from(base, path).map_or(false, |d| d <= depth)) - .map(|(path, entries)| (path.clone(), entries.clone())) + .map(|(path, snap_dir)| (path.clone(), snap_dir.clone())) .collect() } @@ -106,21 +117,35 @@ impl WatchedTree { // ── Helpers ────────────────────────────────────────────────────── -/// Iterative BFS walk: reads the real filesystem and populates the snapshot. -/// Skips hidden directories (names starting with '.') to avoid walking -/// .cargo, .local, .config etc. in home directories. -/// Returns the list of absolute directory paths that were walked (for inotify setup). +fn metadata_to_attr(meta: &std::fs::Metadata) -> AttrResponse { + AttrResponse { + size: meta.len(), + mode: meta.mode(), + mtime: meta.mtime() as u64, + } +} + +/// BFS walk: reads filesystem, stores entries + attrs in snapshot. +/// Returns the list of absolute paths walked (for inotify setup). async fn walk_tree( root: &Path, max_depth: u32, - snapshot: &Arc>>>, + snapshot: &Arc>>, ) -> Vec { let mut walked: Vec = Vec::new(); - // Queue of (abs_path, virt_path, depth_from_root) let mut queue: VecDeque<(PathBuf, String, u32)> = VecDeque::new(); queue.push_back((root.to_path_buf(), "/".to_string(), 0)); while let Some((abs_path, virt_path, depth)) = queue.pop_front() { + // Stat the directory itself. + let dir_attr = match std::fs::metadata(&abs_path) { + Ok(m) => metadata_to_attr(&m), + Err(e) => { + warn!("walk_tree: cannot stat {:?}: {}", abs_path, e); + continue; + } + }; + let mut dir = match tokio::fs::read_dir(&abs_path).await { Ok(d) => d, Err(e) => { @@ -129,24 +154,21 @@ async fn walk_tree( } }; - let mut entries = Vec::new(); + let mut children: Vec = Vec::new(); + let mut child_attrs: Vec = Vec::new(); while let Ok(Some(entry)) = dir.next_entry().await { - let Ok(ft) = entry.file_type().await else { - continue; - }; - let type_val = if ft.is_dir() { - 4 - } else if ft.is_file() { - 8 - } else { - continue; - }; + let Ok(ft) = entry.file_type().await else { continue }; + let type_val = if ft.is_dir() { 4 } else if ft.is_file() { 8 } else { continue }; let name = entry.file_name().to_string_lossy().into_owned(); - // Skip hidden directories — they typically hold tooling caches - // (.cargo, .local, .config, .rustup…) that are irrelevant for - // serving media and would blow through the inotify watch limit. + // Stat the child. + let attr = match entry.metadata().await { + Ok(m) => metadata_to_attr(&m), + Err(_) => AttrResponse { size: 0, mode: 0, mtime: 0 }, + }; + + // Skip hidden directories to avoid exploding the watch list. if ft.is_dir() && name.starts_with('.') { continue; } @@ -155,10 +177,12 @@ async fn walk_tree( let child_virt = child_virt_path(&virt_path, &name); queue.push_back((entry.path(), child_virt, depth + 1)); } - entries.push(DirEntry { name, r#type: type_val }); + + children.push(DirEntry { name, r#type: type_val }); + child_attrs.push(attr); } - snapshot.write().await.insert(virt_path, entries); + snapshot.write().await.insert(virt_path, SnapDir { children, child_attrs, dir_attr }); walked.push(abs_path); } @@ -169,35 +193,48 @@ async fn walk_tree( async fn refresh_dir( abs_path: &Path, virt_path: &str, - snapshot: &Arc>>>, + snapshot: &Arc>>, ) { - let mut dir = match tokio::fs::read_dir(abs_path).await { - Ok(d) => d, + let dir_attr = match std::fs::metadata(abs_path) { + Ok(m) => metadata_to_attr(&m), Err(_) => { - // Directory was deleted — remove it from the snapshot. snapshot.write().await.remove(virt_path); return; } }; - let mut entries = Vec::new(); + let mut dir = match tokio::fs::read_dir(abs_path).await { + Ok(d) => d, + Err(_) => { + snapshot.write().await.remove(virt_path); + return; + } + }; + + let mut children: Vec = Vec::new(); + let mut child_attrs: Vec = Vec::new(); + while let Ok(Some(entry)) = dir.next_entry().await { - let Ok(ft) = entry.file_type().await else { - continue; - }; + let Ok(ft) = entry.file_type().await else { continue }; let type_val = if ft.is_dir() { 4 } else if ft.is_file() { 8 } else { continue }; - entries.push(DirEntry { + let attr = match entry.metadata().await { + Ok(m) => metadata_to_attr(&m), + Err(_) => AttrResponse { size: 0, mode: 0, mtime: 0 }, + }; + children.push(DirEntry { name: entry.file_name().to_string_lossy().into_owned(), r#type: type_val, }); + child_attrs.push(attr); } - snapshot.write().await.insert(virt_path.to_string(), entries); + + snapshot.write().await.insert(virt_path.to_string(), SnapDir { children, child_attrs, dir_attr }); } async fn handle_fs_event( event: notify::Event, root: &Path, - snapshot: &Arc>>>, + snapshot: &Arc>>, tx: &broadcast::Sender, watcher: &Arc>, ) { @@ -211,11 +248,11 @@ async fn handle_fs_event( }; let kind_i32 = proto_kind as i32; - // If a new directory appeared, start watching it immediately so we don't - // miss events inside it (non-recursive mode requires explicit per-dir watches). if matches!(event.kind, EventKind::Create(_)) { for path in &event.paths { - if path.is_dir() && !path.file_name().map_or(false, |n| n.to_string_lossy().starts_with('.')) { + if path.is_dir() + && !path.file_name().map_or(false, |n| n.to_string_lossy().starts_with('.')) + { let mut w = watcher.lock().unwrap(); if let Err(e) = w.watch(path, RecursiveMode::NonRecursive) { warn!("failed to add watch for new dir {:?}: {}", path, e); @@ -224,7 +261,6 @@ async fn handle_fs_event( } } - // Collect unique parent directories that need refreshing. let mut parents: HashSet = HashSet::new(); for path in &event.paths { if let Some(parent) = path.parent() { @@ -251,36 +287,22 @@ fn abs_to_virt(root: &Path, abs: &Path) -> String { } fn child_virt_path(parent: &str, name: &str) -> String { - if parent == "/" { - format!("/{}", name) - } else { - format!("{}/{}", parent, name) - } + if parent == "/" { format!("/{}", name) } else { format!("{}/{}", parent, name) } } -/// Returns how many levels `path` is below `base`, or `None` if `path` is not under `base`. -/// -/// Examples (base="/"): "/" → 0, "/a" → 1, "/a/b" → 2 -/// Examples (base="/a"): "/a" → 0, "/a/b" → 1, "/other" → None fn path_depth_from(base: &str, path: &str) -> Option { if base == "/" { - if path == "/" { - return Some(0); - } + if path == "/" { return Some(0); } let trimmed = path.trim_start_matches('/'); Some(trimmed.matches('/').count() as u32 + 1) } else { - if path == base { - return Some(0); - } + if path == base { return Some(0); } let prefix = format!("{}/", base); path.strip_prefix(prefix.as_str()) .map(|rest| rest.matches('/').count() as u32 + 1) } } -// ── Tests ──────────────────────────────────────────────────────── - #[cfg(test)] mod tests { use super::*;