diff --git a/Cargo.toml b/Cargo.toml index f8e7556..a66d969 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "furumusic" -version = "0.1.10" +version = "0.1.11" edition = "2024" description = "Reusable web-app boilerplate: auth, OIDC/SSO, admin panel, user management, i18n, PostgreSQL" diff --git a/src/music/mod.rs b/src/music/mod.rs index a768029..816675b 100644 --- a/src/music/mod.rs +++ b/src/music/mod.rs @@ -1578,6 +1578,65 @@ pub mod db_migrations { &[Operation::custom(add_media_file_uploader).build()]; } + // -- M0031: persistent torrent import sessions --------------------------- + + #[cot::db::migrations::migration_op] + async fn create_torrent_session(ctx: migrations::MigrationContext<'_>) -> cot::db::Result<()> { + ctx.db + .raw( + "CREATE TABLE IF NOT EXISTS furumusic__torrent_session ( + id VARCHAR(36) PRIMARY KEY, + user_id BIGINT NOT NULL, + name TEXT NOT NULL, + info_hash VARCHAR(80) NOT NULL, + source_kind VARCHAR(32) NOT NULL, + source_label TEXT, + torrent_bytes BYTEA NOT NULL, + files_json TEXT NOT NULL, + selected_files_json TEXT NOT NULL DEFAULT '[]', + status VARCHAR(32) NOT NULL, + total_size BIGINT NOT NULL DEFAULT 0, + selected_size BIGINT NOT NULL DEFAULT 0, + downloaded_bytes BIGINT NOT NULL DEFAULT 0, + uploaded_bytes BIGINT NOT NULL DEFAULT 0, + progress_percent DOUBLE PRECISION NOT NULL DEFAULT 0, + error TEXT, + created_at VARCHAR(32) NOT NULL, + updated_at VARCHAR(32) NOT NULL, + completed_at VARCHAR(32) + )", + ) + .await?; + ctx.db + .raw( + "CREATE INDEX IF NOT EXISTS idx_torrent_session_user_updated + ON furumusic__torrent_session (user_id, updated_at DESC)", + ) + .await?; + ctx.db + .raw( + "CREATE INDEX IF NOT EXISTS idx_torrent_session_user_status + ON furumusic__torrent_session (user_id, status)", + ) + .await?; + Ok(()) + } + + #[derive(Debug, Copy, Clone)] + pub struct M0031CreateTorrentSession; + + impl migrations::Migration for M0031CreateTorrentSession { + const APP_NAME: &'static str = "furumusic"; + const MIGRATION_NAME: &'static str = "m_0031_create_torrent_session"; + const DEPENDENCIES: &'static [migrations::MigrationDependency] = + &[migrations::MigrationDependency::migration( + "furumusic", + "m_0030_add_media_file_uploader", + )]; + const OPERATIONS: &'static [Operation] = + &[Operation::custom(create_torrent_session).build()]; + } + pub const MIGRATIONS: &[&SyncDynMigration] = &[ &M0006CreateMediaFile, &M0007CreateArtist, @@ -1599,5 +1658,6 @@ pub mod db_migrations { &M0028AddModelNameColumns, &M0029AddPlaybackVolume, &M0030AddMediaFileUploader, + &M0031CreateTorrentSession, ]; } diff --git a/src/player/mod.rs b/src/player/mod.rs index c5dd1de..a6ab270 100644 --- a/src/player/mod.rs +++ b/src/player/mod.rs @@ -2136,28 +2136,170 @@ impl App for PlayerApp { ), // -- Torrent import widget -- Route::with_handler_and_name( - "/torrents/preview", + "/torrents", { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); let torrent_service = Arc::clone(&torrent_service); let scheduler_handle = Arc::clone(&self.scheduler_handle); - post( - move |session: Session, db: Database, json: Json| { + get(move |session: Session, db: Database| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + let torrent_service = Arc::clone(&torrent_service); + let scheduler_handle = Arc::clone(&scheduler_handle); + async move { + let Some(user) = auth::get_session_user(&session, &db).await else { + return Ok(json_error( + StatusCode::UNAUTHORIZED, + "not authenticated", + )); + }; + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("player pool") + }) + .await; + let service = torrent_service + .get_or_init(|| async { + Arc::new(TorrentService::new(Arc::clone(&scheduler_handle))) + }) + .await; + match service.list(pg_pool, user.id).await { + Ok(items) => Json(items).into_response(), + Err(err) => { + Ok(json_error(StatusCode::BAD_REQUEST, &err.to_string())) + } + } + } + }) + }, + "player_torrent_list", + ), + Route::with_handler_and_name( + "/torrents/session/{id}", + { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + let torrent_service = Arc::clone(&torrent_service); + let scheduler_handle = Arc::clone(&self.scheduler_handle); + get({ + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + let torrent_service = Arc::clone(&torrent_service); + let scheduler_handle = Arc::clone(&scheduler_handle); + move |session: Session, db: Database, path: Path| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); let torrent_service = Arc::clone(&torrent_service); let scheduler_handle = Arc::clone(&scheduler_handle); async move { - let Some(_user) = auth::get_session_user(&session, &db).await - else { + let Some(user) = auth::get_session_user(&session, &db).await else { return Ok(json_error( StatusCode::UNAUTHORIZED, "not authenticated", )); }; + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("player pool") + }) + .await; + let service = torrent_service + .get_or_init(|| async { + Arc::new(TorrentService::new(Arc::clone( + &scheduler_handle, + ))) + }) + .await; + match service.details(pg_pool, user.id, &path.0.id).await { + Ok(details) => Json(details).into_response(), + Err(err) => { + Ok(json_error(StatusCode::NOT_FOUND, &err.to_string())) + } + } + } + } + }) + .delete(move |session: Session, db: Database, path: Path| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + let torrent_service = Arc::clone(&torrent_service); + let scheduler_handle = Arc::clone(&scheduler_handle); + async move { + let Some(user) = auth::get_session_user(&session, &db).await else { + return Ok(json_error( + StatusCode::UNAUTHORIZED, + "not authenticated", + )); + }; + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("player pool") + }) + .await; + let service = torrent_service + .get_or_init(|| async { + Arc::new(TorrentService::new(Arc::clone(&scheduler_handle))) + }) + .await; + match service.remove(pg_pool, user.id, &path.0.id).await { + Ok(()) => Json(serde_json::json!({ "ok": true })).into_response(), + Err(err) => { + Ok(json_error(StatusCode::NOT_FOUND, &err.to_string())) + } + } + } + }) + }, + "player_torrent_detail", + ), + Route::with_handler_and_name( + "/torrents/preview", + { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + let torrent_service = Arc::clone(&torrent_service); + let scheduler_handle = Arc::clone(&self.scheduler_handle); + post( + move |session: Session, db: Database, json: Json| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); + let torrent_service = Arc::clone(&torrent_service); + let scheduler_handle = Arc::clone(&scheduler_handle); + async move { + let Some(user) = auth::get_session_user(&session, &db).await else { + return Ok(json_error( + StatusCode::UNAUTHORIZED, + "not authenticated", + )); + }; + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("player pool") + }) + .await; let service = torrent_service .get_or_init(|| async { Arc::new(TorrentService::new(Arc::clone(&scheduler_handle))) }) .await; - match service.preview(json.0).await { + match service.preview(pg_pool, user.id, json.0).await { Ok(preview) => Json(preview).into_response(), Err(err) => { Ok(json_error(StatusCode::BAD_REQUEST, &err.to_string())) @@ -2172,6 +2314,8 @@ impl App for PlayerApp { Route::with_handler_and_name( "/torrents/{id}/start", { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); let torrent_service = Arc::clone(&torrent_service); let scheduler_handle = Arc::clone(&self.scheduler_handle); post( @@ -2179,6 +2323,8 @@ impl App for PlayerApp { db: Database, path: Path, json: Json| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); let torrent_service = Arc::clone(&torrent_service); let scheduler_handle = Arc::clone(&scheduler_handle); async move { @@ -2188,6 +2334,15 @@ impl App for PlayerApp { "not authenticated", )); }; + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("player pool") + }) + .await; let (live_config, _) = AppConfig::load_with_db(&db).await; let service = torrent_service .get_or_init(|| async { @@ -2196,6 +2351,7 @@ impl App for PlayerApp { .await; match service .start( + pg_pool, &path.0.id, json.0.selected_files, live_config.agent_inbox_dir, @@ -2217,26 +2373,38 @@ impl App for PlayerApp { Route::with_handler_and_name( "/torrents/{id}/status", { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); let torrent_service = Arc::clone(&torrent_service); let scheduler_handle = Arc::clone(&self.scheduler_handle); get( move |session: Session, db: Database, path: Path| { + let pool = Arc::clone(&pool); + let pool_config = Arc::clone(&pool_config); let torrent_service = Arc::clone(&torrent_service); let scheduler_handle = Arc::clone(&scheduler_handle); async move { - let Some(_user) = auth::get_session_user(&session, &db).await - else { + let Some(user) = auth::get_session_user(&session, &db).await else { return Ok(json_error( StatusCode::UNAUTHORIZED, "not authenticated", )); }; + let pg_pool = pool + .get_or_init(|| async { + sqlx::postgres::PgPoolOptions::new() + .max_connections(5) + .connect(&pool_config.database_url) + .await + .expect("player pool") + }) + .await; let service = torrent_service .get_or_init(|| async { Arc::new(TorrentService::new(Arc::clone(&scheduler_handle))) }) .await; - match service.status(&path.0.id).await { + match service.status(pg_pool, user.id, &path.0.id).await { Ok(job) => Json(job).into_response(), Err(err) => { Ok(json_error(StatusCode::NOT_FOUND, &err.to_string())) diff --git a/src/torrents.rs b/src/torrents.rs index 313ab8f..9232294 100644 --- a/src/torrents.rs +++ b/src/torrents.rs @@ -9,14 +9,16 @@ use librqbit::{ AddTorrent, AddTorrentOptions, AddTorrentResponse, ManagedTorrent, Session, SessionOptions, }; use serde::{Deserialize, Serialize}; +use sqlx::{FromRow, PgPool}; use tokio::sync::{Mutex, OnceCell}; use uuid::Uuid; use crate::scheduler::SchedulerHandle; const METADATA_TIMEOUT: Duration = Duration::from_secs(90); +const TORRENT_LIST_LIMIT: i64 = 100; -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct TorrentFileDto { pub index: usize, pub name: String, @@ -40,11 +42,29 @@ pub struct TorrentJobDto { pub name: String, pub info_hash: String, pub status: String, + pub client_state: Option, pub total_size: u64, pub selected_size: u64, pub downloaded_bytes: u64, + pub uploaded_bytes: u64, pub progress_percent: f64, + pub download_speed_mbps: Option, + pub upload_speed_mbps: Option, + pub peers_live: Option, + pub peers_seen: Option, + pub eta: Option, + pub active: bool, pub error: Option, + pub created_at: Option, + pub updated_at: Option, + pub completed_at: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub struct TorrentSessionDto { + pub job: TorrentJobDto, + pub preview: TorrentPreviewDto, + pub selected_files: Vec, } #[derive(Debug, Deserialize)] @@ -54,11 +74,21 @@ pub enum TorrentPreviewKind { TorrentFile, } +impl TorrentPreviewKind { + fn as_str(&self) -> &'static str { + match self { + Self::Magnet => "magnet", + Self::TorrentFile => "torrent_file", + } + } +} + #[derive(Debug, Deserialize)] pub struct TorrentPreviewRequest { pub kind: TorrentPreviewKind, pub magnet: Option, pub torrent_base64: Option, + pub source_label: Option, } #[derive(Debug, Deserialize)] @@ -73,6 +103,7 @@ enum TorrentJobStatus { Moving, Complete, Failed, + Paused, } impl TorrentJobStatus { @@ -83,21 +114,165 @@ impl TorrentJobStatus { Self::Moving => "moving", Self::Complete => "complete", Self::Failed => "failed", + Self::Paused => "paused", + } + } + + fn from_str(value: &str) -> Self { + match value { + "downloading" => Self::Downloading, + "moving" => Self::Moving, + "complete" => Self::Complete, + "failed" => Self::Failed, + "paused" => Self::Paused, + _ => Self::Preview, } } } struct TorrentJob { id: String, + user_id: i64, name: String, info_hash: String, + source_kind: String, + source_label: Option, torrent_bytes: Vec, files: Vec, status: TorrentJobStatus, output_dir: PathBuf, selected_files: Vec, handle: Option>, + downloaded_bytes: u64, + uploaded_bytes: u64, + progress_percent: f64, error: Option, + created_at: String, + updated_at: String, + completed_at: Option, +} + +#[derive(Debug, FromRow)] +struct TorrentSessionRow { + id: String, + user_id: i64, + name: String, + info_hash: String, + source_kind: String, + source_label: Option, + torrent_bytes: Vec, + files_json: String, + selected_files_json: String, + status: String, + total_size: i64, + selected_size: i64, + downloaded_bytes: i64, + uploaded_bytes: i64, + progress_percent: f64, + error: Option, + created_at: String, + updated_at: String, + completed_at: Option, +} + +impl TorrentSessionRow { + fn files(&self) -> anyhow::Result> { + serde_json::from_str(&self.files_json).context("invalid torrent file list") + } + + fn selected_files(&self) -> Vec { + serde_json::from_str(&self.selected_files_json).unwrap_or_default() + } + + fn dto(&self, handle: Option<&Arc>) -> TorrentJobDto { + let active = handle.is_some(); + let status = if active { + self.status.as_str() + } else if self.status == "downloading" || self.status == "moving" { + "paused" + } else { + self.status.as_str() + }; + let stats = handle.map(|h| h.stats()); + let downloaded_bytes = stats + .as_ref() + .map(|s| s.progress_bytes) + .unwrap_or_else(|| i64_to_u64(self.downloaded_bytes)); + let uploaded_bytes = stats + .as_ref() + .map(|s| s.uploaded_bytes) + .unwrap_or_else(|| i64_to_u64(self.uploaded_bytes)); + let total_bytes = stats + .as_ref() + .map(|s| s.total_bytes) + .filter(|v| *v > 0) + .unwrap_or_else(|| i64_to_u64(self.selected_size)); + let progress_percent = progress_percent(downloaded_bytes, total_bytes) + .unwrap_or(self.progress_percent) + .clamp(0.0, 100.0); + let live = stats.as_ref().and_then(|s| s.live.as_ref()); + let peer_stats = live.map(|l| &l.snapshot.peer_stats); + + TorrentJobDto { + id: self.id.clone(), + name: self.name.clone(), + info_hash: self.info_hash.clone(), + status: status.to_string(), + client_state: stats.as_ref().map(|s| s.state.to_string()), + total_size: i64_to_u64(self.total_size), + selected_size: i64_to_u64(self.selected_size), + downloaded_bytes, + uploaded_bytes, + progress_percent, + download_speed_mbps: live.map(|l| l.download_speed.mbps), + upload_speed_mbps: live.map(|l| l.upload_speed.mbps), + peers_live: peer_stats.map(|p| p.live), + peers_seen: peer_stats.map(|p| p.seen), + eta: live.and_then(|l| l.time_remaining.as_ref().map(|eta| eta.to_string())), + active, + error: self.error.clone(), + created_at: Some(self.created_at.clone()), + updated_at: Some(self.updated_at.clone()), + completed_at: self.completed_at.clone(), + } + } + + fn preview(&self) -> anyhow::Result { + Ok(TorrentPreviewDto { + id: self.id.clone(), + name: self.name.clone(), + info_hash: self.info_hash.clone(), + total_size: i64_to_u64(self.total_size), + files: self.files()?, + }) + } + + fn into_job(self, temp_root: &Path) -> anyhow::Result { + let id = self.id.clone(); + let files = self.files()?; + let selected_files = self.selected_files(); + Ok(TorrentJob { + id: id.clone(), + user_id: self.user_id, + name: self.name, + info_hash: self.info_hash, + source_kind: self.source_kind, + source_label: self.source_label, + torrent_bytes: self.torrent_bytes, + files, + status: TorrentJobStatus::from_str(&self.status), + output_dir: temp_root.join(&id).join("download"), + selected_files, + handle: None, + downloaded_bytes: i64_to_u64(self.downloaded_bytes), + uploaded_bytes: i64_to_u64(self.uploaded_bytes), + progress_percent: self.progress_percent, + error: self.error, + created_at: self.created_at, + updated_at: self.updated_at, + completed_at: self.completed_at, + }) + } } impl TorrentJob { @@ -106,65 +281,72 @@ impl TorrentJob { } fn selected_size(&self) -> u64 { - if self.selected_files.is_empty() { - return 0; + selected_size(&self.files, &self.selected_files) + } + + fn preview(&self) -> TorrentPreviewDto { + TorrentPreviewDto { + id: self.id.clone(), + name: self.name.clone(), + info_hash: self.info_hash.clone(), + total_size: self.total_size(), + files: self.files.clone(), } - self.files - .iter() - .filter(|f| self.selected_files.contains(&f.index)) - .map(|f| f.length) - .sum() + } + + fn refresh_progress(&mut self) { + let Some(handle) = &self.handle else { + return; + }; + let stats = handle.stats(); + self.downloaded_bytes = stats.progress_bytes; + self.uploaded_bytes = stats.uploaded_bytes; + self.progress_percent = progress_percent(stats.progress_bytes, stats.total_bytes) + .unwrap_or(self.progress_percent) + .clamp(0.0, 100.0); } fn dto(&self) -> TorrentJobDto { let stats = self.handle.as_ref().map(|h| h.stats()); - let downloaded_bytes = stats.as_ref().map(|s| s.progress_bytes).unwrap_or(0); + let downloaded_bytes = stats + .as_ref() + .map(|s| s.progress_bytes) + .unwrap_or(self.downloaded_bytes); + let uploaded_bytes = stats + .as_ref() + .map(|s| s.uploaded_bytes) + .unwrap_or(self.uploaded_bytes); let total_bytes = stats .as_ref() .map(|s| s.total_bytes) .filter(|v| *v > 0) .unwrap_or_else(|| self.selected_size()); - let progress_percent = if total_bytes == 0 { - 0.0 - } else { - downloaded_bytes as f64 / total_bytes as f64 * 100.0 - }; + let live = stats.as_ref().and_then(|s| s.live.as_ref()); + let peer_stats = live.map(|l| &l.snapshot.peer_stats); - Self::dto_from_parts( - &self.id, - &self.name, - &self.info_hash, - self.status, - self.total_size(), - self.selected_size(), - downloaded_bytes, - progress_percent, - self.error.clone(), - ) - } - - #[allow(clippy::too_many_arguments)] - fn dto_from_parts( - id: &str, - name: &str, - info_hash: &str, - status: TorrentJobStatus, - total_size: u64, - selected_size: u64, - downloaded_bytes: u64, - progress_percent: f64, - error: Option, - ) -> TorrentJobDto { TorrentJobDto { - id: id.to_string(), - name: name.to_string(), - info_hash: info_hash.to_string(), - status: status.as_str().to_string(), - total_size, - selected_size, + id: self.id.clone(), + name: self.name.clone(), + info_hash: self.info_hash.clone(), + status: self.status.as_str().to_string(), + client_state: stats.as_ref().map(|s| s.state.to_string()), + total_size: self.total_size(), + selected_size: self.selected_size(), downloaded_bytes, - progress_percent: progress_percent.clamp(0.0, 100.0), - error, + uploaded_bytes, + progress_percent: progress_percent(downloaded_bytes, total_bytes) + .unwrap_or(self.progress_percent) + .clamp(0.0, 100.0), + download_speed_mbps: live.map(|l| l.download_speed.mbps), + upload_speed_mbps: live.map(|l| l.upload_speed.mbps), + peers_live: peer_stats.map(|p| p.live), + peers_seen: peer_stats.map(|p| p.seen), + eta: live.and_then(|l| l.time_remaining.as_ref().map(|eta| eta.to_string())), + active: self.handle.is_some(), + error: self.error.clone(), + created_at: Some(self.created_at.clone()), + updated_at: Some(self.updated_at.clone()), + completed_at: self.completed_at.clone(), } } } @@ -205,15 +387,75 @@ impl TorrentService { .cloned() } + pub async fn list(&self, pool: &PgPool, user_id: i64) -> anyhow::Result> { + let rows = sqlx::query_as::<_, TorrentSessionRow>( + r#"SELECT id, user_id, name, info_hash, source_kind, source_label, torrent_bytes, + files_json, selected_files_json, status, total_size, selected_size, + downloaded_bytes, uploaded_bytes, progress_percent, error, + created_at, updated_at, completed_at + FROM furumusic__torrent_session + WHERE user_id = $1 + ORDER BY updated_at DESC, created_at DESC + LIMIT $2"#, + ) + .bind(user_id) + .bind(TORRENT_LIST_LIMIT) + .fetch_all(pool) + .await?; + + let handles = { + let jobs = self.jobs.lock().await; + jobs.iter() + .filter_map(|(id, job)| job.handle.as_ref().map(|h| (id.clone(), Arc::clone(h)))) + .collect::>() + }; + + Ok(rows + .iter() + .map(|row| row.dto(handles.get(&row.id))) + .collect()) + } + + pub async fn details( + &self, + pool: &PgPool, + user_id: i64, + id: &str, + ) -> anyhow::Result { + if let Some(session) = self.memory_details(user_id, id).await { + return Ok(session); + } + + let row = load_row(pool, user_id, id).await?; + let selected_files = row.selected_files(); + let job = row.dto(None); + let preview = row.preview()?; + Ok(TorrentSessionDto { + job, + preview, + selected_files, + }) + } + pub async fn preview( &self, + pool: &PgPool, + user_id: i64, request: TorrentPreviewRequest, - ) -> anyhow::Result { + ) -> anyhow::Result { let session = self.session().await?; let id = Uuid::new_v4().to_string(); let output_dir = self.temp_root.join(&id).join("download"); tokio::fs::create_dir_all(&output_dir).await?; + let source_kind = request.kind.as_str().to_string(); + let source_label = request + .source_label + .as_deref() + .map(str::trim) + .filter(|s| !s.is_empty()) + .map(str::to_owned); + let add = match request.kind { TorrentPreviewKind::Magnet => { let magnet = request @@ -269,50 +511,101 @@ impl TorrentService { .filename .to_string() .unwrap_or_else(|_| "".to_string()); - let selected = is_audio_path(&name); files.push(TorrentFileDto { index, name, components: details.filename.to_vec().unwrap_or_default(), length: details.len, - selected, + selected: true, }); } - let total_size = files.iter().map(|f| f.length).sum(); - let dto = TorrentPreviewDto { - id: id.clone(), - name: name.clone(), - info_hash: list.info_hash.as_string(), - total_size, - files: files.clone(), - }; - + let selected_files = files.iter().map(|f| f.index).collect::>(); + let now = now_string(); let job = TorrentJob { id: id.clone(), + user_id, name, - info_hash: dto.info_hash.clone(), + info_hash: list.info_hash.as_string(), + source_kind, + source_label, torrent_bytes: list.torrent_bytes.to_vec(), files, status: TorrentJobStatus::Preview, output_dir, - selected_files: Vec::new(), + selected_files, handle: None, + downloaded_bytes: 0, + uploaded_bytes: 0, + progress_percent: 0.0, error: None, + created_at: now.clone(), + updated_at: now, + completed_at: None, + }; + insert_job(pool, &job).await?; + + let dto = TorrentSessionDto { + job: job.dto(), + preview: job.preview(), + selected_files: job.selected_files.clone(), }; self.jobs.lock().await.insert(id, job); Ok(dto) } - pub async fn status(&self, id: &str) -> anyhow::Result { - let jobs = self.jobs.lock().await; - let job = jobs.get(id).context("torrent job not found")?; - Ok(job.dto()) + pub async fn status( + &self, + pool: &PgPool, + user_id: i64, + id: &str, + ) -> anyhow::Result { + let dto = { + let mut jobs = self.jobs.lock().await; + jobs.get_mut(id) + .filter(|job| job.user_id == user_id) + .map(|job| { + job.refresh_progress(); + job.dto() + }) + }; + + if let Some(dto) = dto { + persist_progress(pool, &dto).await?; + return Ok(dto); + } + + let row = load_row(pool, user_id, id).await?; + Ok(row.dto(None)) + } + + pub async fn remove(&self, pool: &PgPool, user_id: i64, id: &str) -> anyhow::Result<()> { + let removed = { + let mut jobs = self.jobs.lock().await; + jobs.remove(id).and_then(|job| job.handle) + }; + if let Some(handle) = removed { + self.stop_torrent(&handle).await; + } + + let result = sqlx::query( + "DELETE FROM furumusic__torrent_session WHERE id = $1 AND user_id = $2", + ) + .bind(id) + .bind(user_id) + .execute(pool) + .await?; + + if result.rows_affected() == 0 { + bail!("torrent session not found"); + } + Ok(()) } pub async fn start( self: &Arc, + pool: &PgPool, id: &str, selected_files: Vec, inbox_dir: String, @@ -326,21 +619,34 @@ impl TorrentService { } let inbox_dir = validate_inbox_dir(&inbox_dir)?; + self.ensure_memory_job(pool, uploader_user_id, id).await?; + let (torrent_bytes, output_dir) = { let mut jobs = self.jobs.lock().await; let job = jobs.get_mut(id).context("torrent job not found")?; - if job.status != TorrentJobStatus::Preview && job.status != TorrentJobStatus::Failed { - bail!("torrent job is already started"); + if job.user_id != uploader_user_id { + bail!("torrent job not found"); + } + if job.handle.is_some() && matches!(job.status, TorrentJobStatus::Downloading | TorrentJobStatus::Moving) { + bail!("torrent job is already running"); } validate_selection(&job.files, &selected_files)?; job.status = TorrentJobStatus::Downloading; job.selected_files = selected_files.clone(); + job.downloaded_bytes = 0; + job.uploaded_bytes = 0; + job.progress_percent = 0.0; job.error = None; + job.completed_at = None; + job.updated_at = now_string(); (job.torrent_bytes.clone(), job.output_dir.clone()) }; + tokio::fs::create_dir_all(&output_dir).await?; + mark_job_started(pool, id, &selected_files, &self.memory_job_dto(id).await?).await?; + let session = self.session().await?; - let response = session + let response = match session .add_torrent( AddTorrent::from_bytes(torrent_bytes), Some(AddTorrentOptions { @@ -350,11 +656,23 @@ impl TorrentService { ..Default::default() }), ) - .await?; + .await + { + Ok(response) => response, + Err(err) => { + self.fail_job(pool, id, err.to_string()).await; + return Err(err.into()); + } + }; - let handle = response - .into_handle() - .context("torrent did not return a download handle")?; + let handle = match response.into_handle() { + Some(handle) => handle, + None => { + let err = anyhow::anyhow!("torrent did not return a download handle"); + self.fail_job(pool, id, err.to_string()).await; + return Err(err); + } + }; let dto = { let mut jobs = self.jobs.lock().await; @@ -362,32 +680,84 @@ impl TorrentService { job.handle = Some(handle.clone()); job.dto() }; + persist_progress(pool, &dto).await?; let service = Arc::clone(self); + let pool = pool.clone(); let id = id.to_string(); tokio::spawn(async move { if let Err(err) = handle.wait_until_completed().await { service.stop_torrent(&handle).await; - service.fail_job(&id, err.to_string()).await; + service.fail_job(&pool, &id, err.to_string()).await; return; } service.stop_torrent(&handle).await; if let Err(err) = service - .finalize_completed(&id, &inbox_dir, uploader_user_id) + .finalize_completed(&pool, &id, &inbox_dir, uploader_user_id) .await { - service.fail_job(&id, err.to_string()).await; + service.fail_job(&pool, &id, err.to_string()).await; } }); Ok(dto) } - async fn fail_job(&self, id: &str, error: String) { - let mut jobs = self.jobs.lock().await; - if let Some(job) = jobs.get_mut(id) { - job.status = TorrentJobStatus::Failed; - job.error = Some(error); + async fn memory_details(&self, user_id: i64, id: &str) -> Option { + let jobs = self.jobs.lock().await; + let job = jobs.get(id)?; + if job.user_id != user_id { + return None; + } + Some(TorrentSessionDto { + job: job.dto(), + preview: job.preview(), + selected_files: job.selected_files.clone(), + }) + } + + async fn ensure_memory_job(&self, pool: &PgPool, user_id: i64, id: &str) -> anyhow::Result<()> { + if self.jobs.lock().await.contains_key(id) { + return Ok(()); + } + + let row = load_row(pool, user_id, id).await?; + let job = row.into_job(&self.temp_root)?; + self.jobs.lock().await.insert(id.to_string(), job); + Ok(()) + } + + async fn memory_job_dto(&self, id: &str) -> anyhow::Result { + let jobs = self.jobs.lock().await; + let job = jobs.get(id).context("torrent job not found")?; + Ok(job.dto()) + } + + async fn fail_job(&self, pool: &PgPool, id: &str, error: String) { + let dto = { + let mut jobs = self.jobs.lock().await; + jobs.get_mut(id).map(|job| { + job.refresh_progress(); + job.status = TorrentJobStatus::Failed; + job.error = Some(error.clone()); + job.handle = None; + job.updated_at = now_string(); + job.dto() + }) + }; + if let Some(dto) = dto { + let _ = persist_progress(pool, &dto).await; + } else { + let _ = sqlx::query( + "UPDATE furumusic__torrent_session + SET status = 'failed', error = $2, updated_at = $3 + WHERE id = $1", + ) + .bind(id) + .bind(error) + .bind(now_string()) + .execute(pool) + .await; } } @@ -406,6 +776,7 @@ impl TorrentService { async fn finalize_completed( &self, + pool: &PgPool, id: &str, inbox_dir: &Path, uploader_user_id: i64, @@ -413,7 +784,9 @@ impl TorrentService { let (name, files, selected_files, output_dir) = { let mut jobs = self.jobs.lock().await; let job = jobs.get_mut(id).context("torrent job not found")?; + job.refresh_progress(); job.status = TorrentJobStatus::Moving; + job.updated_at = now_string(); ( job.name.clone(), job.files.clone(), @@ -422,6 +795,9 @@ impl TorrentService { ) }; + let moving_dto = self.memory_job_dto(id).await?; + persist_progress(pool, &moving_dto).await?; + let destination_root = inbox_dir .join("user_uploads") .join(uploader_user_id.to_string()) @@ -443,11 +819,18 @@ impl TorrentService { let job_root = self.temp_root.join(id); let _ = tokio::fs::remove_dir_all(job_root).await; - { + let completed_dto = { let mut jobs = self.jobs.lock().await; let job = jobs.get_mut(id).context("torrent job not found")?; + job.refresh_progress(); job.status = TorrentJobStatus::Complete; - } + job.completed_at = Some(now_string()); + job.updated_at = now_string(); + let dto = job.dto(); + job.handle = None; + dto + }; + persist_progress(pool, &completed_dto).await?; if let Some(handle) = self.scheduler_handle.get() { let handle = Arc::clone(handle); @@ -462,6 +845,108 @@ impl TorrentService { } } +async fn load_row(pool: &PgPool, user_id: i64, id: &str) -> anyhow::Result { + sqlx::query_as::<_, TorrentSessionRow>( + r#"SELECT id, user_id, name, info_hash, source_kind, source_label, torrent_bytes, + files_json, selected_files_json, status, total_size, selected_size, + downloaded_bytes, uploaded_bytes, progress_percent, error, + created_at, updated_at, completed_at + FROM furumusic__torrent_session + WHERE id = $1 AND user_id = $2"#, + ) + .bind(id) + .bind(user_id) + .fetch_optional(pool) + .await? + .context("torrent session not found") +} + +async fn insert_job(pool: &PgPool, job: &TorrentJob) -> anyhow::Result<()> { + sqlx::query( + r#"INSERT INTO furumusic__torrent_session + (id, user_id, name, info_hash, source_kind, source_label, torrent_bytes, + files_json, selected_files_json, status, total_size, selected_size, + downloaded_bytes, uploaded_bytes, progress_percent, error, + created_at, updated_at, completed_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, + $8, $9, $10, $11, $12, + 0, 0, 0, NULL, + $13, $14, NULL)"#, + ) + .bind(&job.id) + .bind(job.user_id) + .bind(&job.name) + .bind(&job.info_hash) + .bind(&job.source_kind) + .bind(&job.source_label) + .bind(&job.torrent_bytes) + .bind(serde_json::to_string(&job.files)?) + .bind(serde_json::to_string(&job.selected_files)?) + .bind(job.status.as_str()) + .bind(u64_to_i64(job.total_size())) + .bind(u64_to_i64(job.selected_size())) + .bind(&job.created_at) + .bind(&job.updated_at) + .execute(pool) + .await?; + Ok(()) +} + +async fn mark_job_started( + pool: &PgPool, + id: &str, + selected_files: &[usize], + dto: &TorrentJobDto, +) -> anyhow::Result<()> { + sqlx::query( + r#"UPDATE furumusic__torrent_session + SET selected_files_json = $2, + status = 'downloading', + selected_size = $3, + downloaded_bytes = 0, + uploaded_bytes = 0, + progress_percent = 0, + error = NULL, + updated_at = $4, + completed_at = NULL + WHERE id = $1"#, + ) + .bind(id) + .bind(serde_json::to_string(selected_files)?) + .bind(u64_to_i64(dto.selected_size)) + .bind(now_string()) + .execute(pool) + .await?; + Ok(()) +} + +async fn persist_progress(pool: &PgPool, dto: &TorrentJobDto) -> anyhow::Result<()> { + sqlx::query( + r#"UPDATE furumusic__torrent_session + SET status = $2, + selected_size = $3, + downloaded_bytes = $4, + uploaded_bytes = $5, + progress_percent = $6, + error = $7, + updated_at = $8, + completed_at = $9 + WHERE id = $1"#, + ) + .bind(&dto.id) + .bind(&dto.status) + .bind(u64_to_i64(dto.selected_size)) + .bind(u64_to_i64(dto.downloaded_bytes)) + .bind(u64_to_i64(dto.uploaded_bytes)) + .bind(dto.progress_percent) + .bind(&dto.error) + .bind(now_string()) + .bind(&dto.completed_at) + .execute(pool) + .await?; + Ok(()) +} + fn validate_selection(files: &[TorrentFileDto], selected_files: &[usize]) -> anyhow::Result<()> { for index in selected_files { if !files.iter().any(|file| file.index == *index) { @@ -483,26 +968,35 @@ fn validate_inbox_dir(inbox_dir: &str) -> anyhow::Result { Ok(path) } -fn is_audio_path(path: &str) -> bool { - let Some(ext) = Path::new(path).extension().and_then(|e| e.to_str()) else { - return false; - }; - matches!( - ext.to_ascii_lowercase().as_str(), - "mp3" - | "flac" - | "ogg" - | "opus" - | "aac" - | "m4a" - | "wav" - | "ape" - | "wv" - | "wma" - | "tta" - | "aiff" - | "aif" - ) +fn selected_size(files: &[TorrentFileDto], selected_files: &[usize]) -> u64 { + if selected_files.is_empty() { + return 0; + } + files + .iter() + .filter(|f| selected_files.contains(&f.index)) + .map(|f| f.length) + .sum() +} + +fn progress_percent(downloaded: u64, total: u64) -> Option { + if total == 0 { + None + } else { + Some(downloaded as f64 / total as f64 * 100.0) + } +} + +fn now_string() -> String { + chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string() +} + +fn u64_to_i64(value: u64) -> i64 { + value.min(i64::MAX as u64) as i64 +} + +fn i64_to_u64(value: i64) -> u64 { + value.max(0) as u64 } fn sanitize_path_component(value: &str) -> String { diff --git a/templates/player.html b/templates/player.html index 845f73e..ce3ff70 100644 --- a/templates/player.html +++ b/templates/player.html @@ -3,5124 +3,11 @@ {% block title %}{{ t.site_name }}{% endblock title %} {% block head_extra %} - +{% include "player/styles.html" %} {% endblock head_extra %} {% block body %} -
- -
- - - - - - -
- -
- - - - v{{ t.app_version() }} - - -
- - - - - - - - - - - - - - - -
- - -
-
-

