From 27ee56c5b72467fca756c12ed6ec52533ddd5ccf Mon Sep 17 00:00:00 2001 From: AB Date: Mon, 1 Jun 2026 16:08:49 +0300 Subject: [PATCH] CORE: added prom metrics on /metrics --- Cargo.lock | 3 +- Cargo.toml | 3 +- src/agent/cover_art.rs | 12 +- src/agent/cover_variants.rs | 20 +- src/agent/normalize.rs | 32 +- src/auth.rs | 4 + src/jobs/inbox_discover.rs | 36 +- src/jobs/inbox_process.rs | 46 +- src/main.rs | 59 +++ src/metrics.rs | 843 ++++++++++++++++++++++++++++++++++++ src/oidc.rs | 15 + src/player/mod.rs | 3 + src/scheduler/mod.rs | 12 + src/torrents.rs | 25 +- 14 files changed, 1097 insertions(+), 16 deletions(-) create mode 100644 src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 01869eb..f664a19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1418,7 +1418,7 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "furumusic" -version = "0.2.10" +version = "0.2.11" dependencies = [ "anyhow", "async-trait", @@ -1441,6 +1441,7 @@ dependencies = [ "symphonia", "tokio", "tokio-cron-scheduler", + "tower", "tracing", "tracing-subscriber", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 61e6f20..46f0782 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "furumusic" -version = "0.2.11" +version = "0.2.12" edition = "2024" description = "Reusable web-app boilerplate: auth, OIDC/SSO, admin panel, user management, i18n, PostgreSQL" @@ -11,6 +11,7 @@ serde = { version = "1", features = ["derive"] } openidconnect = "4.0" reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json"] } tokio = { version = "1", features = ["sync", "fs", "io-util"] } +tower = "0.5" base64 = "0.22" serde_json = "1" tracing = "0.1" diff --git a/src/agent/cover_art.rs b/src/agent/cover_art.rs index 2188d0b..ac5ca94 100644 --- a/src/agent/cover_art.rs +++ b/src/agent/cover_art.rs @@ -128,13 +128,21 @@ pub async fn find_best_cover(folder: &Path, audio_files: &[PathBuf]) -> Option { let mime = 
mime_for_image(img_path); + crate::metrics::record_agent_cover_lookup("folder", "ok", data.len()); return Some(CoverImage { data, mime_type: mime, source: CoverSource::FolderFile(img_path.clone()), }); } - _ => continue, + Ok(_) => { + crate::metrics::record_agent_cover_lookup("folder", "empty", 0); + continue; + } + Err(_) => { + crate::metrics::record_agent_cover_lookup("folder", "error", 0); + continue; + } } } @@ -143,10 +151,12 @@ pub async fn find_best_cover(folder: &Path, audio_files: &[PathBuf]) -> Option crate::metrics::record_agent_cover_variant( + variant.name, + "ok", + start.elapsed(), + ), + Err(err) => { + crate::metrics::record_agent_cover_variant( + variant.name, + "error", + start.elapsed(), + ); + return Err(err.into()); + } + } std::fs::write(path, output)?; created += 1; } diff --git a/src/agent/normalize.rs b/src/agent/normalize.rs index 58bb466..57d472b 100644 --- a/src/agent/normalize.rs +++ b/src/agent/normalize.rs @@ -330,6 +330,7 @@ pub async fn normalize_batch( // If over 80% of context limit and more than 1 file, split let limit_80 = context_limit * 80 / 100; if estimated > limit_80 && files.len() > 1 { + crate::metrics::record_agent_llm_split("estimated_context"); tracing::info!( estimated_tokens = estimated, context_limit, @@ -419,6 +420,7 @@ pub async fn normalize_batch( || err_str.contains("length") || err_str.contains("token"); if is_context_error { + crate::metrics::record_agent_llm_split("context_error"); tracing::warn!( file_count = files.len(), "LLM error suggests context overflow, splitting batch: {e}" @@ -466,14 +468,40 @@ pub async fn normalize_batch( } return Err(e); } - Err(e) => return Err(e), + Err(e) => { + crate::metrics::record_agent_llm( + llm_model, + "error", + std::time::Duration::from_millis(duration_ms), + 0, + 0, + files.len(), + Some(estimated), + ); + return Err(e); + } }; let prompt_tokens = usage.prompt_tokens.unwrap_or(0) as u64; let completion_tokens = usage.completion_tokens.unwrap_or(0) as u64; + 
crate::metrics::record_agent_llm( + &resp_model, + "ok", + std::time::Duration::from_millis(duration_ms), + prompt_tokens, + completion_tokens, + files.len(), + Some(estimated), + ); // Parse batch response - let results = parse_batch_response(&response_text, &files)?; + let results = match parse_batch_response(&response_text, &files) { + Ok(results) => results, + Err(error) => { + crate::metrics::record_agent_llm_parse_failure(&resp_model); + return Err(error); + } + }; Ok(BatchNormalizeResult { results, diff --git a/src/auth.rs b/src/auth.rs index ccb8f1c..a7908f2 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -61,6 +61,7 @@ pub async fn get_session_user(session: &Session, db: &Database) -> Option Result { let Some(user) = get_session_user(session, db).await else { + crate::metrics::record_authorization_denied("unauthenticated"); return Err(redirect("/login")); }; if user.role != Role::Admin { + crate::metrics::record_authorization_denied("forbidden"); return Err("Forbidden" .with_status(cot::http::StatusCode::FORBIDDEN) .into_response() @@ -96,6 +99,7 @@ pub async fn login(session: &Session, user_id: i64) -> cot::Result<()> { .insert(SESSION_USER_ID, user_id) .await .map_err(|e| cot::Error::internal(e.to_string()))?; + crate::metrics::record_active_user(user_id); Ok(()) } diff --git a/src/jobs/inbox_discover.rs b/src/jobs/inbox_discover.rs index d3e8cec..7c66f36 100644 --- a/src/jobs/inbox_discover.rs +++ b/src/jobs/inbox_discover.rs @@ -29,6 +29,8 @@ impl Job for InboxDiscoverJob { } async fn run(&self, ctx: &JobContext, log: &mut JobLog) -> anyhow::Result<()> { + let run_start = std::time::Instant::now(); + let run_outcome = "completed"; // Prevent overlapping discover runs if DISCOVER_RUNNING .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) @@ -44,6 +46,19 @@ impl Job for InboxDiscoverJob { } } let _guard = Guard; + struct MetricsGuard { + start: std::time::Instant, + outcome: &'static str, + } + impl Drop for MetricsGuard { + fn drop(&mut 
self) { + crate::metrics::record_agent_discover_run(self.outcome, self.start.elapsed()); + } + } + let mut metrics_guard = MetricsGuard { + start: run_start, + outcome: run_outcome, + }; let config = &ctx.config; @@ -97,6 +112,7 @@ impl Job for InboxDiscoverJob { // Compute SHA-256 hash let path_clone = file_path.to_path_buf(); + let hash_start = std::time::Instant::now(); let (hash, file_size) = match tokio::task::spawn_blocking(move || -> anyhow::Result<(String, i64)> { let data = std::fs::read(&path_clone)?; @@ -107,8 +123,12 @@ impl Job for InboxDiscoverJob { }) .await? { - Ok(v) => v, + Ok(v) => { + crate::metrics::record_agent_file_hash(hash_start.elapsed(), v.1, "ok"); + v + } Err(e) => { + crate::metrics::record_agent_file_hash(hash_start.elapsed(), 0, "error"); log.warn(&format!("Failed to hash {}: {e}", file_path.display())); continue; } @@ -125,13 +145,18 @@ impl Job for InboxDiscoverJob { // Extract raw metadata let path_for_meta = file_path.to_path_buf(); + let metadata_start = std::time::Instant::now(); let raw_meta = match tokio::task::spawn_blocking(move || { crate::agent::metadata::extract(&path_for_meta) }) .await? 
{ - Ok(m) => m, + Ok(m) => { + crate::metrics::record_agent_metadata(metadata_start.elapsed(), "ok"); + m + } Err(e) => { + crate::metrics::record_agent_metadata(metadata_start.elapsed(), "error"); log.warn(&format!( "Failed to extract metadata from {}: {e}", file_path.display() @@ -189,6 +214,12 @@ impl Job for InboxDiscoverJob { "Discovered {} new files, skipped {} (hash known), skipped {} (already queued)", discovered, skipped_hash, skipped_existing )); + crate::metrics::record_agent_discover_files( + audio_files.len() as u64, + discovered, + skipped_hash, + skipped_existing, + ); // Trigger inbox_process in background if new files were discovered // and no orchestrator is already running @@ -219,6 +250,7 @@ impl Job for InboxDiscoverJob { } } + metrics_guard.outcome = run_outcome; Ok(()) } } diff --git a/src/jobs/inbox_process.rs b/src/jobs/inbox_process.rs index c4445f3..6dd2d7b 100644 --- a/src/jobs/inbox_process.rs +++ b/src/jobs/inbox_process.rs @@ -307,12 +307,18 @@ async fn process_folder_batch( // Extract metadata (with 60s timeout) let path_for_meta = file_path.to_path_buf(); + let metadata_start = std::time::Instant::now(); let meta_future = tokio::task::spawn_blocking(move || crate::agent::metadata::extract(&path_for_meta)); let raw_meta = match tokio::time::timeout(std::time::Duration::from_secs(60), meta_future).await { - Ok(Ok(Ok(m))) => m, + Ok(Ok(Ok(m))) => { + crate::metrics::record_agent_metadata(metadata_start.elapsed(), "ok"); + m + } Ok(Ok(Err(e))) => { + crate::metrics::record_agent_metadata(metadata_start.elapsed(), "error"); + crate::metrics::record_agent_failed("metadata"); let msg = format!("{filename}: metadata error: {e}"); log.error(&msg); let _ = review.set_failed(db, &msg).await; @@ -320,6 +326,8 @@ async fn process_folder_batch( continue; } Ok(Err(e)) => { + crate::metrics::record_agent_metadata(metadata_start.elapsed(), "panic"); + crate::metrics::record_agent_failed("metadata"); let msg = format!("{filename}: metadata panic: 
{e}"); log.error(&msg); let _ = review.set_failed(db, &msg).await; @@ -327,6 +335,8 @@ async fn process_folder_batch( continue; } Err(_) => { + crate::metrics::record_agent_metadata(metadata_start.elapsed(), "timeout"); + crate::metrics::record_agent_failed("metadata"); let msg = format!("{filename}: metadata timeout (60s)"); log.error(&msg); let _ = review.set_failed(db, &msg).await; @@ -416,6 +426,7 @@ async fn process_folder_batch( // Lookup all unique artist queries let mut all_similar_artists = Vec::new(); for q in &artist_queries { + let rag_start = std::time::Instant::now(); match tokio::time::timeout( std::time::Duration::from_secs(30), crate::agent::rag::find_similar_artists(pool, q, 5), @@ -423,6 +434,7 @@ async fn process_folder_batch( .await { Ok(Ok(results)) => { + crate::metrics::record_agent_rag("artist", "ok", rag_start.elapsed(), results.len()); for a in results { if !all_similar_artists .iter() @@ -432,13 +444,20 @@ async fn process_folder_batch( } } } - Ok(Err(e)) => log.warn(&format!("RAG artist lookup failed for \"{q}\": {e}")), - Err(_) => log.warn(&format!("RAG artist lookup timed out for \"{q}\"")), + Ok(Err(e)) => { + crate::metrics::record_agent_rag("artist", "error", rag_start.elapsed(), 0); + log.warn(&format!("RAG artist lookup failed for \"{q}\": {e}")) + } + Err(_) => { + crate::metrics::record_agent_rag("artist", "timeout", rag_start.elapsed(), 0); + log.warn(&format!("RAG artist lookup timed out for \"{q}\"")) + } } } let mut all_similar_releases = Vec::new(); for q in &album_queries { + let rag_start = std::time::Instant::now(); match tokio::time::timeout( std::time::Duration::from_secs(30), crate::agent::rag::find_similar_releases(pool, q, 5), @@ -446,6 +465,7 @@ async fn process_folder_batch( .await { Ok(Ok(results)) => { + crate::metrics::record_agent_rag("release", "ok", rag_start.elapsed(), results.len()); for r in results { if !all_similar_releases .iter() @@ -455,8 +475,14 @@ async fn process_folder_batch( } } } - Ok(Err(e)) 
=> log.warn(&format!("RAG release lookup failed for \"{q}\": {e}")), - Err(_) => log.warn(&format!("RAG release lookup timed out for \"{q}\"")), + Ok(Err(e)) => { + crate::metrics::record_agent_rag("release", "error", rag_start.elapsed(), 0); + log.warn(&format!("RAG release lookup failed for \"{q}\": {e}")) + } + Err(_) => { + crate::metrics::record_agent_rag("release", "timeout", rag_start.elapsed(), 0); + log.warn(&format!("RAG release lookup timed out for \"{q}\"")) + } } } @@ -547,11 +573,14 @@ async fn process_folder_batch( let batch_result = match llm_result { Ok(r) => r, Err(e) => { + crate::metrics::record_agent_failed("llm"); + crate::metrics::record_agent_folder_batch("failed", file_count, batch_start.elapsed()); let err_msg = format!("Batch LLM call failed: {e}"); log.error(&err_msg); // Mark all files as failed for mut p in prepared { let _ = p.review.set_failed(db, &err_msg).await; + crate::metrics::record_agent_file_processed("failed", "failed"); } let total_fail_count = failed_reviews.len() as u64 + file_count as u64; let duration_ms = batch_start.elapsed().as_millis() as i64; @@ -612,6 +641,7 @@ async fn process_folder_batch( let result_json = serde_json::to_string(normalized).unwrap_or_default(); let confidence = normalized.confidence.unwrap_or(0.0); + crate::metrics::observe_agent_confidence(confidence); let feat = if normalized.featured_artists.is_empty() { String::new() @@ -655,9 +685,12 @@ async fn process_folder_batch( { Ok(()) => { let _ = p.review.set_auto_approved(db).await; + crate::metrics::record_agent_file_processed("ok", "auto_approved"); ok_count += 1; } Err(e) => { + crate::metrics::record_agent_failed("finalize"); + crate::metrics::record_agent_file_processed("failed", "failed"); let msg = format!("{filename}: finalize failed: {e}"); log.error(&msg); let _ = p.review.set_failed(db, &msg).await; @@ -671,6 +704,7 @@ async fn process_folder_batch( ) .unwrap(); let _ = p.review.save(db).await; + 
crate::metrics::record_agent_file_processed("ok", "pending_review"); log.info(&format!( "{filename}: manual review (confidence {confidence} < {})", config.agent_confidence_threshold, @@ -682,9 +716,11 @@ async fn process_folder_batch( let duration_ms = batch_start.elapsed().as_millis() as i64; if fail_count == 0 { let _ = run.set_completed(db, duration_ms, &log.output()).await; + crate::metrics::record_agent_folder_batch("completed", file_count, batch_start.elapsed()); } else { let msg = format!("{fail_count} file(s) failed"); let _ = run.set_failed(db, duration_ms, &log.output(), &msg).await; + crate::metrics::record_agent_folder_batch("failed", file_count, batch_start.elapsed()); } (ok_count, fail_count) diff --git a/src/main.rs b/src/main.rs index 8455a9c..414a8a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ mod i18n; mod jobs; mod lastfm; mod media_paths; +mod metrics; mod music; mod oidc; mod player; @@ -139,12 +140,41 @@ async fn logout_handler(session: Session) -> cot::Result, + pool: Arc>, +) -> cot::Result> { + if config.database_url.is_empty() { + return Ok(cot::http::Response::builder() + .status(cot::http::StatusCode::SERVICE_UNAVAILABLE) + .header(cot::http::header::CONTENT_TYPE, "text/plain; version=0.0.4") + .body(Body::fixed("furumusic_metrics_unavailable 1\n")) + .expect("valid response")); + } + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(2) + .connect(&config.database_url) + .await + .expect("metrics pool") + }) + .await; + let body = metrics::render(pg_pool, &config).await; + Ok(cot::http::Response::builder() + .status(cot::http::StatusCode::OK) + .header(cot::http::header::CONTENT_TYPE, "text/plain; version=0.0.4") + .body(Body::fixed(body)) + .expect("valid response")) +} + // --------------------------------------------------------------------------- // App // --------------------------------------------------------------------------- struct FuruApp { config: Arc, + 
pool: Arc>, } impl App for FuruApp { @@ -171,6 +201,19 @@ impl App for FuruApp { }, "index", ), + Route::with_handler_and_name( + "/metrics", + get({ + let config = Arc::clone(&self.config); + let pool = Arc::clone(&self.pool); + move || { + let config = Arc::clone(&config); + let pool = Arc::clone(&pool); + async move { metrics_handler(config, pool).await } + } + }), + "metrics", + ), Route::with_handler_and_name( "/login", get({ @@ -200,6 +243,11 @@ impl App for FuruApp { let data = match result { FormResult::Ok(data) => data, FormResult::ValidationError(_) => { + metrics::record_auth_attempt( + "password", + "failure", + "validation_error", + ); let msg = i18n.t.login_invalid.to_owned(); return login_page_handler(i18n, &config, db, msg) .await? @@ -216,6 +264,10 @@ impl App for FuruApp { PasswordVerificationResult::Ok | PasswordVerificationResult::OkObsolete(_) => { auth::login(&session, user.id_val()).await?; + metrics::record_auth_attempt( + "password", "success", "ok", + ); + metrics::record_session_created("password"); return Ok(auth::redirect("/")); } PasswordVerificationResult::Invalid => {} @@ -223,6 +275,11 @@ impl App for FuruApp { } } + metrics::record_auth_attempt( + "password", + "failure", + "bad_credentials", + ); let msg = i18n.t.login_invalid.to_owned(); login_page_handler(i18n, &config, db, msg) .await? 
@@ -332,6 +389,7 @@ impl Project for FuruProject { context: &cot::project::MiddlewareContext, ) -> cot::project::RootHandler { handler + .middleware(metrics::MetricsLayer) .middleware(StaticFilesMiddleware::from_context(context)) .middleware(SessionMiddleware::from_context(context)) .build() @@ -361,6 +419,7 @@ impl Project for FuruProject { apps.register_with_views( FuruApp { config: Arc::clone(&self.app_config), + pool: Arc::new(tokio::sync::OnceCell::new()), }, "", ); diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..0e1b9c4 --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,843 @@ +use std::collections::{BTreeMap, HashMap}; +use std::future::Future; +use std::path::Path; +use std::pin::Pin; +use std::sync::{LazyLock, Mutex}; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +use cot::http::header::CONTENT_LENGTH; +use cot::http::Method; +use cot::request::Request; +use cot::response::Response; +use cot::Error; +use sqlx::PgPool; +use tower::{Layer, Service}; + +use crate::config::AppConfig; + +const HTTP_BUCKETS: &[f64] = &[ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, +]; +const JOB_BUCKETS: &[f64] = &[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 300.0, 600.0]; +const FILE_BUCKETS: &[f64] = &[ + 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 15.0, 60.0, +]; + +static REGISTRY: LazyLock = LazyLock::new(Registry::default); +static ACTIVE_USERS: LazyLock>> = + LazyLock::new(|| Mutex::new(HashMap::new())); + +#[derive(Default)] +struct Registry { + counters: Mutex>, + gauges: Mutex>, + histograms: Mutex>, +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +struct MetricKey { + name: &'static str, + labels: Vec<(&'static str, String)>, +} + +#[derive(Debug, Clone)] +struct HistogramState { + buckets: Vec, + counts: Vec, + sum: f64, + count: u64, +} + +#[derive(Debug, Clone, Copy)] +pub struct MetricsLayer; + +#[derive(Debug, Clone)] +pub struct MetricsService { + inner: S, 
+} + +impl Layer for MetricsLayer { + type Service = MetricsService; + + fn layer(&self, inner: S) -> Self::Service { + MetricsService { inner } + } +} + +impl Service for MetricsService +where + S: Service + Send + 'static, + S::Future: Send + 'static, +{ + type Response = Response; + type Error = Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + let method = request.method().clone(); + let route = normalize_route(request.uri().path()); + let request_bytes = request + .headers() + .get(CONTENT_LENGTH) + .and_then(|value| value.to_str().ok()) + .and_then(|value| value.parse::().ok()) + .unwrap_or(0.0); + let labels = http_labels(&method, &route, "in_flight"); + REGISTRY.inc_gauge("furumusic_http_in_flight_requests", labels, 1.0); + REGISTRY.inc_counter( + "furumusic_http_request_body_bytes_total", + vec![ + ("method", method.as_str().to_owned()), + ("route", route.clone()), + ], + request_bytes, + ); + + let start = Instant::now(); + let fut = self.inner.call(request); + Box::pin(async move { + let result = fut.await; + let elapsed = start.elapsed().as_secs_f64(); + REGISTRY.inc_gauge( + "furumusic_http_in_flight_requests", + http_labels(&method, &route, "in_flight"), + -1.0, + ); + + match result { + Ok(response) => { + let status = response.status().as_u16().to_string(); + let labels = http_labels(&method, &route, &status); + REGISTRY.inc_counter("furumusic_http_requests_total", labels.clone(), 1.0); + REGISTRY.observe_histogram( + "furumusic_http_request_duration_seconds", + labels, + elapsed, + HTTP_BUCKETS, + ); + if let Some(length) = response + .headers() + .get(CONTENT_LENGTH) + .and_then(|value| value.to_str().ok()) + .and_then(|value| value.parse::().ok()) + { + REGISTRY.inc_counter( + "furumusic_http_response_body_bytes_total", + vec![ + ("method", method.as_str().to_owned()), + ("route", route.clone()), + 
("status", status), + ], + length, + ); + } + Ok(response) + } + Err(error) => { + let labels = http_labels(&method, &route, "500"); + REGISTRY.inc_counter("furumusic_http_requests_total", labels.clone(), 1.0); + REGISTRY.observe_histogram( + "furumusic_http_request_duration_seconds", + labels, + elapsed, + HTTP_BUCKETS, + ); + Err(error) + } + } + }) + } +} + +pub fn record_active_user(user_id: i64) { + let mut users = ACTIVE_USERS.lock().expect("active user lock"); + users.insert(user_id, Instant::now()); +} + +pub fn record_auth_attempt(method: &'static str, outcome: &'static str, reason: &'static str) { + REGISTRY.inc_counter( + "furumusic_auth_login_attempts_total", + vec![ + ("method", method.to_owned()), + ("outcome", outcome.to_owned()), + ("reason", reason.to_owned()), + ], + 1.0, + ); +} + +pub fn record_session_created(method: &'static str) { + REGISTRY.inc_counter( + "furumusic_auth_sessions_created_total", + vec![("method", method.to_owned())], + 1.0, + ); +} + +pub fn record_authorization_denied(kind: &'static str) { + REGISTRY.inc_counter( + "furumusic_auth_denied_total", + vec![("kind", kind.to_owned())], + 1.0, + ); +} + +pub fn record_play_history(duration_listened: Option, completed: bool) { + REGISTRY.inc_counter( + "furumusic_listens_total", + vec![( + "completed", + if completed { "true" } else { "false" }.to_owned(), + )], + 1.0, + ); + if let Some(seconds) = duration_listened { + REGISTRY.inc_counter( + "furumusic_listened_seconds_total", + Vec::new(), + seconds.max(0) as f64, + ); + } +} + +pub fn record_stream_request(range: bool, bytes: u64) { + REGISTRY.inc_counter( + "furumusic_stream_requests_total", + vec![("range", if range { "true" } else { "false" }.to_owned())], + 1.0, + ); + REGISTRY.inc_counter("furumusic_stream_bytes_total", Vec::new(), bytes as f64); +} + +pub fn record_agent_discover_run(outcome: &'static str, duration: Duration) { + REGISTRY.inc_counter( + "furumusic_agent_discover_runs_total", + vec![("outcome", 
outcome.to_owned())], + 1.0, + ); + REGISTRY.observe_histogram( + "furumusic_agent_discover_duration_seconds", + vec![("outcome", outcome.to_owned())], + duration.as_secs_f64(), + JOB_BUCKETS, + ); +} + +pub fn record_agent_discover_files(seen: u64, queued: u64, skipped_hash: u64, skipped_existing: u64) { + REGISTRY.inc_counter("furumusic_agent_discover_files_seen_total", Vec::new(), seen as f64); + REGISTRY.inc_counter( + "furumusic_agent_discover_files_queued_total", + Vec::new(), + queued as f64, + ); + REGISTRY.inc_counter( + "furumusic_agent_discover_files_skipped_total", + vec![("reason", "hash_known".to_owned())], + skipped_hash as f64, + ); + REGISTRY.inc_counter( + "furumusic_agent_discover_files_skipped_total", + vec![("reason", "already_queued".to_owned())], + skipped_existing as f64, + ); +} + +pub fn record_agent_file_hash(duration: Duration, bytes: i64, outcome: &'static str) { + REGISTRY.observe_histogram( + "furumusic_agent_discover_hash_duration_seconds", + vec![("outcome", outcome.to_owned())], + duration.as_secs_f64(), + FILE_BUCKETS, + ); + if bytes > 0 { + REGISTRY.inc_counter( + "furumusic_agent_discover_file_bytes_total", + vec![("outcome", outcome.to_owned())], + bytes as f64, + ); + } +} + +pub fn record_agent_metadata(duration: Duration, outcome: &'static str) { + REGISTRY.observe_histogram( + "furumusic_agent_discover_metadata_duration_seconds", + vec![("outcome", outcome.to_owned())], + duration.as_secs_f64(), + FILE_BUCKETS, + ); +} + +pub fn record_agent_folder_batch(outcome: &'static str, size: usize, duration: Duration) { + REGISTRY.inc_counter( + "furumusic_agent_folder_batches_total", + vec![("outcome", outcome.to_owned())], + 1.0, + ); + REGISTRY.observe_histogram( + "furumusic_agent_folder_batch_duration_seconds", + vec![("outcome", outcome.to_owned())], + duration.as_secs_f64(), + JOB_BUCKETS, + ); + REGISTRY.observe_histogram( + "furumusic_agent_folder_batch_size", + vec![("outcome", outcome.to_owned())], + size as f64, + 
&[1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0], + ); +} + +pub fn record_agent_file_processed(outcome: &'static str, decision: &'static str) { + REGISTRY.inc_counter( + "furumusic_agent_files_processed_total", + vec![ + ("outcome", outcome.to_owned()), + ("decision", decision.to_owned()), + ], + 1.0, + ); +} + +pub fn record_agent_failed(stage: &'static str) { + REGISTRY.inc_counter( + "furumusic_agent_failed_total", + vec![("stage", stage.to_owned())], + 1.0, + ); +} + +pub fn observe_agent_confidence(confidence: f64) { + REGISTRY.observe_histogram( + "furumusic_agent_confidence", + Vec::new(), + confidence, + &[0.1, 0.2, 0.4, 0.6, 0.75, 0.85, 0.95, 1.0], + ); +} + +pub fn record_agent_llm( + model: &str, + outcome: &'static str, + duration: Duration, + prompt_tokens: u64, + completion_tokens: u64, + batch_size: usize, + estimated_tokens: Option, +) { + let model = normalize_model_label(model); + REGISTRY.inc_counter( + "furumusic_agent_llm_requests_total", + vec![ + ("model", model.clone()), + ("outcome", outcome.to_owned()), + ], + 1.0, + ); + REGISTRY.observe_histogram( + "furumusic_agent_llm_duration_seconds", + vec![ + ("model", model.clone()), + ("outcome", outcome.to_owned()), + ], + duration.as_secs_f64(), + JOB_BUCKETS, + ); + REGISTRY.inc_counter( + "furumusic_agent_llm_tokens_total", + vec![("model", model.clone()), ("type", "prompt".to_owned())], + prompt_tokens as f64, + ); + REGISTRY.inc_counter( + "furumusic_agent_llm_tokens_total", + vec![ + ("model", model.clone()), + ("type", "completion".to_owned()), + ], + completion_tokens as f64, + ); + REGISTRY.observe_histogram( + "furumusic_agent_llm_batch_size", + vec![("model", model.clone())], + batch_size as f64, + &[1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0], + ); + if let Some(estimated) = estimated_tokens { + REGISTRY.observe_histogram( + "furumusic_agent_llm_context_estimated_tokens", + vec![("model", model)], + estimated as f64, + &[512.0, 1024.0, 2048.0, 4096.0, 8192.0, 16384.0, 32768.0], + ); + } +} + 
+pub fn record_agent_llm_split(reason: &'static str) { + REGISTRY.inc_counter( + "furumusic_agent_llm_batch_splits_total", + vec![("reason", reason.to_owned())], + 1.0, + ); +} + +pub fn record_agent_llm_parse_failure(model: &str) { + REGISTRY.inc_counter( + "furumusic_agent_llm_parse_failures_total", + vec![("model", normalize_model_label(model))], + 1.0, + ); +} + +pub fn record_agent_rag(kind: &'static str, outcome: &'static str, duration: Duration, results: usize) { + REGISTRY.inc_counter( + "furumusic_agent_rag_queries_total", + vec![("kind", kind.to_owned()), ("outcome", outcome.to_owned())], + 1.0, + ); + REGISTRY.observe_histogram( + "furumusic_agent_rag_duration_seconds", + vec![("kind", kind.to_owned()), ("outcome", outcome.to_owned())], + duration.as_secs_f64(), + FILE_BUCKETS, + ); + REGISTRY.observe_histogram( + "furumusic_agent_rag_results", + vec![("kind", kind.to_owned())], + results as f64, + &[0.0, 1.0, 2.0, 5.0, 10.0], + ); +} + +pub fn record_agent_cover_lookup(source: &'static str, outcome: &'static str, bytes: usize) { + REGISTRY.inc_counter( + "furumusic_agent_cover_lookup_total", + vec![("source", source.to_owned()), ("outcome", outcome.to_owned())], + 1.0, + ); + REGISTRY.inc_counter( + "furumusic_agent_cover_bytes_total", + vec![("source", source.to_owned())], + bytes as f64, + ); +} + +pub fn record_agent_cover_variant(variant: &'static str, outcome: &'static str, duration: Duration) { + REGISTRY.inc_counter( + "furumusic_agent_cover_variant_generation_total", + vec![ + ("variant", variant.to_owned()), + ("outcome", outcome.to_owned()), + ], + 1.0, + ); + REGISTRY.observe_histogram( + "furumusic_agent_cover_variant_duration_seconds", + vec![ + ("variant", variant.to_owned()), + ("outcome", outcome.to_owned()), + ], + duration.as_secs_f64(), + FILE_BUCKETS, + ); +} + +pub fn record_scheduler_job(job: &str, trigger: &str, outcome: &'static str, duration_ms: i64) { + let labels = vec![ + ("job", job.to_owned()), + ("trigger", 
trigger.to_owned()), + ("outcome", outcome.to_owned()), + ]; + REGISTRY.inc_counter("furumusic_scheduler_job_runs_total", labels.clone(), 1.0); + REGISTRY.observe_histogram( + "furumusic_scheduler_job_duration_seconds", + labels, + (duration_ms.max(0) as f64) / 1000.0, + JOB_BUCKETS, + ); +} + +pub fn record_torrent_download(outcome: &'static str, selected_bytes: u64, duration: Duration) { + REGISTRY.inc_counter( + "furumusic_torrent_downloads_total", + vec![("outcome", outcome.to_owned())], + 1.0, + ); + REGISTRY.inc_counter( + "furumusic_torrent_selected_bytes_total", + vec![("outcome", outcome.to_owned())], + selected_bytes as f64, + ); + REGISTRY.observe_histogram( + "furumusic_torrent_download_duration_seconds", + vec![("outcome", outcome.to_owned())], + duration.as_secs_f64(), + JOB_BUCKETS, + ); +} + +pub async fn render(pool: &PgPool, config: &AppConfig) -> String { + let mut out = String::new(); + emit_static_gauge(&mut out, "furumusic_build_info", &[("version", env!("CARGO_PKG_VERSION"))], 1.0); + render_active_users(&mut out); + render_storage(&mut out, config); + render_db_metrics(&mut out, pool).await; + out.push_str(®ISTRY.render()); + out +} + +fn render_active_users(out: &mut String) { + let mut users = ACTIVE_USERS.lock().expect("active user lock"); + let now = Instant::now(); + users.retain(|_, seen| now.duration_since(*seen) <= Duration::from_secs(3600)); + for (window, seconds) in [("5m", 300), ("15m", 900), ("1h", 3600)] { + let count = users + .values() + .filter(|seen| now.duration_since(**seen) <= Duration::from_secs(seconds)) + .count(); + emit_static_gauge( + out, + "furumusic_active_users", + &[("window", window)], + count as f64, + ); + } +} + +fn render_storage(out: &mut String, config: &AppConfig) { + for (kind, path) in [ + ("inbox", config.agent_inbox_dir.as_str()), + ("library", config.agent_storage_dir.as_str()), + ] { + if let Some(usage) = disk_usage(Path::new(path.trim())) { + emit_static_gauge( + out, + 
"furumusic_storage_free_bytes", + &[("path_kind", kind)], + usage.free_bytes as f64, + ); + emit_static_gauge( + out, + "furumusic_storage_total_bytes", + &[("path_kind", kind)], + usage.total_bytes as f64, + ); + } + } +} + +async fn render_db_metrics(out: &mut String, pool: &PgPool) { + render_group_counts(out, pool, "furumusic_users_total", "SELECT role::text AS label, COUNT(*) AS count FROM furumusic__user GROUP BY role", "role").await; + render_single_count(out, pool, "furumusic_library_tracks_total", "SELECT COUNT(*) FROM furumusic__track").await; + render_single_count(out, pool, "furumusic_library_releases_total", "SELECT COUNT(*) FROM furumusic__release").await; + render_single_count(out, pool, "furumusic_library_artists_total", "SELECT COUNT(*) FROM furumusic__artist").await; + render_single_count(out, pool, "furumusic_library_playlists_total", "SELECT COUNT(*) FROM furumusic__playlist").await; + render_group_counts(out, pool, "furumusic_media_files_total", "SELECT file_type::text AS label, COUNT(*) AS count FROM furumusic__media_file GROUP BY file_type", "type").await; + render_group_sums(out, pool, "furumusic_media_file_bytes_total", "SELECT file_type::text AS label, COALESCE(SUM(file_size_bytes), 0)::bigint AS value FROM furumusic__media_file GROUP BY file_type", "type").await; + render_group_counts(out, pool, "furumusic_agent_reviews_total", "SELECT status::text AS label, COUNT(*) AS count FROM furumusic__pending_review GROUP BY status", "status").await; + render_group_counts(out, pool, "furumusic_agent_queue_depth", "SELECT status::text AS label, COUNT(*) AS count FROM furumusic__pending_review GROUP BY status", "status").await; + render_group_counts(out, pool, "furumusic_scheduler_job_running", "SELECT job_name::text AS label, COUNT(*) AS count FROM furumusic__job_run WHERE status = 'running' GROUP BY job_name", "job").await; + render_group_sums(out, pool, "furumusic_scheduler_job_enabled", "SELECT name::text AS label, (CASE WHEN enabled THEN 1 ELSE 
0 END)::bigint AS value FROM furumusic__scheduled_job", "job").await; + render_group_counts(out, pool, "furumusic_torrent_sessions_total", "SELECT status::text AS label, COUNT(*) AS count FROM furumusic__torrent_session GROUP BY status", "status").await; + render_single_count(out, pool, "furumusic_play_history_total", "SELECT COUNT(*) FROM furumusic__play_history").await; +} + +async fn render_single_count(out: &mut String, pool: &PgPool, metric: &'static str, sql: &str) { + if let Ok(value) = sqlx::query_scalar::<_, i64>(sql).fetch_one(pool).await { + emit_static_gauge(out, metric, &[], value as f64); + } +} + +async fn render_group_counts( + out: &mut String, + pool: &PgPool, + metric: &'static str, + sql: &str, + label_name: &'static str, +) { + if let Ok(rows) = sqlx::query_as::<_, (String, i64)>(sql).fetch_all(pool).await { + for (label, count) in rows { + emit_static_gauge(out, metric, &[(label_name, label.as_str())], count as f64); + } + } +} + +async fn render_group_sums( + out: &mut String, + pool: &PgPool, + metric: &'static str, + sql: &str, + label_name: &'static str, +) { + if let Ok(rows) = sqlx::query_as::<_, (String, i64)>(sql).fetch_all(pool).await { + for (label, value) in rows { + emit_static_gauge(out, metric, &[(label_name, label.as_str())], value as f64); + } + } +} + +impl Registry { + fn inc_counter(&self, name: &'static str, labels: Vec<(&'static str, String)>, value: f64) { + if value <= 0.0 { + return; + } + let mut counters = self.counters.lock().expect("counter lock"); + *counters.entry(MetricKey::new(name, labels)).or_default() += value; + } + + fn inc_gauge(&self, name: &'static str, labels: Vec<(&'static str, String)>, value: f64) { + let mut gauges = self.gauges.lock().expect("gauge lock"); + *gauges.entry(MetricKey::new(name, labels)).or_default() += value; + } + + fn observe_histogram( + &self, + name: &'static str, + labels: Vec<(&'static str, String)>, + value: f64, + buckets: &[f64], + ) { + let mut histograms = 
self.histograms.lock().expect("histogram lock"); + let state = histograms + .entry(MetricKey::new(name, labels)) + .or_insert_with(|| HistogramState { + buckets: buckets.to_vec(), + counts: vec![0; buckets.len()], + sum: 0.0, + count: 0, + }); + for (index, bucket) in state.buckets.iter().enumerate() { + if value <= *bucket { + state.counts[index] += 1; + } + } + state.sum += value; + state.count += 1; + } + + fn render(&self) -> String { + let mut out = String::new(); + for (key, value) in self.counters.lock().expect("counter lock").iter() { + emit_type(&mut out, key.name, "counter"); + emit_metric(&mut out, key.name, &key.labels, *value); + } + for (key, value) in self.gauges.lock().expect("gauge lock").iter() { + emit_type(&mut out, key.name, "gauge"); + emit_metric(&mut out, key.name, &key.labels, (*value).max(0.0)); + } + for (key, state) in self.histograms.lock().expect("histogram lock").iter() { + emit_type(&mut out, key.name, "histogram"); + for (bucket, count) in state.buckets.iter().zip(state.counts.iter()) { + let mut labels = key.labels.clone(); + labels.push(("le", bucket.to_string())); + emit_metric(&mut out, &format!("{}_bucket", key.name), &labels, *count as f64); + } + let mut inf_labels = key.labels.clone(); + inf_labels.push(("le", "+Inf".to_owned())); + emit_metric( + &mut out, + &format!("{}_bucket", key.name), + &inf_labels, + state.count as f64, + ); + emit_metric( + &mut out, + &format!("{}_sum", key.name), + &key.labels, + state.sum, + ); + emit_metric( + &mut out, + &format!("{}_count", key.name), + &key.labels, + state.count as f64, + ); + } + out + } +} + +impl MetricKey { + fn new(name: &'static str, mut labels: Vec<(&'static str, String)>) -> Self { + labels.sort_by(|a, b| a.0.cmp(b.0)); + Self { name, labels } + } +} + +fn http_labels(method: &Method, route: &str, status: &str) -> Vec<(&'static str, String)> { + vec![ + ("method", method.as_str().to_owned()), + ("route", route.to_owned()), + ("status", status.to_owned()), + ] +} + +fn 
/// Appends one sample line to `out`: the metric `name`, an optional
/// `{key="value",...}` label section, a space, the value with six decimal
/// places, and a trailing newline.
///
/// Label values are escaped with [`escape_label`]; label names and the metric
/// name are written verbatim.
fn emit_metric(out: &mut String, name: &str, labels: &[(&str, String)], value: f64) {
    use std::fmt::Write as _;

    out.push_str(name);

    // The first label is introduced by `{`, every following one by `,`.
    let mut separator = '{';
    for (label_name, label_value) in labels {
        out.push(separator);
        separator = ',';
        out.push_str(label_name);
        out.push_str("=\"");
        escape_label(out, label_value);
        out.push('"');
    }
    if !labels.is_empty() {
        out.push('}');
    }

    // Formatting into a `String` cannot fail, so the result is discarded.
    let _ = writeln!(out, " {value:.6}");
}

/// Appends `value` to `out`, escaping the characters that are special inside
/// a quoted label value: backslash and double quote get a leading backslash,
/// and a line feed is written as the two characters `\n`.
fn escape_label(out: &mut String, value: &str) {
    for ch in value.chars() {
        match ch {
            '\n' => out.push_str("\\n"),
            '\\' | '"' => {
                out.push('\\');
                out.push(ch);
            }
            other => out.push(other),
        }
    }
}
"system" { + fn GetDiskFreeSpaceExW( + lpDirectoryName: *const u16, + lpFreeBytesAvailableToCaller: *mut u64, + lpTotalNumberOfBytes: *mut u64, + lpTotalNumberOfFreeBytes: *mut u64, + ) -> i32; + } + + let mut wide: Vec = path.as_os_str().encode_wide().collect(); + wide.push(0); + let mut free_available = 0_u64; + let mut total = 0_u64; + let mut total_free = 0_u64; + let ok = unsafe { + GetDiskFreeSpaceExW( + wide.as_ptr(), + &mut free_available, + &mut total, + &mut total_free, + ) + }; + (ok != 0).then_some(DiskUsage { + free_bytes: free_available, + total_bytes: total, + }) +} + +#[cfg(any(target_os = "linux", target_os = "android"))] +fn disk_usage(path: &Path) -> Option { + use std::ffi::CString; + use std::os::unix::ffi::OsStrExt; + + #[repr(C)] + struct Statvfs { + f_bsize: std::ffi::c_ulong, + f_frsize: std::ffi::c_ulong, + f_blocks: std::ffi::c_ulong, + f_bfree: std::ffi::c_ulong, + f_bavail: std::ffi::c_ulong, + f_files: std::ffi::c_ulong, + f_ffree: std::ffi::c_ulong, + f_favail: std::ffi::c_ulong, + f_fsid: std::ffi::c_ulong, + f_flag: std::ffi::c_ulong, + f_namemax: std::ffi::c_ulong, + __f_spare: [std::ffi::c_int; 6], + } + + unsafe extern "C" { + fn statvfs(path: *const std::ffi::c_char, buf: *mut Statvfs) -> std::ffi::c_int; + } + + let c_path = CString::new(path.as_os_str().as_bytes()).ok()?; + let mut stat = std::mem::MaybeUninit::::uninit(); + let ok = unsafe { statvfs(c_path.as_ptr(), stat.as_mut_ptr()) }; + if ok != 0 { + return None; + } + let stat = unsafe { stat.assume_init() }; + let fragment_size = if stat.f_frsize > 0 { + stat.f_frsize as u64 + } else { + stat.f_bsize as u64 + }; + Some(DiskUsage { + free_bytes: stat.f_bavail as u64 * fragment_size, + total_bytes: stat.f_blocks as u64 * fragment_size, + }) +} + +#[cfg(not(any(windows, target_os = "linux", target_os = "android")))] +fn disk_usage(_path: &Path) -> Option { + None +} diff --git a/src/oidc.rs b/src/oidc.rs index b5ebbc4..f624d65 100644 --- a/src/oidc.rs +++ b/src/oidc.rs @@ 
-172,6 +172,7 @@ pub async fn oidc_start_handler( || config.oidc_client_secret.is_empty() { tracing::warn!("OIDC start requested but SSO is not configured"); + crate::metrics::record_auth_attempt("oidc", "failure", "not_configured"); return redirect_login_with_error(i18n.t.login_sso_disabled); } @@ -180,6 +181,7 @@ pub async fn oidc_start_handler( Ok(c) => c, Err(e) => { tracing::error!("OIDC provider error: {e}"); + crate::metrics::record_auth_attempt("oidc", "failure", "provider_error"); return redirect_login_with_error(i18n.t.login_oidc_error); } }; @@ -276,19 +278,23 @@ pub async fn oidc_callback_handler( // Validate CSRF state. let Some(saved_csrf) = saved_csrf else { tracing::warn!("OIDC callback: no CSRF state in session"); + crate::metrics::record_auth_attempt("oidc", "failure", "missing_state"); return redirect_login_with_error(i18n.t.login_oidc_error); }; if query.state != saved_csrf { tracing::warn!("OIDC callback: CSRF state mismatch"); + crate::metrics::record_auth_attempt("oidc", "failure", "csrf"); return redirect_login_with_error(i18n.t.login_oidc_error); } let Some(nonce_str) = saved_nonce else { tracing::warn!("OIDC callback: no nonce in session"); + crate::metrics::record_auth_attempt("oidc", "failure", "missing_nonce"); return redirect_login_with_error(i18n.t.login_oidc_error); }; let Some(pkce_str) = saved_pkce else { tracing::warn!("OIDC callback: no PKCE verifier in session"); + crate::metrics::record_auth_attempt("oidc", "failure", "missing_pkce"); return redirect_login_with_error(i18n.t.login_oidc_error); }; @@ -300,6 +306,7 @@ pub async fn oidc_callback_handler( Ok(c) => c, Err(e) => { tracing::error!("OIDC provider error during callback: {e}"); + crate::metrics::record_auth_attempt("oidc", "failure", "provider_error"); return redirect_login_with_error(i18n.t.login_oidc_error); } }; @@ -318,6 +325,7 @@ pub async fn oidc_callback_handler( Ok(req) => req, Err(e) => { tracing::error!("OIDC token endpoint not configured: {e}"); + 
crate::metrics::record_auth_attempt("oidc", "failure", "token_config"); return redirect_login_with_error(i18n.t.login_oidc_error); } }; @@ -330,6 +338,7 @@ pub async fn oidc_callback_handler( Ok(t) => t, Err(e) => { tracing::error!("OIDC token exchange failed: {e}"); + crate::metrics::record_auth_attempt("oidc", "failure", "token_exchange"); return redirect_login_with_error(i18n.t.login_oidc_error); } }; @@ -340,6 +349,7 @@ pub async fn oidc_callback_handler( Some(t) => t, None => { tracing::error!("OIDC response missing ID token"); + crate::metrics::record_auth_attempt("oidc", "failure", "missing_id_token"); return redirect_login_with_error(i18n.t.login_oidc_error); } }; @@ -348,6 +358,7 @@ pub async fn oidc_callback_handler( Ok(c) => c, Err(e) => { tracing::error!("OIDC ID token verification failed: {e}"); + crate::metrics::record_auth_attempt("oidc", "failure", "id_token_verify"); return redirect_login_with_error(i18n.t.login_oidc_error); } }; @@ -395,6 +406,7 @@ pub async fn oidc_callback_handler( config.oidc_user_groups, config.oidc_admin_groups, ); + crate::metrics::record_auth_attempt("oidc", "failure", "not_in_group"); return redirect_login_with_error(i18n.t.login_access_denied); } @@ -413,12 +425,15 @@ pub async fn oidc_callback_handler( Ok(u) => u, Err(e) => { tracing::error!("OIDC user provisioning failed: {e}"); + crate::metrics::record_auth_attempt("oidc", "failure", "provisioning"); return redirect_login_with_error(i18n.t.login_oidc_error); } }; // Log the user in. auth::login(&session, user.id_val()).await?; + crate::metrics::record_auth_attempt("oidc", "success", "ok"); + crate::metrics::record_session_created("oidc"); // Clear OIDC session keys. 
let _: Option = session diff --git a/src/player/mod.rs b/src/player/mod.rs index 6aa5740..1f8b8d9 100644 --- a/src/player/mod.rs +++ b/src/player/mod.rs @@ -3571,6 +3571,7 @@ async fn stream_handler( let chunk_size = end - start + 1; let data = read_file_range(&full_path, start, chunk_size).await?; + crate::metrics::record_stream_request(true, chunk_size); let response = cot::http::Response::builder() .status(StatusCode::PARTIAL_CONTENT) @@ -3589,6 +3590,7 @@ async fn stream_handler( let data = tokio::fs::read(&full_path) .await .map_err(|e| cot::Error::internal(e.to_string()))?; + crate::metrics::record_stream_request(false, file_size); let response = cot::http::Response::builder() .status(StatusCode::OK) @@ -4491,6 +4493,7 @@ async fn history_handler( .execute(pool) .await .map_err(|e| cot::Error::internal(e.to_string()))?; + crate::metrics::record_play_history(entry.duration_listened, entry.completed); if let Some(listened_seconds) = entry.duration_listened { let (config, _) = AppConfig::load_with_db(&db).await; diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 53b9462..f3a1640 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -200,6 +200,12 @@ impl JobRun { duration_ms: i64, log: &str, ) -> cot::db::Result<()> { + crate::metrics::record_scheduler_job( + self.job_name.as_str(), + self.trigger.as_str(), + "completed", + duration_ms, + ); self.status = LimitedString::new("completed").unwrap(); self.finished_at = Some(now_iso().to_string()); self.duration_ms = Some(duration_ms); @@ -214,6 +220,12 @@ impl JobRun { log: &str, error: &str, ) -> cot::db::Result<()> { + crate::metrics::record_scheduler_job( + self.job_name.as_str(), + self.trigger.as_str(), + "failed", + duration_ms, + ); self.status = LimitedString::new("failed").unwrap(); self.finished_at = Some(now_iso().to_string()); self.duration_ms = Some(duration_ms); diff --git a/src/torrents.rs b/src/torrents.rs index 63b2d21..b9a7901 100644 --- a/src/torrents.rs +++ b/src/torrents.rs 
@@ -777,7 +777,7 @@ impl TorrentService { self.ensure_memory_job(pool, uploader_user_id, id).await?; - let (torrent_bytes, output_dir) = { + let (torrent_bytes, output_dir, selected_size) = { let mut jobs = self.jobs.lock().await; let job = jobs.get_mut(id).context("torrent job not found")?; if job.user_id != uploader_user_id { @@ -800,7 +800,11 @@ impl TorrentService { job.error = None; job.completed_at = None; job.updated_at = now_string(); - (job.torrent_bytes.clone(), job.output_dir.clone()) + ( + job.torrent_bytes.clone(), + job.output_dir.clone(), + job.selected_size(), + ) }; tokio::fs::create_dir_all(&output_dir).await?; @@ -846,6 +850,7 @@ impl TorrentService { let service = Arc::clone(self); let pool = pool.clone(); let id = id.to_string(); + let download_start = std::time::Instant::now(); tokio::spawn(async move { if let Err(err) = handle.wait_until_completed().await { if service.is_paused(&id).await { @@ -853,6 +858,11 @@ impl TorrentService { } service.stop_torrent(&handle).await; service.fail_job(&pool, &id, err.to_string()).await; + crate::metrics::record_torrent_download( + "failed", + selected_size, + download_start.elapsed(), + ); return; } service.stop_torrent(&handle).await; @@ -861,6 +871,17 @@ impl TorrentService { .await { service.fail_job(&pool, &id, err.to_string()).await; + crate::metrics::record_torrent_download( + "failed", + selected_size, + download_start.elapsed(), + ); + } else { + crate::metrics::record_torrent_download( + "completed", + selected_size, + download_start.elapsed(), + ); } });