// NOTE(review): this file appears to have lost its generic argument lists
// (e.g. `std::sync::Arc,`, `Request,`, `Result, Status>`, `dyn Stream> + Send`).
// The code tokens below are left exactly as found; restore the type arguments
// from version control before building.

use std::pin::Pin;
use tokio_stream::Stream;
use tonic::{Request, Response, Status};
use crate::vfs::VirtualFileSystem;
use crate::metrics::{self, RequestTimer};
use furumi_common::proto::{
    remote_file_system_server::RemoteFileSystem,
    AttrResponse,
    DirEntry,
    FileChunk,
    PathRequest,
    ReadRequest,
};
use crate::security::sanitize_path;

/// tonic service implementation of [`RemoteFileSystem`] that forwards every
/// call to the virtual file system stored in `vfs`.
///
/// Each handler sanitizes the client-supplied path with `sanitize_path`
/// before handing it to the VFS, and reports the outcome through a
/// [`RequestTimer`] and the counters in `crate::metrics`.
pub struct RemoteFileSystemImpl {
    // Shared handle to the backing VFS.
    // NOTE(review): the `Arc` type argument is missing here; presumably it
    // involves `VirtualFileSystem` (imported above) — confirm.
    vfs: std::sync::Arc,
}

impl RemoteFileSystemImpl {
    /// Creates a service that serves requests from the given shared VFS handle.
    pub fn new(vfs: std::sync::Arc) -> Self {
        Self { vfs }
    }
}

// Handlers for the `RemoteFileSystem` service trait. All VFS failures are
// reported to the client as `Status::internal` carrying the error's
// `to_string()` text.
// NOTE(review): no distinction is made between e.g. "not found" and other
// failures — confirm that a single status code is intended.
#[tonic::async_trait]
impl RemoteFileSystem for RemoteFileSystemImpl {
    /// Returns the attributes of the path named in the request.
    ///
    /// # Errors
    ///
    /// * Propagates the error from `sanitize_path` (via `?`) when the
    ///   requested path is rejected.
    /// * Returns `Status::internal` when `vfs.get_attr` fails; that path also
    ///   increments `FILE_OPEN_ERRORS_TOTAL`.
    async fn get_attr(
        &self,
        request: Request,
    ) -> Result, Status> {
        let timer = RequestTimer::new("GetAttr");
        let req = request.into_inner();
        // NOTE(review): if sanitization fails, `?` returns before either
        // `timer.finish_ok()` or `timer.finish_err()` runs — confirm that
        // `RequestTimer` copes with being dropped unfinished.
        let safe_path = sanitize_path(&req.path)?;

        match self.vfs.get_attr(&safe_path).await {
            Ok(attr) => {
                timer.finish_ok();
                Ok(Response::new(attr))
            }
            Err(e) => {
                metrics::FILE_OPEN_ERRORS_TOTAL.inc();
                timer.finish_err();
                Err(Status::internal(e.to_string()))
            }
        }
    }

    /// Boxed, pinned, `Send + 'static` stream returned by `read_dir`.
    // NOTE(review): the `Stream` item type is missing from this alias;
    // presumably it yields `DirEntry` results — confirm.
    type ReadDirStream = Pin<
        Box<
            dyn Stream> + Send + 'static,
        >,
    >;

