diff --git a/furumi-agent/src/db.rs b/furumi-agent/src/db.rs index e7686ec..3600c78 100644 --- a/furumi-agent/src/db.rs +++ b/furumi-agent/src/db.rs @@ -690,91 +690,8 @@ pub async fn get_tracks_with_albums_for_artist(pool: &PgPool, artist_id: i64) -> ).bind(artist_id).fetch_all(pool).await } -pub async fn rename_artist(pool: &PgPool, id: i64, new_name: &str) -> Result<(), sqlx::Error> { - sqlx::query("UPDATE artists SET name = $2 WHERE id = $1") - .bind(id).bind(new_name).execute(pool).await?; - Ok(()) -} - -pub async fn delete_artist(pool: &PgPool, id: i64) -> Result<(), sqlx::Error> { - sqlx::query("DELETE FROM artists WHERE id = $1") - .bind(id).execute(pool).await?; - Ok(()) -} - -pub async fn rename_album(pool: &PgPool, id: i64, new_name: &str) -> Result<(), sqlx::Error> { - sqlx::query("UPDATE albums SET name = $2 WHERE id = $1") - .bind(id).bind(new_name).execute(pool).await?; - Ok(()) -} - -pub async fn set_album_artist(pool: &PgPool, album_id: i64, artist_id: i64) -> Result<(), sqlx::Error> { - sqlx::query("UPDATE albums SET artist_id = $2 WHERE id = $1") - .bind(album_id).bind(artist_id).execute(pool).await?; - Ok(()) -} - - -pub async fn move_track_artists(pool: &PgPool, from_artist_id: i64, to_artist_id: i64) -> Result<(), sqlx::Error> { - // Update, but avoid duplicate (track_id, artist_id, role) - delete first any conflicting rows - sqlx::query( - r#"DELETE FROM track_artists - WHERE artist_id = $2 - AND (track_id, role) IN ( - SELECT track_id, role FROM track_artists WHERE artist_id = $1 - )"# - ).bind(from_artist_id).bind(to_artist_id).execute(pool).await?; - sqlx::query("UPDATE track_artists SET artist_id = $2 WHERE artist_id = $1") - .bind(from_artist_id).bind(to_artist_id).execute(pool).await?; - Ok(()) -} - -pub async fn get_duplicate_track_ids_in_albums(pool: &PgPool, source_album_id: i64, target_album_id: i64) -> Result, sqlx::Error> { - let rows: Vec<(i64,)> = sqlx::query_as( - r#"SELECT t1.id FROM tracks t1 - JOIN tracks t2 ON t1.file_hash = t2.file_hash AND t2.album_id = $2 - WHERE t1.album_id = $1"# - ).bind(source_album_id).bind(target_album_id).fetch_all(pool).await?; - Ok(rows.into_iter().map(|(id,)| id).collect()) -} - -pub async fn get_track_storage_path(pool: &PgPool, track_id: i64) -> Result, sqlx::Error> { - let row: Option<(String,)> = sqlx::query_as("SELECT storage_path FROM tracks WHERE id = $1") - .bind(track_id).fetch_optional(pool).await?; - Ok(row.map(|(p,)| p)) -} - -pub async fn delete_track(pool: &PgPool, track_id: i64) -> Result<(), sqlx::Error> { - sqlx::query("DELETE FROM track_artists WHERE track_id = $1").bind(track_id).execute(pool).await?; - sqlx::query("DELETE FROM tracks WHERE id = $1").bind(track_id).execute(pool).await?; - Ok(()) -} - -pub async fn move_tracks_to_album(pool: &PgPool, from_album_id: i64, to_album_id: i64) -> Result<(), sqlx::Error> { - sqlx::query("UPDATE tracks SET album_id = $2 WHERE album_id = $1") - .bind(from_album_id).bind(to_album_id).execute(pool).await?; - Ok(()) -} - -pub async fn delete_album(pool: &PgPool, id: i64) -> Result<(), sqlx::Error> { - sqlx::query("DELETE FROM albums WHERE id = $1") - .bind(id).execute(pool).await?; - Ok(()) -} - pub async fn update_track_storage_path(pool: &PgPool, track_id: i64, new_path: &str) -> Result<(), sqlx::Error> { sqlx::query("UPDATE tracks SET storage_path = $2 WHERE id = $1") .bind(track_id).bind(new_path).execute(pool).await?; Ok(()) } - -pub async fn get_albums_for_artist(pool: &PgPool, artist_id: i64) -> Result, sqlx::Error> { - sqlx::query_as::<_, Album>("SELECT * FROM albums WHERE artist_id = $1 ORDER BY year NULLS LAST, name") - .bind(artist_id).fetch_all(pool).await -} - -pub async fn find_album_by_artist_id_and_name(pool: &PgPool, artist_id: i64, name: &str) -> Result, sqlx::Error> { - let row: Option<(i64,)> = sqlx::query_as("SELECT id FROM albums WHERE artist_id = $1 AND name = $2") - .bind(artist_id).bind(name).fetch_optional(pool).await?; - Ok(row.map(|r| r.0)) -} diff --git a/furumi-agent/src/merge.rs b/furumi-agent/src/merge.rs index bd164b1..eec8105 100644 --- a/furumi-agent/src/merge.rs +++ b/furumi-agent/src/merge.rs @@ -96,49 +96,16 @@ pub async fn execute_merge(state: &Arc, merge_id: Uuid) -> anyhow::Res let loser_ids: Vec = source_ids.iter().copied() .filter(|&id| id != proposal.winner_artist_id).collect(); - // 1. Rename winner artist to canonical name - db::rename_artist(&state.pool, proposal.winner_artist_id, &proposal.canonical_artist_name).await?; - - // 2. Process album mappings - for mapping in &proposal.album_mappings { - if let Some(target_id) = mapping.merge_into_album_id { - merge_albums_into(&state.pool, mapping.source_album_id, target_id).await?; - } else { - // Rename album, then move to winner — merging if winner already has same name - db::rename_album(&state.pool, mapping.source_album_id, &mapping.canonical_name).await?; - if let Some(existing_id) = db::find_album_by_artist_id_and_name( - &state.pool, proposal.winner_artist_id, &mapping.canonical_name, - ).await? { - if existing_id != mapping.source_album_id { - merge_albums_into(&state.pool, mapping.source_album_id, existing_id).await?; - } - // else: source_album already belongs to winner with that name — nothing to do - } else { - db::set_album_artist(&state.pool, mapping.source_album_id, proposal.winner_artist_id).await?; - } - } + // Execute all DB mutations in a single atomic transaction. + // On error the transaction rolls back automatically (dropped without commit). + let mut tx = state.pool.begin().await?; + if let Err(e) = merge_db(&mut tx, &proposal, &loser_ids).await { + // tx is dropped here → auto-rollback + return Err(e); } + tx.commit().await?; - // 3. Move remaining albums from losers to winner, merging name conflicts - for &loser_id in &loser_ids { - let albums = db::get_albums_for_artist(&state.pool, loser_id).await?; - for album in albums { - if let Some(existing_id) = db::find_album_by_artist_id_and_name( - &state.pool, proposal.winner_artist_id, &album.name, - ).await? { - merge_albums_into(&state.pool, album.id, existing_id).await?; - } else { - db::set_album_artist(&state.pool, album.id, proposal.winner_artist_id).await?; - } - } - } - - // 4. Move track_artists from losers to winner - for &loser_id in &loser_ids { - db::move_track_artists(&state.pool, loser_id, proposal.winner_artist_id).await?; - } - - // 5. Move files on disk and update storage paths + // Move files after commit (best-effort; storage_path updated per file) let tracks = db::get_tracks_with_albums_for_artist(&state.pool, proposal.winner_artist_id).await?; for track in &tracks { let current = std::path::Path::new(&track.storage_path); @@ -168,31 +135,180 @@ pub async fn execute_merge(state: &Arc, merge_id: Uuid) -> anyhow::Res } } - // 6. Delete loser artists - for &loser_id in &loser_ids { - db::delete_artist(&state.pool, loser_id).await?; - } - - // 7. Mark approved db::update_merge_status(&state.pool, merge_id, "approved", None).await?; tracing::info!(id = %merge_id, "Merge executed successfully"); Ok(()) } -/// Merge source album into target: deduplicate tracks, move the rest, delete source. -async fn merge_albums_into(pool: &sqlx::PgPool, source_id: i64, target_id: i64) -> anyhow::Result<()> { - let dup_ids = db::get_duplicate_track_ids_in_albums(pool, source_id, target_id).await?; - for dup_id in dup_ids { - if let Ok(Some(path)) = db::get_track_storage_path(pool, dup_id).await { - let p = std::path::Path::new(&path); - if p.exists() { - let _ = tokio::fs::remove_file(p).await; +/// All DB mutations for a merge, executed inside a single transaction. +/// `tx` is a `Transaction<'_, Postgres>` which derefs to `PgConnection`. +async fn merge_db( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + proposal: &MergeProposal, + loser_ids: &[i64], +) -> anyhow::Result<()> { + // 1. Rename winner artist to canonical name + sqlx::query("UPDATE artists SET name = $2 WHERE id = $1") + .bind(proposal.winner_artist_id) + .bind(&proposal.canonical_artist_name) + .execute(&mut **tx).await?; + + // 2. Process album mappings from the proposal + for mapping in &proposal.album_mappings { + // Skip if source was already processed (idempotent retry support) + let src_exists: (bool,) = sqlx::query_as("SELECT EXISTS(SELECT 1 FROM albums WHERE id = $1)") + .bind(mapping.source_album_id) + .fetch_one(&mut **tx).await?; + if !src_exists.0 { continue; } + + if let Some(target_id) = mapping.merge_into_album_id { + album_merge_into(tx, mapping.source_album_id, target_id).await?; + } else { + // Rename first + sqlx::query("UPDATE albums SET name = $2 WHERE id = $1") + .bind(mapping.source_album_id) + .bind(&mapping.canonical_name) + .execute(&mut **tx).await?; + + // Check if winner already has an album with this canonical name (excluding self) + let conflict: Option<(i64,)> = sqlx::query_as( + "SELECT id FROM albums WHERE artist_id = $1 AND name = $2 AND id != $3" + ) + .bind(proposal.winner_artist_id) + .bind(&mapping.canonical_name) + .bind(mapping.source_album_id) + .fetch_optional(&mut **tx).await?; + + if let Some((existing_id,)) = conflict { + album_merge_into(tx, mapping.source_album_id, existing_id).await?; + } else { + // Just move to winner artist (only if not already there) + sqlx::query( + "UPDATE albums SET artist_id = $2 WHERE id = $1 AND artist_id != $2" + ) + .bind(mapping.source_album_id) + .bind(proposal.winner_artist_id) + .execute(&mut **tx).await?; } } - db::delete_track(pool, dup_id).await?; } - db::move_tracks_to_album(pool, source_id, target_id).await?; - db::delete_album(pool, source_id).await?; + + // 3. Move all remaining albums from each loser to winner, merging name conflicts + for &loser_id in loser_ids { + loop { + // Fetch one album at a time; loop because merging changes the set + let album: Option<(i64, String)> = sqlx::query_as( + "SELECT id, name FROM albums WHERE artist_id = $1 LIMIT 1" + ) + .bind(loser_id) + .fetch_optional(&mut **tx).await?; + + let (album_id, album_name) = match album { + Some(a) => a, + None => break, + }; + + let conflict: Option<(i64,)> = sqlx::query_as( + "SELECT id FROM albums WHERE artist_id = $1 AND name = $2" + ) + .bind(proposal.winner_artist_id) + .bind(&album_name) + .fetch_optional(&mut **tx).await?; + + if let Some((existing_id,)) = conflict { + // Merge loser album into winner album + album_merge_into(tx, album_id, existing_id).await?; + } else { + sqlx::query("UPDATE albums SET artist_id = $2 WHERE id = $1") + .bind(album_id) + .bind(proposal.winner_artist_id) + .execute(&mut **tx).await?; + } + } + } + + // 4. Move track_artists from losers to winner + for &loser_id in loser_ids { + // Remove winner's entries that would conflict after the update + sqlx::query( + r#"DELETE FROM track_artists + WHERE artist_id = $2 + AND (track_id, role) IN ( + SELECT track_id, role FROM track_artists WHERE artist_id = $1 + )"# + ) + .bind(loser_id) + .bind(proposal.winner_artist_id) + .execute(&mut **tx).await?; + + sqlx::query("UPDATE track_artists SET artist_id = $2 WHERE artist_id = $1") + .bind(loser_id) + .bind(proposal.winner_artist_id) + .execute(&mut **tx).await?; + } + + // 5. Delete loser artists (should be empty of albums/tracks by now) + for &loser_id in loser_ids { + sqlx::query("DELETE FROM artists WHERE id = $1") + .bind(loser_id) + .execute(&mut **tx).await?; + } + + Ok(()) +} + +/// Merge source album into target within an open transaction: +/// deduplicate by file_hash, move the rest, delete source. +async fn album_merge_into( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + source_id: i64, + target_id: i64, +) -> anyhow::Result<()> { + // Verify target exists + let target_ok: (bool,) = sqlx::query_as("SELECT EXISTS(SELECT 1 FROM albums WHERE id = $1)") + .bind(target_id) + .fetch_one(&mut **tx).await?; + if !target_ok.0 { + anyhow::bail!("Target album {} does not exist", target_id); + } + + // Delete duplicate tracks from source (same file_hash already in target) + let dups: Vec<(i64,)> = sqlx::query_as( + r#"SELECT t1.id FROM tracks t1 + JOIN tracks t2 ON t1.file_hash = t2.file_hash AND t2.album_id = $2 + WHERE t1.album_id = $1"# + ) + .bind(source_id) + .bind(target_id) + .fetch_all(&mut **tx).await?; + + for (dup_id,) in dups { + // Retrieve path for later file deletion (non-fatal if missing) + let path: Option<(String,)> = sqlx::query_as("SELECT storage_path FROM tracks WHERE id = $1") + .bind(dup_id).fetch_optional(&mut **tx).await?; + if let Some((p,)) = path { + // Schedule physical deletion after commit — store in a side channel; + // here we do a best-effort remove outside the tx scope via tokio::spawn. + let p = p.clone(); + tokio::spawn(async move { + let _ = tokio::fs::remove_file(&p).await; + }); + } + sqlx::query("DELETE FROM track_artists WHERE track_id = $1").bind(dup_id).execute(&mut **tx).await?; + sqlx::query("DELETE FROM tracks WHERE id = $1").bind(dup_id).execute(&mut **tx).await?; + } + + // Move remaining tracks from source to target + sqlx::query("UPDATE tracks SET album_id = $2 WHERE album_id = $1") + .bind(source_id) + .bind(target_id) + .execute(&mut **tx).await?; + + // Delete the now-empty source album + sqlx::query("DELETE FROM albums WHERE id = $1") + .bind(source_id) + .execute(&mut **tx).await?; + Ok(()) }