Added merge
This commit is contained in:
@@ -690,91 +690,8 @@ pub async fn get_tracks_with_albums_for_artist(pool: &PgPool, artist_id: i64) ->
|
|||||||
).bind(artist_id).fetch_all(pool).await
|
).bind(artist_id).fetch_all(pool).await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn rename_artist(pool: &PgPool, id: i64, new_name: &str) -> Result<(), sqlx::Error> {
|
|
||||||
sqlx::query("UPDATE artists SET name = $2 WHERE id = $1")
|
|
||||||
.bind(id).bind(new_name).execute(pool).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn delete_artist(pool: &PgPool, id: i64) -> Result<(), sqlx::Error> {
|
|
||||||
sqlx::query("DELETE FROM artists WHERE id = $1")
|
|
||||||
.bind(id).execute(pool).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn rename_album(pool: &PgPool, id: i64, new_name: &str) -> Result<(), sqlx::Error> {
|
|
||||||
sqlx::query("UPDATE albums SET name = $2 WHERE id = $1")
|
|
||||||
.bind(id).bind(new_name).execute(pool).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn set_album_artist(pool: &PgPool, album_id: i64, artist_id: i64) -> Result<(), sqlx::Error> {
|
|
||||||
sqlx::query("UPDATE albums SET artist_id = $2 WHERE id = $1")
|
|
||||||
.bind(album_id).bind(artist_id).execute(pool).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
pub async fn move_track_artists(pool: &PgPool, from_artist_id: i64, to_artist_id: i64) -> Result<(), sqlx::Error> {
|
|
||||||
// Update, but avoid duplicate (track_id, artist_id, role) - delete first any conflicting rows
|
|
||||||
sqlx::query(
|
|
||||||
r#"DELETE FROM track_artists
|
|
||||||
WHERE artist_id = $2
|
|
||||||
AND (track_id, role) IN (
|
|
||||||
SELECT track_id, role FROM track_artists WHERE artist_id = $1
|
|
||||||
)"#
|
|
||||||
).bind(from_artist_id).bind(to_artist_id).execute(pool).await?;
|
|
||||||
sqlx::query("UPDATE track_artists SET artist_id = $2 WHERE artist_id = $1")
|
|
||||||
.bind(from_artist_id).bind(to_artist_id).execute(pool).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get_duplicate_track_ids_in_albums(pool: &PgPool, source_album_id: i64, target_album_id: i64) -> Result<Vec<i64>, sqlx::Error> {
|
|
||||||
let rows: Vec<(i64,)> = sqlx::query_as(
|
|
||||||
r#"SELECT t1.id FROM tracks t1
|
|
||||||
JOIN tracks t2 ON t1.file_hash = t2.file_hash AND t2.album_id = $2
|
|
||||||
WHERE t1.album_id = $1"#
|
|
||||||
).bind(source_album_id).bind(target_album_id).fetch_all(pool).await?;
|
|
||||||
Ok(rows.into_iter().map(|(id,)| id).collect())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn get_track_storage_path(pool: &PgPool, track_id: i64) -> Result<Option<String>, sqlx::Error> {
|
|
||||||
let row: Option<(String,)> = sqlx::query_as("SELECT storage_path FROM tracks WHERE id = $1")
|
|
||||||
.bind(track_id).fetch_optional(pool).await?;
|
|
||||||
Ok(row.map(|(p,)| p))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn delete_track(pool: &PgPool, track_id: i64) -> Result<(), sqlx::Error> {
|
|
||||||
sqlx::query("DELETE FROM track_artists WHERE track_id = $1").bind(track_id).execute(pool).await?;
|
|
||||||
sqlx::query("DELETE FROM tracks WHERE id = $1").bind(track_id).execute(pool).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn move_tracks_to_album(pool: &PgPool, from_album_id: i64, to_album_id: i64) -> Result<(), sqlx::Error> {
|
|
||||||
sqlx::query("UPDATE tracks SET album_id = $2 WHERE album_id = $1")
|
|
||||||
.bind(from_album_id).bind(to_album_id).execute(pool).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn delete_album(pool: &PgPool, id: i64) -> Result<(), sqlx::Error> {
|
|
||||||
sqlx::query("DELETE FROM albums WHERE id = $1")
|
|
||||||
.bind(id).execute(pool).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn update_track_storage_path(pool: &PgPool, track_id: i64, new_path: &str) -> Result<(), sqlx::Error> {
|
pub async fn update_track_storage_path(pool: &PgPool, track_id: i64, new_path: &str) -> Result<(), sqlx::Error> {
|
||||||
sqlx::query("UPDATE tracks SET storage_path = $2 WHERE id = $1")
|
sqlx::query("UPDATE tracks SET storage_path = $2 WHERE id = $1")
|
||||||
.bind(track_id).bind(new_path).execute(pool).await?;
|
.bind(track_id).bind(new_path).execute(pool).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_albums_for_artist(pool: &PgPool, artist_id: i64) -> Result<Vec<Album>, sqlx::Error> {
|
|
||||||
sqlx::query_as::<_, Album>("SELECT * FROM albums WHERE artist_id = $1 ORDER BY year NULLS LAST, name")
|
|
||||||
.bind(artist_id).fetch_all(pool).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn find_album_by_artist_id_and_name(pool: &PgPool, artist_id: i64, name: &str) -> Result<Option<i64>, sqlx::Error> {
|
|
||||||
let row: Option<(i64,)> = sqlx::query_as("SELECT id FROM albums WHERE artist_id = $1 AND name = $2")
|
|
||||||
.bind(artist_id).bind(name).fetch_optional(pool).await?;
|
|
||||||
Ok(row.map(|r| r.0))
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -96,49 +96,16 @@ pub async fn execute_merge(state: &Arc<AppState>, merge_id: Uuid) -> anyhow::Res
|
|||||||
let loser_ids: Vec<i64> = source_ids.iter().copied()
|
let loser_ids: Vec<i64> = source_ids.iter().copied()
|
||||||
.filter(|&id| id != proposal.winner_artist_id).collect();
|
.filter(|&id| id != proposal.winner_artist_id).collect();
|
||||||
|
|
||||||
// 1. Rename winner artist to canonical name
|
// Execute all DB mutations in a single atomic transaction.
|
||||||
db::rename_artist(&state.pool, proposal.winner_artist_id, &proposal.canonical_artist_name).await?;
|
// On error the transaction rolls back automatically (dropped without commit).
|
||||||
|
let mut tx = state.pool.begin().await?;
|
||||||
|
if let Err(e) = merge_db(&mut tx, &proposal, &loser_ids).await {
|
||||||
|
// tx is dropped here → auto-rollback
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
tx.commit().await?;
|
||||||
|
|
||||||
// 2. Process album mappings
|
// Move files after commit (best-effort; storage_path updated per file)
|
||||||
for mapping in &proposal.album_mappings {
|
|
||||||
if let Some(target_id) = mapping.merge_into_album_id {
|
|
||||||
merge_albums_into(&state.pool, mapping.source_album_id, target_id).await?;
|
|
||||||
} else {
|
|
||||||
// Rename album, then move to winner — merging if winner already has same name
|
|
||||||
db::rename_album(&state.pool, mapping.source_album_id, &mapping.canonical_name).await?;
|
|
||||||
if let Some(existing_id) = db::find_album_by_artist_id_and_name(
|
|
||||||
&state.pool, proposal.winner_artist_id, &mapping.canonical_name,
|
|
||||||
).await? {
|
|
||||||
if existing_id != mapping.source_album_id {
|
|
||||||
merge_albums_into(&state.pool, mapping.source_album_id, existing_id).await?;
|
|
||||||
}
|
|
||||||
// else: source_album already belongs to winner with that name — nothing to do
|
|
||||||
} else {
|
|
||||||
db::set_album_artist(&state.pool, mapping.source_album_id, proposal.winner_artist_id).await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Move remaining albums from losers to winner, merging name conflicts
|
|
||||||
for &loser_id in &loser_ids {
|
|
||||||
let albums = db::get_albums_for_artist(&state.pool, loser_id).await?;
|
|
||||||
for album in albums {
|
|
||||||
if let Some(existing_id) = db::find_album_by_artist_id_and_name(
|
|
||||||
&state.pool, proposal.winner_artist_id, &album.name,
|
|
||||||
).await? {
|
|
||||||
merge_albums_into(&state.pool, album.id, existing_id).await?;
|
|
||||||
} else {
|
|
||||||
db::set_album_artist(&state.pool, album.id, proposal.winner_artist_id).await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 4. Move track_artists from losers to winner
|
|
||||||
for &loser_id in &loser_ids {
|
|
||||||
db::move_track_artists(&state.pool, loser_id, proposal.winner_artist_id).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 5. Move files on disk and update storage paths
|
|
||||||
let tracks = db::get_tracks_with_albums_for_artist(&state.pool, proposal.winner_artist_id).await?;
|
let tracks = db::get_tracks_with_albums_for_artist(&state.pool, proposal.winner_artist_id).await?;
|
||||||
for track in &tracks {
|
for track in &tracks {
|
||||||
let current = std::path::Path::new(&track.storage_path);
|
let current = std::path::Path::new(&track.storage_path);
|
||||||
@@ -168,31 +135,180 @@ pub async fn execute_merge(state: &Arc<AppState>, merge_id: Uuid) -> anyhow::Res
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6. Delete loser artists
|
|
||||||
for &loser_id in &loser_ids {
|
|
||||||
db::delete_artist(&state.pool, loser_id).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 7. Mark approved
|
|
||||||
db::update_merge_status(&state.pool, merge_id, "approved", None).await?;
|
db::update_merge_status(&state.pool, merge_id, "approved", None).await?;
|
||||||
tracing::info!(id = %merge_id, "Merge executed successfully");
|
tracing::info!(id = %merge_id, "Merge executed successfully");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Merge source album into target: deduplicate tracks, move the rest, delete source.
|
/// All DB mutations for a merge, executed inside a single transaction.
|
||||||
async fn merge_albums_into(pool: &sqlx::PgPool, source_id: i64, target_id: i64) -> anyhow::Result<()> {
|
/// `tx` is a `Transaction<'_, Postgres>` which derefs to `PgConnection`.
|
||||||
let dup_ids = db::get_duplicate_track_ids_in_albums(pool, source_id, target_id).await?;
|
async fn merge_db(
|
||||||
for dup_id in dup_ids {
|
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
|
||||||
if let Ok(Some(path)) = db::get_track_storage_path(pool, dup_id).await {
|
proposal: &MergeProposal,
|
||||||
let p = std::path::Path::new(&path);
|
loser_ids: &[i64],
|
||||||
if p.exists() {
|
) -> anyhow::Result<()> {
|
||||||
let _ = tokio::fs::remove_file(p).await;
|
// 1. Rename winner artist to canonical name
|
||||||
|
sqlx::query("UPDATE artists SET name = $2 WHERE id = $1")
|
||||||
|
.bind(proposal.winner_artist_id)
|
||||||
|
.bind(&proposal.canonical_artist_name)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
|
||||||
|
// 2. Process album mappings from the proposal
|
||||||
|
for mapping in &proposal.album_mappings {
|
||||||
|
// Skip if source was already processed (idempotent retry support)
|
||||||
|
let src_exists: (bool,) = sqlx::query_as("SELECT EXISTS(SELECT 1 FROM albums WHERE id = $1)")
|
||||||
|
.bind(mapping.source_album_id)
|
||||||
|
.fetch_one(&mut **tx).await?;
|
||||||
|
if !src_exists.0 { continue; }
|
||||||
|
|
||||||
|
if let Some(target_id) = mapping.merge_into_album_id {
|
||||||
|
album_merge_into(tx, mapping.source_album_id, target_id).await?;
|
||||||
|
} else {
|
||||||
|
// Rename first
|
||||||
|
sqlx::query("UPDATE albums SET name = $2 WHERE id = $1")
|
||||||
|
.bind(mapping.source_album_id)
|
||||||
|
.bind(&mapping.canonical_name)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
|
||||||
|
// Check if winner already has an album with this canonical name (excluding self)
|
||||||
|
let conflict: Option<(i64,)> = sqlx::query_as(
|
||||||
|
"SELECT id FROM albums WHERE artist_id = $1 AND name = $2 AND id != $3"
|
||||||
|
)
|
||||||
|
.bind(proposal.winner_artist_id)
|
||||||
|
.bind(&mapping.canonical_name)
|
||||||
|
.bind(mapping.source_album_id)
|
||||||
|
.fetch_optional(&mut **tx).await?;
|
||||||
|
|
||||||
|
if let Some((existing_id,)) = conflict {
|
||||||
|
album_merge_into(tx, mapping.source_album_id, existing_id).await?;
|
||||||
|
} else {
|
||||||
|
// Just move to winner artist (only if not already there)
|
||||||
|
sqlx::query(
|
||||||
|
"UPDATE albums SET artist_id = $2 WHERE id = $1 AND artist_id != $2"
|
||||||
|
)
|
||||||
|
.bind(mapping.source_album_id)
|
||||||
|
.bind(proposal.winner_artist_id)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
db::delete_track(pool, dup_id).await?;
|
|
||||||
}
|
}
|
||||||
db::move_tracks_to_album(pool, source_id, target_id).await?;
|
|
||||||
db::delete_album(pool, source_id).await?;
|
// 3. Move all remaining albums from each loser to winner, merging name conflicts
|
||||||
|
for &loser_id in loser_ids {
|
||||||
|
loop {
|
||||||
|
// Fetch one album at a time; loop because merging changes the set
|
||||||
|
let album: Option<(i64, String)> = sqlx::query_as(
|
||||||
|
"SELECT id, name FROM albums WHERE artist_id = $1 LIMIT 1"
|
||||||
|
)
|
||||||
|
.bind(loser_id)
|
||||||
|
.fetch_optional(&mut **tx).await?;
|
||||||
|
|
||||||
|
let (album_id, album_name) = match album {
|
||||||
|
Some(a) => a,
|
||||||
|
None => break,
|
||||||
|
};
|
||||||
|
|
||||||
|
let conflict: Option<(i64,)> = sqlx::query_as(
|
||||||
|
"SELECT id FROM albums WHERE artist_id = $1 AND name = $2"
|
||||||
|
)
|
||||||
|
.bind(proposal.winner_artist_id)
|
||||||
|
.bind(&album_name)
|
||||||
|
.fetch_optional(&mut **tx).await?;
|
||||||
|
|
||||||
|
if let Some((existing_id,)) = conflict {
|
||||||
|
// Merge loser album into winner album
|
||||||
|
album_merge_into(tx, album_id, existing_id).await?;
|
||||||
|
} else {
|
||||||
|
sqlx::query("UPDATE albums SET artist_id = $2 WHERE id = $1")
|
||||||
|
.bind(album_id)
|
||||||
|
.bind(proposal.winner_artist_id)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Move track_artists from losers to winner
|
||||||
|
for &loser_id in loser_ids {
|
||||||
|
// Remove winner's entries that would conflict after the update
|
||||||
|
sqlx::query(
|
||||||
|
r#"DELETE FROM track_artists
|
||||||
|
WHERE artist_id = $2
|
||||||
|
AND (track_id, role) IN (
|
||||||
|
SELECT track_id, role FROM track_artists WHERE artist_id = $1
|
||||||
|
)"#
|
||||||
|
)
|
||||||
|
.bind(loser_id)
|
||||||
|
.bind(proposal.winner_artist_id)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
|
||||||
|
sqlx::query("UPDATE track_artists SET artist_id = $2 WHERE artist_id = $1")
|
||||||
|
.bind(loser_id)
|
||||||
|
.bind(proposal.winner_artist_id)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. Delete loser artists (should be empty of albums/tracks by now)
|
||||||
|
for &loser_id in loser_ids {
|
||||||
|
sqlx::query("DELETE FROM artists WHERE id = $1")
|
||||||
|
.bind(loser_id)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Merge source album into target within an open transaction:
|
||||||
|
/// deduplicate by file_hash, move the rest, delete source.
|
||||||
|
async fn album_merge_into(
|
||||||
|
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
|
||||||
|
source_id: i64,
|
||||||
|
target_id: i64,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
// Verify target exists
|
||||||
|
let target_ok: (bool,) = sqlx::query_as("SELECT EXISTS(SELECT 1 FROM albums WHERE id = $1)")
|
||||||
|
.bind(target_id)
|
||||||
|
.fetch_one(&mut **tx).await?;
|
||||||
|
if !target_ok.0 {
|
||||||
|
anyhow::bail!("Target album {} does not exist", target_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete duplicate tracks from source (same file_hash already in target)
|
||||||
|
let dups: Vec<(i64,)> = sqlx::query_as(
|
||||||
|
r#"SELECT t1.id FROM tracks t1
|
||||||
|
JOIN tracks t2 ON t1.file_hash = t2.file_hash AND t2.album_id = $2
|
||||||
|
WHERE t1.album_id = $1"#
|
||||||
|
)
|
||||||
|
.bind(source_id)
|
||||||
|
.bind(target_id)
|
||||||
|
.fetch_all(&mut **tx).await?;
|
||||||
|
|
||||||
|
for (dup_id,) in dups {
|
||||||
|
// Retrieve path for later file deletion (non-fatal if missing)
|
||||||
|
let path: Option<(String,)> = sqlx::query_as("SELECT storage_path FROM tracks WHERE id = $1")
|
||||||
|
.bind(dup_id).fetch_optional(&mut **tx).await?;
|
||||||
|
if let Some((p,)) = path {
|
||||||
|
// Schedule physical deletion after commit — store in a side channel;
|
||||||
|
// here we do a best-effort remove outside the tx scope via tokio::spawn.
|
||||||
|
let p = p.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let _ = tokio::fs::remove_file(&p).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
sqlx::query("DELETE FROM track_artists WHERE track_id = $1").bind(dup_id).execute(&mut **tx).await?;
|
||||||
|
sqlx::query("DELETE FROM tracks WHERE id = $1").bind(dup_id).execute(&mut **tx).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Move remaining tracks from source to target
|
||||||
|
sqlx::query("UPDATE tracks SET album_id = $2 WHERE album_id = $1")
|
||||||
|
.bind(source_id)
|
||||||
|
.bind(target_id)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
|
||||||
|
// Delete the now-empty source album
|
||||||
|
sqlx::query("DELETE FROM albums WHERE id = $1")
|
||||||
|
.bind(source_id)
|
||||||
|
.execute(&mut **tx).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user