From 5600a8065d9d16d249a609da3343647102e16a65 Mon Sep 17 00:00:00 2001 From: Ultradesu Date: Wed, 27 May 2026 16:42:20 +0300 Subject: [PATCH] CORE: Added Last.FM scrobbling --- src/jobs/lastfm_scrobble.rs | 34 ++ src/lastfm.rs | 619 ++++++++++++++++++++++++++++++++++++ 2 files changed, 653 insertions(+) create mode 100644 src/jobs/lastfm_scrobble.rs create mode 100644 src/lastfm.rs diff --git a/src/jobs/lastfm_scrobble.rs b/src/jobs/lastfm_scrobble.rs new file mode 100644 index 0000000..6a1728f --- /dev/null +++ b/src/jobs/lastfm_scrobble.rs @@ -0,0 +1,34 @@ +use crate::lastfm; +use crate::scheduler::{Job, JobContext, JobLog}; + +pub struct LastfmScrobbleJob; + +#[async_trait::async_trait] +impl Job for LastfmScrobbleJob { + fn name(&self) -> &'static str { + "lastfm_scrobble" + } + + fn description(&self) -> &'static str { + "Send queued Last.fm scrobbles for connected users" + } + + fn default_cron(&self) -> &'static str { + // Every minute. + "0 * * * * *" + } + + async fn run(&self, ctx: &JobContext, log: &mut JobLog) -> anyhow::Result<()> { + if !lastfm::is_configured(&ctx.config) { + log.warn("Last.fm API key/shared secret are not configured; skipping scrobble outbox"); + return Ok(()); + } + + let summary = lastfm::process_pending_scrobbles(&ctx.pool, &ctx.config, None, 50).await?; + log.info(&format!( + "Last.fm scrobble outbox processed: considered={}, sent={}, failed={}, blocked={}, skipped={}", + summary.considered, summary.sent, summary.failed, summary.blocked, summary.skipped + )); + Ok(()) + } +} diff --git a/src/lastfm.rs b/src/lastfm.rs new file mode 100644 index 0000000..0316728 --- /dev/null +++ b/src/lastfm.rs @@ -0,0 +1,619 @@ +use std::collections::BTreeMap; +use std::time::Duration; + +use anyhow::Context; +use md5::{Digest, Md5}; +use reqwest::Client; +use serde::Deserialize; +use sqlx::PgPool; + +use crate::config::AppConfig; + +const LASTFM_API_URL: &str = "https://ws.audioscrobbler.com/2.0/"; +const MAX_BATCH_SIZE: i64 = 50; +const MAX_ATTEMPTS: i32 = 8; + +#[derive(Debug, Clone)] +pub struct LastfmCredentials { + api_key: String, + shared_secret: String, +} + +#[derive(Debug, Clone)] +pub struct LastfmSession { + pub username: String, + pub session_key: String, +} + +#[derive(Debug, Clone)] +pub struct LastfmTrackPayload { + pub artist: String, + pub track: String, + pub album: Option, + pub album_artist: Option, + pub track_number: Option, + pub duration_seconds: Option, +} + +#[derive(Debug, Clone)] +pub struct LastfmScrobblePayload { + pub track: LastfmTrackPayload, + pub timestamp: i64, +} + +#[derive(Debug)] +pub struct LastfmApiError { + pub code: Option, + pub message: String, +} + +impl std::fmt::Display for LastfmApiError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self.code { + Some(code) => write!(f, "Last.fm API error {code}: {}", self.message), + None => write!(f, "Last.fm API error: {}", self.message), + } + } +} + +impl std::error::Error for LastfmApiError {} + +impl LastfmApiError { + fn new(code: Option, message: impl Into) -> Self { + Self { + code, + message: message.into(), + } + } + + pub fn is_invalid_session(&self) -> bool { + self.code == Some(9) + } + + pub fn is_retryable(&self) -> bool { + matches!(self.code, Some(11 | 16 | 29) | None) + } +} + +#[derive(Debug, Default)] +pub struct ScrobbleProcessSummary { + pub considered: u64, + pub sent: u64, + pub failed: u64, + pub blocked: u64, + pub skipped: u64, +} + +pub fn is_configured(config: &AppConfig) -> bool { + !config.lastfm_api_key.trim().is_empty() && !config.lastfm_shared_secret.trim().is_empty() +} + +impl LastfmCredentials { + pub fn from_config(config: &AppConfig) -> Option { + let api_key = config.lastfm_api_key.trim(); + let shared_secret = config.lastfm_shared_secret.trim(); + if api_key.is_empty() || shared_secret.is_empty() { + return None; + } + Some(Self { + api_key: api_key.to_owned(), + shared_secret: shared_secret.to_owned(), + }) + } + + pub fn api_key(&self) -> &str { + &self.api_key + } +} + +pub struct LastfmClient { + client: Client, + credentials: LastfmCredentials, +} + +impl LastfmClient { + pub fn new(credentials: LastfmCredentials) -> anyhow::Result { + let client = Client::builder() + .user_agent(format!( + "furumusic-lastfm-scrobbler/{}", + env!("CARGO_PKG_VERSION") + )) + .timeout(Duration::from_secs(15)) + .build() + .context("failed to build Last.fm HTTP client")?; + Ok(Self { + client, + credentials, + }) + } + + pub async fn get_session(&self, token: &str) -> Result { + let params = self.signed_params( + "auth.getSession", + None, + vec![("token".to_string(), token.to_string())], + ); + let body = self.post(params).await?; + let response: AuthSessionResponse = serde_json::from_str(&body) + .map_err(|err| LastfmApiError::new(None, err.to_string()))?; + if let Some(code) = response.error { + return Err(LastfmApiError::new( + Some(code), + response + .message + .unwrap_or_else(|| "authentication failed".to_string()), + )); + } + let Some(session) = response.session else { + return Err(LastfmApiError::new( + None, + "Last.fm auth response did not include a session", + )); + }; + Ok(LastfmSession { + username: session.name, + session_key: session.key, + }) + } + + pub async fn update_now_playing( + &self, + session_key: &str, + track: &LastfmTrackPayload, + ) -> Result<(), LastfmApiError> { + let mut extra = vec![ + ("artist".to_string(), track.artist.clone()), + ("track".to_string(), track.track.clone()), + ]; + push_optional(&mut extra, "album", track.album.as_deref()); + push_optional(&mut extra, "albumArtist", track.album_artist.as_deref()); + push_optional_i32(&mut extra, "trackNumber", track.track_number); + push_optional_i32(&mut extra, "duration", track.duration_seconds); + + let params = self.signed_params("track.updateNowPlaying", Some(session_key), extra); + let body = self.post(params).await?; + check_lastfm_error(&body) + } + + pub async fn scrobble_batch( + &self, + session_key: &str, + scrobbles: &[LastfmScrobblePayload], + ) -> Result<(), LastfmApiError> { + let mut extra = Vec::new(); + for (index, scrobble) in scrobbles.iter().take(MAX_BATCH_SIZE as usize).enumerate() { + let suffix = format!("[{index}]"); + extra.push((format!("artist{suffix}"), scrobble.track.artist.clone())); + extra.push((format!("track{suffix}"), scrobble.track.track.clone())); + extra.push((format!("timestamp{suffix}"), scrobble.timestamp.to_string())); + push_optional( + &mut extra, + &format!("album{suffix}"), + scrobble.track.album.as_deref(), + ); + push_optional( + &mut extra, + &format!("albumArtist{suffix}"), + scrobble.track.album_artist.as_deref(), + ); + push_optional_i32( + &mut extra, + &format!("trackNumber{suffix}"), + scrobble.track.track_number, + ); + push_optional_i32( + &mut extra, + &format!("duration{suffix}"), + scrobble.track.duration_seconds, + ); + } + + let params = self.signed_params("track.scrobble", Some(session_key), extra); + let body = self.post(params).await?; + check_lastfm_error(&body) + } + + fn signed_params( + &self, + method: &str, + session_key: Option<&str>, + extra: Vec<(String, String)>, + ) -> Vec<(String, String)> { + let mut params = BTreeMap::new(); + params.insert("api_key".to_string(), self.credentials.api_key.clone()); + params.insert("method".to_string(), method.to_string()); + if let Some(session_key) = session_key { + params.insert("sk".to_string(), session_key.to_string()); + } + for (key, value) in extra { + params.insert(key, value); + } + + let mut signature_input = String::new(); + for (key, value) in ¶ms { + signature_input.push_str(key); + signature_input.push_str(value); + } + signature_input.push_str(&self.credentials.shared_secret); + let digest = Md5::digest(signature_input.as_bytes()); + let api_sig = format!("{digest:x}"); + + let mut out = params.into_iter().collect::>(); + out.push(("api_sig".to_string(), api_sig)); + out.push(("format".to_string(), "json".to_string())); + out + } + + async fn post(&self, params: Vec<(String, String)>) -> Result { + let response = self + .client + .post(LASTFM_API_URL) + .form(¶ms) + .send() + .await + .map_err(|err| LastfmApiError::new(None, err.to_string()))?; + let status = response.status(); + let body = response + .text() + .await + .map_err(|err| LastfmApiError::new(None, err.to_string()))?; + if !status.is_success() { + if let Some(err) = parse_error(&body) { + return Err(err); + } + return Err(LastfmApiError::new(None, format!("HTTP {status}: {body}"))); + } + Ok(body) + } +} + +pub async fn process_pending_scrobbles( + pool: &PgPool, + config: &AppConfig, + user_id: Option, + limit_per_user: i64, +) -> anyhow::Result { + let Some(credentials) = LastfmCredentials::from_config(config) else { + return Ok(ScrobbleProcessSummary::default()); + }; + let client = LastfmClient::new(credentials)?; + let user_ids = pending_user_ids(pool, user_id).await?; + let mut summary = ScrobbleProcessSummary::default(); + + for uid in user_ids { + let rows = fetch_pending_scrobbles(pool, uid, limit_per_user.min(MAX_BATCH_SIZE)).await?; + if rows.is_empty() { + continue; + } + summary.considered += rows.len() as u64; + + let mut ids = Vec::new(); + let mut attempt_rows = Vec::new(); + let mut payloads = Vec::new(); + for row in &rows { + match row.payload() { + Some(payload) => { + ids.push(row.id); + attempt_rows.push((row.id, row.attempt_count)); + payloads.push(payload); + } + None => { + mark_row_failed(pool, row.id, "track has no primary Last.fm artist").await?; + summary.skipped += 1; + } + } + } + if ids.is_empty() { + continue; + } + + match client.scrobble_batch(&rows[0].session_key, &payloads).await { + Ok(()) => { + mark_rows_sent(pool, &ids).await?; + clear_account_error(pool, uid).await?; + summary.sent += ids.len() as u64; + } + Err(err) if err.is_invalid_session() => { + mark_account_reauth_required(pool, uid, &err.to_string()).await?; + mark_rows_blocked(pool, &ids, &err.to_string()).await?; + summary.blocked += ids.len() as u64; + } + Err(err) => { + mark_rows_retry_or_failed(pool, &attempt_rows, &err).await?; + summary.failed += ids.len() as u64; + if err.code == Some(29) { + break; + } + } + } + } + + Ok(summary) +} + +#[derive(Debug, Deserialize)] +struct AuthSessionResponse { + session: Option, + error: Option, + message: Option, +} + +#[derive(Debug, Deserialize)] +struct AuthSession { + name: String, + key: String, +} + +#[derive(Debug, Deserialize)] +struct ErrorEnvelope { + error: Option, + message: Option, +} + +#[derive(Debug, sqlx::FromRow)] +struct PendingScrobbleRow { + id: i64, + started_at: i64, + duration_seconds: i32, + attempt_count: i32, + session_key: String, + title: String, + artist_name: Option, + album_title: Option, + album_artist_name: Option, + track_number: Option, +} + +impl PendingScrobbleRow { + fn payload(&self) -> Option { + let artist = self + .artist_name + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty())? + .to_string(); + Some(LastfmScrobblePayload { + track: LastfmTrackPayload { + artist, + track: self.title.clone(), + album: non_empty(self.album_title.as_deref()), + album_artist: non_empty(self.album_artist_name.as_deref()), + track_number: self.track_number, + duration_seconds: Some(self.duration_seconds), + }, + timestamp: self.started_at, + }) + } +} + +fn non_empty(value: Option<&str>) -> Option { + value + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(ToOwned::to_owned) +} + +fn push_optional(params: &mut Vec<(String, String)>, key: &str, value: Option<&str>) { + if let Some(value) = value.map(str::trim).filter(|value| !value.is_empty()) { + params.push((key.to_string(), value.to_string())); + } +} + +fn push_optional_i32(params: &mut Vec<(String, String)>, key: &str, value: Option) { + if let Some(value) = value { + params.push((key.to_string(), value.to_string())); + } +} + +fn check_lastfm_error(body: &str) -> Result<(), LastfmApiError> { + if let Some(err) = parse_error(body) { + return Err(err); + } + Ok(()) +} + +fn parse_error(body: &str) -> Option { + let envelope: ErrorEnvelope = serde_json::from_str(body).ok()?; + envelope + .error + .map(|code| LastfmApiError::new(Some(code), envelope.message.unwrap_or_default())) +} + +async fn pending_user_ids(pool: &PgPool, user_id: Option) -> anyhow::Result> { + if let Some(user_id) = user_id { + return Ok(vec![user_id]); + } + let rows = sqlx::query_scalar::<_, i64>( + r#"SELECT DISTINCT o.user_id + FROM furumusic__lastfm_scrobble_outbox o + JOIN furumusic__lastfm_account a ON a.user_id = o.user_id + WHERE o.status IN ('pending', 'retry') + AND a.reauth_required = false + ORDER BY o.user_id"#, + ) + .fetch_all(pool) + .await?; + Ok(rows) +} + +async fn fetch_pending_scrobbles( + pool: &PgPool, + user_id: i64, + limit: i64, +) -> anyhow::Result> { + let rows = sqlx::query_as::<_, PendingScrobbleRow>( + r#"SELECT o.id, + o.started_at, + o.duration_seconds, + o.attempt_count, + a.session_key::text AS session_key, + t.title::text AS title, + r.title::text AS album_title, + t.track_number, + ( + SELECT ar.name::text + FROM furumusic__track_artist ta + JOIN furumusic__artist ar ON ar.id = ta.artist_id + WHERE ta.track_id = t.id AND ta.role <> 'featuring' + ORDER BY ta.position + LIMIT 1 + ) AS artist_name, + ( + SELECT ar.name::text + FROM furumusic__release_artist ra + JOIN furumusic__artist ar ON ar.id = ra.artist_id + WHERE ra.release_id = r.id + ORDER BY ra.position + LIMIT 1 + ) AS album_artist_name + FROM furumusic__lastfm_scrobble_outbox o + JOIN furumusic__lastfm_account a ON a.user_id = o.user_id + JOIN furumusic__track t ON t.id = o.track_id + LEFT JOIN furumusic__release r ON r.id = t.release_id + WHERE o.user_id = $1 + AND o.status IN ('pending', 'retry') + AND a.reauth_required = false + ORDER BY o.created_at, o.id + LIMIT $2"#, + ) + .bind(user_id) + .bind(limit) + .fetch_all(pool) + .await?; + Ok(rows) +} + +async fn clear_account_error(pool: &PgPool, user_id: i64) -> anyhow::Result<()> { + sqlx::query( + r#"UPDATE furumusic__lastfm_account + SET last_error = NULL, + reauth_required = false, + updated_at = $2 + WHERE user_id = $1"#, + ) + .bind(user_id) + .bind(now_iso()) + .execute(pool) + .await?; + Ok(()) +} + +async fn mark_account_reauth_required( + pool: &PgPool, + user_id: i64, + error: &str, +) -> anyhow::Result<()> { + sqlx::query( + r#"UPDATE furumusic__lastfm_account + SET reauth_required = true, + last_error = $2, + updated_at = $3 + WHERE user_id = $1"#, + ) + .bind(user_id) + .bind(error) + .bind(now_iso()) + .execute(pool) + .await?; + Ok(()) +} + +async fn mark_rows_sent(pool: &PgPool, ids: &[i64]) -> anyhow::Result<()> { + if ids.is_empty() { + return Ok(()); + } + let now = now_iso(); + sqlx::query( + r#"UPDATE furumusic__lastfm_scrobble_outbox + SET status = 'sent', + updated_at = $2, + scrobbled_at = $2, + last_error = NULL + WHERE id = ANY($1)"#, + ) + .bind(ids) + .bind(now) + .execute(pool) + .await?; + Ok(()) +} + +async fn mark_row_failed(pool: &PgPool, id: i64, error: &str) -> anyhow::Result<()> { + sqlx::query( + r#"UPDATE furumusic__lastfm_scrobble_outbox + SET status = 'failed', + attempt_count = attempt_count + 1, + last_error = $2, + updated_at = $3 + WHERE id = $1"#, + ) + .bind(id) + .bind(error) + .bind(now_iso()) + .execute(pool) + .await?; + Ok(()) +} + +async fn mark_rows_blocked(pool: &PgPool, ids: &[i64], error: &str) -> anyhow::Result<()> { + if ids.is_empty() { + return Ok(()); + } + sqlx::query( + r#"UPDATE furumusic__lastfm_scrobble_outbox + SET status = 'blocked', + attempt_count = attempt_count + 1, + last_error = $2, + updated_at = $3 + WHERE id = ANY($1)"#, + ) + .bind(ids) + .bind(error) + .bind(now_iso()) + .execute(pool) + .await?; + Ok(()) +} + +async fn mark_rows_retry_or_failed( + pool: &PgPool, + rows: &[(i64, i32)], + error: &LastfmApiError, +) -> anyhow::Result<()> { + if rows.is_empty() { + return Ok(()); + } + let ids: Vec = rows.iter().map(|(id, _)| *id).collect(); + let next_status = if error.is_retryable() + && rows + .iter() + .any(|(_, attempt_count)| attempt_count + 1 < MAX_ATTEMPTS) + { + "retry" + } else { + "failed" + }; + sqlx::query( + r#"UPDATE furumusic__lastfm_scrobble_outbox + SET status = CASE + WHEN attempt_count + 1 >= $2 THEN 'failed' + ELSE $3 + END, + attempt_count = attempt_count + 1, + last_error = $4, + updated_at = $5 + WHERE id = ANY($1)"#, + ) + .bind(&ids) + .bind(MAX_ATTEMPTS) + .bind(next_status) + .bind(error.to_string()) + .bind(now_iso()) + .execute(pool) + .await?; + Ok(()) +} + +fn now_iso() -> String { + chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string() +}