JOBS: fixed metadata_backfill.rs

This commit is contained in:
Ultradesu
2026-06-01 18:49:44 +03:00
parent c244b3d4d8
commit a1dafaa5f2
+187 -56
View File
@@ -84,6 +84,12 @@ struct LastfmTagStats {
failed: u64,
}
/// Outcome of one Last.fm tag backfill pass (artist, release, or track).
///
/// The pass functions return this inside `Ok(..)`, and the caller compares it
/// against `RateLimited` to decide whether to return the stats early instead
/// of starting the next pass.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LastfmTagPassResult {
/// The pass ran its row loop to the end without a rate-limit error.
Completed,
/// The pass stopped early because a fetch reported the Last.fm rate limit.
RateLimited,
}
pub struct MetadataBackfillJob;
#[async_trait::async_trait]
@@ -345,9 +351,21 @@ async fn backfill_lastfm_tags(
.build()?;
let mut stats = LastfmTagStats::default();
backfill_lastfm_artist_tags(ctx, log, &client, api_key, overwrite, &mut stats).await?;
backfill_lastfm_release_tags(ctx, log, &client, api_key, overwrite, &mut stats).await?;
backfill_lastfm_track_tags(ctx, log, &client, api_key, overwrite, &mut stats).await?;
if backfill_lastfm_artist_tags(ctx, log, &client, api_key, overwrite, &mut stats).await?
== LastfmTagPassResult::RateLimited
{
return Ok(stats);
}
if backfill_lastfm_release_tags(ctx, log, &client, api_key, overwrite, &mut stats).await?
== LastfmTagPassResult::RateLimited
{
return Ok(stats);
}
if backfill_lastfm_track_tags(ctx, log, &client, api_key, overwrite, &mut stats).await?
== LastfmTagPassResult::RateLimited
{
return Ok(stats);
}
Ok(stats)
}
@@ -358,7 +376,7 @@ async fn backfill_lastfm_artist_tags(
api_key: &str,
overwrite: bool,
stats: &mut LastfmTagStats,
) -> anyhow::Result<()> {
) -> anyhow::Result<LastfmTagPassResult> {
let rows = sqlx::query_as::<_, LastfmArtistTagRow>(
r#"SELECT DISTINCT a.id, a.name::text AS name
FROM furumusic__artist a
@@ -376,31 +394,66 @@ async fn backfill_lastfm_artist_tags(
));
let total = rows.len();
for (index, row) in rows.into_iter().enumerate() {
if should_skip_lastfm_entity(&ctx.pool, "artist", row.id, overwrite).await? {
stats.skipped_existing += 1;
if should_log_lastfm_progress(index + 1, total, 25) {
log.info(&format!(
"Last.fm artist tags progress: {}/{}",
index + 1,
total
));
if should_log_lastfm_item(index + 1, total, 25) {
log.info(&format!(
"Last.fm artist tags {}/{}: artist {} \"{}\"",
index + 1,
total,
row.id,
row.name
));
}
match should_skip_lastfm_entity(&ctx.pool, "artist", row.id, overwrite).await {
Ok(true) => {
stats.skipped_existing += 1;
if should_log_lastfm_progress(index + 1, total, 25) {
log.info(&format!(
"Last.fm artist tags progress: {}/{}",
index + 1,
total
));
}
continue;
}
Ok(false) => {}
Err(err) => {
stats.failed += 1;
log.warn(&format!(
"Last.fm artist tags skip check failed for artist {} \"{}\": {err}",
row.id, row.name
));
continue;
}
continue;
}
stats.considered += 1;
match fetch_lastfm_artist_tags(client, api_key, &row.name).await {
Ok(tags) if !tags.is_empty() => {
let saved =
replace_entity_tags(&ctx.pool, "artist", row.id, &tags, "lastfm", false)
.await?;
stats.tags_saved += saved;
stats.updated_entities += 1;
match replace_entity_tags(&ctx.pool, "artist", row.id, &tags, "lastfm", false)
.await
{
Ok(saved) => {
stats.tags_saved += saved;
stats.updated_entities += 1;
}
Err(err) => {
stats.failed += 1;
log.warn(&format!(
"Last.fm artist tags save failed for artist {} \"{}\": {err}",
row.id, row.name
));
}
}
}
Ok(_) => {
stats.not_found += 1;
}
Err(err) if err.to_string().contains("Last.fm rate limit exceeded") => {
return Err(err);
Err(err) if is_lastfm_rate_limit_error(&err) => {
stats.failed += 1;
log.warn(&format!(
"Last.fm rate limit reached while fetching artist tags for artist {} \"{}\"; stopping Last.fm tag backfill for this run",
row.id, row.name
));
return Ok(LastfmTagPassResult::RateLimited);
}
Err(err) => {
stats.failed += 1;
@@ -419,7 +472,7 @@ async fn backfill_lastfm_artist_tags(
}
tokio::time::sleep(LASTFM_TAG_REQUEST_DELAY).await;
}
Ok(())
Ok(LastfmTagPassResult::Completed)
}
async fn backfill_lastfm_release_tags(
@@ -429,7 +482,7 @@ async fn backfill_lastfm_release_tags(
api_key: &str,
overwrite: bool,
stats: &mut LastfmTagStats,
) -> anyhow::Result<()> {
) -> anyhow::Result<LastfmTagPassResult> {
let rows = sqlx::query_as::<_, LastfmReleaseTagRow>(
r#"SELECT r.id,
r.title::text AS title,
@@ -454,16 +507,36 @@ async fn backfill_lastfm_release_tags(
));
let total = rows.len();
for (index, row) in rows.into_iter().enumerate() {
if should_skip_lastfm_entity(&ctx.pool, "release", row.id, overwrite).await? {
stats.skipped_existing += 1;
if should_log_lastfm_progress(index + 1, total, 25) {
log.info(&format!(
"Last.fm release tags progress: {}/{}",
index + 1,
total
));
if should_log_lastfm_item(index + 1, total, 25) {
log.info(&format!(
"Last.fm release tags {}/{}: release {} \"{}\"",
index + 1,
total,
row.id,
row.title
));
}
match should_skip_lastfm_entity(&ctx.pool, "release", row.id, overwrite).await {
Ok(true) => {
stats.skipped_existing += 1;
if should_log_lastfm_progress(index + 1, total, 25) {
log.info(&format!(
"Last.fm release tags progress: {}/{}",
index + 1,
total
));
}
continue;
}
Ok(false) => {}
Err(err) => {
stats.failed += 1;
log.warn(&format!(
"Last.fm release tags skip check failed for release {} \"{}\": {err}",
row.id, row.title
));
continue;
}
continue;
}
let Some(artist) = row.artist_name.as_deref().filter(|value| !value.trim().is_empty())
else {
@@ -480,17 +553,32 @@ async fn backfill_lastfm_release_tags(
stats.considered += 1;
match fetch_lastfm_album_tags(client, api_key, artist, &row.title).await {
Ok(tags) if !tags.is_empty() => {
let saved =
replace_entity_tags(&ctx.pool, "release", row.id, &tags, "lastfm", false)
.await?;
stats.tags_saved += saved;
stats.updated_entities += 1;
match replace_entity_tags(&ctx.pool, "release", row.id, &tags, "lastfm", false)
.await
{
Ok(saved) => {
stats.tags_saved += saved;
stats.updated_entities += 1;
}
Err(err) => {
stats.failed += 1;
log.warn(&format!(
"Last.fm release tags save failed for release {} \"{}\" / \"{}\": {err}",
row.id, artist, row.title
));
}
}
}
Ok(_) => {
stats.not_found += 1;
}
Err(err) if err.to_string().contains("Last.fm rate limit exceeded") => {
return Err(err);
Err(err) if is_lastfm_rate_limit_error(&err) => {
stats.failed += 1;
log.warn(&format!(
"Last.fm rate limit reached while fetching release tags for release {} \"{}\" / \"{}\"; stopping Last.fm tag backfill for this run",
row.id, artist, row.title
));
return Ok(LastfmTagPassResult::RateLimited);
}
Err(err) => {
stats.failed += 1;
@@ -509,7 +597,7 @@ async fn backfill_lastfm_release_tags(
}
tokio::time::sleep(LASTFM_TAG_REQUEST_DELAY).await;
}
Ok(())
Ok(LastfmTagPassResult::Completed)
}
async fn backfill_lastfm_track_tags(
@@ -519,7 +607,7 @@ async fn backfill_lastfm_track_tags(
api_key: &str,
overwrite: bool,
stats: &mut LastfmTagStats,
) -> anyhow::Result<()> {
) -> anyhow::Result<LastfmTagPassResult> {
let rows = sqlx::query_as::<_, LastfmTrackTagRow>(
r#"SELECT t.id,
t.title::text AS title,
@@ -544,16 +632,36 @@ async fn backfill_lastfm_track_tags(
));
let total = rows.len();
for (index, row) in rows.into_iter().enumerate() {
if should_skip_lastfm_entity(&ctx.pool, "track", row.id, overwrite).await? {
stats.skipped_existing += 1;
if should_log_lastfm_progress(index + 1, total, 50) {
log.info(&format!(
"Last.fm track tags progress: {}/{}",
index + 1,
total
));
if should_log_lastfm_item(index + 1, total, 50) {
log.info(&format!(
"Last.fm track tags {}/{}: track {} \"{}\"",
index + 1,
total,
row.id,
row.title
));
}
match should_skip_lastfm_entity(&ctx.pool, "track", row.id, overwrite).await {
Ok(true) => {
stats.skipped_existing += 1;
if should_log_lastfm_progress(index + 1, total, 50) {
log.info(&format!(
"Last.fm track tags progress: {}/{}",
index + 1,
total
));
}
continue;
}
Ok(false) => {}
Err(err) => {
stats.failed += 1;
log.warn(&format!(
"Last.fm track tags skip check failed for track {} \"{}\": {err}",
row.id, row.title
));
continue;
}
continue;
}
let Some(artist) = row.artist_name.as_deref().filter(|value| !value.trim().is_empty())
else {
@@ -570,16 +678,31 @@ async fn backfill_lastfm_track_tags(
stats.considered += 1;
match fetch_lastfm_track_tags(client, api_key, artist, &row.title).await {
Ok(tags) if !tags.is_empty() => {
let saved =
replace_entity_tags(&ctx.pool, "track", row.id, &tags, "lastfm", true).await?;
stats.tags_saved += saved;
stats.updated_entities += 1;
match replace_entity_tags(&ctx.pool, "track", row.id, &tags, "lastfm", true).await
{
Ok(saved) => {
stats.tags_saved += saved;
stats.updated_entities += 1;
}
Err(err) => {
stats.failed += 1;
log.warn(&format!(
"Last.fm track tags save failed for track {} \"{}\" / \"{}\": {err}",
row.id, artist, row.title
));
}
}
}
Ok(_) => {
stats.not_found += 1;
}
Err(err) if err.to_string().contains("Last.fm rate limit exceeded") => {
return Err(err);
Err(err) if is_lastfm_rate_limit_error(&err) => {
stats.failed += 1;
log.warn(&format!(
"Last.fm rate limit reached while fetching track tags for track {} \"{}\" / \"{}\"; stopping Last.fm tag backfill for this run",
row.id, artist, row.title
));
return Ok(LastfmTagPassResult::RateLimited);
}
Err(err) => {
stats.failed += 1;
@@ -598,13 +721,21 @@ async fn backfill_lastfm_track_tags(
}
tokio::time::sleep(LASTFM_TAG_REQUEST_DELAY).await;
}
Ok(())
Ok(LastfmTagPassResult::Completed)
}
/// Decides whether a progress line should be logged after `done` of `total` items.
///
/// Returns `true` when `total` is non-zero and either the last item has just been
/// handled (`done == total`) or `done` is an exact multiple of `every`.
///
/// An `every` of zero disables the periodic lines instead of panicking on a
/// remainder-by-zero; the completion line (`done == total`) is still reported.
fn should_log_lastfm_progress(done: usize, total: usize, every: usize) -> bool {
    // `checked_rem` yields `None` for a zero divisor, which never equals `Some(0)`.
    total > 0 && (done == total || done.checked_rem(every) == Some(0))
}
/// Decides whether the item at 1-based position `done` of `total` gets its own log line.
///
/// Returns `true` when `total` is non-zero and the item is the first one
/// (`done == 1`), the last one (`done == total`), or `done` is an exact multiple
/// of `every`.
///
/// An `every` of zero disables the periodic lines instead of panicking on a
/// remainder-by-zero; the first and last items are still reported.
fn should_log_lastfm_item(done: usize, total: usize, every: usize) -> bool {
    // `checked_rem` yields `None` for a zero divisor, which never equals `Some(0)`.
    total > 0 && (done == 1 || done == total || done.checked_rem(every) == Some(0))
}
/// Reports whether `err` is the Last.fm rate-limit failure.
///
/// Detection is purely textual: the error's `to_string()` rendering must contain
/// the phrase `Last.fm rate limit exceeded`.
fn is_lastfm_rate_limit_error(err: &anyhow::Error) -> bool {
    const RATE_LIMIT_MARKER: &str = "Last.fm rate limit exceeded";
    let rendered = err.to_string();
    rendered.contains(RATE_LIMIT_MARKER)
}
async fn should_skip_lastfm_entity(
pool: &sqlx::PgPool,
entity_kind: &str,