//! Symphonia-based audio transcoder: decodes any format → encodes to Ogg/Opus stream. //! //! The heavy decode/encode work runs in a `spawn_blocking` thread. //! PCM samples are sent over a channel to the async stream handler. use std::path::PathBuf; use std::io::Cursor; use anyhow::{anyhow, Result}; use symphonia::core::{ audio::{AudioBufferRef, Signal}, codecs::{DecoderOptions, CODEC_TYPE_NULL}, errors::Error as SymphoniaError, formats::FormatOptions, io::MediaSourceStream, meta::MetadataOptions, probe::Hint, }; use ogg::writing::PacketWriter; use opus::{Application, Channels, Encoder}; /// Transcode an audio file at `path` into an Ogg/Opus byte stream. /// Returns `Vec` with the full Ogg/Opus file (suitable for streaming/download). /// /// This is intentionally synchronous (for use inside `spawn_blocking`). pub fn transcode_to_ogg_opus(path: PathBuf) -> Result> { // ---- Open and probe the source ---- let file = std::fs::File::open(&path)?; let mss = MediaSourceStream::new(Box::new(file), Default::default()); let mut hint = Hint::new(); if let Some(ext) = path.extension().and_then(|e| e.to_str()) { hint.with_extension(ext); } let probed = symphonia::default::get_probe() .format(&hint, mss, &FormatOptions::default(), &MetadataOptions::default()) .map_err(|e| anyhow!("probe failed: {e}"))?; let mut format = probed.format; // Find the default audio track let track = format .tracks() .iter() .find(|t| t.codec_params.codec != CODEC_TYPE_NULL) .ok_or_else(|| anyhow!("no audio track found"))? 
.clone(); let track_id = track.id; let codec_params = &track.codec_params; let sample_rate = codec_params.sample_rate.unwrap_or(44100); let n_channels = codec_params.channels.map(|c| c.count()).unwrap_or(2); // Opus only supports 1 or 2 channels; downmix to stereo if needed let opus_channels = if n_channels == 1 { Channels::Mono } else { Channels::Stereo }; let opus_ch_count = if n_channels == 1 { 1usize } else { 2 }; // Opus encoder (target 48 kHz, we'll resample if needed) // Opus natively works at 48000 Hz; symphonia will decode at source rate. // For simplicity, we encode at the source sample rate - most clients handle this. let opus_sample_rate = if [8000u32, 12000, 16000, 24000, 48000].contains(&sample_rate) { sample_rate } else { // Opus spec: use closest supported rate; 48000 is safest 48000 }; let mut encoder = Encoder::new(opus_sample_rate, opus_channels, Application::Audio) .map_err(|e| anyhow!("opus encoder init: {e}"))?; // Typical Opus frame = 20ms let frame_size = (opus_sample_rate as usize * 20) / 1000; // samples per channel per frame let mut decoder = symphonia::default::get_codecs() .make(codec_params, &DecoderOptions::default()) .map_err(|e| anyhow!("decoder init: {e}"))?; // ---- Ogg output buffer ---- let mut ogg_buf: Vec = Vec::with_capacity(4 * 1024 * 1024); { let cursor = Cursor::new(&mut ogg_buf); let mut pkt_writer = PacketWriter::new(cursor); // Write Opus header packet (stream serial = 1) let serial: u32 = 1; let opus_head = build_opus_head(opus_ch_count as u8, opus_sample_rate, 0); pkt_writer.write_packet(opus_head, serial, ogg::writing::PacketWriteEndInfo::EndPage, 0)?; // Write Opus tags packet (empty) let opus_tags = build_opus_tags(); pkt_writer.write_packet(opus_tags, serial, ogg::writing::PacketWriteEndInfo::EndPage, 0)?; let mut sample_buf: Vec = Vec::new(); let mut granule_pos: u64 = 0; loop { let packet = match format.next_packet() { Ok(p) => p, Err(SymphoniaError::IoError(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof 
=> break, Err(SymphoniaError::ResetRequired) => { decoder.reset(); continue; } Err(e) => return Err(anyhow!("format error: {e}")), }; if packet.track_id() != track_id { continue; } match decoder.decode(&packet) { Ok(decoded) => { collect_samples(&decoded, opus_ch_count, &mut sample_buf); } Err(SymphoniaError::DecodeError(_)) => continue, Err(e) => return Err(anyhow!("decode error: {e}")), } // Encode complete frames from sample_buf while sample_buf.len() >= frame_size * opus_ch_count { let frame: Vec = sample_buf.drain(..frame_size * opus_ch_count).collect(); let mut out = vec![0u8; 4000]; let encoded_len = encoder .encode_float(&frame, &mut out) .map_err(|e| anyhow!("opus encode: {e}"))?; out.truncate(encoded_len); granule_pos += frame_size as u64; pkt_writer.write_packet( out, serial, ogg::writing::PacketWriteEndInfo::NormalPacket, granule_pos, )?; } } // Encode remaining samples (partial frame — pad with silence) if !sample_buf.is_empty() { let needed = frame_size * opus_ch_count; sample_buf.resize(needed, 0.0); let mut out = vec![0u8; 4000]; let encoded_len = encoder .encode_float(&sample_buf, &mut out) .map_err(|e| anyhow!("opus encode final: {e}"))?; out.truncate(encoded_len); granule_pos += frame_size as u64; pkt_writer.write_packet( out, serial, ogg::writing::PacketWriteEndInfo::EndStream, granule_pos, )?; } } Ok(ogg_buf) } /// Collect PCM samples from a symphonia AudioBufferRef into a flat f32 vec. /// Downmixes to `target_channels` (1 or 2) if source has more channels. 
fn collect_samples(decoded: &AudioBufferRef<'_>, target_channels: usize, out: &mut Vec) { match decoded { AudioBufferRef::F32(buf) => { interleave_channels(buf.chan(0), if buf.spec().channels.count() > 1 { Some(buf.chan(1)) } else { None }, target_channels, out); } AudioBufferRef::S16(buf) => { let ch0: Vec = buf.chan(0).iter().map(|&s| s as f32 / 32768.0).collect(); let ch1 = if buf.spec().channels.count() > 1 { Some(buf.chan(1).iter().map(|&s| s as f32 / 32768.0).collect::>()) } else { None }; interleave_channels(&ch0, ch1.as_deref(), target_channels, out); } AudioBufferRef::S32(buf) => { let ch0: Vec = buf.chan(0).iter().map(|&s| s as f32 / 2147483648.0).collect(); let ch1 = if buf.spec().channels.count() > 1 { Some(buf.chan(1).iter().map(|&s| s as f32 / 2147483648.0).collect::>()) } else { None }; interleave_channels(&ch0, ch1.as_deref(), target_channels, out); } AudioBufferRef::U8(buf) => { let ch0: Vec = buf.chan(0).iter().map(|&s| (s as f32 - 128.0) / 128.0).collect(); let ch1 = if buf.spec().channels.count() > 1 { Some(buf.chan(1).iter().map(|&s| (s as f32 - 128.0) / 128.0).collect::>()) } else { None }; interleave_channels(&ch0, ch1.as_deref(), target_channels, out); } _ => { // For other formats, try to get samples via S16 conversion // (symphonia may provide other types; we skip unsupported ones) } } } fn interleave_channels(ch0: &[f32], ch1: Option<&[f32]>, target_channels: usize, out: &mut Vec) { let len = ch0.len(); if target_channels == 1 { if let Some(c1) = ch1 { // Mix down to mono out.extend(ch0.iter().zip(c1.iter()).map(|(l, r)| (l + r) * 0.5)); } else { out.extend_from_slice(ch0); } } else { // Stereo interleaved let c1 = ch1.unwrap_or(ch0); for i in 0..len { out.push(ch0[i]); out.push(c1[i]); } } } /// Build OpusHead binary packet (RFC 7845). 
///
/// Layout (19 bytes, multi-byte fields little-endian): magic `"OpusHead"`, version `1`,
/// channel count, pre-skip, input sample rate, output gain `0`, channel mapping family `0`.
fn build_opus_head(channels: u8, sample_rate: u32, pre_skip: u16) -> Vec<u8> {
    const VERSION: u8 = 1;
    const MAPPING_FAMILY: u8 = 0;
    let output_gain = 0u16.to_le_bytes();
    let parts: [&[u8]; 6] = [
        b"OpusHead",
        &[VERSION, channels],
        &pre_skip.to_le_bytes(),
        &sample_rate.to_le_bytes(),
        &output_gain,
        &[MAPPING_FAMILY],
    ];
    parts.concat()
}

/// Build OpusTags binary packet with minimal vendor string.
///
/// Layout: magic `"OpusTags"`, vendor length (u32 LE), vendor bytes, then a user
/// comment count of zero (u32 LE).
fn build_opus_tags() -> Vec<u8> {
    const VENDOR: &[u8] = b"furumi-server";
    let vendor_len = (VENDOR.len() as u32).to_le_bytes();
    let comment_count = 0u32.to_le_bytes();
    let parts: [&[u8]; 4] = [b"OpusTags", &vendor_len, VENDOR, &comment_count];
    parts.concat()
}