diff --git a/Cargo.lock b/Cargo.lock index 3f975db..09da410 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1397,7 +1397,7 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "furumusic" -version = "0.1.7" +version = "0.1.8" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 3cb4f76..b8a6e9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "furumusic" -version = "0.1.8" +version = "0.1.9" edition = "2024" description = "Reusable web-app boilerplate: auth, OIDC/SSO, admin panel, user management, i18n, PostgreSQL" diff --git a/src/admin/mod.rs b/src/admin/mod.rs index d088313..807c9f1 100644 --- a/src/admin/mod.rs +++ b/src/admin/mod.rs @@ -1,3 +1,4 @@ +mod v2; pub mod views; use std::sync::Arc; @@ -131,20 +132,296 @@ impl App for AdminApp { ), "admin_setup", ), + // -- Admin v2 ----------------------------------------------------- + Route::with_handler_and_name( + "/v2", + |session: Session, db: Database, i18n: I18n| async move { + let count = User::count_all(&db).await.unwrap_or(0); + if count == 0 { + return Ok(auth::redirect("/admin/setup")); + } + let admin = match auth::require_admin_or_redirect(&session, &db).await { + Ok(u) => u, + Err(resp) => return Ok(resp), + }; + v2::page(admin, i18n).await?.into_response() + }, + "admin_v2", + ), + Route::with_handler_and_name( + "/v2/api/dashboard", + { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + let registry = Arc::clone(&self.registry); + get(move |session: Session, db: Database| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + let registry = Arc::clone(®istry); + async move { + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("admin pool") + }) + .await; + v2::dashboard(session, db, pg_pool, ®istry).await + } + }) + }, + "admin_v2_dashboard", + ), + Route::with_handler_and_name( + "/v2/api/reviews", + { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + get(move |session: Session, db: Database, + query: UrlQuery| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + async move { + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("admin pool") + }) + .await; + v2::reviews(session, db, pg_pool, query.0).await + } + }) + }, + "admin_v2_reviews", + ), + Route::with_handler_and_name( + "/v2/api/reviews/bulk", + { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + cot::router::method::post( + move |session: Session, + db: Database, + json: Json| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + async move { + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("admin pool") + }) + .await; + v2::bulk_reviews(session, db, pg_pool, json).await + } + }, + ) + }, + "admin_v2_reviews_bulk", + ), + Route::with_handler_and_name( + "/v2/api/jobs", + { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + let registry = Arc::clone(&self.registry); + get(move |session: Session, db: Database| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + let registry = Arc::clone(®istry); + async move { + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("admin pool") + }) + .await; + v2::jobs(session, db, pg_pool, ®istry).await + } + }) + }, + "admin_v2_jobs", + ), + Route::with_handler_and_name( + "/v2/api/jobs/{name}/run", + cot::router::method::post({ + let handle = Arc::clone(&self.scheduler_handle); + move |session: Session, db: Database, path: Path| { + let handle = Arc::clone(&handle); + async move { v2::run_job(session, db, &handle, &path.0.name).await } + } + }), + "admin_v2_job_run", + ), + Route::with_handler_and_name( + "/v2/api/jobs/{name}/toggle", + cot::router::method::post({ + let handle = Arc::clone(&self.scheduler_handle); + move |session: Session, db: Database, path: Path| { + let handle = Arc::clone(&handle); + async move { v2::toggle_job(session, db, &handle, &path.0.name).await } + } + }), + "admin_v2_job_toggle", + ), + Route::with_handler_and_name( + "/v2/api/jobs/{name}/runs", + { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + get(move |session: Session, db: Database, path: Path| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + async move { + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("admin pool") + }) + .await; + v2::job_runs(session, db, pg_pool, &path.0.name).await + } + }) + }, + "admin_v2_job_runs", + ), + Route::with_handler_and_name( + "/v2/api/jobs/{name}/runs/{run_id}", + { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + get( + move |session: Session, db: Database, path: Path| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + async move { + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("admin pool") + }) + .await; + v2::job_run_detail(session, db, pg_pool, path.0.run_id).await + } + }, + ) + }, + "admin_v2_job_run_detail", + ), + Route::with_handler_and_name( + "/v2/api/library", + { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + get(move |session: Session, db: Database, + query: UrlQuery| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + async move { + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("admin pool") + }) + .await; + v2::library(session, db, pg_pool, query.0).await + } + }) + }, + "admin_v2_library", + ), + Route::with_handler_and_name( + "/v2/api/library/item", + { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + cot::router::method::post( + move |session: Session, + db: Database, + json: Json| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + async move { + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("admin pool") + }) + .await; + v2::update_library_item(session, db, pg_pool, json).await + } + }, + ) + }, + "admin_v2_library_item", + ), + Route::with_handler_and_name( + "/v2/api/library/bulk", + { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + cot::router::method::post( + move |session: Session, + db: Database, + json: Json| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + async move { + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("admin pool") + }) + .await; + v2::bulk_library(session, db, pg_pool, json).await + } + }, + ) + }, + "admin_v2_library_bulk", + ), // -- Dashboard ---------------------------------------------------- Route::with_handler_and_name( "/", |session: Session, db: Database, i18n: I18n| async move { let count = User::count_all(&db).await.unwrap_or(0); if count == 0 { - return Ok(auth::redirect("/admin/setup")); + return Ok::(auth::redirect( + "/admin/setup", + )); } - let admin = - match auth::require_admin_or_redirect(&session, &db).await { - Ok(u) => u, - Err(resp) => return Ok(resp), - }; - views::admin_index(admin, i18n).await?.into_response() + let _admin = match auth::require_admin_or_redirect(&session, &db).await { + Ok(u) => u, + Err(resp) => return Ok(resp), + }; + let _ = i18n; + Ok::(auth::redirect("/admin/v2")) }, "admin_index", ), diff --git a/src/admin/v2.rs b/src/admin/v2.rs new file mode 100644 index 0000000..9a7102d --- /dev/null +++ b/src/admin/v2.rs @@ -0,0 +1,1766 @@ +use std::collections::HashMap; + +use cot::db::Database; +use cot::html::Html; +use cot::http::StatusCode; +use cot::http::header::CONTENT_TYPE; +use cot::json::Json; +use cot::response::IntoResponse; +use cot::session::Session; +use cot::{Body, Template}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use sqlx::{PgPool, Postgres, QueryBuilder}; + +use super::BUILD_INFO; +use crate::auth::{self, AuthenticatedUser, Role}; +use crate::i18n::{I18n, Translations}; +use crate::scheduler::{JobRegistry, ScheduledJob}; + +#[derive(Debug, Template)] +#[template(path = "admin/v2.html")] +struct AdminV2Template { + t: &'static Translations, + user_name: String, + user_role: String, + version: &'static str, +} + +#[derive(Debug, Deserialize)] +pub(super) struct ReviewsQuery { + pub(super) status: Option, + pub(super) search: Option, + pub(super) limit: Option, + pub(super) offset: Option, +} + +#[derive(Debug, Deserialize)] +pub(super) struct LibraryQuery { + pub(super) kind: Option, + pub(super) search: Option, + pub(super) limit: Option, + pub(super) offset: Option, +} + +#[derive(Debug, Deserialize)] +pub(super) struct BulkReviewsRequest { + action: String, + mode: Option, + ids: Option>, + filter: Option, +} + +#[derive(Debug, Deserialize)] +pub(super) struct BulkLibraryRequest { + action: String, + kind: String, + mode: Option, + ids: Option>, + filter: Option, +} + +#[derive(Debug, Deserialize)] +pub(super) struct UpdateLibraryItemRequest { + kind: String, + id: i64, + title: String, + hidden: bool, +} + +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)] +struct ReviewFilter { + status: Option, + search: Option, +} + +#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)] +struct LibraryFilter { + search: Option, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct AdminUserDto { + id: i64, + name: String, + role: String, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct BuildDto { + package: &'static str, + version: &'static str, + profile: &'static str, + target: &'static str, + rustc_version: &'static str, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct AdminDashboardDto { + user: AdminUserDto, + build: BuildDto, + stats: OverviewStatsDto, + reviews: ReviewPageDto, + jobs: Vec, + recent_runs: Vec, + library: LibraryOverviewDto, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct OverviewStatsDto { + tracks: i64, + releases: i64, + artists: i64, + playlists: i64, + hidden_tracks: i64, + hidden_releases: i64, + hidden_artists: i64, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct StatusCountDto { + status: String, + count: i64, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct TagDto { + label: String, + kind: String, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct ReviewPageDto { + items: Vec, + total: i64, + limit: i64, + offset: i64, + status: Option, + search: Option, + status_counts: Vec, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct ReviewDto { + id: i64, + job_run_id: i64, + review_type: String, + input_path: String, + display_path: String, + filename: String, + status: String, + confidence: Option, + model_name: Option, + llm_duration_ms: Option, + token_count: Option, + tags: Vec, + error_message: Option, + created_at: String, + updated_at: String, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct JobDto { + name: String, + description: String, + cron_expression: String, + enabled: bool, + health: String, + is_running: bool, + last_run_at: Option, + next_run_at: Option, + recent_runs: Vec, +} + +#[derive(Debug, Clone, Serialize, JsonSchema)] +struct JobRunDto { + id: i64, + job_name: String, + status: String, + started_at: String, + finished_at: Option, + duration_ms: Option, + trigger: String, + error_message: Option, + log_excerpt: String, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct JobRunStartedDto { + ok: bool, + run_id: i64, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct JobRunsDto { + job_name: String, + runs: Vec, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct JobRunDetailDto { + run: JobRunDto, + log_output: String, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct BulkReviewsResponse { + ok: bool, + affected: u64, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct MutationResponse { + ok: bool, + affected: u64, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct LibraryOverviewDto { + artists: i64, + releases: i64, + tracks: i64, + playlists: i64, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct LibraryPageDto { + kind: String, + items: Vec, + total: i64, + limit: i64, + offset: i64, + search: Option, +} + +#[derive(Debug, Serialize, JsonSchema)] +struct LibraryItemDto { + id: i64, + kind: String, + title: String, + subtitle: String, + is_hidden: Option, + tags: Vec, + updated_at: Option, +} + +#[derive(Debug, sqlx::FromRow)] +struct IdRow { + id: i64, +} + +#[derive(Debug, sqlx::FromRow)] +struct CountRow { + count: i64, +} + +#[derive(Debug, sqlx::FromRow)] +struct StatusCountRow { + status: String, + count: i64, +} + +#[derive(Debug, sqlx::FromRow)] +struct ReviewRow { + id: i64, + job_run_id: i64, + review_type: String, + input_path: Option, + context_json: Option, + result_json: Option, + status: String, + created_at: String, + updated_at: String, + error_message: Option, +} + +#[derive(Debug, sqlx::FromRow)] +struct ReviewStatsRow { + pending_review_id: i64, + model_name: String, + llm_duration_ms: i64, + prompt_tokens: i64, + completion_tokens: i64, +} + +#[derive(Debug, Clone, sqlx::FromRow)] +struct ReviewMediaRow { + sha256_hash: String, + original_filename: String, + file_size_bytes: i64, + audio_format: Option, + audio_bitrate: Option, + audio_sample_rate: Option, + audio_bit_depth: Option, +} + +#[derive(Debug, Clone, sqlx::FromRow)] +struct JobRunRow { + id: i64, + job_name: String, + status: String, + started_at: String, + finished_at: Option, + duration_ms: Option, + trigger: String, + error_message: Option, + log_excerpt: String, +} + +#[derive(Debug, sqlx::FromRow)] +struct JobRunDetailRow { + id: i64, + job_name: String, + status: String, + started_at: String, + finished_at: Option, + duration_ms: Option, + trigger: String, + error_message: Option, + log_excerpt: String, + log_output: Option, +} + +#[derive(Debug, sqlx::FromRow)] +struct LibraryItemRow { + id: i64, + title: String, + subtitle: Option, + is_hidden: Option, + primary_count: i64, + secondary_count: i64, + tertiary_count: i64, + updated_at: Option, +} + +pub async fn page(admin: AuthenticatedUser, i18n: I18n) -> cot::Result { + let template = AdminV2Template { + t: i18n.t, + user_name: admin.name, + user_role: admin.role.code().to_owned(), + version: BUILD_INFO.pkg_version, + }; + Ok(Html::new(template.render()?)) +} + +pub async fn dashboard( + session: Session, + db: Database, + pool: &PgPool, + registry: &JobRegistry, +) -> cot::Result { + let admin = match require_admin_json(&session, &db).await { + Ok(admin) => admin, + Err(response) => return Ok(response), + }; + + sync_registered_jobs(&db, registry).await; + let reviews_query = ReviewsQuery { + status: None, + search: None, + limit: Some(80), + offset: Some(0), + }; + let (reviews, stats, jobs, recent_runs, library) = tokio::try_join!( + load_review_page(pool, reviews_query), + load_overview_stats(pool), + load_jobs(&db, pool, registry), + load_recent_runs(pool, 5), + load_library_overview(pool), + ) + .map_err(|e| cot::Error::internal(e.to_string()))?; + + Json(AdminDashboardDto { + user: AdminUserDto { + id: admin.id, + name: admin.name, + role: admin.role.code().to_owned(), + }, + build: build_dto(), + stats, + reviews, + jobs, + recent_runs, + library, + }) + .into_response() +} + +pub async fn reviews( + session: Session, + db: Database, + pool: &PgPool, + query: ReviewsQuery, +) -> cot::Result { + if let Err(response) = require_admin_json(&session, &db).await { + return Ok(response); + } + + let page = load_review_page(pool, query) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + Json(page).into_response() +} + +pub async fn bulk_reviews( + session: Session, + db: Database, + pool: &PgPool, + Json(body): Json, +) -> cot::Result { + if let Err(response) = require_admin_json(&session, &db).await { + return Ok(response); + } + + let action = body.action.trim(); + if action != "delete" && action != "requeue" { + return Ok(json_error(StatusCode::BAD_REQUEST, "unknown bulk action")); + } + + let mode = body.mode.as_deref().unwrap_or("ids"); + let affected = if mode == "filter" { + apply_review_filter_action(pool, action, body.filter.unwrap_or_default()).await? + } else { + let mut ids = body.ids.unwrap_or_default(); + ids.retain(|id| *id > 0); + ids.sort_unstable(); + ids.dedup(); + if ids.is_empty() { + 0 + } else if action == "delete" { + crate::scheduler::PendingReview::delete_by_ids(&db, &ids) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + ids.len() as u64 + } else { + crate::scheduler::PendingReview::requeue_by_ids(&db, &ids) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + ids.len() as u64 + } + }; + + Json(BulkReviewsResponse { ok: true, affected }).into_response() +} + +pub async fn jobs( + session: Session, + db: Database, + pool: &PgPool, + registry: &JobRegistry, +) -> cot::Result { + if let Err(response) = require_admin_json(&session, &db).await { + return Ok(response); + } + sync_registered_jobs(&db, registry).await; + let jobs = load_jobs(&db, pool, registry) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + Json(jobs).into_response() +} + +pub async fn run_job( + session: Session, + db: Database, + handle_cell: &std::sync::Arc< + tokio::sync::OnceCell>, + >, + job_name: &str, +) -> cot::Result { + if let Err(response) = require_admin_json(&session, &db).await { + return Ok(response); + } + + let Some(handle) = handle_cell.get() else { + return Ok(json_error( + StatusCode::SERVICE_UNAVAILABLE, + "scheduler is not ready", + )); + }; + + match std::sync::Arc::clone(handle) + .trigger_job_now_background(job_name) + .await + { + Ok(run_id) => Json(JobRunStartedDto { ok: true, run_id }).into_response(), + Err(e) => Ok(json_error(StatusCode::BAD_REQUEST, &e.to_string())), + } +} + +pub async fn toggle_job( + session: Session, + db: Database, + handle_cell: &std::sync::Arc< + tokio::sync::OnceCell>, + >, + job_name: &str, +) -> cot::Result { + if let Err(response) = require_admin_json(&session, &db).await { + return Ok(response); + } + + let job = ScheduledJob::get_by_name(&db, job_name) + .await + .map_err(|e| cot::Error::internal(e.to_string()))? + .ok_or_else(|| cot::Error::internal("job not found"))?; + let Some(handle) = handle_cell.get() else { + return Ok(json_error( + StatusCode::SERVICE_UNAVAILABLE, + "scheduler is not ready", + )); + }; + let enabled = !job.enabled; + if let Err(e) = handle.toggle_job(job_name, enabled).await { + return Ok(json_error(StatusCode::BAD_REQUEST, &e.to_string())); + } + + Json(serde_json::json!({ "ok": true, "enabled": enabled })).into_response() +} + +pub async fn job_runs( + session: Session, + db: Database, + pool: &PgPool, + job_name: &str, +) -> cot::Result { + if let Err(response) = require_admin_json(&session, &db).await { + return Ok(response); + } + let runs = load_runs_for_job(pool, job_name, 40) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + Json(JobRunsDto { + job_name: job_name.to_owned(), + runs, + }) + .into_response() +} + +pub async fn job_run_detail( + session: Session, + db: Database, + pool: &PgPool, + run_id: i64, +) -> cot::Result { + if let Err(response) = require_admin_json(&session, &db).await { + return Ok(response); + } + let row = sqlx::query_as::<_, JobRunDetailRow>( + "SELECT id, job_name::text AS job_name, status::text AS status, started_at::text AS started_at, \ + finished_at, duration_ms, trigger::text AS trigger, error_message, \ + LEFT(COALESCE(log_output, ''), 1600) AS log_excerpt, log_output \ + FROM furumusic__job_run WHERE id = $1", + ) + .bind(run_id) + .fetch_optional(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + + let Some(row) = row else { + return Ok(json_error(StatusCode::NOT_FOUND, "run not found")); + }; + + let log_output = row.log_output.clone().unwrap_or_default(); + Json(JobRunDetailDto { + run: row.into(), + log_output, + }) + .into_response() +} + +pub async fn library( + session: Session, + db: Database, + pool: &PgPool, + query: LibraryQuery, +) -> cot::Result { + if let Err(response) = require_admin_json(&session, &db).await { + return Ok(response); + } + let page = load_library_page(pool, query) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + Json(page).into_response() +} + +pub async fn update_library_item( + session: Session, + db: Database, + pool: &PgPool, + Json(body): Json, +) -> cot::Result { + if let Err(response) = require_admin_json(&session, &db).await { + return Ok(response); + } + + let kind = normalize_library_kind(Some(body.kind.as_str())); + let title = body.title.trim(); + if title.is_empty() { + return Ok(json_error(StatusCode::BAD_REQUEST, "title cannot be empty")); + } + + let now = now_string(); + let affected = match kind.as_str() { + "artists" => { + sqlx::query( + "UPDATE furumusic__artist \ + SET name = $1, name_sort = $2, is_hidden = $3, updated_at = $4 \ + WHERE id = $5", + ) + .bind(title) + .bind(normalize_name(title)) + .bind(body.hidden) + .bind(&now) + .bind(body.id) + .execute(pool) + .await + } + "releases" => { + sqlx::query( + "UPDATE furumusic__release \ + SET title = $1, title_sort = $2, is_hidden = $3, updated_at = $4 \ + WHERE id = $5", + ) + .bind(title) + .bind(normalize_name(title)) + .bind(body.hidden) + .bind(&now) + .bind(body.id) + .execute(pool) + .await + } + "playlists" => { + sqlx::query( + "UPDATE furumusic__playlist \ + SET title = $1, is_public = $2, updated_at = $3 \ + WHERE id = $4", + ) + .bind(title) + .bind(!body.hidden) + .bind(&now) + .bind(body.id) + .execute(pool) + .await + } + _ => unreachable!(), + } + .map_err(|e| cot::Error::internal(e.to_string()))? + .rows_affected(); + + if affected == 0 { + return Ok(json_error(StatusCode::NOT_FOUND, "library item not found")); + } + + let Some(item) = fetch_library_item(pool, &kind, body.id) + .await + .map_err(|e| cot::Error::internal(e.to_string()))? + else { + return Ok(json_error(StatusCode::NOT_FOUND, "library item not found")); + }; + + Json(item).into_response() +} + +pub async fn bulk_library( + session: Session, + db: Database, + pool: &PgPool, + Json(body): Json, +) -> cot::Result { + if let Err(response) = require_admin_json(&session, &db).await { + return Ok(response); + } + + let kind = normalize_library_kind(Some(body.kind.as_str())); + let action = body.action.trim(); + if !matches!(action, "hide" | "show" | "delete") { + return Ok(json_error( + StatusCode::BAD_REQUEST, + "unknown library action", + )); + } + + let mut ids = if body.mode.as_deref() == Some("filter") { + library_ids_by_filter(pool, &kind, body.filter.unwrap_or_default()).await? + } else { + body.ids.unwrap_or_default() + }; + ids.retain(|id| *id > 0); + ids.sort_unstable(); + ids.dedup(); + if ids.is_empty() { + return Json(MutationResponse { + ok: true, + affected: 0, + }) + .into_response(); + } + + let affected = apply_library_action(pool, &kind, action, &ids).await?; + Json(MutationResponse { ok: true, affected }).into_response() +} + +async fn require_admin_json( + session: &Session, + db: &Database, +) -> Result { + let Some(user) = auth::get_session_user(session, db).await else { + return Err(json_error(StatusCode::UNAUTHORIZED, "not authenticated")); + }; + if user.role != Role::Admin { + return Err(json_error(StatusCode::FORBIDDEN, "forbidden")); + } + Ok(user) +} + +fn json_error(status: StatusCode, message: &str) -> cot::response::Response { + let body = serde_json::json!({ "error": message }); + cot::http::Response::builder() + .status(status) + .header(CONTENT_TYPE, "application/json") + .body(Body::fixed(body.to_string())) + .expect("valid response") +} + +fn build_dto() -> BuildDto { + BuildDto { + package: BUILD_INFO.pkg_name, + version: BUILD_INFO.pkg_version, + profile: BUILD_INFO.profile, + target: BUILD_INFO.target, + rustc_version: BUILD_INFO.rustc_version, + } +} + +async fn sync_registered_jobs(db: &Database, registry: &JobRegistry) { + for job in registry.all_jobs() { + if let Err(e) = + ScheduledJob::upsert(db, job.name(), job.description(), job.default_cron()).await + { + tracing::error!("failed to upsert scheduled job {}: {e}", job.name()); + } + } + if let Ok(all) = ScheduledJob::list_all(db).await { + for sched_job in all { + if registry.get(sched_job.name_str()).is_none() { + tracing::warn!("removing orphaned scheduled job '{}'", sched_job.name_str()); + let _ = ScheduledJob::delete_by_name(db, sched_job.name_str()).await; + } + } + } +} + +async fn load_overview_stats(pool: &PgPool) -> anyhow::Result { + let tracks: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM furumusic__track") + .fetch_one(pool) + .await?; + let releases: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM furumusic__release") + .fetch_one(pool) + .await?; + let artists: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM furumusic__artist") + .fetch_one(pool) + .await?; + let playlists: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM furumusic__playlist") + .fetch_one(pool) + .await?; + let hidden_tracks: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM furumusic__track WHERE is_hidden") + .fetch_one(pool) + .await?; + let hidden_releases: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM furumusic__release WHERE is_hidden") + .fetch_one(pool) + .await?; + let hidden_artists: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM furumusic__artist WHERE is_hidden") + .fetch_one(pool) + .await?; + + Ok(OverviewStatsDto { + tracks, + releases, + artists, + playlists, + hidden_tracks, + hidden_releases, + hidden_artists, + }) +} + +async fn load_library_overview(pool: &PgPool) -> anyhow::Result { + let stats = load_overview_stats(pool).await?; + Ok(LibraryOverviewDto { + artists: stats.artists, + releases: stats.releases, + tracks: stats.tracks, + playlists: stats.playlists, + }) +} + +async fn load_review_page(pool: &PgPool, query: ReviewsQuery) -> anyhow::Result { + let limit = query.limit.unwrap_or(80).clamp(10, 250); + let offset = query.offset.unwrap_or(0).max(0); + let status = normalize_status(query.status.as_deref()); + let search = clean_search(query.search.as_deref()); + let search_pattern = search.as_ref().map(|s| format!("%{s}%")); + + let total = count_reviews(pool, status.clone(), search_pattern.clone()).await?; + let status_counts = load_review_status_counts(pool, None, search_pattern.clone()).await?; + + let mut qb = QueryBuilder::::new( + "SELECT id, job_run_id, review_type::text AS review_type, input_path, context_json, \ + result_json, status::text AS status, created_at::text AS created_at, \ + updated_at::text AS updated_at, error_message \ + FROM furumusic__pending_review WHERE 1=1", + ); + push_review_filters(&mut qb, status.clone(), search_pattern.clone()); + qb.push(" ORDER BY id DESC LIMIT "); + qb.push_bind(limit); + qb.push(" OFFSET "); + qb.push_bind(offset); + + let rows = qb.build_query_as::().fetch_all(pool).await?; + let stats = load_review_stats(pool, rows.iter().map(|row| row.id).collect()).await?; + let media = load_review_media(pool, &rows).await?; + let items = rows + .into_iter() + .map(|row| review_dto(row, &stats, &media)) + .collect(); + + Ok(ReviewPageDto { + items, + total, + limit, + offset, + status, + search, + status_counts, + }) +} + +async fn count_reviews( + pool: &PgPool, + status: Option, + search_pattern: Option, +) -> anyhow::Result { + let mut qb = QueryBuilder::::new( + "SELECT COUNT(*) AS count FROM furumusic__pending_review WHERE 1=1", + ); + push_review_filters(&mut qb, status, search_pattern); + Ok(qb.build_query_as::().fetch_one(pool).await?.count) +} + +async fn load_review_status_counts( + pool: &PgPool, + status: Option, + search_pattern: Option, +) -> anyhow::Result> { + let mut qb = QueryBuilder::::new( + "SELECT status::text AS status, COUNT(*) AS count \ + FROM furumusic__pending_review WHERE 1=1", + ); + push_review_filters(&mut qb, status, search_pattern); + qb.push(" GROUP BY status ORDER BY status"); + + let rows = qb + .build_query_as::() + .fetch_all(pool) + .await?; + Ok(rows + .into_iter() + .map(|row| StatusCountDto { + status: row.status, + count: row.count, + }) + .collect()) +} + +fn push_review_filters( + qb: &mut QueryBuilder<'_, Postgres>, + status: Option, + search_pattern: Option, +) { + if let Some(status) = status { + qb.push(" AND status = "); + qb.push_bind(status); + } + if let Some(pattern) = search_pattern { + qb.push(" AND (input_path ILIKE "); + qb.push_bind(pattern.clone()); + qb.push(" OR review_type::text ILIKE "); + qb.push_bind(pattern.clone()); + qb.push(" OR COALESCE(error_message, '') ILIKE "); + qb.push_bind(pattern); + qb.push(")"); + } +} + +async fn load_review_stats( + pool: &PgPool, + ids: Vec, +) -> anyhow::Result> { + if ids.is_empty() { + return Ok(HashMap::new()); + } + let rows = sqlx::query_as::<_, ReviewStatsRow>( + "SELECT pending_review_id, model_name::text AS model_name, llm_duration_ms, \ + prompt_tokens, completion_tokens \ + FROM furumusic__processing_stats WHERE pending_review_id = ANY($1)", + ) + .bind(&ids) + .fetch_all(pool) + .await?; + Ok(rows + .into_iter() + .map(|row| (row.pending_review_id, row)) + .collect()) +} + +async fn load_review_media( + pool: &PgPool, + rows: &[ReviewRow], +) -> anyhow::Result> { + let mut hashes = rows + .iter() + .filter_map(|row| context_sha256(row.context_json.as_deref().unwrap_or(""))) + .collect::>(); + hashes.sort(); + hashes.dedup(); + if hashes.is_empty() { + return Ok(HashMap::new()); + } + + let media_rows = sqlx::query_as::<_, ReviewMediaRow>( + "SELECT sha256_hash::text AS sha256_hash, original_filename::text AS original_filename, \ + file_size_bytes, audio_format, audio_bitrate, audio_sample_rate, audio_bit_depth \ + FROM furumusic__media_file \ + WHERE file_type = 'audio' AND sha256_hash = ANY($1)", + ) + .bind(&hashes) + .fetch_all(pool) + .await?; + + Ok(media_rows + .into_iter() + .map(|row| (row.sha256_hash.to_ascii_lowercase(), row)) + .collect()) +} + +fn review_dto( + row: ReviewRow, + stats: &HashMap, + media: &HashMap, +) -> ReviewDto { + let input_path = row.input_path.unwrap_or_default(); + let filename = file_name(&input_path); + let sha = context_sha256(row.context_json.as_deref().unwrap_or("")); + let media_row = sha.as_ref().and_then(|hash| media.get(hash)); + let tags = media_row.map(media_tags).unwrap_or_default(); + let stat = stats.get(&row.id); + let confidence = row + .result_json + .as_deref() + .and_then(|json| serde_json::from_str::(json).ok()) + .and_then(|value| value.get("confidence").and_then(|v| v.as_f64())); + + ReviewDto { + id: row.id, + job_run_id: row.job_run_id, + review_type: row.review_type, + display_path: compact_path_tail(&input_path, 96), + input_path, + filename, + status: row.status, + confidence, + model_name: stat.map(|s| s.model_name.clone()), + llm_duration_ms: stat.map(|s| s.llm_duration_ms), + token_count: stat.map(|s| s.prompt_tokens + s.completion_tokens), + tags, + error_message: row.error_message, + created_at: row.created_at, + updated_at: row.updated_at, + } +} + +fn media_tags(row: &ReviewMediaRow) -> Vec { + let mut tags = Vec::new(); + if let Some(format) = row.audio_format.as_deref().filter(|s| !s.trim().is_empty()) { + tags.push(tag(format.to_ascii_lowercase(), "format")); + } else if let Some(ext) = file_extension(&row.original_filename) { + tags.push(tag(ext, "format")); + } + if let Some(bitrate) = row.audio_bitrate { + tags.push(tag(format!("{bitrate} kbps"), "bitrate")); + } + if let Some(sample_rate) = row.audio_sample_rate { + if sample_rate % 1000 == 0 { + tags.push(tag(format!("{} kHz", sample_rate / 1000), "sample")); + } else { + tags.push(tag( + format!("{:.1} kHz", sample_rate as f64 / 1000.0), + "sample", + )); + } + } + if let Some(bit_depth) = row.audio_bit_depth { + tags.push(tag(format!("{bit_depth}-bit"), "depth")); + } + tags.push(tag(size_display(row.file_size_bytes), "size")); + tags +} + +async fn apply_review_filter_action( + pool: &PgPool, + action: &str, + filter: ReviewFilter, +) -> cot::Result { + let status = normalize_status(filter.status.as_deref()); + let search_pattern = clean_search(filter.search.as_deref()).map(|s| format!("%{s}%")); + + let result = if action == "delete" { + let mut qb = + QueryBuilder::::new("DELETE FROM furumusic__pending_review WHERE 1=1"); + push_review_filters(&mut qb, status, search_pattern); + qb.build() + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))? + } else { + let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(); + let mut qb = QueryBuilder::::new( + "UPDATE furumusic__pending_review \ + SET status = 'queued', error_message = NULL, updated_at = ", + ); + qb.push_bind(now); + qb.push(" WHERE 1=1"); + push_review_filters(&mut qb, status, search_pattern); + qb.build() + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))? + }; + + Ok(result.rows_affected()) +} + +async fn load_jobs( + db: &Database, + pool: &PgPool, + _registry: &JobRegistry, +) -> anyhow::Result> { + let mut jobs = ScheduledJob::list_all(db).await?; + jobs.sort_by(|a, b| a.name_str().cmp(b.name_str())); + let recent_runs = load_recent_runs_per_job(pool, 8).await?; + let mut runs_by_job: HashMap> = HashMap::new(); + for run in recent_runs { + runs_by_job + .entry(run.job_name.clone()) + .or_default() + .push(run); + } + + Ok(jobs + .into_iter() + .map(|job| { + let runs = runs_by_job.remove(job.name_str()).unwrap_or_default(); + let is_running = runs.iter().any(|run| run.status == "running"); + let last_run = runs.first(); + let health = if !job.enabled { + "disabled" + } else if is_running { + "running" + } else if last_run.is_some_and(|run| run.status == "failed") { + "failed" + } else if last_run.is_some() { + "ok" + } else { + "idle" + }; + JobDto { + name: job.name_str().to_owned(), + description: job.description_str().to_owned(), + cron_expression: job.cron_expression_str().to_owned(), + enabled: job.enabled, + health: health.to_owned(), + is_running, + last_run_at: optional_job_time(job.last_run_at_str()), + next_run_at: optional_job_time(job.next_run_at_str()), + recent_runs: runs, + } + }) + .collect()) +} + +async fn load_recent_runs(pool: &PgPool, limit: i64) -> anyhow::Result> { + let rows = sqlx::query_as::<_, JobRunRow>( + "SELECT id, job_name::text AS job_name, status::text AS status, started_at::text AS started_at, \ + finished_at, duration_ms, trigger::text AS trigger, error_message, \ + LEFT(COALESCE(log_output, ''), 1600) AS log_excerpt \ + FROM furumusic__job_run ORDER BY id DESC LIMIT $1", + ) + .bind(limit) + .fetch_all(pool) + .await?; + Ok(rows.into_iter().map(Into::into).collect()) +} + +async fn load_recent_runs_per_job(pool: &PgPool, per_job: i64) -> anyhow::Result> { + let rows = sqlx::query_as::<_, JobRunRow>( + "WITH ranked AS ( \ + SELECT id, job_name::text AS job_name, status::text AS status, started_at::text AS started_at, \ + finished_at, duration_ms, trigger::text AS trigger, error_message, \ + LEFT(COALESCE(log_output, ''), 1600) AS log_excerpt, \ + ROW_NUMBER() OVER (PARTITION BY job_name ORDER BY id DESC) AS rn \ + FROM furumusic__job_run \ + ) \ + SELECT id, job_name, status, started_at, finished_at, duration_ms, trigger, error_message, log_excerpt \ + FROM ranked WHERE rn <= $1 ORDER BY id DESC", + ) + .bind(per_job) + .fetch_all(pool) + .await?; + Ok(rows.into_iter().map(Into::into).collect()) +} + +async fn load_runs_for_job( + pool: &PgPool, + job_name: &str, + limit: i64, +) -> anyhow::Result> { + let rows = sqlx::query_as::<_, JobRunRow>( + "SELECT id, job_name::text AS job_name, status::text AS status, started_at::text AS started_at, \ + finished_at, duration_ms, trigger::text AS trigger, error_message, \ + LEFT(COALESCE(log_output, ''), 1600) AS log_excerpt \ + FROM furumusic__job_run WHERE job_name = $1 ORDER BY id DESC LIMIT $2", + ) + .bind(job_name) + .bind(limit) + .fetch_all(pool) + .await?; + Ok(rows.into_iter().map(Into::into).collect()) +} + +async fn load_library_page(pool: &PgPool, query: LibraryQuery) -> anyhow::Result { + let kind = normalize_library_kind(query.kind.as_deref()); + let limit = query.limit.unwrap_or(40).clamp(10, 120); + let offset = query.offset.unwrap_or(0).max(0); + let search = clean_search(query.search.as_deref()); + let search_pattern = search.as_ref().map(|s| format!("%{s}%")); + + let total = count_library(pool, &kind, search_pattern.clone()).await?; + let rows = match kind.as_str() { + "releases" => load_release_items(pool, search_pattern.clone(), limit, offset).await?, + "playlists" => load_playlist_items(pool, search_pattern.clone(), limit, offset).await?, + _ => load_artist_items(pool, search_pattern.clone(), limit, offset).await?, + }; + let items = rows + .into_iter() + .map(|row| library_item_dto(&kind, row)) + .collect(); + + Ok(LibraryPageDto { + kind, + items, + total, + limit, + offset, + search, + }) +} + +async fn fetch_library_item( + pool: &PgPool, + kind: &str, + id: i64, +) -> anyhow::Result> { + let row = match kind { + "releases" => { + sqlx::query_as::<_, LibraryItemRow>( + "SELECT r.id, r.title::text AS title, \ + CONCAT(r.release_type::text, COALESCE(' · ' || r.year::text, '')) AS subtitle, \ + r.is_hidden, COUNT(DISTINCT t.id)::bigint AS primary_count, \ + COUNT(DISTINCT ra.artist_id)::bigint AS secondary_count, \ + COUNT(DISTINCT ph.id)::bigint AS tertiary_count, \ + r.updated_at::text AS updated_at \ + FROM furumusic__release r \ + LEFT JOIN furumusic__track t ON t.release_id = r.id \ + LEFT JOIN furumusic__release_artist ra ON ra.release_id = r.id \ + LEFT JOIN furumusic__play_history ph ON ph.track_id = t.id \ + WHERE r.id = $1 \ + GROUP BY r.id", + ) + .bind(id) + .fetch_optional(pool) + .await? + } + "playlists" => { + sqlx::query_as::<_, LibraryItemRow>( + "SELECT p.id, p.title::text AS title, \ + COALESCE(u.display_name, u.username, 'unknown') AS subtitle, \ + (NOT p.is_public) AS is_hidden, COUNT(pt.id)::bigint AS primary_count, \ + CASE WHEN p.is_public THEN 1 ELSE 0 END::bigint AS secondary_count, \ + 0::bigint AS tertiary_count, p.updated_at::text AS updated_at \ + FROM furumusic__playlist p \ + LEFT JOIN furumusic__playlist_track pt ON pt.playlist_id = p.id \ + LEFT JOIN furumusic__user u ON u.id = p.owner_id \ + WHERE p.id = $1 \ + GROUP BY p.id, u.display_name, u.username", + ) + .bind(id) + .fetch_optional(pool) + .await? + } + _ => { + sqlx::query_as::<_, LibraryItemRow>( + "SELECT a.id, a.name::text AS title, NULL::text AS subtitle, a.is_hidden, \ + COUNT(DISTINCT ra.release_id)::bigint AS primary_count, \ + COUNT(DISTINCT ta.track_id)::bigint AS secondary_count, \ + COUNT(DISTINCT ufa.user_id)::bigint AS tertiary_count, \ + a.updated_at::text AS updated_at \ + FROM furumusic__artist a \ + LEFT JOIN furumusic__release_artist ra ON ra.artist_id = a.id \ + LEFT JOIN furumusic__track_artist ta ON ta.artist_id = a.id \ + LEFT JOIN furumusic__user_followed_artist ufa ON ufa.artist_id = a.id \ + WHERE a.id = $1 \ + GROUP BY a.id", + ) + .bind(id) + .fetch_optional(pool) + .await? + } + }; + + Ok(row.map(|row| library_item_dto(kind, row))) +} + +async fn library_ids_by_filter( + pool: &PgPool, + kind: &str, + filter: LibraryFilter, +) -> cot::Result> { + let search_pattern = clean_search(filter.search.as_deref()).map(|s| format!("%{s}%")); + let mut qb = match kind { + "releases" => QueryBuilder::::new( + "SELECT DISTINCT r.id \ + FROM furumusic__release r \ + LEFT JOIN furumusic__release_artist ra ON ra.release_id = r.id \ + LEFT JOIN furumusic__artist a ON a.id = ra.artist_id WHERE 1=1", + ), + "playlists" => QueryBuilder::::new( + "SELECT DISTINCT p.id \ + FROM furumusic__playlist p \ + LEFT JOIN furumusic__user u ON u.id = p.owner_id WHERE 1=1", + ), + _ => { + QueryBuilder::::new("SELECT DISTINCT a.id FROM furumusic__artist a WHERE 1=1") + } + }; + push_library_search_filter(&mut qb, kind, search_pattern); + let rows = qb + .build_query_as::() + .fetch_all(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + Ok(rows.into_iter().map(|row| row.id).collect()) +} + +async fn apply_library_action( + pool: &PgPool, + kind: &str, + action: &str, + ids: &[i64], +) -> cot::Result { + match action { + "hide" | "show" => set_library_visibility(pool, kind, ids, action == "hide").await, + "delete" => delete_library_items(pool, kind, ids).await, + _ => Ok(0), + } +} + +async fn set_library_visibility( + pool: &PgPool, + kind: &str, + ids: &[i64], + hidden: bool, +) -> cot::Result { + let now = now_string(); + let result = + match kind { + "releases" => sqlx::query( + "UPDATE furumusic__release SET is_hidden = $1, updated_at = $2 WHERE id = ANY($3)", + ) + .bind(hidden) + .bind(&now) + .bind(ids) + .execute(pool) + .await, + "playlists" => sqlx::query( + "UPDATE furumusic__playlist SET is_public = $1, updated_at = $2 WHERE id = ANY($3)", + ) + .bind(!hidden) + .bind(&now) + .bind(ids) + .execute(pool) + .await, + _ => sqlx::query( + "UPDATE furumusic__artist SET is_hidden = $1, updated_at = $2 WHERE id = ANY($3)", + ) + .bind(hidden) + .bind(&now) + .bind(ids) + .execute(pool) + .await, + } + .map_err(|e| cot::Error::internal(e.to_string()))?; + Ok(result.rows_affected()) +} + +async fn delete_library_items(pool: &PgPool, kind: &str, ids: &[i64]) -> cot::Result { + match kind { + "releases" => delete_releases(pool, ids).await, + "playlists" => delete_playlists(pool, ids).await, + _ => delete_artists(pool, ids).await, + } +} + +async fn delete_artists(pool: &PgPool, ids: &[i64]) -> cot::Result { + sqlx::query("DELETE FROM furumusic__user_followed_artist WHERE artist_id = ANY($1)") + .bind(ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + sqlx::query("DELETE FROM furumusic__track_artist WHERE artist_id = ANY($1)") + .bind(ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + sqlx::query("DELETE FROM furumusic__release_artist WHERE artist_id = ANY($1)") + .bind(ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + let result = sqlx::query("DELETE FROM furumusic__artist WHERE id = ANY($1)") + .bind(ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + Ok(result.rows_affected()) +} + +async fn delete_releases(pool: &PgPool, ids: &[i64]) -> cot::Result { + let track_ids = + sqlx::query_as::<_, IdRow>("SELECT id FROM furumusic__track WHERE release_id = ANY($1)") + .bind(ids) + .fetch_all(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))? + .into_iter() + .map(|row| row.id) + .collect::>(); + + if !track_ids.is_empty() { + sqlx::query("DELETE FROM furumusic__playlist_track WHERE track_id = ANY($1)") + .bind(&track_ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + sqlx::query("DELETE FROM furumusic__user_liked_track WHERE track_id = ANY($1)") + .bind(&track_ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + sqlx::query("DELETE FROM furumusic__play_history WHERE track_id = ANY($1)") + .bind(&track_ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + sqlx::query("DELETE FROM furumusic__track_genre WHERE track_id = ANY($1)") + .bind(&track_ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + sqlx::query("DELETE FROM furumusic__track_artist WHERE track_id = ANY($1)") + .bind(&track_ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + } + + sqlx::query("DELETE FROM furumusic__track WHERE release_id = ANY($1)") + .bind(ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + sqlx::query("DELETE FROM furumusic__release_artist WHERE release_id = ANY($1)") + .bind(ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + let result = sqlx::query("DELETE FROM furumusic__release WHERE id = ANY($1)") + .bind(ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + Ok(result.rows_affected()) +} + +async fn delete_playlists(pool: &PgPool, ids: &[i64]) -> cot::Result { + sqlx::query("DELETE FROM furumusic__playlist_track WHERE playlist_id = ANY($1)") + .bind(ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + sqlx::query("DELETE FROM furumusic__saved_playlist WHERE playlist_id = ANY($1)") + .bind(ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + let result = sqlx::query("DELETE FROM furumusic__playlist WHERE id = ANY($1)") + .bind(ids) + .execute(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + Ok(result.rows_affected()) +} + +async fn count_library( + pool: &PgPool, + kind: &str, + search_pattern: Option, +) -> anyhow::Result { + let mut qb = match kind { + "releases" => QueryBuilder::::new( + "SELECT COUNT(DISTINCT r.id) AS count \ + FROM furumusic__release r \ + LEFT JOIN furumusic__release_artist ra ON ra.release_id = r.id \ + LEFT JOIN furumusic__artist a ON a.id = ra.artist_id WHERE 1=1", + ), + "playlists" => QueryBuilder::::new( + "SELECT COUNT(DISTINCT p.id) AS count \ + FROM furumusic__playlist p \ + LEFT JOIN furumusic__user u ON u.id = p.owner_id WHERE 1=1", + ), + _ => QueryBuilder::::new( + "SELECT COUNT(DISTINCT a.id) AS count FROM furumusic__artist a WHERE 1=1", + ), + }; + + push_library_search_filter(&mut qb, kind, search_pattern); + + Ok(qb.build_query_as::().fetch_one(pool).await?.count) +} + +fn push_library_search_filter( + qb: &mut QueryBuilder<'_, Postgres>, + kind: &str, + search_pattern: Option, +) { + let Some(pattern) = search_pattern else { + return; + }; + match kind { + "releases" => { + qb.push(" AND (r.title ILIKE "); + qb.push_bind(pattern.clone()); + qb.push(" OR a.name ILIKE "); + qb.push_bind(pattern); + qb.push(")"); + } + "playlists" => { + qb.push(" AND (p.title ILIKE "); + qb.push_bind(pattern.clone()); + qb.push(" OR COALESCE(p.description, '') ILIKE "); + qb.push_bind(pattern.clone()); + qb.push(" OR COALESCE(u.display_name, u.username, '') ILIKE "); + qb.push_bind(pattern); + qb.push(")"); + } + _ => { + qb.push(" AND a.name ILIKE "); + qb.push_bind(pattern); + } + } +} + +async fn load_artist_items( + pool: &PgPool, + search_pattern: Option, + limit: i64, + offset: i64, +) -> anyhow::Result> { + let mut qb = QueryBuilder::::new( + "SELECT a.id, a.name::text AS title, NULL::text AS subtitle, a.is_hidden, \ + COUNT(DISTINCT ra.release_id)::bigint AS primary_count, \ + COUNT(DISTINCT ta.track_id)::bigint AS secondary_count, \ + COUNT(DISTINCT ufa.user_id)::bigint AS tertiary_count, \ + a.updated_at::text AS updated_at \ + FROM furumusic__artist a \ + LEFT JOIN furumusic__release_artist ra ON ra.artist_id = a.id \ + LEFT JOIN furumusic__track_artist ta ON ta.artist_id = a.id \ + LEFT JOIN furumusic__user_followed_artist ufa ON ufa.artist_id = a.id \ + WHERE 1=1", + ); + if let Some(pattern) = search_pattern { + qb.push(" AND a.name ILIKE "); + qb.push_bind(pattern); + } + qb.push(" GROUP BY a.id ORDER BY secondary_count DESC, a.name ASC LIMIT "); + qb.push_bind(limit); + qb.push(" OFFSET "); + qb.push_bind(offset); + Ok(qb + .build_query_as::() + .fetch_all(pool) + .await?) +} + +async fn load_release_items( + pool: &PgPool, + search_pattern: Option, + limit: i64, + offset: i64, +) -> anyhow::Result> { + let mut qb = QueryBuilder::::new( + "SELECT r.id, r.title::text AS title, \ + CONCAT(r.release_type::text, COALESCE(' · ' || r.year::text, '')) AS subtitle, \ + r.is_hidden, COUNT(DISTINCT t.id)::bigint AS primary_count, \ + COUNT(DISTINCT ra.artist_id)::bigint AS secondary_count, \ + COUNT(DISTINCT ph.id)::bigint AS tertiary_count, \ + r.updated_at::text AS updated_at \ + FROM furumusic__release r \ + LEFT JOIN furumusic__track t ON t.release_id = r.id \ + LEFT JOIN furumusic__release_artist ra ON ra.release_id = r.id \ + LEFT JOIN furumusic__artist a ON a.id = ra.artist_id \ + LEFT JOIN furumusic__play_history ph ON ph.track_id = t.id \ + WHERE 1=1", + ); + if let Some(pattern) = search_pattern { + qb.push(" AND (r.title ILIKE "); + qb.push_bind(pattern.clone()); + qb.push(" OR a.name ILIKE "); + qb.push_bind(pattern); + qb.push(")"); + } + qb.push(" GROUP BY r.id ORDER BY r.updated_at DESC, r.title ASC LIMIT "); + qb.push_bind(limit); + qb.push(" OFFSET "); + qb.push_bind(offset); + Ok(qb + .build_query_as::() + .fetch_all(pool) + .await?) +} + +async fn load_playlist_items( + pool: &PgPool, + search_pattern: Option, + limit: i64, + offset: i64, +) -> anyhow::Result> { + let mut qb = QueryBuilder::::new( + "SELECT p.id, p.title::text AS title, \ + COALESCE(u.display_name, u.username, 'unknown') AS subtitle, \ + (NOT p.is_public) AS is_hidden, COUNT(pt.id)::bigint AS primary_count, \ + CASE WHEN p.is_public THEN 1 ELSE 0 END::bigint AS secondary_count, \ + 0::bigint AS tertiary_count, p.updated_at::text AS updated_at \ + FROM furumusic__playlist p \ + LEFT JOIN furumusic__playlist_track pt ON pt.playlist_id = p.id \ + LEFT JOIN furumusic__user u ON u.id = p.owner_id \ + WHERE 1=1", + ); + if let Some(pattern) = search_pattern { + qb.push(" AND (p.title ILIKE "); + qb.push_bind(pattern.clone()); + qb.push(" OR COALESCE(p.description, '') ILIKE "); + qb.push_bind(pattern.clone()); + qb.push(" OR COALESCE(u.display_name, u.username, '') ILIKE "); + qb.push_bind(pattern); + qb.push(")"); + } + qb.push( + " GROUP BY p.id, u.display_name, u.username ORDER BY p.updated_at DESC, p.title ASC LIMIT ", + ); + qb.push_bind(limit); + qb.push(" OFFSET "); + qb.push_bind(offset); + Ok(qb + .build_query_as::() + .fetch_all(pool) + .await?) +} + +fn library_item_dto(kind: &str, row: LibraryItemRow) -> LibraryItemDto { + let tags = match kind { + "releases" => vec![ + tag(format!("{} tracks", row.primary_count), "count"), + tag(format!("{} artists", row.secondary_count), "relation"), + tag(format!("{} plays", row.tertiary_count), "plays"), + ], + "playlists" => vec![ + tag(format!("{} tracks", row.primary_count), "count"), + tag( + if row.secondary_count > 0 { + "public" + } else { + "private" + }, + "visibility", + ), + ], + _ => vec![ + tag(format!("{} tracks", row.secondary_count), "count"), + tag(format!("{} releases", row.primary_count), "relation"), + tag(format!("{} followers", row.tertiary_count), "followers"), + ], + }; + + LibraryItemDto { + id: row.id, + kind: kind.to_owned(), + title: row.title, + subtitle: row.subtitle.unwrap_or_default(), + is_hidden: row.is_hidden, + tags, + updated_at: row.updated_at, + } +} + +impl Default for ReviewFilter { + fn default() -> Self { + Self { + status: None, + search: None, + } + } +} + +impl From for JobRunDto { + fn from(row: JobRunRow) -> Self { + Self { + id: row.id, + job_name: row.job_name, + status: row.status, + started_at: row.started_at, + finished_at: row.finished_at, + duration_ms: row.duration_ms, + trigger: row.trigger, + error_message: row.error_message, + log_excerpt: row.log_excerpt, + } + } +} + +impl From for JobRunDto { + fn from(row: JobRunDetailRow) -> Self { + Self { + id: row.id, + job_name: row.job_name, + status: row.status, + started_at: row.started_at, + finished_at: row.finished_at, + duration_ms: row.duration_ms, + trigger: row.trigger, + error_message: row.error_message, + log_excerpt: row.log_excerpt, + } + } +} + +fn tag(label: impl Into, kind: impl Into) -> TagDto { + TagDto { + label: label.into(), + kind: kind.into(), + } +} + +fn optional_job_time(value: &str) -> Option { + if value.is_empty() { + None + } else { + Some(value.to_owned()) + } +} + +fn normalize_library_kind(kind: Option<&str>) -> String { + match kind { + Some("releases") => "releases", + Some("playlists") => "playlists", + _ => "artists", + } + .to_owned() +} + +fn normalize_status(status: Option<&str>) -> Option { + let status = status?.trim(); + if status.is_empty() || status == "all" { + return None; + } + Some(status.to_owned()) +} + +fn clean_search(search: Option<&str>) -> Option { + search + .map(str::trim) + .filter(|s| !s.is_empty()) + .map(|s| s.chars().take(120).collect()) +} + +fn normalize_name(value: &str) -> String { + value.trim().to_lowercase() +} + +fn now_string() -> String { + chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string() +} + +fn context_sha256(context_json: &str) -> Option { + let value = serde_json::from_str::(context_json).ok()?; + let sha = value.get("sha256")?.as_str()?.trim(); + let is_sha256 = sha.len() == 64 && sha.chars().all(|ch| ch.is_ascii_hexdigit()); + is_sha256.then(|| sha.to_ascii_lowercase()) +} + +fn file_name(path: &str) -> String { + path.replace('\\', "/") + .rsplit('/') + .next() + .unwrap_or(path) + .to_owned() +} + +fn compact_path_tail(path: &str, max_chars: usize) -> String { + let normalized = path.replace('\\', "/"); + if normalized.chars().count() <= max_chars { + return normalized; + } + let filename = file_name(&normalized); + let filename_len = filename.chars().count(); + if filename_len + 4 <= max_chars { + return format!(".../{filename}"); + } + let suffix_len = max_chars.saturating_sub(3); + let suffix = filename + .chars() + .skip(filename_len.saturating_sub(suffix_len)) + .collect::(); + format!("...{suffix}") +} + +fn file_extension(filename: &str) -> Option { + std::path::Path::new(filename) + .extension() + .and_then(|ext| ext.to_str()) + .map(|ext| ext.trim().to_ascii_lowercase()) + .filter(|ext| !ext.is_empty()) +} + +fn size_display(bytes: i64) -> String { + if bytes >= 1_073_741_824 { + format!("{:.1} GB", bytes as f64 / 1_073_741_824.0) + } else if bytes >= 1_048_576 { + format!("{:.1} MB", bytes as f64 / 1_048_576.0) + } else if bytes >= 1024 { + format!("{:.1} KB", bytes as f64 / 1024.0) + } else { + format!("{bytes} B") + } +} diff --git a/src/admin/views.rs b/src/admin/views.rs index a999822..7ec0e17 100644 --- a/src/admin/views.rs +++ b/src/admin/views.rs @@ -211,23 +211,6 @@ pub async fn debug_handler( Ok(Html::new(template.render()?)) } -#[derive(Debug, Template)] -#[template(path = "admin/index.html")] -struct AdminIndexTemplate { - t: &'static Translations, - user_name: String, - user_role: String, -} - -pub async fn admin_index(admin: AuthenticatedUser, i18n: I18n) -> cot::Result { - let template = AdminIndexTemplate { - t: i18n.t, - user_name: admin.name, - user_role: admin.role.code().to_owned(), - }; - Ok(Html::new(template.render()?)) -} - // --------------------------------------------------------------------------- // Settings page // --------------------------------------------------------------------------- diff --git a/src/player/dto.rs b/src/player/dto.rs index b7a74e1..0202245 100644 --- a/src/player/dto.rs +++ b/src/player/dto.rs @@ -108,6 +108,9 @@ pub(super) struct PlaylistCard { pub(super) title: String, pub(super) track_count: i64, pub(super) is_own: bool, + pub(super) owner_name: Option, + pub(super) is_public: bool, + pub(super) is_saved: bool, pub(super) kind: String, } @@ -128,6 +131,9 @@ pub(super) struct PlaylistDetail { pub(super) title: String, pub(super) description: Option, pub(super) is_own: bool, + pub(super) owner_name: Option, + pub(super) is_public: bool, + pub(super) is_saved: bool, pub(super) kind: String, pub(super) tracks: Vec, } diff --git a/src/player/mod.rs b/src/player/mod.rs index 8b00953..78f5c6d 100644 --- a/src/player/mod.rs +++ b/src/player/mod.rs @@ -546,18 +546,33 @@ async fn playlists_handler( title: "Likes".to_string(), track_count: likes_count.0, is_own: true, + owner_name: None, + is_public: false, + is_saved: false, kind: "likes".to_string(), }]; let rows = sqlx::query_as::<_, PlaylistRow>( r#"SELECT p.id, p.title::text as title, COALESCE((SELECT COUNT(*) FROM furumusic__playlist_track pt WHERE pt.playlist_id = p.id), 0) as track_count, - (p.owner_id = $1) as is_own + (p.owner_id = $1) as is_own, + COALESCE(NULLIF(u.display_name, ''), u.username)::text as owner_name, + p.is_public, + EXISTS ( + SELECT 1 FROM furumusic__saved_playlist sp + WHERE sp.user_id = $1 AND sp.playlist_id = p.id + ) as is_saved FROM furumusic__playlist p + JOIN furumusic__user u ON u.id = p.owner_id WHERE p.owner_id = $1 - OR p.id IN (SELECT sp.playlist_id FROM furumusic__saved_playlist sp WHERE sp.user_id = $1) + OR EXISTS ( + SELECT 1 FROM furumusic__saved_playlist sp + WHERE sp.user_id = $1 AND sp.playlist_id = p.id + ) OR p.is_public = true - ORDER BY p.title"#, + ORDER BY + CASE WHEN p.owner_id = $1 THEN 0 WHEN p.is_public THEN 2 ELSE 1 END, + p.title"#, ) .bind(user.id) .fetch_all(pool) @@ -569,6 +584,9 @@ async fn playlists_handler( title: r.title, track_count: r.track_count, is_own: r.is_own, + owner_name: Some(r.owner_name), + is_public: r.is_public, + is_saved: r.is_saved, kind: "user".to_string(), })); @@ -597,9 +615,19 @@ async fn playlist_detail_handler( } let info = sqlx::query_as::<_, PlaylistInfoRow>( - "SELECT id, title::text as title, description, owner_id FROM furumusic__playlist WHERE id = $1", + r#"SELECT p.id, p.title::text as title, p.description, p.owner_id, + COALESCE(NULLIF(u.display_name, ''), u.username)::text as owner_name, + p.is_public, + EXISTS ( + SELECT 1 FROM furumusic__saved_playlist sp + WHERE sp.user_id = $2 AND sp.playlist_id = p.id + ) as is_saved + FROM furumusic__playlist p + JOIN furumusic__user u ON u.id = p.owner_id + WHERE p.id = $1"#, ) .bind(playlist_id) + .bind(user.id) .fetch_optional(pool) .await .map_err(|e| cot::Error::internal(e.to_string()))?; @@ -637,6 +665,9 @@ async fn playlist_detail_handler( title: info.title, description: info.description, is_own: info.owner_id == user.id, + owner_name: Some(info.owner_name), + is_public: info.is_public, + is_saved: info.is_saved, kind: "user".to_string(), tracks: track_items, }) @@ -748,6 +779,9 @@ async fn likes_playlist_handler( title: "Likes".to_string(), description: None, is_own: true, + owner_name: None, + is_public: false, + is_saved: false, kind: "likes".to_string(), tracks: track_items, }) @@ -1429,6 +1463,9 @@ async fn create_playlist_handler( title, track_count: 0, is_own: true, + owner_name: Some(user.name), + is_public: false, + is_saved: false, kind: "user".to_string(), }) .into_response() diff --git a/src/player/rows.rs b/src/player/rows.rs index feca5c2..5a680ff 100644 --- a/src/player/rows.rs +++ b/src/player/rows.rs @@ -77,6 +77,9 @@ pub(super) struct PlaylistRow { pub(super) title: String, pub(super) track_count: i64, pub(super) is_own: bool, + pub(super) owner_name: String, + pub(super) is_public: bool, + pub(super) is_saved: bool, } #[derive(sqlx::FromRow)] @@ -85,6 +88,9 @@ pub(super) struct PlaylistInfoRow { pub(super) title: String, pub(super) description: Option, pub(super) owner_id: i64, + pub(super) owner_name: String, + pub(super) is_public: bool, + pub(super) is_saved: bool, } #[derive(sqlx::FromRow)] diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 23932ec..d0380da 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -1124,6 +1124,34 @@ pub struct SchedulerHandle { } impl SchedulerHandle { + /// Start a job immediately in the background and return the created run id. + pub async fn trigger_job_now_background( + self: Arc, + job_name: &str, + ) -> anyhow::Result { + self.registry + .get(job_name) + .ok_or_else(|| anyhow::anyhow!("unknown job: {job_name}"))?; + + let db = self.shared_db.clone(); + let pool = self.shared_pool.clone(); + let (live_config, _) = AppConfig::load_with_db(&db).await; + let run = JobRun::create_running(&db, job_name, "manual") + .await + .map_err(|e| anyhow::anyhow!("failed to create job run: {e}"))?; + let run_id = run.id_val(); + let job_name = job_name.to_owned(); + let handle = Arc::clone(&self); + + tokio::spawn(async move { + handle + .finish_manual_run(job_name, live_config, db, pool, run) + .await; + }); + + Ok(run_id) + } + /// Execute a job immediately (manual or programmatic trigger). pub async fn trigger_job_now(&self, job_name: &str) -> anyhow::Result { let job_impl = self @@ -1172,6 +1200,51 @@ impl SchedulerHandle { Ok(run.id_val()) } + async fn finish_manual_run( + self: Arc, + job_name: String, + live_config: AppConfig, + db: Database, + pool: sqlx::PgPool, + mut run: JobRun, + ) { + let Some(job_impl) = self.registry.get(&job_name) else { + let _ = run + .set_failed(&db, 0, "", &format!("unknown job: {job_name}")) + .await; + return; + }; + + let start = std::time::Instant::now(); + let ctx = JobContext { + config: Arc::new(live_config), + db: db.clone(), + pool: pool.clone(), + run_id: run.id_val(), + registry: Arc::clone(&self.registry), + }; + let mut log = JobLog::with_live_flush(pool, run.id_val()); + + match job_impl.run(&ctx, &mut log).await { + Ok(()) => { + let duration_ms = start.elapsed().as_millis() as i64; + let _ = run.set_completed(&db, duration_ms, &log.output()).await; + } + Err(e) => { + let duration_ms = start.elapsed().as_millis() as i64; + let _ = run + .set_failed(&db, duration_ms, &log.output(), &e.to_string()) + .await; + } + } + + if let Ok(Some(mut sched_job)) = ScheduledJob::get_by_name(&db, &job_name).await { + sched_job.last_run_at = Some(now_iso().to_string()); + sched_job.updated_at = now_iso(); + let _ = sched_job.save(&db).await; + } + } + /// Remove a cron job from the scheduler and re-add it with a new cron /// expression. Also updates the DB row. pub async fn reschedule_job(&self, job_name: &str, new_cron: &str) -> anyhow::Result<()> { diff --git a/templates/admin/v2.html b/templates/admin/v2.html new file mode 100644 index 0000000..8bdc14d --- /dev/null +++ b/templates/admin/v2.html @@ -0,0 +1,1905 @@ +{% extends "base.html" %} + +{% block title %}FuruMusic Admin v{{ version }}{% endblock title %} + +{% block head_extra %} + +{% endblock head_extra %} + +{% block content %} +
+ + +
+
+
+

