Server implemented. Linux client implemented.

This commit is contained in:
2026-03-10 15:33:06 +00:00
parent 0040ae531c
commit a5da1c3a34
20 changed files with 3302 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target

2228
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

8
Cargo.toml Normal file
View File

@@ -0,0 +1,8 @@
[workspace]
members = [
"furumi-common",
"furumi-server",
"furumi-client-core",
"furumi-mount-linux"
]
resolver = "2"

View File

@@ -0,0 +1,16 @@
[package]
name = "furumi-client-core"
version = "0.1.0"
edition = "2024"
[dependencies]
furumi-common = { path = "../furumi-common" }
anyhow = "1.0.102"
prost = "0.13.5"
tokio = { version = "1.50.0", features = ["full"] }
tokio-stream = "0.1.18"
tonic = "0.12.3"
thiserror = "2.0.18"
moka = { version = "0.12.10", features = ["sync", "future"] }
async-trait = "0.1.89"
tracing = "0.1.44"

View File

@@ -0,0 +1,125 @@
use crate::error::{ClientError, Result};
use furumi_common::proto::{
remote_file_system_client::RemoteFileSystemClient, AttrResponse, DirEntry, FileChunk,
PathRequest, ReadRequest,
};
use moka::future::Cache;
use std::time::Duration;
use tokio_stream::StreamExt;
use tonic::codegen::InterceptedService;
use tonic::metadata::MetadataValue;
use tonic::transport::{Channel, Endpoint};
use tonic::{Request, Status};
use tracing::{debug, info};
#[derive(Clone)]
pub struct AuthInterceptor {
token: String,
}
impl tonic::service::Interceptor for AuthInterceptor {
fn call(&mut self, mut request: Request<()>) -> std::result::Result<Request<()>, Status> {
if !self.token.is_empty() {
let token_str = format!("Bearer {}", self.token);
let meta_val = MetadataValue::try_from(&token_str)
.map_err(|_| Status::invalid_argument("Invalid token format"))?;
request.metadata_mut().insert("authorization", meta_val);
}
Ok(request)
}
}
pub type GrpcClient = RemoteFileSystemClient<InterceptedService<Channel, AuthInterceptor>>;
#[derive(Clone)]
pub struct FurumiClient {
client: GrpcClient,
attr_cache: Cache<String, AttrResponse>,
}
impl FurumiClient {
/// Connects to the Furumi-ng server with an optional bearer token.
pub async fn connect(addr: &str, token: &str) -> Result<Self> {
let endpoint = Endpoint::from_shared(addr.to_string())
.map_err(|e| ClientError::Internal(format!("Invalid URI: {}", e)))?
.timeout(Duration::from_secs(30))
.concurrency_limit(256)
.tcp_keepalive(Some(Duration::from_secs(60)))
.http2_keep_alive_interval(Duration::from_secs(60));
info!("Connecting to {}", addr);
let channel = endpoint.connect().await?;
let interceptor = AuthInterceptor {
token: token.to_string(),
};
let client = RemoteFileSystemClient::with_interceptor(channel, interceptor);
let attr_cache = Cache::builder()
.max_capacity(100_000)
.time_to_live(Duration::from_secs(5)) // short TTL to catch up changes quickly
.build();
Ok(Self { client, attr_cache })
}
/// Fetches file attributes from the server, utilizing an internal cache.
pub async fn get_attr(&self, path: &str) -> Result<AttrResponse> {
if let Some(attr) = self.attr_cache.get(path).await {
return Ok(attr);
}
debug!("get_attr (cache miss): {}", path);
let mut client = self.client.clone();
let req = tonic::Request::new(PathRequest {
path: path.to_string(),
});
let response = client.get_attr(req).await?.into_inner();
self.attr_cache.insert(path.to_string(), response.clone()).await;
Ok(response)
}
/// Reads directory contents from the server stream.
pub async fn read_dir(&self, path: &str) -> Result<Vec<DirEntry>> {
debug!("read_dir: {}", path);
let mut client = self.client.clone();
let req = tonic::Request::new(PathRequest {
path: path.to_string(),
});
let mut stream = client.read_dir(req).await?.into_inner();
let mut entries = Vec::new();
while let Some(chunk) = stream.next().await {
let entry = chunk?;
entries.push(entry);
}
Ok(entries)
}
/// Fetches file chunk stream from the server. Returns the streaming receiver.
pub async fn read_file(
&self,
path: &str,
offset: u64,
size: u32,
chunk_size: u32,
) -> Result<tonic::Streaming<FileChunk>> {
debug!("read_file: {} offset={} size={}", path, offset, size);
let mut client = self.client.clone();
let req = tonic::Request::new(ReadRequest {
path: path.to_string(),
offset,
size,
chunk_size,
});
let stream = client.read_file(req).await?.into_inner();
Ok(stream)
}
}

