diff --git a/furumi-server/Cargo.toml b/furumi-server/Cargo.toml index 8bbd61b..253f7d5 100644 --- a/furumi-server/Cargo.toml +++ b/furumi-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "furumi-server" -version = "0.1.0" +version = "0.2.0" edition = "2024" [dependencies] diff --git a/furumi-server/src/metrics.rs b/furumi-server/src/metrics.rs index 095f711..65e447f 100644 --- a/furumi-server/src/metrics.rs +++ b/furumi-server/src/metrics.rs @@ -100,6 +100,24 @@ impl RequestTimer { } } +/// An RAII guard that increments the ACTIVE_STREAMS gauge when created +/// and decrements it when dropped. This ensures streams are correctly counted +/// even if they terminate abruptly. +pub struct ActiveStreamGuard; + +impl ActiveStreamGuard { + pub fn new() -> Self { + ACTIVE_STREAMS.inc(); + Self + } +} + +impl Drop for ActiveStreamGuard { + fn drop(&mut self) { + ACTIVE_STREAMS.dec(); + } +} + /// Render all registered metrics in Prometheus text format. pub fn render_metrics() -> String { let encoder = TextEncoder::new(); diff --git a/furumi-server/src/server.rs b/furumi-server/src/server.rs index 6d76bb5..1142e04 100644 --- a/furumi-server/src/server.rs +++ b/furumi-server/src/server.rs @@ -60,15 +60,14 @@ impl RemoteFileSystem for RemoteFileSystemImpl { match self.vfs.read_dir(&safe_path).await { Ok(mut rx) => { timer.finish_ok(); - metrics::ACTIVE_STREAMS.inc(); let stream = async_stream::try_stream! { + let _guard = metrics::ActiveStreamGuard::new(); while let Some(result) = rx.recv().await { match result { Ok(entry) => yield entry, Err(e) => Err(Status::internal(e.to_string()))?, } } - metrics::ACTIVE_STREAMS.dec(); }; Ok(Response::new(Box::pin(stream) as Self::ReadDirStream)) } @@ -103,8 +102,8 @@ impl RemoteFileSystem for RemoteFileSystemImpl { match self.vfs.read_file(sanitized_req).await { Ok(mut rx) => { timer.finish_ok(); - metrics::ACTIVE_STREAMS.inc(); let stream = async_stream::try_stream! { + let _guard = metrics::ActiveStreamGuard::new(); while let Some(result) = rx.recv().await { match result { Ok(chunk) => { @@ -117,7 +116,6 @@ impl RemoteFileSystem for RemoteFileSystemImpl { } } } - metrics::ACTIVE_STREAMS.dec(); }; Ok(Response::new(Box::pin(stream) as Self::ReadFileStream)) }