Furumi init

This commit is contained in:
2026-05-23 13:08:09 +03:00
parent b8afaa1864
commit 8912c51165
42 changed files with 14279 additions and 54 deletions
+58
View File
@@ -0,0 +1,58 @@
use crate::scheduler::{Job, JobContext, JobLog};
/// Periodic job that auto-assigns artist images from their release covers.
///
/// For every artist that has no `image_file_id`, picks the cover of the most
/// recent release (by year) that has one. Runs after the cover backfill job
/// so freshly-extracted covers are available.
pub struct ArtistImageBackfillJob;
#[async_trait::async_trait]
impl Job for ArtistImageBackfillJob {
fn name(&self) -> &'static str {
"artist_image_backfill"
}
fn description(&self) -> &'static str {
"Auto-assign artist images from release covers"
}
fn default_cron(&self) -> &'static str {
// 03:15 daily — after cover_backfill at 03:00
"0 15 3 * * *"
}
async fn run(&self, ctx: &JobContext, log: &mut JobLog) -> anyhow::Result<()> {
let result = sqlx::query(
"UPDATE furumusic__artist a \
SET image_file_id = ( \
SELECT r.cover_file_id \
FROM furumusic__release_artist ra \
JOIN furumusic__release r ON r.id = ra.release_id \
WHERE ra.artist_id = a.id \
AND r.cover_file_id IS NOT NULL \
ORDER BY r.year DESC NULLS LAST \
LIMIT 1 \
), \
updated_at = $1 \
WHERE a.image_file_id IS NULL \
AND EXISTS ( \
SELECT 1 FROM furumusic__release_artist ra2 \
JOIN furumusic__release r2 ON r2.id = ra2.release_id \
WHERE ra2.artist_id = a.id AND r2.cover_file_id IS NOT NULL \
)",
)
.bind(chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string())
.execute(&ctx.pool)
.await?;
let count = result.rows_affected();
if count > 0 {
log.info(&format!("Assigned images to {count} artists from release covers"));
} else {
log.info("All artists already have images (or no covers available)");
}
Ok(())
}
}
+69
View File
@@ -0,0 +1,69 @@
use crate::scheduler::{Job, JobContext, JobLog};
/// Fallback job that assigns artist images from track cover art.
///
/// The primary `artist_image_backfill` job uses release covers. This job
/// runs afterwards and covers the case where the release itself has no
/// cover but individual tracks do (e.g. when cover art is embedded in the
/// audio file and extracted per-track rather than per-release).
///
/// For every artist that *still* has no `image_file_id` after the release-
/// based backfill, picks the `cover_file_id` of the most recent track
/// (by year, then track id) that has one.
pub struct ArtistTrackImageBackfillJob;
#[async_trait::async_trait]
impl Job for ArtistTrackImageBackfillJob {
fn name(&self) -> &'static str {
"artist_track_image_backfill"
}
fn description(&self) -> &'static str {
"Auto-assign artist images from track covers (fallback)"
}
fn default_cron(&self) -> &'static str {
// 03:30 daily — after artist_image_backfill at 03:15
"0 30 3 * * *"
}
async fn run(&self, ctx: &JobContext, log: &mut JobLog) -> anyhow::Result<()> {
let result = sqlx::query(
"UPDATE furumusic__artist a \
SET image_file_id = ( \
SELECT t.cover_file_id \
FROM furumusic__track_artist ta \
JOIN furumusic__track t ON t.id = ta.track_id \
WHERE ta.artist_id = a.id \
AND t.cover_file_id IS NOT NULL \
AND t.is_hidden = false \
ORDER BY t.year DESC NULLS LAST, t.id DESC \
LIMIT 1 \
), \
updated_at = $1 \
WHERE a.image_file_id IS NULL \
AND a.is_hidden = false \
AND EXISTS ( \
SELECT 1 FROM furumusic__track_artist ta2 \
JOIN furumusic__track t2 ON t2.id = ta2.track_id \
WHERE ta2.artist_id = a.id \
AND t2.cover_file_id IS NOT NULL \
AND t2.is_hidden = false \
)",
)
.bind(chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string())
.execute(&ctx.pool)
.await?;
let count = result.rows_affected();
if count > 0 {
log.info(&format!(
"Assigned images to {count} artists from track covers"
));
} else {
log.info("All artists already have images (or no track covers available)");
}
Ok(())
}
}
+172
View File
@@ -0,0 +1,172 @@
use std::path::{Path, PathBuf};
use crate::agent::cover_art;
use crate::scheduler::{Job, JobContext, JobLog};
/// One-shot / periodic job that finds releases without cover art and attempts
/// to extract or discover covers from their audio files in storage.
pub struct CoverBackfillJob;
#[async_trait::async_trait]
impl Job for CoverBackfillJob {
fn name(&self) -> &'static str {
"cover_backfill"
}
fn description(&self) -> &'static str {
"Backfill cover art for releases missing covers"
}
fn default_cron(&self) -> &'static str {
// Once a day at 03:00
"0 0 3 * * *"
}
async fn run(&self, ctx: &JobContext, log: &mut JobLog) -> anyhow::Result<()> {
let storage_dir = &ctx.config.agent_storage_dir;
if storage_dir.is_empty() {
log.warn("agent_storage_dir is not configured, skipping cover backfill");
return Ok(());
}
// Find all releases without a cover
let rows: Vec<(i64, String)> = sqlx::query_as(
"SELECT r.id, r.title \
FROM furumusic__release r \
WHERE r.cover_file_id IS NULL \
ORDER BY r.id",
)
.fetch_all(&ctx.pool)
.await?;
if rows.is_empty() {
log.info("All releases already have cover art, nothing to backfill");
return Ok(());
}
log.info(&format!(
"Found {} releases without cover art, starting backfill...",
rows.len()
));
let mut assigned = 0u32;
let mut failed = 0u32;
let mut skipped_no_audio = 0u32;
let mut skipped_no_cover = 0u32;
let total = rows.len();
for (i, (release_id, release_title)) in rows.iter().enumerate() {
log.info(&format!(
"[{}/{}] Processing release {release_id} \"{release_title}\"...",
i + 1,
total,
));
// Find audio files belonging to this release via tracks → media_file
let audio_paths: Vec<(String,)> = sqlx::query_as(
"SELECT mf.file_path \
FROM furumusic__track t \
JOIN furumusic__media_file mf ON mf.id = t.audio_file_id \
WHERE t.release_id = $1 AND mf.file_type = 'audio'",
)
.bind(release_id)
.fetch_all(&ctx.pool)
.await
.unwrap_or_default();
if audio_paths.is_empty() {
log.warn(&format!(
"Release {release_id} \"{release_title}\": no audio files found, skipping"
));
skipped_no_audio += 1;
continue;
}
// Determine the folder from the first audio file's path
let first_path = Path::new(&audio_paths[0].0);
let folder = first_path.parent().unwrap_or(Path::new("."));
// Collect all audio file paths as PathBuf
let audio_files: Vec<PathBuf> = audio_paths
.iter()
.map(|(p,)| PathBuf::from(p))
.collect();
// Try to find cover art
let cover = match cover_art::find_best_cover(folder, &audio_files).await {
Some(c) => c,
None => {
log.info(&format!(
"Release {release_id} \"{release_title}\": no cover image found in {} audio files, skipping",
audio_files.len(),
));
skipped_no_cover += 1;
continue;
}
};
let source_desc = match &cover.source {
cover_art::CoverSource::FolderFile(p) => format!("folder: {}", p.display()),
cover_art::CoverSource::Embedded(p) => format!("embedded: {}", p.display()),
};
// Look up artist name for storage path
let artist_name: String = sqlx::query_scalar(
"SELECT a.name FROM furumusic__artist a \
JOIN furumusic__release_artist ra ON ra.artist_id = a.id \
WHERE ra.release_id = $1 \
ORDER BY ra.position LIMIT 1",
)
.bind(release_id)
.fetch_optional(&ctx.pool)
.await
.ok()
.flatten()
.unwrap_or_else(|| "Unknown Artist".to_string());
match cover_art::save_cover_to_storage(
&ctx.db,
&ctx.pool,
storage_dir,
&artist_name,
release_title,
&cover,
)
.await
{
Ok(cover_file_id) => {
if let Err(e) = cover_art::assign_cover_to_release(
&ctx.pool,
*release_id,
cover_file_id,
)
.await
{
log.warn(&format!(
"Release {release_id} \"{release_title}\": saved cover but failed to assign: {e}"
));
failed += 1;
} else {
log.info(&format!(
"Release {release_id} \"{release_title}\": assigned cover from {source_desc}"
));
assigned += 1;
}
}
Err(e) => {
log.warn(&format!(
"Release {release_id} \"{release_title}\": failed to save cover: {e}"
));
failed += 1;
}
}
}
log.info(&format!(
"Cover backfill complete: {assigned} assigned, {failed} failed, \
{skipped_no_audio} skipped (no audio), {skipped_no_cover} skipped (no cover found)"
));
Ok(())
}
}
+240
View File
@@ -0,0 +1,240 @@
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use sha2::{Digest, Sha256};
use crate::scheduler::{Job, JobContext, JobLog, PendingReview};
/// Guard to prevent overlapping inbox_discover runs.
static DISCOVER_RUNNING: AtomicBool = AtomicBool::new(false);
const AUDIO_EXTENSIONS: &[&str] = &[
"mp3", "flac", "ogg", "opus", "aac", "m4a", "wav", "ape", "wv", "wma", "tta", "aiff", "aif",
];
pub struct InboxDiscoverJob;
#[async_trait::async_trait]
impl Job for InboxDiscoverJob {
fn name(&self) -> &'static str {
"inbox_discover"
}
fn description(&self) -> &'static str {
"Scan inbox for new audio files and queue them for processing"
}
fn default_cron(&self) -> &'static str {
"0 */5 * * * *"
}
async fn run(&self, ctx: &JobContext, log: &mut JobLog) -> anyhow::Result<()> {
// Prevent overlapping discover runs
if DISCOVER_RUNNING.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
log.info("Another inbox_discover is already running, skipping");
return Ok(());
}
struct Guard;
impl Drop for Guard {
fn drop(&mut self) {
DISCOVER_RUNNING.store(false, Ordering::SeqCst);
}
}
let _guard = Guard;
let config = &ctx.config;
if config.agent_inbox_dir.is_empty() {
log.info("No inbox directory configured, skipping");
return Ok(());
}
let inbox = Path::new(&config.agent_inbox_dir);
if !inbox.exists() {
log.warn(&format!("Inbox path does not exist: {}", inbox.display()));
return Ok(());
}
let mut audio_files = Vec::new();
collect_audio_files(inbox, &mut audio_files).await?;
log.info(&format!("Found {} audio files in inbox", audio_files.len()));
if audio_files.is_empty() {
return Ok(());
}
let groups = group_by_folder(&audio_files);
log.info(&format!("Grouped into {} folder batches", groups.len()));
let mut discovered = 0u64;
let mut skipped_hash = 0u64;
let mut skipped_existing = 0u64;
for (_folder, files) in &groups {
for file_path in files {
let input_path_str = file_path.to_string_lossy().to_string();
// Skip if a PendingReview already exists for this path
match PendingReview::exists_for_path(&ctx.db, &input_path_str).await {
Ok(true) => {
skipped_existing += 1;
continue;
}
Ok(false) => {}
Err(e) => {
log.warn(&format!("Error checking existing review for {}: {e}", input_path_str));
continue;
}
}
// Compute SHA-256 hash
let path_clone = file_path.to_path_buf();
let (hash, file_size) = match tokio::task::spawn_blocking(move || -> anyhow::Result<(String, i64)> {
let data = std::fs::read(&path_clone)?;
let digest = Sha256::digest(&data);
let hash = format!("{:x}", digest);
let size = data.len() as i64;
Ok((hash, size))
})
.await?
{
Ok(v) => v,
Err(e) => {
log.warn(&format!("Failed to hash {}: {e}", file_path.display()));
continue;
}
};
// Skip if hash already in media_files
if crate::agent::rag::file_hash_exists(&ctx.pool, &hash).await.unwrap_or(false) {
skipped_hash += 1;
continue;
}
// Extract raw metadata
let path_for_meta = file_path.to_path_buf();
let raw_meta = match tokio::task::spawn_blocking(move || {
crate::agent::metadata::extract(&path_for_meta)
})
.await?
{
Ok(m) => m,
Err(e) => {
log.warn(&format!("Failed to extract metadata from {}: {e}", file_path.display()));
continue;
}
};
// Parse path hints
let relative = file_path.strip_prefix(inbox).unwrap_or(file_path);
let hints = crate::agent::path_hints::parse(relative);
// Build context JSON
let context = serde_json::json!({
"sha256": hash,
"file_size": file_size,
"raw_title": raw_meta.title,
"raw_artist": raw_meta.artist,
"raw_album": raw_meta.album,
"raw_track_number": raw_meta.track_number,
"raw_year": raw_meta.year,
"raw_genre": raw_meta.genre,
"duration_secs": raw_meta.duration_secs,
"path_title": hints.title,
"path_artist": hints.artist,
"path_album": hints.album,
"path_year": hints.year,
"path_track_number": hints.track_number,
});
let context_str = serde_json::to_string(&context).unwrap_or_default();
// Create PendingReview with status "queued"
PendingReview::create_queued(
&ctx.db,
ctx.run_id,
"new_file",
Some(&input_path_str),
Some(&context_str),
)
.await
.map_err(|e| anyhow::anyhow!("failed to create queued review: {e}"))?;
discovered += 1;
}
}
log.info(&format!(
"Discovered {} new files, skipped {} (hash known), skipped {} (already queued)",
discovered, skipped_hash, skipped_existing
));
// Trigger inbox_process in background if new files were discovered
// and no orchestrator is already running
if discovered > 0 {
if crate::jobs::inbox_process::is_orchestrator_running() {
log.info("New files discovered but inbox_process already running, it will pick them up");
} else {
log.info("Spawning inbox_process in background...");
let config = ctx.config.clone();
let db = ctx.db.clone();
let pool = ctx.pool.clone();
let registry = ctx.registry.clone();
tokio::spawn(async move {
if let Err(e) = crate::scheduler::trigger_job_now(
&config, &db, &pool, &registry, "inbox_process",
)
.await
{
tracing::error!("Background inbox_process trigger failed: {e}");
}
});
}
}
Ok(())
}
}
// ---------------------------------------------------------------------------
// Helpers (moved from inbox_scan.rs)
// ---------------------------------------------------------------------------
pub fn group_by_folder(files: &[PathBuf]) -> Vec<(PathBuf, Vec<PathBuf>)> {
use std::collections::HashMap;
let mut map: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
for f in files {
let folder = f.parent().unwrap_or(f).to_path_buf();
map.entry(folder).or_default().push(f.clone());
}
let mut groups: Vec<(PathBuf, Vec<PathBuf>)> = map.into_iter().collect();
groups.sort_by(|a, b| a.0.cmp(&b.0));
for (_, files) in &mut groups {
files.sort();
}
groups
}
pub async fn collect_audio_files(
dir: &Path,
audio: &mut Vec<PathBuf>,
) -> anyhow::Result<()> {
let mut entries = tokio::fs::read_dir(dir).await?;
while let Some(entry) = entries.next_entry().await? {
let name = entry.file_name().to_string_lossy().into_owned();
if name.starts_with('.') {
continue;
}
let ft = entry.file_type().await?;
if ft.is_dir() {
Box::pin(collect_audio_files(&entry.path(), audio)).await?;
} else if ft.is_file() && is_audio_file(&name) {
audio.push(entry.path());
}
}
Ok(())
}
pub fn is_audio_file(name: &str) -> bool {
let ext = name.rsplit('.').next().unwrap_or("").to_lowercase();
AUDIO_EXTENSIONS.contains(&ext.as_str())
}
+957
View File
@@ -0,0 +1,957 @@
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use cot::db::{Database, Model};
/// Guard to prevent multiple inbox_process orchestrators from running simultaneously.
static ORCHESTRATOR_RUNNING: AtomicBool = AtomicBool::new(false);
/// Well-known advisory lock ID for the inbox_process orchestrator.
/// PostgreSQL advisory locks use a 64-bit key; this is an arbitrary unique value.
const ORCHESTRATOR_ADVISORY_LOCK_ID: i64 = 0x4655_5255_4D55_5349; // "FURUMUSI" in hex
/// Check if an orchestrator is currently running (used by inbox_discover to avoid redundant triggers).
pub fn is_orchestrator_running() -> bool {
ORCHESTRATOR_RUNNING.load(Ordering::SeqCst)
}
/// Try to acquire the PostgreSQL advisory lock for the orchestrator.
/// Returns true if the lock was acquired (no other orchestrator is running).
async fn try_acquire_orchestrator_lock(pool: &sqlx::PgPool) -> bool {
match sqlx::query_scalar::<_, bool>(
"SELECT pg_try_advisory_lock($1)"
)
.bind(ORCHESTRATOR_ADVISORY_LOCK_ID)
.fetch_one(pool)
.await
{
Ok(acquired) => acquired,
Err(e) => {
tracing::error!("Failed to acquire advisory lock: {e}");
false
}
}
}
/// Release the PostgreSQL advisory lock for the orchestrator.
async fn release_orchestrator_lock(pool: &sqlx::PgPool) {
let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
.bind(ORCHESTRATOR_ADVISORY_LOCK_ID)
.execute(pool)
.await;
}
use crate::config::AppConfig;
use crate::music::{
Artist, MediaFile, Release, ReleaseArtist, Track, TrackArtist,
};
use crate::scheduler::{Job, JobContext, JobLog, JobRun, PendingReview, ProcessingStats};
use crate::agent::dto::{FolderContext, NormalizedFields, RawMetadata, PathHints};
use crate::agent::normalize::BatchFileInput;
use crate::agent::mover;
const AUDIO_EXTENSIONS: &[&str] = &[
"mp3", "flac", "ogg", "opus", "aac", "m4a", "wav", "ape", "wv", "wma", "tta", "aiff", "aif",
];
// ---------------------------------------------------------------------------
// InboxProcessJob — orchestrator that runs until ALL queued files are done
// ---------------------------------------------------------------------------
pub struct InboxProcessJob;
#[async_trait::async_trait]
impl Job for InboxProcessJob {
fn name(&self) -> &'static str {
"inbox_process"
}
fn description(&self) -> &'static str {
"Orchestrator: process queued files in folder batches"
}
fn default_cron(&self) -> &'static str {
"30 */5 * * * *"
}
async fn run(&self, ctx: &JobContext, log: &mut JobLog) -> anyhow::Result<()> {
// --- Guard 1: AtomicBool (fast in-process check) ---
let prev = ORCHESTRATOR_RUNNING.load(Ordering::SeqCst);
tracing::info!(
previous_value = prev,
"inbox_process: checking ORCHESTRATOR_RUNNING AtomicBool"
);
if ORCHESTRATOR_RUNNING.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
log.info("Another inbox_process orchestrator is already running (AtomicBool), skipping");
return Ok(());
}
struct AtomicGuard;
impl Drop for AtomicGuard {
fn drop(&mut self) {
tracing::info!("inbox_process: releasing ORCHESTRATOR_RUNNING AtomicBool");
ORCHESTRATOR_RUNNING.store(false, Ordering::SeqCst);
}
}
let _atomic_guard = AtomicGuard;
// --- Guard 2: PostgreSQL advisory lock (cross-process/binary safe) ---
if !try_acquire_orchestrator_lock(&ctx.pool).await {
log.info("Another inbox_process orchestrator holds the advisory lock, skipping");
return Ok(());
}
tracing::info!("inbox_process: advisory lock acquired");
let pool_for_unlock = ctx.pool.clone();
struct AdvisoryGuard {
pool: sqlx::PgPool,
}
impl Drop for AdvisoryGuard {
fn drop(&mut self) {
let pool = self.pool.clone();
tokio::spawn(async move {
release_orchestrator_lock(&pool).await;
tracing::info!("inbox_process: advisory lock released");
});
}
}
let _advisory_guard = AdvisoryGuard { pool: pool_for_unlock };
let config = Arc::clone(&ctx.config);
let mut total_ok = 0u64;
let mut total_fail = 0u64;
// Outer loop: re-check for newly queued files after each round
loop {
let queued = PendingReview::list_queued(&ctx.db)
.await
.map_err(|e| anyhow::anyhow!("failed to list queued reviews: {e}"))?;
if queued.is_empty() {
if total_ok == 0 && total_fail == 0 {
log.info("No queued files to process");
} else {
log.info("No more queued files, finishing");
}
break;
}
// Group queued reviews by parent folder
let groups = group_reviews_by_folder(&queued, &config.agent_inbox_dir);
log.info(&format!(
"{} queued file(s) in {} folder batch(es)",
queued.len(),
groups.len(),
));
for (folder_rel, reviews) in groups {
let file_count = reviews.len();
log.info(&format!(
"Folder batch: \"{}\" ({} files)",
folder_rel, file_count,
));
let (ok, fail) = process_folder_batch(
&ctx.db, &config, &ctx.pool, &folder_rel, reviews, log,
).await;
total_ok += ok;
total_fail += fail;
log.info(&format!(
"Folder done: {ok} ok, {fail} err. Total so far: {total_ok} ok, {total_fail} err"
));
}
}
// Cleanup empty dirs
let inbox_path = Path::new(&config.agent_inbox_dir);
if total_ok > 0 && !config.agent_inbox_dir.is_empty() {
cleanup_empty_dirs(inbox_path).await;
}
log.info(&format!(
"Orchestrator finished: {total_ok} succeeded, {total_fail} failed"
));
Ok(())
}
}
// ---------------------------------------------------------------------------
// FileProcessJob — registered for admin UI visibility (no cron, never auto-triggered)
// ---------------------------------------------------------------------------
pub struct FileProcessJob;
#[async_trait::async_trait]
impl Job for FileProcessJob {
fn name(&self) -> &'static str {
"file_process"
}
fn description(&self) -> &'static str {
"Process audio files through LLM (spawned by orchestrator)"
}
fn default_cron(&self) -> &'static str {
"" // no cron — only spawned by the orchestrator
}
async fn run(&self, _ctx: &JobContext, _log: &mut JobLog) -> anyhow::Result<()> {
Ok(())
}
}
// ---------------------------------------------------------------------------
// Prepared file — metadata extracted, ready for LLM
// ---------------------------------------------------------------------------
struct PreparedFile {
review: PendingReview,
filename: String,
raw_meta: RawMetadata,
hints: PathHints,
context: serde_json::Value,
}
// ---------------------------------------------------------------------------
// Group reviews by parent folder
// ---------------------------------------------------------------------------
fn group_reviews_by_folder(
reviews: &[PendingReview],
inbox_dir: &str,
) -> Vec<(String, Vec<PendingReview>)> {
let inbox = Path::new(inbox_dir);
let mut map: HashMap<String, Vec<PendingReview>> = HashMap::new();
for r in reviews {
let path = Path::new(r.input_path_str());
let folder = path.parent().unwrap_or(path);
let rel = folder.strip_prefix(inbox).unwrap_or(folder);
let key = rel.to_string_lossy().to_string();
map.entry(key).or_default().push(r.clone());
}
let mut groups: Vec<(String, Vec<PendingReview>)> = map.into_iter().collect();
groups.sort_by(|a, b| a.0.cmp(&b.0));
// Sort files within each group by path
for (_, reviews) in &mut groups {
reviews.sort_by(|a, b| a.input_path_str().cmp(b.input_path_str()));
}
groups
}
// ---------------------------------------------------------------------------
// Process one folder batch
// ---------------------------------------------------------------------------
async fn process_folder_batch(
db: &Database,
config: &AppConfig,
pool: &sqlx::PgPool,
folder_rel: &str,
reviews: Vec<PendingReview>,
orch_log: &mut JobLog,
) -> (u64, u64) {
let inbox_path = Path::new(&config.agent_inbox_dir);
let file_count = reviews.len();
// Create a single JobRun for the folder batch
let trigger_label = if folder_rel.is_empty() {
format!("batch({})", file_count)
} else {
let short = truncate_path(folder_rel, 20);
format!("{short}({})", file_count)
};
let mut run = match JobRun::create_running(db, "file_process", &trigger_label).await {
Ok(r) => r,
Err(e) => {
orch_log.error(&format!("Failed to create batch JobRun: {e}"));
return (0, file_count as u64);
}
};
let batch_start = std::time::Instant::now();
let mut log = JobLog::with_live_flush(pool.clone(), run.id_val());
log.info(&format!(
"Folder batch: \"{folder_rel}\"{file_count} file(s)"
));
// Phase 1: Prepare all files (extract metadata, parse hints)
log.info("Phase 1: extracting metadata...");
let mut prepared: Vec<PreparedFile> = Vec::with_capacity(file_count);
let mut failed_reviews: Vec<PendingReview> = Vec::new();
for mut review in reviews {
let input_path_str = review.input_path_str().to_owned();
let file_path = Path::new(&input_path_str);
let filename = file_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown")
.to_owned();
// Set status → processing
let _ = review.set_processing(db).await;
// Parse context_json
let context: serde_json::Value = review
.context_json
.as_deref()
.and_then(|s| serde_json::from_str(s).ok())
.unwrap_or_default();
// Extract metadata (with 60s timeout)
let path_for_meta = file_path.to_path_buf();
let meta_future = tokio::task::spawn_blocking(move || {
crate::agent::metadata::extract(&path_for_meta)
});
let raw_meta = match tokio::time::timeout(
std::time::Duration::from_secs(60),
meta_future,
).await {
Ok(Ok(Ok(m))) => m,
Ok(Ok(Err(e))) => {
let msg = format!("{filename}: metadata error: {e}");
log.error(&msg);
let _ = review.set_failed(db, &msg).await;
failed_reviews.push(review);
continue;
}
Ok(Err(e)) => {
let msg = format!("{filename}: metadata panic: {e}");
log.error(&msg);
let _ = review.set_failed(db, &msg).await;
failed_reviews.push(review);
continue;
}
Err(_) => {
let msg = format!("{filename}: metadata timeout (60s)");
log.error(&msg);
let _ = review.set_failed(db, &msg).await;
failed_reviews.push(review);
continue;
}
};
// Parse path hints
let relative = file_path.strip_prefix(inbox_path).unwrap_or(file_path);
let hints = crate::agent::path_hints::parse(relative);
prepared.push(PreparedFile {
review,
filename,
raw_meta,
hints,
context,
});
}
log.info(&format!(
"Phase 1 done: {} prepared, {} failed metadata",
prepared.len(),
failed_reviews.len(),
));
if prepared.is_empty() {
let duration_ms = batch_start.elapsed().as_millis() as i64;
let _ = run.set_completed(db, duration_ms, &log.output()).await;
return (0, failed_reviews.len() as u64);
}
// Phase 2: RAG lookup (collect unique artist/album queries from all files)
log.info("Phase 2: RAG lookup...");
let mut artist_queries: Vec<String> = Vec::new();
let mut album_queries: Vec<String> = Vec::new();
for p in &prepared {
let artist_q = p.raw_meta.artist.as_deref()
.or(p.hints.artist.as_deref())
.unwrap_or("")
.to_owned();
if !artist_q.is_empty() && !artist_queries.contains(&artist_q) {
artist_queries.push(artist_q);
}
let album_q = p.raw_meta.album.as_deref()
.or(p.hints.album.as_deref())
.unwrap_or("")
.to_owned();
if !album_q.is_empty() && !album_queries.contains(&album_q) {
album_queries.push(album_q);
}
}
// Lookup all unique artist queries
let mut all_similar_artists = Vec::new();
for q in &artist_queries {
match tokio::time::timeout(
std::time::Duration::from_secs(30),
crate::agent::rag::find_similar_artists(pool, q, 5),
).await {
Ok(Ok(results)) => {
for a in results {
if !all_similar_artists.iter().any(|x: &crate::agent::dto::SimilarArtist| x.id == a.id) {
all_similar_artists.push(a);
}
}
}
Ok(Err(e)) => log.warn(&format!("RAG artist lookup failed for \"{q}\": {e}")),
Err(_) => log.warn(&format!("RAG artist lookup timed out for \"{q}\"")),
}
}
let mut all_similar_releases = Vec::new();
for q in &album_queries {
match tokio::time::timeout(
std::time::Duration::from_secs(30),
crate::agent::rag::find_similar_releases(pool, q, 5),
).await {
Ok(Ok(results)) => {
for r in results {
if !all_similar_releases.iter().any(|x: &crate::agent::dto::SimilarRelease| x.id == r.id) {
all_similar_releases.push(r);
}
}
}
Ok(Err(e)) => log.warn(&format!("RAG release lookup failed for \"{q}\": {e}")),
Err(_) => log.warn(&format!("RAG release lookup timed out for \"{q}\"")),
}
}
log.info(&format!(
"Phase 2 done: {} similar artists, {} similar releases",
all_similar_artists.len(),
all_similar_releases.len(),
));
// Phase 3: Build folder context and call batch LLM
log.info("Phase 3: calling LLM (batch)...");
// Build folder context from the first file's folder
let folder_ctx = {
let first_path = Path::new(prepared[0].review.input_path_str());
let folder = first_path.parent().unwrap_or(first_path);
let mut folder_files: Vec<String> = std::fs::read_dir(folder)
.ok()
.map(|rd| {
rd.filter_map(|e| e.ok())
.filter_map(|e| {
let name = e.file_name().to_string_lossy().into_owned();
let ext = name.rsplit('.').next().unwrap_or("").to_lowercase();
if AUDIO_EXTENSIONS.contains(&ext.as_str()) {
Some(name)
} else {
None
}
})
.collect()
})
.unwrap_or_default();
folder_files.sort();
let track_count = folder_files.len();
FolderContext {
folder_path: folder_rel.to_owned(),
folder_files,
track_count,
}
};
// Build batch input
let batch_files: Vec<BatchFileInput> = prepared.iter().map(|p| {
BatchFileInput {
filename: p.filename.clone(),
raw: RawMetadata {
title: p.raw_meta.title.clone(),
artist: p.raw_meta.artist.clone(),
album: p.raw_meta.album.clone(),
track_number: p.raw_meta.track_number,
year: p.raw_meta.year,
genre: p.raw_meta.genre.clone(),
duration_secs: p.raw_meta.duration_secs,
},
hints: PathHints {
title: p.hints.title.clone(),
artist: p.hints.artist.clone(),
album: p.hints.album.clone(),
year: p.hints.year,
track_number: p.hints.track_number,
},
}
}).collect();
let system_prompt = include_str!("../../prompts/normalize_batch.txt");
let context_limit = config.agent_context_limit;
let llm_result = crate::agent::normalize::normalize_batch(
&config.agent_llm_url,
&config.agent_llm_model,
&config.agent_llm_auth,
system_prompt,
context_limit,
batch_files,
&all_similar_artists,
&all_similar_releases,
Some(&folder_ctx),
).await;
let batch_result = match llm_result {
Ok(r) => r,
Err(e) => {
let err_msg = format!("Batch LLM call failed: {e}");
log.error(&err_msg);
// Mark all files as failed
for mut p in prepared {
let _ = p.review.set_failed(db, &err_msg).await;
}
let total_fail_count = failed_reviews.len() as u64 + file_count as u64;
let duration_ms = batch_start.elapsed().as_millis() as i64;
let _ = run.set_failed(db, duration_ms, &log.output(), &err_msg).await;
return (0, total_fail_count);
}
};
log.info(&format!(
"Phase 3 done: LLM returned {} results in {}ms (model={}, tokens={}/{})",
batch_result.results.len(),
batch_result.duration_ms,
batch_result.model,
batch_result.prompt_tokens,
batch_result.completion_tokens,
));
// Phase 4: Match results to files and finalize
log.info("Phase 4: finalizing...");
// Build lookup map: filename → NormalizedFields
let result_map: HashMap<String, NormalizedFields> = batch_result.results
.into_iter()
.collect();
let llm_model = &batch_result.model;
let prompt_per_file = batch_result.prompt_tokens / prepared.len().max(1) as u64;
let completion_per_file = batch_result.completion_tokens / prepared.len().max(1) as u64;
let duration_per_file = batch_result.duration_ms as i64 / prepared.len().max(1) as i64;
let mut ok_count = 0u64;
let mut fail_count = failed_reviews.len() as u64;
for mut p in prepared {
let filename = &p.filename;
let normalized = match result_map.get(filename) {
Some(n) => n,
None => {
let msg = format!("LLM returned no result for \"{filename}\"");
log.error(&msg);
let _ = p.review.set_failed(db, &msg).await;
fail_count += 1;
continue;
}
};
// Record processing stats
let _ = ProcessingStats::create(
db,
p.review.id_val(),
llm_model,
duration_per_file,
prompt_per_file as i64,
completion_per_file as i64,
).await;
let result_json = serde_json::to_string(normalized).unwrap_or_default();
let confidence = normalized.confidence.unwrap_or(0.0);
let feat = if normalized.featured_artists.is_empty() {
String::new()
} else {
format!(" feat=[{}]", normalized.featured_artists.join(", "))
};
log.info(&format!(
"{filename}: artist={} | album={} | title={} | track={} | year={} | conf={}{}",
normalized.artist.as_deref().unwrap_or("-"),
normalized.album.as_deref().unwrap_or("-"),
normalized.title.as_deref().unwrap_or("-"),
normalized.track_number.map_or("-".into(), |n| n.to_string()),
normalized.year.map_or("-".into(), |y| y.to_string()),
confidence,
feat,
));
p.review.result_json = Some(result_json);
let _ = p.review.save(db).await;
let input_path_str = p.review.input_path_str().to_owned();
if confidence >= config.agent_confidence_threshold {
match finalize_approved(
db, pool, config, &input_path_str, normalized, &p.context,
&config.agent_storage_dir, Some(llm_model),
).await {
Ok(()) => {
let _ = p.review.set_auto_approved(db).await;
ok_count += 1;
}
Err(e) => {
let msg = format!("{filename}: finalize failed: {e}");
log.error(&msg);
let _ = p.review.set_failed(db, &msg).await;
fail_count += 1;
}
}
} else {
p.review.status = cot::db::LimitedString::new("pending").unwrap();
p.review.updated_at = cot::db::LimitedString::new(
&chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
).unwrap();
let _ = p.review.save(db).await;
log.info(&format!(
"{filename}: manual review (confidence {confidence} < {})",
config.agent_confidence_threshold,
));
ok_count += 1; // Not a failure, just needs review
}
}
let duration_ms = batch_start.elapsed().as_millis() as i64;
if fail_count == 0 {
let _ = run.set_completed(db, duration_ms, &log.output()).await;
} else {
let msg = format!("{fail_count} file(s) failed");
let _ = run.set_failed(db, duration_ms, &log.output(), &msg).await;
}
(ok_count, fail_count)
}
// ---------------------------------------------------------------------------
// Finalization (called on approve or auto-approve)
// ---------------------------------------------------------------------------
pub async fn finalize_approved(
db: &cot::db::Database,
pool: &sqlx::PgPool,
_config: &crate::config::AppConfig,
input_path_str: &str,
normalized: &NormalizedFields,
context: &serde_json::Value,
storage_dir_str: &str,
model_name: Option<&str>,
) -> anyhow::Result<()> {
let artist_name = normalized.artist.as_deref().unwrap_or("Unknown Artist");
let release_title = normalized.album.as_deref().unwrap_or("Unknown Release");
let track_title = normalized.title.as_deref().unwrap_or("Unknown Title");
let release_type = normalized.release_type.as_deref().unwrap_or("album");
let year = normalized.year;
let track_number = normalized.track_number;
let artist = find_or_create_artist(db, artist_name, model_name).await?;
let release = find_or_create_release(db, release_title, release_type, year, model_name).await?;
// Link ReleaseArtist
let existing_links = ReleaseArtist::find_by_release(db, release.id_val())
.await
.unwrap_or_default();
let already_linked = existing_links
.iter()
.any(|l| l.artist_id() == artist.id_val());
if !already_linked {
let position = existing_links.len() as i32;
let mut link = ReleaseArtist {
id: cot::db::Auto::auto(),
release_id: release.id_val(),
artist_id: artist.id_val(),
position,
};
link.insert(db)
.await
.map_err(|e| anyhow::anyhow!("failed to link release-artist: {e}"))?;
}
let sha256 = context
.get("sha256")
.and_then(|v| v.as_str())
.unwrap_or("");
let file_size = context
.get("file_size")
.and_then(|v| v.as_i64())
.unwrap_or(0);
let duration_secs = context
.get("duration_secs")
.and_then(|v| v.as_f64())
.unwrap_or(0.0);
let source_path = Path::new(input_path_str);
let original_filename = source_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown");
let ext = source_path
.extension()
.and_then(|e| e.to_str())
.unwrap_or("flac");
let mime_type = match ext.to_lowercase().as_str() {
"mp3" => "audio/mpeg",
"flac" => "audio/flac",
"ogg" | "opus" => "audio/ogg",
"aac" | "m4a" => "audio/mp4",
"wav" => "audio/wav",
"aiff" | "aif" => "audio/aiff",
_ => "application/octet-stream",
};
let track_num = track_number.unwrap_or(0);
let dest_filename = if track_num > 0 {
format!(
"{:02} - {}.{}",
track_num,
sanitize_filename(track_title),
ext
)
} else {
format!("{}.{}", sanitize_filename(track_title), ext)
};
let storage_dir = Path::new(storage_dir_str);
let storage_path = if source_path.exists() {
match mover::move_to_storage(
storage_dir,
artist_name,
release_title,
&dest_filename,
source_path,
)
.await?
{
mover::MoveOutcome::Moved(p) => p.to_string_lossy().to_string(),
mover::MoveOutcome::Merged(p) => p.to_string_lossy().to_string(),
}
} else {
storage_dir
.join(sanitize_filename(artist_name))
.join(sanitize_filename(release_title))
.join(&dest_filename)
.to_string_lossy()
.to_string()
};
let media_file = MediaFile::create(
db,
"audio",
&storage_path,
original_filename,
mime_type,
file_size,
sha256,
Some(ext),
None,
None,
None,
)
.await
.map_err(|e| anyhow::anyhow!("failed to create media file: {e}"))?;
let track = Track::create(
db,
track_title,
release.id_val(),
track_number,
None,
duration_secs,
media_file.id_val(),
year,
model_name,
)
.await
.map_err(|e| anyhow::anyhow!("failed to create track: {e}"))?;
TrackArtist::create(db, track.id_val(), artist.id_val(), "main", 0)
.await
.map_err(|e| anyhow::anyhow!("failed to link track-artist: {e}"))?;
for (i, feat_name) in normalized.featured_artists.iter().enumerate() {
let feat_artist = find_or_create_artist(db, feat_name, model_name).await?;
let _ = TrackArtist::create(
db,
track.id_val(),
feat_artist.id_val(),
"featuring",
(i + 1) as i32,
)
.await;
}
// Cover art: if the release has no cover yet, try to find one
if release.cover_file_id.is_none() {
let source_folder = Path::new(input_path_str)
.parent()
.unwrap_or(Path::new("."));
// Collect audio files in the same folder to try embedded extraction
let audio_files_in_folder: Vec<std::path::PathBuf> = std::fs::read_dir(source_folder)
.ok()
.map(|rd| {
rd.filter_map(|e| e.ok())
.filter(|e| {
let name = e.file_name().to_string_lossy().into_owned();
let ext = name.rsplit('.').next().unwrap_or("").to_lowercase();
AUDIO_EXTENSIONS.contains(&ext.as_str())
})
.map(|e| e.path())
.collect()
})
.unwrap_or_default();
match crate::agent::cover_art::find_best_cover(source_folder, &audio_files_in_folder).await
{
Some(cover) => {
let source_desc = match &cover.source {
crate::agent::cover_art::CoverSource::FolderFile(p) => {
format!("folder file: {}", p.display())
}
crate::agent::cover_art::CoverSource::Embedded(p) => {
format!("embedded in: {}", p.display())
}
};
match crate::agent::cover_art::save_cover_to_storage(
db,
pool,
storage_dir_str,
artist_name,
release_title,
&cover,
)
.await
{
Ok(cover_file_id) => {
let _ = crate::agent::cover_art::assign_cover_to_release(
pool,
release.id_val(),
cover_file_id,
)
.await;
tracing::info!(
release_id = release.id_val(),
cover_file_id,
source = %source_desc,
"Assigned cover art to release"
);
}
Err(e) => {
tracing::warn!(
release_id = release.id_val(),
error = %e,
"Failed to save cover art"
);
}
}
}
None => {
tracing::debug!(
release_id = release.id_val(),
"No cover art found for release"
);
}
}
}
tracing::info!(
track_id = track.id_val(),
artist = artist_name,
release = release_title,
title = track_title,
"Track finalized"
);
Ok(())
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
async fn find_or_create_artist(
db: &cot::db::Database,
name: &str,
model_name: Option<&str>,
) -> anyhow::Result<Artist> {
let name_sort = name.trim().to_lowercase();
let all = Artist::list_all(db).await.unwrap_or_default();
for a in &all {
if a.name_sort.as_str() == name_sort {
return Ok(a.clone());
}
}
Artist::create(db, name, model_name)
.await
.map_err(|e| anyhow::anyhow!("failed to create artist: {e}"))
}
async fn find_or_create_release(
db: &cot::db::Database,
title: &str,
release_type: &str,
year: Option<i32>,
model_name: Option<&str>,
) -> anyhow::Result<Release> {
let title_sort = title.trim().to_lowercase();
let all = Release::list_all(db).await.unwrap_or_default();
for r in &all {
if r.title_sort.as_str() == title_sort && r.release_type.as_str() == release_type {
return Ok(r.clone());
}
}
Release::create(db, title, release_type, year, model_name)
.await
.map_err(|e| anyhow::anyhow!("failed to create release: {e}"))
}
async fn cleanup_empty_dirs(dir: &Path) -> bool {
let mut entries = match tokio::fs::read_dir(dir).await {
Ok(e) => e,
Err(_) => return false,
};
let mut is_empty = true;
while let Ok(Some(entry)) = entries.next_entry().await {
let ft = match entry.file_type().await {
Ok(ft) => ft,
Err(_) => {
is_empty = false;
continue;
}
};
if ft.is_dir() {
let child_empty = Box::pin(cleanup_empty_dirs(&entry.path())).await;
if child_empty {
let _ = tokio::fs::remove_dir(&entry.path()).await;
} else {
is_empty = false;
}
} else {
is_empty = false;
}
}
is_empty
}
fn sanitize_filename(name: &str) -> String {
name.chars()
.map(|c| match c {
'/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' => '_',
_ => c,
})
.collect::<String>()
.trim()
.to_owned()
}
fn truncate_path(path: &str, max_len: usize) -> String {
if path.len() <= max_len {
path.to_owned()
} else {
format!("...{}", &path[path.len() - (max_len - 3)..])
}
}
+5
View File
@@ -0,0 +1,5 @@
pub mod artist_image_backfill;
pub mod artist_track_image_backfill;
pub mod cover_backfill;
pub mod inbox_discover;
pub mod inbox_process;