5 Commits

Author SHA1 Message Date
28fd5403df Added prefetch 2026-03-13 18:35:26 +00:00
ec4bbd2c8c Added prefetch 2026-03-13 18:23:26 +00:00
9c55dce5a1 Added prefetch 2026-03-13 18:06:54 +00:00
078698154e Added prefetch 2026-03-13 18:00:43 +00:00
eaf1f549b8 Added prefetch 2026-03-13 17:50:28 +00:00
10 changed files with 858 additions and 56 deletions

225
Cargo.lock generated
View File

@@ -245,6 +245,12 @@ version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bitflags"
version = "2.11.0"
@@ -461,7 +467,7 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e0e367e4e7da84520dedcac1901e4da967309406d1e51017ae1abfb97adbd38"
dependencies = [
"bitflags",
"bitflags 2.11.0",
"block2",
"libc",
"objc2",
@@ -583,6 +589,15 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "furumi-client-core"
version = "0.2.1"
@@ -666,6 +681,7 @@ dependencies = [
"futures-util",
"jsonwebtoken",
"libc",
"notify",
"once_cell",
"prometheus",
"prost",
@@ -1001,6 +1017,26 @@ dependencies = [
"serde_core",
]
[[package]]
name = "inotify"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags 1.3.2",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.2"
@@ -1058,6 +1094,26 @@ dependencies = [
"simple_asn1",
]
[[package]]
name = "kqueue"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@@ -1082,7 +1138,7 @@ version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a"
dependencies = [
"bitflags",
"bitflags 2.11.0",
"libc",
"plain",
"redox_syscall 0.7.3",
@@ -1148,6 +1204,18 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "mio"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [
"libc",
"log",
"wasi",
"windows-sys 0.48.0",
]
[[package]]
name = "mio"
version = "1.1.1"
@@ -1211,7 +1279,7 @@ version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
dependencies = [
"bitflags",
"bitflags 2.11.0",
"cfg-if",
"cfg_aliases",
"libc",
@@ -1223,7 +1291,7 @@ version = "0.31.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3"
dependencies = [
"bitflags",
"bitflags 2.11.0",
"cfg-if",
"cfg_aliases",
"libc",
@@ -1239,6 +1307,25 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "notify"
version = "6.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
dependencies = [
"bitflags 2.11.0",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"log",
"mio 0.8.11",
"walkdir",
"windows-sys 0.48.0",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
@@ -1490,7 +1577,7 @@ version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc5b72d8145275d844d4b5f6d4e1eef00c8cd889edb6035c21675d1bb1f45c9f"
dependencies = [
"bitflags",
"bitflags 2.11.0",
"hex",
"procfs-core",
"rustix 0.38.44",
@@ -1502,7 +1589,7 @@ version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "239df02d8349b06fc07398a3a1697b06418223b1c7725085e801e7c0fc6a12ec"
dependencies = [
"bitflags",
"bitflags 2.11.0",
"hex",
]
@@ -1675,7 +1762,7 @@ version = "0.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d"
dependencies = [
"bitflags",
"bitflags 2.11.0",
]
[[package]]
@@ -1684,7 +1771,7 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16"
dependencies = [
"bitflags",
"bitflags 2.11.0",
]
[[package]]
@@ -1745,7 +1832,7 @@ version = "0.38.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154"
dependencies = [
"bitflags",
"bitflags 2.11.0",
"errno",
"libc",
"linux-raw-sys 0.4.15",
@@ -1758,7 +1845,7 @@ version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190"
dependencies = [
"bitflags",
"bitflags 2.11.0",
"errno",
"libc",
"linux-raw-sys 0.12.1",
@@ -1835,6 +1922,15 @@ version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "schannel"
version = "0.1.28"
@@ -1856,7 +1952,7 @@ version = "3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef"
dependencies = [
"bitflags",
"bitflags 2.11.0",
"core-foundation",
"core-foundation-sys",
"libc",
@@ -2181,7 +2277,7 @@ checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d"
dependencies = [
"bytes",
"libc",
"mio",
"mio 1.1.1",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
@@ -2439,6 +2535,16 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "walkdir"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
dependencies = [
"same-file",
"winapi-util",
]
[[package]]
name = "want"
version = "0.3.1"
@@ -2545,7 +2651,7 @@ version = "0.244.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe"
dependencies = [
"bitflags",
"bitflags 2.11.0",
"hashbrown 0.15.5",
"indexmap 2.13.0",
"semver",
@@ -2576,6 +2682,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
@@ -2588,13 +2703,22 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets 0.48.5",
]
[[package]]
name = "windows-sys"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets",
"windows-targets 0.52.6",
]
[[package]]
@@ -2606,34 +2730,67 @@ dependencies = [
"windows-link",
]
[[package]]
name = "windows-targets"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c"
dependencies = [
"windows_aarch64_gnullvm 0.48.5",
"windows_aarch64_msvc 0.48.5",
"windows_i686_gnu 0.48.5",
"windows_i686_msvc 0.48.5",
"windows_x86_64_gnu 0.48.5",
"windows_x86_64_gnullvm 0.48.5",
"windows_x86_64_msvc 0.48.5",
]
[[package]]
name = "windows-targets"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_aarch64_gnullvm 0.52.6",
"windows_aarch64_msvc 0.52.6",
"windows_i686_gnu 0.52.6",
"windows_i686_gnullvm",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
"windows_i686_msvc 0.52.6",
"windows_x86_64_gnu 0.52.6",
"windows_x86_64_gnullvm 0.52.6",
"windows_x86_64_msvc 0.52.6",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_i686_gnu"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e"
[[package]]
name = "windows_i686_gnu"
version = "0.52.6"
@@ -2646,24 +2803,48 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_msvc"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406"
[[package]]
name = "windows_i686_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.6"
@@ -2728,7 +2909,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2"
dependencies = [
"anyhow",
"bitflags",
"bitflags 2.11.0",
"indexmap 2.13.0",
"log",
"serde",

View File

@@ -1,7 +1,7 @@
use crate::error::{ClientError, Result};
use furumi_common::proto::{
remote_file_system_client::RemoteFileSystemClient, AttrResponse, DirEntry, FileChunk,
PathRequest, ReadRequest,
PathRequest, ReadRequest, SnapshotRequest, WatchRequest,
};
use moka::future::Cache;
use std::future::Future;
@@ -15,7 +15,7 @@ use tonic::codegen::InterceptedService;
use tonic::metadata::MetadataValue;
use tonic::transport::{Channel, Endpoint, Uri};
use tonic::{Request, Status};
use tracing::{debug, info};
use tracing::{debug, info, warn, trace};
// ── Auth interceptor ───────────────────────────────────────────
@@ -127,6 +127,7 @@ impl tower::Service<Uri> for InsecureTlsConnector {
pub struct FurumiClient {
client: GrpcClient,
attr_cache: Cache<String, AttrResponse>,
dir_cache: Cache<String, Arc<Vec<DirEntry>>>,
}
impl FurumiClient {
@@ -187,29 +188,130 @@ impl FurumiClient {
.time_to_live(Duration::from_secs(5))
.build();
Ok(Self { client, attr_cache })
let dir_cache = Cache::builder()
.max_capacity(10_000)
.time_to_live(Duration::from_secs(30))
.build();
Ok(Self { client, attr_cache, dir_cache })
}
/// Spawns background tasks that pre-warm the cache with a server snapshot and then
/// subscribe to live change events to keep it up to date.
/// Must be called after a successful authentication check.
pub fn start_background_sync(&self) {
info!("background sync: starting");
let this = self.clone();
tokio::spawn(async move {
let t = std::time::Instant::now();
match this.load_snapshot("/", 3).await {
Ok(n) => info!(
"background sync: snapshot loaded — {} directories in {:.1}s",
n,
t.elapsed().as_secs_f32()
),
Err(e) => {
warn!("background sync: GetSnapshot failed ({}), falling back to on-demand caching", e);
return;
}
}
info!("background sync: subscribing to change events");
// Reconnect loop: if the watch stream drops, reconnect after a short delay.
loop {
match this.run_watch_loop().await {
Ok(()) => {
info!("background sync: WatchChanges stream closed cleanly");
break;
}
Err(e) => {
warn!("background sync: WatchChanges error ({}), reconnecting in 5s", e);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
});
}
/// Fetches the server's pre-built directory snapshot and populates both caches.
/// Returns the number of directories loaded.
async fn load_snapshot(&self, path: &str, depth: u32) -> Result<usize> {
debug!("snapshot: requesting path={} depth={}", path, depth);
let mut client = self.client.clone();
let req = tonic::Request::new(SnapshotRequest {
path: path.to_string(),
depth,
});
let mut stream = client.get_snapshot(req).await?.into_inner();
let mut dirs = 0;
let mut attrs_warmed = 0;
while let Some(entry) = stream.next().await {
let entry = entry?;
trace!("snapshot: got dir '{}' ({} entries)", entry.path, entry.children.len());
// Warm attr_cache for the directory itself.
if let Some(dir_attr) = entry.dir_attr {
self.attr_cache.insert(entry.path.clone(), dir_attr).await;
attrs_warmed += 1;
}
// Warm attr_cache for each child (parallel slice: children[i] ↔ child_attrs[i]).
for (child, attr) in entry.children.iter().zip(entry.child_attrs.iter()) {
let child_path = if entry.path == "/" {
format!("/{}", child.name)
} else {
format!("{}/{}", entry.path, child.name)
};
self.attr_cache.insert(child_path, attr.clone()).await;
attrs_warmed += 1;
}
// Populate dir_cache.
self.dir_cache
.insert(entry.path, Arc::new(entry.children))
.await;
dirs += 1;
}
debug!("snapshot: {} dirs → dir_cache, {} attrs → attr_cache", dirs, attrs_warmed);
Ok(dirs)
}
/// Subscribes to the server's live change events and invalidates `dir_cache` entries.
/// Returns when the stream closes or on error.
async fn run_watch_loop(&self) -> Result<()> {
let mut client = self.client.clone();
let req = tonic::Request::new(WatchRequest {});
let mut stream = client.watch_changes(req).await?.into_inner();
while let Some(event) = stream.next().await {
let event = event?;
debug!("watch: invalidating dir_cache for '{}'", event.path);
self.dir_cache.invalidate(&event.path).await;
}
Ok(())
}
/// Fetches file attributes from the server, utilizing an internal cache.
pub async fn get_attr(&self, path: &str) -> Result<AttrResponse> {
if let Some(attr) = self.attr_cache.get(path).await {
trace!("get_attr: cache hit '{}'", path);
return Ok(attr);
}
debug!("get_attr (cache miss): {}", path);
let t = std::time::Instant::now();
let mut client = self.client.clone();
let req = tonic::Request::new(PathRequest {
path: path.to_string(),
});
let response = client.get_attr(req).await?.into_inner();
debug!("get_attr: cache miss '{}' — rpc {:.1}ms", path, t.elapsed().as_secs_f32() * 1000.0);
self.attr_cache.insert(path.to_string(), response.clone()).await;
Ok(response)
}
/// Reads directory contents from the server stream.
pub async fn read_dir(&self, path: &str) -> Result<Vec<DirEntry>> {
debug!("read_dir: {}", path);
/// Fetches directory contents from gRPC and stores them in the cache.
/// Does not trigger prefetch — safe to call from background tasks.
async fn fetch_and_cache_dir(&self, path: &str) -> Result<Arc<Vec<DirEntry>>> {
let t = std::time::Instant::now();
let mut client = self.client.clone();
let req = tonic::Request::new(PathRequest {
path: path.to_string(),
@@ -219,13 +321,87 @@ impl FurumiClient {
let mut entries = Vec::new();
while let Some(chunk) = stream.next().await {
let entry = chunk?;
entries.push(entry);
entries.push(chunk?);
}
debug!(
"fetch_dir: '{}' — {} entries in {:.1}ms",
path,
entries.len(),
t.elapsed().as_secs_f32() * 1000.0
);
let entries = Arc::new(entries);
self.dir_cache.insert(path.to_string(), entries.clone()).await;
Ok(entries)
}
/// Reads directory contents, utilizing an internal cache.
/// On cache miss, fetches from server and spawns a background task to
/// prefetch attributes and immediate subdirectory listings.
pub async fn read_dir(&self, path: &str) -> Result<Vec<DirEntry>> {
if let Some(entries) = self.dir_cache.get(path).await {
debug!("read_dir: cache hit '{}' ({} entries)", path, entries.len());
return Ok((*entries).clone());
}
debug!("read_dir: cache miss '{}' — fetching from server", path);
let entries = self.fetch_and_cache_dir(path).await?;
let self_clone = self.clone();
let path_clone = path.to_string();
let entries_clone = entries.clone();
tokio::spawn(async move {
self_clone.prefetch_children(&path_clone, &entries_clone).await;
});
Ok((*entries).clone())
}
/// Background: warms attr_cache for all children and dir_cache for immediate subdirs.
async fn prefetch_children(&self, parent: &str, entries: &[DirEntry]) {
let dirs: Vec<_> = entries.iter().filter(|e| e.r#type == 4).collect();
let files: Vec<_> = entries.iter().filter(|e| e.r#type != 4).collect();
debug!(
"prefetch: '{}' — warming {} attrs, {} subdirs",
parent,
entries.len(),
dirs.len().min(20)
);
// Warm attr_cache for all children.
for entry in entries {
let child_path = if parent == "/" {
format!("/{}", entry.name)
} else {
format!("{}/{}", parent, entry.name)
};
let _ = self.get_attr(&child_path).await;
}
let _ = files; // suppress unused warning
// Prefetch dir listings for immediate subdirs (up to 20).
let subdirs: Vec<_> = dirs.into_iter().take(20).collect();
let mut fetched = 0;
let mut already_cached = 0;
for subdir in &subdirs {
let child_path = if parent == "/" {
format!("/{}", subdir.name)
} else {
format!("{}/{}", parent, subdir.name)
};
if self.dir_cache.get(&child_path).await.is_none() {
let _ = self.fetch_and_cache_dir(&child_path).await;
fetched += 1;
} else {
already_cached += 1;
}
}
debug!(
"prefetch: '{}' done — {} subdirs fetched, {} already cached",
parent, fetched, already_cached
);
}
/// Fetches file chunk stream from the server.
pub async fn read_file(
&self,

View File

@@ -29,6 +29,40 @@ message FileChunk {
bytes data = 1;
}
// ── Snapshot & watch ──────────────────────────────────────────────
// Request a pre-built snapshot of the directory tree up to `depth` levels.
// depth = 0 means only the requested path itself; depth = 1 includes immediate children, etc.
message SnapshotRequest {
string path = 1;
uint32 depth = 2;
}
// One directory's contents within a snapshot response.
// child_attrs is parallel to children: child_attrs[i] is the AttrResponse for children[i].
// dir_attr is the AttrResponse for the directory itself (path).
message SnapshotEntry {
string path = 1;
repeated DirEntry children = 2;
repeated AttrResponse child_attrs = 3;
AttrResponse dir_attr = 4;
}
// Subscribe to live filesystem change notifications (no parameters needed).
message WatchRequest {}
enum ChangeKind {
CREATED = 0;
DELETED = 1;
MODIFIED = 2;
}
// Notifies the client that the contents of `path` have changed.
message ChangeEvent {
string path = 1;
ChangeKind kind = 2;
}
service RemoteFileSystem {
// Get file or directory attributes (size, permissions, timestamps). Maps to stat/getattr.
rpc GetAttr (PathRequest) returns (AttrResponse);
@@ -38,4 +72,12 @@ service RemoteFileSystem {
// Read chunks of a file. Uses Server Streaming for efficient chunk delivery based on offset/size.
rpc ReadFile (ReadRequest) returns (stream FileChunk);
// Return a pre-built in-memory snapshot of the directory tree rooted at `path`.
// The server walks `depth` levels deep on its side — one round-trip fills the client cache.
rpc GetSnapshot (SnapshotRequest) returns (stream SnapshotEntry);
// Subscribe to live filesystem change events. The server pushes a ChangeEvent whenever
// a directory's contents change, allowing the client to invalidate its cache immediately.
rpc WatchChanges (WatchRequest) returns (stream ChangeEvent);
}

View File

@@ -9,7 +9,7 @@ use std::time::{Duration, UNIX_EPOCH};
use tracing::{debug, error};
use tokio::runtime::Handle;
const TTL: Duration = Duration::from_secs(1); // 1 second FUSE kernel TTL
const TTL: Duration = Duration::from_secs(5); // 5 second FUSE kernel TTL (matches attr_cache)
// ── InodeMapper ──────────────────────────────────────────────────

View File

@@ -64,6 +64,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
return Err(format!("Failed to authenticate or connect to server: {}", e).into());
}
// Auth verified — start background snapshot + watch sync
c.start_background_sync();
Ok::<_, Box<dyn std::error::Error>>(c)
})?;

View File

@@ -58,7 +58,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
format!("https://{}", args.server)
};
let client = rt.block_on(async { FurumiClient::connect(&full_addr, &args.token).await })?;
let client = rt.block_on(async {
let c = FurumiClient::connect(&full_addr, &args.token).await?;
c.start_background_sync();
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(c)
})?;
let furumi_nfs = nfs::FurumiNfs::new(client);

View File

@@ -27,6 +27,7 @@ prometheus = { version = "0.14.0", features = ["process"] }
axum = { version = "0.7", features = ["tokio"] }
once_cell = "1.21.3"
rcgen = { version = "0.14.7", features = ["pem"] }
notify = "6"
[dev-dependencies]
tempfile = "3.26.0"

View File

@@ -2,6 +2,7 @@ pub mod vfs;
pub mod security;
pub mod server;
pub mod metrics;
pub mod tree;
use std::net::SocketAddr;
use std::path::PathBuf;
@@ -74,7 +75,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
let vfs = Arc::new(LocalVfs::new(&root_path));
let remote_fs = RemoteFileSystemImpl::new(vfs);
let tree = Arc::new(tree::WatchedTree::new(root_path.clone()).await?);
let remote_fs = RemoteFileSystemImpl::new(vfs, tree);
let auth = AuthInterceptor::new(args.token.clone());
let svc = RemoteFileSystemServer::with_interceptor(remote_fs, auth.clone());

View File

@@ -1,22 +1,27 @@
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio_stream::Stream;
use tonic::{Request, Response, Status};
use crate::vfs::VirtualFileSystem;
use crate::metrics::{self, RequestTimer};
use furumi_common::proto::{
remote_file_system_server::RemoteFileSystem, AttrResponse, DirEntry, FileChunk,
PathRequest, ReadRequest,
};
use crate::security::sanitize_path;
use crate::tree::WatchedTree;
use crate::vfs::VirtualFileSystem;
use furumi_common::proto::{
remote_file_system_server::RemoteFileSystem, AttrResponse, ChangeEvent, DirEntry, FileChunk,
PathRequest, ReadRequest, SnapshotEntry, SnapshotRequest, WatchRequest,
};
pub struct RemoteFileSystemImpl<V: VirtualFileSystem> {
vfs: std::sync::Arc<V>,
vfs: Arc<V>,
tree: Arc<WatchedTree>,
}
impl<V: VirtualFileSystem> RemoteFileSystemImpl<V> {
pub fn new(vfs: std::sync::Arc<V>) -> Self {
Self { vfs }
pub fn new(vfs: Arc<V>, tree: Arc<WatchedTree>) -> Self {
Self { vfs, tree }
}
}
@@ -43,11 +48,7 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
}
}
type ReadDirStream = Pin<
Box<
dyn Stream<Item = Result<DirEntry, Status>> + Send + 'static,
>,
>;
type ReadDirStream = Pin<Box<dyn Stream<Item = Result<DirEntry, Status>> + Send + 'static>>;
async fn read_dir(
&self,
@@ -78,11 +79,7 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
}
}
type ReadFileStream = Pin<
Box<
dyn Stream<Item = Result<FileChunk, Status>> + Send + 'static,
>,
>;
type ReadFileStream = Pin<Box<dyn Stream<Item = Result<FileChunk, Status>> + Send + 'static>>;
async fn read_file(
&self,
@@ -126,5 +123,74 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
}
}
}
// ── Snapshot ─────────────────────────────────────────────────
type GetSnapshotStream =
Pin<Box<dyn Stream<Item = Result<SnapshotEntry, Status>> + Send + 'static>>;
async fn get_snapshot(
&self,
request: Request<SnapshotRequest>,
) -> Result<Response<Self::GetSnapshotStream>, Status> {
let req = request.into_inner();
let safe_path = sanitize_path(&req.path)?;
// sanitize_path strips the leading "/" — map "" back to "/" for snapshot lookup.
let virt_path = sanitized_to_virt(&safe_path);
let entries = self.tree.get_snapshot(&virt_path, req.depth).await;
let total_entries: usize = entries.iter().map(|(_, d)| d.children.len()).sum();
tracing::debug!(
"GetSnapshot: path='{}' depth={} → {} dirs, {} total entries",
virt_path, req.depth, entries.len(), total_entries
);
let stream = async_stream::try_stream! {
for (path, snap_dir) in entries {
yield SnapshotEntry {
path,
children: snap_dir.children,
child_attrs: snap_dir.child_attrs,
dir_attr: Some(snap_dir.dir_attr),
};
}
};
Ok(Response::new(Box::pin(stream) as Self::GetSnapshotStream))
}
// ── Watch ─────────────────────────────────────────────────────
type WatchChangesStream =
Pin<Box<dyn Stream<Item = Result<ChangeEvent, Status>> + Send + 'static>>;
async fn watch_changes(
&self,
_request: Request<WatchRequest>,
) -> Result<Response<Self::WatchChangesStream>, Status> {
let mut rx = self.tree.subscribe();
let stream = async_stream::try_stream! {
loop {
match rx.recv().await {
Ok(event) => yield event,
Err(broadcast::error::RecvError::Lagged(n)) => {
// Client was too slow — it missed n events.
// Log and continue; the client's TTL will cover the gap.
tracing::warn!("WatchChanges client lagged, skipped {} events", n);
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
};
Ok(Response::new(Box::pin(stream) as Self::WatchChangesStream))
}
}
/// sanitize_path removes the leading "/" so "/" becomes "". Map it back.
fn sanitized_to_virt(safe: &str) -> String {
if safe.is_empty() {
"/".to_string()
} else {
format!("/{}", safe)
}
}

327
furumi-server/src/tree.rs Normal file
View File

@@ -0,0 +1,327 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
use tokio::sync::{broadcast, RwLock};
use tracing::{debug, info, warn};
use furumi_common::proto::{AttrResponse, ChangeEvent, ChangeKind, DirEntry};
/// How many directory levels to pre-walk on startup.
const INITIAL_DEPTH: u32 = 3;
/// Broadcast channel capacity — clients that fall behind lose events and rely on TTL.
const BROADCAST_CAPACITY: usize = 256;
// ── Types ─────────────────────────────────────────────────────────
/// One directory in the snapshot: its entries and the attr for each child.
/// `children` and `child_attrs` are parallel slices (same index = same file).
/// `dir_attr` is the attr of the directory itself.
#[derive(Clone)]
pub struct SnapDir {
pub children: Vec<DirEntry>,
pub child_attrs: Vec<AttrResponse>,
pub dir_attr: AttrResponse,
}
// ── WatchedTree ──────────────────────────────────────────────────
/// Maintains an in-memory snapshot of the directory tree and broadcasts
/// change events to connected clients via inotify.
pub struct WatchedTree {
/// Virtual-path → (entries + attrs).
snapshot: Arc<RwLock<HashMap<String, SnapDir>>>,
change_tx: broadcast::Sender<ChangeEvent>,
/// Kept alive to continue watching. Shared with the event handler so it can
/// add new watches when directories are created at runtime.
_watcher: Arc<Mutex<RecommendedWatcher>>,
}
impl WatchedTree {
pub async fn new(root: PathBuf) -> anyhow::Result<Self> {
let snapshot: Arc<RwLock<HashMap<String, SnapDir>>> =
Arc::new(RwLock::new(HashMap::new()));
let (change_tx, _) = broadcast::channel(BROADCAST_CAPACITY);
info!("WatchedTree: walking '{}' (depth {})…", root.display(), INITIAL_DEPTH);
let t = std::time::Instant::now();
let watched_dirs = walk_tree(&root, INITIAL_DEPTH, &snapshot).await;
let snap_len = snapshot.read().await.len();
let total_entries: usize = snapshot.read().await.values().map(|d| d.children.len()).sum();
info!(
"WatchedTree: snapshot ready — {} dirs, {} entries, {} watches, took {:.1}s",
snap_len,
total_entries,
watched_dirs.len(),
t.elapsed().as_secs_f32(),
);
// Bridge notify's sync callback → async tokio task.
let (notify_tx, mut notify_rx) =
tokio::sync::mpsc::unbounded_channel::<notify::Result<notify::Event>>();
let watcher = Arc::new(Mutex::new(RecommendedWatcher::new(
move |res| {
let _ = notify_tx.send(res);
},
Config::default(),
)?));
// Add one non-recursive inotify watch per directory in the snapshot.
{
let mut w = watcher.lock().unwrap();
for dir_abs in &watched_dirs {
if let Err(e) = w.watch(dir_abs, RecursiveMode::NonRecursive) {
warn!("watch failed for {:?}: {}", dir_abs, e);
}
}
}
let snapshot_bg = Arc::clone(&snapshot);
let root_bg = root.clone();
let tx_bg = change_tx.clone();
let watcher_bg = Arc::clone(&watcher);
tokio::spawn(async move {
while let Some(res) = notify_rx.recv().await {
match res {
Ok(event) => {
handle_fs_event(event, &root_bg, &snapshot_bg, &tx_bg, &watcher_bg).await
}
Err(e) => warn!("notify error: {}", e),
}
}
});
Ok(Self {
snapshot,
change_tx,
_watcher: watcher,
})
}
/// Returns all snapshot entries within `depth` levels of `base`.
pub async fn get_snapshot(&self, base: &str, depth: u32) -> Vec<(String, SnapDir)> {
let snap = self.snapshot.read().await;
snap.iter()
.filter(|(path, _)| path_depth_from(base, path).map_or(false, |d| d <= depth))
.map(|(path, snap_dir)| (path.clone(), snap_dir.clone()))
.collect()
}
pub fn subscribe(&self) -> broadcast::Receiver<ChangeEvent> {
self.change_tx.subscribe()
}
}
// ── Helpers ──────────────────────────────────────────────────────
fn metadata_to_attr(meta: &std::fs::Metadata) -> AttrResponse {
AttrResponse {
size: meta.len(),
mode: meta.mode(),
mtime: meta.mtime() as u64,
}
}
/// BFS walk: reads filesystem, stores entries + attrs in snapshot.
/// Returns the list of absolute paths walked (for inotify setup).
async fn walk_tree(
root: &Path,
max_depth: u32,
snapshot: &Arc<RwLock<HashMap<String, SnapDir>>>,
) -> Vec<PathBuf> {
let mut walked: Vec<PathBuf> = Vec::new();
let mut queue: VecDeque<(PathBuf, String, u32)> = VecDeque::new();
queue.push_back((root.to_path_buf(), "/".to_string(), 0));
while let Some((abs_path, virt_path, depth)) = queue.pop_front() {
// Stat the directory itself.
let dir_attr = match std::fs::metadata(&abs_path) {
Ok(m) => metadata_to_attr(&m),
Err(e) => {
warn!("walk_tree: cannot stat {:?}: {}", abs_path, e);
continue;
}
};
let mut dir = match tokio::fs::read_dir(&abs_path).await {
Ok(d) => d,
Err(e) => {
warn!("walk_tree: cannot read {:?}: {}", abs_path, e);
continue;
}
};
let mut children: Vec<DirEntry> = Vec::new();
let mut child_attrs: Vec<AttrResponse> = Vec::new();
while let Ok(Some(entry)) = dir.next_entry().await {
let Ok(ft) = entry.file_type().await else { continue };
let type_val = if ft.is_dir() { 4 } else if ft.is_file() { 8 } else { continue };
let name = entry.file_name().to_string_lossy().into_owned();
// Stat the child.
let attr = match entry.metadata().await {
Ok(m) => metadata_to_attr(&m),
Err(_) => AttrResponse { size: 0, mode: 0, mtime: 0 },
};
// Skip hidden directories to avoid exploding the watch list.
if ft.is_dir() && name.starts_with('.') {
continue;
}
if ft.is_dir() && depth < max_depth {
let child_virt = child_virt_path(&virt_path, &name);
queue.push_back((entry.path(), child_virt, depth + 1));
}
children.push(DirEntry { name, r#type: type_val });
child_attrs.push(attr);
}
snapshot.write().await.insert(virt_path, SnapDir { children, child_attrs, dir_attr });
walked.push(abs_path);
}
walked
}
/// Re-reads a single directory and updates its snapshot entry.
async fn refresh_dir(
abs_path: &Path,
virt_path: &str,
snapshot: &Arc<RwLock<HashMap<String, SnapDir>>>,
) {
let dir_attr = match std::fs::metadata(abs_path) {
Ok(m) => metadata_to_attr(&m),
Err(_) => {
snapshot.write().await.remove(virt_path);
return;
}
};
let mut dir = match tokio::fs::read_dir(abs_path).await {
Ok(d) => d,
Err(_) => {
snapshot.write().await.remove(virt_path);
return;
}
};
let mut children: Vec<DirEntry> = Vec::new();
let mut child_attrs: Vec<AttrResponse> = Vec::new();
while let Ok(Some(entry)) = dir.next_entry().await {
let Ok(ft) = entry.file_type().await else { continue };
let type_val = if ft.is_dir() { 4 } else if ft.is_file() { 8 } else { continue };
let attr = match entry.metadata().await {
Ok(m) => metadata_to_attr(&m),
Err(_) => AttrResponse { size: 0, mode: 0, mtime: 0 },
};
children.push(DirEntry {
name: entry.file_name().to_string_lossy().into_owned(),
r#type: type_val,
});
child_attrs.push(attr);
}
snapshot.write().await.insert(virt_path.to_string(), SnapDir { children, child_attrs, dir_attr });
}
async fn handle_fs_event(
event: notify::Event,
root: &Path,
snapshot: &Arc<RwLock<HashMap<String, SnapDir>>>,
tx: &broadcast::Sender<ChangeEvent>,
watcher: &Arc<Mutex<RecommendedWatcher>>,
) {
use notify::EventKind;
let proto_kind = match &event.kind {
EventKind::Create(_) => ChangeKind::Created,
EventKind::Remove(_) => ChangeKind::Deleted,
EventKind::Modify(_) => ChangeKind::Modified,
_ => return,
};
let kind_i32 = proto_kind as i32;
if matches!(event.kind, EventKind::Create(_)) {
for path in &event.paths {
if path.is_dir()
&& !path.file_name().map_or(false, |n| n.to_string_lossy().starts_with('.'))
{
let mut w = watcher.lock().unwrap();
if let Err(e) = w.watch(path, RecursiveMode::NonRecursive) {
warn!("failed to add watch for new dir {:?}: {}", path, e);
}
}
}
}
let mut parents: HashSet<PathBuf> = HashSet::new();
for path in &event.paths {
if let Some(parent) = path.parent() {
if parent.starts_with(root) {
parents.insert(parent.to_path_buf());
}
}
}
for parent_abs in parents {
let virt = abs_to_virt(root, &parent_abs);
debug!("snapshot refresh: {}", virt);
refresh_dir(&parent_abs, &virt, snapshot).await;
let _ = tx.send(ChangeEvent { path: virt, kind: kind_i32 });
}
}
fn abs_to_virt(root: &Path, abs: &Path) -> String {
match abs.strip_prefix(root) {
Ok(rel) if rel.as_os_str().is_empty() => "/".to_string(),
Ok(rel) => format!("/{}", rel.to_string_lossy()),
Err(_) => "/".to_string(),
}
}
fn child_virt_path(parent: &str, name: &str) -> String {
if parent == "/" { format!("/{}", name) } else { format!("{}/{}", parent, name) }
}
fn path_depth_from(base: &str, path: &str) -> Option<u32> {
if base == "/" {
if path == "/" { return Some(0); }
let trimmed = path.trim_start_matches('/');
Some(trimmed.matches('/').count() as u32 + 1)
} else {
if path == base { return Some(0); }
let prefix = format!("{}/", base);
path.strip_prefix(prefix.as_str())
.map(|rest| rest.matches('/').count() as u32 + 1)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_path_depth_from_root() {
assert_eq!(path_depth_from("/", "/"), Some(0));
assert_eq!(path_depth_from("/", "/a"), Some(1));
assert_eq!(path_depth_from("/", "/a/b"), Some(2));
assert_eq!(path_depth_from("/", "/a/b/c"), Some(3));
}
#[test]
fn test_path_depth_from_subdir() {
assert_eq!(path_depth_from("/movies", "/movies"), Some(0));
assert_eq!(path_depth_from("/movies", "/movies/action"), Some(1));
assert_eq!(path_depth_from("/movies", "/movies/action/marvel"), Some(2));
assert_eq!(path_depth_from("/movies", "/music"), None);
assert_eq!(path_depth_from("/movies", "/movies-extra"), None);
assert_eq!(path_depth_from("/movies", "/"), None);
}
}