Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
722183047d | ||
|
|
106ab96c56 | ||
|
|
cbc5639f99 | ||
|
|
754097f894 | ||
|
|
b761245fd0 | ||
|
|
0f49d8d079 | ||
|
|
a17ff322ad | ||
|
|
707ef85e5d | ||
|
|
ec4c53497f | ||
|
|
46ba3d5490 |
1918
Cargo.lock
generated
1918
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "furumi-client-core"
|
name = "furumi-client-core"
|
||||||
version = "0.2.1"
|
version = "0.3.4"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
use crate::error::{ClientError, Result};
|
use crate::error::{ClientError, Result};
|
||||||
use furumi_common::proto::{
|
use furumi_common::proto::{
|
||||||
remote_file_system_client::RemoteFileSystemClient, AttrResponse, DirEntry, FileChunk,
|
remote_file_system_client::RemoteFileSystemClient, AttrResponse, DirEntry, FileChunk,
|
||||||
PathRequest, ReadRequest, SnapshotRequest, WatchRequest,
|
PathRequest, ReadRequest,
|
||||||
};
|
};
|
||||||
use moka::future::Cache;
|
use moka::future::Cache;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
@@ -15,7 +15,7 @@ use tonic::codegen::InterceptedService;
|
|||||||
use tonic::metadata::MetadataValue;
|
use tonic::metadata::MetadataValue;
|
||||||
use tonic::transport::{Channel, Endpoint, Uri};
|
use tonic::transport::{Channel, Endpoint, Uri};
|
||||||
use tonic::{Request, Status};
|
use tonic::{Request, Status};
|
||||||
use tracing::{debug, info, warn, trace};
|
use tracing::{debug, info};
|
||||||
|
|
||||||
// ── Auth interceptor ───────────────────────────────────────────
|
// ── Auth interceptor ───────────────────────────────────────────
|
||||||
|
|
||||||
@@ -127,7 +127,6 @@ impl tower::Service<Uri> for InsecureTlsConnector {
|
|||||||
pub struct FurumiClient {
|
pub struct FurumiClient {
|
||||||
client: GrpcClient,
|
client: GrpcClient,
|
||||||
attr_cache: Cache<String, AttrResponse>,
|
attr_cache: Cache<String, AttrResponse>,
|
||||||
dir_cache: Cache<String, Arc<Vec<DirEntry>>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FurumiClient {
|
impl FurumiClient {
|
||||||
@@ -188,130 +187,29 @@ impl FurumiClient {
|
|||||||
.time_to_live(Duration::from_secs(5))
|
.time_to_live(Duration::from_secs(5))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
let dir_cache = Cache::builder()
|
Ok(Self { client, attr_cache })
|
||||||
.max_capacity(10_000)
|
|
||||||
.time_to_live(Duration::from_secs(30))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
Ok(Self { client, attr_cache, dir_cache })
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Spawns background tasks that pre-warm the cache with a server snapshot and then
|
|
||||||
/// subscribe to live change events to keep it up to date.
|
|
||||||
/// Must be called after a successful authentication check.
|
|
||||||
pub fn start_background_sync(&self) {
|
|
||||||
info!("background sync: starting");
|
|
||||||
let this = self.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let t = std::time::Instant::now();
|
|
||||||
match this.load_snapshot("/", 3).await {
|
|
||||||
Ok(n) => info!(
|
|
||||||
"background sync: snapshot loaded — {} directories in {:.1}s",
|
|
||||||
n,
|
|
||||||
t.elapsed().as_secs_f32()
|
|
||||||
),
|
|
||||||
Err(e) => {
|
|
||||||
warn!("background sync: GetSnapshot failed ({}), falling back to on-demand caching", e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("background sync: subscribing to change events");
|
|
||||||
// Reconnect loop: if the watch stream drops, reconnect after a short delay.
|
|
||||||
loop {
|
|
||||||
match this.run_watch_loop().await {
|
|
||||||
Ok(()) => {
|
|
||||||
info!("background sync: WatchChanges stream closed cleanly");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("background sync: WatchChanges error ({}), reconnecting in 5s", e);
|
|
||||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Fetches the server's pre-built directory snapshot and populates both caches.
|
|
||||||
/// Returns the number of directories loaded.
|
|
||||||
async fn load_snapshot(&self, path: &str, depth: u32) -> Result<usize> {
|
|
||||||
debug!("snapshot: requesting path={} depth={}", path, depth);
|
|
||||||
let mut client = self.client.clone();
|
|
||||||
let req = tonic::Request::new(SnapshotRequest {
|
|
||||||
path: path.to_string(),
|
|
||||||
depth,
|
|
||||||
});
|
|
||||||
let mut stream = client.get_snapshot(req).await?.into_inner();
|
|
||||||
let mut dirs = 0;
|
|
||||||
let mut attrs_warmed = 0;
|
|
||||||
while let Some(entry) = stream.next().await {
|
|
||||||
let entry = entry?;
|
|
||||||
trace!("snapshot: got dir '{}' ({} entries)", entry.path, entry.children.len());
|
|
||||||
|
|
||||||
// Warm attr_cache for the directory itself.
|
|
||||||
if let Some(dir_attr) = entry.dir_attr {
|
|
||||||
self.attr_cache.insert(entry.path.clone(), dir_attr).await;
|
|
||||||
attrs_warmed += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Warm attr_cache for each child (parallel slice: children[i] ↔ child_attrs[i]).
|
|
||||||
for (child, attr) in entry.children.iter().zip(entry.child_attrs.iter()) {
|
|
||||||
let child_path = if entry.path == "/" {
|
|
||||||
format!("/{}", child.name)
|
|
||||||
} else {
|
|
||||||
format!("{}/{}", entry.path, child.name)
|
|
||||||
};
|
|
||||||
self.attr_cache.insert(child_path, attr.clone()).await;
|
|
||||||
attrs_warmed += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Populate dir_cache.
|
|
||||||
self.dir_cache
|
|
||||||
.insert(entry.path, Arc::new(entry.children))
|
|
||||||
.await;
|
|
||||||
dirs += 1;
|
|
||||||
}
|
|
||||||
debug!("snapshot: {} dirs → dir_cache, {} attrs → attr_cache", dirs, attrs_warmed);
|
|
||||||
Ok(dirs)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Subscribes to the server's live change events and invalidates `dir_cache` entries.
|
|
||||||
/// Returns when the stream closes or on error.
|
|
||||||
async fn run_watch_loop(&self) -> Result<()> {
|
|
||||||
let mut client = self.client.clone();
|
|
||||||
let req = tonic::Request::new(WatchRequest {});
|
|
||||||
let mut stream = client.watch_changes(req).await?.into_inner();
|
|
||||||
while let Some(event) = stream.next().await {
|
|
||||||
let event = event?;
|
|
||||||
debug!("watch: invalidating dir_cache for '{}'", event.path);
|
|
||||||
self.dir_cache.invalidate(&event.path).await;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetches file attributes from the server, utilizing an internal cache.
|
/// Fetches file attributes from the server, utilizing an internal cache.
|
||||||
pub async fn get_attr(&self, path: &str) -> Result<AttrResponse> {
|
pub async fn get_attr(&self, path: &str) -> Result<AttrResponse> {
|
||||||
if let Some(attr) = self.attr_cache.get(path).await {
|
if let Some(attr) = self.attr_cache.get(path).await {
|
||||||
trace!("get_attr: cache hit '{}'", path);
|
|
||||||
return Ok(attr);
|
return Ok(attr);
|
||||||
}
|
}
|
||||||
|
|
||||||
let t = std::time::Instant::now();
|
debug!("get_attr (cache miss): {}", path);
|
||||||
let mut client = self.client.clone();
|
let mut client = self.client.clone();
|
||||||
let req = tonic::Request::new(PathRequest {
|
let req = tonic::Request::new(PathRequest {
|
||||||
path: path.to_string(),
|
path: path.to_string(),
|
||||||
});
|
});
|
||||||
|
|
||||||
let response = client.get_attr(req).await?.into_inner();
|
let response = client.get_attr(req).await?.into_inner();
|
||||||
debug!("get_attr: cache miss '{}' — rpc {:.1}ms", path, t.elapsed().as_secs_f32() * 1000.0);
|
|
||||||
self.attr_cache.insert(path.to_string(), response.clone()).await;
|
self.attr_cache.insert(path.to_string(), response.clone()).await;
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetches directory contents from gRPC and stores them in the cache.
|
/// Reads directory contents from the server stream.
|
||||||
/// Does not trigger prefetch — safe to call from background tasks.
|
pub async fn read_dir(&self, path: &str) -> Result<Vec<DirEntry>> {
|
||||||
async fn fetch_and_cache_dir(&self, path: &str) -> Result<Arc<Vec<DirEntry>>> {
|
debug!("read_dir: {}", path);
|
||||||
let t = std::time::Instant::now();
|
|
||||||
let mut client = self.client.clone();
|
let mut client = self.client.clone();
|
||||||
let req = tonic::Request::new(PathRequest {
|
let req = tonic::Request::new(PathRequest {
|
||||||
path: path.to_string(),
|
path: path.to_string(),
|
||||||
@@ -321,87 +219,13 @@ impl FurumiClient {
|
|||||||
let mut entries = Vec::new();
|
let mut entries = Vec::new();
|
||||||
|
|
||||||
while let Some(chunk) = stream.next().await {
|
while let Some(chunk) = stream.next().await {
|
||||||
entries.push(chunk?);
|
let entry = chunk?;
|
||||||
|
entries.push(entry);
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(
|
|
||||||
"fetch_dir: '{}' — {} entries in {:.1}ms",
|
|
||||||
path,
|
|
||||||
entries.len(),
|
|
||||||
t.elapsed().as_secs_f32() * 1000.0
|
|
||||||
);
|
|
||||||
let entries = Arc::new(entries);
|
|
||||||
self.dir_cache.insert(path.to_string(), entries.clone()).await;
|
|
||||||
Ok(entries)
|
Ok(entries)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reads directory contents, utilizing an internal cache.
|
|
||||||
/// On cache miss, fetches from server and spawns a background task to
|
|
||||||
/// prefetch attributes and immediate subdirectory listings.
|
|
||||||
pub async fn read_dir(&self, path: &str) -> Result<Vec<DirEntry>> {
|
|
||||||
if let Some(entries) = self.dir_cache.get(path).await {
|
|
||||||
debug!("read_dir: cache hit '{}' ({} entries)", path, entries.len());
|
|
||||||
return Ok((*entries).clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!("read_dir: cache miss '{}' — fetching from server", path);
|
|
||||||
let entries = self.fetch_and_cache_dir(path).await?;
|
|
||||||
|
|
||||||
let self_clone = self.clone();
|
|
||||||
let path_clone = path.to_string();
|
|
||||||
let entries_clone = entries.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
self_clone.prefetch_children(&path_clone, &entries_clone).await;
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok((*entries).clone())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Background: warms attr_cache for all children and dir_cache for immediate subdirs.
|
|
||||||
async fn prefetch_children(&self, parent: &str, entries: &[DirEntry]) {
|
|
||||||
let dirs: Vec<_> = entries.iter().filter(|e| e.r#type == 4).collect();
|
|
||||||
let files: Vec<_> = entries.iter().filter(|e| e.r#type != 4).collect();
|
|
||||||
debug!(
|
|
||||||
"prefetch: '{}' — warming {} attrs, {} subdirs",
|
|
||||||
parent,
|
|
||||||
entries.len(),
|
|
||||||
dirs.len().min(20)
|
|
||||||
);
|
|
||||||
|
|
||||||
// Warm attr_cache for all children.
|
|
||||||
for entry in entries {
|
|
||||||
let child_path = if parent == "/" {
|
|
||||||
format!("/{}", entry.name)
|
|
||||||
} else {
|
|
||||||
format!("{}/{}", parent, entry.name)
|
|
||||||
};
|
|
||||||
let _ = self.get_attr(&child_path).await;
|
|
||||||
}
|
|
||||||
let _ = files; // suppress unused warning
|
|
||||||
|
|
||||||
// Prefetch dir listings for immediate subdirs (up to 20).
|
|
||||||
let subdirs: Vec<_> = dirs.into_iter().take(20).collect();
|
|
||||||
let mut fetched = 0;
|
|
||||||
let mut already_cached = 0;
|
|
||||||
for subdir in &subdirs {
|
|
||||||
let child_path = if parent == "/" {
|
|
||||||
format!("/{}", subdir.name)
|
|
||||||
} else {
|
|
||||||
format!("{}/{}", parent, subdir.name)
|
|
||||||
};
|
|
||||||
if self.dir_cache.get(&child_path).await.is_none() {
|
|
||||||
let _ = self.fetch_and_cache_dir(&child_path).await;
|
|
||||||
fetched += 1;
|
|
||||||
} else {
|
|
||||||
already_cached += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
debug!(
|
|
||||||
"prefetch: '{}' done — {} subdirs fetched, {} already cached",
|
|
||||||
parent, fetched, already_cached
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Fetches file chunk stream from the server.
|
/// Fetches file chunk stream from the server.
|
||||||
pub async fn read_file(
|
pub async fn read_file(
|
||||||
&self,
|
&self,
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "furumi-common"
|
name = "furumi-common"
|
||||||
version = "0.2.1"
|
version = "0.3.4"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
@@ -29,40 +29,6 @@ message FileChunk {
|
|||||||
bytes data = 1;
|
bytes data = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Snapshot & watch ──────────────────────────────────────────────
|
|
||||||
|
|
||||||
// Request a pre-built snapshot of the directory tree up to `depth` levels.
|
|
||||||
// depth = 0 means only the requested path itself; depth = 1 includes immediate children, etc.
|
|
||||||
message SnapshotRequest {
|
|
||||||
string path = 1;
|
|
||||||
uint32 depth = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
// One directory's contents within a snapshot response.
|
|
||||||
// child_attrs is parallel to children: child_attrs[i] is the AttrResponse for children[i].
|
|
||||||
// dir_attr is the AttrResponse for the directory itself (path).
|
|
||||||
message SnapshotEntry {
|
|
||||||
string path = 1;
|
|
||||||
repeated DirEntry children = 2;
|
|
||||||
repeated AttrResponse child_attrs = 3;
|
|
||||||
AttrResponse dir_attr = 4;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Subscribe to live filesystem change notifications (no parameters needed).
|
|
||||||
message WatchRequest {}
|
|
||||||
|
|
||||||
enum ChangeKind {
|
|
||||||
CREATED = 0;
|
|
||||||
DELETED = 1;
|
|
||||||
MODIFIED = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Notifies the client that the contents of `path` have changed.
|
|
||||||
message ChangeEvent {
|
|
||||||
string path = 1;
|
|
||||||
ChangeKind kind = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
service RemoteFileSystem {
|
service RemoteFileSystem {
|
||||||
// Get file or directory attributes (size, permissions, timestamps). Maps to stat/getattr.
|
// Get file or directory attributes (size, permissions, timestamps). Maps to stat/getattr.
|
||||||
rpc GetAttr (PathRequest) returns (AttrResponse);
|
rpc GetAttr (PathRequest) returns (AttrResponse);
|
||||||
@@ -72,12 +38,4 @@ service RemoteFileSystem {
|
|||||||
|
|
||||||
// Read chunks of a file. Uses Server Streaming for efficient chunk delivery based on offset/size.
|
// Read chunks of a file. Uses Server Streaming for efficient chunk delivery based on offset/size.
|
||||||
rpc ReadFile (ReadRequest) returns (stream FileChunk);
|
rpc ReadFile (ReadRequest) returns (stream FileChunk);
|
||||||
|
|
||||||
// Return a pre-built in-memory snapshot of the directory tree rooted at `path`.
|
|
||||||
// The server walks `depth` levels deep on its side — one round-trip fills the client cache.
|
|
||||||
rpc GetSnapshot (SnapshotRequest) returns (stream SnapshotEntry);
|
|
||||||
|
|
||||||
// Subscribe to live filesystem change events. The server pushes a ChangeEvent whenever
|
|
||||||
// a directory's contents change, allowing the client to invalidate its cache immediately.
|
|
||||||
rpc WatchChanges (WatchRequest) returns (stream ChangeEvent);
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "furumi-mount-linux"
|
name = "furumi-mount-linux"
|
||||||
version = "0.2.1"
|
version = "0.3.4"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ use std::time::{Duration, UNIX_EPOCH};
|
|||||||
use tracing::{debug, error};
|
use tracing::{debug, error};
|
||||||
use tokio::runtime::Handle;
|
use tokio::runtime::Handle;
|
||||||
|
|
||||||
const TTL: Duration = Duration::from_secs(5); // 5 second FUSE kernel TTL (matches attr_cache)
|
const TTL: Duration = Duration::from_secs(1); // 1 second FUSE kernel TTL
|
||||||
|
|
||||||
// ── InodeMapper ──────────────────────────────────────────────────
|
// ── InodeMapper ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|||||||
@@ -64,9 +64,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
return Err(format!("Failed to authenticate or connect to server: {}", e).into());
|
return Err(format!("Failed to authenticate or connect to server: {}", e).into());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Auth verified — start background snapshot + watch sync
|
|
||||||
c.start_background_sync();
|
|
||||||
|
|
||||||
Ok::<_, Box<dyn std::error::Error>>(c)
|
Ok::<_, Box<dyn std::error::Error>>(c)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
@@ -78,7 +75,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
MountOption::NoExec, // Better security for media mount
|
MountOption::NoExec, // Better security for media mount
|
||||||
];
|
];
|
||||||
|
|
||||||
println!("Mounting Furumi-ng to {:?}", args.mount);
|
println!("Mounting Furumi-ng v{} to {:?}", env!("CARGO_PKG_VERSION"), args.mount);
|
||||||
|
|
||||||
// Use Session + BackgroundSession for graceful unmount on exit
|
// Use Session + BackgroundSession for graceful unmount on exit
|
||||||
let session = Session::new(fuse_fs, &args.mount, &options)?;
|
let session = Session::new(fuse_fs, &args.mount, &options)?;
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "furumi-mount-macos"
|
name = "furumi-mount-macos"
|
||||||
version = "0.2.1"
|
version = "0.3.4"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
@@ -58,11 +58,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
format!("https://{}", args.server)
|
format!("https://{}", args.server)
|
||||||
};
|
};
|
||||||
|
|
||||||
let client = rt.block_on(async {
|
let client = rt.block_on(async { FurumiClient::connect(&full_addr, &args.token).await })?;
|
||||||
let c = FurumiClient::connect(&full_addr, &args.token).await?;
|
|
||||||
c.start_background_sync();
|
|
||||||
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(c)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let furumi_nfs = nfs::FurumiNfs::new(client);
|
let furumi_nfs = nfs::FurumiNfs::new(client);
|
||||||
|
|
||||||
@@ -112,7 +108,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
std::process::exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
println!("Mounted Furumi-ng to {:?}", mount_path);
|
println!("Mounted Furumi-ng v{} to {:?}", env!("CARGO_PKG_VERSION"), mount_path);
|
||||||
|
|
||||||
// Wait for shutdown signal
|
// Wait for shutdown signal
|
||||||
while running.load(Ordering::SeqCst) {
|
while running.load(Ordering::SeqCst) {
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "furumi-server"
|
name = "furumi-server"
|
||||||
version = "0.2.1"
|
version = "0.3.4"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
@@ -18,16 +18,31 @@ rustls = { version = "0.23.37", features = ["ring"] }
|
|||||||
thiserror = "2.0.18"
|
thiserror = "2.0.18"
|
||||||
tokio = { version = "1.50.0", features = ["full"] }
|
tokio = { version = "1.50.0", features = ["full"] }
|
||||||
tokio-stream = "0.1.18"
|
tokio-stream = "0.1.18"
|
||||||
|
tokio-util = { version = "0.7", features = ["io"] }
|
||||||
tonic = { version = "0.12.3", features = ["tls"] }
|
tonic = { version = "0.12.3", features = ["tls"] }
|
||||||
tracing = "0.1.44"
|
tracing = "0.1.44"
|
||||||
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
|
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
|
||||||
async-stream = "0.3.6"
|
async-stream = "0.3.6"
|
||||||
async-trait = "0.1.89"
|
async-trait = "0.1.89"
|
||||||
prometheus = { version = "0.14.0", features = ["process"] }
|
prometheus = { version = "0.14.0", features = ["process"] }
|
||||||
axum = { version = "0.7", features = ["tokio"] }
|
axum = { version = "0.7", features = ["tokio", "macros"] }
|
||||||
once_cell = "1.21.3"
|
once_cell = "1.21.3"
|
||||||
rcgen = { version = "0.14.7", features = ["pem"] }
|
rcgen = { version = "0.14.7", features = ["pem"] }
|
||||||
notify = "6"
|
symphonia = { version = "0.5", default-features = false, features = ["mp3", "aac", "flac", "vorbis", "wav", "alac", "adpcm", "pcm", "mpa", "isomp4", "ogg", "aiff", "mkv"] }
|
||||||
|
opus = "0.3"
|
||||||
|
ogg = "0.9"
|
||||||
|
mime_guess = "2.0"
|
||||||
|
tower = { version = "0.4", features = ["util"] }
|
||||||
|
sha2 = "0.10"
|
||||||
|
base64 = "0.22"
|
||||||
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
serde_json = "1"
|
||||||
|
openidconnect = "3.4"
|
||||||
|
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
|
||||||
|
hmac = "0.12"
|
||||||
|
rand = "0.8"
|
||||||
|
encoding_rs = "0.8"
|
||||||
|
urlencoding = "2.1.3"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tempfile = "3.26.0"
|
tempfile = "3.26.0"
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ pub mod vfs;
|
|||||||
pub mod security;
|
pub mod security;
|
||||||
pub mod server;
|
pub mod server;
|
||||||
pub mod metrics;
|
pub mod metrics;
|
||||||
pub mod tree;
|
pub mod web;
|
||||||
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
@@ -34,9 +34,37 @@ struct Args {
|
|||||||
#[arg(long, env = "FURUMI_METRICS_BIND", default_value = "0.0.0.0:9090")]
|
#[arg(long, env = "FURUMI_METRICS_BIND", default_value = "0.0.0.0:9090")]
|
||||||
metrics_bind: String,
|
metrics_bind: String,
|
||||||
|
|
||||||
|
/// IP address and port for the web music player
|
||||||
|
#[arg(long, env = "FURUMI_WEB_BIND", default_value = "0.0.0.0:8080")]
|
||||||
|
web_bind: String,
|
||||||
|
|
||||||
|
/// Disable the web music player UI
|
||||||
|
#[arg(long, default_value_t = false)]
|
||||||
|
no_web: bool,
|
||||||
|
|
||||||
/// Disable TLS encryption (not recommended, use only for debugging)
|
/// Disable TLS encryption (not recommended, use only for debugging)
|
||||||
#[arg(long, default_value_t = false)]
|
#[arg(long, default_value_t = false)]
|
||||||
no_tls: bool,
|
no_tls: bool,
|
||||||
|
|
||||||
|
/// OIDC Issuer URL (e.g. https://auth.example.com/application/o/furumi/)
|
||||||
|
#[arg(long, env = "FURUMI_OIDC_ISSUER_URL")]
|
||||||
|
oidc_issuer_url: Option<String>,
|
||||||
|
|
||||||
|
/// OIDC Client ID
|
||||||
|
#[arg(long, env = "FURUMI_OIDC_CLIENT_ID")]
|
||||||
|
oidc_client_id: Option<String>,
|
||||||
|
|
||||||
|
/// OIDC Client Secret
|
||||||
|
#[arg(long, env = "FURUMI_OIDC_CLIENT_SECRET")]
|
||||||
|
oidc_client_secret: Option<String>,
|
||||||
|
|
||||||
|
/// OIDC Redirect URL (e.g. https://music.example.com/auth/callback)
|
||||||
|
#[arg(long, env = "FURUMI_OIDC_REDIRECT_URL")]
|
||||||
|
oidc_redirect_url: Option<String>,
|
||||||
|
|
||||||
|
/// OIDC Session Secret (32+ chars, for HMAC). If not provided, a random one is generated on startup.
|
||||||
|
#[arg(long, env = "FURUMI_OIDC_SESSION_SECRET")]
|
||||||
|
oidc_session_secret: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn metrics_handler() -> String {
|
async fn metrics_handler() -> String {
|
||||||
@@ -75,13 +103,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let vfs = Arc::new(LocalVfs::new(&root_path));
|
let vfs = Arc::new(LocalVfs::new(&root_path));
|
||||||
let tree = Arc::new(tree::WatchedTree::new(root_path.clone()).await?);
|
let remote_fs = RemoteFileSystemImpl::new(vfs);
|
||||||
let remote_fs = RemoteFileSystemImpl::new(vfs, tree);
|
|
||||||
let auth = AuthInterceptor::new(args.token.clone());
|
let auth = AuthInterceptor::new(args.token.clone());
|
||||||
let svc = RemoteFileSystemServer::with_interceptor(remote_fs, auth.clone());
|
let svc = RemoteFileSystemServer::with_interceptor(remote_fs, auth.clone());
|
||||||
|
|
||||||
// Print startup info
|
// Print startup info
|
||||||
println!("Furumi-ng Server listening on {}", addr);
|
println!("Furumi-ng Server v{} listening on {}", env!("CARGO_PKG_VERSION"), addr);
|
||||||
if args.no_tls {
|
if args.no_tls {
|
||||||
println!("WARNING: TLS is DISABLED — traffic is unencrypted");
|
println!("WARNING: TLS is DISABLED — traffic is unencrypted");
|
||||||
} else {
|
} else {
|
||||||
@@ -102,6 +129,40 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
axum::serve(metrics_listener, metrics_app).await.unwrap();
|
axum::serve(metrics_listener, metrics_app).await.unwrap();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Spawn the web music player on its own port
|
||||||
|
if !args.no_web {
|
||||||
|
let web_addr: SocketAddr = args.web_bind.parse().unwrap_or_else(|e| {
|
||||||
|
eprintln!("Error: Invalid web bind address '{}': {}", args.web_bind, e);
|
||||||
|
std::process::exit(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Initialize OIDC State if provided
|
||||||
|
let oidc_state = if let (Some(issuer), Some(client_id), Some(secret), Some(redirect)) = (
|
||||||
|
args.oidc_issuer_url,
|
||||||
|
args.oidc_client_id,
|
||||||
|
args.oidc_client_secret,
|
||||||
|
args.oidc_redirect_url,
|
||||||
|
) {
|
||||||
|
println!("OIDC (SSO): enabled for web UI (issuer: {})", issuer);
|
||||||
|
match web::auth::oidc_init(issuer, client_id, secret, redirect, args.oidc_session_secret).await {
|
||||||
|
Ok(state) => Some(Arc::new(state)),
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("Error initializing OIDC client: {}", e);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
let web_app = web::build_router(root_path.clone(), args.token.clone(), oidc_state);
|
||||||
|
let web_listener = tokio::net::TcpListener::bind(web_addr).await?;
|
||||||
|
println!("Web player: http://{}", web_addr);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
axum::serve(web_listener, web_app).await.unwrap();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
let mut builder = Server::builder()
|
let mut builder = Server::builder()
|
||||||
.tcp_keepalive(Some(std::time::Duration::from_secs(60)))
|
.tcp_keepalive(Some(std::time::Duration::from_secs(60)))
|
||||||
.http2_keepalive_interval(Some(std::time::Duration::from_secs(60)));
|
.http2_keepalive_interval(Some(std::time::Duration::from_secs(60)));
|
||||||
|
|||||||
@@ -1,27 +1,22 @@
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use tokio::sync::broadcast;
|
|
||||||
use tokio_stream::Stream;
|
use tokio_stream::Stream;
|
||||||
use tonic::{Request, Response, Status};
|
use tonic::{Request, Response, Status};
|
||||||
|
|
||||||
use crate::metrics::{self, RequestTimer};
|
|
||||||
use crate::security::sanitize_path;
|
|
||||||
use crate::tree::WatchedTree;
|
|
||||||
use crate::vfs::VirtualFileSystem;
|
use crate::vfs::VirtualFileSystem;
|
||||||
|
use crate::metrics::{self, RequestTimer};
|
||||||
use furumi_common::proto::{
|
use furumi_common::proto::{
|
||||||
remote_file_system_server::RemoteFileSystem, AttrResponse, ChangeEvent, DirEntry, FileChunk,
|
remote_file_system_server::RemoteFileSystem, AttrResponse, DirEntry, FileChunk,
|
||||||
PathRequest, ReadRequest, SnapshotEntry, SnapshotRequest, WatchRequest,
|
PathRequest, ReadRequest,
|
||||||
};
|
};
|
||||||
|
use crate::security::sanitize_path;
|
||||||
|
|
||||||
pub struct RemoteFileSystemImpl<V: VirtualFileSystem> {
|
pub struct RemoteFileSystemImpl<V: VirtualFileSystem> {
|
||||||
vfs: Arc<V>,
|
vfs: std::sync::Arc<V>,
|
||||||
tree: Arc<WatchedTree>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<V: VirtualFileSystem> RemoteFileSystemImpl<V> {
|
impl<V: VirtualFileSystem> RemoteFileSystemImpl<V> {
|
||||||
pub fn new(vfs: Arc<V>, tree: Arc<WatchedTree>) -> Self {
|
pub fn new(vfs: std::sync::Arc<V>) -> Self {
|
||||||
Self { vfs, tree }
|
Self { vfs }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -48,7 +43,11 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type ReadDirStream = Pin<Box<dyn Stream<Item = Result<DirEntry, Status>> + Send + 'static>>;
|
type ReadDirStream = Pin<
|
||||||
|
Box<
|
||||||
|
dyn Stream<Item = Result<DirEntry, Status>> + Send + 'static,
|
||||||
|
>,
|
||||||
|
>;
|
||||||
|
|
||||||
async fn read_dir(
|
async fn read_dir(
|
||||||
&self,
|
&self,
|
||||||
@@ -79,7 +78,11 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type ReadFileStream = Pin<Box<dyn Stream<Item = Result<FileChunk, Status>> + Send + 'static>>;
|
type ReadFileStream = Pin<
|
||||||
|
Box<
|
||||||
|
dyn Stream<Item = Result<FileChunk, Status>> + Send + 'static,
|
||||||
|
>,
|
||||||
|
>;
|
||||||
|
|
||||||
async fn read_file(
|
async fn read_file(
|
||||||
&self,
|
&self,
|
||||||
@@ -123,74 +126,5 @@ impl<V: VirtualFileSystem> RemoteFileSystem for RemoteFileSystemImpl<V> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Snapshot ─────────────────────────────────────────────────
|
|
||||||
|
|
||||||
type GetSnapshotStream =
|
|
||||||
Pin<Box<dyn Stream<Item = Result<SnapshotEntry, Status>> + Send + 'static>>;
|
|
||||||
|
|
||||||
async fn get_snapshot(
|
|
||||||
&self,
|
|
||||||
request: Request<SnapshotRequest>,
|
|
||||||
) -> Result<Response<Self::GetSnapshotStream>, Status> {
|
|
||||||
let req = request.into_inner();
|
|
||||||
let safe_path = sanitize_path(&req.path)?;
|
|
||||||
// sanitize_path strips the leading "/" — map "" back to "/" for snapshot lookup.
|
|
||||||
let virt_path = sanitized_to_virt(&safe_path);
|
|
||||||
|
|
||||||
let entries = self.tree.get_snapshot(&virt_path, req.depth).await;
|
|
||||||
let total_entries: usize = entries.iter().map(|(_, d)| d.children.len()).sum();
|
|
||||||
tracing::debug!(
|
|
||||||
"GetSnapshot: path='{}' depth={} → {} dirs, {} total entries",
|
|
||||||
virt_path, req.depth, entries.len(), total_entries
|
|
||||||
);
|
|
||||||
|
|
||||||
let stream = async_stream::try_stream! {
|
|
||||||
for (path, snap_dir) in entries {
|
|
||||||
yield SnapshotEntry {
|
|
||||||
path,
|
|
||||||
children: snap_dir.children,
|
|
||||||
child_attrs: snap_dir.child_attrs,
|
|
||||||
dir_attr: Some(snap_dir.dir_attr),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
};
|
|
||||||
Ok(Response::new(Box::pin(stream) as Self::GetSnapshotStream))
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Watch ─────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
type WatchChangesStream =
|
|
||||||
Pin<Box<dyn Stream<Item = Result<ChangeEvent, Status>> + Send + 'static>>;
|
|
||||||
|
|
||||||
async fn watch_changes(
|
|
||||||
&self,
|
|
||||||
_request: Request<WatchRequest>,
|
|
||||||
) -> Result<Response<Self::WatchChangesStream>, Status> {
|
|
||||||
let mut rx = self.tree.subscribe();
|
|
||||||
|
|
||||||
let stream = async_stream::try_stream! {
|
|
||||||
loop {
|
|
||||||
match rx.recv().await {
|
|
||||||
Ok(event) => yield event,
|
|
||||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
|
||||||
// Client was too slow — it missed n events.
|
|
||||||
// Log and continue; the client's TTL will cover the gap.
|
|
||||||
tracing::warn!("WatchChanges client lagged, skipped {} events", n);
|
|
||||||
}
|
|
||||||
Err(broadcast::error::RecvError::Closed) => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
Ok(Response::new(Box::pin(stream) as Self::WatchChangesStream))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// sanitize_path removes the leading "/" so "/" becomes "". Map it back.
|
|
||||||
fn sanitized_to_virt(safe: &str) -> String {
|
|
||||||
if safe.is_empty() {
|
|
||||||
"/".to_string()
|
|
||||||
} else {
|
|
||||||
format!("/{}", safe)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -1,327 +0,0 @@
|
|||||||
use std::collections::{HashMap, HashSet, VecDeque};
|
|
||||||
use std::os::unix::fs::MetadataExt;
|
|
||||||
use std::path::{Path, PathBuf};
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
|
|
||||||
use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
|
|
||||||
use tokio::sync::{broadcast, RwLock};
|
|
||||||
use tracing::{debug, info, warn};
|
|
||||||
|
|
||||||
use furumi_common::proto::{AttrResponse, ChangeEvent, ChangeKind, DirEntry};
|
|
||||||
|
|
||||||
/// How many directory levels to pre-walk on startup (see `walk_tree`).
const INITIAL_DEPTH: u32 = 3;

/// Broadcast channel capacity — clients that fall behind lose events and rely
/// on TTL (the lagged case is handled explicitly by the WatchChanges stream).
const BROADCAST_CAPACITY: usize = 256;
|
|
||||||
|
|
||||||
// ── Types ─────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/// One directory in the snapshot: its entries and the attr for each child.
/// `children` and `child_attrs` are parallel slices (same index = same file).
/// `dir_attr` is the attr of the directory itself.
#[derive(Clone)]
pub struct SnapDir {
    /// Directory entries (name + type), in filesystem read order.
    pub children: Vec<DirEntry>,
    /// Attr for each entry, parallel to `children` by index.
    pub child_attrs: Vec<AttrResponse>,
    /// Attr of the directory itself.
    pub dir_attr: AttrResponse,
}
|
|
||||||
|
|
||||||
// ── WatchedTree ──────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/// Maintains an in-memory snapshot of the directory tree and broadcasts
/// change events to connected clients via inotify.
pub struct WatchedTree {
    /// Virtual-path → (entries + attrs).
    snapshot: Arc<RwLock<HashMap<String, SnapDir>>>,
    /// Fan-out channel for filesystem change events; each client subscribes.
    change_tx: broadcast::Sender<ChangeEvent>,
    /// Kept alive to continue watching. Shared with the event handler so it can
    /// add new watches when directories are created at runtime.
    _watcher: Arc<Mutex<RecommendedWatcher>>,
}
|
|
||||||
|
|
||||||
impl WatchedTree {
|
|
||||||
pub async fn new(root: PathBuf) -> anyhow::Result<Self> {
|
|
||||||
let snapshot: Arc<RwLock<HashMap<String, SnapDir>>> =
|
|
||||||
Arc::new(RwLock::new(HashMap::new()));
|
|
||||||
let (change_tx, _) = broadcast::channel(BROADCAST_CAPACITY);
|
|
||||||
|
|
||||||
info!("WatchedTree: walking '{}' (depth {})…", root.display(), INITIAL_DEPTH);
|
|
||||||
let t = std::time::Instant::now();
|
|
||||||
let watched_dirs = walk_tree(&root, INITIAL_DEPTH, &snapshot).await;
|
|
||||||
let snap_len = snapshot.read().await.len();
|
|
||||||
let total_entries: usize = snapshot.read().await.values().map(|d| d.children.len()).sum();
|
|
||||||
info!(
|
|
||||||
"WatchedTree: snapshot ready — {} dirs, {} entries, {} watches, took {:.1}s",
|
|
||||||
snap_len,
|
|
||||||
total_entries,
|
|
||||||
watched_dirs.len(),
|
|
||||||
t.elapsed().as_secs_f32(),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Bridge notify's sync callback → async tokio task.
|
|
||||||
let (notify_tx, mut notify_rx) =
|
|
||||||
tokio::sync::mpsc::unbounded_channel::<notify::Result<notify::Event>>();
|
|
||||||
let watcher = Arc::new(Mutex::new(RecommendedWatcher::new(
|
|
||||||
move |res| {
|
|
||||||
let _ = notify_tx.send(res);
|
|
||||||
},
|
|
||||||
Config::default(),
|
|
||||||
)?));
|
|
||||||
|
|
||||||
// Add one non-recursive inotify watch per directory in the snapshot.
|
|
||||||
{
|
|
||||||
let mut w = watcher.lock().unwrap();
|
|
||||||
for dir_abs in &watched_dirs {
|
|
||||||
if let Err(e) = w.watch(dir_abs, RecursiveMode::NonRecursive) {
|
|
||||||
warn!("watch failed for {:?}: {}", dir_abs, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let snapshot_bg = Arc::clone(&snapshot);
|
|
||||||
let root_bg = root.clone();
|
|
||||||
let tx_bg = change_tx.clone();
|
|
||||||
let watcher_bg = Arc::clone(&watcher);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
while let Some(res) = notify_rx.recv().await {
|
|
||||||
match res {
|
|
||||||
Ok(event) => {
|
|
||||||
handle_fs_event(event, &root_bg, &snapshot_bg, &tx_bg, &watcher_bg).await
|
|
||||||
}
|
|
||||||
Err(e) => warn!("notify error: {}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
snapshot,
|
|
||||||
change_tx,
|
|
||||||
_watcher: watcher,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns all snapshot entries within `depth` levels of `base`.
|
|
||||||
pub async fn get_snapshot(&self, base: &str, depth: u32) -> Vec<(String, SnapDir)> {
|
|
||||||
let snap = self.snapshot.read().await;
|
|
||||||
snap.iter()
|
|
||||||
.filter(|(path, _)| path_depth_from(base, path).map_or(false, |d| d <= depth))
|
|
||||||
.map(|(path, snap_dir)| (path.clone(), snap_dir.clone()))
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn subscribe(&self) -> broadcast::Receiver<ChangeEvent> {
|
|
||||||
self.change_tx.subscribe()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Helpers ──────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
fn metadata_to_attr(meta: &std::fs::Metadata) -> AttrResponse {
|
|
||||||
AttrResponse {
|
|
||||||
size: meta.len(),
|
|
||||||
mode: meta.mode(),
|
|
||||||
mtime: meta.mtime() as u64,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// BFS walk: reads filesystem, stores entries + attrs in snapshot.
|
|
||||||
/// Returns the list of absolute paths walked (for inotify setup).
|
|
||||||
async fn walk_tree(
|
|
||||||
root: &Path,
|
|
||||||
max_depth: u32,
|
|
||||||
snapshot: &Arc<RwLock<HashMap<String, SnapDir>>>,
|
|
||||||
) -> Vec<PathBuf> {
|
|
||||||
let mut walked: Vec<PathBuf> = Vec::new();
|
|
||||||
let mut queue: VecDeque<(PathBuf, String, u32)> = VecDeque::new();
|
|
||||||
queue.push_back((root.to_path_buf(), "/".to_string(), 0));
|
|
||||||
|
|
||||||
while let Some((abs_path, virt_path, depth)) = queue.pop_front() {
|
|
||||||
// Stat the directory itself.
|
|
||||||
let dir_attr = match std::fs::metadata(&abs_path) {
|
|
||||||
Ok(m) => metadata_to_attr(&m),
|
|
||||||
Err(e) => {
|
|
||||||
warn!("walk_tree: cannot stat {:?}: {}", abs_path, e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut dir = match tokio::fs::read_dir(&abs_path).await {
|
|
||||||
Ok(d) => d,
|
|
||||||
Err(e) => {
|
|
||||||
warn!("walk_tree: cannot read {:?}: {}", abs_path, e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut children: Vec<DirEntry> = Vec::new();
|
|
||||||
let mut child_attrs: Vec<AttrResponse> = Vec::new();
|
|
||||||
|
|
||||||
while let Ok(Some(entry)) = dir.next_entry().await {
|
|
||||||
let Ok(ft) = entry.file_type().await else { continue };
|
|
||||||
let type_val = if ft.is_dir() { 4 } else if ft.is_file() { 8 } else { continue };
|
|
||||||
let name = entry.file_name().to_string_lossy().into_owned();
|
|
||||||
|
|
||||||
// Stat the child.
|
|
||||||
let attr = match entry.metadata().await {
|
|
||||||
Ok(m) => metadata_to_attr(&m),
|
|
||||||
Err(_) => AttrResponse { size: 0, mode: 0, mtime: 0 },
|
|
||||||
};
|
|
||||||
|
|
||||||
// Skip hidden directories to avoid exploding the watch list.
|
|
||||||
if ft.is_dir() && name.starts_with('.') {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ft.is_dir() && depth < max_depth {
|
|
||||||
let child_virt = child_virt_path(&virt_path, &name);
|
|
||||||
queue.push_back((entry.path(), child_virt, depth + 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
children.push(DirEntry { name, r#type: type_val });
|
|
||||||
child_attrs.push(attr);
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshot.write().await.insert(virt_path, SnapDir { children, child_attrs, dir_attr });
|
|
||||||
walked.push(abs_path);
|
|
||||||
}
|
|
||||||
|
|
||||||
walked
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Re-reads a single directory and updates its snapshot entry.
|
|
||||||
async fn refresh_dir(
|
|
||||||
abs_path: &Path,
|
|
||||||
virt_path: &str,
|
|
||||||
snapshot: &Arc<RwLock<HashMap<String, SnapDir>>>,
|
|
||||||
) {
|
|
||||||
let dir_attr = match std::fs::metadata(abs_path) {
|
|
||||||
Ok(m) => metadata_to_attr(&m),
|
|
||||||
Err(_) => {
|
|
||||||
snapshot.write().await.remove(virt_path);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut dir = match tokio::fs::read_dir(abs_path).await {
|
|
||||||
Ok(d) => d,
|
|
||||||
Err(_) => {
|
|
||||||
snapshot.write().await.remove(virt_path);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut children: Vec<DirEntry> = Vec::new();
|
|
||||||
let mut child_attrs: Vec<AttrResponse> = Vec::new();
|
|
||||||
|
|
||||||
while let Ok(Some(entry)) = dir.next_entry().await {
|
|
||||||
let Ok(ft) = entry.file_type().await else { continue };
|
|
||||||
let type_val = if ft.is_dir() { 4 } else if ft.is_file() { 8 } else { continue };
|
|
||||||
let attr = match entry.metadata().await {
|
|
||||||
Ok(m) => metadata_to_attr(&m),
|
|
||||||
Err(_) => AttrResponse { size: 0, mode: 0, mtime: 0 },
|
|
||||||
};
|
|
||||||
children.push(DirEntry {
|
|
||||||
name: entry.file_name().to_string_lossy().into_owned(),
|
|
||||||
r#type: type_val,
|
|
||||||
});
|
|
||||||
child_attrs.push(attr);
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshot.write().await.insert(virt_path.to_string(), SnapDir { children, child_attrs, dir_attr });
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Applies one `notify` event to the snapshot and broadcasts the change.
///
/// Only Create/Remove/Modify events are handled; other kinds are ignored.
/// For each affected path the *parent* directory is re-read so the snapshot
/// stays consistent with disk.
async fn handle_fs_event(
    event: notify::Event,
    root: &Path,
    snapshot: &Arc<RwLock<HashMap<String, SnapDir>>>,
    tx: &broadcast::Sender<ChangeEvent>,
    watcher: &Arc<Mutex<RecommendedWatcher>>,
) {
    use notify::EventKind;

    let proto_kind = match &event.kind {
        EventKind::Create(_) => ChangeKind::Created,
        EventKind::Remove(_) => ChangeKind::Deleted,
        EventKind::Modify(_) => ChangeKind::Modified,
        _ => return,
    };
    let kind_i32 = proto_kind as i32;

    // Newly created, non-hidden directories need their own inotify watch.
    if matches!(event.kind, EventKind::Create(_)) {
        for path in &event.paths {
            if path.is_dir()
                && !path.file_name().map_or(false, |n| n.to_string_lossy().starts_with('.'))
            {
                // std Mutex held only for the synchronous watch() call —
                // no .await occurs while the guard is live.
                let mut w = watcher.lock().unwrap();
                if let Err(e) = w.watch(path, RecursiveMode::NonRecursive) {
                    warn!("failed to add watch for new dir {:?}: {}", path, e);
                }
            }
        }
    }

    // Deduplicate: several event paths may share a parent; refresh each
    // parent directory once. Paths outside `root` are ignored.
    let mut parents: HashSet<PathBuf> = HashSet::new();
    for path in &event.paths {
        if let Some(parent) = path.parent() {
            if parent.starts_with(root) {
                parents.insert(parent.to_path_buf());
            }
        }
    }

    for parent_abs in parents {
        let virt = abs_to_virt(root, &parent_abs);
        debug!("snapshot refresh: {}", virt);
        refresh_dir(&parent_abs, &virt, snapshot).await;
        // Send failure just means no client is currently watching.
        let _ = tx.send(ChangeEvent { path: virt, kind: kind_i32 });
    }
}
|
|
||||||
|
|
||||||
/// Maps an absolute path under `root` to its virtual path ("/"-rooted).
/// The root itself — and any path not under `root` — maps to "/".
fn abs_to_virt(root: &Path, abs: &Path) -> String {
    let Ok(rel) = abs.strip_prefix(root) else {
        return "/".to_string();
    };
    if rel.as_os_str().is_empty() {
        "/".to_string()
    } else {
        format!("/{}", rel.to_string_lossy())
    }
}
|
|
||||||
|
|
||||||
/// Joins a virtual parent path and a child name without doubling the slash
/// when the parent is the root.
fn child_virt_path(parent: &str, name: &str) -> String {
    match parent {
        "/" => format!("/{name}"),
        other => format!("{other}/{name}"),
    }
}
|
|
||||||
|
|
||||||
/// Depth of `path` below `base`: `Some(0)` when they are equal, `Some(n)` for
/// a descendant `n` levels down, `None` when `path` is outside `base`'s
/// subtree (including prefix collisions like "/movies" vs "/movies-extra").
fn path_depth_from(base: &str, path: &str) -> Option<u32> {
    if path == base {
        return Some(0);
    }
    let remainder = if base == "/" {
        Some(path.trim_start_matches('/'))
    } else {
        path.strip_prefix(&format!("{}/", base))
    };
    remainder.map(|rest| rest.matches('/').count() as u32 + 1)
}
|
|
||||||
|
|
||||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Root base: depth equals the number of components below "/".
    #[test]
    fn test_path_depth_from_root() {
        assert_eq!(path_depth_from("/", "/"), Some(0));
        assert_eq!(path_depth_from("/", "/a"), Some(1));
        assert_eq!(path_depth_from("/", "/a/b"), Some(2));
        assert_eq!(path_depth_from("/", "/a/b/c"), Some(3));
    }

    /// Non-root base: siblings, prefix collisions, and ancestors yield None.
    #[test]
    fn test_path_depth_from_subdir() {
        assert_eq!(path_depth_from("/movies", "/movies"), Some(0));
        assert_eq!(path_depth_from("/movies", "/movies/action"), Some(1));
        assert_eq!(path_depth_from("/movies", "/movies/action/marvel"), Some(2));
        assert_eq!(path_depth_from("/movies", "/music"), None);
        assert_eq!(path_depth_from("/movies", "/movies-extra"), None);
        assert_eq!(path_depth_from("/movies", "/"), None);
    }
}
|
|
||||||
521
furumi-server/src/web/auth.rs
Normal file
521
furumi-server/src/web/auth.rs
Normal file
@@ -0,0 +1,521 @@
|
|||||||
|
use axum::{
|
||||||
|
body::Body,
|
||||||
|
extract::{Form, Request, State},
|
||||||
|
http::{header, HeaderMap, StatusCode},
|
||||||
|
middleware::Next,
|
||||||
|
response::{Html, IntoResponse, Redirect, Response},
|
||||||
|
};
|
||||||
|
use openidconnect::{
|
||||||
|
core::{CoreClient, CoreProviderMetadata, CoreResponseType},
|
||||||
|
reqwest::async_http_client,
|
||||||
|
AuthenticationFlow, AuthorizationCode, ClientId, ClientSecret, CsrfToken, IssuerUrl, Nonce,
|
||||||
|
PkceCodeChallenge, PkceCodeVerifier, RedirectUrl, Scope, TokenResponse,
|
||||||
|
};
|
||||||
|
use rand::RngCore;
|
||||||
|
use serde::Deserialize;
|
||||||
|
use sha2::{Digest, Sha256};
|
||||||
|
|
||||||
|
use base64::Engine;
|
||||||
|
use hmac::{Hmac, Mac};
|
||||||
|
|
||||||
|
use super::{OidcState, WebState};
|
||||||
|
|
||||||
|
/// Cookie name used to store the session token.
/// Holds either the master-token hash (`token_hash`) or an HMAC-signed
/// "sso:user:sig" value produced by `generate_sso_cookie`.
const SESSION_COOKIE: &str = "furumi_session";
|
||||||
|
|
||||||
|
/// Minimal HTML escaping for values interpolated into server-rendered pages.
///
/// Replaces the five HTML-special characters with their entities. `&` must be
/// replaced first so the entities produced by later passes are not
/// double-escaped. (The previous body had been reduced to no-op replacements
/// — e.g. `replace('&', "&")` — leaving interpolated values unescaped.)
fn esc(s: &str) -> String {
    s.replace('&', "&amp;")
        .replace('<', "&lt;")
        .replace('>', "&gt;")
        .replace('"', "&quot;")
        .replace('\'', "&#39;")
}
|
||||||
|
|
||||||
|
/// Compute SHA-256 of the token as hex string (stored in cookie, not raw token).
|
||||||
|
pub fn token_hash(token: &str) -> String {
|
||||||
|
let mut h = Sha256::new();
|
||||||
|
h.update(token.as_bytes());
|
||||||
|
format!("{:x}", h.finalize())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn require_auth(
|
||||||
|
State(state): State<WebState>,
|
||||||
|
mut req: Request,
|
||||||
|
next: Next,
|
||||||
|
) -> Response {
|
||||||
|
// Auth disabled when token is empty
|
||||||
|
if state.token.is_empty() {
|
||||||
|
req.extensions_mut().insert(super::AuthUserInfo("Unauthenticated".to_string()));
|
||||||
|
return next.run(req).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let cookies = req
|
||||||
|
.headers()
|
||||||
|
.get(header::COOKIE)
|
||||||
|
.and_then(|v| v.to_str().ok())
|
||||||
|
.unwrap_or("");
|
||||||
|
|
||||||
|
let expected = token_hash(&state.token);
|
||||||
|
let mut authed_user = None;
|
||||||
|
for c in cookies.split(';') {
|
||||||
|
let c = c.trim();
|
||||||
|
if let Some(val) = c.strip_prefix(&format!("{}=", SESSION_COOKIE)) {
|
||||||
|
if val == expected {
|
||||||
|
authed_user = Some("Master Token".to_string());
|
||||||
|
break;
|
||||||
|
} else if let Some(oidc) = &state.oidc {
|
||||||
|
if let Some(user) = verify_sso_cookie(&oidc.session_secret, val) {
|
||||||
|
authed_user = Some(user);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(user) = authed_user {
|
||||||
|
req.extensions_mut().insert(super::AuthUserInfo(user));
|
||||||
|
next.run(req).await
|
||||||
|
} else {
|
||||||
|
let uri = req.uri().to_string();
|
||||||
|
if uri.starts_with("/api/") {
|
||||||
|
(StatusCode::UNAUTHORIZED, "Unauthorized").into_response()
|
||||||
|
} else {
|
||||||
|
let redirect_url = format!("/login?next={}", urlencoding::encode(&uri));
|
||||||
|
Redirect::to(&redirect_url).into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// HMAC-SHA256, used to sign and verify SSO session cookie values.
type HmacSha256 = Hmac<sha2::Sha256>;
|
||||||
|
|
||||||
|
pub fn generate_sso_cookie(secret: &[u8], user_id: &str) -> String {
|
||||||
|
let mut mac = HmacSha256::new_from_slice(secret).unwrap();
|
||||||
|
mac.update(user_id.as_bytes());
|
||||||
|
let sig = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(mac.finalize().into_bytes());
|
||||||
|
format!("sso:{}:{}", user_id, sig)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn verify_sso_cookie(secret: &[u8], cookie_val: &str) -> Option<String> {
|
||||||
|
let parts: Vec<&str> = cookie_val.split(':').collect();
|
||||||
|
if parts.len() != 3 || parts[0] != "sso" {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let user_id = parts[1];
|
||||||
|
let sig = parts[2];
|
||||||
|
|
||||||
|
let mut mac = HmacSha256::new_from_slice(secret).unwrap();
|
||||||
|
mac.update(user_id.as_bytes());
|
||||||
|
|
||||||
|
let expected_sig = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(mac.finalize().into_bytes());
|
||||||
|
if sig == expected_sig {
|
||||||
|
Some(user_id.to_string())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Query parameters accepted by GET /login and GET /auth/login.
#[derive(Deserialize)]
pub struct LoginQuery {
    /// Post-login redirect target; defaults to "/" when absent.
    pub next: Option<String>,
}
|
||||||
|
|
||||||
|
/// GET /login — show login form.
///
/// Renders `LOGIN_HTML`, filling in the hidden `next` input (HTML-escaped)
/// and, when OIDC is configured, the SSO button. If neither a master token
/// nor OIDC is configured, authentication is effectively off and the user is
/// sent straight to "/".
pub async fn login_page(
    State(state): State<WebState>,
    axum::extract::Query(query): axum::extract::Query<LoginQuery>,
) -> impl IntoResponse {
    let token_enabled = !state.token.is_empty();
    let oidc_enabled = state.oidc.is_some();

    if !token_enabled && !oidc_enabled {
        return Redirect::to("/").into_response();
    }

    let next_val = query.next.unwrap_or_else(|| "/".to_string());
    // URL-encoded for the SSO link's query string.
    let next_encoded = urlencoding::encode(&next_val);

    let oidc_html = if oidc_enabled {
        format!(
            r#"<div class="divider"><span>OR</span></div>
            <a href="/auth/login?next={}" class="btn-oidc">Log in with Authentik (SSO)</a>"#,
            next_encoded
        )
    } else {
        "".to_string()
    };

    // HTML-escaped for the attribute value of the hidden form field.
    let next_input = format!(r#"<input type="hidden" name="next" value="{}">"#, esc(&next_val));

    let html = LOGIN_HTML
        .replace("<!-- OIDC_PLACEHOLDER -->", &oidc_html)
        .replace("<!-- NEXT_INPUT_PLACEHOLDER -->", &next_input);
    Html(html).into_response()
}
|
||||||
|
|
||||||
|
/// Form body submitted by POST /login.
#[derive(Deserialize)]
pub struct LoginForm {
    // The access token the user typed into the password field.
    password: String,
    // Optional post-login redirect target (from the hidden form input).
    next: Option<String>,
}
|
||||||
|
|
||||||
|
/// POST /login — validate password, set session cookie.
|
||||||
|
pub async fn login_submit(
|
||||||
|
State(state): State<WebState>,
|
||||||
|
Form(form): Form<LoginForm>,
|
||||||
|
) -> impl IntoResponse {
|
||||||
|
if state.token.is_empty() {
|
||||||
|
return Redirect::to("/").into_response();
|
||||||
|
}
|
||||||
|
|
||||||
|
if form.password == *state.token {
|
||||||
|
let hash = token_hash(&state.token);
|
||||||
|
let cookie = format!(
|
||||||
|
"{}={}; HttpOnly; SameSite=Strict; Path=/; Max-Age=604800",
|
||||||
|
SESSION_COOKIE, hash
|
||||||
|
);
|
||||||
|
let redirect_to = form.next.as_deref().unwrap_or("/");
|
||||||
|
let mut headers = HeaderMap::new();
|
||||||
|
headers.insert(header::SET_COOKIE, cookie.parse().unwrap());
|
||||||
|
headers.insert(header::LOCATION, redirect_to.parse().unwrap());
|
||||||
|
(StatusCode::FOUND, headers, Body::empty()).into_response()
|
||||||
|
} else {
|
||||||
|
Html(LOGIN_ERROR_HTML).into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// GET /logout — clear session cookie and redirect to login.
|
||||||
|
pub async fn logout() -> impl IntoResponse {
|
||||||
|
let cookie = format!(
|
||||||
|
"{}=; HttpOnly; SameSite=Strict; Path=/; Expires=Thu, 01 Jan 1970 00:00:00 GMT",
|
||||||
|
SESSION_COOKIE
|
||||||
|
);
|
||||||
|
let mut headers = HeaderMap::new();
|
||||||
|
headers.insert(header::SET_COOKIE, cookie.parse().unwrap());
|
||||||
|
headers.insert(header::LOCATION, "/login".parse().unwrap());
|
||||||
|
(StatusCode::FOUND, headers, Body::empty()).into_response()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Discovers the OIDC provider, builds the client, and derives the 32-byte
/// session-signing secret.
///
/// # Errors
/// Fails if discovery, the issuer URL, or the redirect URL are invalid.
pub async fn oidc_init(
    issuer: String,
    client_id: String,
    client_secret: String,
    redirect: String,
    session_secret_override: Option<String>,
) -> anyhow::Result<OidcState> {
    let provider_metadata = CoreProviderMetadata::discover_async(
        IssuerUrl::new(issuer)?,
        async_http_client,
    )
    .await?;

    let client = CoreClient::from_provider_metadata(
        provider_metadata,
        ClientId::new(client_id),
        Some(ClientSecret::new(client_secret)),
    )
    // Send client credentials in the token-request body (some providers,
    // e.g. Authentik configurations, reject Basic auth).
    .set_auth_type(openidconnect::AuthType::RequestBody)
    .set_redirect_uri(RedirectUrl::new(redirect)?);

    let session_secret = if let Some(s) = session_secret_override {
        let mut b = s.into_bytes();
        // Pads short secrets with zero bytes AND truncates longer ones to
        // exactly 32 bytes. NOTE(review): zero-padding a short override
        // yields a low-entropy HMAC key, and truncation silently discards
        // entropy past 32 bytes — consider hashing the override instead
        // (would invalidate existing sessions; confirm before changing).
        b.resize(32, 0);
        b
    } else {
        // No override: random 32-byte key (sessions reset on restart).
        let mut b = vec![0u8; 32];
        rand::thread_rng().fill_bytes(&mut b);
        b
    };

    Ok(OidcState {
        client,
        session_secret,
    })
}
|
||||||
|
|
||||||
|
/// GET /auth/login — start the OIDC authorization-code + PKCE flow.
///
/// Stashes `csrf:nonce:pkce_verifier:next` in a short-lived HttpOnly cookie
/// so the callback can validate state and complete the exchange, then
/// redirects the browser to the provider's authorization URL.
pub async fn oidc_login(
    State(state): State<WebState>,
    axum::extract::Query(query): axum::extract::Query<LoginQuery>,
    req: Request,
) -> impl IntoResponse {
    let oidc = match &state.oidc {
        Some(o) => o,
        None => return Redirect::to("/login").into_response(),
    };

    let (pkce_challenge, pkce_verifier) = PkceCodeChallenge::new_random_sha256();

    let (auth_url, csrf_token, nonce) = oidc
        .client
        .authorize_url(
            AuthenticationFlow::<CoreResponseType>::AuthorizationCode,
            CsrfToken::new_random,
            Nonce::new_random,
        )
        .add_scope(Scope::new("openid".to_string()))
        .add_scope(Scope::new("profile".to_string()))
        .set_pkce_challenge(pkce_challenge)
        .url();

    // Cookie format consumed by oidc_callback: state:nonce:verifier:next.
    // `next` is URL-encoded so its own ':' or ';' can't break the format.
    let next_url = query.next.unwrap_or_else(|| "/".to_string());
    let cookie_val = format!("{}:{}:{}:{}", csrf_token.secret(), nonce.secret(), pkce_verifier.secret(), urlencoding::encode(&next_url));

    // Determine if we are running behind an HTTPS proxy
    let is_https = req.headers().get("x-forwarded-proto")
        .and_then(|v| v.to_str().ok())
        .map(|s| s == "https")
        .unwrap_or(false);

    // If HTTPS, use SameSite=None + Secure to fully support cross-domain POST redirects.
    // Otherwise fallback to Lax for local testing.
    let cookie_attrs = if is_https {
        "SameSite=None; Secure"
    } else {
        "SameSite=Lax"
    };

    let cookie = format!("furumi_oidc_state={}; HttpOnly; {}; Path=/; Max-Age=3600", cookie_val, cookie_attrs);

    let mut headers = HeaderMap::new();
    headers.insert(header::SET_COOKIE, cookie.parse().unwrap());
    headers.insert(header::LOCATION, auth_url.as_str().parse().unwrap());
    // Never cache a response that sets per-flow state.
    headers.insert(header::CACHE_CONTROL, "no-store, no-cache, must-revalidate".parse().unwrap());

    (StatusCode::FOUND, headers, Body::empty()).into_response()
}
|
||||||
|
|
||||||
|
/// Query parameters the OIDC provider appends on the callback redirect.
#[derive(Deserialize)]
pub struct AuthCallbackQuery {
    // Authorization code to exchange for tokens.
    code: String,
    // CSRF state; must match the value stored in the flow cookie.
    state: String,
}
|
||||||
|
|
||||||
|
/// GET /auth/callback — complete the OIDC flow.
///
/// Validates the CSRF state against the flow cookie set by `oidc_login`,
/// exchanges the code (with PKCE verifier), verifies the ID token + nonce,
/// then issues the signed session cookie and clears the flow cookie.
pub async fn oidc_callback(
    State(state): State<WebState>,
    axum::extract::Query(query): axum::extract::Query<AuthCallbackQuery>,
    req: Request,
) -> impl IntoResponse {
    let oidc = match &state.oidc {
        Some(o) => o,
        None => return Redirect::to("/login").into_response(),
    };

    let cookies = req
        .headers()
        .get(header::COOKIE)
        .and_then(|v| v.to_str().ok())
        .unwrap_or("");

    // Find the flow cookie whose first field matches the returned `state`.
    // Cookie format (set by oidc_login): state:nonce:verifier:next.
    let mut matching_val = None;
    for c in cookies.split(';') {
        let c = c.trim();
        if let Some(val) = c.strip_prefix("furumi_oidc_state=") {
            let parts: Vec<&str> = val.split(':').collect();
            if parts.len() >= 3 && parts[0] == query.state {
                matching_val = Some(val.to_string());
                break;
            }
        }
    }

    let cookie_val = match matching_val {
        Some(c) => c,
        None => {
            tracing::warn!("OIDC callback failed: Invalid state or missing valid cookie. Received cookies: {}", cookies);
            return (StatusCode::BAD_REQUEST, "Invalid state").into_response();
        }
    };

    // Indexing is safe: the match above guaranteed at least three fields.
    let parts: Vec<&str> = cookie_val.split(':').collect();
    let nonce = Nonce::new(parts[1].to_string());
    let pkce_verifier = PkceCodeVerifier::new(parts[2].to_string());

    let token_response = oidc
        .client
        .exchange_code(AuthorizationCode::new(query.code))
        .set_pkce_verifier(pkce_verifier)
        .request_async(async_http_client)
        .await;

    let token_response = match token_response {
        Ok(tr) => tr,
        Err(e) => {
            tracing::error!("OIDC exchange code error: {:?}", e);
            if let openidconnect::RequestTokenError::ServerResponse(err) = &e {
                tracing::error!("OIDC Server returned error: {:?}", err);
            }
            return (StatusCode::INTERNAL_SERVER_ERROR, format!("OIDC error: {}", e)).into_response();
        }
    };

    let id_token = match token_response.id_token() {
        Some(t) => t,
        None => return (StatusCode::INTERNAL_SERVER_ERROR, "No ID token").into_response(),
    };

    // Verifies signature, audience, expiry, and the nonce from our cookie.
    let claims = match id_token.claims(&oidc.client.id_token_verifier(), &nonce) {
        Ok(c) => c,
        Err(e) => return (StatusCode::UNAUTHORIZED, format!("Invalid ID token: {}", e)).into_response(),
    };

    // User label preference: preferred_username > email > subject.
    let user_id = claims
        .preferred_username()
        .map(|u| u.to_string())
        .or_else(|| claims.email().map(|e| e.to_string()))
        .unwrap_or_else(|| claims.subject().to_string());

    let session_val = generate_sso_cookie(&oidc.session_secret, &user_id);

    // NOTE(review): `parts` re-split here duplicates the split above — the
    // earlier binding could be reused.
    let parts: Vec<&str> = cookie_val.split(':').collect();
    let redirect_to = parts.get(3)
        .and_then(|&s| urlencoding::decode(s).ok())
        .map(|v| v.into_owned())
        .unwrap_or_else(|| "/".to_string());
    let redirect_to = if redirect_to.is_empty() { "/".to_string() } else { redirect_to };

    let is_https = req.headers().get("x-forwarded-proto")
        .and_then(|v| v.to_str().ok())
        .map(|s| s == "https")
        .unwrap_or(false);

    let session_attrs = if is_https {
        "SameSite=Strict; Secure"
    } else {
        "SameSite=Strict"
    };

    let session_cookie = format!("{}={}; HttpOnly; {}; Path=/; Max-Age=604800", SESSION_COOKIE, session_val, session_attrs);
    let clear_state_cookie = "furumi_oidc_state=; HttpOnly; Path=/; Expires=Thu, 01 Jan 1970 00:00:00 GMT";

    let mut headers = HeaderMap::new();
    headers.insert(header::SET_COOKIE, session_cookie.parse().unwrap());
    // `append` (not insert) so both Set-Cookie headers survive.
    headers.append(header::SET_COOKIE, clear_state_cookie.parse().unwrap());
    headers.insert(header::LOCATION, redirect_to.parse().unwrap());

    (StatusCode::FOUND, headers, Body::empty()).into_response()
}
|
||||||
|
|
||||||
|
const LOGIN_HTML: &str = r#"<!DOCTYPE html>
|
||||||
|
<html lang="en">
|
||||||
|
<head>
|
||||||
|
<meta charset="UTF-8">
|
||||||
|
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||||
|
<title>Furumi Player — Login</title>
|
||||||
|
<style>
|
||||||
|
* { box-sizing: border-box; margin: 0; padding: 0; }
|
||||||
|
body {
|
||||||
|
min-height: 100vh;
|
||||||
|
display: flex; align-items: center; justify-content: center;
|
||||||
|
background: #0d0f14;
|
||||||
|
font-family: 'Inter', system-ui, sans-serif;
|
||||||
|
color: #e2e8f0;
|
||||||
|
}
|
||||||
|
.card {
|
||||||
|
background: #161b27;
|
||||||
|
border: 1px solid #2a3347;
|
||||||
|
border-radius: 16px;
|
||||||
|
padding: 2.5rem 3rem;
|
||||||
|
width: 360px;
|
||||||
|
box-shadow: 0 20px 60px rgba(0,0,0,0.5);
|
||||||
|
}
|
||||||
|
.logo { font-size: 1.8rem; font-weight: 700; color: #7c6af7; margin-bottom: 0.25rem; }
|
||||||
|
.subtitle { font-size: 0.85rem; color: #64748b; margin-bottom: 2rem; }
|
||||||
|
label { display: block; font-size: 0.8rem; color: #94a3b8; margin-bottom: 0.4rem; }
|
||||||
|
input[type=password] {
|
||||||
|
width: 100%; padding: 0.6rem 0.8rem;
|
||||||
|
background: #0d0f14; border: 1px solid #2a3347; border-radius: 8px;
|
||||||
|
color: #e2e8f0; font-size: 0.95rem; outline: none;
|
||||||
|
transition: border-color 0.2s;
|
||||||
|
}
|
||||||
|
input[type=password]:focus { border-color: #7c6af7; }
|
||||||
|
button {
|
||||||
|
margin-top: 1.2rem; width: 100%; padding: 0.65rem;
|
||||||
|
background: #7c6af7; border: none; border-radius: 8px;
|
||||||
|
color: #fff; font-size: 0.95rem; font-weight: 600; cursor: pointer;
|
||||||
|
transition: background 0.2s;
|
||||||
|
}
|
||||||
|
button:hover { background: #6b58e8; }
|
||||||
|
.btn-oidc {
|
||||||
|
display: block; width: 100%; padding: 0.65rem; text-align: center;
|
||||||
|
background: #2a3347; border: 1px solid #3d4a66; border-radius: 8px;
|
||||||
|
color: #e2e8f0; font-size: 0.95rem; font-weight: 600; text-decoration: none;
|
||||||
|
transition: background 0.2s;
|
||||||
|
}
|
||||||
|
.btn-oidc:hover { background: #3d4a66; }
|
||||||
|
.divider {
|
||||||
|
display: flex; align-items: center; text-align: center; margin: 1.5rem 0;
|
||||||
|
color: #64748b; font-size: 0.75rem;
|
||||||
|
}
|
||||||
|
.divider::before, .divider::after {
|
||||||
|
content: ''; flex: 1; border-bottom: 1px solid #2a3347;
|
||||||
|
}
|
||||||
|
.divider span { padding: 0 10px; }
|
||||||
|
</style>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<div class="card">
|
||||||
|
<div class="logo">🎵 Furumi</div>
|
||||||
|
<div class="subtitle">Enter access token to continue</div>
|
||||||
|
<form method="POST" action="/login">
|
||||||
|
<!-- NEXT_INPUT_PLACEHOLDER -->
|
||||||
|
<label for="password">Access Token</label>
|
||||||
|
<input type="password" id="password" name="password" autofocus autocomplete="current-password">
|
||||||
|
<button type="submit">Sign In</button>
|
||||||
|
</form>
|
||||||
|
<!-- OIDC_PLACEHOLDER -->
|
||||||
|
</div>
|
||||||
|
</body>
|
||||||
|
</html>"#;
|
||||||
|
|
||||||
|
const LOGIN_ERROR_HTML: &str = r#"<!DOCTYPE html>
|
||||||
|
<html lang="en">
|
||||||
|
<head>
|
||||||
|
<meta charset="UTF-8">
|
||||||
|
<title>Furumi Player — Login</title>
|
||||||
|
<style>
|
||||||
|
* { box-sizing: border-box; margin: 0; padding: 0; }
|
||||||
|
body {
|
||||||
|
min-height: 100vh;
|
||||||
|
display: flex; align-items: center; justify-content: center;
|
||||||
|
background: #0d0f14;
|
||||||
|
font-family: 'Inter', system-ui, sans-serif;
|
||||||
|
color: #e2e8f0;
|
||||||
|
}
|
||||||
|
.card {
|
||||||
|
background: #161b27;
|
||||||
|
border: 1px solid #2a3347;
|
||||||
|
border-radius: 16px;
|
||||||
|
padding: 2.5rem 3rem;
|
||||||
|
width: 360px;
|
||||||
|
box-shadow: 0 20px 60px rgba(0,0,0,0.5);
|
||||||
|
}
|
||||||
|
.logo { font-size: 1.8rem; font-weight: 700; color: #7c6af7; margin-bottom: 0.25rem; }
|
||||||
|
.subtitle { font-size: 0.85rem; color: #64748b; margin-bottom: 2rem; }
|
||||||
|
.error { color: #f87171; font-size: 0.85rem; margin-bottom: 1rem; }
|
||||||
|
label { display: block; font-size: 0.8rem; color: #94a3b8; margin-bottom: 0.4rem; }
|
||||||
|
input[type=password] {
|
||||||
|
width: 100%; padding: 0.6rem 0.8rem;
|
||||||
|
background: #0d0f14; border: 1px solid #f87171; border-radius: 8px;
|
||||||
|
color: #e2e8f0; font-size: 0.95rem; outline: none;
|
||||||
|
}
|
||||||
|
button {
|
||||||
|
margin-top: 1.2rem; width: 100%; padding: 0.65rem;
|
||||||
|
background: #7c6af7; border: none; border-radius: 8px;
|
||||||
|
color: #fff; font-size: 0.95rem; font-weight: 600; cursor: pointer;
|
||||||
|
}
|
||||||
|
button:hover { background: #6b58e8; }
|
||||||
|
</style>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<div class="card">
|
||||||
|
<div class="logo">🎵 Furumi</div>
|
||||||
|
<div class="subtitle">Enter access token to continue</div>
|
||||||
|
<p class="error">❌ Invalid token. Please try again.</p>
|
||||||
|
<form method="POST" action="/login">
|
||||||
|
<!-- NEXT_INPUT_PLACEHOLDER -->
|
||||||
|
<label for="password">Access Token</label>
|
||||||
|
<input type="password" id="password" name="password" autofocus>
|
||||||
|
<button type="submit">Sign In</button>
|
||||||
|
</form>
|
||||||
|
</div>
|
||||||
|
</body>
|
||||||
|
</html>"#;
|
||||||
132
furumi-server/src/web/browse.rs
Normal file
132
furumi-server/src/web/browse.rs
Normal file
@@ -0,0 +1,132 @@
|
|||||||
|
use axum::{
|
||||||
|
extract::{Query, State},
|
||||||
|
http::StatusCode,
|
||||||
|
response::{IntoResponse, Json},
|
||||||
|
};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
use crate::security::sanitize_path;
|
||||||
|
use super::WebState;
|
||||||
|
|
||||||
|
/// Query string for `GET /api/browse` (`?path=<relative directory>`).
#[derive(Deserialize)]
pub struct BrowseQuery {
    // Empty string (the serde default) means the library root.
    #[serde(default)]
    pub path: String,
}

/// JSON body returned by the browse endpoint.
#[derive(Serialize)]
pub struct BrowseResponse {
    // The sanitized path that was listed, echoed back to the client.
    pub path: String,
    pub entries: Vec<Entry>,
}

/// One directory entry in a browse listing.
#[derive(Serialize)]
pub struct Entry {
    pub name: String,
    // Serialized as "type" because `type` is a Rust keyword.
    #[serde(rename = "type")]
    pub kind: EntryKind,
    // File size in bytes; `None` for directories, and omitted from the JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub size: Option<u64>,
}

/// Entry discriminator, serialized lowercase as "file" / "dir".
#[derive(Serialize)]
#[serde(rename_all = "lowercase")]
pub enum EntryKind {
    File,
    Dir,
}
|
||||||
|
|
||||||
|
/// `GET /api/browse?path=…` — list one directory of the music library.
///
/// Returns a `BrowseResponse` with hidden entries (dot-prefixed) removed and
/// non-audio files filtered out; directories sort before files, each group
/// alphabetical case-insensitively. Errors map to 400 (bad path), 404
/// (missing directory) or 500 (I/O failure) with a JSON `{"error": …}` body.
pub async fn handler(
    State(state): State<WebState>,
    Query(query): Query<BrowseQuery>,
) -> impl IntoResponse {
    // Reject traversal / absolute paths before touching the filesystem.
    let safe = match sanitize_path(&query.path) {
        Ok(p) => p,
        Err(_) => {
            return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "invalid path"}))).into_response();
        }
    };

    let dir_path: PathBuf = state.root.join(&safe);

    let read_dir = match tokio::fs::read_dir(&dir_path).await {
        Ok(rd) => rd,
        Err(e) => {
            let status = if e.kind() == std::io::ErrorKind::NotFound {
                StatusCode::NOT_FOUND
            } else {
                StatusCode::INTERNAL_SERVER_ERROR
            };
            return (status, Json(serde_json::json!({"error": e.to_string()}))).into_response();
        }
    };

    let mut entries: Vec<Entry> = Vec::new();
    let mut rd = read_dir;

    loop {
        match rd.next_entry().await {
            Ok(Some(entry)) => {
                // Lossy conversion: non-UTF-8 names are shown with U+FFFD.
                let name = entry.file_name().to_string_lossy().into_owned();
                // Skip hidden files
                if name.starts_with('.') {
                    continue;
                }
                // An entry whose metadata can't be read is silently dropped
                // rather than failing the whole listing.
                let meta = match entry.metadata().await {
                    Ok(m) => m,
                    Err(_) => continue,
                };
                if meta.is_dir() {
                    entries.push(Entry { name, kind: EntryKind::Dir, size: None });
                } else if meta.is_file() {
                    // Only expose audio files
                    if is_audio_file(&name) {
                        entries.push(Entry {
                            name,
                            kind: EntryKind::File,
                            size: Some(meta.len()),
                        });
                    }
                }
            }
            Ok(None) => break,
            Err(e) => {
                return (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(serde_json::json!({"error": e.to_string()})),
                )
                    .into_response();
            }
        }
    }

    // Sort: dirs first, then files; alphabetically within each group
    entries.sort_by(|a, b| {
        let a_dir = matches!(a.kind, EntryKind::Dir);
        let b_dir = matches!(b.kind, EntryKind::Dir);
        b_dir.cmp(&a_dir).then(a.name.to_lowercase().cmp(&b.name.to_lowercase()))
    });

    let response = BrowseResponse {
        path: safe,
        entries,
    };

    (StatusCode::OK, Json(response)).into_response()
}
|
||||||
|
|
||||||
|
/// Whitelist of audio extensions served via the web player.
///
/// The check is case-insensitive and requires an actual `.ext` suffix: a
/// bare filename that merely equals an audio extension (e.g. a file named
/// `mp3` with no dot) is NOT treated as audio. The previous
/// `rsplit('.').next()` form returned the whole name when no dot was
/// present, so such files slipped through the whitelist.
pub fn is_audio_file(name: &str) -> bool {
    // `rsplit_once` yields the text after the LAST dot, or None when the
    // name contains no dot at all.
    let Some((_, ext)) = name.rsplit_once('.') else {
        return false;
    };
    matches!(
        ext.to_lowercase().as_str(),
        "mp3" | "flac" | "ogg" | "opus" | "aac" | "m4a" | "wav" | "ape" | "wv" | "wma" | "tta" | "aiff" | "aif"
    )
}
|
||||||
|
|
||||||
|
/// Returns true if the format needs transcoding (not natively supported by browsers).
///
/// Case-insensitive and requires a real `.ext` suffix; a dot-less filename
/// returns false (the previous `rsplit('.').next()` treated the whole name
/// as an extension when no dot was present).
pub fn needs_transcode(name: &str) -> bool {
    let Some((_, ext)) = name.rsplit_once('.') else {
        return false;
    };
    matches!(ext.to_lowercase().as_str(), "ape" | "wv" | "wma" | "tta" | "aiff" | "aif")
}
|
||||||
204
furumi-server/src/web/meta.rs
Normal file
204
furumi-server/src/web/meta.rs
Normal file
@@ -0,0 +1,204 @@
|
|||||||
|
use axum::{
|
||||||
|
extract::{Path, State},
|
||||||
|
http::StatusCode,
|
||||||
|
response::{IntoResponse, Json},
|
||||||
|
};
|
||||||
|
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
|
||||||
|
use serde::Serialize;
|
||||||
|
use symphonia::core::{
|
||||||
|
codecs::CODEC_TYPE_NULL,
|
||||||
|
formats::FormatOptions,
|
||||||
|
io::MediaSourceStream,
|
||||||
|
meta::{MetadataOptions, StandardTagKey},
|
||||||
|
probe::Hint,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::security::sanitize_path;
|
||||||
|
use super::WebState;
|
||||||
|
|
||||||
|
/// JSON body returned by `GET /api/meta/*path`.
///
/// All fields are optional: missing tags are serialized as `null`.
#[derive(Serialize)]
pub struct MetaResponse {
    pub title: Option<String>,
    pub artist: Option<String>,
    pub album: Option<String>,
    pub track: Option<u32>,
    pub year: Option<u32>,
    // Estimated from the codec's frame count and time base; None if either
    // is unknown.
    pub duration_secs: Option<f64>,
    pub cover_base64: Option<String>, // "data:image/jpeg;base64,..."
}
|
||||||
|
|
||||||
|
/// `GET /api/meta/*path` — return tags, duration and embedded cover art for
/// one audio file as a `MetaResponse`. 400 for an invalid path, 500 when
/// probing/decoding fails.
pub async fn handler(
    State(state): State<WebState>,
    Path(path): Path<String>,
) -> impl IntoResponse {
    let safe = match sanitize_path(&path) {
        Ok(p) => p,
        Err(_) => return (StatusCode::BAD_REQUEST, Json(serde_json::json!({"error": "invalid path"}))).into_response(),
    };

    let file_path = state.root.join(&safe);
    // Kept separately so read_meta can fall back to the filename when no
    // title tag exists.
    let filename = file_path
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or("")
        .to_owned();

    // Symphonia probing is blocking file I/O + CPU work — keep it off the
    // async runtime.
    let meta = tokio::task::spawn_blocking(move || read_meta(file_path, &filename)).await;

    match meta {
        Ok(Ok(m)) => (StatusCode::OK, Json(m)).into_response(),
        // Inner error: the probe/decode itself failed.
        Ok(Err(e)) => (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))).into_response(),
        // Outer error: the blocking task panicked or was cancelled.
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({"error": e.to_string()}))).into_response(),
    }
}
|
||||||
|
|
||||||
|
/// Probe `file_path` with symphonia and assemble a `MetaResponse`.
///
/// Synchronous (called via `spawn_blocking`). Tag sources, in priority order:
/// probe-level metadata (e.g. ID3 read ahead of the container), then
/// format-embedded metadata (only consulted when no title was found yet),
/// then the filename stem as a last-resort title.
fn read_meta(file_path: std::path::PathBuf, filename: &str) -> anyhow::Result<MetaResponse> {
    let file = std::fs::File::open(&file_path)?;
    let mss = MediaSourceStream::new(Box::new(file), Default::default());

    // Extension hint speeds up / disambiguates container detection.
    let mut hint = Hint::new();
    if let Some(ext) = file_path.extension().and_then(|e| e.to_str()) {
        hint.with_extension(ext);
    }

    let mut probed = symphonia::default::get_probe().format(
        &hint,
        mss,
        &FormatOptions { enable_gapless: false, ..Default::default() },
        &MetadataOptions::default(),
    )?;

    // Extract tags from container-level metadata
    let mut title: Option<String> = None;
    let mut artist: Option<String> = None;
    let mut album: Option<String> = None;
    let mut track: Option<u32> = None;
    let mut year: Option<u32> = None;
    let mut cover_data: Option<(Vec<u8>, String)> = None;

    // Check metadata side-data (e.g., ID3 tags probed before format)
    if let Some(rev) = probed.metadata.get().as_ref().and_then(|m| m.current()) {
        extract_tags(rev.tags(), rev.visuals(), &mut title, &mut artist, &mut album, &mut track, &mut year, &mut cover_data);
    }

    // Also check format-embedded metadata — but only as a fallback when the
    // first pass produced no title.
    if let Some(rev) = probed.format.metadata().current() {
        if title.is_none() {
            extract_tags(rev.tags(), rev.visuals(), &mut title, &mut artist, &mut album, &mut track, &mut year, &mut cover_data);
        }
    }

    // If no title from tags, use filename without extension
    if title.is_none() {
        title = Some(
            std::path::Path::new(filename)
                .file_stem()
                .and_then(|s| s.to_str())
                .unwrap_or(filename)
                .to_owned(),
        );
    }

    // Estimate duration from track time_base + n_frames
    let duration_secs = probed
        .format
        .tracks()
        .iter()
        .find(|t| t.codec_params.codec != CODEC_TYPE_NULL)
        .and_then(|t| {
            let n_frames = t.codec_params.n_frames?;
            let tb = t.codec_params.time_base?;
            Some(n_frames as f64 * tb.numer as f64 / tb.denom as f64)
        });

    // Embed the cover as a data: URI so the frontend needs no extra request.
    let cover_base64 = cover_data.map(|(data, mime)| {
        format!("data:{};base64,{}", mime, BASE64.encode(&data))
    });

    Ok(MetaResponse {
        title,
        artist,
        album,
        track,
        year,
        duration_secs,
        cover_base64,
    })
}
|
||||||
|
|
||||||
|
fn extract_tags(
|
||||||
|
tags: &[symphonia::core::meta::Tag],
|
||||||
|
visuals: &[symphonia::core::meta::Visual],
|
||||||
|
title: &mut Option<String>,
|
||||||
|
artist: &mut Option<String>,
|
||||||
|
album: &mut Option<String>,
|
||||||
|
track: &mut Option<u32>,
|
||||||
|
year: &mut Option<u32>,
|
||||||
|
cover: &mut Option<(Vec<u8>, String)>,
|
||||||
|
) {
|
||||||
|
for tag in tags {
|
||||||
|
let value = fix_encoding(tag.value.to_string());
|
||||||
|
if let Some(key) = tag.std_key {
|
||||||
|
match key {
|
||||||
|
StandardTagKey::TrackTitle => {
|
||||||
|
*title = Some(value);
|
||||||
|
}
|
||||||
|
StandardTagKey::Artist | StandardTagKey::Performer => {
|
||||||
|
if artist.is_none() {
|
||||||
|
*artist = Some(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
StandardTagKey::Album => {
|
||||||
|
*album = Some(value);
|
||||||
|
}
|
||||||
|
StandardTagKey::TrackNumber => {
|
||||||
|
if track.is_none() {
|
||||||
|
*track = value.parse().ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
StandardTagKey::Date | StandardTagKey::OriginalDate => {
|
||||||
|
if year.is_none() {
|
||||||
|
// Parse first 4 characters as year
|
||||||
|
*year = value[..4.min(value.len())].parse().ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if cover.is_none() {
|
||||||
|
if let Some(visual) = visuals.first() {
|
||||||
|
let mime = visual.media_type.clone();
|
||||||
|
*cover = Some((visual.data.to_vec(), mime));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Heuristic to fix mojibake (CP1251 bytes interpreted as Latin-1/Windows-1252)
///
/// Returns the input unchanged unless EVERY char fits in one byte AND at
/// least one byte falls in 0xC0..=0xFF; only then are the bytes re-decoded
/// as Windows-1251 (Cyrillic).
fn fix_encoding(s: String) -> String {
    // If it's already a valid UTF-8 string that doesn't look like mojibake, return it.
    // Mojibake looks like characters from Latin-1 Supplement (0xC0-0xFF)
    // where they should be Cyrillic.

    // Project each char down to a single byte, dropping anything above U+00FF.
    let bytes: Vec<u8> = s.chars().map(|c| c as u32).filter(|&c| c <= 255).map(|c| c as u8).collect();

    // If the length is different, it means there were characters > 255, so it's not simple Latin-1 mojibake.
    if bytes.len() != s.chars().count() {
        return s;
    }

    // Check if it's likely CP1251. Russian characters in CP1251 are 0xC0-0xFF.
    // In Latin-1 these are characters like À-ÿ.
    // NOTE(review): genuine Western European text (é, À, ü…) also has bytes
    // ≥ 0xC0 and will be re-decoded as Cyrillic — confirm this trade-off is
    // intentional for this library's expected content.
    let has_mojibake = bytes.iter().any(|&b| b >= 0xC0);
    if !has_mojibake {
        return s;
    }

    // Windows-1251 has unmapped bytes (e.g. 0x98); `errors` flags them and we
    // fall back to the original string.
    let (decoded, _, errors) = encoding_rs::WINDOWS_1251.decode(&bytes);
    if errors {
        return s;
    }

    decoded.into_owned()
}
|
||||||
66
furumi-server/src/web/mod.rs
Normal file
66
furumi-server/src/web/mod.rs
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
pub mod auth;
|
||||||
|
pub mod browse;
|
||||||
|
pub mod meta;
|
||||||
|
pub mod stream;
|
||||||
|
pub mod transcoder;
|
||||||
|
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::{
|
||||||
|
Router,
|
||||||
|
middleware,
|
||||||
|
routing::get,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Shared state passed to all web handlers.
#[derive(Clone)]
pub struct WebState {
    // Music library root; all request paths are resolved under it.
    pub root: Arc<PathBuf>,
    // Access token checked by the password login flow.
    pub token: Arc<String>,
    // Present only when OIDC login is configured.
    pub oidc: Option<Arc<OidcState>>,
}

/// OIDC configuration consumed by the `/auth/*` routes.
pub struct OidcState {
    pub client: openidconnect::core::CoreClient,
    // NOTE(review): presumably used to sign/verify session cookies — confirm
    // against the auth module.
    pub session_secret: Vec<u8>,
}
|
||||||
|
|
||||||
|
/// Build the axum Router for the web player.
///
/// Route layout:
/// * `/login`, `/logout`, `/auth/login`, `/auth/callback` — reachable
///   without a session (password + OIDC login flows).
/// * `/` (player page) and `/api/{browse, stream/*path, meta/*path}` —
///   guarded by the `auth::require_auth` middleware.
pub fn build_router(root: PathBuf, token: String, oidc: Option<Arc<OidcState>>) -> Router {
    let state = WebState {
        root: Arc::new(root),
        token: Arc::new(token),
        oidc,
    };

    let api = Router::new()
        .route("/browse", get(browse::handler))
        .route("/stream/*path", get(stream::handler))
        .route("/meta/*path", get(meta::handler));

    // route_layer applies the auth middleware only to routes registered on
    // this Router, not to the login/OIDC routes merged below.
    let authed_routes = Router::new()
        .route("/", get(player_html))
        .nest("/api", api)
        .route_layer(middleware::from_fn_with_state(state.clone(), auth::require_auth));

    Router::new()
        .route("/login", get(auth::login_page).post(auth::login_submit))
        .route("/logout", get(auth::logout))
        .route("/auth/login", get(auth::oidc_login))
        .route("/auth/callback", get(auth::oidc_callback))
        .merge(authed_routes)
        .with_state(state)
}
|
||||||
|
|
||||||
|
/// Display name of the authenticated user, carried in request extensions.
/// NOTE(review): presumably inserted by `auth::require_auth` — confirm.
#[derive(Clone)]
pub struct AuthUserInfo(pub String);

/// Serve the player page with the username and crate version substituted
/// into their HTML placeholder comments.
async fn player_html(
    axum::extract::Extension(user_info): axum::extract::Extension<AuthUserInfo>,
) -> axum::response::Html<String> {
    // player.html is embedded at compile time; the replacements run per
    // request so each user sees their own name.
    let html = include_str!("player.html")
        .replace("<!-- USERNAME_PLACEHOLDER -->", &user_info.0)
        .replace("<!-- VERSION_PLACEHOLDER -->", env!("CARGO_PKG_VERSION"));
    axum::response::Html(html)
}
|
||||||
1070
furumi-server/src/web/player.html
Normal file
1070
furumi-server/src/web/player.html
Normal file
File diff suppressed because it is too large
Load Diff
171
furumi-server/src/web/stream.rs
Normal file
171
furumi-server/src/web/stream.rs
Normal file
@@ -0,0 +1,171 @@
|
|||||||
|
use axum::{
|
||||||
|
body::Body,
|
||||||
|
extract::{Path, Query, State},
|
||||||
|
http::{HeaderMap, HeaderValue, StatusCode, header},
|
||||||
|
response::{IntoResponse, Response},
|
||||||
|
};
|
||||||
|
use serde::Deserialize;
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncSeekExt};
|
||||||
|
|
||||||
|
use crate::security::sanitize_path;
|
||||||
|
use super::{
|
||||||
|
WebState,
|
||||||
|
browse::{is_audio_file, needs_transcode},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Query parameters accepted by the stream endpoint.
#[derive(Deserialize)]
pub struct StreamQuery {
    // `?transcode=1` forces Ogg/Opus transcoding even for formats the
    // browser could play natively.
    #[serde(default)]
    pub transcode: Option<String>,
}
|
||||||
|
|
||||||
|
/// `GET /api/stream/*path` — serve one audio file, either natively (with
/// HTTP Range support) or transcoded to Ogg/Opus when the client requests it
/// or the format is not browser-playable.
pub async fn handler(
    State(state): State<WebState>,
    Path(path): Path<String>,
    Query(query): Query<StreamQuery>,
    headers: HeaderMap,
) -> impl IntoResponse {
    let safe = match sanitize_path(&path) {
        Ok(p) => p,
        Err(_) => return bad_request("invalid path"),
    };

    let file_path = state.root.join(&safe);

    let filename = file_path
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or("")
        .to_owned();

    // Extension whitelist: defense in depth on top of sanitize_path.
    if !is_audio_file(&filename) {
        return (StatusCode::FORBIDDEN, "not an audio file").into_response();
    }

    let force_transcode = query.transcode.as_deref() == Some("1");

    if force_transcode || needs_transcode(&filename) {
        return stream_transcoded(file_path).await;
    }

    stream_native(file_path, &filename, &headers).await
}
|
||||||
|
|
||||||
|
/// Stream a file as-is with Range support.
|
||||||
|
async fn stream_native(file_path: std::path::PathBuf, filename: &str, req_headers: &HeaderMap) -> Response {
|
||||||
|
let mut file = match tokio::fs::File::open(&file_path).await {
|
||||||
|
Ok(f) => f,
|
||||||
|
Err(e) => {
|
||||||
|
let status = if e.kind() == std::io::ErrorKind::NotFound {
|
||||||
|
StatusCode::NOT_FOUND
|
||||||
|
} else {
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR
|
||||||
|
};
|
||||||
|
return (status, e.to_string()).into_response();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let file_size = match file.metadata().await {
|
||||||
|
Ok(m) => m.len(),
|
||||||
|
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let content_type = guess_content_type(filename);
|
||||||
|
|
||||||
|
// Parse Range header
|
||||||
|
let range_header = req_headers
|
||||||
|
.get(header::RANGE)
|
||||||
|
.and_then(|v| v.to_str().ok())
|
||||||
|
.and_then(parse_range);
|
||||||
|
|
||||||
|
if let Some((start, end)) = range_header {
|
||||||
|
let end = end.unwrap_or(file_size - 1).min(file_size - 1);
|
||||||
|
if start > end || start >= file_size {
|
||||||
|
return (StatusCode::RANGE_NOT_SATISFIABLE, "invalid range").into_response();
|
||||||
|
}
|
||||||
|
|
||||||
|
let length = end - start + 1;
|
||||||
|
|
||||||
|
if let Err(e) = file.seek(std::io::SeekFrom::Start(start)).await {
|
||||||
|
return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
|
||||||
|
}
|
||||||
|
|
||||||
|
let limited = file.take(length);
|
||||||
|
let stream = tokio_util::io::ReaderStream::new(limited);
|
||||||
|
let body = Body::from_stream(stream);
|
||||||
|
|
||||||
|
let mut resp_headers = HeaderMap::new();
|
||||||
|
resp_headers.insert(header::CONTENT_TYPE, content_type.parse().unwrap());
|
||||||
|
resp_headers.insert(header::ACCEPT_RANGES, HeaderValue::from_static("bytes"));
|
||||||
|
resp_headers.insert(header::CONTENT_LENGTH, length.to_string().parse().unwrap());
|
||||||
|
resp_headers.insert(
|
||||||
|
header::CONTENT_RANGE,
|
||||||
|
format!("bytes {}-{}/{}", start, end, file_size).parse().unwrap(),
|
||||||
|
);
|
||||||
|
(StatusCode::PARTIAL_CONTENT, resp_headers, body).into_response()
|
||||||
|
} else {
|
||||||
|
// Full file
|
||||||
|
let stream = tokio_util::io::ReaderStream::new(file);
|
||||||
|
let body = Body::from_stream(stream);
|
||||||
|
|
||||||
|
let mut resp_headers = HeaderMap::new();
|
||||||
|
resp_headers.insert(header::CONTENT_TYPE, content_type.parse().unwrap());
|
||||||
|
resp_headers.insert(header::ACCEPT_RANGES, HeaderValue::from_static("bytes"));
|
||||||
|
resp_headers.insert(header::CONTENT_LENGTH, file_size.to_string().parse().unwrap());
|
||||||
|
(StatusCode::OK, resp_headers, body).into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stream a transcoded (Ogg/Opus) version of the file.
///
/// The ENTIRE file is transcoded into memory before the response starts, so
/// latency and memory scale with track length.
async fn stream_transcoded(file_path: std::path::PathBuf) -> Response {
    // CPU-bound work — keep it off the async runtime.
    let ogg_data = match tokio::task::spawn_blocking(move || {
        super::transcoder::transcode_to_ogg_opus(file_path)
    })
    .await
    {
        Ok(Ok(data)) => data,
        // Transcoder returned an error.
        Ok(Err(e)) => {
            return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
        }
        // Blocking task panicked or was cancelled.
        Err(e) => {
            return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
        }
    };

    let len = ogg_data.len();
    let mut resp_headers = HeaderMap::new();
    resp_headers.insert(header::CONTENT_TYPE, "audio/ogg".parse().unwrap());
    resp_headers.insert(header::CONTENT_LENGTH, len.to_string().parse().unwrap());
    // Output is generated on the fly, so byte ranges are not supported.
    resp_headers.insert(header::ACCEPT_RANGES, HeaderValue::from_static("none"));

    (StatusCode::OK, resp_headers, Body::from(ogg_data)).into_response()
}
|
||||||
|
|
||||||
|
/// Parse an HTTP `Range: bytes=<start>-[<end>]` header value.
///
/// Returns `(start, Some(end))` for a bounded range and `(start, None)` for
/// an open-ended or unparsable end. Returns `None` for a different unit, a
/// missing/unparsable start, or a suffix range like `bytes=-500` (which this
/// server does not support).
fn parse_range(s: &str) -> Option<(u64, Option<u64>)> {
    let spec = s.strip_prefix("bytes=")?;
    // Split on the first '-'; a dash-less spec is treated as "start only".
    let (start_part, end_part) = match spec.split_once('-') {
        Some(pair) => pair,
        None => (spec, ""),
    };
    let start = start_part.parse::<u64>().ok()?;
    // An empty or malformed end yields an open-ended range.
    let end = end_part.parse::<u64>().ok();
    Some((start, end))
}
|
||||||
|
|
||||||
|
/// Map a filename's extension (case-insensitive) to the MIME type for the
/// `Content-Type` header; unknown extensions fall back to a generic binary
/// type.
fn guess_content_type(filename: &str) -> &'static str {
    const MIME_TABLE: [(&str, &str); 7] = [
        ("mp3", "audio/mpeg"),
        ("flac", "audio/flac"),
        ("ogg", "audio/ogg"),
        ("opus", "audio/ogg; codecs=opus"),
        ("aac", "audio/aac"),
        ("m4a", "audio/mp4"),
        ("wav", "audio/wav"),
    ];

    let ext = filename.rsplit('.').next().unwrap_or("").to_lowercase();
    MIME_TABLE
        .iter()
        .find(|(e, _)| *e == ext)
        .map(|(_, mime)| *mime)
        .unwrap_or("application/octet-stream")
}
|
||||||
|
|
||||||
|
/// Shorthand for a plain-text `400 Bad Request` response.
fn bad_request(msg: &'static str) -> Response {
    (StatusCode::BAD_REQUEST, msg).into_response()
}
|
||||||
244
furumi-server/src/web/transcoder.rs
Normal file
244
furumi-server/src/web/transcoder.rs
Normal file
@@ -0,0 +1,244 @@
|
|||||||
|
//! Symphonia-based audio transcoder: decodes any format → encodes to Ogg/Opus stream.
|
||||||
|
//!
|
||||||
|
//! The heavy decode/encode work runs in a `spawn_blocking` thread.
|
||||||
|
//! PCM samples are sent over a channel to the async stream handler.
|
||||||
|
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::io::Cursor;
|
||||||
|
|
||||||
|
use anyhow::{anyhow, Result};
|
||||||
|
use symphonia::core::{
|
||||||
|
audio::{AudioBufferRef, Signal},
|
||||||
|
codecs::{DecoderOptions, CODEC_TYPE_NULL},
|
||||||
|
errors::Error as SymphoniaError,
|
||||||
|
formats::FormatOptions,
|
||||||
|
io::MediaSourceStream,
|
||||||
|
meta::MetadataOptions,
|
||||||
|
probe::Hint,
|
||||||
|
};
|
||||||
|
use ogg::writing::PacketWriter;
|
||||||
|
use opus::{Application, Channels, Encoder};
|
||||||
|
|
||||||
|
/// Transcode an audio file at `path` into an Ogg/Opus byte stream.
/// Returns `Vec<u8>` with the full Ogg/Opus file (suitable for streaming/download).
///
/// This is intentionally synchronous (for use inside `spawn_blocking`).
pub fn transcode_to_ogg_opus(path: PathBuf) -> Result<Vec<u8>> {
    // ---- Open and probe the source ----
    let file = std::fs::File::open(&path)?;
    let mss = MediaSourceStream::new(Box::new(file), Default::default());

    let mut hint = Hint::new();
    if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
        hint.with_extension(ext);
    }

    let probed = symphonia::default::get_probe()
        .format(&hint, mss, &FormatOptions::default(), &MetadataOptions::default())
        .map_err(|e| anyhow!("probe failed: {e}"))?;

    let mut format = probed.format;

    // Find the default audio track
    let track = format
        .tracks()
        .iter()
        .find(|t| t.codec_params.codec != CODEC_TYPE_NULL)
        .ok_or_else(|| anyhow!("no audio track found"))?
        .clone();

    let track_id = track.id;
    let codec_params = &track.codec_params;

    let sample_rate = codec_params.sample_rate.unwrap_or(44100);
    let n_channels = codec_params.channels.map(|c| c.count()).unwrap_or(2);

    // Opus only supports 1 or 2 channels; downmix to stereo if needed
    let opus_channels = if n_channels == 1 { Channels::Mono } else { Channels::Stereo };
    let opus_ch_count = if n_channels == 1 { 1usize } else { 2 };

    // Opus encoder (target 48 kHz, we'll resample if needed)
    // Opus natively works at 48000 Hz; symphonia will decode at source rate.
    // For simplicity, we encode at the source sample rate - most clients handle this.
    // NOTE(review): when the source rate is NOT an Opus rate (e.g. 44.1 kHz)
    // the encoder is created at 48 kHz but the decoded samples are fed
    // through without resampling — that would shift playback speed/pitch.
    // TODO confirm whether a resampling step is missing here.
    let opus_sample_rate = if [8000u32, 12000, 16000, 24000, 48000].contains(&sample_rate) {
        sample_rate
    } else {
        // Opus spec: use closest supported rate; 48000 is safest
        48000
    };

    let mut encoder = Encoder::new(opus_sample_rate, opus_channels, Application::Audio)
        .map_err(|e| anyhow!("opus encoder init: {e}"))?;

    // Typical Opus frame = 20ms
    let frame_size = (opus_sample_rate as usize * 20) / 1000; // samples per channel per frame

    let mut decoder = symphonia::default::get_codecs()
        .make(codec_params, &DecoderOptions::default())
        .map_err(|e| anyhow!("decoder init: {e}"))?;

    // ---- Ogg output buffer ----
    let mut ogg_buf: Vec<u8> = Vec::with_capacity(4 * 1024 * 1024);
    {
        // Scope the writer so its borrow of ogg_buf ends before the return.
        let cursor = Cursor::new(&mut ogg_buf);
        let mut pkt_writer = PacketWriter::new(cursor);

        // Write Opus header packet (stream serial = 1)
        let serial: u32 = 1;
        let opus_head = build_opus_head(opus_ch_count as u8, opus_sample_rate, 0);
        pkt_writer.write_packet(opus_head, serial, ogg::writing::PacketWriteEndInfo::EndPage, 0)?;

        // Write Opus tags packet (empty)
        let opus_tags = build_opus_tags();
        pkt_writer.write_packet(opus_tags, serial, ogg::writing::PacketWriteEndInfo::EndPage, 0)?;

        // Interleaved f32 PCM awaiting encoding; filled one decoded packet at
        // a time and drained in whole Opus frames.
        let mut sample_buf: Vec<f32> = Vec::new();
        let mut granule_pos: u64 = 0;

        loop {
            let packet = match format.next_packet() {
                Ok(p) => p,
                // UnexpectedEof is symphonia's normal end-of-stream signal.
                Err(SymphoniaError::IoError(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
                Err(SymphoniaError::ResetRequired) => {
                    decoder.reset();
                    continue;
                }
                Err(e) => return Err(anyhow!("format error: {e}")),
            };

            // Skip packets belonging to other tracks.
            if packet.track_id() != track_id {
                continue;
            }

            match decoder.decode(&packet) {
                Ok(decoded) => {
                    collect_samples(&decoded, opus_ch_count, &mut sample_buf);
                }
                // Tolerate corrupt packets instead of aborting the file.
                Err(SymphoniaError::DecodeError(_)) => continue,
                Err(e) => return Err(anyhow!("decode error: {e}")),
            }

            // Encode complete frames from sample_buf
            while sample_buf.len() >= frame_size * opus_ch_count {
                let frame: Vec<f32> = sample_buf.drain(..frame_size * opus_ch_count).collect();
                // 4000 bytes is a comfortable upper bound for one encoded frame.
                let mut out = vec![0u8; 4000];
                let encoded_len = encoder
                    .encode_float(&frame, &mut out)
                    .map_err(|e| anyhow!("opus encode: {e}"))?;
                out.truncate(encoded_len);

                granule_pos += frame_size as u64;
                pkt_writer.write_packet(
                    out,
                    serial,
                    ogg::writing::PacketWriteEndInfo::NormalPacket,
                    granule_pos,
                )?;
            }
        }

        // Encode remaining samples (partial frame — pad with silence)
        // NOTE(review): when the stream length is an exact multiple of the
        // frame size, this branch is skipped and no EndStream packet is ever
        // written, so the Ogg stream lacks an EOS marker. TODO confirm
        // players tolerate this.
        if !sample_buf.is_empty() {
            let needed = frame_size * opus_ch_count;
            sample_buf.resize(needed, 0.0);
            let mut out = vec![0u8; 4000];
            let encoded_len = encoder
                .encode_float(&sample_buf, &mut out)
                .map_err(|e| anyhow!("opus encode final: {e}"))?;
            out.truncate(encoded_len);
            granule_pos += frame_size as u64;
            pkt_writer.write_packet(
                out,
                serial,
                ogg::writing::PacketWriteEndInfo::EndStream,
                granule_pos,
            )?;
        }
    }

    Ok(ogg_buf)
}
|
||||||
|
|
||||||
|
/// Collect PCM samples from a symphonia AudioBufferRef into a flat f32 vec.
/// Downmixes to `target_channels` (1 or 2) if source has more channels.
///
/// Only F32/S16/S32/U8 buffers are handled; any other sample format (e.g.
/// S24, F64) is silently dropped, which would produce gaps in the output.
/// Likewise only the first two source channels are used — extra channels are
/// discarded, not mixed in.
fn collect_samples(decoded: &AudioBufferRef<'_>, target_channels: usize, out: &mut Vec<f32>) {
    match decoded {
        AudioBufferRef::F32(buf) => {
            // Already normalized float; pass channel planes straight through.
            interleave_channels(buf.chan(0), if buf.spec().channels.count() > 1 { Some(buf.chan(1)) } else { None }, target_channels, out);
        }
        AudioBufferRef::S16(buf) => {
            // Normalize i16 full-scale (±32768) to [-1.0, 1.0).
            let ch0: Vec<f32> = buf.chan(0).iter().map(|&s| s as f32 / 32768.0).collect();
            let ch1 = if buf.spec().channels.count() > 1 {
                Some(buf.chan(1).iter().map(|&s| s as f32 / 32768.0).collect::<Vec<_>>())
            } else {
                None
            };
            interleave_channels(&ch0, ch1.as_deref(), target_channels, out);
        }
        AudioBufferRef::S32(buf) => {
            // Normalize i32 full-scale (±2^31) to [-1.0, 1.0).
            let ch0: Vec<f32> = buf.chan(0).iter().map(|&s| s as f32 / 2147483648.0).collect();
            let ch1 = if buf.spec().channels.count() > 1 {
                Some(buf.chan(1).iter().map(|&s| s as f32 / 2147483648.0).collect::<Vec<_>>())
            } else {
                None
            };
            interleave_channels(&ch0, ch1.as_deref(), target_channels, out);
        }
        AudioBufferRef::U8(buf) => {
            // u8 PCM is unsigned with midpoint 128; recenter then normalize.
            let ch0: Vec<f32> = buf.chan(0).iter().map(|&s| (s as f32 - 128.0) / 128.0).collect();
            let ch1 = if buf.spec().channels.count() > 1 {
                Some(buf.chan(1).iter().map(|&s| (s as f32 - 128.0) / 128.0).collect::<Vec<_>>())
            } else {
                None
            };
            interleave_channels(&ch0, ch1.as_deref(), target_channels, out);
        }
        _ => {
            // For other formats, try to get samples via S16 conversion
            // (symphonia may provide other types; we skip unsupported ones)
        }
    }
}
|
||||||
|
|
||||||
|
/// Interleave one or two planar f32 channels into `out`.
///
/// `target_channels == 1`: appends a mono stream, averaging left/right when a
/// second channel is present. Otherwise appends stereo interleaved L,R pairs,
/// duplicating the single channel when no second channel exists.
fn interleave_channels(ch0: &[f32], ch1: Option<&[f32]>, target_channels: usize, out: &mut Vec<f32>) {
    match (target_channels, ch1) {
        // Mono output, stereo input: average the pair sample-by-sample.
        (1, Some(second)) => {
            out.extend(ch0.iter().zip(second).map(|(&l, &r)| (l + r) * 0.5));
        }
        // Mono output, mono input: straight copy.
        (1, None) => out.extend_from_slice(ch0),
        // Stereo output: emit L,R pairs; a missing right channel mirrors left.
        (_, maybe_second) => {
            let right = maybe_second.unwrap_or(ch0);
            out.reserve(ch0.len() * 2);
            for idx in 0..ch0.len() {
                out.push(ch0[idx]);
                out.push(right[idx]);
            }
        }
    }
}
|
||||||
|
|
||||||
|
/// Build the 19-byte OpusHead identification packet (RFC 7845 §5.1).
///
/// Layout: magic "OpusHead", version 1, channel count, pre-skip (LE u16),
/// input sample rate (LE u32), output gain 0 dB (LE u16), channel mapping
/// family 0 (mono/stereo, no mapping table).
fn build_opus_head(channels: u8, sample_rate: u32, pre_skip: u16) -> Vec<u8> {
    let mut head = Vec::with_capacity(19);
    head.extend_from_slice(b"OpusHead");
    head.push(1); // version, always 1
    head.push(channels);
    head.extend(pre_skip.to_le_bytes());
    head.extend(sample_rate.to_le_bytes());
    head.extend(0u16.to_le_bytes()); // output gain: 0 dB
    head.push(0); // mapping family 0: no channel mapping table follows
    head
}
|
||||||
|
|
||||||
|
/// Build a minimal OpusTags comment packet (RFC 7845 §5.2).
///
/// Layout: magic "OpusTags", vendor string length (LE u32), vendor string,
/// then a user-comment list of length zero.
fn build_opus_tags() -> Vec<u8> {
    const VENDOR: &[u8] = b"furumi-server";
    let mut tags = Vec::with_capacity(8 + 4 + VENDOR.len() + 4);
    tags.extend_from_slice(b"OpusTags");
    tags.extend((VENDOR.len() as u32).to_le_bytes());
    tags.extend_from_slice(VENDOR);
    tags.extend(0u32.to_le_bytes()); // zero user comments follow
    tags
}
|
||||||
Reference in New Issue
Block a user