View File

@@ -0,0 +1,18 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum ClientError {
#[error("Connect error: {0}")]
Connect(#[from] tonic::transport::Error),
#[error("RPC error: {0}")]
Rpc(#[from] tonic::Status),
#[error("Invalid path: {0}")]
InvalidPath(String),
#[error("Internal error: {0}")]
Internal(String),
}
pub type Result<T> = std::result::Result<T, ClientError>;

View File

@@ -0,0 +1,5 @@
pub mod client;
pub mod error;
pub use client::FurumiClient;
pub use error::ClientError;

12
furumi-common/Cargo.toml Normal file
View File

@@ -0,0 +1,12 @@
[package]
name = "furumi-common"
version = "0.1.0"
edition = "2024"
[dependencies]
prost = "0.13.5"
tonic = "0.12.3"
[build-dependencies]
protobuf-src = "2.1.1"
tonic-build = "0.12.3"

12
furumi-common/build.rs Normal file
View File

@@ -0,0 +1,12 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("cargo:rerun-if-changed=proto/virtualfs.proto");
unsafe {
std::env::set_var("PROTOC", protobuf_src::protoc());
}
tonic_build::configure()
.build_server(true)
.build_client(true)
.compile_protos(&["proto/virtualfs.proto"], &["proto"])?;
Ok(())
}

View File

@@ -0,0 +1,41 @@
syntax = "proto3";
package virtualfs;
message PathRequest {
string path = 1;
}
message AttrResponse {
uint64 size = 1;
uint32 mode = 2; // Permissions and file type
uint64 mtime = 3; // Modification time
// ... other standard stat attributes
}
message DirEntry {
string name = 1;
uint32 type = 2; // File or Directory, mapping roughly to libc::DT_REG, libc::DT_DIR, etc.
}
message ReadRequest {
string path = 1;
uint64 offset = 2;
uint32 size = 3;
// Optional requested chunk size. If 0, the server uses its default chunk size.
uint32 chunk_size = 4;
}
message FileChunk {
bytes data = 1;
}
service RemoteFileSystem {
// Get file or directory attributes (size, permissions, timestamps). Maps to stat/getattr.
rpc GetAttr (PathRequest) returns (AttrResponse);
// List directory contents. Uses Server Streaming to handle massively large directories efficiently.
rpc ReadDir (PathRequest) returns (stream DirEntry);
// Read chunks of a file. Uses Server Streaming for efficient chunk delivery based on offset/size.
rpc ReadFile (ReadRequest) returns (stream FileChunk);
}

3
furumi-common/src/lib.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod proto {
tonic::include_proto!("virtualfs");
}

View File

@@ -0,0 +1,16 @@
[package]
name = "furumi-mount-linux"
version = "0.1.0"
edition = "2024"
[dependencies]
furumi-common = { path = "../furumi-common" }
furumi-client-core = { path = "../furumi-client-core" }
anyhow = "1.0.102"
fuser = "0.15.0"
libc = "0.2.183"
clap = { version = "4.5.60", features = ["derive", "env"] }
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
tokio = { version = "1.50.0", features = ["full"] }
tokio-stream = "0.1.18"

View File

@@ -0,0 +1,265 @@
use fuser::{
FileAttr, FileType, Filesystem, ReplyAttr, ReplyData, ReplyDirectory, ReplyEntry, Request,
};
use furumi_client_core::FurumiClient;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, UNIX_EPOCH};
use tracing::{debug, error};
use tokio::runtime::Handle;
const TTL: Duration = Duration::from_secs(1); // 1 second FUSE kernel TTL
pub struct FurumiFuse {
client: FurumiClient,
rt_handle: Handle,
// FUSE deals in inodes (u64). We need to map inode -> string path.
// In a real VFS, this requires a proper tree or bidirectional map.
// For simplicity: inode 1 is always the root "/".
inode_to_path: Arc<Mutex<HashMap<u64, String>>>,
// Mapping path back to inode to keep them consistent
path_to_inode: Arc<Mutex<HashMap<String, u64>>>,
next_inode: Arc<Mutex<u64>>,
}
impl FurumiFuse {
pub fn new(client: FurumiClient, rt_handle: Handle) -> Self {
let mut inode_to_path = HashMap::new();
let mut path_to_inode = HashMap::new();
// Root inode is always 1
inode_to_path.insert(1, "/".to_string());
path_to_inode.insert("/".to_string(), 1);
Self {
client,
rt_handle,
inode_to_path: Arc::new(Mutex::new(inode_to_path)),
path_to_inode: Arc::new(Mutex::new(path_to_inode)),
next_inode: Arc::new(Mutex::new(2)), // Inodes start from 2
}
}
fn get_inode(&self, path: &str) -> u64 {
let mut p2i = self.path_to_inode.lock().unwrap();
if let Some(&inode) = p2i.get(path) {
inode
} else {
let mut next = self.next_inode.lock().unwrap();
let inode = *next;
*next += 1;
p2i.insert(path.to_string(), inode);
self.inode_to_path.lock().unwrap().insert(inode, path.to_string());
inode
}
}
fn get_path(&self, inode: u64) -> Option<String> {
self.inode_to_path.lock().unwrap().get(&inode).cloned()
}
fn make_attr(inode: u64, attr: &furumi_common::proto::AttrResponse) -> FileAttr {
let kind = if attr.mode & libc::S_IFDIR != 0 {
FileType::Directory
} else {
FileType::RegularFile
};
FileAttr {
ino: inode,
size: attr.size,
blocks: (attr.size + 511) / 512,
atime: UNIX_EPOCH + Duration::from_secs(attr.mtime),
mtime: UNIX_EPOCH + Duration::from_secs(attr.mtime),
ctime: UNIX_EPOCH + Duration::from_secs(attr.mtime),
crtime: UNIX_EPOCH + Duration::from_secs(attr.mtime),
kind,
perm: (attr.mode & 0o777) as u16,
nlink: if kind == FileType::Directory { 2 } else { 1 },
uid: unsafe { libc::getuid() }, // Mount as current user
gid: unsafe { libc::getgid() },
rdev: 0,
blksize: 4096,
flags: 0,
}
}
}
impl Filesystem for FurumiFuse {
fn lookup(&mut self, _req: &Request, parent: u64, name: &OsStr, reply: ReplyEntry) {
let name_str = name.to_string_lossy();
debug!("lookup: parent={} name={}", parent, name_str);
let parent_path = match self.get_path(parent) {
Some(p) => p,
None => {
reply.error(libc::ENOENT);
return;
}
};
let full_path = if parent_path == "/" {
format!("/{}", name_str)
} else {
format!("{}/{}", parent_path, name_str)
};
let client = self.client.clone();
let path_clone = full_path.clone();
// Must block inside FUSE callbacks because fuser is synchronous
let attr_res = self.rt_handle.block_on(async {
client.get_attr(&path_clone).await
});
match attr_res {
Ok(attr) => {
let inode = self.get_inode(&full_path);
reply.entry(&TTL, &Self::make_attr(inode, &attr), 0);
}
Err(_) => {
reply.error(libc::ENOENT);
}
}
}
fn getattr(&mut self, _req: &Request, ino: u64, _fh: Option<u64>, reply: ReplyAttr) {
debug!("getattr: ino={}", ino);
let path = match self.get_path(ino) {
Some(p) => p,
None => {
reply.error(libc::ENOENT);
return;
}
};
let client = self.client.clone();
let attr_res = self.rt_handle.block_on(async {
client.get_attr(&path).await
});
match attr_res {
Ok(attr) => {
reply.attr(&TTL, &Self::make_attr(ino, &attr));
}
Err(_) => reply.error(libc::EIO),
}
}
fn readdir(
&mut self,
_req: &Request,
ino: u64,
_fh: u64,
offset: i64,
mut reply: ReplyDirectory,
) {
debug!("readdir: ino={} offset={}", ino, offset);
let path = match self.get_path(ino) {
Some(p) => p,
None => {
reply.error(libc::ENOENT);
return;
}
};
let client = self.client.clone();
let dir_res = self.rt_handle.block_on(async {
client.read_dir(&path).await
});
match dir_res {
Ok(entries) => {
// FUSE readdir requires us to push entries.
// offset 0 is ., offset 1 is ..
if offset == 0 {
let _ = reply.add(ino, 1, FileType::Directory, ".");
let _ = reply.add(ino, 2, FileType::Directory, "..");
}
for (i, entry) in entries.iter().enumerate() {
let entry_offset = (i + 2) as i64;
if entry_offset < offset {
continue;
}
let full_path = if path == "/" {
format!("/{}", entry.name)
} else {
format!("{}/{}", path, entry.name)
};
let child_ino = self.get_inode(&full_path);
let kind = if entry.r#type == 4 {
FileType::Directory
} else {
FileType::RegularFile
};
// reply.add returns true if the buffer is full
if reply.add(child_ino, entry_offset + 1, kind, &entry.name) {
break;
}
}
reply.ok();
}
Err(e) => {
error!("readdir error: {:?}", e);
reply.error(libc::EIO);
}
}
}
fn read(
&mut self,
_req: &Request,
ino: u64,
_fh: u64,
offset: i64,
size: u32,
_flags: i32,
_lock_owner: Option<u64>,
reply: ReplyData,
) {
debug!("read: ino={} offset={} size={}", ino, offset, size);
let path = match self.get_path(ino) {
Some(p) => p,
None => {
reply.error(libc::ENOENT);
return;
}
};
let client = self.client.clone();
// We use block_on to convert the gRPC stream into a synchronous read for FUSE
let read_res: Result<Vec<u8>, String> = self.rt_handle.block_on(async {
match client.read_file(&path, offset as u64, size, size).await {
Ok(mut stream) => {
use tokio_stream::StreamExt;
let mut data = Vec::new();
while let Some(chunk_res) = stream.next().await {
match chunk_res {
Ok(chunk) => data.extend_from_slice(&chunk.data),
Err(e) => return Err(e.to_string()),
}
}
Ok(data)
}
Err(e) => Err(e.to_string()),
}
});
match read_res {
Ok(data) => reply.data(&data),
Err(e) => {
error!("read error: {:?}", e);
reply.error(libc::EIO);
}
}
}
}

