Improved upload UI
Build and Publish / Build and Publish Docker Image (push) Successful in 3m2s

This commit is contained in:
Ultradesu
2026-05-26 16:59:36 +03:00
parent 82923c871e
commit d425bf3087
8 changed files with 738 additions and 97 deletions
+316 -34
View File
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
@@ -98,6 +98,7 @@ pub struct TorrentStartRequest {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TorrentJobStatus {
Resolving,
Preview,
Downloading,
Moving,
@@ -110,6 +111,7 @@ impl TorrentJobStatus {
fn as_str(self) -> &'static str {
match self {
Self::Preview => "preview",
Self::Resolving => "resolving",
Self::Downloading => "downloading",
Self::Moving => "moving",
Self::Complete => "complete",
@@ -121,6 +123,7 @@ impl TorrentJobStatus {
fn from_str(value: &str) -> Self {
match value {
"downloading" => Self::Downloading,
"resolving" => Self::Resolving,
"moving" => Self::Moving,
"complete" => Self::Complete,
"failed" => Self::Failed,
@@ -372,6 +375,7 @@ pub struct TorrentService {
temp_root: PathBuf,
session: OnceCell<Arc<Session>>,
jobs: Mutex<HashMap<String, TorrentJob>>,
resolving_jobs: Mutex<HashSet<String>>,
scheduler_handle: Arc<OnceCell<Arc<SchedulerHandle>>>,
}
@@ -381,6 +385,7 @@ impl TorrentService {
temp_root: std::env::temp_dir().join("furumusic").join("torrents"),
session: OnceCell::new(),
jobs: Mutex::new(HashMap::new()),
resolving_jobs: Mutex::new(HashSet::new()),
scheduler_handle,
}
}
@@ -404,7 +409,11 @@ impl TorrentService {
.cloned()
}
pub async fn list(&self, pool: &PgPool, user_id: i64) -> anyhow::Result<Vec<TorrentJobDto>> {
pub async fn list(
self: &Arc<Self>,
pool: &PgPool,
user_id: i64,
) -> anyhow::Result<Vec<TorrentJobDto>> {
let rows = sqlx::query_as::<_, TorrentSessionRow>(
r#"SELECT id, user_id, name, info_hash, source_kind, source_label, torrent_bytes,
files_json, selected_files_json, status, total_size, selected_size,
@@ -412,7 +421,7 @@ impl TorrentService {
created_at, updated_at, completed_at
FROM furumusic__torrent_session
WHERE user_id = $1
ORDER BY updated_at DESC, created_at DESC
ORDER BY created_at DESC, id DESC
LIMIT $2"#,
)
.bind(user_id)
@@ -427,6 +436,21 @@ impl TorrentService {
.collect::<HashMap<_, _>>()
};
for row in rows.iter().filter(|row| row.status == "resolving") {
if row.source_kind == "magnet" {
if let Some(magnet) = row.source_label.clone() {
self.spawn_resolve_pending_magnet(
pool.clone(),
user_id,
row.id.clone(),
magnet,
row.created_at.clone(),
)
.await;
}
}
}
Ok(rows
.iter()
.map(|row| row.dto(handles.get(&row.id)))
@@ -455,7 +479,7 @@ impl TorrentService {
}
pub async fn preview(
&self,
self: &Arc<Self>,
pool: &PgPool,
user_id: i64,
request: TorrentPreviewRequest,
@@ -473,42 +497,50 @@ impl TorrentService {
.filter(|s| !s.is_empty())
.map(str::to_owned);
let add = match request.kind {
TorrentPreviewKind::Magnet => {
let magnet = request
.magnet
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty())
.context("magnet link is empty")?;
AddTorrent::from_url(magnet.to_string())
}
TorrentPreviewKind::TorrentFile => {
let encoded = request
.torrent_base64
.as_deref()
.filter(|s| !s.is_empty())
.context("torrent file is empty")?;
let bytes = base64::engine::general_purpose::STANDARD
.decode(encoded)
.context("invalid torrent file encoding")?;
AddTorrent::from_bytes(bytes)
}
};
if matches!(request.kind, TorrentPreviewKind::Magnet) {
let magnet = request
.magnet
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty())
.context("magnet link is empty")?
.to_string();
let info_hash = extract_magnet_info_hash(&magnet).context("invalid magnet link")?;
let name = magnet_display_name(&magnet)
.or(source_label)
.unwrap_or_else(|| info_hash.clone());
let now = now_string();
insert_pending_magnet(pool, &id, user_id, &name, &info_hash, &magnet, &now).await?;
self.spawn_resolve_pending_magnet(pool.clone(), user_id, id.clone(), magnet, now)
.await;
let response = tokio::time::timeout(
METADATA_TIMEOUT,
session.add_torrent(
add,
let row = load_row(pool, user_id, &id).await?;
return Ok(TorrentSessionDto {
job: row.dto(None),
preview: row.preview()?,
selected_files: row.selected_files(),
});
}
let encoded = request
.torrent_base64
.as_deref()
.filter(|s| !s.is_empty())
.context("torrent file is empty")?;
let bytes = base64::engine::general_purpose::STANDARD
.decode(encoded)
.context("invalid torrent file encoding")?;
let response = session
.add_torrent(
AddTorrent::from_bytes(bytes),
Some(AddTorrentOptions {
list_only: true,
output_folder: Some(output_dir.to_string_lossy().to_string()),
..Default::default()
}),
),
)
.await
.context("timed out while resolving torrent metadata")??;
)
.await?;
let AddTorrentResponse::ListOnly(list) = response else {
bail!("torrent was unexpectedly added instead of previewed");
@@ -572,6 +604,114 @@ impl TorrentService {
Ok(dto)
}
async fn spawn_resolve_pending_magnet(
self: &Arc<Self>,
pool: PgPool,
user_id: i64,
id: String,
magnet: String,
created_at: String,
) {
{
let mut resolving = self.resolving_jobs.lock().await;
if !resolving.insert(id.clone()) {
return;
}
}
let service = Arc::clone(self);
tokio::spawn(async move {
let result = service
.resolve_pending_magnet(&pool, user_id, &id, &magnet, &created_at)
.await;
if let Err(err) = result {
update_resolving_error(&pool, &id, &err.to_string()).await;
}
service.resolving_jobs.lock().await.remove(&id);
});
}
async fn resolve_pending_magnet(
&self,
pool: &PgPool,
user_id: i64,
id: &str,
magnet: &str,
created_at: &str,
) -> anyhow::Result<()> {
let session = self.session().await?;
let output_dir = self.temp_root.join(id).join("download");
tokio::fs::create_dir_all(&output_dir).await?;
let response = tokio::time::timeout(
METADATA_TIMEOUT,
session.add_torrent(
AddTorrent::from_url(magnet.to_string()),
Some(AddTorrentOptions {
list_only: true,
output_folder: Some(output_dir.to_string_lossy().to_string()),
..Default::default()
}),
),
)
.await
.context("timed out while resolving torrent metadata")??;
let AddTorrentResponse::ListOnly(list) = response else {
bail!("torrent was unexpectedly added instead of previewed");
};
let name = list
.info
.name
.as_ref()
.map(|b| String::from_utf8_lossy(b.as_ref()).to_string())
.filter(|s| !s.is_empty())
.or_else(|| magnet_display_name(magnet))
.unwrap_or_else(|| list.info_hash.as_string());
let mut files = Vec::new();
for (index, details) in list.info.iter_file_details()?.enumerate() {
let name = details
.filename
.to_string()
.unwrap_or_else(|_| "<invalid filename>".to_string());
files.push(TorrentFileDto {
index,
name,
components: details.filename.to_vec().unwrap_or_default(),
length: details.len,
selected: true,
});
}
let selected_files = files.iter().map(|f| f.index).collect::<Vec<_>>();
let job = TorrentJob {
id: id.to_string(),
user_id,
name,
info_hash: list.info_hash.as_string(),
source_kind: "magnet".to_string(),
source_label: Some(magnet.to_string()),
torrent_bytes: list.torrent_bytes.to_vec(),
files,
status: TorrentJobStatus::Preview,
output_dir,
selected_files,
handle: None,
downloaded_bytes: 0,
uploaded_bytes: 0,
progress_percent: 0.0,
error: None,
created_at: created_at.to_string(),
updated_at: now_string(),
completed_at: None,
};
update_resolved_job(pool, &job).await?;
self.jobs.lock().await.insert(id.to_string(), job);
Ok(())
}
pub async fn status(
&self,
pool: &PgPool,
@@ -949,6 +1089,89 @@ async fn insert_job(pool: &PgPool, job: &TorrentJob) -> anyhow::Result<()> {
Ok(())
}
async fn insert_pending_magnet(
pool: &PgPool,
id: &str,
user_id: i64,
name: &str,
info_hash: &str,
magnet: &str,
now: &str,
) -> anyhow::Result<()> {
sqlx::query(
r#"INSERT INTO furumusic__torrent_session
(id, user_id, name, info_hash, source_kind, source_label, torrent_bytes,
files_json, selected_files_json, status, total_size, selected_size,
downloaded_bytes, uploaded_bytes, progress_percent, error,
created_at, updated_at, completed_at)
VALUES ($1, $2, $3, $4, 'magnet', $5, $6,
'[]', '[]', 'resolving', 0, 0,
0, 0, 0, NULL,
$7, $8, NULL)"#,
)
.bind(id)
.bind(user_id)
.bind(name)
.bind(info_hash)
.bind(magnet)
.bind(Vec::<u8>::new())
.bind(now)
.bind(now)
.execute(pool)
.await?;
Ok(())
}
async fn update_resolved_job(pool: &PgPool, job: &TorrentJob) -> anyhow::Result<()> {
sqlx::query(
r#"UPDATE furumusic__torrent_session
SET name = $2,
info_hash = $3,
torrent_bytes = $4,
files_json = $5,
selected_files_json = $6,
status = 'preview',
total_size = $7,
selected_size = $8,
downloaded_bytes = 0,
uploaded_bytes = 0,
progress_percent = 0,
error = NULL,
updated_at = $9,
completed_at = NULL
WHERE id = $1"#,
)
.bind(&job.id)
.bind(&job.name)
.bind(&job.info_hash)
.bind(&job.torrent_bytes)
.bind(serde_json::to_string(&job.files)?)
.bind(serde_json::to_string(&job.selected_files)?)
.bind(u64_to_i64(job.total_size()))
.bind(u64_to_i64(job.selected_size()))
.bind(&job.updated_at)
.execute(pool)
.await?;
Ok(())
}
async fn update_resolving_error(pool: &PgPool, id: &str, error: &str) {
if let Err(err) = sqlx::query(
r#"UPDATE furumusic__torrent_session
SET error = $2,
updated_at = $3
WHERE id = $1 AND status = 'resolving'"#,
)
.bind(id)
.bind(error)
.bind(now_string())
.execute(pool)
.await
{
tracing::warn!("failed to persist torrent metadata resolving error: {err}");
}
}
async fn mark_job_started(
pool: &PgPool,
id: &str,
@@ -1056,6 +1279,65 @@ fn i64_to_u64(value: i64) -> u64 {
value.max(0) as u64
}
fn extract_magnet_info_hash(magnet: &str) -> Option<String> {
if !magnet.starts_with("magnet:?") {
return None;
}
magnet
.split(['?', '&'])
.find_map(|part| part.strip_prefix("xt=urn:btih:"))
.map(|hash| percent_decode(hash).to_ascii_lowercase())
.filter(|hash| !hash.is_empty())
}
fn magnet_display_name(magnet: &str) -> Option<String> {
magnet
.split(['?', '&'])
.find_map(|part| part.strip_prefix("dn="))
.map(percent_decode)
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
}
fn percent_decode(value: &str) -> String {
let bytes = value.as_bytes();
let mut out = Vec::with_capacity(bytes.len());
let mut index = 0;
while index < bytes.len() {
match bytes[index] {
b'+' => {
out.push(b' ');
index += 1;
}
b'%' if index + 2 < bytes.len() => {
let hi = hex_value(bytes[index + 1]);
let lo = hex_value(bytes[index + 2]);
if let (Some(hi), Some(lo)) = (hi, lo) {
out.push((hi << 4) | lo);
index += 3;
} else {
out.push(bytes[index]);
index += 1;
}
}
byte => {
out.push(byte);
index += 1;
}
}
}
String::from_utf8_lossy(&out).to_string()
}
fn hex_value(byte: u8) -> Option<u8> {
match byte {
b'0'..=b'9' => Some(byte - b'0'),
b'a'..=b'f' => Some(byte - b'a' + 10),
b'A'..=b'F' => Some(byte - b'A' + 10),
_ => None,
}
}
fn sanitize_path_component(value: &str) -> String {
let sanitized: String = value
.chars()