Auto-merge: when ingest pipeline detects "source file missing", now checks if the track already exists in the library by file_hash. If so, marks the pending entry as 'merged' instead of 'error' — avoiding stale error entries for files that were already successfully ingested in a previous run. Prompts: replaced Pink Floyd/The Wall/Have a Cigar examples in both normalize.txt and merge.txt with Deep Purple examples. The LLM was using these famous artist/album/track names as fallback output when raw metadata was empty or ambiguous, causing hallucinated metadata like "artist: Pink Floyd, title: Have a Cigar" for completely unrelated tracks. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
756 lines
29 KiB
Rust
756 lines
29 KiB
Rust
pub mod metadata;
|
|
pub mod normalize;
|
|
pub mod path_hints;
|
|
pub mod mover;
|
|
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use crate::db;
|
|
use crate::web::AppState;
|
|
|
|
pub async fn run(state: Arc<AppState>) {
|
|
let interval = Duration::from_secs(state.config.poll_interval_secs);
|
|
tracing::info!("Ingest loop started, polling every {}s: {:?}", state.config.poll_interval_secs, state.config.inbox_dir);
|
|
|
|
loop {
|
|
match scan_inbox(&state).await {
|
|
Ok(0) => {}
|
|
Ok(count) => tracing::info!(count, "processed new files"),
|
|
Err(e) => tracing::error!(?e, "inbox scan failed"),
|
|
}
|
|
// Re-process pending tracks (e.g. retried from admin UI)
|
|
match reprocess_pending(&state).await {
|
|
Ok(0) => {}
|
|
Ok(count) => tracing::info!(count, "re-processed pending tracks"),
|
|
Err(e) => tracing::error!(?e, "pending re-processing failed"),
|
|
}
|
|
// Process pending merge proposals
|
|
match db::get_pending_merges_for_processing(&state.pool).await {
|
|
Ok(merge_ids) => {
|
|
for merge_id in merge_ids {
|
|
if let Err(e) = crate::merge::propose_merge(&state, merge_id).await {
|
|
tracing::error!(id = %merge_id, ?e, "Merge proposal failed");
|
|
let _ = db::update_merge_status(&state.pool, merge_id, "error", Some(&e.to_string())).await;
|
|
}
|
|
}
|
|
}
|
|
Err(e) => tracing::error!(?e, "Failed to load pending merges"),
|
|
}
|
|
tokio::time::sleep(interval).await;
|
|
}
|
|
}
|
|
|
|
async fn scan_inbox(state: &Arc<AppState>) -> anyhow::Result<usize> {
|
|
let mut count = 0;
|
|
let mut audio_files = Vec::new();
|
|
let mut image_files = Vec::new();
|
|
collect_files(&state.config.inbox_dir, &mut audio_files, &mut image_files).await?;
|
|
|
|
if !audio_files.is_empty() || !image_files.is_empty() {
|
|
tracing::info!("Scan found {} audio file(s) and {} image(s) in inbox", audio_files.len(), image_files.len());
|
|
}
|
|
|
|
for file_path in &audio_files {
|
|
match process_file(state, file_path).await {
|
|
Ok(true) => count += 1,
|
|
Ok(false) => tracing::debug!(path = ?file_path, "skipped (already known)"),
|
|
Err(e) => tracing::warn!(?e, path = ?file_path, "failed to process file"),
|
|
}
|
|
}
|
|
|
|
// Process cover images after audio (so albums exist in DB)
|
|
for image_path in &image_files {
|
|
match process_cover_image(state, image_path).await {
|
|
Ok(true) => {
|
|
tracing::info!(path = ?image_path, "Cover image processed");
|
|
count += 1;
|
|
}
|
|
Ok(false) => tracing::debug!(path = ?image_path, "cover image skipped"),
|
|
Err(e) => tracing::warn!(?e, path = ?image_path, "failed to process cover image"),
|
|
}
|
|
}
|
|
|
|
// Clean up empty directories in inbox
|
|
if count > 0 {
|
|
cleanup_empty_dirs(&state.config.inbox_dir).await;
|
|
}
|
|
|
|
Ok(count)
|
|
}
|
|
|
|
/// Re-process pending tracks from DB (e.g. tracks retried via admin UI).
|
|
/// These already have raw metadata and path hints stored — just need RAG + LLM.
|
|
async fn reprocess_pending(state: &Arc<AppState>) -> anyhow::Result<usize> {
|
|
let pending = db::list_pending_for_processing(&state.pool, 10).await?;
|
|
if pending.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
let mut count = 0;
|
|
for pt in &pending {
|
|
tracing::info!(id = %pt.id, title = pt.raw_title.as_deref().unwrap_or("?"), "Re-processing pending track");
|
|
|
|
db::update_pending_status(&state.pool, pt.id, "processing", None).await?;
|
|
|
|
// Build raw metadata and hints from stored DB fields
|
|
let raw_meta = metadata::RawMetadata {
|
|
title: pt.raw_title.clone(),
|
|
artist: pt.raw_artist.clone(),
|
|
album: pt.raw_album.clone(),
|
|
track_number: pt.raw_track_number.map(|n| n as u32),
|
|
year: pt.raw_year.map(|n| n as u32),
|
|
genre: pt.raw_genre.clone(),
|
|
duration_secs: pt.duration_secs,
|
|
};
|
|
|
|
let hints = db::PathHints {
|
|
title: pt.path_title.clone(),
|
|
artist: pt.path_artist.clone(),
|
|
album: pt.path_album.clone(),
|
|
year: pt.path_year,
|
|
track_number: pt.path_track_number,
|
|
};
|
|
|
|
// RAG lookup
|
|
let artist_query = raw_meta.artist.as_deref()
|
|
.or(hints.artist.as_deref())
|
|
.unwrap_or("");
|
|
let album_query = raw_meta.album.as_deref()
|
|
.or(hints.album.as_deref())
|
|
.unwrap_or("");
|
|
|
|
let similar_artists = if !artist_query.is_empty() {
|
|
db::find_similar_artists(&state.pool, artist_query, 5).await.unwrap_or_default()
|
|
} else {
|
|
Vec::new()
|
|
};
|
|
|
|
let similar_albums = if !album_query.is_empty() {
|
|
db::find_similar_albums(&state.pool, album_query, 5).await.unwrap_or_default()
|
|
} else {
|
|
Vec::new()
|
|
};
|
|
|
|
// LLM normalization (no folder context available for reprocessing from DB)
|
|
match normalize::normalize(state, &raw_meta, &hints, &similar_artists, &similar_albums, None).await {
|
|
Ok(normalized) => {
|
|
let confidence = normalized.confidence.unwrap_or(0.0);
|
|
let status = if confidence >= state.config.confidence_threshold {
|
|
"approved"
|
|
} else {
|
|
"review"
|
|
};
|
|
|
|
tracing::info!(
|
|
id = %pt.id,
|
|
norm_artist = normalized.artist.as_deref().unwrap_or("-"),
|
|
norm_title = normalized.title.as_deref().unwrap_or("-"),
|
|
confidence,
|
|
status,
|
|
"Re-processing complete"
|
|
);
|
|
|
|
db::update_pending_normalized(&state.pool, pt.id, status, &normalized, None).await?;
|
|
|
|
if status == "approved" {
|
|
let artist = normalized.artist.as_deref().unwrap_or("Unknown Artist");
|
|
let album = normalized.album.as_deref().unwrap_or("Unknown Album");
|
|
let title = normalized.title.as_deref().unwrap_or("Unknown Title");
|
|
let source = std::path::Path::new(&pt.inbox_path);
|
|
let ext = source.extension().and_then(|e| e.to_str()).unwrap_or("flac");
|
|
let track_num = normalized.track_number.unwrap_or(0);
|
|
|
|
let dest_filename = if track_num > 0 {
|
|
format!("{:02} - {}.{}", track_num, sanitize_filename(title), ext)
|
|
} else {
|
|
format!("{}.{}", sanitize_filename(title), ext)
|
|
};
|
|
|
|
// Check if already moved
|
|
let dest = state.config.storage_dir
|
|
.join(sanitize_filename(artist))
|
|
.join(sanitize_filename(album))
|
|
.join(&dest_filename);
|
|
|
|
let (storage_path, was_merged) = if dest.exists() && !source.exists() {
|
|
(dest.to_string_lossy().to_string(), false)
|
|
} else if source.exists() {
|
|
match mover::move_to_storage(
|
|
&state.config.storage_dir, artist, album, &dest_filename, source,
|
|
).await {
|
|
Ok(mover::MoveOutcome::Moved(p)) => (p.to_string_lossy().to_string(), false),
|
|
Ok(mover::MoveOutcome::Merged(p)) => (p.to_string_lossy().to_string(), true),
|
|
Err(e) => {
|
|
tracing::error!(id = %pt.id, ?e, "Failed to move file");
|
|
db::update_pending_status(&state.pool, pt.id, "error", Some(&e.to_string())).await?;
|
|
continue;
|
|
}
|
|
}
|
|
} else {
|
|
// Source file is gone — check if already in library by hash
|
|
let in_library: (bool,) = sqlx::query_as(
|
|
"SELECT EXISTS(SELECT 1 FROM tracks WHERE file_hash = $1)"
|
|
)
|
|
.bind(&pt.file_hash)
|
|
.fetch_one(&state.pool).await.unwrap_or((false,));
|
|
|
|
if in_library.0 {
|
|
tracing::info!(id = %pt.id, "Source missing but track already in library — merging");
|
|
db::update_pending_status(&state.pool, pt.id, "merged", None).await?;
|
|
} else {
|
|
tracing::error!(id = %pt.id, "Source file missing: {:?}", source);
|
|
db::update_pending_status(&state.pool, pt.id, "error", Some("Source file missing")).await?;
|
|
}
|
|
continue;
|
|
};
|
|
|
|
match db::approve_and_finalize(&state.pool, pt.id, &storage_path).await {
|
|
Ok(track_id) => {
|
|
if was_merged {
|
|
let _ = db::update_pending_status(&state.pool, pt.id, "merged", None).await;
|
|
}
|
|
tracing::info!(id = %pt.id, track_id, "Track finalized");
|
|
}
|
|
Err(e) => tracing::error!(id = %pt.id, ?e, "Failed to finalize"),
|
|
}
|
|
}
|
|
|
|
count += 1;
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(id = %pt.id, ?e, "LLM normalization failed");
|
|
db::update_pending_status(&state.pool, pt.id, "error", Some(&e.to_string())).await?;
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(count)
|
|
}
|
|
|
|
/// Recursively remove empty directories inside the inbox.
|
|
/// Does not remove the inbox root itself.
|
|
async fn cleanup_empty_dirs(dir: &std::path::Path) -> bool {
|
|
let mut entries = match tokio::fs::read_dir(dir).await {
|
|
Ok(e) => e,
|
|
Err(_) => return false,
|
|
};
|
|
|
|
let mut is_empty = true;
|
|
while let Ok(Some(entry)) = entries.next_entry().await {
|
|
let ft = match entry.file_type().await {
|
|
Ok(ft) => ft,
|
|
Err(_) => { is_empty = false; continue; }
|
|
};
|
|
if ft.is_dir() {
|
|
let child_empty = Box::pin(cleanup_empty_dirs(&entry.path())).await;
|
|
if child_empty {
|
|
if let Err(e) = tokio::fs::remove_dir(&entry.path()).await {
|
|
tracing::warn!(?e, path = ?entry.path(), "Failed to remove empty directory");
|
|
} else {
|
|
tracing::info!(path = ?entry.path(), "Removed empty inbox directory");
|
|
}
|
|
} else {
|
|
is_empty = false;
|
|
}
|
|
} else {
|
|
is_empty = false;
|
|
}
|
|
}
|
|
is_empty
|
|
}
|
|
|
|
/// Recursively collect all audio files and image files under a directory.
|
|
async fn collect_files(dir: &std::path::Path, audio: &mut Vec<std::path::PathBuf>, images: &mut Vec<std::path::PathBuf>) -> anyhow::Result<()> {
|
|
let mut entries = tokio::fs::read_dir(dir).await?;
|
|
while let Some(entry) = entries.next_entry().await? {
|
|
let name = entry.file_name().to_string_lossy().into_owned();
|
|
if name.starts_with('.') {
|
|
continue;
|
|
}
|
|
let ft = entry.file_type().await?;
|
|
if ft.is_dir() {
|
|
Box::pin(collect_files(&entry.path(), audio, images)).await?;
|
|
} else if ft.is_file() {
|
|
if is_audio_file(&name) {
|
|
audio.push(entry.path());
|
|
} else if is_cover_image(&name) {
|
|
images.push(entry.path());
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Return `true` if `name` ends in one of the supported audio extensions.
///
/// The comparison is case-insensitive and uses `Path::extension`. A name
/// with no dot (e.g. a file literally called `flac`) is therefore *not*
/// treated as audio. Previously `rsplit('.')` returned the whole name in that
/// case.
fn is_audio_file(name: &str) -> bool {
    const AUDIO_EXTENSIONS: [&str; 13] = [
        "mp3", "flac", "ogg", "opus", "aac", "m4a", "wav", "ape", "wv", "wma", "tta", "aiff", "aif",
    ];

    std::path::Path::new(name)
        .extension()
        .and_then(|e| e.to_str())
        .map(|e| e.to_lowercase())
        .is_some_and(|e| AUDIO_EXTENSIONS.contains(&e.as_str()))
}
|
|
|
|
/// Decide whether `name` looks like album artwork.
///
/// Two conditions must hold, both checked case-insensitively:
/// - the text after the last `.` is a supported image format;
/// - the file stem is one of the conventional artwork names (`cover`,
///   `front`, `folder`, `back`, `booklet`, …).
fn is_cover_image(name: &str) -> bool {
    const IMAGE_EXTENSIONS: [&str; 6] = ["jpg", "jpeg", "png", "webp", "bmp", "gif"];
    const COVER_STEMS: [&str; 14] = [
        "cover", "front", "folder", "back", "booklet", "inlay", "disc", "cd",
        "album", "artwork", "art", "scan", "thumb", "thumbnail",
    ];

    let extension = name.rsplit('.').next().unwrap_or("").to_lowercase();
    if !IMAGE_EXTENSIONS.contains(&extension.as_str()) {
        return false;
    }

    let stem = match std::path::Path::new(name).file_stem().and_then(|s| s.to_str()) {
        Some(s) => s.to_lowercase(),
        None => String::new(),
    };
    COVER_STEMS.contains(&stem.as_str())
}
|
|
|
|
/// Map an artwork filename to the image type stored for the album.
///
/// Only the lower-cased file stem matters:
/// - `back` → `"back"`
/// - `booklet` / `inlay` / `scan` → `"booklet"`
/// - `disc` / `cd` → `"disc"`
/// - anything else → `"cover"`
fn classify_image(name: &str) -> &'static str {
    let stem = match std::path::Path::new(name).file_stem().and_then(|s| s.to_str()) {
        Some(s) => s.to_lowercase(),
        None => String::new(),
    };

    if stem == "back" {
        "back"
    } else if matches!(stem.as_str(), "booklet" | "inlay" | "scan") {
        "booklet"
    } else if matches!(stem.as_str(), "disc" | "cd") {
        "disc"
    } else {
        "cover"
    }
}
|
|
|
|
/// MIME type for an image filename, chosen from the text after its last `.`.
///
/// The lookup is case-insensitive. Unknown extensions map to
/// `"application/octet-stream"`.
fn mime_for_image(name: &str) -> &'static str {
    const MIME_TABLE: [(&str, &str); 6] = [
        ("jpg", "image/jpeg"),
        ("jpeg", "image/jpeg"),
        ("png", "image/png"),
        ("webp", "image/webp"),
        ("gif", "image/gif"),
        ("bmp", "image/bmp"),
    ];

    let suffix = name.rsplit('.').next().unwrap_or("").to_lowercase();
    MIME_TABLE
        .iter()
        .find(|&&(candidate, _)| candidate == suffix)
        .map(|&(_, mime)| mime)
        .unwrap_or("application/octet-stream")
}
|
|
|
|
async fn process_file(state: &Arc<AppState>, file_path: &std::path::Path) -> anyhow::Result<bool> {
|
|
let filename = file_path.file_name().and_then(|n| n.to_str()).unwrap_or("?");
|
|
tracing::info!(file = filename, "Processing new file: {:?}", file_path);
|
|
|
|
// Compute file hash for dedup
|
|
tracing::info!(file = filename, "Computing file hash...");
|
|
let path_clone = file_path.to_path_buf();
|
|
let (hash, file_size) = tokio::task::spawn_blocking(move || -> anyhow::Result<(String, i64)> {
|
|
let data = std::fs::read(&path_clone)?;
|
|
let hash = blake3::hash(&data).to_hex().to_string();
|
|
let size = data.len() as i64;
|
|
Ok((hash, size))
|
|
})
|
|
.await??;
|
|
tracing::info!(file = filename, hash = &hash[..16], size = file_size, "File hashed");
|
|
|
|
// Skip if already known
|
|
if db::file_hash_exists(&state.pool, &hash).await? {
|
|
tracing::info!(file = filename, "Skipping: file hash already exists in database");
|
|
return Ok(false);
|
|
}
|
|
|
|
// Extract raw metadata
|
|
tracing::info!(file = filename, "Extracting metadata with Symphonia...");
|
|
let path_for_meta = file_path.to_path_buf();
|
|
let raw_meta = tokio::task::spawn_blocking(move || metadata::extract(&path_for_meta)).await??;
|
|
tracing::info!(
|
|
file = filename,
|
|
artist = raw_meta.artist.as_deref().unwrap_or("-"),
|
|
title = raw_meta.title.as_deref().unwrap_or("-"),
|
|
album = raw_meta.album.as_deref().unwrap_or("-"),
|
|
"Raw metadata extracted"
|
|
);
|
|
|
|
// Parse path hints relative to inbox dir
|
|
let relative = file_path.strip_prefix(&state.config.inbox_dir).unwrap_or(file_path);
|
|
let hints = path_hints::parse(relative);
|
|
if hints.artist.is_some() || hints.album.is_some() || hints.year.is_some() {
|
|
tracing::info!(
|
|
file = filename,
|
|
path_artist = hints.artist.as_deref().unwrap_or("-"),
|
|
path_album = hints.album.as_deref().unwrap_or("-"),
|
|
path_year = ?hints.year,
|
|
"Path hints parsed"
|
|
);
|
|
}
|
|
|
|
let inbox_path_str = file_path.to_string_lossy().to_string();
|
|
|
|
// Insert pending record
|
|
tracing::info!(file = filename, "Inserting pending track record...");
|
|
let pending_id = db::insert_pending(
|
|
&state.pool,
|
|
&inbox_path_str,
|
|
&hash,
|
|
file_size,
|
|
&db::RawFields {
|
|
title: raw_meta.title.clone(),
|
|
artist: raw_meta.artist.clone(),
|
|
album: raw_meta.album.clone(),
|
|
year: raw_meta.year.map(|y| y as i32),
|
|
track_number: raw_meta.track_number.map(|t| t as i32),
|
|
genre: raw_meta.genre.clone(),
|
|
},
|
|
&db::PathHints {
|
|
title: hints.title.clone(),
|
|
artist: hints.artist.clone(),
|
|
album: hints.album.clone(),
|
|
year: hints.year,
|
|
track_number: hints.track_number,
|
|
},
|
|
raw_meta.duration_secs,
|
|
)
|
|
.await?;
|
|
|
|
db::update_pending_status(&state.pool, pending_id, "processing", None).await?;
|
|
|
|
// RAG: find similar entries in DB
|
|
let artist_query = raw_meta.artist.as_deref()
|
|
.or(hints.artist.as_deref())
|
|
.unwrap_or("");
|
|
let album_query = raw_meta.album.as_deref()
|
|
.or(hints.album.as_deref())
|
|
.unwrap_or("");
|
|
|
|
tracing::info!(file = filename, "Searching database for similar artists/albums...");
|
|
let similar_artists = if !artist_query.is_empty() {
|
|
db::find_similar_artists(&state.pool, artist_query, 5).await.unwrap_or_default()
|
|
} else {
|
|
Vec::new()
|
|
};
|
|
|
|
let similar_albums = if !album_query.is_empty() {
|
|
db::find_similar_albums(&state.pool, album_query, 5).await.unwrap_or_default()
|
|
} else {
|
|
Vec::new()
|
|
};
|
|
|
|
if !similar_artists.is_empty() {
|
|
let names: Vec<&str> = similar_artists.iter().map(|a| a.name.as_str()).collect();
|
|
tracing::info!(file = filename, matches = ?names, "Found similar artists in DB");
|
|
}
|
|
if !similar_albums.is_empty() {
|
|
let names: Vec<&str> = similar_albums.iter().map(|a| a.name.as_str()).collect();
|
|
tracing::info!(file = filename, matches = ?names, "Found similar albums in DB");
|
|
}
|
|
|
|
// Build folder context for the LLM
|
|
let audio_extensions = ["flac", "mp3", "ogg", "wav", "aac", "m4a", "opus", "wma", "ape", "alac"];
|
|
let folder_ctx = {
|
|
let folder = file_path.parent().unwrap_or(file_path);
|
|
let mut folder_files: Vec<String> = std::fs::read_dir(folder)
|
|
.ok()
|
|
.map(|rd| {
|
|
rd.filter_map(|e| e.ok())
|
|
.filter_map(|e| {
|
|
let name = e.file_name().to_string_lossy().into_owned();
|
|
let ext = name.rsplit('.').next().unwrap_or("").to_lowercase();
|
|
if audio_extensions.contains(&ext.as_str()) { Some(name) } else { None }
|
|
})
|
|
.collect()
|
|
})
|
|
.unwrap_or_default();
|
|
folder_files.sort();
|
|
let track_count = folder_files.len();
|
|
let folder_path = folder
|
|
.strip_prefix(&state.config.inbox_dir)
|
|
.unwrap_or(folder)
|
|
.to_string_lossy()
|
|
.into_owned();
|
|
normalize::FolderContext { folder_path, folder_files, track_count }
|
|
};
|
|
|
|
// Call LLM for normalization
|
|
tracing::info!(file = filename, model = %state.config.ollama_model, "Sending to LLM for normalization...");
|
|
match normalize::normalize(state, &raw_meta, &hints, &similar_artists, &similar_albums, Some(&folder_ctx)).await {
|
|
Ok(normalized) => {
|
|
let confidence = normalized.confidence.unwrap_or(0.0);
|
|
let status = if confidence >= state.config.confidence_threshold {
|
|
"approved"
|
|
} else {
|
|
"review"
|
|
};
|
|
|
|
tracing::info!(
|
|
file = filename,
|
|
norm_artist = normalized.artist.as_deref().unwrap_or("-"),
|
|
norm_title = normalized.title.as_deref().unwrap_or("-"),
|
|
norm_album = normalized.album.as_deref().unwrap_or("-"),
|
|
confidence,
|
|
status,
|
|
notes = normalized.notes.as_deref().unwrap_or("-"),
|
|
"LLM normalization complete"
|
|
);
|
|
if !normalized.featured_artists.is_empty() {
|
|
tracing::info!(
|
|
file = filename,
|
|
featured = ?normalized.featured_artists,
|
|
"Featured artists detected"
|
|
);
|
|
}
|
|
|
|
db::update_pending_normalized(&state.pool, pending_id, status, &normalized, None).await?;
|
|
|
|
// Auto-approve: move file to storage
|
|
if status == "approved" {
|
|
let artist = normalized.artist.as_deref().unwrap_or("Unknown Artist");
|
|
let album = normalized.album.as_deref().unwrap_or("Unknown Album");
|
|
let title = normalized.title.as_deref().unwrap_or("Unknown Title");
|
|
let ext = file_path.extension().and_then(|e| e.to_str()).unwrap_or("flac");
|
|
let track_num = normalized.track_number.unwrap_or(0);
|
|
|
|
let dest_filename = if track_num > 0 {
|
|
format!("{:02} - {}.{}", track_num, sanitize_filename(title), ext)
|
|
} else {
|
|
format!("{}.{}", sanitize_filename(title), ext)
|
|
};
|
|
|
|
tracing::info!(
|
|
file = filename,
|
|
dest_artist = artist,
|
|
dest_album = album,
|
|
dest_filename = %dest_filename,
|
|
"Auto-approved, moving to storage..."
|
|
);
|
|
|
|
match mover::move_to_storage(
|
|
&state.config.storage_dir,
|
|
artist,
|
|
album,
|
|
&dest_filename,
|
|
file_path,
|
|
)
|
|
.await
|
|
{
|
|
Ok(outcome) => {
|
|
let (storage_path, was_merged) = match outcome {
|
|
mover::MoveOutcome::Moved(p) => (p, false),
|
|
mover::MoveOutcome::Merged(p) => (p, true),
|
|
};
|
|
let rel_path = storage_path.to_string_lossy().to_string();
|
|
match db::approve_and_finalize(&state.pool, pending_id, &rel_path).await {
|
|
Ok(track_id) => {
|
|
if was_merged {
|
|
let _ = db::update_pending_status(&state.pool, pending_id, "merged", None).await;
|
|
}
|
|
tracing::info!(file = filename, track_id, storage = %rel_path, "Track finalized in database");
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(file = filename, ?e, "Failed to finalize track in DB after move");
|
|
}
|
|
}
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(file = filename, ?e, "Failed to move file to storage");
|
|
db::update_pending_status(&state.pool, pending_id, "error", Some(&e.to_string())).await?;
|
|
}
|
|
}
|
|
} else {
|
|
tracing::info!(file = filename, confidence, "Sent to review queue (below threshold {})", state.config.confidence_threshold);
|
|
}
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(file = filename, ?e, "LLM normalization failed");
|
|
db::update_pending_status(&state.pool, pending_id, "error", Some(&e.to_string())).await?;
|
|
}
|
|
}
|
|
|
|
Ok(true)
|
|
}
|
|
|
|
/// Process a cover image found in the inbox.
|
|
/// Uses path hints (Artist/Album/) to find the matching album in the DB,
|
|
/// then copies the image to the album's storage folder.
|
|
async fn process_cover_image(state: &Arc<AppState>, image_path: &std::path::Path) -> anyhow::Result<bool> {
|
|
let filename = image_path.file_name().and_then(|n| n.to_str()).unwrap_or("?");
|
|
|
|
// Hash for dedup
|
|
let path_clone = image_path.to_path_buf();
|
|
let (hash, file_size) = tokio::task::spawn_blocking(move || -> anyhow::Result<(String, i64)> {
|
|
let data = std::fs::read(&path_clone)?;
|
|
let hash = blake3::hash(&data).to_hex().to_string();
|
|
let size = data.len() as i64;
|
|
Ok((hash, size))
|
|
})
|
|
.await??;
|
|
|
|
if db::image_hash_exists(&state.pool, &hash).await? {
|
|
return Ok(false);
|
|
}
|
|
|
|
// Derive artist/album from path hints
|
|
let relative = image_path.strip_prefix(&state.config.inbox_dir).unwrap_or(image_path);
|
|
let components: Vec<&str> = relative
|
|
.components()
|
|
.filter_map(|c| c.as_os_str().to_str())
|
|
.collect();
|
|
|
|
tracing::info!(file = filename, path = ?relative, components = components.len(), "Processing cover image");
|
|
|
|
// Supported structures:
|
|
// Artist/Album/image.jpg (3+ components)
|
|
// Album/image.jpg (2 components — album dir + image)
|
|
if components.len() < 2 {
|
|
tracing::info!(file = filename, "Cover image not inside an album folder, skipping");
|
|
return Ok(false);
|
|
}
|
|
|
|
// The directory directly containing the image is always the album hint
|
|
let album_raw = components[components.len() - 2];
|
|
let path_artist = if components.len() >= 3 {
|
|
Some(components[components.len() - 3])
|
|
} else {
|
|
None
|
|
};
|
|
|
|
let (album_name, _) = path_hints::parse_album_year_public(album_raw);
|
|
|
|
tracing::info!(
|
|
file = filename,
|
|
path_artist = path_artist.unwrap_or("-"),
|
|
album_hint = %album_name,
|
|
"Looking up album in database..."
|
|
);
|
|
|
|
// Try to find album in DB — try with artist if available, then without
|
|
let album_id = if let Some(artist) = path_artist {
|
|
find_album_for_cover(&state.pool, artist, &album_name).await?
|
|
} else {
|
|
None
|
|
};
|
|
|
|
// If not found with artist, try fuzzy album name match across all artists
|
|
let album_id = match album_id {
|
|
Some(id) => Some(id),
|
|
None => {
|
|
let similar_albums = db::find_similar_albums(&state.pool, &album_name, 3).await.unwrap_or_default();
|
|
if let Some(best) = similar_albums.first() {
|
|
if best.similarity > 0.5 {
|
|
tracing::info!(file = filename, album = %best.name, similarity = best.similarity, "Matched album by fuzzy search");
|
|
Some(best.id)
|
|
} else {
|
|
None
|
|
}
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
};
|
|
|
|
let album_id = match album_id {
|
|
Some(id) => id,
|
|
None => {
|
|
tracing::info!(
|
|
file = filename,
|
|
artist = path_artist.unwrap_or("-"),
|
|
album = %album_name,
|
|
"No matching album found in DB, skipping cover"
|
|
);
|
|
return Ok(false);
|
|
}
|
|
};
|
|
|
|
// Determine image type and move to storage
|
|
let image_type = classify_image(filename);
|
|
let mime = mime_for_image(filename);
|
|
|
|
// Get album's storage path from any track in that album
|
|
let storage_dir_opt: Option<(String,)> = sqlx::query_as(
|
|
"SELECT storage_path FROM tracks WHERE album_id = $1 LIMIT 1"
|
|
)
|
|
.bind(album_id)
|
|
.fetch_optional(&state.pool)
|
|
.await?;
|
|
|
|
let album_storage_dir = match storage_dir_opt {
|
|
Some((track_path,)) => {
|
|
let p = std::path::Path::new(&track_path);
|
|
match p.parent() {
|
|
Some(dir) if dir.is_dir() => dir.to_path_buf(),
|
|
_ => {
|
|
tracing::warn!(file = filename, track_path = %track_path, "Track storage path has no valid parent dir");
|
|
return Ok(false);
|
|
}
|
|
}
|
|
}
|
|
None => {
|
|
tracing::info!(file = filename, album_id, "Album has no tracks in storage yet, skipping cover");
|
|
return Ok(false);
|
|
}
|
|
};
|
|
tracing::info!(file = filename, dest_dir = ?album_storage_dir, "Will copy cover to album storage dir");
|
|
|
|
let dest = album_storage_dir.join(filename);
|
|
if !dest.exists() {
|
|
// Move or copy image
|
|
match tokio::fs::rename(image_path, &dest).await {
|
|
Ok(()) => {}
|
|
Err(_) => {
|
|
tokio::fs::copy(image_path, &dest).await?;
|
|
tokio::fs::remove_file(image_path).await?;
|
|
}
|
|
}
|
|
}
|
|
|
|
let dest_str = dest.to_string_lossy().to_string();
|
|
db::insert_album_image(&state.pool, album_id, image_type, &dest_str, &hash, mime, file_size).await?;
|
|
|
|
tracing::info!(
|
|
file = filename,
|
|
album_id,
|
|
image_type,
|
|
dest = %dest_str,
|
|
"Album image saved"
|
|
);
|
|
|
|
Ok(true)
|
|
}
|
|
|
|
/// Find an album in DB matching the path-derived artist and album name.
|
|
/// Tries exact match, then fuzzy artist + exact album, then fuzzy artist + fuzzy album.
|
|
async fn find_album_for_cover(pool: &sqlx::PgPool, path_artist: &str, album_name: &str) -> anyhow::Result<Option<i64>> {
|
|
// Try exact match first
|
|
if let Some(id) = db::find_album_id(pool, path_artist, album_name).await? {
|
|
return Ok(Some(id));
|
|
}
|
|
|
|
// Try fuzzy artist, then exact or fuzzy album under that artist
|
|
let similar_artists = db::find_similar_artists(pool, path_artist, 5).await.unwrap_or_default();
|
|
for artist in &similar_artists {
|
|
if artist.similarity < 0.3 {
|
|
continue;
|
|
}
|
|
// Exact album under fuzzy artist
|
|
if let Some(id) = db::find_album_id(pool, &artist.name, album_name).await? {
|
|
return Ok(Some(id));
|
|
}
|
|
// Fuzzy album under this artist
|
|
let similar_albums = db::find_similar_albums(pool, album_name, 3).await.unwrap_or_default();
|
|
for album in &similar_albums {
|
|
if album.artist_id == artist.id && album.similarity > 0.4 {
|
|
return Ok(Some(album.id));
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(None)
|
|
}
|
|
|
|
/// Make `name` safe to use as a single path component.
///
/// The input is processed in three steps:
/// 1. Surrounding whitespace is trimmed.
/// 2. The characters `/ \ : * ? " < > |` and any control character (newline,
///    tab, NUL, …) are replaced with `_`.
/// 3. An empty result, `"."` or `".."` becomes `"_"`.
///
/// Step 3 matters because these names are joined onto the storage directory;
/// `".."` would otherwise escape it and an empty string would collapse a
/// directory level.
fn sanitize_filename(name: &str) -> String {
    // Trimming first is equivalent to trimming after the mapping for the
    // reserved characters (none of them is whitespace), and lets edge
    // tabs/newlines be trimmed rather than turned into '_'.
    let cleaned: String = name
        .trim()
        .chars()
        .map(|c| match c {
            '/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' => '_',
            c if c.is_control() => '_',
            c => c,
        })
        .collect();

    match cleaned.as_str() {
        "" | "." | ".." => "_".to_owned(),
        _ => cleaned,
    }
}
|