diff --git a/Cargo.toml b/Cargo.toml index a2a0412..86c174d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "musfuse" -version = "0.4.0" +version = "0.5.0" authors = ["AB "] edition = "2018" diff --git a/src/main.rs b/src/main.rs index b0d8034..cb9b588 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,31 +2,33 @@ extern crate base64; extern crate fuse; extern crate libc; extern crate time; +#[macro_use] +extern crate log; +extern crate chrono; + + +use time::Timespec; +use percent_encoding::percent_decode_str; +use serde::Deserialize; +use libc::{EIO, ENOENT}; +use reqwest::{blocking::Client, header::CONTENT_LENGTH}; +use size_format::SizeFormatterBinary; +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + env, + ffi::OsStr, + fmt, + path::Path, + process, +}; use fuse::{ FileAttr, FileType, Filesystem, ReplyAttr, ReplyData, ReplyDirectory, ReplyEntry, Request, }; -use libc::ENOENT; -use reqwest::blocking::Client; -use reqwest::blocking::Response; -use reqwest::header::CONTENT_LENGTH; -use size_format::SizeFormatterBinary; -use std::collections::{BTreeMap, HashMap, HashSet}; -use std::env; -use std::ffi::OsStr; -use std::fmt; -use time::Timespec; -#[macro_use] -extern crate log; -use std::process; -extern crate chrono; -// Download lib staff -use percent_encoding::percent_decode_str; -use serde::Deserialize; -use std::path::Path; struct Metrics { http_requests: u64, + connect_errors: u64, ingress: u64, hit_len_cache: u64, hit_data_cache: u64, @@ -38,8 +40,9 @@ struct Metrics { impl fmt::Debug for Metrics { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, - "http_requests: {}\ningress: {}\nhit_len_cache: {}\nhit_data_cache: {}\nmiss_len_cache: {}\nmiss_data_cache: {}\nserver_addr: {}\n", + "http_requests: {}\nconnect_errors: {}\ningress: {}\nhit_len_cache: {}\nhit_data_cache: {}\nmiss_len_cache: {}\nmiss_data_cache: {}\nserver_addr: {}\n", self.http_requests, + self.connect_errors, self.ingress, self.hit_len_cache, self.hit_data_cache, @@ -50,8 +53,9 @@ impl fmt::Debug for Metrics { } } -static mut metrics: Metrics = Metrics { +static mut METRICS: Metrics = Metrics { http_requests: 0, + connect_errors: 0, ingress: 0, hit_len_cache: 0, hit_data_cache: 0, @@ -76,7 +80,7 @@ pub struct Track { const CACHE_HEAD: i64 = 768 * 1024; const MAX_CACHE_SIZE: i64 = 10; // Count -static mut http_auth: String = String::new(); +static mut HTTP_AUTH: String = String::new(); fn get_basename(path: Option<&String>) -> Option { let base = match percent_decode_str(path.unwrap().as_str()).decode_utf8() { @@ -99,7 +103,7 @@ async fn get_tracks(server: &String) -> Result, Box>() @@ -257,7 +261,7 @@ impl Filesystem for JsonFilesystem { // return usage statistics if ino == self.metrics_inode { unsafe { - let metrics_str = format!("{:#?}", metrics); + let metrics_str = format!("{:#?}", METRICS); reply.data(&metrics_str.as_bytes()); } return; @@ -288,38 +292,50 @@ impl Filesystem for JsonFilesystem { let mut chunk: Vec; let content_length: i64; let client = Client::new(); - let mut resp: Response; // content_length cache. if self.buffer_length.contains_key(id.as_str()) { content_length = self.buffer_length[id.as_str()]; debug!("{} - Hit length cache", ino); unsafe { - metrics.hit_len_cache += 1; + METRICS.hit_len_cache += 1; } } else { unsafe { - resp = client + content_length = match client .head(full_url.as_str()) - .header("Authorization", format!("Basic {}", http_auth)) + .header("Authorization", format!("Basic {}", HTTP_AUTH)) .send() - .unwrap(); + { + Ok(content) => { + let content_length = match content.headers().get(CONTENT_LENGTH) { + Some(header_content) => { + header_content.to_str().unwrap().parse::().unwrap() + } + None => { + reply.error(EIO); + return; + } + }; + content_length + } + Err(err) => { + let name = &self.tree[(ino - 2) as usize].path.as_ref(); + let basename = get_basename(*name).unwrap().to_string(); + error!("An error fetching file {}. {}", basename, err); + METRICS.connect_errors += 1; + reply.error(EIO); + return; + } + }; } - content_length = resp - .headers() - .get(CONTENT_LENGTH) - .unwrap() - .to_str() - .unwrap() - .parse::() - .unwrap(); unsafe { - metrics.http_requests += 1; + METRICS.http_requests += 1; } self.buffer_length.insert(id.to_string(), content_length); debug!("{} - Miss length cache", ino); unsafe { - metrics.miss_len_cache += 1; + METRICS.miss_len_cache += 1; } } // Check for API wrong file size here @@ -339,7 +355,7 @@ impl Filesystem for JsonFilesystem { // Cache found debug!("{} - Hit data cache", ino); unsafe { - metrics.hit_data_cache += 1; + METRICS.hit_data_cache += 1; } chunk = self.buffer_head_data[&ino][offset as usize..end_of_chunk as usize] .to_vec() @@ -349,11 +365,12 @@ impl Filesystem for JsonFilesystem { // Cache doesn't found debug!("{} - Miss data cache", ino); unsafe { - metrics.miss_data_cache += 1; + METRICS.miss_data_cache += 1; } // Fetch file head (CACHE_HEAD) + let response: Vec; unsafe { - resp = client + response = match client .get(full_url.as_str()) .header( "Range", @@ -366,14 +383,23 @@ impl Filesystem for JsonFilesystem { } ), ) - .header("Authorization", format!("Basic {}", http_auth)) + .header("Authorization", format!("Basic {}", HTTP_AUTH)) .send() - .unwrap(); + { + Ok(content) => content.bytes().unwrap().to_vec(), + Err(err) => { + let name = &self.tree[(ino - 2) as usize].path.as_ref(); + let basename = get_basename(*name).unwrap().to_string(); + error!("An error fetching file {}. {}", basename, err); + METRICS.connect_errors += 1; + reply.error(EIO); + return; + } + }; } - let response = resp.bytes().unwrap(); unsafe { - metrics.http_requests += 1; - metrics.ingress += response.len() as u64; + METRICS.http_requests += 1; + METRICS.ingress += response.len() as u64; } // Save cache self.buffer_head_data.insert(ino, response.to_vec()); @@ -385,18 +411,28 @@ impl Filesystem for JsonFilesystem { return; } // If it isn't a beginning of file don't cache it and fetch over HTTP directly. + let response: Vec; unsafe { - resp = client + response = match client .get(full_url.as_str()) .header("Range", &range) - .header("Authorization", format!("Basic {}", http_auth)) + .header("Authorization", format!("Basic {}", HTTP_AUTH)) .send() - .unwrap(); + { + Ok(content) => content.bytes().unwrap().to_vec(), + Err(err) => { + let name = &self.tree[(ino - 2) as usize].path.as_ref(); + let basename = get_basename(*name).unwrap().to_string(); + error!("An error fetching file {}. {}", basename, err); + METRICS.connect_errors += 1; + reply.error(EIO); + return; + } + }; } - let response = resp.bytes().unwrap(); unsafe { - metrics.http_requests += 1; - metrics.ingress += response.len() as u64; + METRICS.http_requests += 1; + METRICS.ingress += response.len() as u64; } chunk = response.to_vec().clone(); reply.data(&chunk); @@ -469,7 +505,7 @@ fn main() { } }; unsafe { - metrics.server_addr = server.clone(); + METRICS.server_addr = server.clone(); } let http_user_var = "HTTP_USER"; let http_pass_var = "HTTP_PASS"; @@ -505,7 +541,7 @@ fn main() { buf.push_str(&http_user); buf.push_str(":"); buf.push_str(&http_pass); - http_auth = base64::encode(buf) + HTTP_AUTH = base64::encode(buf) } let lib = match get_tracks(&server) { Ok(library) => library, @@ -542,8 +578,7 @@ fn main() { info!("Mount options: {:?}", options); let mut mount: fuse::BackgroundSession; unsafe { - mount = - fuse::spawn_mount(fs, &mountpoint, &options).expect("Couldn't mount filesystem"); + mount = fuse::spawn_mount(fs, &mountpoint, &options).expect("Couldn't mount filesystem"); } ctrlc::set_handler(move || { println!("Exitting...");