View File

@@ -0,0 +1,55 @@
pub mod fs;
use clap::Parser;
use fuser::MountOption;
use std::path::PathBuf;
use furumi_client_core::FurumiClient;
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// Server address to connect to
#[arg(short, long, env = "FURUMI_SERVER", default_value = "http://[::1]:50051")]
server: String,
/// Authentication Bearer token (leave empty if auth is disabled on server)
#[arg(short, long, env = "FURUMI_TOKEN", default_value = "")]
token: String,
/// Mount point directory
#[arg(short, long, env = "FURUMI_MOUNT")]
mount: PathBuf,
}
fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let args = Args::parse();
if !args.mount.exists() || !args.mount.is_dir() {
eprintln!("Error: Mount point {:?} does not exist or is not a directory", args.mount);
std::process::exit(1);
}
// Create a robust tokio runtime for the background gRPC work
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
let client = rt.block_on(async {
FurumiClient::connect(&args.server, &args.token).await
})?;
let fs = fs::FurumiFuse::new(client, rt.handle().clone());
let options = vec![
MountOption::RO,
MountOption::FSName("furumi-ng".to_string()),
MountOption::NoExec, // Better security for media mount
];
println!("Mounting Furumi-ng to {:?}", args.mount);
fuser::mount2(fs, args.mount, &options)?;
Ok(())
}

