diff --git a/Cargo.lock b/Cargo.lock index 77c9d48..d4fb93e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1418,7 +1418,7 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "furumusic" -version = "0.2.13" +version = "0.2.14" dependencies = [ "anyhow", "async-trait", diff --git a/src/admin/v2.rs b/src/admin/v2.rs index 1ee84de..838111a 100644 --- a/src/admin/v2.rs +++ b/src/admin/v2.rs @@ -191,6 +191,7 @@ struct AdminUserRowDto { struct AdminUserDetailDto { user: AdminUserRowDto, stats: AdminUserStatsDto, + recent_plays: Vec, } #[derive(Debug, Serialize, JsonSchema)] @@ -207,6 +208,25 @@ struct AdminUserStatsDto { lastfm_connected: bool, } +#[derive(Debug, Serialize, JsonSchema)] +struct AdminUserPlayDto { + history_id: i64, + played_at: String, + duration_listened: Option, + completed: bool, + track_id: i64, + title: String, + artists: String, + release_id: i64, + release_title: String, + release_year: Option, + cover_url: Option, + track_duration_seconds: f64, + uploader_name: String, + audio_format: Option, + audio_bitrate: Option, +} + #[derive(Debug, Serialize, JsonSchema)] struct OverviewStatsDto { tracks: i64, @@ -1778,7 +1798,7 @@ async fn load_admin_user_detail( .bind(user_id) .fetch_one(pool), sqlx::query_scalar::<_, i64>( - "SELECT COUNT(DISTINCT t.id) FROM furumusic__track t JOIN furumusic__media_file mf ON mf.id = t.media_file_id WHERE mf.uploaded_by_user_id = $1" + "SELECT COUNT(DISTINCT t.id) FROM furumusic__track t JOIN furumusic__media_file mf ON mf.id = t.audio_file_id WHERE mf.uploaded_by_user_id = $1" ) .bind(user_id) .fetch_one(pool), @@ -1793,6 +1813,7 @@ async fn load_admin_user_detail( .bind(user_id) .fetch_one(pool), )?; + let recent_plays = load_admin_user_recent_plays(pool, user_id, 30).await?; Ok(Some(AdminUserDetailDto { user, @@ -1808,9 +1829,98 @@ async fn load_admin_user_detail( torrent_sessions, lastfm_connected, }, + recent_plays, })) } +async fn load_admin_user_recent_plays( + pool: &PgPool, + user_id: i64, + limit: i64, +) -> anyhow::Result> { + let rows = sqlx::query_as::<_, AdminUserPlaySqlRow>( + r#"SELECT ph.id AS history_id, + ph.played_at::text AS played_at, + ph.duration_listened, + ph.completed, + t.id AS track_id, + t.title::text AS title, + COALESCE(NULLIF(STRING_AGG(a.name::text, ', ' ORDER BY ta.position), ''), 'Unknown artist') AS artists, + t.release_id, + COALESCE(r.title::text, '') AS release_title, + r.year AS release_year, + t.cover_file_id, + r.cover_file_id AS release_cover_file_id, + t.duration_seconds AS track_duration_seconds, + COALESCE(mf.uploader_name, 'UFO')::text AS uploader_name, + mf.audio_format, + mf.audio_bitrate + FROM furumusic__play_history ph + JOIN furumusic__track t ON t.id = ph.track_id + LEFT JOIN furumusic__release r ON r.id = t.release_id + LEFT JOIN furumusic__media_file mf ON mf.id = t.audio_file_id + LEFT JOIN furumusic__track_artist ta ON ta.track_id = t.id AND ta.role = 'main' + LEFT JOIN furumusic__artist a ON a.id = ta.artist_id + WHERE ph.user_id = $1 + GROUP BY ph.id, ph.played_at, ph.duration_listened, ph.completed, t.id, t.title, + t.release_id, r.title, r.year, t.cover_file_id, r.cover_file_id, + t.duration_seconds, mf.uploader_name, mf.audio_format, mf.audio_bitrate + ORDER BY ph.played_at DESC, ph.id DESC + LIMIT $2"#, + ) + .bind(user_id) + .bind(limit) + .fetch_all(pool) + .await?; + + Ok(rows + .into_iter() + .map(|row| AdminUserPlayDto { + history_id: row.history_id, + played_at: row.played_at, + duration_listened: row.duration_listened, + completed: row.completed, + track_id: row.track_id, + title: row.title, + artists: row.artists, + release_id: row.release_id, + release_title: row.release_title, + release_year: row.release_year, + cover_url: admin_track_cover_url(row.cover_file_id, row.release_cover_file_id), + track_duration_seconds: row.track_duration_seconds, + uploader_name: row.uploader_name, + audio_format: row.audio_format, + audio_bitrate: row.audio_bitrate, + }) + .collect()) +} + +#[derive(Debug, sqlx::FromRow)] +struct AdminUserPlaySqlRow { + history_id: i64, + played_at: String, + duration_listened: Option, + completed: bool, + track_id: i64, + title: String, + artists: String, + release_id: i64, + release_title: String, + release_year: Option, + cover_file_id: Option, + release_cover_file_id: Option, + track_duration_seconds: f64, + uploader_name: String, + audio_format: Option, + audio_bitrate: Option, +} + +fn admin_track_cover_url(track_cover: Option, release_cover: Option) -> Option { + track_cover + .or(release_cover) + .map(|id| format!("/api/player/cover/{id}/medium")) +} + fn admin_user_row( row: AdminUserSqlRow, active: &HashMap, diff --git a/src/jobs/inbox_process.rs b/src/jobs/inbox_process.rs index 6dd2d7b..e892ce8 100644 --- a/src/jobs/inbox_process.rs +++ b/src/jobs/inbox_process.rs @@ -17,28 +17,99 @@ pub fn is_orchestrator_running() -> bool { ORCHESTRATOR_RUNNING.load(Ordering::SeqCst) } -/// Try to acquire the PostgreSQL advisory lock for the orchestrator. -/// Returns true if the lock was acquired (no other orchestrator is running). -async fn try_acquire_orchestrator_lock(pool: &sqlx::PgPool) -> bool { - match sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_lock($1)") - .bind(ORCHESTRATOR_ADVISORY_LOCK_ID) - .fetch_one(pool) - .await - { - Ok(acquired) => acquired, - Err(e) => { - tracing::error!("Failed to acquire advisory lock: {e}"); - false - } +struct OrchestratorAdvisoryGuard { + conn: Option>, +} + +impl Drop for OrchestratorAdvisoryGuard { + fn drop(&mut self) { + let Some(mut conn) = self.conn.take() else { + return; + }; + + tokio::spawn(async move { + match sqlx::query_scalar::<_, bool>("SELECT pg_advisory_unlock($1)") + .bind(ORCHESTRATOR_ADVISORY_LOCK_ID) + .fetch_one(&mut *conn) + .await + { + Ok(true) => tracing::info!("inbox_process: advisory lock released"), + Ok(false) => tracing::warn!( + "inbox_process: advisory lock was not held by the guard connection" + ), + Err(e) => tracing::error!("inbox_process: failed to release advisory lock: {e}"), + } + }); } } -/// Release the PostgreSQL advisory lock for the orchestrator. -async fn release_orchestrator_lock(pool: &sqlx::PgPool) { - let _ = sqlx::query("SELECT pg_advisory_unlock($1)") +async fn connection_holds_orchestrator_lock( + conn: &mut sqlx::pool::PoolConnection, +) -> anyhow::Result { + let holds_lock = sqlx::query_scalar::<_, bool>( + r#" + SELECT EXISTS ( + SELECT 1 + FROM pg_locks + WHERE locktype = 'advisory' + AND pid = pg_backend_pid() + AND mode = 'ExclusiveLock' + AND granted + AND objsubid = 1 + AND classid::bigint = (($1::bigint >> 32) & 4294967295::bigint) + AND objid::bigint = ($1::bigint & 4294967295::bigint) + ) + "#, + ) + .bind(ORCHESTRATOR_ADVISORY_LOCK_ID) + .fetch_one(&mut **conn) + .await?; + + Ok(holds_lock) +} + +/// Try to acquire the PostgreSQL advisory lock for the orchestrator. +/// +/// The guard owns the same pooled connection that acquired the session-level +/// lock. This matters because PostgreSQL session advisory locks must be +/// released on the same connection, not just through the same pool. +async fn try_acquire_orchestrator_lock( + pool: &sqlx::PgPool, +) -> anyhow::Result> { + let mut conn = pool.acquire().await?; + + // Older versions acquired the lock through the pool and could return a + // locked idle connection. If we get such a connection, drain that stale + // re-entrant lock before taking the new guarded lock. + let mut cleaned_stale_locks = 0u8; + while connection_holds_orchestrator_lock(&mut conn).await? { + let unlocked = sqlx::query_scalar::<_, bool>("SELECT pg_advisory_unlock($1)") + .bind(ORCHESTRATOR_ADVISORY_LOCK_ID) + .fetch_one(&mut *conn) + .await?; + + cleaned_stale_locks = cleaned_stale_locks.saturating_add(u8::from(unlocked)); + if cleaned_stale_locks >= 8 { + break; + } + } + if cleaned_stale_locks > 0 { + tracing::warn!( + count = cleaned_stale_locks, + "inbox_process: released stale advisory lock(s) from an idle pooled connection" + ); + } + + let acquired = sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_lock($1)") .bind(ORCHESTRATOR_ADVISORY_LOCK_ID) - .execute(pool) - .await; + .fetch_one(&mut *conn) + .await?; + + if acquired { + Ok(Some(OrchestratorAdvisoryGuard { conn: Some(conn) })) + } else { + Ok(None) + } } use crate::agent::dto::{FolderContext, NormalizedFields, PathHints, RawMetadata}; @@ -83,10 +154,10 @@ impl Job for InboxProcessJob { .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_err() { - log.info( + log.warn( "Another inbox_process orchestrator is already running (AtomicBool), skipping", ); - return Ok(()); + anyhow::bail!("another inbox_process orchestrator is already running in this process"); } struct AtomicGuard; impl Drop for AtomicGuard { @@ -98,27 +169,16 @@ impl Job for InboxProcessJob { let _atomic_guard = AtomicGuard; // --- Guard 2: PostgreSQL advisory lock (cross-process/binary safe) --- - if !try_acquire_orchestrator_lock(&ctx.pool).await { - log.info("Another inbox_process orchestrator holds the advisory lock, skipping"); - return Ok(()); - } - tracing::info!("inbox_process: advisory lock acquired"); - let pool_for_unlock = ctx.pool.clone(); - struct AdvisoryGuard { - pool: sqlx::PgPool, - } - impl Drop for AdvisoryGuard { - fn drop(&mut self) { - let pool = self.pool.clone(); - tokio::spawn(async move { - release_orchestrator_lock(&pool).await; - tracing::info!("inbox_process: advisory lock released"); - }); + let _advisory_guard = match try_acquire_orchestrator_lock(&ctx.pool).await? { + Some(guard) => guard, + None => { + log.warn("Another inbox_process orchestrator holds the advisory lock"); + anyhow::bail!( + "inbox_process advisory lock is held by another database session; no in-process orchestrator is running" + ); } - } - let _advisory_guard = AdvisoryGuard { - pool: pool_for_unlock, }; + tracing::info!("inbox_process: advisory lock acquired"); let config = Arc::clone(&ctx.config); let mut total_ok = 0u64; diff --git a/src/player/mod.rs b/src/player/mod.rs index 0d8a514..5c93de8 100644 --- a/src/player/mod.rs +++ b/src/player/mod.rs @@ -2774,13 +2774,72 @@ async fn artists_handler( pool: &sqlx::PgPool, query: cot::request::extractors::UrlQuery, ) -> cot::Result { - let Some(_user) = auth::get_session_user(&session, &db).await else { + let Some(user) = auth::get_session_user(&session, &db).await else { return Ok(json_error(StatusCode::UNAUTHORIZED, "not authenticated")); }; let page = query.0.page.unwrap_or(1).max(1); let per_page = query.0.limit.unwrap_or(60).clamp(1, 200); let offset = (page - 1) as i64 * per_page as i64; + let mine = query.0.mine.unwrap_or(false); + + if mine { + let total_row = sqlx::query_as::<_, CountRow>( + r#"SELECT COUNT(DISTINCT a.id) AS count + FROM furumusic__artist a + JOIN furumusic__track_artist ta ON ta.artist_id = a.id AND ta.role <> 'featuring' + JOIN furumusic__track t ON t.id = ta.track_id AND t.is_hidden = false + JOIN furumusic__release r ON r.id = t.release_id AND r.is_hidden = false + JOIN furumusic__media_file mf ON mf.id = t.audio_file_id + WHERE a.is_hidden = false + AND mf.uploaded_by_user_id = $1"#, + ) + .bind(user.id) + .fetch_one(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + + let rows = sqlx::query_as::<_, ArtistRow>( + r#"SELECT a.id, a.name::text as name, a.image_file_id, + COUNT(DISTINCT r.id) AS release_count, + COUNT(DISTINCT t.id) AS track_count + FROM furumusic__artist a + JOIN furumusic__track_artist ta ON ta.artist_id = a.id AND ta.role <> 'featuring' + JOIN furumusic__track t ON t.id = ta.track_id AND t.is_hidden = false + JOIN furumusic__release r ON r.id = t.release_id AND r.is_hidden = false + JOIN furumusic__media_file mf ON mf.id = t.audio_file_id + WHERE a.is_hidden = false + AND mf.uploaded_by_user_id = $1 + GROUP BY a.id, a.name, a.name_sort, a.image_file_id + ORDER BY release_count DESC, track_count DESC, a.name_sort + LIMIT $2 OFFSET $3"#, + ) + .bind(user.id) + .bind(per_page as i64) + .bind(offset) + .fetch_all(pool) + .await + .map_err(|e| cot::Error::internal(e.to_string()))?; + + let items: Vec = rows + .into_iter() + .map(|r| ArtistCard { + id: r.id, + name: r.name, + image_url: cover_variant_url(r.image_file_id, "medium"), + release_count: r.release_count, + track_count: r.track_count, + }) + .collect(); + + return Json(Paginated { + items, + total: total_row.count, + page, + per_page, + }) + .into_response(); + } let total_row = sqlx::query_as::<_, CountRow>( r#"SELECT COUNT(DISTINCT a.id) AS count diff --git a/src/player/queries.rs b/src/player/queries.rs index 970142b..bb28f5a 100644 --- a/src/player/queries.rs +++ b/src/player/queries.rs @@ -49,6 +49,7 @@ pub(super) struct RemoveTrackRequest { pub(super) struct PaginationQuery { pub(super) page: Option, pub(super) limit: Option, + pub(super) mine: Option, } #[derive(Debug, Deserialize)] diff --git a/templates/admin/v2.html b/templates/admin/v2.html index 23539b3..598964a 100644 --- a/templates/admin/v2.html +++ b/templates/admin/v2.html @@ -1332,6 +1332,89 @@ tbody tr:hover { .user-stats-grid { grid-template-columns: repeat(4, minmax(0, 1fr)); } + +.user-activity-list { + display: grid; + gap: 8px; +} + +.user-activity-row { + min-width: 0; + padding: 8px; + border: 1px solid var(--border-color); + border-radius: 8px; + background: var(--bg-primary); + display: grid; + grid-template-columns: 52px minmax(0, 1fr) auto; + align-items: center; + gap: 10px; +} + +.user-activity-cover { + width: 52px; + height: 52px; + border-radius: 4px; + background: var(--bg-elevated); + overflow: hidden; + display: flex; + align-items: center; + justify-content: center; + color: var(--text-subdued); +} + +.user-activity-cover img { + width: 100%; + height: 100%; + object-fit: cover; +} + +.user-activity-cover svg { + width: 24px; + height: 24px; +} + +.user-activity-main { + min-width: 0; +} + +.user-activity-title, +.user-activity-meta, +.user-activity-sub { + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + +.user-activity-title { + color: var(--text-primary); + font-size: 13px; + font-weight: 800; +} + +.user-activity-meta { + margin-top: 3px; + color: var(--text-secondary); + font-size: 12px; +} + +.user-activity-sub { + margin-top: 3px; + color: var(--text-subdued); + font-size: 11px; +} + +.user-activity-time { + min-width: 118px; + color: var(--text-subdued); + font-size: 11px; + text-align: right; +} + +.user-activity-time strong { + display: block; + color: var(--text-primary); + font-size: 12px; +} {% endblock head_extra %} @@ -2160,7 +2243,7 @@ tbody tr:hover { +
+
+ +
No play history for this user yet
+
+
@@ -2837,7 +2946,7 @@ function adminV2() { async openUser(user) { if (!user) return; this.userModalTab = 'overview'; - this.activeUserDetail = { user, stats: {} }; + this.activeUserDetail = { user, stats: {}, recent_plays: [] }; this.userModalOpen = true; try { this.activeUserDetail = await this.request(`${this.apiBase}/users/${user.id}`); @@ -3788,6 +3897,26 @@ function adminV2() { ]; }, + userPlayListened(play) { + const listened = play?.duration_listened; + const duration = play?.track_duration_seconds; + const listenedText = listened == null ? 'unknown' : this.durationApprox(listened); + const durationText = duration ? this.durationApprox(duration) : null; + return durationText ? `${listenedText} / ${durationText}` : listenedText; + }, + + userPlayMeta(play) { + const parts = []; + if (play.release_title) { + parts.push(play.release_year ? `${play.release_title} (${play.release_year})` : play.release_title); + } + if (play.audio_format) parts.push(String(play.audio_format).toUpperCase()); + if (play.audio_bitrate) parts.push(`${play.audio_bitrate} kbps`); + if (play.uploader_name) parts.push(`uploaded by ${play.uploader_name}`); + parts.push(play.completed ? 'completed' : 'partial'); + return parts.join(' ยท '); + }, + durationApprox(seconds) { const value = Math.max(0, Number(seconds || 0)); if (value < 60) return `${Math.floor(value)}s`; diff --git a/templates/player/scripts.html b/templates/player/scripts.html index 3e5ffa1..77e4f5c 100644 --- a/templates/player/scripts.html +++ b/templates/player/scripts.html @@ -1847,11 +1847,13 @@ document.addEventListener('alpine:init', () => { // ----------------------------------------------------------------------- Alpine.store('library', { view: 'artists', + artistFilter: 'all', artists: [], artistsPage: 0, artistsTotal: 0, loading: false, _allLoaded: false, + _artistsLoadToken: 0, currentArtist: null, currentRelease: null, currentPlaylist: null, @@ -1866,7 +1868,6 @@ document.addEventListener('alpine:init', () => { _hashNav: false, // guard against circular hash updates init() { - this.loadArtists(1); this._setupScroll(); // Listen for browser back/forward @@ -1903,7 +1904,10 @@ document.addEventListener('alpine:init', () => { const params = match[3] || ''; if (view === 'artists' && !id) { - if (this.view !== 'artists') this.goArtists(options); + if (this.view !== 'artists' || this.artistsPage === 0) this.goArtists(options); + else if (options.restoreScroll) this._restoreScrollPosition(hash); + } else if (view === 'uploads' && !id) { + if (this.view !== 'my_uploads' || this.artistsPage === 0) this.goMyUploads(options); else if (options.restoreScroll) this._restoreScrollPosition(hash); } else if (view === 'artist' && id) { this.openArtist(id, options); @@ -1969,8 +1973,22 @@ document.addEventListener('alpine:init', () => { } }, + _resetArtistList(filter) { + this.artistFilter = filter; + this.artists = []; + this.artistsPage = 0; + this.artistsTotal = 0; + this._allLoaded = false; + this.loading = false; + this._artistsLoadToken += 1; + }, + goArtists(options = {}) { this._beginNavigation('#artists', options); + if (this.artistFilter !== 'all' || this.artistsPage === 0) { + this._resetArtistList('all'); + this.loadArtists(1); + } this.view = 'artists'; this.currentArtist = null; this.currentRelease = null; @@ -1982,13 +2000,35 @@ document.addEventListener('alpine:init', () => { this._afterNavigation(options); }, + goMyUploads(options = {}) { + this._beginNavigation('#uploads', options); + if (this.artistFilter !== 'uploads' || this.artistsPage === 0) { + this._resetArtistList('uploads'); + this.loadArtists(1); + } + this.view = 'my_uploads'; + this.currentArtist = null; + this.currentRelease = null; + this.currentPlaylist = null; + this.searchQuery = ''; + this.searchResults = null; + this._previousView = 'my_uploads'; + this.$nextTick(() => { this._setupScroll(); }); + this._afterNavigation(options); + }, + async loadArtists(page) { if (this.loading || this._allLoaded) return; this.loading = true; + const filter = this.artistFilter; + const token = this._artistsLoadToken + 1; + this._artistsLoadToken = token; try { - const res = await fetch(`/api/player/artists?page=${page}&limit=60`); + const mine = filter === 'uploads' ? '&mine=true' : ''; + const res = await fetch(`/api/player/artists?page=${page}&limit=60${mine}`); if (!res.ok) throw new Error('failed'); const data = await res.json(); + if (token !== this._artistsLoadToken || filter !== this.artistFilter) return; if (page === 1) { this.artists = data.items; } else { @@ -2000,7 +2040,9 @@ document.addEventListener('alpine:init', () => { this._allLoaded = true; } } catch {} - this.loading = false; + if (token === this._artistsLoadToken) { + this.loading = false; + } }, async openArtist(id, options = {}) { @@ -2337,8 +2379,8 @@ document.addEventListener('alpine:init', () => { this.searchLoading = false; if (this.view === 'search') { this.view = this._previousView || 'artists'; - this._setHash('#artists'); - if (this.view === 'artists') { + this._setHash(this.view === 'my_uploads' ? '#uploads' : '#artists'); + if (this.view === 'artists' || this.view === 'my_uploads') { this.$nextTick(() => { this._setupScroll(); }); } } diff --git a/templates/player/shell.html b/templates/player/shell.html index 87ddc31..d6536e4 100644 --- a/templates/player/shell.html +++ b/templates/player/shell.html @@ -62,6 +62,12 @@ {{ t.player_artists }} +
@@ -507,9 +519,9 @@ -