From 9534eceb025b2401d70582a72a294315b6e5736e Mon Sep 17 00:00:00 2001 From: AB-UK Date: Tue, 10 Mar 2026 15:52:16 +0000 Subject: [PATCH] Added autounmount on stop. Added prom metrics to server 9090 /metrics --- Cargo.lock | 230 +++++++++++++++++++++++++++++- furumi-mount-linux/Cargo.toml | 1 + furumi-mount-linux/src/main.rs | 31 +++- furumi-server/Cargo.toml | 3 + furumi-server/src/main.rs | 34 ++++- furumi-server/src/metrics.rs | 110 ++++++++++++++ furumi-server/src/security/mod.rs | 6 +- furumi-server/src/server.rs | 43 +++++- 8 files changed, 439 insertions(+), 19 deletions(-) create mode 100644 furumi-server/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 2ac5d18..fb6cdef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -158,6 +158,8 @@ dependencies = [ "http", "http-body", "http-body-util", + "hyper", + "hyper-util", "itoa", "matchit", "memchr", @@ -166,10 +168,15 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", "sync_wrapper", + "tokio", "tower 0.5.3", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -190,6 +197,7 @@ dependencies = [ "sync_wrapper", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -204,6 +212,15 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" +[[package]] +name = "block2" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5" +dependencies = [ + "objc2", +] + [[package]] name = "bumpalo" version = "3.20.2" @@ -328,6 +345,17 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "ctrlc" +version = "3.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0b1fab2ae45819af2d0731d60f2afe17227ebb1a1538a236da84c93e9a60162" +dependencies = [ + "dispatch2", + "nix 0.31.2", + "windows-sys 0.61.2", +] + [[package]] name = "deranged" version = "0.5.8" @@ -337,6 +365,18 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "dispatch2" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e0e367e4e7da84520dedcac1901e4da967309406d1e51017ae1abfb97adbd38" +dependencies = [ + "bitflags", + "block2", + "libc", + "objc2", +] + [[package]] name = "dunce" version = "1.0.5" @@ -416,6 +456,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "form_urlencoded" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" +dependencies = [ + "percent-encoding", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -431,7 +480,7 @@ dependencies = [ "furumi-common", "moka", "prost", - "thiserror", + "thiserror 2.0.18", "tokio", "tokio-stream", "tonic", @@ -454,6 +503,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", + "ctrlc", "furumi-client-core", "furumi-common", "fuser", @@ -471,6 +521,7 @@ dependencies = [ "anyhow", "async-stream", "async-trait", + "axum", "bytes", "clap", "furumi-common", @@ -479,10 +530,12 @@ dependencies = [ "futures-util", "jsonwebtoken", "libc", + "once_cell", + "prometheus", "prost", "rustls", "tempfile", - "thiserror", + "thiserror 2.0.18", "tokio", "tokio-stream", "tonic", @@ -499,7 +552,7 @@ dependencies = [ "libc", "log", "memchr", - "nix", + "nix 0.29.0", "page_size", "pkg-config", "smallvec", @@ -676,6 +729,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "1.4.0" @@ -880,6 +939,12 @@ version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +[[package]] +name = "linux-raw-sys" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" + [[package]] name = "linux-raw-sys" version = "0.12.1" @@ -977,6 +1042,18 @@ dependencies = [ "libc", ] +[[package]] +name = "nix" +version = "0.31.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -1020,6 +1097,21 @@ dependencies = [ "autocfg", ] +[[package]] +name = "objc2" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a12a8ed07aefc768292f076dc3ac8c48f3781c8f2d5851dd3d98950e8c5a89f" +dependencies = [ + "objc2-encode", +] + +[[package]] +name = "objc2-encode" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33" + [[package]] name = "once_cell" version = "1.21.3" @@ -1175,6 +1267,45 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "procfs" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc5b72d8145275d844d4b5f6d4e1eef00c8cd889edb6035c21675d1bb1f45c9f" +dependencies = [ + "bitflags", + "hex", + "procfs-core", + "rustix 0.38.44", +] + +[[package]] +name = "procfs-core" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "239df02d8349b06fc07398a3a1697b06418223b1c7725085e801e7c0fc6a12ec" +dependencies = [ + "bitflags", + "hex", +] + +[[package]] +name = "prometheus" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "libc", + "memchr", + "parking_lot", + "procfs", + "protobuf", + "thiserror 2.0.18", +] + [[package]] name = "prost" version = "0.13.5" @@ -1227,6 +1358,17 @@ dependencies = [ "prost", ] +[[package]] +name = "protobuf" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + [[package]] name = "protobuf-src" version = "2.1.1+27.1" @@ -1236,6 +1378,15 @@ dependencies = [ "cmake", ] +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "quote" version = "1.0.45" @@ -1339,6 +1490,19 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustix" +version = "0.38.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys 0.4.15", + "windows-sys 0.52.0", +] + [[package]] name = "rustix" version = "1.1.4" @@ -1348,7 +1512,7 @@ dependencies = [ "bitflags", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.12.1", "windows-sys 0.61.2", ] @@ -1394,6 +1558,12 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "ryu" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" + [[package]] name = "scopeguard" version = "1.2.0" @@ -1449,6 +1619,29 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -1491,7 +1684,7 @@ checksum = "0d585997b0ac10be3c5ee635f1bab02d512760d14b7c468801ac8a01d9ae5f1d" dependencies = [ "num-bigint", "num-traits", - "thiserror", + "thiserror 2.0.18", "time", ] @@ -1571,17 +1764,37 @@ dependencies = [ "fastrand", "getrandom 0.4.2", "once_cell", - "rustix", + "rustix 1.1.4", "windows-sys 0.61.2", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.18", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -1761,8 +1974,10 @@ dependencies = [ "futures-util", "pin-project-lite", "sync_wrapper", + "tokio", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -1783,6 +1998,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/furumi-mount-linux/Cargo.toml b/furumi-mount-linux/Cargo.toml index 1fdd7f2..733b382 100644 --- a/furumi-mount-linux/Cargo.toml +++ b/furumi-mount-linux/Cargo.toml @@ -14,3 +14,4 @@ tracing = "0.1.44" tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } tokio = { version = "1.50.0", features = ["full"] } tokio-stream = "0.1.18" +ctrlc = "3.5.2" diff --git a/furumi-mount-linux/src/main.rs b/furumi-mount-linux/src/main.rs index 9eed6f4..7d14622 100644 --- a/furumi-mount-linux/src/main.rs +++ b/furumi-mount-linux/src/main.rs @@ -1,8 +1,10 @@ pub mod fs; use clap::Parser; -use fuser::MountOption; +use fuser::{MountOption, Session}; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use furumi_client_core::FurumiClient; #[derive(Parser, Debug)] @@ -40,7 +42,7 @@ fn main() -> Result<(), Box> { FurumiClient::connect(&args.server, &args.token).await })?; - let fs = fs::FurumiFuse::new(client, rt.handle().clone()); + let fuse_fs = fs::FurumiFuse::new(client, rt.handle().clone()); let options = vec![ MountOption::RO, @@ -49,7 +51,30 @@ fn main() -> Result<(), Box> { ]; println!("Mounting Furumi-ng to {:?}", args.mount); - fuser::mount2(fs, args.mount, &options)?; + + // Use Session + BackgroundSession for graceful unmount on exit + let session = Session::new(fuse_fs, &args.mount, &options)?; + // Spawn the FUSE event loop in a background thread. + // When `bg_session` is dropped, the filesystem is automatically unmounted. + let bg_session = session.spawn()?; + + // Set up signal handler for graceful shutdown + let running = Arc::new(AtomicBool::new(true)); + let r = running.clone(); + ctrlc::set_handler(move || { + eprintln!("\nReceived signal, unmounting..."); + r.store(false, Ordering::SeqCst); + })?; + + // Wait for shutdown signal + while running.load(Ordering::SeqCst) { + std::thread::sleep(std::time::Duration::from_millis(100)); + } + + // Dropping bg_session automatically unmounts the filesystem + drop(bg_session); + println!("Unmounted successfully."); Ok(()) } + diff --git a/furumi-server/Cargo.toml b/furumi-server/Cargo.toml index c32e62f..3ad3cbd 100644 --- a/furumi-server/Cargo.toml +++ b/furumi-server/Cargo.toml @@ -23,6 +23,9 @@ tracing = "0.1.44" tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } async-stream = "0.3.6" async-trait = "0.1.89" +prometheus = { version = "0.14.0", features = ["process"] } +axum = { version = "0.7", features = ["tokio"] } +once_cell = "1.21.3" [dev-dependencies] tempfile = "3.26.0" diff --git a/furumi-server/src/main.rs b/furumi-server/src/main.rs index 2da4018..5525b88 100644 --- a/furumi-server/src/main.rs +++ b/furumi-server/src/main.rs @@ -1,9 +1,12 @@ pub mod vfs; pub mod security; pub mod server; +pub mod metrics; +use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; +use axum::{Router, routing::get}; use clap::Parser; use tonic::transport::Server; use vfs::local::LocalVfs; @@ -14,7 +17,7 @@ use security::AuthInterceptor; #[derive(Parser, Debug)] #[command(version, about, long_about = None)] struct Args { - /// IP address and port to bind the server to + /// IP address and port to bind the gRPC server to #[arg(short, long, env = "FURUMI_BIND", default_value = "[::1]:50051")] bind: String, @@ -25,6 +28,14 @@ struct Args { /// Authentication Bearer token (leave empty to disable auth) #[arg(short, long, env = "FURUMI_TOKEN", default_value = "")] token: String, + + /// IP address and port for the Prometheus metrics HTTP endpoint + #[arg(long, env = "FURUMI_METRICS_BIND", default_value = "0.0.0.0:9090")] + metrics_bind: String, +} + +async fn metrics_handler() -> String { + metrics::render_metrics() } #[tokio::main] @@ -32,7 +43,17 @@ async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); let args = Args::parse(); - let addr = args.bind.parse()?; + let addr: SocketAddr = args.bind.parse().unwrap_or_else(|e| { + eprintln!("Error: Invalid bind address '{}': {}", args.bind, e); + eprintln!(" Expected format: IP:PORT (e.g. 0.0.0.0:50051 or [::1]:50051)"); + std::process::exit(1); + }); + + let metrics_addr: SocketAddr = args.metrics_bind.parse().unwrap_or_else(|e| { + eprintln!("Error: Invalid metrics bind address '{}': {}", args.metrics_bind, e); + eprintln!(" Expected format: IP:PORT (e.g. 0.0.0.0:9090)"); + std::process::exit(1); + }); // Resolve the document root to an absolute path for safety and clarity let root_path = std::fs::canonicalize(&args.root) @@ -55,6 +76,14 @@ async fn main() -> Result<(), Box> { println!("Authentication is enabled (Bearer token required)"); } println!("Document Root: {:?}", root_path); + println!("Metrics endpoint: http://{}/metrics", metrics_addr); + + // Spawn the Prometheus metrics HTTP server on a separate port + let metrics_app = Router::new().route("/metrics", get(metrics_handler)); + let metrics_listener = tokio::net::TcpListener::bind(metrics_addr).await?; + tokio::spawn(async move { + axum::serve(metrics_listener, metrics_app).await.unwrap(); + }); Server::builder() // Enable TCP Keep-Alive and HTTP2 Ping to keep connections alive for long media streams @@ -66,3 +95,4 @@ async fn main() -> Result<(), Box> { Ok(()) } + diff --git a/furumi-server/src/metrics.rs b/furumi-server/src/metrics.rs new file mode 100644 index 0000000..095f711 --- /dev/null +++ b/furumi-server/src/metrics.rs @@ -0,0 +1,110 @@ +use once_cell::sync::Lazy; +use prometheus::{ + register_counter, register_counter_vec, register_gauge, register_histogram_vec, + Counter, CounterVec, Encoder, Gauge, HistogramVec, TextEncoder, +}; +use std::time::Instant; + +// --- Counters --- + +pub static GRPC_REQUESTS_TOTAL: Lazy = Lazy::new(|| { + register_counter_vec!( + "furumi_grpc_requests_total", + "Total number of gRPC requests", + &["method", "status"] + ) + .unwrap() +}); + +pub static BYTES_READ_TOTAL: Lazy = Lazy::new(|| { + register_counter!( + "furumi_bytes_read_total", + "Total number of bytes read from disk and streamed to clients" + ) + .unwrap() +}); + +pub static FILE_OPEN_ERRORS_TOTAL: Lazy = Lazy::new(|| { + register_counter!( + "furumi_file_open_errors_total", + "Total number of file open errors (not found, permission denied, etc.)" + ) + .unwrap() +}); + +pub static AUTH_FAILURES_TOTAL: Lazy = Lazy::new(|| { + register_counter!( + "furumi_auth_failures_total", + "Total number of authentication failures" + ) + .unwrap() +}); + +// --- Histogram --- + +pub static GRPC_REQUEST_DURATION: Lazy = Lazy::new(|| { + register_histogram_vec!( + "furumi_grpc_request_duration_seconds", + "Duration of gRPC requests in seconds", + &["method"], + vec![0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0] + ) + .unwrap() +}); + +// --- Gauges --- + +pub static ACTIVE_STREAMS: Lazy = Lazy::new(|| { + register_gauge!( + "furumi_active_streams", + "Number of currently active streaming connections (ReadFile/ReadDir)" + ) + .unwrap() +}); + +/// Helper to track and record a gRPC call's duration and status. +pub struct RequestTimer { + method: &'static str, + start: Instant, +} + +impl RequestTimer { + pub fn new(method: &'static str) -> Self { + GRPC_REQUESTS_TOTAL + .with_label_values(&[method, "started"]) + .inc(); + Self { + method, + start: Instant::now(), + } + } + + pub fn finish_ok(self) { + let elapsed = self.start.elapsed().as_secs_f64(); + GRPC_REQUEST_DURATION + .with_label_values(&[self.method]) + .observe(elapsed); + GRPC_REQUESTS_TOTAL + .with_label_values(&[self.method, "ok"]) + .inc(); + } + + pub fn finish_err(self) { + let elapsed = self.start.elapsed().as_secs_f64(); + GRPC_REQUEST_DURATION + .with_label_values(&[self.method]) + .observe(elapsed); + GRPC_REQUESTS_TOTAL + .with_label_values(&[self.method, "error"]) + .inc(); + } +} + +/// Render all registered metrics in Prometheus text format. +pub fn render_metrics() -> String { + let encoder = TextEncoder::new(); + let metric_families = prometheus::gather(); + let mut buffer = Vec::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + String::from_utf8(buffer).unwrap() +} diff --git a/furumi-server/src/security/mod.rs b/furumi-server/src/security/mod.rs index 8bb76bd..8b74cd2 100644 --- a/furumi-server/src/security/mod.rs +++ b/furumi-server/src/security/mod.rs @@ -55,10 +55,14 @@ impl tonic::service::Interceptor for AuthInterceptor { if token_str == expected_header { Ok(req) } else { + crate::metrics::AUTH_FAILURES_TOTAL.inc(); Err(Status::unauthenticated("Invalid token")) } } - _ => Err(Status::unauthenticated("Missing authorization header")), + _ => { + crate::metrics::AUTH_FAILURES_TOTAL.inc(); + Err(Status::unauthenticated("Missing authorization header")) + } } } } diff --git a/furumi-server/src/server.rs b/furumi-server/src/server.rs index abb09b8..6d76bb5 100644 --- a/furumi-server/src/server.rs +++ b/furumi-server/src/server.rs @@ -3,6 +3,7 @@ use tokio_stream::Stream; use tonic::{Request, Response, Status}; use crate::vfs::VirtualFileSystem; +use crate::metrics::{self, RequestTimer}; use furumi_common::proto::{ remote_file_system_server::RemoteFileSystem, AttrResponse, DirEntry, FileChunk, PathRequest, ReadRequest, @@ -25,12 +26,20 @@ impl RemoteFileSystem for RemoteFileSystemImpl { &self, request: Request, ) -> Result, Status> { + let timer = RequestTimer::new("GetAttr"); let req = request.into_inner(); let safe_path = sanitize_path(&req.path)?; match self.vfs.get_attr(&safe_path).await { - Ok(attr) => Ok(Response::new(attr)), - Err(e) => Err(Status::internal(e.to_string())), + Ok(attr) => { + timer.finish_ok(); + Ok(Response::new(attr)) + } + Err(e) => { + metrics::FILE_OPEN_ERRORS_TOTAL.inc(); + timer.finish_err(); + Err(Status::internal(e.to_string())) + } } } @@ -44,11 +53,14 @@ impl RemoteFileSystem for RemoteFileSystemImpl { &self, request: Request, ) -> Result, Status> { + let timer = RequestTimer::new("ReadDir"); let req = request.into_inner(); let safe_path = sanitize_path(&req.path)?; match self.vfs.read_dir(&safe_path).await { Ok(mut rx) => { + timer.finish_ok(); + metrics::ACTIVE_STREAMS.inc(); let stream = async_stream::try_stream! { while let Some(result) = rx.recv().await { match result { @@ -56,10 +68,14 @@ impl RemoteFileSystem for RemoteFileSystemImpl { Err(e) => Err(Status::internal(e.to_string()))?, } } + metrics::ACTIVE_STREAMS.dec(); }; Ok(Response::new(Box::pin(stream) as Self::ReadDirStream)) } - Err(e) => Err(Status::internal(e.to_string())), + Err(e) => { + timer.finish_err(); + Err(Status::internal(e.to_string())) + } } } @@ -73,6 +89,7 @@ impl RemoteFileSystem for RemoteFileSystemImpl { &self, request: Request, ) -> Result, Status> { + let timer = RequestTimer::new("ReadFile"); let req = request.into_inner(); let safe_path = sanitize_path(&req.path)?; @@ -85,17 +102,31 @@ impl RemoteFileSystem for RemoteFileSystemImpl { match self.vfs.read_file(sanitized_req).await { Ok(mut rx) => { + timer.finish_ok(); + metrics::ACTIVE_STREAMS.inc(); let stream = async_stream::try_stream! { while let Some(result) = rx.recv().await { match result { - Ok(chunk) => yield chunk, - Err(e) => Err(Status::internal(e.to_string()))?, + Ok(chunk) => { + metrics::BYTES_READ_TOTAL.inc_by(chunk.data.len() as f64); + yield chunk; + } + Err(e) => { + metrics::FILE_OPEN_ERRORS_TOTAL.inc(); + Err(Status::internal(e.to_string()))?; + } } } + metrics::ACTIVE_STREAMS.dec(); }; Ok(Response::new(Box::pin(stream) as Self::ReadFileStream)) } - Err(e) => Err(Status::internal(e.to_string())), + Err(e) => { + metrics::FILE_OPEN_ERRORS_TOTAL.inc(); + timer.finish_err(); + Err(Status::internal(e.to_string())) + } } } } +