Added merge
All checks were successful
Publish Metadata Agent Image / build-and-push-image (push) Successful in 1m7s
Publish Web Player Image / build-and-push-image (push) Successful in 1m11s
Publish Server Image / build-and-push-image (push) Successful in 2m14s

This commit is contained in:
2026-03-19 00:55:49 +00:00
parent 4a272f373d
commit e1782a6e3b
13 changed files with 949 additions and 23 deletions

View File

@@ -31,6 +31,7 @@ services:
FURUMI_AGENT_OLLAMA_URL: "${OLLAMA_URL:-http://host.docker.internal:11434}"
FURUMI_AGENT_OLLAMA_AUTH: "${OLLAMA_AUTH:-CHANGE-ME}"
FURUMI_PLAYER_BIND: "0.0.0.0:8090"
FURUMI_AGENT_POLL_INTERVAL_SECS: 5
volumes:
- ./inbox:/inbox
- ./storage:/storage

View File

@@ -0,0 +1,10 @@
CREATE TABLE artist_merges (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
status TEXT NOT NULL DEFAULT 'pending',
source_artist_ids TEXT NOT NULL,
proposal TEXT,
llm_notes TEXT,
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

View File

@@ -0,0 +1,65 @@
You are a music library artist merge assistant. You will receive a list of artists (with their albums and tracks, each with database IDs) that have been identified as potential duplicates. Your job is to analyze them and produce a merge plan.
## Input format
You will receive a structured list like:
### Artist ID 42: "pink floyd"
Album ID 10: "the wall" (1979)
- 01. "In the Flesh?" [track_id=100]
- 02. "The Thin Ice" [track_id=101]
### Artist ID 43: "Pink Floyd"
Album ID 11: "Wish You Were Here" (1975)
- 01. "Shine On You Crazy Diamond (Parts I-V)" [track_id=200]
## Your task
Determine if the artists are duplicates and produce a merge plan.
## Rules
### 1. Canonical artist name
- Use correct capitalization and canonical spelling (e.g., "pink floyd" → "Pink Floyd", "AC DC" → "AC/DC").
- If the database already contains an artist with a well-formed name, prefer that exact form.
- If one artist has clearly more tracks or albums, their name spelling may be more authoritative.
- Fix obvious typos or casing errors.
### 2. Winner artist
- `winner_artist_id` must be the ID of one of the provided artists — the one whose identity (ID) will survive in the database.
- All other artists are "losers" and will be deleted after their albums and tracks are moved to the winner.
- Prefer the artist ID that has the most tracks/albums, or the one with the most correct canonical name.
### 3. Canonical album names
- Use correct capitalization (title case for English, preserve language for non-English).
- Fix slug-like names: "new-songs" → "New Songs", "the_dark_side" → "The Dark Side".
- Fix all-lowercase or all-uppercase: "WISH YOU WERE HERE" → "Wish You Were Here".
- Preserve creative/intentional stylization (e.g., "OK Computer" stays as-is, "(What's the Story) Morning Glory?" stays as-is).
- If the database already contains the album under another artist with a well-formed name, use that exact name.
### 4. Album deduplication
- If two albums (across the artists being merged) have the same or very similar name, they are the same album. In that case, pick the better-formed one as the "winner album".
- Set `merge_into_album_id` to the winner album's ID for the duplicate album. This means all tracks from the duplicate will be moved into the winner album, and the duplicate album will be deleted.
- If an album is unique (no duplicate exists), set `merge_into_album_id` to null — the album will simply be renamed and moved to the winner artist.
- When comparing album names for similarity, ignore case, punctuation, and common suffixes like "(Remastered)" for the purpose of duplicate detection. However, treat remastered editions as separate albums unless both albums are clearly the same remaster.
### 5. Album mappings coverage
- `album_mappings` must include an entry for EVERY album across ALL source artists, not just duplicates.
- Every album (from every artist being merged) needs a canonical name, even if it is not being merged into another album.
### 6. Notes
- The `notes` field should briefly explain: which artist was chosen as winner and why, which albums were renamed, which albums were deduplicated and into what.
## Response format
You MUST respond with a single JSON object, no markdown fences, no extra text:
{"canonical_artist_name": "...", "winner_artist_id": 42, "album_mappings": [{"source_album_id": 10, "canonical_name": "The Wall", "merge_into_album_id": null}, {"source_album_id": 11, "canonical_name": "Wish You Were Here", "merge_into_album_id": null}], "notes": "..."}
- `canonical_artist_name`: the single correct name for this artist after merging.
- `winner_artist_id`: the integer ID of the artist whose record survives (must be one of the IDs provided).
- `album_mappings`: an array covering ALL albums from ALL source artists. Each entry:
- `source_album_id`: the integer ID of this album (as provided in the input).
- `canonical_name`: the corrected canonical name for this album.
- `merge_into_album_id`: null if this album is just renamed/moved to the winner artist; or the integer ID of another album (the winner album) if this album's tracks should be merged into that album and this album deleted. Never set merge_into_album_id to the same album's own ID.
- `notes`: brief explanation of the decisions made.

View File

@@ -5,6 +5,8 @@ use clap::Parser;
/// Default system prompt, compiled into the binary as a fallback.
const DEFAULT_SYSTEM_PROMPT: &str = include_str!("../prompts/normalize.txt");
const DEFAULT_MERGE_PROMPT: &str = include_str!("../prompts/merge.txt");
#[derive(Parser, Debug)]
#[command(version, about = "Furumi Agent: music metadata ingest and normalization")]
pub struct Args {
@@ -47,6 +49,10 @@ pub struct Args {
/// Path to a custom system prompt file (overrides the built-in default)
#[arg(long, env = "FURUMI_AGENT_SYSTEM_PROMPT_FILE")]
pub system_prompt_file: Option<PathBuf>,
/// Path to a custom merge prompt file (overrides the built-in default)
#[arg(long, env = "FURUMI_AGENT_MERGE_PROMPT_FILE")]
pub merge_prompt_file: Option<PathBuf>,
}
impl Args {
@@ -76,4 +82,14 @@ impl Args {
}
}
}
pub fn load_merge_prompt(&self) -> Result<String, Box<dyn std::error::Error>> {
match &self.merge_prompt_file {
Some(path) => {
tracing::info!("Loading merge prompt from {:?}", path);
Ok(std::fs::read_to_string(path)?)
}
None => Ok(DEFAULT_MERGE_PROMPT.to_owned()),
}
}
}

View File

