From ec4bbd2c8c89ca8679a8bd97d020b5702fb09d21 Mon Sep 17 00:00:00 2001 From: AB-UK Date: Fri, 13 Mar 2026 18:23:26 +0000 Subject: [PATCH] Added prefetch --- furumi-client-core/src/client.rs | 72 ++++++++++++++++++++++++-------- furumi-server/src/server.rs | 5 +++ furumi-server/src/tree.rs | 8 +++- 3 files changed, 66 insertions(+), 19 deletions(-) diff --git a/furumi-client-core/src/client.rs b/furumi-client-core/src/client.rs index 28e4301..15a8677 100644 --- a/furumi-client-core/src/client.rs +++ b/furumi-client-core/src/client.rs @@ -15,7 +15,7 @@ use tonic::codegen::InterceptedService; use tonic::metadata::MetadataValue; use tonic::transport::{Channel, Endpoint, Uri}; use tonic::{Request, Status}; -use tracing::{debug, info, warn}; +use tracing::{debug, info, warn, trace}; // ── Auth interceptor ─────────────────────────────────────────── @@ -200,22 +200,32 @@ impl FurumiClient { /// subscribe to live change events to keep it up to date. /// Must be called after a successful authentication check. pub fn start_background_sync(&self) { + info!("background sync: starting"); let this = self.clone(); tokio::spawn(async move { + let t = std::time::Instant::now(); match this.load_snapshot("/", 3).await { - Ok(n) => info!("directory snapshot loaded ({} directories)", n), + Ok(n) => info!( + "background sync: snapshot loaded — {} directories in {:.1}s", + n, + t.elapsed().as_secs_f32() + ), Err(e) => { - warn!("GetSnapshot unavailable (old server?): {}", e); + warn!("background sync: GetSnapshot failed ({}), falling back to on-demand caching", e); return; } } + info!("background sync: subscribing to change events"); // Reconnect loop: if the watch stream drops, reconnect after a short delay. loop { match this.run_watch_loop().await { - Ok(()) => break, // server closed the stream cleanly + Ok(()) => { + info!("background sync: WatchChanges stream closed cleanly"); + break; + } Err(e) => { - debug!("WatchChanges disconnected: {}, reconnecting in 5s", e); + warn!("background sync: WatchChanges error ({}), reconnecting in 5s", e); tokio::time::sleep(Duration::from_secs(5)).await; } } @@ -226,6 +236,7 @@ impl FurumiClient { /// Fetches the server's pre-built directory snapshot and populates `dir_cache`. /// Returns the number of directories loaded. async fn load_snapshot(&self, path: &str, depth: u32) -> Result { + debug!("snapshot: requesting path={} depth={}", path, depth); let mut client = self.client.clone(); let req = tonic::Request::new(SnapshotRequest { path: path.to_string(), @@ -235,11 +246,14 @@ impl FurumiClient { let mut count = 0; while let Some(entry) = stream.next().await { let entry = entry?; + let n = entry.children.len(); + trace!("snapshot: got dir '{}' ({} entries)", entry.path, n); self.dir_cache .insert(entry.path, Arc::new(entry.children)) .await; count += 1; } + debug!("snapshot: inserted {} dirs into dir_cache", count); Ok(count) } @@ -251,7 +265,7 @@ impl FurumiClient { let mut stream = client.watch_changes(req).await?.into_inner(); while let Some(event) = stream.next().await { let event = event?; - debug!("cache invalidated (change event): {}", event.path); + debug!("watch: invalidating dir_cache for '{}'", event.path); self.dir_cache.invalidate(&event.path).await; } Ok(()) @@ -260,16 +274,17 @@ impl FurumiClient { /// Fetches file attributes from the server, utilizing an internal cache. pub async fn get_attr(&self, path: &str) -> Result { if let Some(attr) = self.attr_cache.get(path).await { + trace!("get_attr: cache hit '{}'", path); return Ok(attr); } - debug!("get_attr (cache miss): {}", path); + let t = std::time::Instant::now(); let mut client = self.client.clone(); let req = tonic::Request::new(PathRequest { path: path.to_string(), }); - let response = client.get_attr(req).await?.into_inner(); + debug!("get_attr: cache miss '{}' — rpc {:.1}ms", path, t.elapsed().as_secs_f32() * 1000.0); self.attr_cache.insert(path.to_string(), response.clone()).await; Ok(response) } @@ -277,6 +292,7 @@ impl FurumiClient { /// Fetches directory contents from gRPC and stores them in the cache. /// Does not trigger prefetch — safe to call from background tasks. async fn fetch_and_cache_dir(&self, path: &str) -> Result>> { + let t = std::time::Instant::now(); let mut client = self.client.clone(); let req = tonic::Request::new(PathRequest { path: path.to_string(), @@ -289,6 +305,12 @@ impl FurumiClient { entries.push(chunk?); } + debug!( + "fetch_dir: '{}' — {} entries in {:.1}ms", + path, + entries.len(), + t.elapsed().as_secs_f32() * 1000.0 + ); let entries = Arc::new(entries); self.dir_cache.insert(path.to_string(), entries.clone()).await; Ok(entries) @@ -299,11 +321,11 @@ impl FurumiClient { /// prefetch attributes and immediate subdirectory listings. pub async fn read_dir(&self, path: &str) -> Result> { if let Some(entries) = self.dir_cache.get(path).await { - debug!("read_dir (cache hit): {}", path); + debug!("read_dir: cache hit '{}' ({} entries)", path, entries.len()); return Ok((*entries).clone()); } - debug!("read_dir (cache miss): {}", path); + debug!("read_dir: cache miss '{}' — fetching from server", path); let entries = self.fetch_and_cache_dir(path).await?; let self_clone = self.clone(); @@ -318,6 +340,16 @@ impl FurumiClient { /// Background: warms attr_cache for all children and dir_cache for immediate subdirs. async fn prefetch_children(&self, parent: &str, entries: &[DirEntry]) { + let dirs: Vec<_> = entries.iter().filter(|e| e.r#type == 4).collect(); + let files: Vec<_> = entries.iter().filter(|e| e.r#type != 4).collect(); + debug!( + "prefetch: '{}' — warming {} attrs, {} subdirs", + parent, + entries.len(), + dirs.len().min(20) + ); + + // Warm attr_cache for all children. for entry in entries { let child_path = if parent == "/" { format!("/{}", entry.name) @@ -326,14 +358,13 @@ impl FurumiClient { }; let _ = self.get_attr(&child_path).await; } + let _ = files; // suppress unused warning - let subdirs: Vec<_> = entries - .iter() - .filter(|e| e.r#type == 4) - .take(20) - .collect(); - - for subdir in subdirs { + // Prefetch dir listings for immediate subdirs (up to 20). + let subdirs: Vec<_> = dirs.into_iter().take(20).collect(); + let mut fetched = 0; + let mut already_cached = 0; + for subdir in &subdirs { let child_path = if parent == "/" { format!("/{}", subdir.name) } else { @@ -341,8 +372,15 @@ impl FurumiClient { }; if self.dir_cache.get(&child_path).await.is_none() { let _ = self.fetch_and_cache_dir(&child_path).await; + fetched += 1; + } else { + already_cached += 1; } } + debug!( + "prefetch: '{}' done — {} subdirs fetched, {} already cached", + parent, fetched, already_cached + ); } /// Fetches file chunk stream from the server. diff --git a/furumi-server/src/server.rs b/furumi-server/src/server.rs index 43bc663..7f37a12 100644 --- a/furumi-server/src/server.rs +++ b/furumi-server/src/server.rs @@ -139,6 +139,11 @@ impl RemoteFileSystem for RemoteFileSystemImpl { let virt_path = sanitized_to_virt(&safe_path); let entries = self.tree.get_snapshot(&virt_path, req.depth).await; + let total_entries: usize = entries.iter().map(|(_, v)| v.len()).sum(); + tracing::debug!( + "GetSnapshot: path='{}' depth={} → {} dirs, {} total entries", + virt_path, req.depth, entries.len(), total_entries + ); let stream = async_stream::try_stream! { for (path, children) in entries { diff --git a/furumi-server/src/tree.rs b/furumi-server/src/tree.rs index 382db16..4b7149d 100644 --- a/furumi-server/src/tree.rs +++ b/furumi-server/src/tree.rs @@ -34,11 +34,15 @@ impl WatchedTree { let (change_tx, _) = broadcast::channel(BROADCAST_CAPACITY); // Build snapshot, collect the absolute paths that were walked so we can watch them. + info!("WatchedTree: walking '{}' (depth {})…", root.display(), INITIAL_DEPTH); + let t = std::time::Instant::now(); let watched_dirs = walk_tree(&root, INITIAL_DEPTH, &snapshot).await; + let snap_len = snapshot.read().await.len(); info!( - "WatchedTree: snapshot ready ({} directories, {} inotify watches)", - snapshot.read().await.len(), + "WatchedTree: snapshot ready — {} directories, {} inotify watches, took {:.1}s", + snap_len, watched_dirs.len(), + t.elapsed().as_secs_f32(), ); // Bridge notify's sync callback → async tokio task.