use crate::error::{ClientError, Result}; use furumi_common::proto::{ remote_file_system_client::RemoteFileSystemClient, AttrResponse, DirEntry, FileChunk, PathRequest, ReadRequest, }; use moka::future::Cache; use std::time::Duration; use tokio_stream::StreamExt; use tonic::codegen::InterceptedService; use tonic::metadata::MetadataValue; use tonic::transport::{Channel, Endpoint}; use tonic::{Request, Status}; use tracing::{debug, info}; #[derive(Clone)] pub struct AuthInterceptor { token: String, } impl tonic::service::Interceptor for AuthInterceptor { fn call(&mut self, mut request: Request<()>) -> std::result::Result, Status> { if !self.token.is_empty() { let token_str = format!("Bearer {}", self.token); let meta_val = MetadataValue::try_from(&token_str) .map_err(|_| Status::invalid_argument("Invalid token format"))?; request.metadata_mut().insert("authorization", meta_val); } Ok(request) } } pub type GrpcClient = RemoteFileSystemClient>; #[derive(Clone)] pub struct FurumiClient { client: GrpcClient, attr_cache: Cache, } impl FurumiClient { /// Connects to the Furumi-ng server. /// /// - `addr`: Server URL. Use `https://` for TLS, `http://` for plaintext. /// - `token`: Bearer token for auth (empty = no auth). /// - `tls_ca_pem`: Optional CA certificate PEM bytes for verifying the server. /// If using TLS without a CA cert, pass `None` (uses system roots). pub async fn connect(addr: &str, token: &str, tls_ca_pem: Option>) -> Result { // Install ring as the default crypto provider for rustls let _ = rustls::crypto::ring::default_provider().install_default(); let mut endpoint = Endpoint::from_shared(addr.to_string()) .map_err(|e| ClientError::Internal(format!("Invalid URI: {}", e)))? .timeout(Duration::from_secs(30)) .concurrency_limit(256) .tcp_keepalive(Some(Duration::from_secs(60))) .http2_keep_alive_interval(Duration::from_secs(60)); // Configure TLS if using https:// if addr.starts_with("https://") { let mut tls_config = tonic::transport::ClientTlsConfig::new() .domain_name("furumi-ng"); if let Some(ca_pem) = tls_ca_pem { info!("TLS enabled with server CA certificate"); tls_config = tls_config .ca_certificate(tonic::transport::Certificate::from_pem(ca_pem)); } else { info!("TLS enabled with system root certificates"); } endpoint = endpoint.tls_config(tls_config) .map_err(|e| ClientError::Internal(format!("TLS config error: {}", e)))?; } info!("Connecting to {}", addr); let channel = endpoint.connect().await?; let interceptor = AuthInterceptor { token: token.to_string(), }; let client = RemoteFileSystemClient::with_interceptor(channel, interceptor); let attr_cache = Cache::builder() .max_capacity(100_000) .time_to_live(Duration::from_secs(5)) .build(); Ok(Self { client, attr_cache }) } /// Fetches file attributes from the server, utilizing an internal cache. pub async fn get_attr(&self, path: &str) -> Result { if let Some(attr) = self.attr_cache.get(path).await { return Ok(attr); } debug!("get_attr (cache miss): {}", path); let mut client = self.client.clone(); let req = tonic::Request::new(PathRequest { path: path.to_string(), }); let response = client.get_attr(req).await?.into_inner(); self.attr_cache.insert(path.to_string(), response.clone()).await; Ok(response) } /// Reads directory contents from the server stream. pub async fn read_dir(&self, path: &str) -> Result> { debug!("read_dir: {}", path); let mut client = self.client.clone(); let req = tonic::Request::new(PathRequest { path: path.to_string(), }); let mut stream = client.read_dir(req).await?.into_inner(); let mut entries = Vec::new(); while let Some(chunk) = stream.next().await { let entry = chunk?; entries.push(entry); } Ok(entries) } /// Fetches file chunk stream from the server. Returns the streaming receiver. pub async fn read_file( &self, path: &str, offset: u64, size: u32, chunk_size: u32, ) -> Result> { debug!("read_file: {} offset={} size={}", path, offset, size); let mut client = self.client.clone(); let req = tonic::Request::new(ReadRequest { path: path.to_string(), offset, size, chunk_size, }); let stream = client.read_file(req).await?.into_inner(); Ok(stream) } }