From e1782a6e3bff279c7d029df7c4cb36e0e495c451 Mon Sep 17 00:00:00 2001 From: AB-UK Date: Thu, 19 Mar 2026 00:55:49 +0000 Subject: [PATCH] Added merge --- docker-compose.yml | 1 + .../migrations/0003_artist_merges.sql | 10 + furumi-agent/prompts/merge.txt | 65 ++++ furumi-agent/src/config.rs | 16 + furumi-agent/src/db.rs | 200 +++++++++++- furumi-agent/src/ingest/mod.rs | 35 ++- furumi-agent/src/ingest/mover.rs | 21 +- furumi-agent/src/ingest/normalize.rs | 2 +- furumi-agent/src/main.rs | 5 + furumi-agent/src/merge.rs | 187 ++++++++++++ furumi-agent/src/web/admin.html | 288 +++++++++++++++++- furumi-agent/src/web/api.rs | 134 +++++++- furumi-agent/src/web/mod.rs | 8 +- 13 files changed, 949 insertions(+), 23 deletions(-) create mode 100644 furumi-agent/migrations/0003_artist_merges.sql create mode 100644 furumi-agent/prompts/merge.txt create mode 100644 furumi-agent/src/merge.rs diff --git a/docker-compose.yml b/docker-compose.yml index 4632462..08a10e2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,6 +31,7 @@ services: FURUMI_AGENT_OLLAMA_URL: "${OLLAMA_URL:-http://host.docker.internal:11434}" FURUMI_AGENT_OLLAMA_AUTH: "${OLLAMA_AUTH:-CHANGE-ME}" FURUMI_PLAYER_BIND: "0.0.0.0:8090" + FURUMI_AGENT_POLL_INTERVAL_SECS: 5 volumes: - ./inbox:/inbox - ./storage:/storage
diff --git a/furumi-agent/migrations/0003_artist_merges.sql b/furumi-agent/migrations/0003_artist_merges.sql
new file mode 100644
index 0000000..bd82df9
--- /dev/null
+++ b/furumi-agent/migrations/0003_artist_merges.sql
@@ -0,0 +1,22 @@
+-- Artist merge requests. Each row tracks one request to merge duplicate artists,
+-- from creation through the LLM proposal to the executed merge.
+CREATE TABLE artist_merges (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    -- Workflow state. The agent code added with this table writes 'pending',
+    -- 'processing', 'review', 'approved' and 'error'; other writers may add more.
+    status TEXT NOT NULL DEFAULT 'pending',
+    -- JSON array of artist ids, stored as text.
+    source_artist_ids TEXT NOT NULL,
+    -- JSON merge proposal; NULL until the LLM has produced one.
+    proposal TEXT,
+    -- Notes copied out of the proposal.
+    llm_notes TEXT,
+    -- Failure message; reset to NULL by status updates that carry no error.
+    error_message TEXT,
+    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
+);
+
+-- The ingest loop polls for the oldest 'pending' rows on every cycle.
+CREATE INDEX artist_merges_status_created_at_idx
+    ON artist_merges (status, created_at);
diff --git a/furumi-agent/prompts/merge.txt b/furumi-agent/prompts/merge.txt new file mode 100644 index 0000000..bb7599a --- /dev/null +++
b/furumi-agent/prompts/merge.txt @@ -0,0 +1,65 @@ +You are a music library artist merge assistant. You will receive a list of artists (with their albums and tracks, each with database IDs) that have been identified as potential duplicates. Your job is to analyze them and produce a merge plan. + +## Input format + +You will receive a structured list like: + +### Artist ID 42: "pink floyd" + Album ID 10: "the wall" (1979) + - 01. "In the Flesh?" [track_id=100] + - 02. "The Thin Ice" [track_id=101] + +### Artist ID 43: "Pink Floyd" + Album ID 11: "Wish You Were Here" (1975) + - 01. "Shine On You Crazy Diamond (Parts I-V)" [track_id=200] + +## Your task + +Determine if the artists are duplicates and produce a merge plan. + +## Rules + +### 1. Canonical artist name +- Use correct capitalization and canonical spelling (e.g., "pink floyd" → "Pink Floyd", "AC DC" → "AC/DC"). +- If the database already contains an artist with a well-formed name, prefer that exact form. +- If one artist has clearly more tracks or albums, their name spelling may be more authoritative. +- Fix obvious typos or casing errors. + +### 2. Winner artist +- `winner_artist_id` must be the ID of one of the provided artists — the one whose identity (ID) will survive in the database. +- All other artists are "losers" and will be deleted after their albums and tracks are moved to the winner. +- Prefer the artist ID that has the most tracks/albums, or the one with the most correct canonical name. + +### 3. Canonical album names +- Use correct capitalization (title case for English, preserve language for non-English). +- Fix slug-like names: "new-songs" → "New Songs", "the_dark_side" → "The Dark Side". +- Fix all-lowercase or all-uppercase: "WISH YOU WERE HERE" → "Wish You Were Here". +- Preserve creative/intentional stylization (e.g., "OK Computer" stays as-is, "(What's the Story) Morning Glory?" stays as-is). 
+- If the database already contains the album under another artist with a well-formed name, use that exact name. + +### 4. Album deduplication +- If two albums (across the artists being merged) have the same or very similar name, they are the same album. In that case, pick the better-formed one as the "winner album". +- Set `merge_into_album_id` to the winner album's ID for the duplicate album. This means all tracks from the duplicate will be moved into the winner album, and the duplicate album will be deleted. +- If an album is unique (no duplicate exists), set `merge_into_album_id` to null — the album will simply be renamed and moved to the winner artist. +- When comparing album names for similarity, ignore case and punctuation. An edition suffix such as "(Remastered)" only matters when it differs: two albums that both carry the same suffix are duplicates of each other, while a remastered edition and the original release are separate albums and must not be merged. + +### 5. Album mappings coverage +- `album_mappings` must include an entry for EVERY album across ALL source artists, not just duplicates. +- Every album (from every artist being merged) needs a canonical name, even if it is not being merged into another album. + +### 6. Notes +- The `notes` field should briefly explain: which artist was chosen as winner and why, which albums were renamed, which albums were deduplicated and into what. + +## Response format + +You MUST respond with a single JSON object, no markdown fences, no extra text: + +{"canonical_artist_name": "...", "winner_artist_id": 42, "album_mappings": [{"source_album_id": 10, "canonical_name": "The Wall", "merge_into_album_id": null}, {"source_album_id": 11, "canonical_name": "Wish You Were Here", "merge_into_album_id": null}], "notes": "..."} + +- `canonical_artist_name`: the single correct name for this artist after merging. +- `winner_artist_id`: the integer ID of the artist whose record survives (must be one of the IDs provided).
+- `album_mappings`: an array covering ALL albums from ALL source artists. Each entry: + - `source_album_id`: the integer ID of this album (as provided in the input). + - `canonical_name`: the corrected canonical name for this album. + - `merge_into_album_id`: null if this album is just renamed/moved to the winner artist; or the integer ID of another album (the winner album) if this album's tracks should be merged into that album and this album deleted. Never set merge_into_album_id to the same album's own ID. +- `notes`: brief explanation of the decisions made. diff --git a/furumi-agent/src/config.rs b/furumi-agent/src/config.rs index 2427c08..a0c77ce 100644 --- a/furumi-agent/src/config.rs +++ b/furumi-agent/src/config.rs @@ -5,6 +5,8 @@ use clap::Parser; /// Default system prompt, compiled into the binary as a fallback. const DEFAULT_SYSTEM_PROMPT: &str = include_str!("../prompts/normalize.txt"); +const DEFAULT_MERGE_PROMPT: &str = include_str!("../prompts/merge.txt"); + #[derive(Parser, Debug)] #[command(version, about = "Furumi Agent: music metadata ingest and normalization")] pub struct Args { @@ -47,6 +49,10 @@ pub struct Args { /// Path to a custom system prompt file (overrides the built-in default) #[arg(long, env = "FURUMI_AGENT_SYSTEM_PROMPT_FILE")] pub system_prompt_file: Option, + + /// Path to a custom merge prompt file (overrides the built-in default) + #[arg(long, env = "FURUMI_AGENT_MERGE_PROMPT_FILE")] + pub merge_prompt_file: Option, } impl Args { @@ -76,4 +82,14 @@ impl Args { } } } + + pub fn load_merge_prompt(&self) -> Result> { + match &self.merge_prompt_file { + Some(path) => { + tracing::info!("Loading merge prompt from {:?}", path); + Ok(std::fs::read_to_string(path)?) 
+ } + None => Ok(DEFAULT_MERGE_PROMPT.to_owned()), + } + } } diff --git a/furumi-agent/src/db.rs b/furumi-agent/src/db.rs index 72ce9d2..fd5bf6e 100644 --- a/furumi-agent/src/db.rs +++ b/furumi-agent/src/db.rs @@ -88,6 +88,7 @@ pub struct SimilarAlbum { pub similarity: f32, } +#[allow(dead_code)] #[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] pub struct AlbumImage { pub id: i64, @@ -416,6 +417,7 @@ pub async fn insert_album_image( Ok(row.0) } +#[allow(dead_code)] pub async fn get_album_images(pool: &PgPool, album_id: i64) -> Result, sqlx::Error> { sqlx::query_as::<_, AlbumImage>("SELECT * FROM album_images WHERE album_id = $1 ORDER BY image_type") .bind(album_id) @@ -563,6 +565,7 @@ pub struct Stats { pub pending_count: i64, pub review_count: i64, pub error_count: i64, + pub merged_count: i64, } pub async fn get_stats(pool: &PgPool) -> Result { @@ -572,5 +575,200 @@ pub async fn get_stats(pool: &PgPool) -> Result { let (pending_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'pending'").fetch_one(pool).await?; let (review_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'review'").fetch_one(pool).await?; let (error_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'error'").fetch_one(pool).await?; - Ok(Stats { total_tracks, total_artists, total_albums, pending_count, review_count, error_count }) + let (merged_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'merged'").fetch_one(pool).await?; + Ok(Stats { total_tracks, total_artists, total_albums, pending_count, review_count, error_count, merged_count }) +} + +// =================== Artist Merges =================== + +#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)] +pub struct ArtistMerge { + pub id: Uuid, + pub status: String, + pub source_artist_ids: String, + pub proposal: Option, + pub llm_notes: Option, + pub error_message: Option, + pub 
created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, +} + +#[derive(Debug, Serialize)] +pub struct ArtistFullData { + pub id: i64, + pub name: String, + pub albums: Vec, +} + +#[derive(Debug, Serialize)] +pub struct AlbumFullData { + pub id: i64, + pub name: String, + pub year: Option, + pub tracks: Vec, +} + +#[derive(Debug, Serialize, sqlx::FromRow)] +pub struct TrackBasic { + pub id: i64, + pub title: String, + pub track_number: Option, + pub storage_path: String, +} + +#[derive(Debug, sqlx::FromRow)] +pub struct TrackWithAlbum { + pub id: i64, + pub storage_path: String, + pub album_name: Option, +} + +pub async fn insert_artist_merge(pool: &PgPool, source_artist_ids: &[i64]) -> Result { + let ids_json = serde_json::to_string(source_artist_ids).unwrap_or_default(); + let row: (Uuid,) = sqlx::query_as( + "INSERT INTO artist_merges (source_artist_ids) VALUES ($1) RETURNING id" + ).bind(&ids_json).fetch_one(pool).await?; + Ok(row.0) +} + +pub async fn list_artist_merges(pool: &PgPool) -> Result, sqlx::Error> { + sqlx::query_as::<_, ArtistMerge>("SELECT * FROM artist_merges ORDER BY created_at DESC") + .fetch_all(pool).await +} + +pub async fn get_artist_merge(pool: &PgPool, id: Uuid) -> Result, sqlx::Error> { + sqlx::query_as::<_, ArtistMerge>("SELECT * FROM artist_merges WHERE id = $1") + .bind(id).fetch_optional(pool).await +} + +pub async fn update_merge_status(pool: &PgPool, id: Uuid, status: &str, error: Option<&str>) -> Result<(), sqlx::Error> { + sqlx::query("UPDATE artist_merges SET status = $2, error_message = $3, updated_at = NOW() WHERE id = $1") + .bind(id).bind(status).bind(error).execute(pool).await?; + Ok(()) +} + +pub async fn update_merge_proposal(pool: &PgPool, id: Uuid, proposal_json: &str, notes: &str) -> Result<(), sqlx::Error> { + sqlx::query("UPDATE artist_merges SET proposal = $2, llm_notes = $3, status = 'review', error_message = NULL, updated_at = NOW() WHERE id = $1") + 
.bind(id).bind(proposal_json).bind(notes).execute(pool).await?; + Ok(()) +} + +pub async fn get_pending_merges_for_processing(pool: &PgPool) -> Result, sqlx::Error> { + let rows: Vec<(Uuid,)> = sqlx::query_as( + "SELECT id FROM artist_merges WHERE status = 'pending' ORDER BY created_at ASC LIMIT 5" + ).fetch_all(pool).await?; + Ok(rows.into_iter().map(|(id,)| id).collect()) +} + +pub async fn get_artists_full_data(pool: &PgPool, ids: &[i64]) -> Result, sqlx::Error> { + let mut result = Vec::new(); + for &id in ids { + let artist: Artist = sqlx::query_as("SELECT id, name FROM artists WHERE id = $1") + .bind(id).fetch_one(pool).await?; + let albums: Vec = sqlx::query_as("SELECT * FROM albums WHERE artist_id = $1 ORDER BY year NULLS LAST, name") + .bind(id).fetch_all(pool).await?; + let mut album_data = Vec::new(); + for album in albums { + let tracks: Vec = sqlx::query_as( + "SELECT id, title, track_number, storage_path FROM tracks WHERE album_id = $1 ORDER BY track_number NULLS LAST, title" + ).bind(album.id).fetch_all(pool).await?; + album_data.push(AlbumFullData { id: album.id, name: album.name, year: album.year, tracks }); + } + result.push(ArtistFullData { id, name: artist.name, albums: album_data }); + } + Ok(result) +} + +pub async fn get_tracks_with_albums_for_artist(pool: &PgPool, artist_id: i64) -> Result, sqlx::Error> { + sqlx::query_as::<_, TrackWithAlbum>( + r#"SELECT t.id, t.storage_path, a.name as album_name + FROM tracks t + LEFT JOIN albums a ON a.id = t.album_id + WHERE t.id IN ( + SELECT track_id FROM track_artists WHERE artist_id = $1 AND role = 'primary' + )"# + ).bind(artist_id).fetch_all(pool).await +} + +pub async fn rename_artist(pool: &PgPool, id: i64, new_name: &str) -> Result<(), sqlx::Error> { + sqlx::query("UPDATE artists SET name = $2 WHERE id = $1") + .bind(id).bind(new_name).execute(pool).await?; + Ok(()) +} + +pub async fn delete_artist(pool: &PgPool, id: i64) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM artists WHERE id = 
$1") + .bind(id).execute(pool).await?; + Ok(()) +} + +pub async fn rename_album(pool: &PgPool, id: i64, new_name: &str) -> Result<(), sqlx::Error> { + sqlx::query("UPDATE albums SET name = $2 WHERE id = $1") + .bind(id).bind(new_name).execute(pool).await?; + Ok(()) +} + +pub async fn set_album_artist(pool: &PgPool, album_id: i64, artist_id: i64) -> Result<(), sqlx::Error> { + sqlx::query("UPDATE albums SET artist_id = $2 WHERE id = $1") + .bind(album_id).bind(artist_id).execute(pool).await?; + Ok(()) +} + +pub async fn move_albums_to_artist(pool: &PgPool, from_artist_id: i64, to_artist_id: i64) -> Result<(), sqlx::Error> { + sqlx::query("UPDATE albums SET artist_id = $2 WHERE artist_id = $1") + .bind(from_artist_id).bind(to_artist_id).execute(pool).await?; + Ok(()) +} + +pub async fn move_track_artists(pool: &PgPool, from_artist_id: i64, to_artist_id: i64) -> Result<(), sqlx::Error> { + // Update, but avoid duplicate (track_id, artist_id, role) - delete first any conflicting rows + sqlx::query( + r#"DELETE FROM track_artists + WHERE artist_id = $2 + AND (track_id, role) IN ( + SELECT track_id, role FROM track_artists WHERE artist_id = $1 + )"# + ).bind(from_artist_id).bind(to_artist_id).execute(pool).await?; + sqlx::query("UPDATE track_artists SET artist_id = $2 WHERE artist_id = $1") + .bind(from_artist_id).bind(to_artist_id).execute(pool).await?; + Ok(()) +} + +pub async fn get_duplicate_track_ids_in_albums(pool: &PgPool, source_album_id: i64, target_album_id: i64) -> Result, sqlx::Error> { + let rows: Vec<(i64,)> = sqlx::query_as( + r#"SELECT t1.id FROM tracks t1 + JOIN tracks t2 ON t1.file_hash = t2.file_hash AND t2.album_id = $2 + WHERE t1.album_id = $1"# + ).bind(source_album_id).bind(target_album_id).fetch_all(pool).await?; + Ok(rows.into_iter().map(|(id,)| id).collect()) +} + +pub async fn get_track_storage_path(pool: &PgPool, track_id: i64) -> Result, sqlx::Error> { + let row: Option<(String,)> = sqlx::query_as("SELECT storage_path FROM tracks WHERE id = 
$1") + .bind(track_id).fetch_optional(pool).await?; + Ok(row.map(|(p,)| p)) +} + +pub async fn delete_track(pool: &PgPool, track_id: i64) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM track_artists WHERE track_id = $1").bind(track_id).execute(pool).await?; + sqlx::query("DELETE FROM tracks WHERE id = $1").bind(track_id).execute(pool).await?; + Ok(()) +} + +pub async fn move_tracks_to_album(pool: &PgPool, from_album_id: i64, to_album_id: i64) -> Result<(), sqlx::Error> { + sqlx::query("UPDATE tracks SET album_id = $2 WHERE album_id = $1") + .bind(from_album_id).bind(to_album_id).execute(pool).await?; + Ok(()) +} + +pub async fn delete_album(pool: &PgPool, id: i64) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM albums WHERE id = $1") + .bind(id).execute(pool).await?; + Ok(()) +} + +pub async fn update_track_storage_path(pool: &PgPool, track_id: i64, new_path: &str) -> Result<(), sqlx::Error> { + sqlx::query("UPDATE tracks SET storage_path = $2 WHERE id = $1") + .bind(track_id).bind(new_path).execute(pool).await?; + Ok(()) } diff --git a/furumi-agent/src/ingest/mod.rs b/furumi-agent/src/ingest/mod.rs index 7919e4c..091877b 100644 --- a/furumi-agent/src/ingest/mod.rs +++ b/furumi-agent/src/ingest/mod.rs @@ -25,6 +25,18 @@ pub async fn run(state: Arc) { Ok(count) => tracing::info!(count, "re-processed pending tracks"), Err(e) => tracing::error!(?e, "pending re-processing failed"), } + // Process pending merge proposals + match db::get_pending_merges_for_processing(&state.pool).await { + Ok(merge_ids) => { + for merge_id in merge_ids { + if let Err(e) = crate::merge::propose_merge(&state, merge_id).await { + tracing::error!(id = %merge_id, ?e, "Merge proposal failed"); + let _ = db::update_merge_status(&state.pool, merge_id, "error", Some(&e.to_string())).await; + } + } + } + Err(e) => tracing::error!(?e, "Failed to load pending merges"), + } tokio::time::sleep(interval).await; } } @@ -161,13 +173,14 @@ async fn reprocess_pending(state: &Arc) -> 
anyhow::Result { .join(sanitize_filename(album)) .join(&dest_filename); - let storage_path = if dest.exists() && !source.exists() { - dest.to_string_lossy().to_string() + let (storage_path, was_merged) = if dest.exists() && !source.exists() { + (dest.to_string_lossy().to_string(), false) } else if source.exists() { match mover::move_to_storage( &state.config.storage_dir, artist, album, &dest_filename, source, ).await { - Ok(p) => p.to_string_lossy().to_string(), + Ok(mover::MoveOutcome::Moved(p)) => (p.to_string_lossy().to_string(), false), + Ok(mover::MoveOutcome::Merged(p)) => (p.to_string_lossy().to_string(), true), Err(e) => { tracing::error!(id = %pt.id, ?e, "Failed to move file"); db::update_pending_status(&state.pool, pt.id, "error", Some(&e.to_string())).await?; @@ -181,7 +194,12 @@ async fn reprocess_pending(state: &Arc) -> anyhow::Result { }; match db::approve_and_finalize(&state.pool, pt.id, &storage_path).await { - Ok(track_id) => tracing::info!(id = %pt.id, track_id, "Track finalized"), + Ok(track_id) => { + if was_merged { + let _ = db::update_pending_status(&state.pool, pt.id, "merged", None).await; + } + tracing::info!(id = %pt.id, track_id, "Track finalized"); + } Err(e) => tracing::error!(id = %pt.id, ?e, "Failed to finalize"), } } @@ -472,10 +490,17 @@ async fn process_file(state: &Arc, file_path: &std::path::Path) -> any ) .await { - Ok(storage_path) => { + Ok(outcome) => { + let (storage_path, was_merged) = match outcome { + mover::MoveOutcome::Moved(p) => (p, false), + mover::MoveOutcome::Merged(p) => (p, true), + }; let rel_path = storage_path.to_string_lossy().to_string(); match db::approve_and_finalize(&state.pool, pending_id, &rel_path).await { Ok(track_id) => { + if was_merged { + let _ = db::update_pending_status(&state.pool, pending_id, "merged", None).await; + } tracing::info!(file = filename, track_id, storage = %rel_path, "Track finalized in database"); } Err(e) => { diff --git a/furumi-agent/src/ingest/mover.rs 
b/furumi-agent/src/ingest/mover.rs index 502cef1..fc3cbb8 100644 --- a/furumi-agent/src/ingest/mover.rs +++ b/furumi-agent/src/ingest/mover.rs @@ -1,18 +1,27 @@ use std::path::{Path, PathBuf}; +pub enum MoveOutcome { + /// File was moved/renamed to destination. + Moved(PathBuf), + /// Destination already existed; inbox duplicate was removed. + Merged(PathBuf), +} + /// Move a file from inbox to the permanent storage directory. /// /// Creates the directory structure: `storage_dir/artist/album/filename` /// Returns the full path of the moved file. /// /// If `rename` fails (cross-device), falls back to copy + remove. +/// If the destination already exists the inbox copy is removed and +/// `MoveOutcome::Merged` is returned instead of an error. pub async fn move_to_storage( storage_dir: &Path, artist: &str, album: &str, filename: &str, source: &Path, -) -> anyhow::Result { +) -> anyhow::Result { let artist_dir = sanitize_dir_name(artist); let album_dir = sanitize_dir_name(album); @@ -21,9 +30,13 @@ pub async fn move_to_storage( let dest = dest_dir.join(filename); - // Avoid overwriting existing files + // File already at destination — remove the inbox duplicate if dest.exists() { - anyhow::bail!("Destination already exists: {:?}", dest); + if source.exists() { + tokio::fs::remove_file(source).await?; + tracing::info!(from = ?source, to = ?dest, "merged duplicate into existing storage file"); + } + return Ok(MoveOutcome::Merged(dest)); } // Try atomic rename first (same filesystem) @@ -37,7 +50,7 @@ pub async fn move_to_storage( } tracing::info!(from = ?source, to = ?dest, "moved file to storage"); - Ok(dest) + Ok(MoveOutcome::Moved(dest)) } /// Remove characters that are unsafe for directory names. 
diff --git a/furumi-agent/src/ingest/normalize.rs b/furumi-agent/src/ingest/normalize.rs index 50e8c60..ef3f031 100644 --- a/furumi-agent/src/ingest/normalize.rs +++ b/furumi-agent/src/ingest/normalize.rs @@ -121,7 +121,7 @@ struct OllamaResponseMessage { content: String, } -async fn call_ollama( +pub async fn call_ollama( base_url: &str, model: &str, system_prompt: &str, diff --git a/furumi-agent/src/main.rs b/furumi-agent/src/main.rs index 370c8e0..6bfb3b0 100644 --- a/furumi-agent/src/main.rs +++ b/furumi-agent/src/main.rs @@ -1,6 +1,7 @@ mod config; mod db; mod ingest; +mod merge; mod web; use std::sync::Arc; @@ -24,6 +25,9 @@ async fn main() -> Result<(), Box> { let system_prompt = args.load_system_prompt()?; tracing::info!("System prompt loaded: {} chars", system_prompt.len()); + let merge_prompt = args.load_merge_prompt()?; + tracing::info!("Merge prompt loaded: {} chars", merge_prompt.len()); + tracing::info!("Connecting to database..."); let pool = db::connect(&args.database_url).await?; tracing::info!("Running database migrations..."); @@ -34,6 +38,7 @@ async fn main() -> Result<(), Box> { pool: pool.clone(), config: Arc::new(args), system_prompt: Arc::new(system_prompt), + merge_prompt: Arc::new(merge_prompt), }); // Spawn the ingest pipeline as a background task
diff --git a/furumi-agent/src/merge.rs b/furumi-agent/src/merge.rs
new file mode 100644
index 0000000..bcc7f2a
--- /dev/null
+++ b/furumi-agent/src/merge.rs
@@ -0,0 +1,279 @@
+use std::collections::HashSet;
+use std::sync::Arc;
+
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+use crate::db;
+use crate::web::AppState;
+use crate::ingest::normalize::call_ollama;
+
+/// Merge plan produced by the LLM and stored as JSON in `artist_merges.proposal`.
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct MergeProposal {
+    /// Name the surviving artist is renamed to.
+    pub canonical_artist_name: String,
+    /// ID of the artist row that survives the merge.
+    pub winner_artist_id: i64,
+    /// One entry per album to rename/move or to merge into another album.
+    pub album_mappings: Vec<AlbumMapping>,
+    /// Free-form explanation, stored separately as `llm_notes`.
+    pub notes: String,
+}
+
+/// What happens to a single album when the merge is executed.
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct AlbumMapping {
+    pub source_album_id: i64,
+    /// New album name; only applied when the album is kept.
+    pub canonical_name: String,
+    /// `Some(id)`: move this album's tracks into album `id` and delete this album.
+    /// `None`: rename the album and attach it to the winner artist.
+    pub merge_into_album_id: Option<i64>,
+}
+
+/// Asks the LLM for a merge plan for `merge_id` and stores it for review.
+///
+/// Sets the merge to `processing`, sends the source artists with their albums
+/// and tracks to Ollama together with the merge prompt, and saves the parsed
+/// proposal. Any failure is returned to the caller.
+pub async fn propose_merge(state: &Arc<AppState>, merge_id: Uuid) -> anyhow::Result<()> {
+    db::update_merge_status(&state.pool, merge_id, "processing", None).await?;
+
+    let merge = db::get_artist_merge(&state.pool, merge_id).await?
+        .ok_or_else(|| anyhow::anyhow!("Merge not found: {}", merge_id))?;
+
+    let source_ids: Vec<i64> = serde_json::from_str(&merge.source_artist_ids)
+        .map_err(|e| anyhow::anyhow!("Invalid source_artist_ids: {}", e))?;
+
+    let artists_data = db::get_artists_full_data(&state.pool, &source_ids).await?;
+
+    let user_message = build_merge_message(&artists_data);
+
+    let response = call_ollama(
+        &state.config.ollama_url,
+        &state.config.ollama_model,
+        &state.merge_prompt,
+        &user_message,
+        state.config.ollama_auth.as_deref(),
+    ).await?;
+
+    let proposal = parse_merge_response(&response)?;
+    let notes = proposal.notes.clone();
+    let proposal_json = serde_json::to_string(&proposal)?;
+
+    db::update_merge_proposal(&state.pool, merge_id, &proposal_json, &notes).await?;
+    tracing::info!(id = %merge_id, "Merge proposal generated");
+    Ok(())
+}
+
+/// Renders the artists with their albums and tracks (including database IDs) as the LLM user message.
+fn build_merge_message(artists: &[db::ArtistFullData]) -> String {
+    let mut msg = String::from("## Artists to merge\n\n");
+    for artist in artists {
+        msg.push_str(&format!("### Artist ID {}: \"{}\"\n", artist.id, artist.name));
+        if artist.albums.is_empty() {
+            msg.push_str(" (no albums)\n");
+        }
+        for album in &artist.albums {
+            let year_str = album.year.map(|y| format!(" ({})", y)).unwrap_or_default();
+            msg.push_str(&format!(" Album ID {}: \"{}\"{}\n", album.id, album.name, year_str));
+            for track in &album.tracks {
+                let num = track.track_number.map(|n| format!("{:02}. ", n)).unwrap_or_default();
+                msg.push_str(&format!(" - {}\"{}\" [track_id={}]\n", num, track.title, track.id));
+            }
+        }
+        msg.push('\n');
+    }
+    msg
+}
+
+/// Parses the LLM reply into a `MergeProposal`.
+///
+/// Everything from the first `{` to the last `}` is treated as the JSON object,
+/// so replies wrapped in markdown fences or surrounded by extra text still
+/// parse. Without such a span the trimmed reply is parsed as-is, which yields a
+/// parse error rather than a slicing panic.
+fn parse_merge_response(response: &str) -> anyhow::Result<MergeProposal> {
+    let cleaned = response.trim();
+    let json_str = match (cleaned.find('{'), cleaned.rfind('}')) {
+        (Some(start), Some(end)) if start < end => &cleaned[start..=end],
+        _ => cleaned,
+    };
+    serde_json::from_str(json_str)
+        .map_err(|e| anyhow::anyhow!("Failed to parse merge LLM response: {} — raw: {}", e, response))
+}
+
+/// Rejects proposals that would touch rows outside the merge or destroy tracks.
+///
+/// The IDs in a proposal originate from LLM output, so they are checked first:
+/// the winner and every album must belong to the source artists, an album
+/// may not be merged into itself, and a merge target may not be merged away.
+fn validate_proposal(
+    proposal: &MergeProposal,
+    source_ids: &[i64],
+    known_album_ids: &HashSet<i64>,
+) -> anyhow::Result<()> {
+    if !source_ids.contains(&proposal.winner_artist_id) {
+        anyhow::bail!("Winner artist {} is not one of the source artists", proposal.winner_artist_id);
+    }
+    if sanitize(&proposal.canonical_artist_name).is_empty() {
+        anyhow::bail!("Canonical artist name is empty");
+    }
+
+    // Albums that disappear during the merge; none of them may be used as a target.
+    let merged_away: HashSet<i64> = proposal.album_mappings.iter()
+        .filter(|m| m.merge_into_album_id.is_some())
+        .map(|m| m.source_album_id)
+        .collect();
+
+    for mapping in &proposal.album_mappings {
+        if !known_album_ids.contains(&mapping.source_album_id) {
+            anyhow::bail!("Album {} does not belong to the source artists", mapping.source_album_id);
+        }
+        if let Some(target_id) = mapping.merge_into_album_id {
+            if target_id == mapping.source_album_id {
+                anyhow::bail!("Album {} cannot be merged into itself", target_id);
+            }
+            if !known_album_ids.contains(&target_id) {
+                anyhow::bail!("Merge target album {} does not belong to the source artists", target_id);
+            }
+            if merged_away.contains(&target_id) {
+                anyhow::bail!("Merge target album {} is itself merged into another album", target_id);
+            }
+        }
+    }
+    Ok(())
+}
+
+/// Moves `from` to `to`, creating the destination directory first.
+///
+/// Falls back to copy + remove when the rename fails.
+async fn move_file(from: &std::path::Path, to: &std::path::Path) -> std::io::Result<()> {
+    if let Some(parent) = to.parent() {
+        tokio::fs::create_dir_all(parent).await?;
+    }
+    if tokio::fs::rename(from, to).await.is_ok() {
+        return Ok(());
+    }
+    tokio::fs::copy(from, to).await?;
+    // The copy is in place; a source that cannot be removed is only a stray file.
+    let _ = tokio::fs::remove_file(from).await;
+    Ok(())
+}
+
+/// Applies the stored proposal of `merge_id` to the database and the files on disk.
+///
+/// The proposal is validated before anything is changed. The steps run as
+/// separate statements on the pool rather than in one transaction, so an error
+/// part-way through leaves the merge partially applied.
+pub async fn execute_merge(state: &Arc<AppState>, merge_id: Uuid) -> anyhow::Result<()> {
+    let merge = db::get_artist_merge(&state.pool, merge_id).await?
+        .ok_or_else(|| anyhow::anyhow!("Merge not found"))?;
+
+    let proposal_str = merge.proposal.ok_or_else(|| anyhow::anyhow!("No proposal to execute"))?;
+    let proposal: MergeProposal = serde_json::from_str(&proposal_str)?;
+
+    let source_ids: Vec<i64> = serde_json::from_str(&merge.source_artist_ids)?;
+
+    // Only rows that belong to the source artists may be touched.
+    let artists_data = db::get_artists_full_data(&state.pool, &source_ids).await?;
+    let known_album_ids: HashSet<i64> = artists_data.iter()
+        .flat_map(|artist| artist.albums.iter().map(|album| album.id))
+        .collect();
+    validate_proposal(&proposal, &source_ids, &known_album_ids)?;
+
+    let loser_ids: Vec<i64> = source_ids.iter().copied()
+        .filter(|&id| id != proposal.winner_artist_id).collect();
+
+    // 1. Rename winner artist to canonical name
+    db::rename_artist(&state.pool, proposal.winner_artist_id, &proposal.canonical_artist_name).await?;
+
+    // 2. Process album mappings
+    for mapping in &proposal.album_mappings {
+        if let Some(target_id) = mapping.merge_into_album_id {
+            // Remove duplicate tracks (same file_hash in both albums)
+            let dup_ids = db::get_duplicate_track_ids_in_albums(&state.pool, mapping.source_album_id, target_id).await?;
+            for dup_id in dup_ids {
+                if let Ok(Some(path)) = db::get_track_storage_path(&state.pool, dup_id).await {
+                    let p = std::path::Path::new(&path);
+                    if p.exists() {
+                        let _ = tokio::fs::remove_file(p).await;
+                    }
+                }
+                db::delete_track(&state.pool, dup_id).await?;
+            }
+            // Move remaining tracks to target album
+            db::move_tracks_to_album(&state.pool, mapping.source_album_id, target_id).await?;
+            db::delete_album(&state.pool, mapping.source_album_id).await?;
+        } else {
+            // Rename album and move to winner artist
+            db::rename_album(&state.pool, mapping.source_album_id, &mapping.canonical_name).await?;
+            db::set_album_artist(&state.pool, mapping.source_album_id, proposal.winner_artist_id).await?;
+        }
+    }
+
+    // 3. Move remaining albums from losers to winner
+    for &loser_id in &loser_ids {
+        db::move_albums_to_artist(&state.pool, loser_id, proposal.winner_artist_id).await?;
+    }
+
+    // 4. Move track_artists from losers to winner
+    for &loser_id in &loser_ids {
+        db::move_track_artists(&state.pool, loser_id, proposal.winner_artist_id).await?;
+    }
+
+    // 5. Move files on disk and update storage paths
+    let tracks = db::get_tracks_with_albums_for_artist(&state.pool, proposal.winner_artist_id).await?;
+    for track in &tracks {
+        let current = std::path::Path::new(&track.storage_path);
+        let filename = match current.file_name() {
+            Some(f) => f.to_string_lossy().to_string(),
+            None => continue,
+        };
+        let album_name = track.album_name.as_deref().unwrap_or("Unknown Album");
+        let new_path = state.config.storage_dir
+            .join(sanitize(&proposal.canonical_artist_name))
+            .join(sanitize(album_name))
+            .join(&filename);
+
+        if current == new_path.as_path() {
+            continue;
+        }
+        if current.exists() {
+            // Renaming onto an existing file would replace another track's audio.
+            if new_path.exists() {
+                tracing::warn!(track_id = track.id, to = ?new_path, "Destination already exists, leaving file in place");
+                continue;
+            }
+            // Keep the old path in the database when the file could not be moved.
+            if let Err(e) = move_file(current, &new_path).await {
+                tracing::warn!(track_id = track.id, ?e, from = ?current, to = ?new_path, "Failed to move track file");
+                continue;
+            }
+        }
+        db::update_track_storage_path(&state.pool, track.id, &new_path.to_string_lossy()).await?;
+    }
+
+    // 6. Delete loser artists
+    for &loser_id in &loser_ids {
+        db::delete_artist(&state.pool, loser_id).await?;
+    }
+
+    // 7. Mark approved
+    db::update_merge_status(&state.pool, merge_id, "approved", None).await?;
+    tracing::info!(id = %merge_id, "Merge executed successfully");
+    Ok(())
+}
+
+fn sanitize(name: &str) -> String {
+    name.chars()
+        .map(|c| match c {
+            '/' | '\\' | ':' | '*' | '?'
| '"' | '<' | '>' | '|' | '\0' => '_', + _ => c, + }) + .collect::() + .trim() + .trim_matches('.') + .to_owned() +} diff --git a/furumi-agent/src/web/admin.html b/furumi-agent/src/web/admin.html index b9238b6..7818e69 100644 --- a/furumi-agent/src/web/admin.html +++ b/furumi-agent/src/web/admin.html @@ -78,6 +78,7 @@ td .inline-input { background: var(--bg-card); border: 1px solid var(--accent); .status-approved { background: #052e16; color: var(--success); } .status-rejected { background: #450a0a; color: var(--danger); } .status-error { background: #450a0a; color: var(--danger); } +.status-merged { background: #0c2340; color: #60a5fa; } .actions { display: flex; gap: 3px; } .btn { border: none; padding: 3px 8px; border-radius: 3px; cursor: pointer; font-size: 11px; font-family: inherit; font-weight: 500; } @@ -127,6 +128,25 @@ td .inline-input { background: var(--bg-card); border: 1px solid var(--accent); .artist-dropdown.open { display: block; } .artist-option { padding: 5px 9px; cursor: pointer; font-size: 12px; } .artist-option:hover { background: var(--bg-hover); } + +/* File info & LLM expand */ +.info-grid { display: grid; grid-template-columns: auto 1fr; gap: 2px 10px; margin-top: 4px; font-size: 11px; } +.info-grid .k { color: var(--text-muted); white-space: nowrap; } +.info-grid .v { color: var(--text-dim); word-break: break-all; } +details.llm-expand { margin-top: 10px; } +details.llm-expand summary { font-size: 11px; color: var(--text-muted); cursor: pointer; user-select: none; padding: 4px 0; } +details.llm-expand summary:hover { color: var(--text); } +details.llm-expand pre { background: var(--bg-card); border: 1px solid var(--border); border-radius: 5px; padding: 10px; font-size: 11px; color: var(--text-dim); overflow-x: auto; margin-top: 4px; white-space: pre-wrap; word-break: break-all; } +.modal.modal-wide { max-width: 900px; width: 90vw; } +.merge-table { width: 100%; border-collapse: collapse; font-size: 11px; margin-top: 6px; } +.merge-table 
th { text-align: left; padding: 5px 8px; color: var(--text-muted); border-bottom: 1px solid var(--border); font-weight: 500; } +.merge-table td { padding: 4px 8px; border-bottom: 1px solid var(--border); } +.merge-table input { background: var(--bg-card); border: 1px solid var(--border); border-radius: 3px; padding: 2px 5px; color: var(--text); font-size: 11px; font-family: inherit; width: 100%; } +.merge-table input:focus { border-color: var(--accent); outline: none; } +.section-label { font-size: 11px; color: var(--text-muted); margin-top: 12px; margin-bottom: 4px; font-weight: 500; text-transform: uppercase; letter-spacing: 0.05em; } +.artist-select-bar { /* hidden until the .visible class is added */ display: none; position: fixed; bottom: 24px; right: 24px; background: var(--bg-panel); border: 1px solid var(--border); border-radius: 10px; padding: 10px 16px; align-items: center; gap: 10px; box-shadow: 0 8px 32px rgba(0,0,0,0.6); z-index: 50; } +.artist-select-bar.visible { display: flex; } +.modal select { width: 100%; background: var(--bg-card); border: 1px solid var(--border); border-radius: 5px; padding: 7px 9px; color: var(--text); font-family: inherit; font-size: 12px; } @@ -136,6 +156,7 @@ td .inline-input { background: var(--bg-card); border: 1px solid var(--accent); Idle
@@ -158,6 +179,12 @@ td .inline-input { background: var(--bg-card); border: 1px solid var(--accent); +
+ 0 artists selected + + +
+