pub mod metadata; pub mod normalize; pub mod path_hints; pub mod mover; use std::sync::Arc; use std::time::Duration; use crate::db; use crate::web::AppState; pub async fn run(state: Arc) { let interval = Duration::from_secs(state.config.poll_interval_secs); tracing::info!("Ingest loop started, polling every {}s: {:?}", state.config.poll_interval_secs, state.config.inbox_dir); loop { match scan_inbox(&state).await { Ok(0) => {} Ok(count) => tracing::info!(count, "processed new files"), Err(e) => tracing::error!(?e, "inbox scan failed"), } // Re-process pending tracks (e.g. retried from admin UI) match reprocess_pending(&state).await { Ok(0) => {} Ok(count) => tracing::info!(count, "re-processed pending tracks"), Err(e) => tracing::error!(?e, "pending re-processing failed"), } // Process pending merge proposals match db::get_pending_merges_for_processing(&state.pool).await { Ok(merge_ids) => { for merge_id in merge_ids { if let Err(e) = crate::merge::propose_merge(&state, merge_id).await { tracing::error!(id = %merge_id, ?e, "Merge proposal failed"); let _ = db::update_merge_status(&state.pool, merge_id, "error", Some(&e.to_string())).await; } } } Err(e) => tracing::error!(?e, "Failed to load pending merges"), } tokio::time::sleep(interval).await; } } async fn scan_inbox(state: &Arc) -> anyhow::Result { let mut count = 0; let mut audio_files = Vec::new(); let mut image_files = Vec::new(); collect_files(&state.config.inbox_dir, &mut audio_files, &mut image_files).await?; if !audio_files.is_empty() || !image_files.is_empty() { tracing::info!("Scan found {} audio file(s) and {} image(s) in inbox", audio_files.len(), image_files.len()); } for file_path in &audio_files { match process_file(state, file_path).await { Ok(true) => count += 1, Ok(false) => tracing::debug!(path = ?file_path, "skipped (already known)"), Err(e) => tracing::warn!(?e, path = ?file_path, "failed to process file"), } } // Process cover images after audio (so albums exist in DB) for image_path in &image_files { match process_cover_image(state, image_path).await { Ok(true) => { tracing::info!(path = ?image_path, "Cover image processed"); count += 1; } Ok(false) => tracing::debug!(path = ?image_path, "cover image skipped"), Err(e) => tracing::warn!(?e, path = ?image_path, "failed to process cover image"), } } // Clean up empty directories in inbox if count > 0 { cleanup_empty_dirs(&state.config.inbox_dir).await; } Ok(count) } /// Re-process pending tracks from DB (e.g. tracks retried via admin UI). /// These already have raw metadata and path hints stored — just need RAG + LLM. async fn reprocess_pending(state: &Arc) -> anyhow::Result { let pending = db::list_pending_for_processing(&state.pool, 10).await?; if pending.is_empty() { return Ok(0); } let mut count = 0; for pt in &pending { tracing::info!(id = %pt.id, title = pt.raw_title.as_deref().unwrap_or("?"), "Re-processing pending track"); db::update_pending_status(&state.pool, pt.id, "processing", None).await?; // Build raw metadata and hints from stored DB fields let raw_meta = metadata::RawMetadata { title: pt.raw_title.clone(), artist: pt.raw_artist.clone(), album: pt.raw_album.clone(), track_number: pt.raw_track_number.map(|n| n as u32), year: pt.raw_year.map(|n| n as u32), genre: pt.raw_genre.clone(), duration_secs: pt.duration_secs, }; let hints = db::PathHints { title: pt.path_title.clone(), artist: pt.path_artist.clone(), album: pt.path_album.clone(), year: pt.path_year, track_number: pt.path_track_number, }; // RAG lookup let artist_query = raw_meta.artist.as_deref() .or(hints.artist.as_deref()) .unwrap_or(""); let album_query = raw_meta.album.as_deref() .or(hints.album.as_deref()) .unwrap_or(""); let similar_artists = if !artist_query.is_empty() { db::find_similar_artists(&state.pool, artist_query, 5).await.unwrap_or_default() } else { Vec::new() }; let similar_albums = if !album_query.is_empty() { db::find_similar_albums(&state.pool, album_query, 5).await.unwrap_or_default() } else { Vec::new() }; // LLM normalization (no folder context available for reprocessing from DB) match normalize::normalize(state, &raw_meta, &hints, &similar_artists, &similar_albums, None).await { Ok(normalized) => { let confidence = normalized.confidence.unwrap_or(0.0); let status = if confidence >= state.config.confidence_threshold { "approved" } else { "review" }; tracing::info!( id = %pt.id, norm_artist = normalized.artist.as_deref().unwrap_or("-"), norm_title = normalized.title.as_deref().unwrap_or("-"), confidence, status, "Re-processing complete" ); db::update_pending_normalized(&state.pool, pt.id, status, &normalized, None).await?; if status == "approved" { let artist = normalized.artist.as_deref().unwrap_or("Unknown Artist"); let album = normalized.album.as_deref().unwrap_or("Unknown Album"); let title = normalized.title.as_deref().unwrap_or("Unknown Title"); let source = std::path::Path::new(&pt.inbox_path); let ext = source.extension().and_then(|e| e.to_str()).unwrap_or("flac"); let track_num = normalized.track_number.unwrap_or(0); let dest_filename = if track_num > 0 { format!("{:02} - {}.{}", track_num, sanitize_filename(title), ext) } else { format!("{}.{}", sanitize_filename(title), ext) }; // Check if already moved let dest = state.config.storage_dir .join(sanitize_filename(artist)) .join(sanitize_filename(album)) .join(&dest_filename); let (storage_path, was_merged) = if dest.exists() && !source.exists() { (dest.to_string_lossy().to_string(), false) } else if source.exists() { match mover::move_to_storage( &state.config.storage_dir, artist, album, &dest_filename, source, ).await { Ok(mover::MoveOutcome::Moved(p)) => (p.to_string_lossy().to_string(), false), Ok(mover::MoveOutcome::Merged(p)) => (p.to_string_lossy().to_string(), true), Err(e) => { tracing::error!(id = %pt.id, ?e, "Failed to move file"); db::update_pending_status(&state.pool, pt.id, "error", Some(&e.to_string())).await?; continue; } } } else { tracing::error!(id = %pt.id, "Source file missing: {:?}", source); db::update_pending_status(&state.pool, pt.id, "error", Some("Source file missing")).await?; continue; }; match db::approve_and_finalize(&state.pool, pt.id, &storage_path).await { Ok(track_id) => { if was_merged { let _ = db::update_pending_status(&state.pool, pt.id, "merged", None).await; } tracing::info!(id = %pt.id, track_id, "Track finalized"); } Err(e) => tracing::error!(id = %pt.id, ?e, "Failed to finalize"), } } count += 1; } Err(e) => { tracing::error!(id = %pt.id, ?e, "LLM normalization failed"); db::update_pending_status(&state.pool, pt.id, "error", Some(&e.to_string())).await?; } } } Ok(count) } /// Recursively remove empty directories inside the inbox. /// Does not remove the inbox root itself. async fn cleanup_empty_dirs(dir: &std::path::Path) -> bool { let mut entries = match tokio::fs::read_dir(dir).await { Ok(e) => e, Err(_) => return false, }; let mut is_empty = true; while let Ok(Some(entry)) = entries.next_entry().await { let ft = match entry.file_type().await { Ok(ft) => ft, Err(_) => { is_empty = false; continue; } }; if ft.is_dir() { let child_empty = Box::pin(cleanup_empty_dirs(&entry.path())).await; if child_empty { if let Err(e) = tokio::fs::remove_dir(&entry.path()).await { tracing::warn!(?e, path = ?entry.path(), "Failed to remove empty directory"); } else { tracing::info!(path = ?entry.path(), "Removed empty inbox directory"); } } else { is_empty = false; } } else { is_empty = false; } } is_empty } /// Recursively collect all audio files and image files under a directory. async fn collect_files(dir: &std::path::Path, audio: &mut Vec, images: &mut Vec) -> anyhow::Result<()> { let mut entries = tokio::fs::read_dir(dir).await?; while let Some(entry) = entries.next_entry().await? { let name = entry.file_name().to_string_lossy().into_owned(); if name.starts_with('.') { continue; } let ft = entry.file_type().await?; if ft.is_dir() { Box::pin(collect_files(&entry.path(), audio, images)).await?; } else if ft.is_file() { if is_audio_file(&name) { audio.push(entry.path()); } else if is_cover_image(&name) { images.push(entry.path()); } } } Ok(()) } fn is_audio_file(name: &str) -> bool { let ext = name.rsplit('.').next().unwrap_or("").to_lowercase(); matches!( ext.as_str(), "mp3" | "flac" | "ogg" | "opus" | "aac" | "m4a" | "wav" | "ape" | "wv" | "wma" | "tta" | "aiff" | "aif" ) } fn is_cover_image(name: &str) -> bool { let ext = name.rsplit('.').next().unwrap_or("").to_lowercase(); if !matches!(ext.as_str(), "jpg" | "jpeg" | "png" | "webp" | "bmp" | "gif") { return false; } let stem = std::path::Path::new(name) .file_stem() .and_then(|s| s.to_str()) .unwrap_or("") .to_lowercase(); matches!( stem.as_str(), "cover" | "front" | "folder" | "back" | "booklet" | "inlay" | "disc" | "cd" | "album" | "artwork" | "art" | "scan" | "thumb" | "thumbnail" ) } fn classify_image(name: &str) -> &'static str { let stem = std::path::Path::new(name) .file_stem() .and_then(|s| s.to_str()) .unwrap_or("") .to_lowercase(); match stem.as_str() { "back" => "back", "booklet" | "inlay" | "scan" => "booklet", "disc" | "cd" => "disc", _ => "cover", } } fn mime_for_image(name: &str) -> &'static str { let ext = name.rsplit('.').next().unwrap_or("").to_lowercase(); match ext.as_str() { "jpg" | "jpeg" => "image/jpeg", "png" => "image/png", "webp" => "image/webp", "gif" => "image/gif", "bmp" => "image/bmp", _ => "application/octet-stream", } } async fn process_file(state: &Arc, file_path: &std::path::Path) -> anyhow::Result { let filename = file_path.file_name().and_then(|n| n.to_str()).unwrap_or("?"); tracing::info!(file = filename, "Processing new file: {:?}", file_path); // Compute file hash for dedup tracing::info!(file = filename, "Computing file hash..."); let path_clone = file_path.to_path_buf(); let (hash, file_size) = tokio::task::spawn_blocking(move || -> anyhow::Result<(String, i64)> { let data = std::fs::read(&path_clone)?; let hash = blake3::hash(&data).to_hex().to_string(); let size = data.len() as i64; Ok((hash, size)) }) .await??; tracing::info!(file = filename, hash = &hash[..16], size = file_size, "File hashed"); // Skip if already known if db::file_hash_exists(&state.pool, &hash).await? { tracing::info!(file = filename, "Skipping: file hash already exists in database"); return Ok(false); } // Extract raw metadata tracing::info!(file = filename, "Extracting metadata with Symphonia..."); let path_for_meta = file_path.to_path_buf(); let raw_meta = tokio::task::spawn_blocking(move || metadata::extract(&path_for_meta)).await??; tracing::info!( file = filename, artist = raw_meta.artist.as_deref().unwrap_or("-"), title = raw_meta.title.as_deref().unwrap_or("-"), album = raw_meta.album.as_deref().unwrap_or("-"), "Raw metadata extracted" ); // Parse path hints relative to inbox dir let relative = file_path.strip_prefix(&state.config.inbox_dir).unwrap_or(file_path); let hints = path_hints::parse(relative); if hints.artist.is_some() || hints.album.is_some() || hints.year.is_some() { tracing::info!( file = filename, path_artist = hints.artist.as_deref().unwrap_or("-"), path_album = hints.album.as_deref().unwrap_or("-"), path_year = ?hints.year, "Path hints parsed" ); } let inbox_path_str = file_path.to_string_lossy().to_string(); // Insert pending record tracing::info!(file = filename, "Inserting pending track record..."); let pending_id = db::insert_pending( &state.pool, &inbox_path_str, &hash, file_size, &db::RawFields { title: raw_meta.title.clone(), artist: raw_meta.artist.clone(), album: raw_meta.album.clone(), year: raw_meta.year.map(|y| y as i32), track_number: raw_meta.track_number.map(|t| t as i32), genre: raw_meta.genre.clone(), }, &db::PathHints { title: hints.title.clone(), artist: hints.artist.clone(), album: hints.album.clone(), year: hints.year, track_number: hints.track_number, }, raw_meta.duration_secs, ) .await?; db::update_pending_status(&state.pool, pending_id, "processing", None).await?; // RAG: find similar entries in DB let artist_query = raw_meta.artist.as_deref() .or(hints.artist.as_deref()) .unwrap_or(""); let album_query = raw_meta.album.as_deref() .or(hints.album.as_deref()) .unwrap_or(""); tracing::info!(file = filename, "Searching database for similar artists/albums..."); let similar_artists = if !artist_query.is_empty() { db::find_similar_artists(&state.pool, artist_query, 5).await.unwrap_or_default() } else { Vec::new() }; let similar_albums = if !album_query.is_empty() { db::find_similar_albums(&state.pool, album_query, 5).await.unwrap_or_default() } else { Vec::new() }; if !similar_artists.is_empty() { let names: Vec<&str> = similar_artists.iter().map(|a| a.name.as_str()).collect(); tracing::info!(file = filename, matches = ?names, "Found similar artists in DB"); } if !similar_albums.is_empty() { let names: Vec<&str> = similar_albums.iter().map(|a| a.name.as_str()).collect(); tracing::info!(file = filename, matches = ?names, "Found similar albums in DB"); } // Build folder context for the LLM let audio_extensions = ["flac", "mp3", "ogg", "wav", "aac", "m4a", "opus", "wma", "ape", "alac"]; let folder_ctx = { let folder = file_path.parent().unwrap_or(file_path); let mut folder_files: Vec = std::fs::read_dir(folder) .ok() .map(|rd| { rd.filter_map(|e| e.ok()) .filter_map(|e| { let name = e.file_name().to_string_lossy().into_owned(); let ext = name.rsplit('.').next().unwrap_or("").to_lowercase(); if audio_extensions.contains(&ext.as_str()) { Some(name) } else { None } }) .collect() }) .unwrap_or_default(); folder_files.sort(); let track_count = folder_files.len(); let folder_path = folder .strip_prefix(&state.config.inbox_dir) .unwrap_or(folder) .to_string_lossy() .into_owned(); normalize::FolderContext { folder_path, folder_files, track_count } }; // Call LLM for normalization tracing::info!(file = filename, model = %state.config.ollama_model, "Sending to LLM for normalization..."); match normalize::normalize(state, &raw_meta, &hints, &similar_artists, &similar_albums, Some(&folder_ctx)).await { Ok(normalized) => { let confidence = normalized.confidence.unwrap_or(0.0); let status = if confidence >= state.config.confidence_threshold { "approved" } else { "review" }; tracing::info!( file = filename, norm_artist = normalized.artist.as_deref().unwrap_or("-"), norm_title = normalized.title.as_deref().unwrap_or("-"), norm_album = normalized.album.as_deref().unwrap_or("-"), confidence, status, notes = normalized.notes.as_deref().unwrap_or("-"), "LLM normalization complete" ); if !normalized.featured_artists.is_empty() { tracing::info!( file = filename, featured = ?normalized.featured_artists, "Featured artists detected" ); } db::update_pending_normalized(&state.pool, pending_id, status, &normalized, None).await?; // Auto-approve: move file to storage if status == "approved" { let artist = normalized.artist.as_deref().unwrap_or("Unknown Artist"); let album = normalized.album.as_deref().unwrap_or("Unknown Album"); let title = normalized.title.as_deref().unwrap_or("Unknown Title"); let ext = file_path.extension().and_then(|e| e.to_str()).unwrap_or("flac"); let track_num = normalized.track_number.unwrap_or(0); let dest_filename = if track_num > 0 { format!("{:02} - {}.{}", track_num, sanitize_filename(title), ext) } else { format!("{}.{}", sanitize_filename(title), ext) }; tracing::info!( file = filename, dest_artist = artist, dest_album = album, dest_filename = %dest_filename, "Auto-approved, moving to storage..." ); match mover::move_to_storage( &state.config.storage_dir, artist, album, &dest_filename, file_path, ) .await { Ok(outcome) => { let (storage_path, was_merged) = match outcome { mover::MoveOutcome::Moved(p) => (p, false), mover::MoveOutcome::Merged(p) => (p, true), }; let rel_path = storage_path.to_string_lossy().to_string(); match db::approve_and_finalize(&state.pool, pending_id, &rel_path).await { Ok(track_id) => { if was_merged { let _ = db::update_pending_status(&state.pool, pending_id, "merged", None).await; } tracing::info!(file = filename, track_id, storage = %rel_path, "Track finalized in database"); } Err(e) => { tracing::error!(file = filename, ?e, "Failed to finalize track in DB after move"); } } } Err(e) => { tracing::error!(file = filename, ?e, "Failed to move file to storage"); db::update_pending_status(&state.pool, pending_id, "error", Some(&e.to_string())).await?; } } } else { tracing::info!(file = filename, confidence, "Sent to review queue (below threshold {})", state.config.confidence_threshold); } } Err(e) => { tracing::error!(file = filename, ?e, "LLM normalization failed"); db::update_pending_status(&state.pool, pending_id, "error", Some(&e.to_string())).await?; } } Ok(true) } /// Process a cover image found in the inbox. /// Uses path hints (Artist/Album/) to find the matching album in the DB, /// then copies the image to the album's storage folder. async fn process_cover_image(state: &Arc, image_path: &std::path::Path) -> anyhow::Result { let filename = image_path.file_name().and_then(|n| n.to_str()).unwrap_or("?"); // Hash for dedup let path_clone = image_path.to_path_buf(); let (hash, file_size) = tokio::task::spawn_blocking(move || -> anyhow::Result<(String, i64)> { let data = std::fs::read(&path_clone)?; let hash = blake3::hash(&data).to_hex().to_string(); let size = data.len() as i64; Ok((hash, size)) }) .await??; if db::image_hash_exists(&state.pool, &hash).await? { return Ok(false); } // Derive artist/album from path hints let relative = image_path.strip_prefix(&state.config.inbox_dir).unwrap_or(image_path); let components: Vec<&str> = relative .components() .filter_map(|c| c.as_os_str().to_str()) .collect(); tracing::info!(file = filename, path = ?relative, components = components.len(), "Processing cover image"); // Supported structures: // Artist/Album/image.jpg (3+ components) // Album/image.jpg (2 components — album dir + image) if components.len() < 2 { tracing::info!(file = filename, "Cover image not inside an album folder, skipping"); return Ok(false); } // The directory directly containing the image is always the album hint let album_raw = components[components.len() - 2]; let path_artist = if components.len() >= 3 { Some(components[components.len() - 3]) } else { None }; let (album_name, _) = path_hints::parse_album_year_public(album_raw); tracing::info!( file = filename, path_artist = path_artist.unwrap_or("-"), album_hint = %album_name, "Looking up album in database..." ); // Try to find album in DB — try with artist if available, then without let album_id = if let Some(artist) = path_artist { find_album_for_cover(&state.pool, artist, &album_name).await? } else { None }; // If not found with artist, try fuzzy album name match across all artists let album_id = match album_id { Some(id) => Some(id), None => { let similar_albums = db::find_similar_albums(&state.pool, &album_name, 3).await.unwrap_or_default(); if let Some(best) = similar_albums.first() { if best.similarity > 0.5 { tracing::info!(file = filename, album = %best.name, similarity = best.similarity, "Matched album by fuzzy search"); Some(best.id) } else { None } } else { None } } }; let album_id = match album_id { Some(id) => id, None => { tracing::info!( file = filename, artist = path_artist.unwrap_or("-"), album = %album_name, "No matching album found in DB, skipping cover" ); return Ok(false); } }; // Determine image type and move to storage let image_type = classify_image(filename); let mime = mime_for_image(filename); // Get album's storage path from any track in that album let storage_dir_opt: Option<(String,)> = sqlx::query_as( "SELECT storage_path FROM tracks WHERE album_id = $1 LIMIT 1" ) .bind(album_id) .fetch_optional(&state.pool) .await?; let album_storage_dir = match storage_dir_opt { Some((track_path,)) => { let p = std::path::Path::new(&track_path); match p.parent() { Some(dir) if dir.is_dir() => dir.to_path_buf(), _ => { tracing::warn!(file = filename, track_path = %track_path, "Track storage path has no valid parent dir"); return Ok(false); } } } None => { tracing::info!(file = filename, album_id, "Album has no tracks in storage yet, skipping cover"); return Ok(false); } }; tracing::info!(file = filename, dest_dir = ?album_storage_dir, "Will copy cover to album storage dir"); let dest = album_storage_dir.join(filename); if !dest.exists() { // Move or copy image match tokio::fs::rename(image_path, &dest).await { Ok(()) => {} Err(_) => { tokio::fs::copy(image_path, &dest).await?; tokio::fs::remove_file(image_path).await?; } } } let dest_str = dest.to_string_lossy().to_string(); db::insert_album_image(&state.pool, album_id, image_type, &dest_str, &hash, mime, file_size).await?; tracing::info!( file = filename, album_id, image_type, dest = %dest_str, "Album image saved" ); Ok(true) } /// Find an album in DB matching the path-derived artist and album name. /// Tries exact match, then fuzzy artist + exact album, then fuzzy artist + fuzzy album. async fn find_album_for_cover(pool: &sqlx::PgPool, path_artist: &str, album_name: &str) -> anyhow::Result> { // Try exact match first if let Some(id) = db::find_album_id(pool, path_artist, album_name).await? { return Ok(Some(id)); } // Try fuzzy artist, then exact or fuzzy album under that artist let similar_artists = db::find_similar_artists(pool, path_artist, 5).await.unwrap_or_default(); for artist in &similar_artists { if artist.similarity < 0.3 { continue; } // Exact album under fuzzy artist if let Some(id) = db::find_album_id(pool, &artist.name, album_name).await? { return Ok(Some(id)); } // Fuzzy album under this artist let similar_albums = db::find_similar_albums(pool, album_name, 3).await.unwrap_or_default(); for album in &similar_albums { if album.artist_id == artist.id && album.similarity > 0.4 { return Ok(Some(album.id)); } } } Ok(None) } /// Remove characters that are unsafe for filenames. fn sanitize_filename(name: &str) -> String { name.chars() .map(|c| match c { '/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' => '_', _ => c, }) .collect::() .trim() .to_owned() }