@@ -88,6 +88,7 @@ pub struct SimilarAlbum {
pub similarity: f32,
}
#[allow(dead_code)]
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct AlbumImage {
pub id: i64,
@@ -416,6 +417,7 @@ pub async fn insert_album_image(
Ok(row.0)
}
#[allow(dead_code)]
pub async fn get_album_images(pool: &PgPool, album_id: i64) -> Result<Vec<AlbumImage>, sqlx::Error> {
sqlx::query_as::<_, AlbumImage>("SELECT * FROM album_images WHERE album_id = $1 ORDER BY image_type")
.bind(album_id)
@@ -563,6 +565,7 @@ pub struct Stats {
pub pending_count: i64,
pub review_count: i64,
pub error_count: i64,
pub merged_count: i64,
}
pub async fn get_stats(pool: &PgPool) -> Result<Stats, sqlx::Error> {
@@ -572,5 +575,200 @@ pub async fn get_stats(pool: &PgPool) -> Result<Stats, sqlx::Error> {
let (pending_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'pending'").fetch_one(pool).await?;
let (review_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'review'").fetch_one(pool).await?;
let (error_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'error'").fetch_one(pool).await?;
Ok(Stats { total_tracks, total_artists, total_albums, pending_count, review_count, error_count })
let (merged_count,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM pending_tracks WHERE status = 'merged'").fetch_one(pool).await?;
Ok(Stats { total_tracks, total_artists, total_albums, pending_count, review_count, error_count, merged_count })
}
// =================== Artist Merges ===================
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct ArtistMerge {
pub id: Uuid,
pub status: String,
pub source_artist_ids: String,
pub proposal: Option<String>,
pub llm_notes: Option<String>,
pub error_message: Option<String>,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, Serialize)]
pub struct ArtistFullData {
pub id: i64,
pub name: String,
pub albums: Vec<AlbumFullData>,
}
#[derive(Debug, Serialize)]
pub struct AlbumFullData {
pub id: i64,
pub name: String,
pub year: Option<i32>,
pub tracks: Vec<TrackBasic>,
}
#[derive(Debug, Serialize, sqlx::FromRow)]
pub struct TrackBasic {
pub id: i64,
pub title: String,
pub track_number: Option<i32>,
pub storage_path: String,
}
#[derive(Debug, sqlx::FromRow)]
pub struct TrackWithAlbum {
pub id: i64,
pub storage_path: String,
pub album_name: Option<String>,
}
pub async fn insert_artist_merge(pool: &PgPool, source_artist_ids: &[i64]) -> Result<Uuid, sqlx::Error> {
let ids_json = serde_json::to_string(source_artist_ids).unwrap_or_default();
let row: (Uuid,) = sqlx::query_as(
"INSERT INTO artist_merges (source_artist_ids) VALUES ($1) RETURNING id"
).bind(&ids_json).fetch_one(pool).await?;
Ok(row.0)
}
pub async fn list_artist_merges(pool: &PgPool) -> Result<Vec<ArtistMerge>, sqlx::Error> {
sqlx::query_as::<_, ArtistMerge>("SELECT * FROM artist_merges ORDER BY created_at DESC")
.fetch_all(pool).await
}
pub async fn get_artist_merge(pool: &PgPool, id: Uuid) -> Result<Option<ArtistMerge>, sqlx::Error> {
sqlx::query_as::<_, ArtistMerge>("SELECT * FROM artist_merges WHERE id = $1")
.bind(id).fetch_optional(pool).await
}
pub async fn update_merge_status(pool: &PgPool, id: Uuid, status: &str, error: Option<&str>) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE artist_merges SET status = $2, error_message = $3, updated_at = NOW() WHERE id = $1")
.bind(id).bind(status).bind(error).execute(pool).await?;
Ok(())
}
pub async fn update_merge_proposal(pool: &PgPool, id: Uuid, proposal_json: &str, notes: &str) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE artist_merges SET proposal = $2, llm_notes = $3, status = 'review', error_message = NULL, updated_at = NOW() WHERE id = $1")
.bind(id).bind(proposal_json).bind(notes).execute(pool).await?;
Ok(())
}
pub async fn get_pending_merges_for_processing(pool: &PgPool) -> Result<Vec<Uuid>, sqlx::Error> {
let rows: Vec<(Uuid,)> = sqlx::query_as(
"SELECT id FROM artist_merges WHERE status = 'pending' ORDER BY created_at ASC LIMIT 5"
).fetch_all(pool).await?;
Ok(rows.into_iter().map(|(id,)| id).collect())
}
pub async fn get_artists_full_data(pool: &PgPool, ids: &[i64]) -> Result<Vec<ArtistFullData>, sqlx::Error> {
let mut result = Vec::new();
for &id in ids {
let artist: Artist = sqlx::query_as("SELECT id, name FROM artists WHERE id = $1")
.bind(id).fetch_one(pool).await?;
let albums: Vec<Album> = sqlx::query_as("SELECT * FROM albums WHERE artist_id = $1 ORDER BY year NULLS LAST, name")
.bind(id).fetch_all(pool).await?;
let mut album_data = Vec::new();
for album in albums {
let tracks: Vec<TrackBasic> = sqlx::query_as(
"SELECT id, title, track_number, storage_path FROM tracks WHERE album_id = $1 ORDER BY track_number NULLS LAST, title"
).bind(album.id).fetch_all(pool).await?;
album_data.push(AlbumFullData { id: album.id, name: album.name, year: album.year, tracks });
}
result.push(ArtistFullData { id, name: artist.name, albums: album_data });
}
Ok(result)
}
pub async fn get_tracks_with_albums_for_artist(pool: &PgPool, artist_id: i64) -> Result<Vec<TrackWithAlbum>, sqlx::Error> {
sqlx::query_as::<_, TrackWithAlbum>(
r#"SELECT t.id, t.storage_path, a.name as album_name
FROM tracks t
LEFT JOIN albums a ON a.id = t.album_id
WHERE t.id IN (
SELECT track_id FROM track_artists WHERE artist_id = $1 AND role = 'primary'
)"#
).bind(artist_id).fetch_all(pool).await
}
pub async fn rename_artist(pool: &PgPool, id: i64, new_name: &str) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE artists SET name = $2 WHERE id = $1")
.bind(id).bind(new_name).execute(pool).await?;
Ok(())
}
pub async fn delete_artist(pool: &PgPool, id: i64) -> Result<(), sqlx::Error> {
sqlx::query("DELETE FROM artists WHERE id = $1")
.bind(id).execute(pool).await?;
Ok(())
}
pub async fn rename_album(pool: &PgPool, id: i64, new_name: &str) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE albums SET name = $2 WHERE id = $1")
.bind(id).bind(new_name).execute(pool).await?;
Ok(())
}
pub async fn set_album_artist(pool: &PgPool, album_id: i64, artist_id: i64) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE albums SET artist_id = $2 WHERE id = $1")
.bind(album_id).bind(artist_id).execute(pool).await?;
Ok(())
}
pub async fn move_albums_to_artist(pool: &PgPool, from_artist_id: i64, to_artist_id: i64) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE albums SET artist_id = $2 WHERE artist_id = $1")
.bind(from_artist_id).bind(to_artist_id).execute(pool).await?;
Ok(())
}
pub async fn move_track_artists(pool: &PgPool, from_artist_id: i64, to_artist_id: i64) -> Result<(), sqlx::Error> {
// Update, but avoid duplicate (track_id, artist_id, role) - delete first any conflicting rows
sqlx::query(
r#"DELETE FROM track_artists
WHERE artist_id = $2
AND (track_id, role) IN (
SELECT track_id, role FROM track_artists WHERE artist_id = $1
)"#
).bind(from_artist_id).bind(to_artist_id).execute(pool).await?;
sqlx::query("UPDATE track_artists SET artist_id = $2 WHERE artist_id = $1")
.bind(from_artist_id).bind(to_artist_id).execute(pool).await?;
Ok(())
}
pub async fn get_duplicate_track_ids_in_albums(pool: &PgPool, source_album_id: i64, target_album_id: i64) -> Result<Vec<i64>, sqlx::Error> {
let rows: Vec<(i64,)> = sqlx::query_as(
r#"SELECT t1.id FROM tracks t1
JOIN tracks t2 ON t1.file_hash = t2.file_hash AND t2.album_id = $2
WHERE t1.album_id = $1"#
).bind(source_album_id).bind(target_album_id).fetch_all(pool).await?;
Ok(rows.into_iter().map(|(id,)| id).collect())
}
pub async fn get_track_storage_path(pool: &PgPool, track_id: i64) -> Result<Option<String>, sqlx::Error> {
let row: Option<(String,)> = sqlx::query_as("SELECT storage_path FROM tracks WHERE id = $1")
.bind(track_id).fetch_optional(pool).await?;
Ok(row.map(|(p,)| p))
}
pub async fn delete_track(pool: &PgPool, track_id: i64) -> Result<(), sqlx::Error> {
sqlx::query("DELETE FROM track_artists WHERE track_id = $1").bind(track_id).execute(pool).await?;
sqlx::query("DELETE FROM tracks WHERE id = $1").bind(track_id).execute(pool).await?;
Ok(())
}
pub async fn move_tracks_to_album(pool: &PgPool, from_album_id: i64, to_album_id: i64) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE tracks SET album_id = $2 WHERE album_id = $1")
.bind(from_album_id).bind(to_album_id).execute(pool).await?;
Ok(())
}
pub async fn delete_album(pool: &PgPool, id: i64) -> Result<(), sqlx::Error> {
sqlx::query("DELETE FROM albums WHERE id = $1")
.bind(id).execute(pool).await?;
Ok(())
}
pub async fn update_track_storage_path(pool: &PgPool, track_id: i64, new_path: &str) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE tracks SET storage_path = $2 WHERE id = $1")
.bind(track_id).bind(new_path).execute(pool).await?;
Ok(())
}

