diff --git a/.gitignore b/.gitignore
index 0b2bd12..4bc575a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 /target
 Cargo.lock
-/mnt
\ No newline at end of file
+/mnt
+.vscode/
diff --git a/Cargo.toml b/Cargo.toml
index 880ceb3..b799623 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "musfuse"
-version = "0.1.0"
+version = "0.4.0"
 authors = ["AB "]
 edition = "2018"
@@ -10,8 +10,12 @@
 reqwest = { version = "0.10", features = ["json", "blocking"] }
 tokio = { version = "0.2", features = ["full"] }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
-percent-encoding = "*"
-fuse = "*"
-time = "0.1"
-libc = "*"
-rustc-serialize = "*"
+percent-encoding = "2.1.0"
+fuse = "0.3.1"
+time = "0.1.42"
+libc = "0.2.69"
+chrono = "0.4.11"
+env_logger = "0.7.1"
+log = { version = "^0.4.5", features = ["std"] }
+size_format = "1.0.2"
+base64 = "0.12.0"
diff --git a/src/main.rs b/src/main.rs
index 45d4395..ec803cc 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,6 @@
 // FUSE stuff
+
+extern crate base64;
 extern crate fuse;
 extern crate libc;
 extern crate time;
@@ -9,17 +11,62 @@
 use libc::ENOENT;
 use reqwest::blocking::Client;
 use reqwest::blocking::Response;
 use reqwest::header::CONTENT_LENGTH;
-use std::collections::BTreeMap;
+use size_format::SizeFormatterBinary;
+use std::collections::{BTreeMap, HashMap, HashSet};
 use std::env;
 use std::ffi::OsStr;
+use std::fmt;
 use time::Timespec;
-//use http::Method;
+#[macro_use]
+extern crate log;
+extern crate chrono;
+extern crate env_logger;
+
+use chrono::Local;
+use env_logger::Builder;
+use log::LevelFilter;
+use std::io::Write;
 
 // Download lib stuff
 use percent_encoding::percent_decode_str;
 use serde::Deserialize;
 use std::path::Path;
 
+struct Metrics {
+    http_requests: u64,
+    ingress: u64,
+    hit_len_cache: u64,
+    hit_data_cache: u64,
+    miss_len_cache: u64,
+    miss_data_cache: u64,
+    server_addr: String,
+}
+
+impl fmt::Debug for Metrics {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f,
+            "http_requests: {}\ningress: {}\nhit_len_cache: {}\nhit_data_cache: {}\nmiss_len_cache: {}\nmiss_data_cache: {}\nserver_addr: {}\n",
+            self.http_requests,
+            self.ingress,
+            self.hit_len_cache,
+            self.hit_data_cache,
+            self.miss_len_cache,
+            self.miss_data_cache,
+            self.server_addr,
+        )
+    }
+}
+
+static mut metrics: Metrics = Metrics {
+    http_requests: 0,
+    ingress: 0,
+    hit_len_cache: 0,
+    hit_data_cache: 0,
+    miss_len_cache: 0,
+    miss_data_cache: 0,
+    server_addr: String::new(),
+};
+
 #[derive(Default, Debug, Clone, PartialEq, Deserialize)]
 pub struct Track {
     pub id: Option<String>,
@@ -34,8 +81,9 @@ pub struct Track {
     pub Size: Option<i64>,
 }
 
-const CACHE_HEAD: i64 = 1024 * 1024;
-const MAX_CACHE_SIZE: i64 = 10 * 1024 * 1025; // Mb
+const CACHE_HEAD: i64 = 768 * 1024;
+const MAX_CACHE_SIZE: i64 = 10; // Count
+static mut http_auth: String = String::new();
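+// http_auth holds the base64-encoded "user:password" pair. It is written once
+// in main() before the filesystem is mounted and only read afterwards; that
+// ordering is the only thing that keeps the unsafe accesses below race-free.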
 
 fn get_basename(path: Option<&String>) -> Option<String> {
     let base = match percent_decode_str(path.unwrap().as_str()).decode_utf8() {
@@ -54,12 +102,18 @@
 
 #[tokio::main]
 async fn get_tracks(server: &String) -> Result<Vec<Track>, Box<dyn std::error::Error>> {
-    let resp = reqwest::get(format!("{}/songs", server).as_str())
-        .await?
-        .json::<Vec<Track>>()
-        .await?;
-    println!("Found {} tracks.", resp.len());
-    Ok(resp)
+    let client = reqwest::Client::new();
+    unsafe {
+        let resp = client
+            .get(format!("{}/songs", server).as_str())
+            .header("Authorization", format!("Basic {}", http_auth))
+            .send()
+            .await?
+            .json::<Vec<Track>>()
+            .await?;
+        info!("Found {} tracks.", resp.len());
+        Ok(resp)
+    }
 }
 
 #[cfg(target_family = "unix")]
@@ -68,8 +122,10 @@ struct JsonFilesystem {
     tree: Vec<Track>,
     attrs: BTreeMap<u64, FileAttr>,
     inodes: BTreeMap<String, u64>,
-    buffer_head: BTreeMap<String, Vec<u8>>,
+    buffer_head_index: HashSet<u64>,
+    buffer_head_data: HashMap<u64, Vec<u8>>,
     buffer_length: BTreeMap<String, i64>,
+    metrics_inode: u64,
 }
 
 #[cfg(target_family = "unix")]
@@ -78,6 +134,7 @@ impl JsonFilesystem {
         let mut attrs = BTreeMap::new();
         let mut inodes = BTreeMap::new();
         let ts = time::now().to_timespec();
+        let mut total_size: i64 = 0;
         let attr = FileAttr {
             ino: 1,
             size: 0,
@@ -96,26 +153,17 @@
         };
         attrs.insert(1, attr);
         inodes.insert("/".to_string(), 1);
-        let client = Client::new();
-        let mut resp: Response;
         for (i, track) in tree.iter().enumerate() {
             let basename = get_basename(track.path.as_ref()).unwrap().to_string();
-            /*
-            let full_url = format!("{}/{}", server, track.path.as_ref().unwrap().to_string());
-            resp = client.head(full_url.as_str()).send().unwrap();
-            let content_length = resp
-                .headers()
-                .get(CONTENT_LENGTH)
-                .unwrap()
-                .to_str()
-                .unwrap()
-                .parse::<i64>()
-                .unwrap();
-            println!("{} len is {}", basename, content_length);
-            */
+            debug!(
+                "Added inode: {} - {} [{}]",
+                i + 2,
+                basename,
+                track.Size.unwrap()
+            );
+            total_size = total_size + track.Size.unwrap();
             let attr = FileAttr {
                 ino: i as u64 + 2,
-                //size: 1024 * 1024 * 1024 as u64,
                 size: track.Size.unwrap() as u64,
                 blocks: 0,
                 atime: ts,
@@ -133,13 +181,41 @@
             attrs.insert(attr.ino, attr);
             inodes.insert(basename.clone(), attr.ino);
         }
+        // Metrics file
+        let metrics_inode = 2 + tree.len() as u64;
+        let metrics_attr = FileAttr {
+            ino: metrics_inode,
+            size: 4096,
+            blocks: 0,
+            atime: ts,
+            mtime: ts,
+            ctime: ts,
+            crtime: ts,
+            kind: FileType::RegularFile,
+            perm: 0o444,
+            nlink: 0,
+            uid: 0,
+            gid: 0,
+            rdev: 0,
+            flags: 0,
+        };
+        attrs.insert(metrics_attr.ino, metrics_attr);
+        inodes.insert("METRICS.TXT".to_string(), metrics_attr.ino);
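+        // METRICS.TXT is a synthetic, read-only file: read() matches on this
+        // inode and serves the Debug-formatted `metrics` struct instead of
+        // fetching anything over HTTP.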
+        warn!("Len: attrs: {}, ino: {}", attrs.len(), inodes.len());
+        info!(
+            "Filesystem initialized: {} files, {}B in total.",
+            inodes.len(),
+            (SizeFormatterBinary::new(total_size as u64))
+        );
         JsonFilesystem {
             server: server,
             tree: tree.clone(),
             attrs: attrs,
             inodes: inodes,
-            buffer_head: BTreeMap::new(),
+            buffer_head_data: HashMap::new(),
+            buffer_head_index: HashSet::new(),
             buffer_length: BTreeMap::new(),
+            metrics_inode: metrics_inode,
         }
     }
 }
@@ -147,7 +223,7 @@
 #[cfg(target_family = "unix")]
 impl Filesystem for JsonFilesystem {
     fn getattr(&mut self, _req: &Request, ino: u64, reply: ReplyAttr) {
-        //println!("getattr(ino={})", ino);
+        debug!("getattr(ino={})", ino);
         match self.attrs.get(&ino) {
             Some(attr) => {
                 let ttl = Timespec::new(1, 0);
@@ -158,7 +234,7 @@
     }
 
     fn lookup(&mut self, _req: &Request, parent: u64, name: &OsStr, reply: ReplyEntry) {
-        //println!("lookup(parent={}, name={})", parent, name.to_str().unwrap());
+        debug!("lookup(parent={}, name={})", parent, name.to_str().unwrap());
         let inode = match self.inodes.get(name.to_str().unwrap()) {
             Some(inode) => inode,
             None => {
@@ -169,6 +245,7 @@
         match self.attrs.get(inode) {
             Some(attr) => {
                 let ttl = Timespec::new(1, 0);
+                debug!("{:#?}", attr);
                 reply.entry(&ttl, attr, 0);
             }
             None => reply.error(ENOENT),
@@ -184,14 +261,37 @@
         size: u32,
         reply: ReplyData,
     ) {
-        print!(
-            "read(ino={}, fh={}, offset={}, size={}) ",
-            ino, fh, offset, size
+        // Return usage statistics for METRICS.TXT
+        if ino == self.metrics_inode {
+            unsafe {
+                let metrics_str = format!("{:#?}", metrics);
+                reply.data(&metrics_str.as_bytes());
+            }
+            return;
+        }
+
+        // Cache eviction: when the head cache is full, drop one entry that is
+        // not the inode currently being read.
+        if self.buffer_head_index.len() > MAX_CACHE_SIZE as usize {
+            let mut iter = self.buffer_head_index.iter().filter(|&x| *x != ino);
+            let old_entry = iter.next().unwrap();
+            self.buffer_head_data.remove(old_entry);
+            let old_entry_copy = old_entry.clone();
+            self.buffer_head_index.remove(&old_entry_copy);
+            let basename = &self.tree[(old_entry_copy - 2) as usize].path.as_ref();
+            debug!(
+                "{} - Cache dropped for: {} ",
+                ino,
+                get_basename(*basename).unwrap().to_string()
+            );
+        }
+
+        debug!(
+            "{} - read(ino={}, fh={}, offset={}, size={}) ",
+            ino, ino, fh, offset, size
         );
         let url = &self.tree[(ino - 2) as usize].path.as_ref().unwrap();
         let id = &self.tree[(ino - 2) as usize].id.as_ref().unwrap();
-        let full_url = format!("{}/{}", self.server, url);
+        let full_url = format!("{}{}", self.server, url);
         let mut chunk: Vec<u8>;
         let content_length: i64;
         let client = Client::new();
@@ -200,9 +300,18 @@
         // content_length cache.
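+        // Two caches are kept per track: buffer_length maps a track id to the
+        // Content-Length the server reported, and buffer_head_data keeps the
+        // first CACHE_HEAD bytes per inode, so repeated reads of a file's head
+        // (players scanning tags, for example) skip extra HTTP requests.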
         if self.buffer_length.contains_key(id.as_str()) {
             content_length = self.buffer_length[id.as_str()];
-            print!("Hit LC ");
+            debug!("{} - Hit length cache", ino);
+            unsafe {
+                metrics.hit_len_cache += 1;
+            }
         } else {
-            resp = client.head(full_url.as_str()).send().unwrap();
+            unsafe {
+                resp = client
+                    .head(full_url.as_str())
+                    .header("Authorization", format!("Basic {}", http_auth))
+                    .send()
+                    .unwrap();
+            }
             content_length = resp
                 .headers()
                 .get(CONTENT_LENGTH)
@@ -211,14 +320,18 @@
                 .unwrap()
                 .parse::<i64>()
                 .unwrap();
+            unsafe {
+                metrics.http_requests += 1;
+            }
             self.buffer_length.insert(id.to_string(), content_length);
-            print!("Miss LC ");
+            debug!("{} - Miss length cache", ino);
+            unsafe {
+                metrics.miss_len_cache += 1;
+            }
         }
-        print!("LC: {} ", self.buffer_length.len());
-        print!("HC: {} ", self.buffer_head.len());
-
+        // Check for a wrong file size reported by the API
         if content_length > offset {
-            print!("Content len {:?} ", content_length);
+            debug!("{} - Content len {:?} ", ino, content_length);
             let end_of_chunk = if size - 1 + offset as u32 > content_length as u32 {
                 content_length
             } else {
@@ -228,66 +341,84 @@
 
             // if it's the beginning of the file...
             if end_of_chunk < CACHE_HEAD {
-                // cleaning cache before. it should be less than MAX_CACHE_SIZE bytes
-                if self.buffer_head.len() as i64 * CACHE_HEAD > MAX_CACHE_SIZE {
-                    let (key, _) = self.buffer_head.iter_mut().next().unwrap();
-                    let key_cpy: String = key.to_string();
-                    if *key == key_cpy {
-                        self.buffer_head.remove(&key_cpy);
-                        print!(" *Cache Cleaned* ");
-                    }
-                }
-                // looking for CACHE_HEAD bytes file beginning in cache
-                if self.buffer_head.contains_key(id.as_str()) {
-                    print!("Hit head cache! ");
-                    chunk = self.buffer_head[id.as_str()][offset as usize..end_of_chunk as usize]
+                if self.buffer_head_data.contains_key(&ino) {
+                    // Cache hit
+                    debug!("{} - Hit data cache", ino);
+                    unsafe {
+                        metrics.hit_data_cache += 1;
+                    }
+                    chunk = self.buffer_head_data[&ino][offset as usize..end_of_chunk as usize]
                         .to_vec()
                         .clone();
                     reply.data(&chunk);
                 } else {
-                    print!("Miss head cache! ");
-                    resp = client
-                        .get(full_url.as_str())
-                        .header(
-                            "Range",
-                            format!(
-                                "bytes=0-{}",
-                                if CACHE_HEAD > content_length {
-                                    content_length - 1
-                                } else {
-                                    CACHE_HEAD - 1
-                                }
-                            ),
-                        )
-                        .send()
-                        .unwrap();
+                    // Cache miss
+                    debug!("{} - Miss data cache", ino);
+                    unsafe {
+                        metrics.miss_data_cache += 1;
+                    }
+                    // Fetch the file head (the first CACHE_HEAD bytes)
+                    unsafe {
+                        resp = client
+                            .get(full_url.as_str())
+                            .header(
+                                "Range",
+                                format!(
+                                    "bytes=0-{}",
+                                    if CACHE_HEAD > content_length {
+                                        content_length - 1
+                                    } else {
+                                        CACHE_HEAD - 1
+                                    }
+                                ),
+                            )
+                            .header("Authorization", format!("Basic {}", http_auth))
+                            .send()
+                            .unwrap();
+                    }
                     let response = resp.bytes().unwrap();
-                    self.buffer_head.insert(id.to_string(), response.to_vec());
+                    unsafe {
+                        metrics.http_requests += 1;
+                        metrics.ingress += response.len() as u64;
+                    }
+                    // Save the head into the cache
+                    self.buffer_head_data.insert(ino, response.to_vec());
+                    self.buffer_head_index.insert(ino);
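+                    // The two inserts above grow the cache; eviction back down
+                    // to MAX_CACHE_SIZE entries happens at the top of read(),
+                    // so at most ~7.5 MiB of heads (10 x 768 KiB) stay resident.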
                     chunk = response[offset as usize..end_of_chunk as usize].to_vec();
                     reply.data(&chunk);
                 }
-                println!("Chunk len: {:?} ", chunk.len());
+                debug!("{} - Chunk len: {:?} ", ino, chunk.len());
                 return;
             }
-            resp = client
-                .get(full_url.as_str())
-                .header("Range", &range)
-                .send()
-                .unwrap();
-            let test = resp.bytes().unwrap();
-            chunk = test.to_vec().clone();
+            // If it isn't the beginning of the file, don't cache it; fetch it
+            // over HTTP directly.
+            unsafe {
+                resp = client
+                    .get(full_url.as_str())
+                    .header("Range", &range)
+                    .header("Authorization", format!("Basic {}", http_auth))
+                    .send()
+                    .unwrap();
+            }
+            let response = resp.bytes().unwrap();
+            unsafe {
+                metrics.http_requests += 1;
+                metrics.ingress += response.len() as u64;
+            }
+            chunk = response.to_vec().clone();
             reply.data(&chunk);
-            println!(
-                " Len: {}, Chunk {} - {}",
+            debug!(
+                "{} - Len: {}, Chunk {} - {}",
+                ino,
                 chunk.len(),
                 offset,
                 offset + chunk.len() as i64
             );
         } else {
-            println!(
-                "Wrong offset. Len is {} but offset {}",
-                content_length, offset
+            // Wrong file size detected.
+            warn!(
+                "{} - Wrong offset. Len is {} but offset {}",
+                ino, content_length, offset
            );
             reply.data(&[]);
         }
@@ -302,7 +433,7 @@
         offset: i64,
         mut reply: ReplyDirectory,
     ) {
-        //println!("readdir(ino={}, fh={}, offset={})", ino, fh, offset);
+        debug!("readdir(ino={}, fh={}, offset={})", ino, fh, offset);
         if ino == 1 {
             if offset == 0 {
                 reply.add(1, 0, FileType::Directory, ".");
@@ -322,6 +453,18 @@
 }
 
 fn main() {
+    Builder::new()
+        .format(|buf, record| {
+            writeln!(
+                buf,
+                "{} [{}] - {}",
+                Local::now().format("%Y-%m-%dT%H:%M:%S"),
+                record.level(),
+                record.args()
+            )
+        })
+        .filter(None, LevelFilter::Info)
+        .init();
     let mountpoint = match env::args().nth(1) {
         Some(path) => path,
         None => {
@@ -342,11 +485,64 @@ fn main() {
             return;
         }
     };
-    let lib = get_tracks(&server).unwrap();
+    unsafe {
+        metrics.server_addr = server.clone();
+    }
+    let http_user_var = "HTTP_USER";
+    let http_pass_var = "HTTP_PASS";
+
+    let http_user = match env::var_os(http_user_var) {
+        Some(val) => {
+            info!(
+                "Variable {} is set. It will be used as the user for HTTP auth.",
+                http_user_var
+            );
+            val.to_str().unwrap().to_string()
+        }
+        None => {
+            info!("{} is not defined in the environment.", http_user_var);
+            "".to_string()
+        }
+    };
+    let http_pass = match env::var_os(http_pass_var) {
+        Some(val) => {
+            info!(
+                "Variable {} is set. It will be used as the password for HTTP auth.",
+                http_pass_var
+            );
+            val.to_str().unwrap().to_string()
+        }
+        None => {
+            info!("{} is not defined in the environment.", http_pass_var);
+            "".to_string()
+        }
+    };
+    unsafe {
+        let mut buf = String::new();
+        buf.push_str(&http_user);
+        buf.push_str(":");
+        buf.push_str(&http_pass);
+        http_auth = base64::encode(buf)
+    }
+    let lib = match get_tracks(&server) {
+        Ok(library) => library,
+        Err(err) => {
+            panic!("Can't fetch library from remote server: {}", err);
+        }
+    };
+    info!("Remote library host: {}", &server);
     let fs = JsonFilesystem::new(&lib, server);
-    let options = ["-o", "ro", "-o", "fsname=musfs", "-o", "async_read"]
+    let options = ["-o", "ro", "-o", "fsname=musfs", "-o", "sync_read"]
         .iter()
         .map(|o| o.as_ref())
         .collect::<Vec<&OsStr>>();
+
+    info!(
+        "Caching {}B from the head of each file.",
+        SizeFormatterBinary::new(CACHE_HEAD as u64)
+    );
+    info!("Max cache size is {} files.", MAX_CACHE_SIZE);
+    info!("Mount options: {:?}", options);
+
     fuse::mount(fs, &mountpoint, &options).expect("Couldn't mount filesystem");
 }
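
A note on the `static mut` globals this patch leans on: every FUSE callback bumps the counters through `unsafe`, which only stays sound as long as no two callbacks ever run concurrently. If that assumption breaks, plain atomics are a drop-in alternative. A minimal sketch, not part of the patch (`AtomicMetrics` and `record_response` are illustrative names):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical thread-safe replacement for the `static mut metrics` counters.
struct AtomicMetrics {
    http_requests: AtomicU64,
    ingress: AtomicU64,
}

static METRICS: AtomicMetrics = AtomicMetrics {
    http_requests: AtomicU64::new(0),
    ingress: AtomicU64::new(0),
};

fn record_response(bytes: u64) {
    // Relaxed ordering is enough for independent, monotonic counters.
    METRICS.http_requests.fetch_add(1, Ordering::Relaxed);
    METRICS.ingress.fetch_add(bytes, Ordering::Relaxed);
}

fn main() {
    record_response(512);
    println!(
        "http_requests: {}, ingress: {}",
        METRICS.http_requests.load(Ordering::Relaxed),
        METRICS.ingress.load(Ordering::Relaxed)
    );
}
```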