CORE: reworked artwork_backfill.rs task
This commit is contained in:
+141
-86
@@ -460,6 +460,98 @@ async fn update_lastfm_account_error(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn enqueue_lastfm_scrobble(
|
||||
pool: &sqlx::PgPool,
|
||||
config: &AppConfig,
|
||||
user_id: i64,
|
||||
track_id: i64,
|
||||
started_at: Option<i64>,
|
||||
listened_seconds: i32,
|
||||
) -> cot::Result<LastfmActionResponse> {
|
||||
if !crate::lastfm::is_configured(config) {
|
||||
return Ok(LastfmActionResponse {
|
||||
ok: false,
|
||||
queued: false,
|
||||
sent: false,
|
||||
message: Some("Last.fm is not configured".to_string()),
|
||||
});
|
||||
}
|
||||
if load_lastfm_account(pool, user_id).await?.is_none() {
|
||||
return Ok(LastfmActionResponse {
|
||||
ok: false,
|
||||
queued: false,
|
||||
sent: false,
|
||||
message: Some("Last.fm account is not connected".to_string()),
|
||||
});
|
||||
}
|
||||
let Some(track) = load_lastfm_track_payload(pool, track_id).await? else {
|
||||
return Ok(LastfmActionResponse {
|
||||
ok: false,
|
||||
queued: false,
|
||||
sent: false,
|
||||
message: Some("Track has no primary artist for Last.fm".to_string()),
|
||||
});
|
||||
};
|
||||
let duration_seconds = track.duration_seconds.unwrap_or(0).max(0);
|
||||
if duration_seconds <= 30 {
|
||||
return Ok(LastfmActionResponse {
|
||||
ok: false,
|
||||
queued: false,
|
||||
sent: false,
|
||||
message: Some("Track is too short to scrobble".to_string()),
|
||||
});
|
||||
}
|
||||
let threshold = ((duration_seconds as f64 / 2.0).min(240.0)).ceil() as i32;
|
||||
let listened_seconds = listened_seconds.max(0);
|
||||
if listened_seconds < threshold {
|
||||
return Ok(LastfmActionResponse {
|
||||
ok: false,
|
||||
queued: false,
|
||||
sent: false,
|
||||
message: Some("Scrobble threshold has not been reached".to_string()),
|
||||
});
|
||||
}
|
||||
|
||||
let now_ts = chrono::Utc::now().timestamp();
|
||||
let started_at = started_at
|
||||
.unwrap_or(now_ts - listened_seconds as i64)
|
||||
.min(now_ts);
|
||||
let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
|
||||
let dedupe_key = format!("{user_id}:{track_id}:{started_at}");
|
||||
sqlx::query(
|
||||
r#"INSERT INTO furumusic__lastfm_scrobble_outbox
|
||||
(user_id, track_id, started_at, listened_seconds, duration_seconds, status, created_at, updated_at, dedupe_key)
|
||||
VALUES ($1, $2, $3, $4, $5, 'pending', $6, $6, $7)
|
||||
ON CONFLICT (dedupe_key) DO NOTHING"#,
|
||||
)
|
||||
.bind(user_id)
|
||||
.bind(track_id)
|
||||
.bind(started_at)
|
||||
.bind(listened_seconds)
|
||||
.bind(duration_seconds)
|
||||
.bind(&now)
|
||||
.bind(&dedupe_key)
|
||||
.execute(pool)
|
||||
.await
|
||||
.map_err(|e| cot::Error::internal(e.to_string()))?;
|
||||
|
||||
let sent = match crate::lastfm::process_pending_scrobbles(pool, config, Some(user_id), 10).await
|
||||
{
|
||||
Ok(summary) => summary.sent > 0,
|
||||
Err(err) => {
|
||||
tracing::warn!("Last.fm immediate scrobble send failed: {err:#}");
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
Ok(LastfmActionResponse {
|
||||
ok: true,
|
||||
queued: true,
|
||||
sent,
|
||||
message: None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn lastfm_now_playing_handler(
|
||||
session: Session,
|
||||
db: Database,
|
||||
@@ -542,93 +634,17 @@ async fn lastfm_scrobble_handler(
|
||||
return Ok(json_error(StatusCode::UNAUTHORIZED, "not authenticated"));
|
||||
};
|
||||
let (config, _) = AppConfig::load_with_db(&db).await;
|
||||
if !crate::lastfm::is_configured(&config) {
|
||||
return Json(LastfmActionResponse {
|
||||
ok: false,
|
||||
queued: false,
|
||||
sent: false,
|
||||
message: Some("Last.fm is not configured".to_string()),
|
||||
})
|
||||
.into_response();
|
||||
}
|
||||
if load_lastfm_account(pool, user.id).await?.is_none() {
|
||||
return Json(LastfmActionResponse {
|
||||
ok: false,
|
||||
queued: false,
|
||||
sent: false,
|
||||
message: Some("Last.fm account is not connected".to_string()),
|
||||
})
|
||||
.into_response();
|
||||
}
|
||||
let Some(track) = load_lastfm_track_payload(pool, entry.track_id).await? else {
|
||||
return Json(LastfmActionResponse {
|
||||
ok: false,
|
||||
queued: false,
|
||||
sent: false,
|
||||
message: Some("Track has no primary artist for Last.fm".to_string()),
|
||||
})
|
||||
.into_response();
|
||||
};
|
||||
let duration_seconds = track.duration_seconds.unwrap_or(0).max(0);
|
||||
if duration_seconds <= 30 {
|
||||
return Json(LastfmActionResponse {
|
||||
ok: false,
|
||||
queued: false,
|
||||
sent: false,
|
||||
message: Some("Track is too short to scrobble".to_string()),
|
||||
})
|
||||
.into_response();
|
||||
}
|
||||
let threshold = ((duration_seconds as f64 / 2.0).min(240.0)).ceil() as i32;
|
||||
let listened_seconds = entry.listened_seconds.max(0);
|
||||
if listened_seconds < threshold {
|
||||
return Json(LastfmActionResponse {
|
||||
ok: false,
|
||||
queued: false,
|
||||
sent: false,
|
||||
message: Some("Scrobble threshold has not been reached".to_string()),
|
||||
})
|
||||
.into_response();
|
||||
}
|
||||
|
||||
let now_ts = chrono::Utc::now().timestamp();
|
||||
let started_at = entry
|
||||
.started_at
|
||||
.unwrap_or(now_ts - listened_seconds as i64)
|
||||
.min(now_ts);
|
||||
let now = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
|
||||
let dedupe_key = format!("{}:{}:{}", user.id, entry.track_id, started_at);
|
||||
sqlx::query(
|
||||
r#"INSERT INTO furumusic__lastfm_scrobble_outbox
|
||||
(user_id, track_id, started_at, listened_seconds, duration_seconds, status, created_at, updated_at, dedupe_key)
|
||||
VALUES ($1, $2, $3, $4, $5, 'pending', $6, $6, $7)
|
||||
ON CONFLICT (dedupe_key) DO NOTHING"#,
|
||||
Json(
|
||||
enqueue_lastfm_scrobble(
|
||||
pool,
|
||||
&config,
|
||||
user.id,
|
||||
entry.track_id,
|
||||
entry.started_at,
|
||||
entry.listened_seconds,
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
.bind(user.id)
|
||||
.bind(entry.track_id)
|
||||
.bind(started_at)
|
||||
.bind(listened_seconds)
|
||||
.bind(duration_seconds)
|
||||
.bind(&now)
|
||||
.bind(&dedupe_key)
|
||||
.execute(pool)
|
||||
.await
|
||||
.map_err(|e| cot::Error::internal(e.to_string()))?;
|
||||
|
||||
let sent =
|
||||
match crate::lastfm::process_pending_scrobbles(pool, &config, Some(user.id), 10).await {
|
||||
Ok(summary) => summary.sent > 0,
|
||||
Err(err) => {
|
||||
tracing::warn!("Last.fm immediate scrobble send failed: {err:#}");
|
||||
false
|
||||
}
|
||||
};
|
||||
Json(LastfmActionResponse {
|
||||
ok: true,
|
||||
queued: true,
|
||||
sent,
|
||||
message: None,
|
||||
})
|
||||
.into_response()
|
||||
}
|
||||
|
||||
@@ -1944,6 +1960,45 @@ async fn history_handler(
|
||||
.await
|
||||
.map_err(|e| cot::Error::internal(e.to_string()))?;
|
||||
|
||||
if let Some(listened_seconds) = entry.duration_listened {
|
||||
let (config, _) = AppConfig::load_with_db(&db).await;
|
||||
match enqueue_lastfm_scrobble(
|
||||
pool,
|
||||
&config,
|
||||
user.id,
|
||||
entry.track_id,
|
||||
entry.started_at,
|
||||
listened_seconds,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(result) if result.queued => {
|
||||
tracing::info!(
|
||||
user_id = user.id,
|
||||
track_id = entry.track_id,
|
||||
sent = result.sent,
|
||||
"Queued Last.fm scrobble from play history"
|
||||
);
|
||||
}
|
||||
Ok(result) => {
|
||||
tracing::debug!(
|
||||
user_id = user.id,
|
||||
track_id = entry.track_id,
|
||||
message = ?result.message,
|
||||
"Play history did not queue Last.fm scrobble"
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
user_id = user.id,
|
||||
track_id = entry.track_id,
|
||||
error = %err,
|
||||
"Failed to queue Last.fm scrobble from play history"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Json(serde_json::json!({"ok": true})).into_response()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user