28
furumi-server/Cargo.toml Normal file
View File

@@ -0,0 +1,28 @@
[package]
name = "furumi-server"
version = "0.1.0"
edition = "2024"
[dependencies]
furumi-common = { path = "../furumi-common" }
anyhow = "1.0.102"
bytes = "1.11.1"
clap = { version = "4.5.60", features = ["derive", "env"] }
futures = "0.3.32"
futures-core = "0.3.32"
futures-util = "0.3.32"
jsonwebtoken = "10.3.0"
libc = "0.2.183"
prost = "0.13.5"
rustls = "0.23.37"
thiserror = "2.0.18"
tokio = { version = "1.50.0", features = ["full"] }
tokio-stream = "0.1.18"
tonic = "0.12.3"
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
async-stream = "0.3.6"
async-trait = "0.1.89"
[dev-dependencies]
tempfile = "3.26.0"

68
furumi-server/src/main.rs Normal file
View File

@@ -0,0 +1,68 @@
pub mod vfs;
pub mod security;
pub mod server;
use std::path::PathBuf;
use std::sync::Arc;
use clap::Parser;
use tonic::transport::Server;
use vfs::local::LocalVfs;
use furumi_common::proto::remote_file_system_server::RemoteFileSystemServer;
use server::RemoteFileSystemImpl;
use security::AuthInterceptor;
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
/// IP address and port to bind the server to
#[arg(short, long, env = "FURUMI_BIND", default_value = "[::1]:50051")]
bind: String,
/// Document root directory to expose via VFS
#[arg(short, long, env = "FURUMI_ROOT", default_value = ".")]
root: PathBuf,
/// Authentication Bearer token (leave empty to disable auth)
#[arg(short, long, env = "FURUMI_TOKEN", default_value = "")]
token: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let args = Args::parse();
let addr = args.bind.parse()?;
// Resolve the document root to an absolute path for safety and clarity
let root_path = std::fs::canonicalize(&args.root)
.unwrap_or_else(|_| args.root.clone());
if !root_path.exists() || !root_path.is_dir() {
eprintln!("Error: Root path {:?} does not exist or is not a directory", root_path);
std::process::exit(1);
}
let vfs = Arc::new(LocalVfs::new(&root_path));
let remote_fs = RemoteFileSystemImpl::new(vfs);
let auth = AuthInterceptor::new(args.token.clone());
let svc = RemoteFileSystemServer::with_interceptor(remote_fs, auth.clone());
println!("Furumi-ng Server listening on {}", addr);
if args.token.is_empty() {
println!("WARNING: Authentication is DISABLED");
} else {
println!("Authentication is enabled (Bearer token required)");
}
println!("Document Root: {:?}", root_path);
Server::builder()
// Enable TCP Keep-Alive and HTTP2 Ping to keep connections alive for long media streams
.tcp_keepalive(Some(std::time::Duration::from_secs(60)))
.http2_keepalive_interval(Some(std::time::Duration::from_secs(60)))
.add_service(svc)
.serve(addr)
.await?;
Ok(())
}

