3 Commits

Author SHA1 Message Date
Ultradesu
c54af23845 Fix logging
Some checks failed
Publish Server Image / build-and-push-image (push) Has been cancelled
2026-03-11 10:29:31 +00:00
Ultradesu
dd3f3721b2 Adjust logging. Added tcp ping 2026-03-11 10:12:57 +00:00
Ultradesu
dc77933c9e Fix active streams metric
All checks were successful
Publish Server Image / build-and-push-image (push) Successful in 4m50s
2026-03-11 01:39:29 +00:00
9 changed files with 32 additions and 15 deletions

10
Cargo.lock generated
View File

@@ -585,7 +585,7 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]] [[package]]
name = "furumi-client-core" name = "furumi-client-core"
version = "0.1.0" version = "0.2.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@@ -607,7 +607,7 @@ dependencies = [
[[package]] [[package]]
name = "furumi-common" name = "furumi-common"
version = "0.1.0" version = "0.2.0"
dependencies = [ dependencies = [
"prost", "prost",
"protobuf-src", "protobuf-src",
@@ -617,7 +617,7 @@ dependencies = [
[[package]] [[package]]
name = "furumi-mount-linux" name = "furumi-mount-linux"
version = "0.1.0" version = "0.2.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"clap", "clap",
@@ -634,7 +634,7 @@ dependencies = [
[[package]] [[package]]
name = "furumi-mount-macos" name = "furumi-mount-macos"
version = "0.1.0" version = "0.2.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@@ -652,7 +652,7 @@ dependencies = [
[[package]] [[package]]
name = "furumi-server" name = "furumi-server"
version = "0.1.0" version = "0.2.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "furumi-client-core" name = "furumi-client-core"
version = "0.1.0" version = "0.2.1"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@@ -155,7 +155,8 @@ impl FurumiClient {
.timeout(Duration::from_secs(30)) .timeout(Duration::from_secs(30))
.concurrency_limit(256) .concurrency_limit(256)
.tcp_keepalive(Some(Duration::from_secs(60))) .tcp_keepalive(Some(Duration::from_secs(60)))
.http2_keep_alive_interval(Duration::from_secs(60)); .http2_keep_alive_interval(Duration::from_secs(60))
.keep_alive_while_idle(true);
let channel = if is_https { let channel = if is_https {
info!("TLS enabled (encryption only, certificate verification disabled)"); info!("TLS enabled (encryption only, certificate verification disabled)");

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "furumi-common" name = "furumi-common"
version = "0.1.0" version = "0.2.1"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "furumi-mount-linux" name = "furumi-mount-linux"
version = "0.1.0" version = "0.2.1"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "furumi-mount-macos" name = "furumi-mount-macos"
version = "0.1.0" version = "0.2.1"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "furumi-server" name = "furumi-server"
version = "0.1.0" version = "0.2.1"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@@ -100,6 +100,24 @@ impl RequestTimer {
} }
} }
/// An RAII guard that increments the ACTIVE_STREAMS gauge when created
/// and decrements it when dropped. This ensures streams are correctly counted
/// even if they terminate abruptly.
pub struct ActiveStreamGuard;
impl ActiveStreamGuard {
pub fn new() -> Self {
ACTIVE_STREAMS.inc();
Self
}
}
impl Drop for ActiveStreamGuard {
fn drop(&mut self) {
ACTIVE_STREAMS.dec();
}
}
/// Render all registered metrics in Prometheus text format. /// Render all registered metrics in Prometheus text format.
pub fn render_metrics() -> String { pub fn render_metrics() -> String {
let encoder = TextEncoder::new(); let encoder = TextEncoder::new();

View File

@@ -60,15 +60,14 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
match self.vfs.read_dir(&safe_path).await { match self.vfs.read_dir(&safe_path).await {
Ok(mut rx) => { Ok(mut rx) => {
timer.finish_ok(); timer.finish_ok();
metrics::ACTIVE_STREAMS.inc();
let stream = async_stream::try_stream! { let stream = async_stream::try_stream! {
let _guard = metrics::ActiveStreamGuard::new();
while let Some(result) = rx.recv().await { while let Some(result) = rx.recv().await {
match result { match result {
Ok(entry) => yield entry, Ok(entry) => yield entry,
Err(e) => Err(Status::internal(e.to_string()))?, Err(e) => Err(Status::internal(e.to_string()))?,
} }
} }
metrics::ACTIVE_STREAMS.dec();
}; };
Ok(Response::new(Box::pin(stream) as Self::ReadDirStream)) Ok(Response::new(Box::pin(stream) as Self::ReadDirStream))
} }
@@ -103,8 +102,8 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
match self.vfs.read_file(sanitized_req).await { match self.vfs.read_file(sanitized_req).await {
Ok(mut rx) => { Ok(mut rx) => {
timer.finish_ok(); timer.finish_ok();
metrics::ACTIVE_STREAMS.inc();
let stream = async_stream::try_stream! { let stream = async_stream::try_stream! {
let _guard = metrics::ActiveStreamGuard::new();
while let Some(result) = rx.recv().await { while let Some(result) = rx.recv().await {
match result { match result {
Ok(chunk) => { Ok(chunk) => {
@@ -117,7 +116,6 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
} }
} }
} }
metrics::ACTIVE_STREAMS.dec();
}; };
Ok(Response::new(Box::pin(stream) as Self::ReadFileStream)) Ok(Response::new(Box::pin(stream) as Self::ReadFileStream))
} }