Added autounmount on stop. Added prom metrics to server 9090 /metrics
This commit is contained in:
@@ -1,9 +1,12 @@
|
||||
pub mod vfs;
|
||||
pub mod security;
|
||||
pub mod server;
|
||||
pub mod metrics;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use axum::{Router, routing::get};
|
||||
use clap::Parser;
|
||||
use tonic::transport::Server;
|
||||
use vfs::local::LocalVfs;
|
||||
@@ -14,7 +17,7 @@ use security::AuthInterceptor;
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version, about, long_about = None)]
|
||||
struct Args {
|
||||
/// IP address and port to bind the server to
|
||||
/// IP address and port to bind the gRPC server to
|
||||
#[arg(short, long, env = "FURUMI_BIND", default_value = "[::1]:50051")]
|
||||
bind: String,
|
||||
|
||||
@@ -25,6 +28,14 @@ struct Args {
|
||||
/// Authentication Bearer token (leave empty to disable auth)
|
||||
#[arg(short, long, env = "FURUMI_TOKEN", default_value = "")]
|
||||
token: String,
|
||||
|
||||
/// IP address and port for the Prometheus metrics HTTP endpoint
|
||||
#[arg(long, env = "FURUMI_METRICS_BIND", default_value = "0.0.0.0:9090")]
|
||||
metrics_bind: String,
|
||||
}
|
||||
|
||||
async fn metrics_handler() -> String {
|
||||
metrics::render_metrics()
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@@ -32,7 +43,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let args = Args::parse();
|
||||
let addr = args.bind.parse()?;
|
||||
let addr: SocketAddr = args.bind.parse().unwrap_or_else(|e| {
|
||||
eprintln!("Error: Invalid bind address '{}': {}", args.bind, e);
|
||||
eprintln!(" Expected format: IP:PORT (e.g. 0.0.0.0:50051 or [::1]:50051)");
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
let metrics_addr: SocketAddr = args.metrics_bind.parse().unwrap_or_else(|e| {
|
||||
eprintln!("Error: Invalid metrics bind address '{}': {}", args.metrics_bind, e);
|
||||
eprintln!(" Expected format: IP:PORT (e.g. 0.0.0.0:9090)");
|
||||
std::process::exit(1);
|
||||
});
|
||||
|
||||
// Resolve the document root to an absolute path for safety and clarity
|
||||
let root_path = std::fs::canonicalize(&args.root)
|
||||
@@ -55,6 +76,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
println!("Authentication is enabled (Bearer token required)");
|
||||
}
|
||||
println!("Document Root: {:?}", root_path);
|
||||
println!("Metrics endpoint: http://{}/metrics", metrics_addr);
|
||||
|
||||
// Spawn the Prometheus metrics HTTP server on a separate port
|
||||
let metrics_app = Router::new().route("/metrics", get(metrics_handler));
|
||||
let metrics_listener = tokio::net::TcpListener::bind(metrics_addr).await?;
|
||||
tokio::spawn(async move {
|
||||
axum::serve(metrics_listener, metrics_app).await.unwrap();
|
||||
});
|
||||
|
||||
Server::builder()
|
||||
// Enable TCP Keep-Alive and HTTP2 Ping to keep connections alive for long media streams
|
||||
@@ -66,3 +95,4 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
110
furumi-server/src/metrics.rs
Normal file
110
furumi-server/src/metrics.rs
Normal file
@@ -0,0 +1,110 @@
|
||||
use once_cell::sync::Lazy;
|
||||
use prometheus::{
|
||||
register_counter, register_counter_vec, register_gauge, register_histogram_vec,
|
||||
Counter, CounterVec, Encoder, Gauge, HistogramVec, TextEncoder,
|
||||
};
|
||||
use std::time::Instant;
|
||||
|
||||
// --- Counters ---
|
||||
|
||||
pub static GRPC_REQUESTS_TOTAL: Lazy<CounterVec> = Lazy::new(|| {
|
||||
register_counter_vec!(
|
||||
"furumi_grpc_requests_total",
|
||||
"Total number of gRPC requests",
|
||||
&["method", "status"]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static BYTES_READ_TOTAL: Lazy<Counter> = Lazy::new(|| {
|
||||
register_counter!(
|
||||
"furumi_bytes_read_total",
|
||||
"Total number of bytes read from disk and streamed to clients"
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static FILE_OPEN_ERRORS_TOTAL: Lazy<Counter> = Lazy::new(|| {
|
||||
register_counter!(
|
||||
"furumi_file_open_errors_total",
|
||||
"Total number of file open errors (not found, permission denied, etc.)"
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static AUTH_FAILURES_TOTAL: Lazy<Counter> = Lazy::new(|| {
|
||||
register_counter!(
|
||||
"furumi_auth_failures_total",
|
||||
"Total number of authentication failures"
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
// --- Histogram ---
|
||||
|
||||
pub static GRPC_REQUEST_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"furumi_grpc_request_duration_seconds",
|
||||
"Duration of gRPC requests in seconds",
|
||||
&["method"],
|
||||
vec![0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0]
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
// --- Gauges ---
|
||||
|
||||
pub static ACTIVE_STREAMS: Lazy<Gauge> = Lazy::new(|| {
|
||||
register_gauge!(
|
||||
"furumi_active_streams",
|
||||
"Number of currently active streaming connections (ReadFile/ReadDir)"
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
/// Helper to track and record a gRPC call's duration and status.
|
||||
pub struct RequestTimer {
|
||||
method: &'static str,
|
||||
start: Instant,
|
||||
}
|
||||
|
||||
impl RequestTimer {
|
||||
pub fn new(method: &'static str) -> Self {
|
||||
GRPC_REQUESTS_TOTAL
|
||||
.with_label_values(&[method, "started"])
|
||||
.inc();
|
||||
Self {
|
||||
method,
|
||||
start: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn finish_ok(self) {
|
||||
let elapsed = self.start.elapsed().as_secs_f64();
|
||||
GRPC_REQUEST_DURATION
|
||||
.with_label_values(&[self.method])
|
||||
.observe(elapsed);
|
||||
GRPC_REQUESTS_TOTAL
|
||||
.with_label_values(&[self.method, "ok"])
|
||||
.inc();
|
||||
}
|
||||
|
||||
pub fn finish_err(self) {
|
||||
let elapsed = self.start.elapsed().as_secs_f64();
|
||||
GRPC_REQUEST_DURATION
|
||||
.with_label_values(&[self.method])
|
||||
.observe(elapsed);
|
||||
GRPC_REQUESTS_TOTAL
|
||||
.with_label_values(&[self.method, "error"])
|
||||
.inc();
|
||||
}
|
||||
}
|
||||
|
||||
/// Render all registered metrics in Prometheus text format.
|
||||
pub fn render_metrics() -> String {
|
||||
let encoder = TextEncoder::new();
|
||||
let metric_families = prometheus::gather();
|
||||
let mut buffer = Vec::new();
|
||||
encoder.encode(&metric_families, &mut buffer).unwrap();
|
||||
String::from_utf8(buffer).unwrap()
|
||||
}
|
||||
@@ -55,10 +55,14 @@ impl tonic::service::Interceptor for AuthInterceptor {
|
||||
if token_str == expected_header {
|
||||
Ok(req)
|
||||
} else {
|
||||
crate::metrics::AUTH_FAILURES_TOTAL.inc();
|
||||
Err(Status::unauthenticated("Invalid token"))
|
||||
}
|
||||
}
|
||||
_ => Err(Status::unauthenticated("Missing authorization header")),
|
||||
_ => {
|
||||
crate::metrics::AUTH_FAILURES_TOTAL.inc();
|
||||
Err(Status::unauthenticated("Missing authorization header"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ use tokio_stream::Stream;
|
||||
use tonic::{Request, Response, Status};
|
||||
|
||||
use crate::vfs::VirtualFileSystem;
|
||||
use crate::metrics::{self, RequestTimer};
|
||||
use furumi_common::proto::{
|
||||
remote_file_system_server::RemoteFileSystem, AttrResponse, DirEntry, FileChunk,
|
||||
PathRequest, ReadRequest,
|
||||
@@ -25,12 +26,20 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
|
||||
&self,
|
||||
request: Request<PathRequest>,
|
||||
) -> Result<Response<AttrResponse>, Status> {
|
||||
let timer = RequestTimer::new("GetAttr");
|
||||
let req = request.into_inner();
|
||||
let safe_path = sanitize_path(&req.path)?;
|
||||
|
||||
match self.vfs.get_attr(&safe_path).await {
|
||||
Ok(attr) => Ok(Response::new(attr)),
|
||||
Err(e) => Err(Status::internal(e.to_string())),
|
||||
Ok(attr) => {
|
||||
timer.finish_ok();
|
||||
Ok(Response::new(attr))
|
||||
}
|
||||
Err(e) => {
|
||||
metrics::FILE_OPEN_ERRORS_TOTAL.inc();
|
||||
timer.finish_err();
|
||||
Err(Status::internal(e.to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,11 +53,14 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
|
||||
&self,
|
||||
request: Request<PathRequest>,
|
||||
) -> Result<Response<Self::ReadDirStream>, Status> {
|
||||
let timer = RequestTimer::new("ReadDir");
|
||||
let req = request.into_inner();
|
||||
let safe_path = sanitize_path(&req.path)?;
|
||||
|
||||
match self.vfs.read_dir(&safe_path).await {
|
||||
Ok(mut rx) => {
|
||||
timer.finish_ok();
|
||||
metrics::ACTIVE_STREAMS.inc();
|
||||
let stream = async_stream::try_stream! {
|
||||
while let Some(result) = rx.recv().await {
|
||||
match result {
|
||||
@@ -56,10 +68,14 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
|
||||
Err(e) => Err(Status::internal(e.to_string()))?,
|
||||
}
|
||||
}
|
||||
metrics::ACTIVE_STREAMS.dec();
|
||||
};
|
||||
Ok(Response::new(Box::pin(stream) as Self::ReadDirStream))
|
||||
}
|
||||
Err(e) => Err(Status::internal(e.to_string())),
|
||||
Err(e) => {
|
||||
timer.finish_err();
|
||||
Err(Status::internal(e.to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,6 +89,7 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
|
||||
&self,
|
||||
request: Request<ReadRequest>,
|
||||
) -> Result<Response<Self::ReadFileStream>, Status> {
|
||||
let timer = RequestTimer::new("ReadFile");
|
||||
let req = request.into_inner();
|
||||
let safe_path = sanitize_path(&req.path)?;
|
||||
|
||||
@@ -85,17 +102,31 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
|
||||
|
||||
match self.vfs.read_file(sanitized_req).await {
|
||||
Ok(mut rx) => {
|
||||
timer.finish_ok();
|
||||
metrics::ACTIVE_STREAMS.inc();
|
||||
let stream = async_stream::try_stream! {
|
||||
while let Some(result) = rx.recv().await {
|
||||
match result {
|
||||
Ok(chunk) => yield chunk,
|
||||
Err(e) => Err(Status::internal(e.to_string()))?,
|
||||
Ok(chunk) => {
|
||||
metrics::BYTES_READ_TOTAL.inc_by(chunk.data.len() as f64);
|
||||
yield chunk;
|
||||
}
|
||||
Err(e) => {
|
||||
metrics::FILE_OPEN_ERRORS_TOTAL.inc();
|
||||
Err(Status::internal(e.to_string()))?;
|
||||
}
|
||||
}
|
||||
}
|
||||
metrics::ACTIVE_STREAMS.dec();
|
||||
};
|
||||
Ok(Response::new(Box::pin(stream) as Self::ReadFileStream))
|
||||
}
|
||||
Err(e) => Err(Status::internal(e.to_string())),
|
||||
Err(e) => {
|
||||
metrics::FILE_OPEN_ERRORS_TOTAL.inc();
|
||||
timer.finish_err();
|
||||
Err(Status::internal(e.to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user