use std::sync::Arc; use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::db; use crate::web::AppState; use crate::ingest::normalize::call_ollama; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct MergeProposal { pub canonical_artist_name: String, pub winner_artist_id: i64, pub album_mappings: Vec, pub notes: String, } #[derive(Debug, Serialize, Deserialize, Clone)] pub struct AlbumMapping { pub source_album_id: i64, pub canonical_name: String, pub merge_into_album_id: Option, } pub async fn propose_merge(state: &Arc, merge_id: Uuid) -> anyhow::Result<()> { db::update_merge_status(&state.pool, merge_id, "processing", None).await?; let merge = db::get_artist_merge(&state.pool, merge_id).await? .ok_or_else(|| anyhow::anyhow!("Merge not found: {}", merge_id))?; let source_ids: Vec = serde_json::from_str(&merge.source_artist_ids) .map_err(|e| anyhow::anyhow!("Invalid source_artist_ids: {}", e))?; let artists_data = db::get_artists_full_data(&state.pool, &source_ids).await?; let user_message = build_merge_message(&artists_data); let response = call_ollama( &state.config.ollama_url, &state.config.ollama_model, &state.merge_prompt, &user_message, state.config.ollama_auth.as_deref(), ).await?; let proposal = parse_merge_response(&response)?; let notes = proposal.notes.clone(); let proposal_json = serde_json::to_string(&proposal)?; db::update_merge_proposal(&state.pool, merge_id, &proposal_json, ¬es).await?; tracing::info!(id = %merge_id, "Merge proposal generated"); Ok(()) } fn build_merge_message(artists: &[db::ArtistFullData]) -> String { let mut msg = String::from("## Artists to merge\n\n"); for artist in artists { msg.push_str(&format!("### Artist ID {}: \"{}\"\n", artist.id, artist.name)); if artist.albums.is_empty() { msg.push_str(" (no albums)\n"); } for album in &artist.albums { let year_str = album.year.map(|y| format!(" ({})", y)).unwrap_or_default(); msg.push_str(&format!(" Album ID {}: \"{}\"{}\n", album.id, album.name, year_str)); for track in &album.tracks { let num = track.track_number.map(|n| format!("{:02}. ", n)).unwrap_or_default(); msg.push_str(&format!(" - {}\"{}\" [track_id={}]\n", num, track.title, track.id)); } } msg.push('\n'); } msg } fn parse_merge_response(response: &str) -> anyhow::Result { let cleaned = response.trim(); let json_str = if cleaned.starts_with("```") { let start = cleaned.find('{').unwrap_or(0); let end = cleaned.rfind('}').map(|i| i + 1).unwrap_or(cleaned.len()); &cleaned[start..end] } else { cleaned }; serde_json::from_str(json_str) .map_err(|e| anyhow::anyhow!("Failed to parse merge LLM response: {} — raw: {}", e, response)) } pub async fn execute_merge(state: &Arc, merge_id: Uuid) -> anyhow::Result<()> { let merge = db::get_artist_merge(&state.pool, merge_id).await? .ok_or_else(|| anyhow::anyhow!("Merge not found"))?; let proposal_str = merge.proposal.ok_or_else(|| anyhow::anyhow!("No proposal to execute"))?; let proposal: MergeProposal = serde_json::from_str(&proposal_str)?; let source_ids: Vec = serde_json::from_str(&merge.source_artist_ids)?; let loser_ids: Vec = source_ids.iter().copied() .filter(|&id| id != proposal.winner_artist_id).collect(); // Execute all DB mutations in a single atomic transaction. // On error the transaction rolls back automatically (dropped without commit). let mut tx = state.pool.begin().await?; if let Err(e) = merge_db(&mut tx, &proposal, &loser_ids).await { // tx is dropped here → auto-rollback return Err(e); } tx.commit().await?; // Move files after commit (best-effort; storage_path updated per file) let tracks = db::get_tracks_with_albums_for_artist(&state.pool, proposal.winner_artist_id).await?; for track in &tracks { let current = std::path::Path::new(&track.storage_path); let filename = match current.file_name() { Some(f) => f.to_string_lossy().to_string(), None => continue, }; let album_name = track.album_name.as_deref().unwrap_or("Unknown Album"); let new_path = state.config.storage_dir .join(sanitize(&proposal.canonical_artist_name)) .join(sanitize(album_name)) .join(&filename); if current != new_path.as_path() { if current.exists() { if let Some(parent) = new_path.parent() { let _ = tokio::fs::create_dir_all(parent).await; } let moved = tokio::fs::rename(current, &new_path).await; if moved.is_err() { if let Ok(_) = tokio::fs::copy(current, &new_path).await { let _ = tokio::fs::remove_file(current).await; } } } db::update_track_storage_path(&state.pool, track.id, &new_path.to_string_lossy()).await?; } } db::update_merge_status(&state.pool, merge_id, "approved", None).await?; tracing::info!(id = %merge_id, "Merge executed successfully"); Ok(()) } /// All DB mutations for a merge, executed inside a single transaction. /// `tx` is a `Transaction<'_, Postgres>` which derefs to `PgConnection`. async fn merge_db( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, proposal: &MergeProposal, loser_ids: &[i64], ) -> anyhow::Result<()> { // 0. Validate proposal — ensure winner and all album IDs belong to source artists let source_ids: Vec = loser_ids.iter().copied() .chain(std::iter::once(proposal.winner_artist_id)) .collect(); // Verify winner_artist_id is one of the source artists if !source_ids.contains(&proposal.winner_artist_id) { anyhow::bail!( "winner_artist_id {} is not among source artists {:?}", proposal.winner_artist_id, source_ids ); } // Build set of valid album IDs (albums that actually belong to source artists) let mut valid_album_ids = std::collections::HashSet::::new(); for &src_id in &source_ids { let rows: Vec<(i64,)> = sqlx::query_as("SELECT id FROM albums WHERE artist_id = $1") .bind(src_id).fetch_all(&mut **tx).await?; for (id,) in rows { valid_album_ids.insert(id); } } // 1. Rename winner artist to canonical name sqlx::query("UPDATE artists SET name = $2 WHERE id = $1") .bind(proposal.winner_artist_id) .bind(&proposal.canonical_artist_name) .execute(&mut **tx).await?; // 2. Process album mappings from the proposal for mapping in &proposal.album_mappings { // Skip albums that don't belong to any source artist (LLM hallucinated IDs) if !valid_album_ids.contains(&mapping.source_album_id) { tracing::warn!( album_id = mapping.source_album_id, "Skipping album mapping: album does not belong to source artists" ); continue; } // Skip if source was already processed (idempotent retry support) let src_exists: (bool,) = sqlx::query_as("SELECT EXISTS(SELECT 1 FROM albums WHERE id = $1)") .bind(mapping.source_album_id) .fetch_one(&mut **tx).await?; if !src_exists.0 { continue; } if let Some(target_id) = mapping.merge_into_album_id { album_merge_into(tx, mapping.source_album_id, target_id).await?; } else { // Rename first sqlx::query("UPDATE albums SET name = $2 WHERE id = $1") .bind(mapping.source_album_id) .bind(&mapping.canonical_name) .execute(&mut **tx).await?; // Check if winner already has an album with this canonical name (excluding self) let conflict: Option<(i64,)> = sqlx::query_as( "SELECT id FROM albums WHERE artist_id = $1 AND name = $2 AND id != $3" ) .bind(proposal.winner_artist_id) .bind(&mapping.canonical_name) .bind(mapping.source_album_id) .fetch_optional(&mut **tx).await?; if let Some((existing_id,)) = conflict { album_merge_into(tx, mapping.source_album_id, existing_id).await?; } else { // Just move to winner artist (only if not already there) sqlx::query( "UPDATE albums SET artist_id = $2 WHERE id = $1 AND artist_id != $2" ) .bind(mapping.source_album_id) .bind(proposal.winner_artist_id) .execute(&mut **tx).await?; } } } // 3. Move all remaining albums from each loser to winner, merging name conflicts for &loser_id in loser_ids { loop { // Fetch one album at a time; loop because merging changes the set let album: Option<(i64, String)> = sqlx::query_as( "SELECT id, name FROM albums WHERE artist_id = $1 LIMIT 1" ) .bind(loser_id) .fetch_optional(&mut **tx).await?; let (album_id, album_name) = match album { Some(a) => a, None => break, }; let conflict: Option<(i64,)> = sqlx::query_as( "SELECT id FROM albums WHERE artist_id = $1 AND name = $2" ) .bind(proposal.winner_artist_id) .bind(&album_name) .fetch_optional(&mut **tx).await?; if let Some((existing_id,)) = conflict { // Merge loser album into winner album album_merge_into(tx, album_id, existing_id).await?; } else { sqlx::query("UPDATE albums SET artist_id = $2 WHERE id = $1") .bind(album_id) .bind(proposal.winner_artist_id) .execute(&mut **tx).await?; } } } // 4. Move track_artists from losers to winner for &loser_id in loser_ids { // Remove winner's entries that would conflict after the update sqlx::query( r#"DELETE FROM track_artists WHERE artist_id = $2 AND (track_id, role) IN ( SELECT track_id, role FROM track_artists WHERE artist_id = $1 )"# ) .bind(loser_id) .bind(proposal.winner_artist_id) .execute(&mut **tx).await?; sqlx::query("UPDATE track_artists SET artist_id = $2 WHERE artist_id = $1") .bind(loser_id) .bind(proposal.winner_artist_id) .execute(&mut **tx).await?; } // 5. Delete loser artists (should be empty of albums/tracks by now) for &loser_id in loser_ids { sqlx::query("DELETE FROM artists WHERE id = $1") .bind(loser_id) .execute(&mut **tx).await?; } Ok(()) } /// Merge source album into target within an open transaction: /// deduplicate by file_hash, move the rest, delete source. async fn album_merge_into( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, source_id: i64, target_id: i64, ) -> anyhow::Result<()> { // Verify target exists let target_ok: (bool,) = sqlx::query_as("SELECT EXISTS(SELECT 1 FROM albums WHERE id = $1)") .bind(target_id) .fetch_one(&mut **tx).await?; if !target_ok.0 { anyhow::bail!("Target album {} does not exist", target_id); } // Delete duplicate tracks from source (same file_hash already in target) let dups: Vec<(i64,)> = sqlx::query_as( r#"SELECT t1.id FROM tracks t1 JOIN tracks t2 ON t1.file_hash = t2.file_hash AND t2.album_id = $2 WHERE t1.album_id = $1"# ) .bind(source_id) .bind(target_id) .fetch_all(&mut **tx).await?; for (dup_id,) in dups { // Retrieve path for later file deletion (non-fatal if missing) let path: Option<(String,)> = sqlx::query_as("SELECT storage_path FROM tracks WHERE id = $1") .bind(dup_id).fetch_optional(&mut **tx).await?; if let Some((p,)) = path { // Schedule physical deletion after commit — store in a side channel; // here we do a best-effort remove outside the tx scope via tokio::spawn. let p = p.clone(); tokio::spawn(async move { let _ = tokio::fs::remove_file(&p).await; }); } sqlx::query("DELETE FROM track_artists WHERE track_id = $1").bind(dup_id).execute(&mut **tx).await?; sqlx::query("DELETE FROM tracks WHERE id = $1").bind(dup_id).execute(&mut **tx).await?; } // Move remaining tracks from source to target sqlx::query("UPDATE tracks SET album_id = $2 WHERE album_id = $1") .bind(source_id) .bind(target_id) .execute(&mut **tx).await?; // Delete the now-empty source album sqlx::query("DELETE FROM albums WHERE id = $1") .bind(source_id) .execute(&mut **tx).await?; Ok(()) } fn sanitize(name: &str) -> String { name.chars() .map(|c| match c { '/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' | '\0' => '_', _ => c, }) .collect::() .trim() .trim_matches('.') .to_owned() }