diff --git a/furumi-agent/src/db.rs b/furumi-agent/src/db.rs index aa29d31..72ce9d2 100644 --- a/furumi-agent/src/db.rs +++ b/furumi-agent/src/db.rs @@ -326,6 +326,18 @@ pub async fn approve_and_finalize( .fetch_one(pool) .await?; + // Check if track already exists (e.g. previously approved but pending not cleaned up) + let existing: Option<(i64,)> = sqlx::query_as("SELECT id FROM tracks WHERE file_hash = $1") + .bind(&pt.file_hash) + .fetch_optional(pool) + .await?; + + if let Some((track_id,)) = existing { + // Already finalized — just mark pending as approved + update_pending_status(pool, pending_id, "approved", None).await?; + return Ok(track_id); + } + let artist_name = pt.norm_artist.as_deref().unwrap_or("Unknown Artist"); let artist_id = upsert_artist(pool, artist_name).await?; @@ -425,6 +437,16 @@ pub async fn find_album_id(pool: &PgPool, artist_name: &str, album_name: &str) - Ok(row.map(|r| r.0)) } +/// Fetch pending tracks that need (re-)processing by the LLM pipeline. +pub async fn list_pending_for_processing(pool: &PgPool, limit: i64) -> Result, sqlx::Error> { + sqlx::query_as::<_, PendingTrack>( + "SELECT * FROM pending_tracks WHERE status = 'pending' ORDER BY created_at ASC LIMIT $1" + ) + .bind(limit) + .fetch_all(pool) + .await +} + // --- DTOs for insert helpers --- #[derive(Debug, Default)] diff --git a/furumi-agent/src/ingest/mod.rs b/furumi-agent/src/ingest/mod.rs index 3c13e7a..7919e4c 100644 --- a/furumi-agent/src/ingest/mod.rs +++ b/furumi-agent/src/ingest/mod.rs @@ -19,6 +19,12 @@ pub async fn run(state: Arc) { Ok(count) => tracing::info!(count, "processed new files"), Err(e) => tracing::error!(?e, "inbox scan failed"), } + // Re-process pending tracks (e.g. retried from admin UI) + match reprocess_pending(&state).await { + Ok(0) => {} + Ok(count) => tracing::info!(count, "re-processed pending tracks"), + Err(e) => tracing::error!(?e, "pending re-processing failed"), + } tokio::time::sleep(interval).await; } } @@ -61,6 +67,137 @@ async fn scan_inbox(state: &Arc) -> anyhow::Result { Ok(count) } +/// Re-process pending tracks from DB (e.g. tracks retried via admin UI). +/// These already have raw metadata and path hints stored — just need RAG + LLM. +async fn reprocess_pending(state: &Arc) -> anyhow::Result { + let pending = db::list_pending_for_processing(&state.pool, 10).await?; + if pending.is_empty() { + return Ok(0); + } + + let mut count = 0; + for pt in &pending { + tracing::info!(id = %pt.id, title = pt.raw_title.as_deref().unwrap_or("?"), "Re-processing pending track"); + + db::update_pending_status(&state.pool, pt.id, "processing", None).await?; + + // Build raw metadata and hints from stored DB fields + let raw_meta = metadata::RawMetadata { + title: pt.raw_title.clone(), + artist: pt.raw_artist.clone(), + album: pt.raw_album.clone(), + track_number: pt.raw_track_number.map(|n| n as u32), + year: pt.raw_year.map(|n| n as u32), + genre: pt.raw_genre.clone(), + duration_secs: pt.duration_secs, + }; + + let hints = db::PathHints { + title: pt.path_title.clone(), + artist: pt.path_artist.clone(), + album: pt.path_album.clone(), + year: pt.path_year, + track_number: pt.path_track_number, + }; + + // RAG lookup + let artist_query = raw_meta.artist.as_deref() + .or(hints.artist.as_deref()) + .unwrap_or(""); + let album_query = raw_meta.album.as_deref() + .or(hints.album.as_deref()) + .unwrap_or(""); + + let similar_artists = if !artist_query.is_empty() { + db::find_similar_artists(&state.pool, artist_query, 5).await.unwrap_or_default() + } else { + Vec::new() + }; + + let similar_albums = if !album_query.is_empty() { + db::find_similar_albums(&state.pool, album_query, 5).await.unwrap_or_default() + } else { + Vec::new() + }; + + // LLM normalization + match normalize::normalize(state, &raw_meta, &hints, &similar_artists, &similar_albums).await { + Ok(normalized) => { + let confidence = normalized.confidence.unwrap_or(0.0); + let status = if confidence >= state.config.confidence_threshold { + "approved" + } else { + "review" + }; + + tracing::info!( + id = %pt.id, + norm_artist = normalized.artist.as_deref().unwrap_or("-"), + norm_title = normalized.title.as_deref().unwrap_or("-"), + confidence, + status, + "Re-processing complete" + ); + + db::update_pending_normalized(&state.pool, pt.id, status, &normalized, None).await?; + + if status == "approved" { + let artist = normalized.artist.as_deref().unwrap_or("Unknown Artist"); + let album = normalized.album.as_deref().unwrap_or("Unknown Album"); + let title = normalized.title.as_deref().unwrap_or("Unknown Title"); + let source = std::path::Path::new(&pt.inbox_path); + let ext = source.extension().and_then(|e| e.to_str()).unwrap_or("flac"); + let track_num = normalized.track_number.unwrap_or(0); + + let dest_filename = if track_num > 0 { + format!("{:02} - {}.{}", track_num, sanitize_filename(title), ext) + } else { + format!("{}.{}", sanitize_filename(title), ext) + }; + + // Check if already moved + let dest = state.config.storage_dir + .join(sanitize_filename(artist)) + .join(sanitize_filename(album)) + .join(&dest_filename); + + let storage_path = if dest.exists() && !source.exists() { + dest.to_string_lossy().to_string() + } else if source.exists() { + match mover::move_to_storage( + &state.config.storage_dir, artist, album, &dest_filename, source, + ).await { + Ok(p) => p.to_string_lossy().to_string(), + Err(e) => { + tracing::error!(id = %pt.id, ?e, "Failed to move file"); + db::update_pending_status(&state.pool, pt.id, "error", Some(&e.to_string())).await?; + continue; + } + } + } else { + tracing::error!(id = %pt.id, "Source file missing: {:?}", source); + db::update_pending_status(&state.pool, pt.id, "error", Some("Source file missing")).await?; + continue; + }; + + match db::approve_and_finalize(&state.pool, pt.id, &storage_path).await { + Ok(track_id) => tracing::info!(id = %pt.id, track_id, "Track finalized"), + Err(e) => tracing::error!(id = %pt.id, ?e, "Failed to finalize"), + } + } + + count += 1; + } + Err(e) => { + tracing::error!(id = %pt.id, ?e, "LLM normalization failed"); + db::update_pending_status(&state.pool, pt.id, "error", Some(&e.to_string())).await?; + } + } + } + + Ok(count) +} + /// Recursively remove empty directories inside the inbox. /// Does not remove the inbox root itself. async fn cleanup_empty_dirs(dir: &std::path::Path) -> bool { diff --git a/furumi-agent/src/web/admin.html b/furumi-agent/src/web/admin.html index fd0117e..b9238b6 100644 --- a/furumi-agent/src/web/admin.html +++ b/furumi-agent/src/web/admin.html @@ -6,7 +6,6 @@ Furumi Agent — Admin @@ -310,26 +134,40 @@ tr:hover td { background: var(--bg-hover); }

Furumi Agent

+ Idle
+
+
+ + diff --git a/furumi-agent/src/web/api.rs b/furumi-agent/src/web/api.rs index 085792e..34f22d6 100644 --- a/furumi-agent/src/web/api.rs +++ b/furumi-agent/src/web/api.rs @@ -82,22 +82,24 @@ pub async fn approve_queue_item(State(state): State, Path(id): Path) -> format!("{}.{}", sanitize_filename(title), ext) }; - match crate::ingest::mover::move_to_storage( - &state.config.storage_dir, - artist, - album, - &filename, - source, - ) - .await - { - Ok(storage_path) => { - let rel_path = storage_path.to_string_lossy().to_string(); - match db::approve_and_finalize(&state.pool, id, &rel_path).await { - Ok(track_id) => (StatusCode::OK, Json(serde_json::json!({"track_id": track_id}))).into_response(), - Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()), - } + let artist_dir = sanitize_filename(artist); + let album_dir = sanitize_filename(album); + let dest = state.config.storage_dir.join(&artist_dir).join(&album_dir).join(&filename); + + let storage_path = if dest.exists() && !source.exists() { + // File already moved (e.g. auto-approved earlier but DB not finalized) + dest.to_string_lossy().to_string() + } else { + match crate::ingest::mover::move_to_storage( + &state.config.storage_dir, artist, album, &filename, source, + ).await { + Ok(p) => p.to_string_lossy().to_string(), + Err(e) => return error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()), } + }; + + match db::approve_and_finalize(&state.pool, id, &storage_path).await { + Ok(track_id) => (StatusCode::OK, Json(serde_json::json!({"track_id": track_id}))).into_response(), Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()), } } @@ -144,6 +146,98 @@ pub async fn update_queue_item( } } +// --- Retry --- + +pub async fn retry_queue_item(State(state): State, Path(id): Path) -> impl IntoResponse { + match db::update_pending_status(&state.pool, id, "pending", None).await { + Ok(()) => StatusCode::NO_CONTENT.into_response(), + Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()), + } +} + +// --- Batch operations --- + +#[derive(Deserialize)] +pub struct BatchIds { + pub ids: Vec, +} + +pub async fn batch_approve(State(state): State, Json(body): Json) -> impl IntoResponse { + let mut ok = 0u32; + let mut errors = Vec::new(); + for id in &body.ids { + let pt = match db::get_pending(&state.pool, *id).await { + Ok(Some(pt)) => pt, + Ok(None) => { errors.push(format!("{}: not found", id)); continue; } + Err(e) => { errors.push(format!("{}: {}", id, e)); continue; } + }; + + let artist = pt.norm_artist.as_deref().unwrap_or("Unknown Artist"); + let album = pt.norm_album.as_deref().unwrap_or("Unknown Album"); + let title = pt.norm_title.as_deref().unwrap_or("Unknown Title"); + let source = std::path::Path::new(&pt.inbox_path); + let ext = source.extension().and_then(|e| e.to_str()).unwrap_or("flac"); + let track_num = pt.norm_track_number.unwrap_or(0); + + let filename = if track_num > 0 { + format!("{:02} - {}.{}", track_num, sanitize_filename(title), ext) + } else { + format!("{}.{}", sanitize_filename(title), ext) + }; + + let artist_dir = sanitize_filename(artist); + let album_dir = sanitize_filename(album); + let dest = state.config.storage_dir.join(&artist_dir).join(&album_dir).join(&filename); + + let rel_path = if dest.exists() && !source.exists() { + dest.to_string_lossy().to_string() + } else { + match crate::ingest::mover::move_to_storage( + &state.config.storage_dir, artist, album, &filename, source, + ).await { + Ok(p) => p.to_string_lossy().to_string(), + Err(e) => { errors.push(format!("{}: {}", id, e)); continue; } + } + }; + + match db::approve_and_finalize(&state.pool, *id, &rel_path).await { + Ok(_) => ok += 1, + Err(e) => errors.push(format!("{}: {}", id, e)), + } + } + (StatusCode::OK, Json(serde_json::json!({"approved": ok, "errors": errors}))).into_response() +} + +pub async fn batch_reject(State(state): State, Json(body): Json) -> impl IntoResponse { + let mut ok = 0u32; + for id in &body.ids { + if db::update_pending_status(&state.pool, *id, "rejected", None).await.is_ok() { + ok += 1; + } + } + (StatusCode::OK, Json(serde_json::json!({"rejected": ok}))).into_response() +} + +pub async fn batch_retry(State(state): State, Json(body): Json) -> impl IntoResponse { + let mut ok = 0u32; + for id in &body.ids { + if db::update_pending_status(&state.pool, *id, "pending", None).await.is_ok() { + ok += 1; + } + } + (StatusCode::OK, Json(serde_json::json!({"retried": ok}))).into_response() +} + +pub async fn batch_delete(State(state): State, Json(body): Json) -> impl IntoResponse { + let mut ok = 0u32; + for id in &body.ids { + if db::delete_pending(&state.pool, *id).await.unwrap_or(false) { + ok += 1; + } + } + (StatusCode::OK, Json(serde_json::json!({"deleted": ok}))).into_response() +} + // --- Artists --- #[derive(Deserialize)] diff --git a/furumi-agent/src/web/mod.rs b/furumi-agent/src/web/mod.rs index 9fc99c8..e5840cd 100644 --- a/furumi-agent/src/web/mod.rs +++ b/furumi-agent/src/web/mod.rs @@ -21,7 +21,12 @@ pub fn build_router(state: Arc) -> Router { .route("/queue/:id", get(api::get_queue_item).delete(api::delete_queue_item)) .route("/queue/:id/approve", post(api::approve_queue_item)) .route("/queue/:id/reject", post(api::reject_queue_item)) + .route("/queue/:id/retry", post(api::retry_queue_item)) .route("/queue/:id/update", put(api::update_queue_item)) + .route("/queue/batch/approve", post(api::batch_approve)) + .route("/queue/batch/reject", post(api::batch_reject)) + .route("/queue/batch/retry", post(api::batch_retry)) + .route("/queue/batch/delete", post(api::batch_delete)) .route("/artists/search", get(api::search_artists)) .route("/artists", get(api::list_artists)) .route("/artists/:id", put(api::update_artist))