View File

@@ -0,0 +1,83 @@
use std::path::{Component, Path};
use tonic::{Request, Status};
/// Sanitizes a path strictly to prevent Path Traversal.
/// Rejects paths containing absolute routes outside the conceptual root
/// or any parent directory (`..`) traversal components.
pub fn sanitize_path(input: &str) -> std::result::Result<String, Status> {
let path = Path::new(input);
let mut normalized = std::path::PathBuf::new();
for component in path.components() {
match component {
Component::ParentDir => {
if !normalized.pop() {
return Err(Status::permission_denied("Path traversal attempt"));
}
}
Component::Normal(c) => normalized.push(c),
Component::CurDir | Component::RootDir | Component::Prefix(_) => {
// Ignore RootDir (making absolute paths logic relative to VFS root)
// Ignore Prefix (Windows C:)
// Ignore CurrentDir (.)
}
}
}
Ok(normalized.to_string_lossy().into_owned())
}
/// A simple tonic interceptor for token-based authorization.
/// We expect a `authorization: Bearer <token>` header.
#[derive(Clone)]
pub struct AuthInterceptor {
expected_token: String,
}
impl AuthInterceptor {
pub fn new(token: String) -> Self {
Self { expected_token: token }
}
}
impl tonic::service::Interceptor for AuthInterceptor {
fn call(&mut self, req: Request<()>) -> std::result::Result<Request<()>, Status> {
// If no token is configured on the server, allow all (or reject all based on policy).
// Here we assume if expected_token is empty, auth is disabled.
if self.expected_token.is_empty() {
return Ok(req);
}
match req.metadata().get("authorization") {
Some(t) => {
let token_str = t.to_str().unwrap_or("");
let expected_header = format!("Bearer {}", self.expected_token);
if token_str == expected_header {
Ok(req)
} else {
Err(Status::unauthenticated("Invalid token"))
}
}
_ => Err(Status::unauthenticated("Missing authorization header")),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sanitize_path_normal() {
assert_eq!(sanitize_path("etc/passwd").unwrap(), "etc/passwd");
assert_eq!(sanitize_path("/etc/passwd").unwrap(), "etc/passwd");
assert_eq!(sanitize_path("foo/./bar").unwrap(), "foo/bar");
}
#[test]
fn test_sanitize_path_traversal() {
assert!(sanitize_path("../etc/passwd").is_err());
assert!(sanitize_path("/../etc/passwd").is_err());
assert!(sanitize_path("foo/../../etc/passwd").is_err());
}
}

101
furumi-server/src/server.rs Normal file
View File

@@ -0,0 +1,101 @@
use std::pin::Pin;
use tokio_stream::Stream;
use tonic::{Request, Response, Status};
use crate::vfs::VirtualFileSystem;
use furumi_common::proto::{
remote_file_system_server::RemoteFileSystem, AttrResponse, DirEntry, FileChunk,
PathRequest, ReadRequest,
};
use crate::security::sanitize_path;
pub struct RemoteFileSystemImpl<V: VirtualFileSystem> {
vfs: std::sync::Arc<V>,
}
impl<V: VirtualFileSystem> RemoteFileSystemImpl<V> {
pub fn new(vfs: std::sync::Arc<V>) -> Self {
Self { vfs }
}
}
#[tonic::async_trait]
impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
async fn get_attr(
&self,
request: Request<PathRequest>,
) -> Result<Response<AttrResponse>, Status> {
let req = request.into_inner();
let safe_path = sanitize_path(&req.path)?;
match self.vfs.get_attr(&safe_path).await {
Ok(attr) => Ok(Response::new(attr)),
Err(e) => Err(Status::internal(e.to_string())),
}
}
type ReadDirStream = Pin<
Box<
dyn Stream<Item = Result<DirEntry, Status>> + Send + 'static,
>,
>;
async fn read_dir(
&self,
request: Request<PathRequest>,
) -> Result<Response<Self::ReadDirStream>, Status> {
let req = request.into_inner();
let safe_path = sanitize_path(&req.path)?;
match self.vfs.read_dir(&safe_path).await {
Ok(mut rx) => {
let stream = async_stream::try_stream! {
while let Some(result) = rx.recv().await {
match result {
Ok(entry) => yield entry,
Err(e) => Err(Status::internal(e.to_string()))?,
}
}
};
Ok(Response::new(Box::pin(stream) as Self::ReadDirStream))
}
Err(e) => Err(Status::internal(e.to_string())),
}
}
type ReadFileStream = Pin<
Box<
dyn Stream<Item = Result<FileChunk, Status>> + Send + 'static,
>,
>;
async fn read_file(
&self,
request: Request<ReadRequest>,
) -> Result<Response<Self::ReadFileStream>, Status> {
let req = request.into_inner();
let safe_path = sanitize_path(&req.path)?;
let sanitized_req = ReadRequest {
path: safe_path,
offset: req.offset,
size: req.size,
chunk_size: req.chunk_size,
};
match self.vfs.read_file(sanitized_req).await {
Ok(mut rx) => {
let stream = async_stream::try_stream! {
while let Some(result) = rx.recv().await {
match result {
Ok(chunk) => yield chunk,
Err(e) => Err(Status::internal(e.to_string()))?,
}
}
};
Ok(Response::new(Box::pin(stream) as Self::ReadFileStream))
}
Err(e) => Err(Status::internal(e.to_string())),
}
}
}

View File

@@ -0,0 +1,197 @@
use super::VirtualFileSystem;
use furumi_common::proto::{AttrResponse, DirEntry, FileChunk, ReadRequest};
use anyhow::{Context, Result};
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::sync::mpsc::{self, Receiver};
pub struct LocalVfs {
root: PathBuf,
}
impl LocalVfs {
pub fn new<P: AsRef<Path>>(root: P) -> Self {
Self {
root: root.as_ref().to_path_buf(),
}
}
/// Appends the sanitized relative path to the root.
fn resolve_path(&self, relative_path: &str) -> Result<PathBuf> {
let rel_path = Path::new(relative_path);
// Note: Security module will handle path traversal sanitization before reaching this point.
// We assume `relative_path` is safe and strictly logical here.
let stripped = rel_path.strip_prefix("/").unwrap_or(rel_path);
Ok(self.root.join(stripped))
}
}
#[async_trait::async_trait]
impl VirtualFileSystem for LocalVfs {
async fn get_attr(&self, path: &str) -> Result<AttrResponse> {
let full_path = self.resolve_path(path)?;
let metadata = fs::metadata(&full_path).await.context("Failed to get metadata")?;
Ok(AttrResponse {
size: metadata.len(),
mode: metadata.mode(),
mtime: metadata.mtime() as u64,
})
}
async fn read_dir(&self, path: &str) -> Result<Receiver<Result<DirEntry, anyhow::Error>>> {
let full_path = self.resolve_path(path)?;
let mut dir = fs::read_dir(&full_path).await.context("Failed to read directory")?;
let (tx, rx) = mpsc::channel(100);
tokio::spawn(async move {
while let Ok(Some(entry)) = dir.next_entry().await {
// In Linux, DT_REG = 8, DT_DIR = 4. We can approximate this.
let file_type = entry.file_type().await.ok();
let mut type_val = 0;
if let Some(ft) = file_type {
if ft.is_dir() {
type_val = 4; // DT_DIR roughly
} else if ft.is_file() {
type_val = 8; // DT_REG roughly
}
}
let dir_entry = DirEntry {
name: entry.file_name().to_string_lossy().into_owned(),
r#type: type_val,
};
if tx.send(Ok(dir_entry)).await.is_err() {
break;
}
}
});
Ok(rx)
}
async fn read_file(&self, req: ReadRequest) -> Result<Receiver<Result<FileChunk, anyhow::Error>>> {
let full_path = self.resolve_path(&req.path)?;
let mut file = fs::File::open(&full_path).await.context("Failed to open file")?;
// Seek to the requested offset
file.seek(std::io::SeekFrom::Start(req.offset)).await.context("Failed to seek in file")?;
// Inform the kernel about our sequential read pattern for aggressive read-ahead
#[cfg(target_os = "linux")]
{
use std::os::unix::io::AsRawFd;
let fd = file.as_raw_fd();
// POSIX_FADV_SEQUENTIAL = 2
unsafe {
libc::posix_fadvise(fd, req.offset as libc::off_t, req.size as libc::off_t, libc::POSIX_FADV_SEQUENTIAL);
}
}
// Limit chunks to 1MB default, or user requested, capping at 4MB for sanity
let mut chunk_size = req.chunk_size as usize;
if chunk_size == 0 {
chunk_size = 1024 * 1024; // 1 MB default
}
chunk_size = chunk_size.min(4 * 1024 * 1024);
// Keep the channel size small (2-4 chunks) to apply backpressure and save memory
let (tx, rx) = mpsc::channel(4);
let max_size = req.size;
tokio::spawn(async move {
let mut remaining = max_size;
use bytes::BytesMut;
while remaining > 0 {
let to_read = (remaining as usize).min(chunk_size);
let mut buffer = BytesMut::zeroed(to_read);
match file.read(&mut buffer).await {
Ok(0) => break, // EOF
Ok(n) => {
buffer.truncate(n);
let chunk = FileChunk {
data: buffer.freeze().to_vec(),
};
remaining = remaining.saturating_sub(n as u32);
if tx.send(Ok(chunk)).await.is_err() {
break; // Client disconnected
}
}
Err(e) => {
let _ = tx.send(Err(e.into())).await;
break;
}
}
}
});
Ok(rx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::tempdir;
#[tokio::test]
async fn test_local_vfs_get_attr() {
let dir = tempdir().unwrap();
let file_path = dir.path().join("test.txt");
let mut file = std::fs::File::create(&file_path).unwrap();
file.write_all(b"hello world").unwrap();
let vfs = LocalVfs::new(dir.path());
let attr = vfs.get_attr("test.txt").await.unwrap();
assert_eq!(attr.size, 11);
}
#[tokio::test]
async fn test_local_vfs_read_dir() {
let dir = tempdir().unwrap();
std::fs::File::create(dir.path().join("file1.txt")).unwrap();
std::fs::create_dir(dir.path().join("subdir")).unwrap();
let vfs = LocalVfs::new(dir.path());
let mut rx = vfs.read_dir("/").await.unwrap();
let mut entries = Vec::new();
while let Some(Ok(entry)) = rx.recv().await {
entries.push(entry.name);
}
assert!(entries.contains(&"file1.txt".to_string()));
assert!(entries.contains(&"subdir".to_string()));
}
#[tokio::test]
async fn test_local_vfs_read_file() {
let dir = tempdir().unwrap();
let file_path = dir.path().join("data.bin");
let mut file = std::fs::File::create(&file_path).unwrap();
file.write_all(b"0123456789").unwrap(); // 10 bytes
let vfs = LocalVfs::new(dir.path());
let req = ReadRequest {
path: "data.bin".to_string(),
offset: 2,
size: 5,
chunk_size: 0,
};
let mut rx = vfs.read_file(req).await.unwrap();
let mut result = Vec::new();
while let Some(Ok(chunk)) = rx.recv().await {
result.extend_from_slice(&chunk.data);
}
assert_eq!(result, b"23456"); // bytes from offset 2, length 5
}
}

View File

@@ -0,0 +1,20 @@
pub mod local;
use anyhow::Result;
use tokio::sync::mpsc::Receiver;
use furumi_common::proto::{AttrResponse, DirEntry, FileChunk, ReadRequest};
/// Abstract Virtual File System trait representing the remote server operations.
#[async_trait::async_trait]
pub trait VirtualFileSystem: Send + Sync + 'static {
/// Get attributes for a file or directory.
async fn get_attr(&self, path: &str) -> Result<AttrResponse>;
/// Read directory entries. Returns a stream (Receiver) of DirEntry.
/// Uses Result<DirEntry> inside the Receiver to propagate individual entry errors if any.
async fn read_dir(&self, path: &str) -> Result<Receiver<Result<DirEntry, anyhow::Error>>>;
/// Read chunks of a file. Returns a stream (Receiver) of FileChunk.
async fn read_file(&self, req: ReadRequest) -> Result<Receiver<Result<FileChunk, anyhow::Error>>>;
}