diff --git a/Cargo.lock b/Cargo.lock index 41ce587..d9c1fcf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -245,6 +245,12 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.11.0" @@ -461,7 +467,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e0e367e4e7da84520dedcac1901e4da967309406d1e51017ae1abfb97adbd38" dependencies = [ - "bitflags", + "bitflags 2.11.0", "block2", "libc", "objc2", @@ -583,6 +589,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "furumi-client-core" version = "0.2.1" @@ -666,6 +681,7 @@ dependencies = [ "futures-util", "jsonwebtoken", "libc", + "notify", "once_cell", "prometheus", "prost", @@ -1001,6 +1017,26 @@ dependencies = [ "serde_core", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -1058,6 +1094,26 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -1082,7 +1138,7 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a" dependencies = [ - "bitflags", + "bitflags 2.11.0", "libc", "plain", "redox_syscall 0.7.3", @@ -1148,6 +1204,18 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys 0.48.0", +] + [[package]] name = "mio" version = "1.1.1" @@ -1211,7 +1279,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ - "bitflags", + "bitflags 2.11.0", "cfg-if", "cfg_aliases", "libc", @@ -1223,7 +1291,7 @@ version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3" dependencies = [ - "bitflags", + "bitflags 2.11.0", "cfg-if", "cfg_aliases", "libc", @@ -1239,6 +1307,25 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.11.0", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio 0.8.11", + "walkdir", + "windows-sys 0.48.0", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -1490,7 +1577,7 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc5b72d8145275d844d4b5f6d4e1eef00c8cd889edb6035c21675d1bb1f45c9f" dependencies = [ - "bitflags", + "bitflags 2.11.0", "hex", "procfs-core", "rustix 0.38.44", @@ -1502,7 +1589,7 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "239df02d8349b06fc07398a3a1697b06418223b1c7725085e801e7c0fc6a12ec" dependencies = [ - "bitflags", + "bitflags 2.11.0", "hex", ] @@ -1675,7 +1762,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.11.0", ] [[package]] @@ -1684,7 +1771,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16" dependencies = [ - "bitflags", + "bitflags 2.11.0", ] [[package]] @@ -1745,7 +1832,7 @@ version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags", + "bitflags 2.11.0", "errno", "libc", "linux-raw-sys 0.4.15", @@ -1758,7 +1845,7 @@ version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" dependencies = [ - "bitflags", + "bitflags 2.11.0", "errno", "libc", "linux-raw-sys 0.12.1", @@ -1835,6 +1922,15 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.28" @@ -1856,7 +1952,7 @@ version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" dependencies = [ - "bitflags", + "bitflags 2.11.0", "core-foundation", "core-foundation-sys", "libc", @@ -2181,7 +2277,7 @@ checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" dependencies = [ "bytes", "libc", - "mio", + "mio 1.1.1", "parking_lot", "pin-project-lite", "signal-hook-registry", @@ -2439,6 +2535,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -2545,7 +2651,7 @@ version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ - "bitflags", + "bitflags 2.11.0", "hashbrown 0.15.5", "indexmap 2.13.0", "semver", @@ -2576,6 +2682,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" @@ -2588,13 +2703,22 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -2606,34 +2730,67 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + [[package]] name = "windows-targets" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -2646,24 +2803,48 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -2728,7 +2909,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", - "bitflags", + "bitflags 2.11.0", "indexmap 2.13.0", "log", "serde", diff --git a/furumi-client-core/src/client.rs b/furumi-client-core/src/client.rs index 160cf8a..68b0473 100644 --- a/furumi-client-core/src/client.rs +++ b/furumi-client-core/src/client.rs @@ -1,7 +1,7 @@ use crate::error::{ClientError, Result}; use furumi_common::proto::{ remote_file_system_client::RemoteFileSystemClient, AttrResponse, DirEntry, FileChunk, - PathRequest, ReadRequest, + PathRequest, ReadRequest, SnapshotRequest, WatchRequest, }; use moka::future::Cache; use std::future::Future; @@ -15,7 +15,7 @@ use tonic::codegen::InterceptedService; use tonic::metadata::MetadataValue; use tonic::transport::{Channel, Endpoint, Uri}; use tonic::{Request, Status}; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; // ── Auth interceptor ─────────────────────────────────────────── @@ -127,6 +127,7 @@ impl tower::Service for InsecureTlsConnector { pub struct FurumiClient { client: GrpcClient, attr_cache: Cache, + dir_cache: Cache>>, } impl FurumiClient { @@ -187,7 +188,74 @@ impl FurumiClient { .time_to_live(Duration::from_secs(5)) .build(); - Ok(Self { client, attr_cache }) + let dir_cache = Cache::builder() + .max_capacity(10_000) + .time_to_live(Duration::from_secs(30)) + .build(); + + let this = Self { client, attr_cache, dir_cache }; + this.start_background_sync(); + Ok(this) + } + + /// Spawns background tasks that pre-warm the cache with a server snapshot and then + /// subscribe to live change events to keep it up to date. + fn start_background_sync(&self) { + let this = self.clone(); + tokio::spawn(async move { + match this.load_snapshot("/", 3).await { + Ok(n) => info!("directory snapshot loaded ({} directories)", n), + Err(e) => { + warn!("GetSnapshot unavailable (old server?): {}", e); + return; + } + } + + // Reconnect loop: if the watch stream drops, reconnect after a short delay. + loop { + match this.run_watch_loop().await { + Ok(()) => break, // server closed the stream cleanly + Err(e) => { + debug!("WatchChanges disconnected: {}, reconnecting in 5s", e); + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + } + }); + } + + /// Fetches the server's pre-built directory snapshot and populates `dir_cache`. + /// Returns the number of directories loaded. + async fn load_snapshot(&self, path: &str, depth: u32) -> Result { + let mut client = self.client.clone(); + let req = tonic::Request::new(SnapshotRequest { + path: path.to_string(), + depth, + }); + let mut stream = client.get_snapshot(req).await?.into_inner(); + let mut count = 0; + while let Some(entry) = stream.next().await { + let entry = entry?; + self.dir_cache + .insert(entry.path, Arc::new(entry.children)) + .await; + count += 1; + } + Ok(count) + } + + /// Subscribes to the server's live change events and invalidates `dir_cache` entries. + /// Returns when the stream closes or on error. + async fn run_watch_loop(&self) -> Result<()> { + let mut client = self.client.clone(); + let req = tonic::Request::new(WatchRequest {}); + let mut stream = client.watch_changes(req).await?.into_inner(); + while let Some(event) = stream.next().await { + let event = event?; + debug!("cache invalidated (change event): {}", event.path); + self.dir_cache.invalidate(&event.path).await; + } + Ok(()) } /// Fetches file attributes from the server, utilizing an internal cache. @@ -207,9 +275,9 @@ impl FurumiClient { Ok(response) } - /// Reads directory contents from the server stream. - pub async fn read_dir(&self, path: &str) -> Result> { - debug!("read_dir: {}", path); + /// Fetches directory contents from gRPC and stores them in the cache. + /// Does not trigger prefetch — safe to call from background tasks. + async fn fetch_and_cache_dir(&self, path: &str) -> Result>> { let mut client = self.client.clone(); let req = tonic::Request::new(PathRequest { path: path.to_string(), @@ -219,13 +287,65 @@ impl FurumiClient { let mut entries = Vec::new(); while let Some(chunk) = stream.next().await { - let entry = chunk?; - entries.push(entry); + entries.push(chunk?); } + let entries = Arc::new(entries); + self.dir_cache.insert(path.to_string(), entries.clone()).await; Ok(entries) } + /// Reads directory contents, utilizing an internal cache. + /// On cache miss, fetches from server and spawns a background task to + /// prefetch attributes and immediate subdirectory listings. + pub async fn read_dir(&self, path: &str) -> Result> { + if let Some(entries) = self.dir_cache.get(path).await { + debug!("read_dir (cache hit): {}", path); + return Ok((*entries).clone()); + } + + debug!("read_dir (cache miss): {}", path); + let entries = self.fetch_and_cache_dir(path).await?; + + let self_clone = self.clone(); + let path_clone = path.to_string(); + let entries_clone = entries.clone(); + tokio::spawn(async move { + self_clone.prefetch_children(&path_clone, &entries_clone).await; + }); + + Ok((*entries).clone()) + } + + /// Background: warms attr_cache for all children and dir_cache for immediate subdirs. + async fn prefetch_children(&self, parent: &str, entries: &[DirEntry]) { + for entry in entries { + let child_path = if parent == "/" { + format!("/{}", entry.name) + } else { + format!("{}/{}", parent, entry.name) + }; + let _ = self.get_attr(&child_path).await; + } + + let subdirs: Vec<_> = entries + .iter() + .filter(|e| e.r#type == 4) + .take(20) + .collect(); + + for subdir in subdirs { + let child_path = if parent == "/" { + format!("/{}", subdir.name) + } else { + format!("{}/{}", parent, subdir.name) + }; + if self.dir_cache.get(&child_path).await.is_none() { + let _ = self.fetch_and_cache_dir(&child_path).await; + } + } + } + /// Fetches file chunk stream from the server. pub async fn read_file( &self, diff --git a/furumi-common/proto/virtualfs.proto b/furumi-common/proto/virtualfs.proto index 04ee658..d2ea232 100644 --- a/furumi-common/proto/virtualfs.proto +++ b/furumi-common/proto/virtualfs.proto @@ -29,13 +29,51 @@ message FileChunk { bytes data = 1; } +// ── Snapshot & watch ────────────────────────────────────────────── + +// Request a pre-built snapshot of the directory tree up to `depth` levels. +// depth = 0 means only the requested path itself; depth = 1 includes immediate children, etc. +message SnapshotRequest { + string path = 1; + uint32 depth = 2; +} + +// One directory's contents within a snapshot response. +message SnapshotEntry { + string path = 1; + repeated DirEntry children = 2; +} + +// Subscribe to live filesystem change notifications (no parameters needed). +message WatchRequest {} + +enum ChangeKind { + CREATED = 0; + DELETED = 1; + MODIFIED = 2; +} + +// Notifies the client that the contents of `path` have changed. +message ChangeEvent { + string path = 1; + ChangeKind kind = 2; +} + service RemoteFileSystem { // Get file or directory attributes (size, permissions, timestamps). Maps to stat/getattr. rpc GetAttr (PathRequest) returns (AttrResponse); - + // List directory contents. Uses Server Streaming to handle massively large directories efficiently. rpc ReadDir (PathRequest) returns (stream DirEntry); - + // Read chunks of a file. Uses Server Streaming for efficient chunk delivery based on offset/size. rpc ReadFile (ReadRequest) returns (stream FileChunk); + + // Return a pre-built in-memory snapshot of the directory tree rooted at `path`. + // The server walks `depth` levels deep on its side — one round-trip fills the client cache. + rpc GetSnapshot (SnapshotRequest) returns (stream SnapshotEntry); + + // Subscribe to live filesystem change events. The server pushes a ChangeEvent whenever + // a directory's contents change, allowing the client to invalidate its cache immediately. + rpc WatchChanges (WatchRequest) returns (stream ChangeEvent); } diff --git a/furumi-mount-linux/src/fs.rs b/furumi-mount-linux/src/fs.rs index ab58648..7c473cf 100644 --- a/furumi-mount-linux/src/fs.rs +++ b/furumi-mount-linux/src/fs.rs @@ -9,7 +9,7 @@ use std::time::{Duration, UNIX_EPOCH}; use tracing::{debug, error}; use tokio::runtime::Handle; -const TTL: Duration = Duration::from_secs(1); // 1 second FUSE kernel TTL +const TTL: Duration = Duration::from_secs(5); // 5 second FUSE kernel TTL (matches attr_cache) // ── InodeMapper ────────────────────────────────────────────────── diff --git a/furumi-server/Cargo.toml b/furumi-server/Cargo.toml index 33c0aa5..69160db 100644 --- a/furumi-server/Cargo.toml +++ b/furumi-server/Cargo.toml @@ -27,6 +27,7 @@ prometheus = { version = "0.14.0", features = ["process"] } axum = { version = "0.7", features = ["tokio"] } once_cell = "1.21.3" rcgen = { version = "0.14.7", features = ["pem"] } +notify = "6" [dev-dependencies] tempfile = "3.26.0" diff --git a/furumi-server/src/main.rs b/furumi-server/src/main.rs index 95df840..b44b6f2 100644 --- a/furumi-server/src/main.rs +++ b/furumi-server/src/main.rs @@ -2,6 +2,7 @@ pub mod vfs; pub mod security; pub mod server; pub mod metrics; +pub mod tree; use std::net::SocketAddr; use std::path::PathBuf; @@ -74,7 +75,8 @@ async fn main() -> Result<(), Box> { } let vfs = Arc::new(LocalVfs::new(&root_path)); - let remote_fs = RemoteFileSystemImpl::new(vfs); + let tree = Arc::new(tree::WatchedTree::new(root_path.clone()).await?); + let remote_fs = RemoteFileSystemImpl::new(vfs, tree); let auth = AuthInterceptor::new(args.token.clone()); let svc = RemoteFileSystemServer::with_interceptor(remote_fs, auth.clone()); diff --git a/furumi-server/src/server.rs b/furumi-server/src/server.rs index 1142e04..43bc663 100644 --- a/furumi-server/src/server.rs +++ b/furumi-server/src/server.rs @@ -1,22 +1,27 @@ use std::pin::Pin; +use std::sync::Arc; + +use tokio::sync::broadcast; use tokio_stream::Stream; use tonic::{Request, Response, Status}; -use crate::vfs::VirtualFileSystem; use crate::metrics::{self, RequestTimer}; -use furumi_common::proto::{ - remote_file_system_server::RemoteFileSystem, AttrResponse, DirEntry, FileChunk, - PathRequest, ReadRequest, -}; use crate::security::sanitize_path; +use crate::tree::WatchedTree; +use crate::vfs::VirtualFileSystem; +use furumi_common::proto::{ + remote_file_system_server::RemoteFileSystem, AttrResponse, ChangeEvent, DirEntry, FileChunk, + PathRequest, ReadRequest, SnapshotEntry, SnapshotRequest, WatchRequest, +}; pub struct RemoteFileSystemImpl { - vfs: std::sync::Arc, + vfs: Arc, + tree: Arc, } impl RemoteFileSystemImpl { - pub fn new(vfs: std::sync::Arc) -> Self { - Self { vfs } + pub fn new(vfs: Arc, tree: Arc) -> Self { + Self { vfs, tree } } } @@ -43,11 +48,7 @@ impl RemoteFileSystem for RemoteFileSystemImpl { } } - type ReadDirStream = Pin< - Box< - dyn Stream> + Send + 'static, - >, - >; + type ReadDirStream = Pin> + Send + 'static>>; async fn read_dir( &self, @@ -78,11 +79,7 @@ impl RemoteFileSystem for RemoteFileSystemImpl { } } - type ReadFileStream = Pin< - Box< - dyn Stream> + Send + 'static, - >, - >; + type ReadFileStream = Pin> + Send + 'static>>; async fn read_file( &self, @@ -126,5 +123,64 @@ impl RemoteFileSystem for RemoteFileSystemImpl { } } } + + // ── Snapshot ───────────────────────────────────────────────── + + type GetSnapshotStream = + Pin> + Send + 'static>>; + + async fn get_snapshot( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let safe_path = sanitize_path(&req.path)?; + // sanitize_path strips the leading "/" — map "" back to "/" for snapshot lookup. + let virt_path = sanitized_to_virt(&safe_path); + + let entries = self.tree.get_snapshot(&virt_path, req.depth).await; + + let stream = async_stream::try_stream! { + for (path, children) in entries { + yield SnapshotEntry { path, children }; + } + }; + Ok(Response::new(Box::pin(stream) as Self::GetSnapshotStream)) + } + + // ── Watch ───────────────────────────────────────────────────── + + type WatchChangesStream = + Pin> + Send + 'static>>; + + async fn watch_changes( + &self, + _request: Request, + ) -> Result, Status> { + let mut rx = self.tree.subscribe(); + + let stream = async_stream::try_stream! { + loop { + match rx.recv().await { + Ok(event) => yield event, + Err(broadcast::error::RecvError::Lagged(n)) => { + // Client was too slow — it missed n events. + // Log and continue; the client's TTL will cover the gap. + tracing::warn!("WatchChanges client lagged, skipped {} events", n); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } + }; + Ok(Response::new(Box::pin(stream) as Self::WatchChangesStream)) + } } +/// sanitize_path removes the leading "/" so "/" becomes "". Map it back. +fn sanitized_to_virt(safe: &str) -> String { + if safe.is_empty() { + "/".to_string() + } else { + format!("/{}", safe) + } +} diff --git a/furumi-server/src/tree.rs b/furumi-server/src/tree.rs new file mode 100644 index 0000000..8c89ef1 --- /dev/null +++ b/furumi-server/src/tree.rs @@ -0,0 +1,260 @@ +use std::collections::{HashMap, HashSet, VecDeque}; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; + +use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher}; +use tokio::sync::{broadcast, RwLock}; +use tracing::{debug, info, warn}; + +use furumi_common::proto::{ChangeEvent, ChangeKind, DirEntry}; + +/// How many directory levels to pre-walk on startup. +const INITIAL_DEPTH: u32 = 3; + +/// Broadcast channel capacity — clients that fall behind lose events and rely on TTL. +const BROADCAST_CAPACITY: usize = 256; + +// ── WatchedTree ────────────────────────────────────────────────── + +/// Maintains an in-memory snapshot of the directory tree and broadcasts +/// change events to connected clients via inotify. +pub struct WatchedTree { + /// Virtual-path → directory entries, e.g. "/" → [...], "/movies" → [...]. + snapshot: Arc>>>, + change_tx: broadcast::Sender, + /// Kept alive to continue watching; never accessed after construction. + _watcher: Mutex, +} + +impl WatchedTree { + pub async fn new(root: PathBuf) -> anyhow::Result { + let snapshot: Arc>>> = + Arc::new(RwLock::new(HashMap::new())); + let (change_tx, _) = broadcast::channel(BROADCAST_CAPACITY); + + // Build the initial snapshot synchronously before accepting requests. + walk_tree(&root, INITIAL_DEPTH, &snapshot).await; + info!( + "WatchedTree: snapshot ready ({} directories, depth {})", + snapshot.read().await.len(), + INITIAL_DEPTH + ); + + // Bridge notify's sync callback → async tokio task. + let (notify_tx, mut notify_rx) = + tokio::sync::mpsc::unbounded_channel::>(); + let mut watcher = RecommendedWatcher::new( + move |res| { + let _ = notify_tx.send(res); + }, + Config::default(), + )?; + watcher.watch(&root, RecursiveMode::Recursive)?; + + // Process FS events asynchronously. + let snapshot_bg = Arc::clone(&snapshot); + let root_bg = root.clone(); + let tx_bg = change_tx.clone(); + tokio::spawn(async move { + while let Some(res) = notify_rx.recv().await { + match res { + Ok(event) => handle_fs_event(event, &root_bg, &snapshot_bg, &tx_bg).await, + Err(e) => warn!("notify error: {}", e), + } + } + }); + + Ok(Self { + snapshot, + change_tx, + _watcher: Mutex::new(watcher), + }) + } + + /// Returns all snapshot entries whose virtual path is within `depth` levels of `base`. + pub async fn get_snapshot(&self, base: &str, depth: u32) -> Vec<(String, Vec)> { + let snap = self.snapshot.read().await; + snap.iter() + .filter(|(path, _)| { + path_depth_from(base, path).map_or(false, |d| d <= depth) + }) + .map(|(path, entries)| (path.clone(), entries.clone())) + .collect() + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.change_tx.subscribe() + } +} + +// ── Helpers ────────────────────────────────────────────────────── + +/// Iterative BFS walk: reads the real filesystem and populates the snapshot. +async fn walk_tree( + root: &Path, + max_depth: u32, + snapshot: &Arc>>>, +) { + // Queue of (abs_path, virt_path, depth_from_root) + let mut queue: VecDeque<(PathBuf, String, u32)> = VecDeque::new(); + queue.push_back((root.to_path_buf(), "/".to_string(), 0)); + + while let Some((abs_path, virt_path, depth)) = queue.pop_front() { + let mut dir = match tokio::fs::read_dir(&abs_path).await { + Ok(d) => d, + Err(e) => { + warn!("walk_tree: cannot read {:?}: {}", abs_path, e); + continue; + } + }; + + let mut entries = Vec::new(); + + while let Ok(Some(entry)) = dir.next_entry().await { + let Ok(ft) = entry.file_type().await else { + continue; + }; + let type_val = if ft.is_dir() { + 4 + } else if ft.is_file() { + 8 + } else { + continue + }; + let name = entry.file_name().to_string_lossy().into_owned(); + + if ft.is_dir() && depth < max_depth { + let child_virt = child_virt_path(&virt_path, &name); + queue.push_back((entry.path(), child_virt, depth + 1)); + } + entries.push(DirEntry { name, r#type: type_val }); + } + + snapshot.write().await.insert(virt_path, entries); + } +} + +/// Re-reads a single directory and updates its snapshot entry. +async fn refresh_dir( + abs_path: &Path, + virt_path: &str, + snapshot: &Arc>>>, +) { + let mut dir = match tokio::fs::read_dir(abs_path).await { + Ok(d) => d, + Err(_) => { + // Directory was deleted — remove it from the snapshot. + snapshot.write().await.remove(virt_path); + return; + } + }; + + let mut entries = Vec::new(); + while let Ok(Some(entry)) = dir.next_entry().await { + let Ok(ft) = entry.file_type().await else { + continue; + }; + let type_val = if ft.is_dir() { 4 } else if ft.is_file() { 8 } else { continue }; + entries.push(DirEntry { + name: entry.file_name().to_string_lossy().into_owned(), + r#type: type_val, + }); + } + snapshot.write().await.insert(virt_path.to_string(), entries); +} + +async fn handle_fs_event( + event: notify::Event, + root: &Path, + snapshot: &Arc>>>, + tx: &broadcast::Sender, +) { + use notify::EventKind; + + let proto_kind = match &event.kind { + EventKind::Create(_) => ChangeKind::Created, + EventKind::Remove(_) => ChangeKind::Deleted, + EventKind::Modify(_) => ChangeKind::Modified, + _ => return, + }; + let kind_i32 = proto_kind as i32; + + // Collect unique parent directories that need refreshing. + let mut parents: HashSet = HashSet::new(); + for path in &event.paths { + if let Some(parent) = path.parent() { + if parent.starts_with(root) { + parents.insert(parent.to_path_buf()); + } + } + } + + for parent_abs in parents { + let virt = abs_to_virt(root, &parent_abs); + debug!("snapshot refresh: {}", virt); + refresh_dir(&parent_abs, &virt, snapshot).await; + let _ = tx.send(ChangeEvent { path: virt, kind: kind_i32 }); + } +} + +fn abs_to_virt(root: &Path, abs: &Path) -> String { + match abs.strip_prefix(root) { + Ok(rel) if rel.as_os_str().is_empty() => "/".to_string(), + Ok(rel) => format!("/{}", rel.to_string_lossy()), + Err(_) => "/".to_string(), + } +} + +fn child_virt_path(parent: &str, name: &str) -> String { + if parent == "/" { + format!("/{}", name) + } else { + format!("{}/{}", parent, name) + } +} + +/// Returns how many levels `path` is below `base`, or `None` if `path` is not under `base`. +/// +/// Examples (base="/"): "/" → 0, "/a" → 1, "/a/b" → 2 +/// Examples (base="/a"): "/a" → 0, "/a/b" → 1, "/other" → None +fn path_depth_from(base: &str, path: &str) -> Option { + if base == "/" { + if path == "/" { + return Some(0); + } + let trimmed = path.trim_start_matches('/'); + Some(trimmed.matches('/').count() as u32 + 1) + } else { + if path == base { + return Some(0); + } + let prefix = format!("{}/", base); + path.strip_prefix(prefix.as_str()) + .map(|rest| rest.matches('/').count() as u32 + 1) + } +} + +// ── Tests ──────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_path_depth_from_root() { + assert_eq!(path_depth_from("/", "/"), Some(0)); + assert_eq!(path_depth_from("/", "/a"), Some(1)); + assert_eq!(path_depth_from("/", "/a/b"), Some(2)); + assert_eq!(path_depth_from("/", "/a/b/c"), Some(3)); + } + + #[test] + fn test_path_depth_from_subdir() { + assert_eq!(path_depth_from("/movies", "/movies"), Some(0)); + assert_eq!(path_depth_from("/movies", "/movies/action"), Some(1)); + assert_eq!(path_depth_from("/movies", "/movies/action/marvel"), Some(2)); + assert_eq!(path_depth_from("/movies", "/music"), None); + assert_eq!(path_depth_from("/movies", "/movies-extra"), None); + assert_eq!(path_depth_from("/movies", "/"), None); + } +}