Queue

- -
-
- - -
-
-
- - -
-
- -
- -
-
- - - - - -
-
- -
-
-
-
-
- -
-
- -
-
- -
-
-
-
-
-
- -
-
- - - - - - - - - - - - - - - -
- - - +{% include "player/shell.html" %} +{% include "player/modals.html" %} +{% include "player/scripts.html" %} {% endblock body %} diff --git a/templates/player/modals.html b/templates/player/modals.html new file mode 100644 index 0000000..8bb4a80 --- /dev/null +++ b/templates/player/modals.html @@ -0,0 +1,258 @@ + + + + + + + + + + + + + + + + diff --git a/templates/player/scripts.html b/templates/player/scripts.html new file mode 100644 index 0000000..1024fd1 --- /dev/null +++ b/templates/player/scripts.html @@ -0,0 +1,1545 @@ + + diff --git a/templates/player/shell.html b/templates/player/shell.html new file mode 100644 index 0000000..ddf7c53 --- /dev/null +++ b/templates/player/shell.html @@ -0,0 +1,981 @@ +
+ +
+ + + + + + +
+ +
+ + + + v{{ t.app_version() }} + + +
+ + + + + + + + + + + + + + + +
+ + +
+
+
+

Queue

+ +
+
+ + +
+
+
+ + +
+
+ +
+ +
+
+ + + + + +
+
+ +
+
+
+
+
+ +
+
+ +
+
+ +
+
+
+
+
+
+ +
+
diff --git a/templates/player/styles.html b/templates/player/styles.html new file mode 100644 index 0000000..5ec26de --- /dev/null +++ b/templates/player/styles.html @@ -0,0 +1,2741 @@ +