use crate::error::{ClientError, Result}; use furumi_common::proto::{ remote_file_system_client::RemoteFileSystemClient, AttrResponse, DirEntry, FileChunk, PathRequest, ReadRequest, }; use moka::future::Cache; use std::time::Duration; use tokio_stream::StreamExt; use tonic::codegen::InterceptedService; use tonic::metadata::MetadataValue; use tonic::transport::{Channel, Endpoint}; use tonic::{Request, Status}; use tracing::{debug, info}; #[derive(Clone)] pub struct AuthInterceptor { token: String, } impl tonic::service::Interceptor for AuthInterceptor { fn call(&mut self, mut request: Request<()>) -> std::result::Result, Status> { if !self.token.is_empty() { let token_str = format!("Bearer {}", self.token); let meta_val = MetadataValue::try_from(&token_str) .map_err(|_| Status::invalid_argument("Invalid token format"))?; request.metadata_mut().insert("authorization", meta_val); } Ok(request) } } pub type GrpcClient = RemoteFileSystemClient>; #[derive(Clone)] pub struct FurumiClient { client: GrpcClient, attr_cache: Cache, } impl FurumiClient { /// Connects to the Furumi-ng server with an optional bearer token. pub async fn connect(addr: &str, token: &str) -> Result { let endpoint = Endpoint::from_shared(addr.to_string()) .map_err(|e| ClientError::Internal(format!("Invalid URI: {}", e)))? .timeout(Duration::from_secs(30)) .concurrency_limit(256) .tcp_keepalive(Some(Duration::from_secs(60))) .http2_keep_alive_interval(Duration::from_secs(60)); info!("Connecting to {}", addr); let channel = endpoint.connect().await?; let interceptor = AuthInterceptor { token: token.to_string(), }; let client = RemoteFileSystemClient::with_interceptor(channel, interceptor); let attr_cache = Cache::builder() .max_capacity(100_000) .time_to_live(Duration::from_secs(5)) // short TTL to catch up changes quickly .build(); Ok(Self { client, attr_cache }) } /// Fetches file attributes from the server, utilizing an internal cache. pub async fn get_attr(&self, path: &str) -> Result { if let Some(attr) = self.attr_cache.get(path).await { return Ok(attr); } debug!("get_attr (cache miss): {}", path); let mut client = self.client.clone(); let req = tonic::Request::new(PathRequest { path: path.to_string(), }); let response = client.get_attr(req).await?.into_inner(); self.attr_cache.insert(path.to_string(), response.clone()).await; Ok(response) } /// Reads directory contents from the server stream. pub async fn read_dir(&self, path: &str) -> Result> { debug!("read_dir: {}", path); let mut client = self.client.clone(); let req = tonic::Request::new(PathRequest { path: path.to_string(), }); let mut stream = client.read_dir(req).await?.into_inner(); let mut entries = Vec::new(); while let Some(chunk) = stream.next().await { let entry = chunk?; entries.push(entry); } Ok(entries) } /// Fetches file chunk stream from the server. Returns the streaming receiver. pub async fn read_file( &self, path: &str, offset: u64, size: u32, chunk_size: u32, ) -> Result> { debug!("read_file: {} offset={} size={}", path, offset, size); let mut client = self.client.clone(); let req = tonic::Request::new(ReadRequest { path: path.to_string(), offset, size, chunk_size, }); let stream = client.read_file(req).await?.into_inner(); Ok(stream) } }