Files
furumi-ng/furumi-server/src/tree.rs

261 lines
9.0 KiB
Rust
Raw Normal View History

2026-03-13 17:50:28 +00:00
use std::collections::{HashMap, HashSet, VecDeque};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
use tokio::sync::{broadcast, RwLock};
use tracing::{debug, info, warn};
use furumi_common::proto::{ChangeEvent, ChangeKind, DirEntry};
/// Maximum depth handed to `walk_tree` for the startup pre-walk.
///
/// The root counts as depth 0; sub-directories are only queued while the
/// current depth is below this value, so directories at depths `0..=3`
/// receive a snapshot entry at startup.
const INITIAL_DEPTH: u32 = 3;
/// Capacity of the change-event broadcast channel created in `WatchedTree::new`.
///
/// Clients that fall behind lose events and rely on TTL (the TTL mechanism
/// itself is not part of this file).
const BROADCAST_CAPACITY: usize = 256;
// ── WatchedTree ──────────────────────────────────────────────────
/// Maintains an in-memory snapshot of the directory tree and broadcasts
/// change events to subscribed clients.
///
/// Filesystem changes are detected through the `notify` crate's
/// `RecommendedWatcher`; each relevant event refreshes the affected snapshot
/// entry and is then published on a tokio broadcast channel.
pub struct WatchedTree {
/// Virtual-path → directory entries, e.g. "/" → [...], "/movies" → [...].
/// Shared with the background task that applies filesystem events.
snapshot: Arc<RwLock<HashMap<String, Vec<DirEntry>>>>,
/// Sending half of the change-event channel; `subscribe` hands out receivers.
change_tx: broadcast::Sender<ChangeEvent>,
/// Kept alive to continue watching; never accessed after construction.
/// NOTE(review): the `Mutex` wrapper is presumably only there to make the
/// struct `Sync` — confirm before removing.
_watcher: Mutex<RecommendedWatcher>,
}
impl WatchedTree {
    /// Builds the initial snapshot of `root` and starts watching it for changes.
    ///
    /// The filesystem watcher is registered *before* the initial walk. Events
    /// that fire while the walk is in progress are queued in an unbounded
    /// channel and applied once the background task starts, so a change made
    /// between "directory was read" and "watch is active" is not lost.
    ///
    /// # Errors
    ///
    /// Returns an error if the watcher cannot be created or if `root` cannot
    /// be registered for recursive watching.
    pub async fn new(root: PathBuf) -> anyhow::Result<Self> {
        let snapshot: Arc<RwLock<HashMap<String, Vec<DirEntry>>>> =
            Arc::new(RwLock::new(HashMap::new()));
        let (change_tx, _) = broadcast::channel(BROADCAST_CAPACITY);

        // Bridge notify's sync callback → async tokio task.
        let (notify_tx, mut notify_rx) =
            tokio::sync::mpsc::unbounded_channel::<notify::Result<notify::Event>>();
        let mut watcher = RecommendedWatcher::new(
            move |res| {
                // A send error only means the receiving task is gone; nothing to do.
                let _ = notify_tx.send(res);
            },
            Config::default(),
        )?;
        watcher.watch(&root, RecursiveMode::Recursive)?;

        // Build the initial snapshot before accepting requests. The watcher is
        // already active, so concurrent changes are buffered rather than missed.
        walk_tree(&root, INITIAL_DEPTH, &snapshot).await;
        info!(
            "WatchedTree: snapshot ready ({} directories, depth {})",
            snapshot.read().await.len(),
            INITIAL_DEPTH
        );

        // Process FS events asynchronously. The task ends when the watcher
        // (and with it the sending half of the channel) is dropped.
        let snapshot_bg = Arc::clone(&snapshot);
        let tx_bg = change_tx.clone();
        tokio::spawn(async move {
            while let Some(res) = notify_rx.recv().await {
                match res {
                    Ok(event) => handle_fs_event(event, &root, &snapshot_bg, &tx_bg).await,
                    Err(e) => warn!("notify error: {}", e),
                }
            }
        });

        Ok(Self {
            snapshot,
            change_tx,
            _watcher: Mutex::new(watcher),
        })
    }

    /// Returns all snapshot entries whose virtual path is within `depth` levels of `base`.
    ///
    /// Each element is a `(virtual_path, entries)` pair cloned out of the
    /// snapshot. The order is unspecified because the snapshot is a `HashMap`.
    pub async fn get_snapshot(&self, base: &str, depth: u32) -> Vec<(String, Vec<DirEntry>)> {
        let snap = self.snapshot.read().await;
        snap.iter()
            .filter(|(path, _)| path_depth_from(base, path).map_or(false, |d| d <= depth))
            .map(|(path, entries)| (path.clone(), entries.clone()))
            .collect()
    }