View File

@@ -25,6 +25,18 @@ pub async fn run(state: Arc<AppState>) {
Ok(count) => tracing::info!(count, "re-processed pending tracks"),
Err(e) => tracing::error!(?e, "pending re-processing failed"),
}
// Process pending merge proposals
match db::get_pending_merges_for_processing(&state.pool).await {
Ok(merge_ids) => {
for merge_id in merge_ids {
if let Err(e) = crate::merge::propose_merge(&state, merge_id).await {
tracing::error!(id = %merge_id, ?e, "Merge proposal failed");
let _ = db::update_merge_status(&state.pool, merge_id, "error", Some(&e.to_string())).await;
}
}
}
Err(e) => tracing::error!(?e, "Failed to load pending merges"),
}
tokio::time::sleep(interval).await;
}
}
@@ -161,13 +173,14 @@ async fn reprocess_pending(state: &Arc<AppState>) -> anyhow::Result<usize> {
.join(sanitize_filename(album))
.join(&dest_filename);
let storage_path = if dest.exists() && !source.exists() {
dest.to_string_lossy().to_string()
let (storage_path, was_merged) = if dest.exists() && !source.exists() {
(dest.to_string_lossy().to_string(), false)
} else if source.exists() {
match mover::move_to_storage(
&state.config.storage_dir, artist, album, &dest_filename, source,
).await {
Ok(p) => p.to_string_lossy().to_string(),
Ok(mover::MoveOutcome::Moved(p)) => (p.to_string_lossy().to_string(), false),
Ok(mover::MoveOutcome::Merged(p)) => (p.to_string_lossy().to_string(), true),
Err(e) => {
tracing::error!(id = %pt.id, ?e, "Failed to move file");
db::update_pending_status(&state.pool, pt.id, "error", Some(&e.to_string())).await?;
@@ -181,7 +194,12 @@ async fn reprocess_pending(state: &Arc<AppState>) -> anyhow::Result<usize> {
};
match db::approve_and_finalize(&state.pool, pt.id, &storage_path).await {
Ok(track_id) => tracing::info!(id = %pt.id, track_id, "Track finalized"),
Ok(track_id) => {
if was_merged {
let _ = db::update_pending_status(&state.pool, pt.id, "merged", None).await;
}
tracing::info!(id = %pt.id, track_id, "Track finalized");
}
Err(e) => tracing::error!(id = %pt.id, ?e, "Failed to finalize"),
}
}
@@ -472,10 +490,17 @@ async fn process_file(state: &Arc<AppState>, file_path: &std::path::Path) -> any
)
.await
{
Ok(storage_path) => {
Ok(outcome) => {
let (storage_path, was_merged) = match outcome {
mover::MoveOutcome::Moved(p) => (p, false),
mover::MoveOutcome::Merged(p) => (p, true),
};
let rel_path = storage_path.to_string_lossy().to_string();
match db::approve_and_finalize(&state.pool, pending_id, &rel_path).await {
Ok(track_id) => {
if was_merged {
let _ = db::update_pending_status(&state.pool, pending_id, "merged", None).await;
}
tracing::info!(file = filename, track_id, storage = %rel_path, "Track finalized in database");
}
Err(e) => {

View File

@@ -1,18 +1,27 @@
use std::path::{Path, PathBuf};
pub enum MoveOutcome {
/// File was moved/renamed to destination.
Moved(PathBuf),
/// Destination already existed; inbox duplicate was removed.
Merged(PathBuf),
}
/// Move a file from inbox to the permanent storage directory.
///
/// Creates the directory structure: `storage_dir/artist/album/filename`
/// Returns the full path of the moved file.
///
/// If `rename` fails (cross-device), falls back to copy + remove.
/// If the destination already exists the inbox copy is removed and
/// `MoveOutcome::Merged` is returned instead of an error.
pub async fn move_to_storage(
storage_dir: &Path,
artist: &str,
album: &str,
filename: &str,
source: &Path,
) -> anyhow::Result<PathBuf> {
) -> anyhow::Result<MoveOutcome> {
let artist_dir = sanitize_dir_name(artist);
let album_dir = sanitize_dir_name(album);
@@ -21,9 +30,13 @@ pub async fn move_to_storage(
let dest = dest_dir.join(filename);
// Avoid overwriting existing files
// File already at destination — remove the inbox duplicate
if dest.exists() {
anyhow::bail!("Destination already exists: {:?}", dest);
if source.exists() {
tokio::fs::remove_file(source).await?;
tracing::info!(from = ?source, to = ?dest, "merged duplicate into existing storage file");
}
return Ok(MoveOutcome::Merged(dest));
}
// Try atomic rename first (same filesystem)
@@ -37,7 +50,7 @@ pub async fn move_to_storage(
}
tracing::info!(from = ?source, to = ?dest, "moved file to storage");
Ok(dest)
Ok(MoveOutcome::Moved(dest))
}
/// Remove characters that are unsafe for directory names.

View File

@@ -121,7 +121,7 @@ struct OllamaResponseMessage {
content: String,
}
async fn call_ollama(
pub async fn call_ollama(
base_url: &str,
model: &str,
system_prompt: &str,

View File

@@ -1,6 +1,7 @@
mod config;
mod db;
mod ingest;
mod merge;
mod web;
use std::sync::Arc;
@@ -24,6 +25,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let system_prompt = args.load_system_prompt()?;
tracing::info!("System prompt loaded: {} chars", system_prompt.len());
let merge_prompt = args.load_merge_prompt()?;
tracing::info!("Merge prompt loaded: {} chars", merge_prompt.len());
tracing::info!("Connecting to database...");
let pool = db::connect(&args.database_url).await?;
tracing::info!("Running database migrations...");
@@ -34,6 +38,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
pool: pool.clone(),
config: Arc::new(args),
system_prompt: Arc::new(system_prompt),
merge_prompt: Arc::new(merge_prompt),
});
// Spawn the ingest pipeline as a background task

187
furumi-agent/src/merge.rs Normal file
View File

@@ -0,0 +1,187 @@
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::db;
use crate::web::AppState;
use crate::ingest::normalize::call_ollama;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct MergeProposal {
pub canonical_artist_name: String,
pub winner_artist_id: i64,
pub album_mappings: Vec<AlbumMapping>,
pub notes: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AlbumMapping {
pub source_album_id: i64,
pub canonical_name: String,
pub merge_into_album_id: Option<i64>,
}
pub async fn propose_merge(state: &Arc<AppState>, merge_id: Uuid) -> anyhow::Result<()> {
db::update_merge_status(&state.pool, merge_id, "processing", None).await?;
let merge = db::get_artist_merge(&state.pool, merge_id).await?
.ok_or_else(|| anyhow::anyhow!("Merge not found: {}", merge_id))?;
let source_ids: Vec<i64> = serde_json::from_str(&merge.source_artist_ids)
.map_err(|e| anyhow::anyhow!("Invalid source_artist_ids: {}", e))?;
let artists_data = db::get_artists_full_data(&state.pool, &source_ids).await?;
let user_message = build_merge_message(&artists_data);
let response = call_ollama(
&state.config.ollama_url,
&state.config.ollama_model,
&state.merge_prompt,
&user_message,
state.config.ollama_auth.as_deref(),
).await?;
let proposal = parse_merge_response(&response)?;
let notes = proposal.notes.clone();
let proposal_json = serde_json::to_string(&proposal)?;
db::update_merge_proposal(&state.pool, merge_id, &proposal_json, &notes).await?;
tracing::info!(id = %merge_id, "Merge proposal generated");
Ok(())
}
fn build_merge_message(artists: &[db::ArtistFullData]) -> String {
let mut msg = String::from("## Artists to merge\n\n");
for artist in artists {
msg.push_str(&format!("### Artist ID {}: \"{}\"\n", artist.id, artist.name));
if artist.albums.is_empty() {
msg.push_str(" (no albums)\n");
}
for album in &artist.albums {
let year_str = album.year.map(|y| format!(" ({})", y)).unwrap_or_default();
msg.push_str(&format!(" Album ID {}: \"{}\"{}\n", album.id, album.name, year_str));
for track in &album.tracks {
let num = track.track_number.map(|n| format!("{:02}. ", n)).unwrap_or_default();
msg.push_str(&format!(" - {}\"{}\" [track_id={}]\n", num, track.title, track.id));
}
}
msg.push('\n');
}
msg
}
fn parse_merge_response(response: &str) -> anyhow::Result<MergeProposal> {
let cleaned = response.trim();
let json_str = if cleaned.starts_with("```") {
let start = cleaned.find('{').unwrap_or(0);
let end = cleaned.rfind('}').map(|i| i + 1).unwrap_or(cleaned.len());
&cleaned[start..end]
} else {
cleaned
};
serde_json::from_str(json_str)
.map_err(|e| anyhow::anyhow!("Failed to parse merge LLM response: {} — raw: {}", e, response))
}
pub async fn execute_merge(state: &Arc<AppState>, merge_id: Uuid) -> anyhow::Result<()> {
let merge = db::get_artist_merge(&state.pool, merge_id).await?
.ok_or_else(|| anyhow::anyhow!("Merge not found"))?;
let proposal_str = merge.proposal.ok_or_else(|| anyhow::anyhow!("No proposal to execute"))?;
let proposal: MergeProposal = serde_json::from_str(&proposal_str)?;
let source_ids: Vec<i64> = serde_json::from_str(&merge.source_artist_ids)?;
let loser_ids: Vec<i64> = source_ids.iter().copied()
.filter(|&id| id != proposal.winner_artist_id).collect();
// 1. Rename winner artist to canonical name
db::rename_artist(&state.pool, proposal.winner_artist_id, &proposal.canonical_artist_name).await?;
// 2. Process album mappings
for mapping in &proposal.album_mappings {
if let Some(target_id) = mapping.merge_into_album_id {
// Remove duplicate tracks (same file_hash in both albums)
let dup_ids = db::get_duplicate_track_ids_in_albums(&state.pool, mapping.source_album_id, target_id).await?;
for dup_id in dup_ids {
if let Ok(Some(path)) = db::get_track_storage_path(&state.pool, dup_id).await {
let p = std::path::Path::new(&path);
if p.exists() {
let _ = tokio::fs::remove_file(p).await;
}
}
db::delete_track(&state.pool, dup_id).await?;
}
// Move remaining tracks to target album
db::move_tracks_to_album(&state.pool, mapping.source_album_id, target_id).await?;
db::delete_album(&state.pool, mapping.source_album_id).await?;
} else {
// Rename album and move to winner artist
db::rename_album(&state.pool, mapping.source_album_id, &mapping.canonical_name).await?;
db::set_album_artist(&state.pool, mapping.source_album_id, proposal.winner_artist_id).await?;
}
}
// 3. Move remaining albums from losers to winner
for &loser_id in &loser_ids {
db::move_albums_to_artist(&state.pool, loser_id, proposal.winner_artist_id).await?;
}
// 4. Move track_artists from losers to winner
for &loser_id in &loser_ids {
db::move_track_artists(&state.pool, loser_id, proposal.winner_artist_id).await?;
}
// 5. Move files on disk and update storage paths
let tracks = db::get_tracks_with_albums_for_artist(&state.pool, proposal.winner_artist_id).await?;
for track in &tracks {
let current = std::path::Path::new(&track.storage_path);
let filename = match current.file_name() {
Some(f) => f.to_string_lossy().to_string(),
None => continue,
};
let album_name = track.album_name.as_deref().unwrap_or("Unknown Album");
let new_path = state.config.storage_dir
.join(sanitize(&proposal.canonical_artist_name))
.join(sanitize(album_name))
.join(&filename);
if current != new_path.as_path() {
if current.exists() {
if let Some(parent) = new_path.parent() {
let _ = tokio::fs::create_dir_all(parent).await;
}
let moved = tokio::fs::rename(current, &new_path).await;
if moved.is_err() {
if let Ok(_) = tokio::fs::copy(current, &new_path).await {
let _ = tokio::fs::remove_file(current).await;
}
}
}
db::update_track_storage_path(&state.pool, track.id, &new_path.to_string_lossy()).await?;
}
}
// 6. Delete loser artists
for &loser_id in &loser_ids {
db::delete_artist(&state.pool, loser_id).await?;
}
// 7. Mark approved
db::update_merge_status(&state.pool, merge_id, "approved", None).await?;
tracing::info!(id = %merge_id, "Merge executed successfully");
Ok(())
}
fn sanitize(name: &str) -> String {
name.chars()
.map(|c| match c {
'/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' | '\0' => '_',
_ => c,
})
.collect::<String>()
.trim()
.trim_matches('.')
.to_owned()
}

View File

@@ -78,6 +78,7 @@ td .inline-input { background: var(--bg-card); border: 1px solid var(--accent);
.status-approved { background: #052e16; color: var(--success); }
.status-rejected { background: #450a0a; color: var(--danger); }
.status-error { background: #450a0a; color: var(--danger); }
.status-merged { background: #0c2340; color: #60a5fa; }
.actions { display: flex; gap: 3px; }
.btn { border: none; padding: 3px 8px; border-radius: 3px; cursor: pointer; font-size: 11px; font-family: inherit; font-weight: 500; }
@@ -127,6 +128,25 @@ td .inline-input { background: var(--bg-card); border: 1px solid var(--accent);
.artist-dropdown.open { display: block; }
.artist-option { padding: 5px 9px; cursor: pointer; font-size: 12px; }
.artist-option:hover { background: var(--bg-hover); }
/* File info & LLM expand */
.info-grid { display: grid; grid-template-columns: auto 1fr; gap: 2px 10px; margin-top: 4px; font-size: 11px; }
.info-grid .k { color: var(--text-muted); white-space: nowrap; }
.info-grid .v { color: var(--text-dim); word-break: break-all; }
details.llm-expand { margin-top: 10px; }
details.llm-expand summary { font-size: 11px; color: var(--text-muted); cursor: pointer; user-select: none; padding: 4px 0; }
details.llm-expand summary:hover { color: var(--text); }
details.llm-expand pre { background: var(--bg-card); border: 1px solid var(--border); border-radius: 5px; padding: 10px; font-size: 11px; color: var(--text-dim); overflow-x: auto; margin-top: 4px; white-space: pre-wrap; word-break: break-all; }
.modal.modal-wide { max-width: 900px; width: 90vw; }
.merge-table { width: 100%; border-collapse: collapse; font-size: 11px; margin-top: 6px; }
.merge-table th { text-align: left; padding: 5px 8px; color: var(--text-muted); border-bottom: 1px solid var(--border); font-weight: 500; }
.merge-table td { padding: 4px 8px; border-bottom: 1px solid var(--border); }
.merge-table input { background: var(--bg-card); border: 1px solid var(--border); border-radius: 3px; padding: 2px 5px; color: var(--text); font-size: 11px; font-family: inherit; width: 100%; }
.merge-table input:focus { border-color: var(--accent); outline: none; }
.section-label { font-size: 11px; color: var(--text-muted); margin-top: 12px; margin-bottom: 4px; font-weight: 500; text-transform: uppercase; letter-spacing: 0.05em; }
.artist-select-bar { display: none; position: fixed; bottom: 24px; right: 24px; background: var(--bg-panel); border: 1px solid var(--border); border-radius: 10px; padding: 10px 16px; display: none; align-items: center; gap: 10px; box-shadow: 0 8px 32px rgba(0,0,0,0.6); z-index: 50; }
.artist-select-bar.visible { display: flex; }
.modal select { width: 100%; background: var(--bg-card); border: 1px solid var(--border); border-radius: 5px; padding: 7px 9px; color: var(--text); font-family: inherit; font-size: 12px; }
</style>
</head>
<body>
@@ -136,6 +156,7 @@ td .inline-input { background: var(--bg-card); border: 1px solid var(--accent);
<nav>
<button class="active" onclick="showTab('queue',this)">Queue</button>
<button onclick="showTab('artists',this)">Artists</button>
<button onclick="showTab('merges',this)">Merges</button>
</nav>
<span class="agent-status idle" id="agentStatus">Idle</span>
<div class="stats" id="statsBar"></div>
@@ -158,6 +179,12 @@ td .inline-input { background: var(--bg-card); border: 1px solid var(--accent);
<div class="modal" id="modal"></div>
</div>
<div class="artist-select-bar" id="artistSelectBar">
<span id="artistSelectCount" style="font-size:13px;font-weight:600;color:var(--accent)">0 artists selected</span>
<button class="btn btn-primary" onclick="mergeSelectedArtists()">Merge Selected</button>
<button class="btn btn-cancel" onclick="clearArtistSelection()">Cancel</button>
</div>
<script>
const _base = location.pathname.replace(/\/+$/, '');
const API = _base + '/api';
@@ -206,6 +233,7 @@ function renderFilterBar(s) {
<button class="filter-btn ${f==='review'?'active':''}" onclick="loadQueue('review')">Review<span class="count">${s.review_count}</span></button>
<button class="filter-btn ${f==='pending'?'active':''}" onclick="loadQueue('pending')">Pending<span class="count">${s.pending_count}</span></button>
<button class="filter-btn ${f==='error'?'active':''}" onclick="loadQueue('error')">Errors<span class="count">${s.error_count}</span></button>
<button class="filter-btn ${f==='merged'?'active':''}" onclick="loadQueue('merged')">Merged<span class="count">${s.merged_count}</span></button>
<button class="filter-btn ${f==='approved'?'active':''}" onclick="loadQueue('approved')">Approved</button>
<button class="filter-btn ${f==='rejected'?'active':''}" onclick="loadQueue('rejected')">Rejected</button>
`;
@@ -218,6 +246,7 @@ function showTab(tab, btn) {
clearSelection();
if (tab === 'queue') { loadQueue(); loadStats(); }
else if (tab === 'artists') { loadArtists(); document.getElementById('filterBar').innerHTML = ''; }
else if (tab === 'merges') { loadMerges(); document.getElementById('filterBar').innerHTML = ''; }
}
// --- Queue ---
@@ -409,6 +438,24 @@ async function editItem(id) {
try { editFeatured = JSON.parse(item.norm_featured_artists); } catch(e) {}
}
// Build LLM JSON from normalized fields
let featuredParsed = [];
try { featuredParsed = item.norm_featured_artists ? JSON.parse(item.norm_featured_artists) : []; } catch(e) {}
const llmJson = {
artist: item.norm_artist,
title: item.norm_title,
album: item.norm_album,
year: item.norm_year,
track_number: item.norm_track_number,
genre: item.norm_genre,
featured_artists: featuredParsed,
confidence: item.confidence,
notes: item.llm_notes,
};
const fmtSize = (b) => b == null ? '-' : b >= 1048576 ? (b/1048576).toFixed(1)+' MB' : (b/1024).toFixed(0)+' KB';
const fmtDur = (s) => { if (s == null) return '-'; const m = Math.floor(s/60); const ss = Math.floor(s%60); return m+':'+(ss<10?'0':'')+ss; };
document.getElementById('modal').innerHTML = `
<h2>Edit Metadata</h2>
<div class="detail-row">
@@ -453,8 +500,19 @@ async function editItem(id) {
oninput="onFeatSearch(this.value)" onkeydown="onFeatKey(event)">
<div class="artist-dropdown" id="feat-dropdown"></div>
</div>
${item.llm_notes ? `<label>Agent Notes</label><div class="raw-value" style="margin-bottom:6px">${esc(item.llm_notes)}</div>` : ''}
${item.error_message ? `<label>Error</label><div class="raw-value" style="color:var(--danger)">${esc(item.error_message)}</div>` : ''}
${item.error_message ? `<label>Error</label><div class="raw-value" style="color:var(--danger);margin-bottom:6px">${esc(item.error_message)}</div>` : ''}
<details class="llm-expand">
<summary>File info &amp; agent response</summary>
<div class="info-grid" style="margin-bottom:8px">
<span class="k">Status</span><span class="v"><span class="status status-${item.status}">${item.status}</span></span>
<span class="k">Confidence</span><span class="v">${item.confidence != null ? item.confidence.toFixed(3) : '-'}</span>
<span class="k">Duration</span><span class="v">${fmtDur(item.duration_secs)}</span>
<span class="k">Size</span><span class="v">${fmtSize(item.file_size)}</span>
<span class="k">Hash</span><span class="v">${esc(item.file_hash ? item.file_hash.slice(0,16)+'…' : '-')}</span>
<span class="k">Inbox path</span><span class="v">${esc(item.inbox_path || '-')}</span>
</div>
<pre>${esc(JSON.stringify(llmJson, null, 2))}</pre>
</details>
<div class="modal-actions">
<button class="btn btn-cancel" onclick="closeModal()">Cancel</button>
<button class="btn btn-primary" onclick="saveEdit('${item.id}')">Save</button>
@@ -532,9 +590,10 @@ async function loadArtists() {
const artists = await api('/artists');
const el = document.getElementById('content');
if (!artists || !artists.length) { el.innerHTML = '<div class="empty">No artists yet</div>'; return; }
let html = '<table><tr><th>ID</th><th>Name</th><th>Actions</th></tr>';
let html = '<table><tr><th style="width:30px"></th><th>ID</th><th>Name</th><th>Actions</th></tr>';
for (const a of artists) {
html += `<tr>
<td><input type="checkbox" class="cb" ${selectedArtists.has(a.id)?'checked':''} onchange="toggleSelectArtist(${a.id},this.checked)"></td>
<td>${a.id}</td>
<td class="editable" ondblclick="inlineEditArtist(this,${a.id})">${esc(a.name)}</td>
<td class="actions">
@@ -592,6 +651,229 @@ function esc(s) {
return String(s).replace(/&/g,'&amp;').replace(/</g,'&lt;').replace(/>/g,'&gt;').replace(/"/g,'&quot;').replace(/'/g,'&#39;');
}
// --- Artist selection for merge ---
let selectedArtists = new Set();
function toggleSelectArtist(id, checked) {
if (checked) selectedArtists.add(id); else selectedArtists.delete(id);
const bar = document.getElementById('artistSelectBar');
if (selectedArtists.size >= 2) {
bar.classList.add('visible');
document.getElementById('artistSelectCount').textContent = selectedArtists.size + ' artists selected';
} else {
bar.classList.remove('visible');
}
}
async function mergeSelectedArtists() {
if (selectedArtists.size < 2) return;
const ids = [...selectedArtists];
const result = await api('/merges', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ artist_ids: ids }),
});
if (result && result.id) {
selectedArtists.clear();
document.getElementById('artistSelectBar').classList.remove('visible');
// Switch to merges tab
const mergesBtn = document.querySelector('nav button:nth-child(3)');
showTab('merges', mergesBtn);
}
}
function clearArtistSelection() {
selectedArtists.clear();
document.getElementById('artistSelectBar').classList.remove('visible');
if (currentTab === 'artists') loadArtists();
}
// --- Merges tab ---
let mergesData = [];
async function loadMerges() {
mergesData = await api('/merges') || [];
renderMerges();
}
function renderMerges() {
const el = document.getElementById('content');
if (!mergesData.length) { el.innerHTML = '<div class="empty">No merge jobs yet. Select artists and click Merge.</div>'; return; }
let html = `<table><tr>
<th style="width:80px">Status</th>
<th>Artists</th>
<th>Notes</th>
<th style="width:120px">Created</th>
<th style="width:80px">Actions</th>
</tr>`;
for (const m of mergesData) {
const ids = JSON.parse(m.source_artist_ids || '[]');
const notes = m.llm_notes ? esc(m.llm_notes.slice(0, 80)) + (m.llm_notes.length > 80 ? '…' : '') : '-';
const date = new Date(m.created_at).toLocaleDateString();
html += `<tr style="cursor:pointer" onclick="openMergeDetail('${m.id}')">
<td><span class="status status-${m.status}">${m.status}</span></td>
<td>${esc('IDs: ' + ids.join(', '))}</td>
<td style="max-width:300px;overflow:hidden;text-overflow:ellipsis;white-space:nowrap">${notes}</td>
<td>${date}</td>
<td class="actions"><button class="btn btn-edit" onclick="event.stopPropagation();openMergeDetail('${m.id}')">View</button></td>
</tr>`;
}
html += '</table>';
el.innerHTML = html;
}
async function openMergeDetail(id) {
const detail = await api(`/merges/${id}`);
if (!detail) return;
renderMergeModal(detail);
openModal();
}
function renderMergeModal(detail) {
const { merge, artists, proposal } = detail;
const artistNames = artists.map(a => `${esc(a.name)} (ID: ${a.id})`).join(', ');
// Build winner dropdown options
const winnerOpts = artists.map(a =>
`<option value="${a.id}" ${proposal && proposal.winner_artist_id === a.id ? 'selected' : ''}>${esc(a.name)} (ID: ${a.id})</option>`
).join('');
// Build album mappings table rows
let albumRows = '';
if (proposal && proposal.album_mappings) {
for (const [i, m] of proposal.album_mappings.entries()) {
// Find source album artist name
let srcArtistName = '?', srcAlbumName = '?';
for (const a of artists) {
const alb = a.albums.find(al => al.id === m.source_album_id);
if (alb) { srcArtistName = a.name; srcAlbumName = alb.name; break; }
}
albumRows += `<tr>
<td>${esc(srcArtistName)}</td>
<td>${esc(srcAlbumName)} (ID:${m.source_album_id})</td>
<td><input data-i="${i}" data-field="canonical_name" value="${esc(m.canonical_name)}" oninput="updateAlbumMapping(this)"></td>
<td><input data-i="${i}" data-field="merge_into_album_id" type="number" placeholder="(keep)" value="${m.merge_into_album_id != null ? m.merge_into_album_id : ''}" oninput="updateAlbumMapping(this)" style="width:80px"></td>
</tr>`;
}
}
// Build tracks table
let trackRows = '';
for (const a of artists) {
for (const alb of a.albums) {
for (const t of alb.tracks) {
const num = t.track_number ? String(t.track_number).padStart(2,'0') + '. ' : '';
trackRows += `<tr>
<td>${esc(a.name)}</td>
<td>${esc(alb.name)}</td>
<td>${num}${esc(t.title)}</td>
</tr>`;
}
}
}
const canEdit = merge.status === 'review';
const canRetry = merge.status === 'error' || merge.status === 'pending';
document.getElementById('modal').className = 'modal modal-wide';
document.getElementById('modal').innerHTML = `
<h2>Artist Merge</h2>
<div class="section-label">Source artists</div>
<div class="raw-value" style="margin-bottom:8px">${artistNames}</div>
<div class="raw-value">Status: <span class="status status-${merge.status}">${merge.status}</span></div>
${merge.error_message ? `<div class="raw-value" style="color:var(--danger);margin-top:4px">${esc(merge.error_message)}</div>` : ''}
${proposal ? `
<div class="section-label">Proposal</div>
<div class="detail-row">
<div class="field">
<label>Canonical artist name</label>
<input id="mg-artist" value="${esc(proposal.canonical_artist_name)}" ${!canEdit?'disabled':''}>
</div>
<div class="field">
<label>Winner artist</label>
<select id="mg-winner" ${!canEdit?'disabled':''}>${winnerOpts}</select>
</div>
</div>
<div class="section-label">Album mappings</div>
<table class="merge-table">
<tr><th>Source artist</th><th>Source album</th><th>Canonical name</th><th>Merge into ID</th></tr>
${albumRows || '<tr><td colspan="4" style="color:var(--text-muted)">No album mappings</td></tr>'}
</table>
<div class="section-label">Tracks</div>
<div style="max-height:200px;overflow-y:auto;border:1px solid var(--border);border-radius:5px">
<table class="merge-table">
<tr><th>Artist</th><th>Album</th><th>Track</th></tr>
${trackRows || '<tr><td colspan="3" style="color:var(--text-muted)">No tracks</td></tr>'}
</table>
</div>
${merge.llm_notes ? `<div class="section-label">LLM Notes</div><div class="raw-value" style="margin-bottom:8px">${esc(merge.llm_notes)}</div>` : ''}
` : `<div class="raw-value" style="margin-top:8px">Waiting for LLM proposal...</div>`}
<div class="modal-actions">
<button class="btn btn-cancel" onclick="closeModal()">Close</button>
${canRetry ? `<button class="btn btn-retry" onclick="retryMerge('${merge.id}')">Retry</button>` : ''}
${canEdit ? `
<button class="btn btn-reject" onclick="rejectMerge('${merge.id}')">Reject</button>
<button class="btn btn-edit" onclick="saveMergeProposal('${merge.id}')">Save Changes</button>
<button class="btn btn-approve" onclick="approveMerge('${merge.id}')">Approve & Execute</button>
` : ''}
</div>
`;
// Store proposal for editing
window._editingProposal = proposal ? JSON.parse(JSON.stringify(proposal)) : null;
window._editingMergeId = merge.id;
}
function updateAlbumMapping(input) {
if (!window._editingProposal) return;
const i = parseInt(input.dataset.i);
const field = input.dataset.field;
if (field === 'merge_into_album_id') {
const v = input.value.trim();
window._editingProposal.album_mappings[i][field] = v ? parseInt(v) : null;
} else {
window._editingProposal.album_mappings[i][field] = input.value;
}
}
async function saveMergeProposal(id) {
if (!window._editingProposal) return;
const artistInput = document.getElementById('mg-artist');
const winnerSelect = document.getElementById('mg-winner');
if (artistInput) window._editingProposal.canonical_artist_name = artistInput.value;
if (winnerSelect) window._editingProposal.winner_artist_id = parseInt(winnerSelect.value);
await api(`/merges/${id}`, {
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ proposal: window._editingProposal }),
});
await openMergeDetail(id);
}
async function approveMerge(id) {
if (!confirm('Execute this merge? This will move files and update the database. This cannot be undone.')) return;
await api(`/merges/${id}/approve`, { method: 'POST' });
closeModal();
loadMerges();
}
async function rejectMerge(id) {
await api(`/merges/${id}/reject`, { method: 'POST' });
closeModal();
loadMerges();
}
async function retryMerge(id) {
await api(`/merges/${id}/retry`, { method: 'POST' });
closeModal();
loadMerges();
}
// --- Init ---
loadStats();
loadQueue();

View File

@@ -86,20 +86,27 @@ pub async fn approve_queue_item(State(state): State<S>, Path(id): Path<Uuid>) ->
let album_dir = sanitize_filename(album);
let dest = state.config.storage_dir.join(&artist_dir).join(&album_dir).join(&filename);
let storage_path = if dest.exists() && !source.exists() {
use crate::ingest::mover::MoveOutcome;
let (storage_path, was_merged) = if dest.exists() && !source.exists() {
// File already moved (e.g. auto-approved earlier but DB not finalized)
dest.to_string_lossy().to_string()
(dest.to_string_lossy().to_string(), false)
} else {
match crate::ingest::mover::move_to_storage(
&state.config.storage_dir, artist, album, &filename, source,
).await {
Ok(p) => p.to_string_lossy().to_string(),
Ok(MoveOutcome::Moved(p)) => (p.to_string_lossy().to_string(), false),
Ok(MoveOutcome::Merged(p)) => (p.to_string_lossy().to_string(), true),
Err(e) => return error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
}
};
match db::approve_and_finalize(&state.pool, id, &storage_path).await {
Ok(track_id) => (StatusCode::OK, Json(serde_json::json!({"track_id": track_id}))).into_response(),
Ok(track_id) => {
if was_merged {
let _ = db::update_pending_status(&state.pool, id, "merged", None).await;
}
(StatusCode::OK, Json(serde_json::json!({"track_id": track_id}))).into_response()
}
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
}
}
@@ -189,19 +196,26 @@ pub async fn batch_approve(State(state): State<S>, Json(body): Json<BatchIds>) -
let album_dir = sanitize_filename(album);
let dest = state.config.storage_dir.join(&artist_dir).join(&album_dir).join(&filename);
let rel_path = if dest.exists() && !source.exists() {
dest.to_string_lossy().to_string()
use crate::ingest::mover::MoveOutcome;
let (rel_path, was_merged) = if dest.exists() && !source.exists() {
(dest.to_string_lossy().to_string(), false)
} else {
match crate::ingest::mover::move_to_storage(
&state.config.storage_dir, artist, album, &filename, source,
).await {
Ok(p) => p.to_string_lossy().to_string(),
Ok(MoveOutcome::Moved(p)) => (p.to_string_lossy().to_string(), false),
Ok(MoveOutcome::Merged(p)) => (p.to_string_lossy().to_string(), true),
Err(e) => { errors.push(format!("{}: {}", id, e)); continue; }
}
};
match db::approve_and_finalize(&state.pool, *id, &rel_path).await {
Ok(_) => ok += 1,
Ok(_) => {
if was_merged {
let _ = db::update_pending_status(&state.pool, *id, "merged", None).await;
}
ok += 1;
}
Err(e) => errors.push(format!("{}: {}", id, e)),
}
}
@@ -312,6 +326,110 @@ pub async fn update_album(
}
}
// --- Merges ---
#[derive(Deserialize)]
pub struct CreateMergeBody {
pub artist_ids: Vec<i64>,
}
pub async fn create_merge(State(state): State<S>, Json(body): Json<CreateMergeBody>) -> impl IntoResponse {
if body.artist_ids.len() < 2 {
return error_response(StatusCode::BAD_REQUEST, "need at least 2 artists to merge");
}
match db::insert_artist_merge(&state.pool, &body.artist_ids).await {
Ok(id) => (StatusCode::OK, Json(serde_json::json!({"id": id}))).into_response(),
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
}
}
pub async fn list_merges(State(state): State<S>) -> impl IntoResponse {
match db::list_artist_merges(&state.pool).await {
Ok(items) => (StatusCode::OK, Json(serde_json::to_value(items).unwrap())).into_response(),
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
}
}
pub async fn get_merge(State(state): State<S>, Path(id): Path<Uuid>) -> impl IntoResponse {
let merge = match db::get_artist_merge(&state.pool, id).await {
Ok(Some(m)) => m,
Ok(None) => return error_response(StatusCode::NOT_FOUND, "not found"),
Err(e) => return error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
};
let source_ids: Vec<i64> = serde_json::from_str(&merge.source_artist_ids).unwrap_or_default();
let artists = match db::get_artists_full_data(&state.pool, &source_ids).await {
Ok(a) => a,
Err(e) => return error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
};
let proposal: Option<serde_json::Value> = merge.proposal.as_deref()
.and_then(|p| serde_json::from_str(p).ok());
(StatusCode::OK, Json(serde_json::json!({
"merge": {
"id": merge.id,
"status": merge.status,
"source_artist_ids": source_ids,
"llm_notes": merge.llm_notes,
"error_message": merge.error_message,
"created_at": merge.created_at,
"updated_at": merge.updated_at,
},
"artists": artists,
"proposal": proposal,
}))).into_response()
}
#[derive(Deserialize)]
pub struct UpdateMergeBody {
pub proposal: serde_json::Value,
}
pub async fn update_merge(
State(state): State<S>,
Path(id): Path<Uuid>,
Json(body): Json<UpdateMergeBody>,
) -> impl IntoResponse {
let notes = body.proposal.get("notes")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_owned();
let proposal_json = match serde_json::to_string(&body.proposal) {
Ok(s) => s,
Err(e) => return error_response(StatusCode::BAD_REQUEST, &e.to_string()),
};
match db::update_merge_proposal(&state.pool, id, &proposal_json, &notes).await {
Ok(()) => StatusCode::NO_CONTENT.into_response(),
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
}
}
pub async fn approve_merge(State(state): State<S>, Path(id): Path<Uuid>) -> impl IntoResponse {
match crate::merge::execute_merge(&state, id).await {
Ok(()) => StatusCode::NO_CONTENT.into_response(),
Err(e) => {
let msg = e.to_string();
let _ = db::update_merge_status(&state.pool, id, "error", Some(&msg)).await;
error_response(StatusCode::INTERNAL_SERVER_ERROR, &msg)
}
}
}
pub async fn reject_merge(State(state): State<S>, Path(id): Path<Uuid>) -> impl IntoResponse {
match db::update_merge_status(&state.pool, id, "rejected", None).await {
Ok(()) => StatusCode::NO_CONTENT.into_response(),
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
}
}
pub async fn retry_merge(State(state): State<S>, Path(id): Path<Uuid>) -> impl IntoResponse {
match db::update_merge_status(&state.pool, id, "pending", None).await {
Ok(()) => StatusCode::NO_CONTENT.into_response(),
Err(e) => error_response(StatusCode::INTERNAL_SERVER_ERROR, &e.to_string()),
}
}
// --- Helpers ---
fn error_response(status: StatusCode, message: &str) -> axum::response::Response {

View File

@@ -12,6 +12,7 @@ pub struct AppState {
pub pool: PgPool,
pub config: Arc<Args>,
pub system_prompt: Arc<String>,
pub merge_prompt: Arc<String>,
}
pub fn build_router(state: Arc<AppState>) -> Router {
@@ -31,7 +32,12 @@ pub fn build_router(state: Arc<AppState>) -> Router {
.route("/artists", get(api::list_artists))
.route("/artists/:id", put(api::update_artist))
.route("/artists/:id/albums", get(api::list_albums))
.route("/albums/:id", put(api::update_album));
.route("/albums/:id", put(api::update_album))
.route("/merges", get(api::list_merges).post(api::create_merge))
.route("/merges/:id", get(api::get_merge).put(api::update_merge))
.route("/merges/:id/approve", post(api::approve_merge))
.route("/merges/:id/reject", post(api::reject_merge))
.route("/merges/:id/retry", post(api::retry_merge));
Router::new()
.route("/", get(admin_html))