Made the HTTP layer tolerant of connection issues.

Author: AB
Date: 2020-04-20 02:57:29 +03:00
parent a244af9184
commit 9c98b12d1a
2 changed files with 92 additions and 57 deletions
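In short, every blocking reqwest call in read() that previously ended in .send().unwrap() now matches on the result: a connection failure is logged, counted in the new connect_errors metric, and reported to the kernel as EIO instead of panicking the FUSE worker. A minimal sketch of that pattern, assuming a hypothetical fetch_range helper (the diff does this inline in read(), using the log macros and the METRICS static rather than eprintln! and a passed-in counter):

use libc::EIO;
use reqwest::blocking::Client;

/// Fetch a byte range; map any connection problem to a POSIX errno.
/// Hypothetical helper for illustration -- not part of the diff itself.
fn fetch_range(
    client: &Client,
    url: &str,
    range: &str,
    auth: &str,
    connect_errors: &mut u64,
) -> Result<Vec<u8>, i32> {
    match client
        .get(url)
        .header("Range", range)
        .header("Authorization", format!("Basic {}", auth))
        .send()
    {
        Ok(resp) => Ok(resp.bytes().map_err(|_| EIO)?.to_vec()),
        Err(err) => {
            eprintln!("An error fetching file. {}", err);
            *connect_errors += 1;
            Err(EIO) // read() replies with reply.error(EIO) and returns early
        }
    }
}

fn main() {
    let client = Client::new();
    let mut connect_errors = 0u64;
    // Hypothetical URL and credentials; in musfuse these come from the track
    // list and the HTTP_USER/HTTP_PASS environment variables.
    let range = "bytes=0-786431"; // first CACHE_HEAD = 768 * 1024 bytes
    match fetch_range(
        &client,
        "http://example.com/track.flac",
        range,
        "dXNlcjpwYXNz", // base64 of "user:pass", made up for the example
        &mut connect_errors,
    ) {
        Ok(body) => println!("got {} bytes", body.len()),
        Err(errno) => eprintln!("errno {} (connect_errors = {})", errno, connect_errors),
    }
}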

Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "musfuse"
-version = "0.4.0"
+version = "0.5.0"
 authors = ["AB <ultradesu@hexor.ru>"]
 edition = "2018"

src/main.rs

@@ -2,31 +2,33 @@ extern crate base64;
 extern crate fuse;
 extern crate libc;
 extern crate time;
+#[macro_use]
+extern crate log;
+extern crate chrono;
+use time::Timespec;
+use percent_encoding::percent_decode_str;
+use serde::Deserialize;
+use libc::{EIO, ENOENT};
+use reqwest::{blocking::Client, header::CONTENT_LENGTH};
+use size_format::SizeFormatterBinary;
+use std::{
+    collections::{BTreeMap, HashMap, HashSet},
+    env,
+    ffi::OsStr,
+    fmt,
+    path::Path,
+    process,
+};
 use fuse::{
     FileAttr, FileType, Filesystem, ReplyAttr, ReplyData, ReplyDirectory, ReplyEntry, Request,
 };
-use libc::ENOENT;
-use reqwest::blocking::Client;
-use reqwest::blocking::Response;
-use reqwest::header::CONTENT_LENGTH;
-use size_format::SizeFormatterBinary;
-use std::collections::{BTreeMap, HashMap, HashSet};
-use std::env;
-use std::ffi::OsStr;
-use std::fmt;
-use time::Timespec;
-#[macro_use]
-extern crate log;
-use std::process;
-extern crate chrono;
-// Download lib staff
-use percent_encoding::percent_decode_str;
-use serde::Deserialize;
-use std::path::Path;
 struct Metrics {
     http_requests: u64,
+    connect_errors: u64,
     ingress: u64,
     hit_len_cache: u64,
     hit_data_cache: u64,
@@ -38,8 +40,9 @@ struct Metrics {
 impl fmt::Debug for Metrics {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         write!(f,
-            "http_requests: {}\ningress: {}\nhit_len_cache: {}\nhit_data_cache: {}\nmiss_len_cache: {}\nmiss_data_cache: {}\nserver_addr: {}\n",
+            "http_requests: {}\nconnect_errors: {}\ningress: {}\nhit_len_cache: {}\nhit_data_cache: {}\nmiss_len_cache: {}\nmiss_data_cache: {}\nserver_addr: {}\n",
             self.http_requests,
+            self.connect_errors,
             self.ingress,
             self.hit_len_cache,
             self.hit_data_cache,
@@ -50,8 +53,9 @@ impl fmt::Debug for Metrics {
     }
 }
-static mut metrics: Metrics = Metrics {
+static mut METRICS: Metrics = Metrics {
     http_requests: 0,
+    connect_errors: 0,
     ingress: 0,
     hit_len_cache: 0,
     hit_data_cache: 0,
@@ -76,7 +80,7 @@ pub struct Track {
 const CACHE_HEAD: i64 = 768 * 1024;
 const MAX_CACHE_SIZE: i64 = 10; // Count
-static mut http_auth: String = String::new();
+static mut HTTP_AUTH: String = String::new();
 fn get_basename(path: Option<&String>) -> Option<String> {
     let base = match percent_decode_str(path.unwrap().as_str()).decode_utf8() {
@@ -99,7 +103,7 @@ async fn get_tracks(server: &String) -> Result<Vec<Track>, Box<dyn std::error::E
     unsafe {
         let resp = client
             .get(format!("{}/songs", server).as_str())
-            .header("Authorization", format!("Basic {}", http_auth))
+            .header("Authorization", format!("Basic {}", HTTP_AUTH))
             .send()
             .await?
             .json::<Vec<Track>>()
@@ -257,7 +261,7 @@ impl Filesystem for JsonFilesystem {
         // return usage statistics
         if ino == self.metrics_inode {
             unsafe {
-                let metrics_str = format!("{:#?}", metrics);
+                let metrics_str = format!("{:#?}", METRICS);
                 reply.data(&metrics_str.as_bytes());
             }
             return;
@@ -288,38 +292,50 @@ impl Filesystem for JsonFilesystem {
         let mut chunk: Vec<u8>;
         let content_length: i64;
         let client = Client::new();
-        let mut resp: Response;
         // content_length cache.
         if self.buffer_length.contains_key(id.as_str()) {
             content_length = self.buffer_length[id.as_str()];
             debug!("{} - Hit length cache", ino);
             unsafe {
-                metrics.hit_len_cache += 1;
+                METRICS.hit_len_cache += 1;
             }
         } else {
             unsafe {
-                resp = client
+                content_length = match client
                     .head(full_url.as_str())
-                    .header("Authorization", format!("Basic {}", http_auth))
+                    .header("Authorization", format!("Basic {}", HTTP_AUTH))
                     .send()
-                    .unwrap();
+                {
+                    Ok(content) => {
+                        let content_length = match content.headers().get(CONTENT_LENGTH) {
+                            Some(header_content) => {
+                                header_content.to_str().unwrap().parse::<i64>().unwrap()
+                            }
+                            None => {
+                                reply.error(EIO);
+                                return;
+                            }
+                        };
+                        content_length
+                    }
+                    Err(err) => {
+                        let name = &self.tree[(ino - 2) as usize].path.as_ref();
+                        let basename = get_basename(*name).unwrap().to_string();
+                        error!("An error fetching file {}. {}", basename, err);
+                        METRICS.connect_errors += 1;
+                        reply.error(EIO);
+                        return;
+                    }
+                };
             }
-            content_length = resp
-                .headers()
-                .get(CONTENT_LENGTH)
-                .unwrap()
-                .to_str()
-                .unwrap()
-                .parse::<i64>()
-                .unwrap();
             unsafe {
-                metrics.http_requests += 1;
+                METRICS.http_requests += 1;
             }
             self.buffer_length.insert(id.to_string(), content_length);
             debug!("{} - Miss length cache", ino);
             unsafe {
-                metrics.miss_len_cache += 1;
+                METRICS.miss_len_cache += 1;
             }
         }
         // Check for API wrong file size here
@@ -339,7 +355,7 @@ impl Filesystem for JsonFilesystem {
             // Cache found
             debug!("{} - Hit data cache", ino);
             unsafe {
-                metrics.hit_data_cache += 1;
+                METRICS.hit_data_cache += 1;
             }
             chunk = self.buffer_head_data[&ino][offset as usize..end_of_chunk as usize]
                 .to_vec()
@@ -349,11 +365,12 @@ impl Filesystem for JsonFilesystem {
             // Cache doesn't found
             debug!("{} - Miss data cache", ino);
             unsafe {
-                metrics.miss_data_cache += 1;
+                METRICS.miss_data_cache += 1;
             }
             // Fetch file head (CACHE_HEAD)
+            let response: Vec<u8>;
             unsafe {
-                resp = client
+                response = match client
                     .get(full_url.as_str())
                     .header(
                         "Range",
@@ -366,14 +383,23 @@ impl Filesystem for JsonFilesystem {
                             }
                         ),
                     )
-                    .header("Authorization", format!("Basic {}", http_auth))
+                    .header("Authorization", format!("Basic {}", HTTP_AUTH))
                     .send()
-                    .unwrap();
+                {
+                    Ok(content) => content.bytes().unwrap().to_vec(),
+                    Err(err) => {
+                        let name = &self.tree[(ino - 2) as usize].path.as_ref();
+                        let basename = get_basename(*name).unwrap().to_string();
+                        error!("An error fetching file {}. {}", basename, err);
+                        METRICS.connect_errors += 1;
+                        reply.error(EIO);
+                        return;
+                    }
+                };
             }
-            let response = resp.bytes().unwrap();
             unsafe {
-                metrics.http_requests += 1;
-                metrics.ingress += response.len() as u64;
+                METRICS.http_requests += 1;
+                METRICS.ingress += response.len() as u64;
             }
             // Save cache
             self.buffer_head_data.insert(ino, response.to_vec());
@@ -385,18 +411,28 @@ impl Filesystem for JsonFilesystem {
             return;
         }
         // If it isn't a beginning of file don't cache it and fetch over HTTP directly.
+        let response: Vec<u8>;
         unsafe {
-            resp = client
+            response = match client
                 .get(full_url.as_str())
                 .header("Range", &range)
-                .header("Authorization", format!("Basic {}", http_auth))
+                .header("Authorization", format!("Basic {}", HTTP_AUTH))
                 .send()
-                .unwrap();
+            {
+                Ok(content) => content.bytes().unwrap().to_vec(),
+                Err(err) => {
+                    let name = &self.tree[(ino - 2) as usize].path.as_ref();
+                    let basename = get_basename(*name).unwrap().to_string();
+                    error!("An error fetching file {}. {}", basename, err);
+                    METRICS.connect_errors += 1;
+                    reply.error(EIO);
+                    return;
+                }
+            };
         }
-        let response = resp.bytes().unwrap();
         unsafe {
-            metrics.http_requests += 1;
-            metrics.ingress += response.len() as u64;
+            METRICS.http_requests += 1;
+            METRICS.ingress += response.len() as u64;
         }
         chunk = response.to_vec().clone();
         reply.data(&chunk);
@@ -469,7 +505,7 @@ fn main() {
        }
    };
    unsafe {
-        metrics.server_addr = server.clone();
+        METRICS.server_addr = server.clone();
    }
    let http_user_var = "HTTP_USER";
    let http_pass_var = "HTTP_PASS";
@@ -505,7 +541,7 @@ fn main() {
        buf.push_str(&http_user);
        buf.push_str(":");
        buf.push_str(&http_pass);
-        HTTP_AUTH = base64::encode(buf)
+        HTTP_AUTH = base64::encode(buf)
    }
    let lib = match get_tracks(&server) {
        Ok(library) => library,
@@ -542,8 +578,7 @@ fn main() {
    info!("Mount options: {:?}", options);
    let mut mount: fuse::BackgroundSession;
    unsafe {
-        mount =
-            fuse::spawn_mount(fs, &mountpoint, &options).expect("Couldn't mount filesystem");
+        mount = fuse::spawn_mount(fs, &mountpoint, &options).expect("Couldn't mount filesystem");
    }
    ctrlc::set_handler(move || {
        println!("Exitting...");