diff --git a/src/jobs/metadata_backfill.rs b/src/jobs/metadata_backfill.rs index 79e8bdc..43717b8 100644 --- a/src/jobs/metadata_backfill.rs +++ b/src/jobs/metadata_backfill.rs @@ -84,6 +84,12 @@ struct LastfmTagStats { failed: u64, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum LastfmTagPassResult { + Completed, + RateLimited, +} + pub struct MetadataBackfillJob; #[async_trait::async_trait] @@ -345,9 +351,21 @@ async fn backfill_lastfm_tags( .build()?; let mut stats = LastfmTagStats::default(); - backfill_lastfm_artist_tags(ctx, log, &client, api_key, overwrite, &mut stats).await?; - backfill_lastfm_release_tags(ctx, log, &client, api_key, overwrite, &mut stats).await?; - backfill_lastfm_track_tags(ctx, log, &client, api_key, overwrite, &mut stats).await?; + if backfill_lastfm_artist_tags(ctx, log, &client, api_key, overwrite, &mut stats).await? + == LastfmTagPassResult::RateLimited + { + return Ok(stats); + } + if backfill_lastfm_release_tags(ctx, log, &client, api_key, overwrite, &mut stats).await? + == LastfmTagPassResult::RateLimited + { + return Ok(stats); + } + if backfill_lastfm_track_tags(ctx, log, &client, api_key, overwrite, &mut stats).await? + == LastfmTagPassResult::RateLimited + { + return Ok(stats); + } Ok(stats) } @@ -358,7 +376,7 @@ async fn backfill_lastfm_artist_tags( api_key: &str, overwrite: bool, stats: &mut LastfmTagStats, -) -> anyhow::Result<()> { +) -> anyhow::Result { let rows = sqlx::query_as::<_, LastfmArtistTagRow>( r#"SELECT DISTINCT a.id, a.name::text AS name FROM furumusic__artist a @@ -376,31 +394,66 @@ async fn backfill_lastfm_artist_tags( )); let total = rows.len(); for (index, row) in rows.into_iter().enumerate() { - if should_skip_lastfm_entity(&ctx.pool, "artist", row.id, overwrite).await? { - stats.skipped_existing += 1; - if should_log_lastfm_progress(index + 1, total, 25) { - log.info(&format!( - "Last.fm artist tags progress: {}/{}", - index + 1, - total - )); + if should_log_lastfm_item(index + 1, total, 25) { + log.info(&format!( + "Last.fm artist tags {}/{}: artist {} \"{}\"", + index + 1, + total, + row.id, + row.name + )); + } + match should_skip_lastfm_entity(&ctx.pool, "artist", row.id, overwrite).await { + Ok(true) => { + stats.skipped_existing += 1; + if should_log_lastfm_progress(index + 1, total, 25) { + log.info(&format!( + "Last.fm artist tags progress: {}/{}", + index + 1, + total + )); + } + continue; + } + Ok(false) => {} + Err(err) => { + stats.failed += 1; + log.warn(&format!( + "Last.fm artist tags skip check failed for artist {} \"{}\": {err}", + row.id, row.name + )); + continue; } - continue; } stats.considered += 1; match fetch_lastfm_artist_tags(client, api_key, &row.name).await { Ok(tags) if !tags.is_empty() => { - let saved = - replace_entity_tags(&ctx.pool, "artist", row.id, &tags, "lastfm", false) - .await?; - stats.tags_saved += saved; - stats.updated_entities += 1; + match replace_entity_tags(&ctx.pool, "artist", row.id, &tags, "lastfm", false) + .await + { + Ok(saved) => { + stats.tags_saved += saved; + stats.updated_entities += 1; + } + Err(err) => { + stats.failed += 1; + log.warn(&format!( + "Last.fm artist tags save failed for artist {} \"{}\": {err}", + row.id, row.name + )); + } + } } Ok(_) => { stats.not_found += 1; } - Err(err) if err.to_string().contains("Last.fm rate limit exceeded") => { - return Err(err); + Err(err) if is_lastfm_rate_limit_error(&err) => { + stats.failed += 1; + log.warn(&format!( + "Last.fm rate limit reached while fetching artist tags for artist {} \"{}\"; stopping Last.fm tag backfill for this run", + row.id, row.name + )); + return Ok(LastfmTagPassResult::RateLimited); } Err(err) => { stats.failed += 1; @@ -419,7 +472,7 @@ async fn backfill_lastfm_artist_tags( } tokio::time::sleep(LASTFM_TAG_REQUEST_DELAY).await; } - Ok(()) + Ok(LastfmTagPassResult::Completed) } async fn backfill_lastfm_release_tags( @@ -429,7 +482,7 @@ async fn backfill_lastfm_release_tags( api_key: &str, overwrite: bool, stats: &mut LastfmTagStats, -) -> anyhow::Result<()> { +) -> anyhow::Result { let rows = sqlx::query_as::<_, LastfmReleaseTagRow>( r#"SELECT r.id, r.title::text AS title, @@ -454,16 +507,36 @@ async fn backfill_lastfm_release_tags( )); let total = rows.len(); for (index, row) in rows.into_iter().enumerate() { - if should_skip_lastfm_entity(&ctx.pool, "release", row.id, overwrite).await? { - stats.skipped_existing += 1; - if should_log_lastfm_progress(index + 1, total, 25) { - log.info(&format!( - "Last.fm release tags progress: {}/{}", - index + 1, - total - )); + if should_log_lastfm_item(index + 1, total, 25) { + log.info(&format!( + "Last.fm release tags {}/{}: release {} \"{}\"", + index + 1, + total, + row.id, + row.title + )); + } + match should_skip_lastfm_entity(&ctx.pool, "release", row.id, overwrite).await { + Ok(true) => { + stats.skipped_existing += 1; + if should_log_lastfm_progress(index + 1, total, 25) { + log.info(&format!( + "Last.fm release tags progress: {}/{}", + index + 1, + total + )); + } + continue; + } + Ok(false) => {} + Err(err) => { + stats.failed += 1; + log.warn(&format!( + "Last.fm release tags skip check failed for release {} \"{}\": {err}", + row.id, row.title + )); + continue; } - continue; } let Some(artist) = row.artist_name.as_deref().filter(|value| !value.trim().is_empty()) else { @@ -480,17 +553,32 @@ async fn backfill_lastfm_release_tags( stats.considered += 1; match fetch_lastfm_album_tags(client, api_key, artist, &row.title).await { Ok(tags) if !tags.is_empty() => { - let saved = - replace_entity_tags(&ctx.pool, "release", row.id, &tags, "lastfm", false) - .await?; - stats.tags_saved += saved; - stats.updated_entities += 1; + match replace_entity_tags(&ctx.pool, "release", row.id, &tags, "lastfm", false) + .await + { + Ok(saved) => { + stats.tags_saved += saved; + stats.updated_entities += 1; + } + Err(err) => { + stats.failed += 1; + log.warn(&format!( + "Last.fm release tags save failed for release {} \"{}\" / \"{}\": {err}", + row.id, artist, row.title + )); + } + } } Ok(_) => { stats.not_found += 1; } - Err(err) if err.to_string().contains("Last.fm rate limit exceeded") => { - return Err(err); + Err(err) if is_lastfm_rate_limit_error(&err) => { + stats.failed += 1; + log.warn(&format!( + "Last.fm rate limit reached while fetching release tags for release {} \"{}\" / \"{}\"; stopping Last.fm tag backfill for this run", + row.id, artist, row.title + )); + return Ok(LastfmTagPassResult::RateLimited); } Err(err) => { stats.failed += 1; @@ -509,7 +597,7 @@ async fn backfill_lastfm_release_tags( } tokio::time::sleep(LASTFM_TAG_REQUEST_DELAY).await; } - Ok(()) + Ok(LastfmTagPassResult::Completed) } async fn backfill_lastfm_track_tags( @@ -519,7 +607,7 @@ async fn backfill_lastfm_track_tags( api_key: &str, overwrite: bool, stats: &mut LastfmTagStats, -) -> anyhow::Result<()> { +) -> anyhow::Result { let rows = sqlx::query_as::<_, LastfmTrackTagRow>( r#"SELECT t.id, t.title::text AS title, @@ -544,16 +632,36 @@ async fn backfill_lastfm_track_tags( )); let total = rows.len(); for (index, row) in rows.into_iter().enumerate() { - if should_skip_lastfm_entity(&ctx.pool, "track", row.id, overwrite).await? { - stats.skipped_existing += 1; - if should_log_lastfm_progress(index + 1, total, 50) { - log.info(&format!( - "Last.fm track tags progress: {}/{}", - index + 1, - total - )); + if should_log_lastfm_item(index + 1, total, 50) { + log.info(&format!( + "Last.fm track tags {}/{}: track {} \"{}\"", + index + 1, + total, + row.id, + row.title + )); + } + match should_skip_lastfm_entity(&ctx.pool, "track", row.id, overwrite).await { + Ok(true) => { + stats.skipped_existing += 1; + if should_log_lastfm_progress(index + 1, total, 50) { + log.info(&format!( + "Last.fm track tags progress: {}/{}", + index + 1, + total + )); + } + continue; + } + Ok(false) => {} + Err(err) => { + stats.failed += 1; + log.warn(&format!( + "Last.fm track tags skip check failed for track {} \"{}\": {err}", + row.id, row.title + )); + continue; } - continue; } let Some(artist) = row.artist_name.as_deref().filter(|value| !value.trim().is_empty()) else { @@ -570,16 +678,31 @@ async fn backfill_lastfm_track_tags( stats.considered += 1; match fetch_lastfm_track_tags(client, api_key, artist, &row.title).await { Ok(tags) if !tags.is_empty() => { - let saved = - replace_entity_tags(&ctx.pool, "track", row.id, &tags, "lastfm", true).await?; - stats.tags_saved += saved; - stats.updated_entities += 1; + match replace_entity_tags(&ctx.pool, "track", row.id, &tags, "lastfm", true).await + { + Ok(saved) => { + stats.tags_saved += saved; + stats.updated_entities += 1; + } + Err(err) => { + stats.failed += 1; + log.warn(&format!( + "Last.fm track tags save failed for track {} \"{}\" / \"{}\": {err}", + row.id, artist, row.title + )); + } + } } Ok(_) => { stats.not_found += 1; } - Err(err) if err.to_string().contains("Last.fm rate limit exceeded") => { - return Err(err); + Err(err) if is_lastfm_rate_limit_error(&err) => { + stats.failed += 1; + log.warn(&format!( + "Last.fm rate limit reached while fetching track tags for track {} \"{}\" / \"{}\"; stopping Last.fm tag backfill for this run", + row.id, artist, row.title + )); + return Ok(LastfmTagPassResult::RateLimited); } Err(err) => { stats.failed += 1; @@ -598,13 +721,21 @@ async fn backfill_lastfm_track_tags( } tokio::time::sleep(LASTFM_TAG_REQUEST_DELAY).await; } - Ok(()) + Ok(LastfmTagPassResult::Completed) } fn should_log_lastfm_progress(done: usize, total: usize, every: usize) -> bool { total > 0 && (done == total || done % every == 0) } +fn should_log_lastfm_item(done: usize, total: usize, every: usize) -> bool { + total > 0 && (done == 1 || done == total || done % every == 0) +} + +fn is_lastfm_rate_limit_error(err: &anyhow::Error) -> bool { + err.to_string().contains("Last.fm rate limit exceeded") +} + async fn should_skip_lastfm_entity( pool: &sqlx::PgPool, entity_kind: &str,