Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c54af23845 | ||
|
|
dd3f3721b2 | ||
|
|
dc77933c9e |
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -585,7 +585,7 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
|
||||
|
||||
[[package]]
|
||||
name = "furumi-client-core"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -607,7 +607,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "furumi-common"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"prost",
|
||||
"protobuf-src",
|
||||
@@ -617,7 +617,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "furumi-mount-linux"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
@@ -634,7 +634,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "furumi-mount-macos"
|
||||
version = "0.1.0"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -652,7 +652,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "furumi-server"
|
||||
version = "0.1.0"
|
||||
version = "0.2.1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-stream",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "furumi-client-core"
|
||||
version = "0.1.0"
|
||||
version = "0.2.1"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -155,7 +155,8 @@ impl FurumiClient {
|
||||
.timeout(Duration::from_secs(30))
|
||||
.concurrency_limit(256)
|
||||
.tcp_keepalive(Some(Duration::from_secs(60)))
|
||||
.http2_keep_alive_interval(Duration::from_secs(60));
|
||||
.http2_keep_alive_interval(Duration::from_secs(60))
|
||||
.keep_alive_while_idle(true);
|
||||
|
||||
let channel = if is_https {
|
||||
info!("TLS enabled (encryption only, certificate verification disabled)");
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "furumi-common"
|
||||
version = "0.1.0"
|
||||
version = "0.2.1"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "furumi-mount-linux"
|
||||
version = "0.1.0"
|
||||
version = "0.2.1"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "furumi-mount-macos"
|
||||
version = "0.1.0"
|
||||
version = "0.2.1"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "furumi-server"
|
||||
version = "0.1.0"
|
||||
version = "0.2.1"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@@ -100,6 +100,24 @@ impl RequestTimer {
|
||||
}
|
||||
}
|
||||
|
||||
/// An RAII guard that increments the ACTIVE_STREAMS gauge when created
|
||||
/// and decrements it when dropped. This ensures streams are correctly counted
|
||||
/// even if they terminate abruptly.
|
||||
pub struct ActiveStreamGuard;
|
||||
|
||||
impl ActiveStreamGuard {
|
||||
pub fn new() -> Self {
|
||||
ACTIVE_STREAMS.inc();
|
||||
Self
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ActiveStreamGuard {
|
||||
fn drop(&mut self) {
|
||||
ACTIVE_STREAMS.dec();
|
||||
}
|
||||
}
|
||||
|
||||
/// Render all registered metrics in Prometheus text format.
|
||||
pub fn render_metrics() -> String {
|
||||
let encoder = TextEncoder::new();
|
||||
|
||||
@@ -60,15 +60,14 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
|
||||
match self.vfs.read_dir(&safe_path).await {
|
||||
Ok(mut rx) => {
|
||||
timer.finish_ok();
|
||||
metrics::ACTIVE_STREAMS.inc();
|
||||
let stream = async_stream::try_stream! {
|
||||
let _guard = metrics::ActiveStreamGuard::new();
|
||||
while let Some(result) = rx.recv().await {
|
||||
match result {
|
||||
Ok(entry) => yield entry,
|
||||
Err(e) => Err(Status::internal(e.to_string()))?,
|
||||
}
|
||||
}
|
||||
metrics::ACTIVE_STREAMS.dec();
|
||||
};
|
||||
Ok(Response::new(Box::pin(stream) as Self::ReadDirStream))
|
||||
}
|
||||
@@ -103,8 +102,8 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
|
||||
match self.vfs.read_file(sanitized_req).await {
|
||||
Ok(mut rx) => {
|
||||
timer.finish_ok();
|
||||
metrics::ACTIVE_STREAMS.inc();
|
||||
let stream = async_stream::try_stream! {
|
||||
let _guard = metrics::ActiveStreamGuard::new();
|
||||
while let Some(result) = rx.recv().await {
|
||||
match result {
|
||||
Ok(chunk) => {
|
||||
@@ -117,7 +116,6 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
|
||||
}
|
||||
}
|
||||
}
|
||||
metrics::ACTIVE_STREAMS.dec();
|
||||
};
|
||||
Ok(Response::new(Box::pin(stream) as Self::ReadFileStream))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user