+

+
+
+ + + + Debug + + + + Player + +
+
+ +
+
+ +
+ +
+
+
+
+ Pending Reviews + +
+
+
+ +
+ +
+
+ +
+
+ + + +
+
+ + + +
+
+ +
+ + + + + + + + + + + + + + + +
StatusInputTypeConf.TagsUpdated
+
No reviews in this filter
+
+
+ +
+ + + +
+
+
+ +
+
+ +
+
+
+
+
+ Tasks + +
+
+ +
+
+
+ + + + + + + + + + + + + +
TaskStateScheduleLatest RunsActions
+
+
+ +
+ + + +
+
+
+ +
+
+
+ + +
+
+
+
+ + +
+
+ + +
+
+ + +
+
+ + +
+
+ +
+ +
+
+
+
Select a task from the table
+
+ +
+
+
+ Selected Run Log + +
+
+ + +
+
+
+

+                        
No run selected
+
+
+
+
+ +
+
+
+
+
+ Library Workbench + +
+
+
+ + + +
+ +
+
+
+
+ + + +
+
+ + + + + + +
+
+
+ +
No library rows in this filter
+
+
+ +
+ + + +
+
+
+ +
+
+ +
+
+
+
+ Future Tools + Reserved operation slots for complex library maintenance +
+
+
+ + + +
+
+
+
+ + + + + +
+
+ Loading +
+
+ + + + +{% endblock content %} diff --git a/templates/player.html b/templates/player.html index ad8bb4f..5c12cdf 100644 --- a/templates/player.html +++ b/templates/player.html @@ -312,6 +312,61 @@ button.user-stat:hover { color: var(--text-subdued); } +.playlist-public-section { + margin-top: 10px; + padding-top: 10px; + border-top: 1px solid var(--border-color); +} + +.playlist-subtitle { + padding-top: 2px; + padding-bottom: 8px; +} + +.playlist-title-line, +.playlist-meta-line { + min-width: 0; + display: flex; + align-items: center; + gap: 6px; +} + +.playlist-title-text { + min-width: 0; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +.playlist-item-public { + white-space: normal; +} + +.playlist-meta-line { + margin-top: 2px; + font-size: 11px; + color: var(--text-subdued); + white-space: nowrap; +} + +.playlist-owner { + min-width: 0; + overflow: hidden; + text-overflow: ellipsis; +} + +.playlist-public-badge { + flex-shrink: 0; + padding: 1px 5px; + border-radius: 999px; + background: rgba(52, 211, 153, 0.12); + color: #6ee7b7; + font-size: 10px; + font-weight: 700; + text-transform: uppercase; + letter-spacing: 0.3px; +} + .sidebar-bottom { padding: 12px 16px; border-top: 1px solid var(--border-color); @@ -348,6 +403,15 @@ button.user-stat:hover { margin-bottom: 20px; } +.playlist-detail-meta { + display: flex; + align-items: center; + gap: 8px; + margin: -12px 0 18px; + color: var(--text-subdued); + font-size: 13px; +} + .breadcrumb { display: flex; align-items: center; @@ -2339,7 +2403,7 @@ button.user-stat:hover {
-