    /// Returns a new receiver for change events published after this call.
    pub fn subscribe(&self) -> broadcast::Receiver<ChangeEvent> {
        self.change_tx.subscribe()
    }
}
// ── Helpers ──────────────────────────────────────────────────────
/// Iterative BFS walk: reads the real filesystem and populates the snapshot.
async fn walk_tree(
root: &Path,
max_depth: u32,
snapshot: &Arc<RwLock<HashMap<String, Vec<DirEntry>>>>,
) {
// Queue of (abs_path, virt_path, depth_from_root)
let mut queue: VecDeque<(PathBuf, String, u32)> = VecDeque::new();
queue.push_back((root.to_path_buf(), "/".to_string(), 0));
while let Some((abs_path, virt_path, depth)) = queue.pop_front() {
let mut dir = match tokio::fs::read_dir(&abs_path).await {
Ok(d) => d,
Err(e) => {
warn!("walk_tree: cannot read {:?}: {}", abs_path, e);
continue;
}
};
let mut entries = Vec::new();
while let Ok(Some(entry)) = dir.next_entry().await {
let Ok(ft) = entry.file_type().await else {
continue;
};
let type_val = if ft.is_dir() {
4
} else if ft.is_file() {
8
} else {
continue
};
let name = entry.file_name().to_string_lossy().into_owned();
if ft.is_dir() && depth < max_depth {
let child_virt = child_virt_path(&virt_path, &name);
queue.push_back((entry.path(), child_virt, depth + 1));
}
entries.push(DirEntry { name, r#type: type_val });
}
snapshot.write().await.insert(virt_path, entries);
}
}
/// Rebuilds the snapshot entry for one directory from the live filesystem.
///
/// If `abs_path` cannot be opened, the entry stored under `virt_path` is
/// dropped. Otherwise the directory is listed again — sub-directories as type
/// `4`, regular files as type `8`, anything else omitted — and the result
/// replaces whatever was stored under `virt_path`.
async fn refresh_dir(
    abs_path: &Path,
    virt_path: &str,
    snapshot: &Arc<RwLock<HashMap<String, Vec<DirEntry>>>>,
) {
    let Ok(mut reader) = tokio::fs::read_dir(abs_path).await else {
        // Directory was deleted — remove it from the snapshot.
        snapshot.write().await.remove(virt_path);
        return;
    };

    let mut listing = Vec::new();
    loop {
        let item = match reader.next_entry().await {
            Ok(Some(item)) => item,
            // End of directory, or a read error: stop with what we have.
            _ => break,
        };
        let kind = match item.file_type().await {
            Ok(kind) => kind,
            Err(_) => continue,
        };
        let type_val = if kind.is_dir() {
            4
        } else if kind.is_file() {
            8
        } else {
            continue;
        };
        let name = item.file_name().to_string_lossy().into_owned();
        listing.push(DirEntry { name, r#type: type_val });
    }

    let mut guard = snapshot.write().await;
    guard.insert(virt_path.to_string(), listing);
}
/// Applies one filesystem event to the snapshot and notifies subscribers.
///
/// Only create, remove and modify events are handled; every other kind is
/// ignored. For each affected path under `root`, the parent directory's
/// listing is re-read and a `ChangeEvent` carrying the parent's virtual path
/// is broadcast.
///
/// On remove events the snapshot entries of the removed path itself and of
/// everything below it are purged as well. Without this, a deleted directory
/// would keep its stale listing in the snapshot, because only its parent is
/// refreshed.
async fn handle_fs_event(
    event: notify::Event,
    root: &Path,
    snapshot: &Arc<RwLock<HashMap<String, Vec<DirEntry>>>>,
    tx: &broadcast::Sender<ChangeEvent>,
) {
    use notify::EventKind;

    let proto_kind = match &event.kind {
        EventKind::Create(_) => ChangeKind::Created,
        EventKind::Remove(_) => ChangeKind::Deleted,
        EventKind::Modify(_) => ChangeKind::Modified,
        _ => return,
    };
    let kind_i32 = proto_kind as i32;

    // Drop stale entries for removed directories and their descendants.
    // Removing a key that was never present (e.g. a plain file) is a no-op.
    if matches!(event.kind, EventKind::Remove(_)) {
        let removed: Vec<String> = event
            .paths
            .iter()
            .filter(|path| path.as_path() != root && path.starts_with(root))
            .map(|path| abs_to_virt(root, path))
            .collect();
        if !removed.is_empty() {
            let mut snap = snapshot.write().await;
            for virt in &removed {
                // The trailing '/' keeps "/movies" from matching "/movies-extra".
                let below = format!("{}/", virt);
                snap.retain(|key, _| key != virt && !key.starts_with(below.as_str()));
            }
        }
    }

    // Collect unique parent directories that need refreshing.
    let mut parents: HashSet<PathBuf> = HashSet::new();
    for path in &event.paths {
        if let Some(parent) = path.parent() {
            if parent.starts_with(root) {
                parents.insert(parent.to_path_buf());
            }
        }
    }

    for parent_abs in parents {
        let virt = abs_to_virt(root, &parent_abs);
        debug!("snapshot refresh: {}", virt);
        refresh_dir(&parent_abs, &virt, snapshot).await;
        // A send error only means there are currently no subscribers.
        let _ = tx.send(ChangeEvent { path: virt, kind: kind_i32 });
    }
}
/// Converts an absolute filesystem path into the virtual path used as a
/// snapshot key.
///
/// The result always uses `/` as separator and is built component by
/// component, so it matches the keys produced by `child_virt_path` regardless
/// of the platform's native separator or of repeated separators in `abs`.
///
/// Returns `"/"` when `abs` equals `root`, and also when `abs` does not lie
/// under `root`.
fn abs_to_virt(root: &Path, abs: &Path) -> String {
    let Ok(rel) = abs.strip_prefix(root) else {
        return "/".to_string();
    };

    let mut virt = String::new();
    for component in rel.components() {
        virt.push('/');
        virt.push_str(&component.as_os_str().to_string_lossy());
    }
    if virt.is_empty() {
        // `abs` is the root itself.
        virt.push('/');
    }
    virt
}
/// Joins a child `name` onto the virtual path `parent` with a single `/`.
///
/// The root `"/"` is special-cased so the result is `"/name"` rather than
/// `"//name"`.
fn child_virt_path(parent: &str, name: &str) -> String {
    // For the root, the separator pushed below already is the leading slash.
    let stem = if parent == "/" { "" } else { parent };

    let mut joined = String::with_capacity(stem.len() + 1 + name.len());
    joined.push_str(stem);
    joined.push('/');
    joined.push_str(name);
    joined
}
/// Returns how many levels `path` is below `base`, or `None` if `path` is not under `base`.
///
/// Trailing slashes on `base` are ignored, and an empty `base` is treated as
/// the root, so `"/movies/"` behaves like `"/movies"` and `""` like `"/"`.
///
/// Examples (base="/"): "/" → 0, "/a" → 1, "/a/b" → 2
/// Examples (base="/a"): "/a" → 0, "/a/b" → 1, "/other" → None
/// Examples (base="/a/"): "/a" → 0, "/a/b" → 1
fn path_depth_from(base: &str, path: &str) -> Option<u32> {
    // Normalise the base: "/" and "" both collapse to the empty string (root).
    let base = base.trim_end_matches('/');

    if base.is_empty() {
        if path == "/" {
            return Some(0);
        }
        let trimmed = path.trim_start_matches('/');
        // The separator count is bounded by the path length, so the cast
        // cannot realistically truncate.
        return Some(trimmed.matches('/').count() as u32 + 1);
    }

    if path == base {
        return Some(0);
    }

    // Require a '/' right after the base so "/movies" does not match
    // "/movies-extra". Stripping in two steps avoids allocating a prefix string.
    path.strip_prefix(base)
        .and_then(|rest| rest.strip_prefix('/'))
        .map(|rest| rest.matches('/').count() as u32 + 1)
}
// ── Tests ────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;

    /// Depths measured from the virtual root.
    #[test]
    fn test_path_depth_from_root() {
        let cases = [("/", 0), ("/a", 1), ("/a/b", 2), ("/a/b/c", 3)];
        for &(path, levels) in cases.iter() {
            assert_eq!(path_depth_from("/", path), Some(levels));
        }
    }

    /// Depths measured from a sub-directory, including paths outside of it.
    #[test]
    fn test_path_depth_from_subdir() {
        let cases = [
            ("/movies", Some(0)),
            ("/movies/action", Some(1)),
            ("/movies/action/marvel", Some(2)),
            ("/music", None),
            ("/movies-extra", None),
            ("/", None),
        ];
        for &(path, expected) in cases.iter() {
            assert_eq!(path_depth_from("/movies", path), expected);
        }
    }
}