    /// Streams the entries of the directory named in the request.
    ///
    /// The VFS hands back a receiver (`rx`); every item received from it is
    /// forwarded to the client until the channel yields `None`.
    ///
    /// # Errors
    ///
    /// * Propagates the error from `sanitize_path` when the path is rejected.
    /// * Returns `Status::internal` when `vfs.read_dir` fails.
    /// * An `Err` item received from the channel is converted to
    ///   `Status::internal` and, through `?`, ends the stream.
    async fn read_dir(
        &self,
        request: Request,
    ) -> Result, Status> {
        let timer = RequestTimer::new("ReadDir");
        let req = request.into_inner();
        // NOTE(review): same unfinished-timer early return as in `get_attr`.
        let safe_path = sanitize_path(&req.path)?;

        match self.vfs.read_dir(&safe_path).await {
            Ok(mut rx) => {
                // The timer is finished as soon as the receiver is obtained,
                // i.e. before any entry has been streamed to the client.
                timer.finish_ok();
                let stream = async_stream::try_stream! {
                    // Held for as long as the stream body is alive.
                    // NOTE(review): presumably tracks the number of active
                    // streams — confirm against `metrics::ActiveStreamGuard`.
                    let _guard = metrics::ActiveStreamGuard::new();
                    while let Some(result) = rx.recv().await {
                        match result {
                            Ok(entry) => yield entry,
                            Err(e) => Err(Status::internal(e.to_string()))?,
                        }
                    }
                };
                Ok(Response::new(Box::pin(stream) as Self::ReadDirStream))
            }
            Err(e) => {
                // NOTE(review): unlike `get_attr` and `read_file`, this error
                // path does not increment `FILE_OPEN_ERRORS_TOTAL` — confirm
                // whether that is deliberate.
                timer.finish_err();
                Err(Status::internal(e.to_string()))
            }
        }
    }

    /// Boxed, pinned, `Send + 'static` stream returned by `read_file`.
    // NOTE(review): the `Stream` item type is missing from this alias;
    // presumably it yields `FileChunk` results — confirm.
    type ReadFileStream = Pin<
        Box<
            dyn Stream> + Send + 'static,
        >,
    >;

    /// Streams the contents of the file named in the request as chunks.
    ///
    /// The incoming request is rebuilt with the sanitized path; `offset`,
    /// `size` and `chunk_size` are copied through unchanged. Every chunk
    /// forwarded to the client adds `chunk.data.len()` to `BYTES_READ_TOTAL`.
    ///
    /// # Errors
    ///
    /// * Propagates the error from `sanitize_path` when the path is rejected.
    /// * Returns `Status::internal` when `vfs.read_file` fails; that path also
    ///   increments `FILE_OPEN_ERRORS_TOTAL`.
    /// * An `Err` item received from the channel increments
    ///   `FILE_OPEN_ERRORS_TOTAL`, is converted to `Status::internal` and,
    ///   through `?`, ends the stream.
    async fn read_file(
        &self,
        request: Request,
    ) -> Result, Status> {
        let timer = RequestTimer::new("ReadFile");
        let req = request.into_inner();
        // NOTE(review): same unfinished-timer early return as in `get_attr`.
        let safe_path = sanitize_path(&req.path)?;

        // Only the path is replaced; the remaining fields come straight from
        // the client's request.
        let sanitized_req = ReadRequest {
            path: safe_path,
            offset: req.offset,
            size: req.size,
            chunk_size: req.chunk_size,
        };

        match self.vfs.read_file(sanitized_req).await {
            Ok(mut rx) => {
                // Finished once the receiver exists, before any chunk is sent.
                timer.finish_ok();
                let stream = async_stream::try_stream! {
                    // Held for as long as the stream body is alive.
                    // NOTE(review): presumably tracks the number of active
                    // streams — confirm against `metrics::ActiveStreamGuard`.
                    let _guard = metrics::ActiveStreamGuard::new();
                    while let Some(result) = rx.recv().await {
                        match result {
                            Ok(chunk) => {
                                // Counted before the chunk is yielded.
                                metrics::BYTES_READ_TOTAL.inc_by(chunk.data.len() as f64);
                                yield chunk;
                            }
                            Err(e) => {
                                // Mid-stream read failures share the counter
                                // used for failures to start the read.
                                metrics::FILE_OPEN_ERRORS_TOTAL.inc();
                                Err(Status::internal(e.to_string()))?;
                            }
                        }
                    }
                };
                Ok(Response::new(Box::pin(stream) as Self::ReadFileStream))
            }
            Err(e) => {
                metrics::FILE_OPEN_ERRORS_TOTAL.inc();
                timer.finish_err();
                Err(Status::internal(e.to_string()))
            }
        }
    }
}