Refactor HTTP. Started work on cache.

This commit is contained in:
AB
2020-06-05 02:01:02 +03:00
parent 049646e40c
commit be7be1da46
2 changed files with 113 additions and 65 deletions

View File

@ -13,7 +13,7 @@ use polyfuse::{
use slab::Slab;
use std::path::{PathBuf};
use std::path::{PathBuf, Path};
use std::{
collections::hash_map::{Entry, HashMap},
ffi::{OsStr, OsString},
@ -24,6 +24,7 @@ use std::{
};
use tokio::sync::Mutex;
use tracing_futures::Instrument;
use crate::http::HTTP;
type Ino = u64;
@ -128,12 +129,21 @@ struct DirHandle {
entries: Vec<Arc<DirEntry>>,
}
#[derive(Debug)]
struct FileInodeMap {
parent: Ino,
ino: Ino,
path: PathBuf,
}
//noinspection RsUnresolvedReference
//noinspection RsUnresolvedReference
//noinspection RsUnresolvedReference
#[derive(Debug)]
pub struct MemFS {
http: http::HTTP,
inodes: Mutex<INodeTable>,
f_ino_map: Mutex<Vec<FileInodeMap>>,
ttl: Duration,
dir_handles: Mutex<Slab<Arc<Mutex<DirHandle>>>>,
cfg: config::Config,
@ -164,7 +174,9 @@ impl MemFS {
});
Self {
http: HTTP::new(cfg.server.clone(), cfg.username.clone(), cfg.password.clone()),
inodes: Mutex::new(inodes),
f_ino_map: Mutex::new(Vec::new()),
dir_handles: Mutex::default(),
ttl: Duration::from_secs(60 * 60 * 24),
cfg: cfg.clone(),
@ -180,7 +192,7 @@ impl MemFS {
}
async fn lookup_inode(&self, parent: Ino, name: &OsStr) -> io::Result<ReplyEntry> {
error!("==> lookup_inode: parent: {:?}, name: {:?}", parent, name);
debug!("==> lookup_inode: parent: {:?}, name: {:?}", parent, name);
let inodes = self.inodes.lock().await;
@ -196,7 +208,6 @@ impl MemFS {
let child = inodes.get(child_ino).unwrap_or_else(|| unreachable!());
let mut child = child.lock().await;
child.refcount += 1;
// warn!("==> lookup_inode: child_ino: {:?}, child.attr: {:?}", child_ino, child.attr);
Ok(self.make_entry_reply(child_ino, child.attr))
}
@ -408,22 +419,20 @@ impl MemFS {
let inode = inodes.get(f_inode).ok_or_else(no_entry)?;
let inode = inode.lock().await;
warn!("do_lookup inode {:?}", inode);
let file_path = match &inode.kind {
match &inode.kind {
INodeKind::Directory(_) => {
drop(inode);
drop(inodes);
let mut file_path = self.full_path(op.parent()).await.unwrap();
file_path.push(op.name());
file_path
self.fetch_remote(file_path, f_inode).await;
}
_ => {
drop(inode);
drop(inodes);
PathBuf::new()
}
};
warn!("{:?}", file_path);
self.fetch_remote(file_path, f_inode).await;
}
None => warn!("Cant find inode for {:?}", op.name()),
}
@ -505,22 +514,23 @@ impl MemFS {
}
pub async fn fetch_remote(&self, path: PathBuf, parent: u64) -> io::Result<()> {
let remote_entries = http::list_directory(
&self.cfg.server,
&self.cfg.username,
&self.cfg.password,
path,
)
.await
.unwrap();
let remote_entries = self.http.list(path).await.unwrap();
for r_entry in remote_entries.iter() {
match &r_entry.r#type {
Some(r#type) => match r#type.as_str() {
"file" => {
let f_name = r_entry.name.as_ref().unwrap();
let mut inode_map = self.f_ino_map.lock().await;
let mut full_name = self.full_path(parent).await.unwrap();
full_name.push(PathBuf::from(f_name));
self.make_node(parent, OsStr::new(f_name.as_str()), |entry| INode {
attr: {
info!("Adding file {:?} - {:?}", f_name, parent);
info!("Adding file {:?}", full_name);
inode_map.push(FileInodeMap {
parent,
ino: entry.ino(),
path: full_name,
});
let mut attr = FileAttr::default();
attr.set_ino(entry.ino());
attr.set_mtime(r_entry.parse_rfc2822());
@ -566,17 +576,16 @@ impl MemFS {
Ok(())
}
async fn inode_to_name(&self, inode_id: u64) -> Option<(PathBuf, u64)> {
async fn inode_to_name(&self, inode: u64) -> Option<(PathBuf, u64)> {
let inodes = self.inodes.lock().await;
let inode = inodes.get(inode_id).ok_or_else(no_entry).unwrap();
let inode_mutex = inodes.get(inode).ok_or_else(no_entry).unwrap();
let inode = inode.lock().await;
let inode_mutex = inode_mutex.lock().await;
// let clos = async || -> io::Result<HashMap<OsString, u64>> {
// let clos = || -> io::Result<HashMap<OsString, u64>> {
let mut parent_ino: u64 = 0;
let children = match &inode.kind {
let mut uri = PathBuf::new();
let ret = match &inode_mutex.kind {
INodeKind::Directory(dir) => match dir.parent {
Some(parent) => {
let par_inode = inodes.get(parent).ok_or_else(no_entry).unwrap();
@ -585,25 +594,28 @@ impl MemFS {
parent_ino = par_inode.attr.ino();
let _uri = match &par_inode.kind {
INodeKind::Directory(dir) => dir.children.clone(),
_ => HashMap::new(),
INodeKind::Directory(dir) => {
let _children = dir.children.clone();
for (name, c_inode) in &_children {
if &inode == c_inode {
uri.push(name.as_os_str());
}
}
Some((uri, parent_ino))
}
_ => Some((uri, parent_ino)),
};
_uri
}
None => HashMap::new(),
None => Some((uri, parent_ino)),
},
_ => HashMap::new(),
};
// };
// let children = clos().await.unwrap();
let mut uri = PathBuf::new();
for (name, c_inode) in &children {
if &inode_id == c_inode {
uri.push(name.as_os_str());
INodeKind::RegularFile(_) => {
warn!("inode_mutex {:?}", inode_mutex);
Some((uri, parent_ino))
}
}
Some((uri, parent_ino))
INodeKind::Symlink(_) => Some((uri, parent_ino))
};
ret
}
//noinspection RsUnresolvedReference
@ -856,11 +868,26 @@ impl MemFS {
}
async fn do_read(&self, op: &op::Read<'_>) -> io::Result<impl Reply + Debug> {
let full_path_mutex = self.f_ino_map.lock().await;
let mut counter = 0;
let full_path = loop {
if counter < full_path_mutex.len() {
if full_path_mutex[counter].ino == op.ino() {
break full_path_mutex[counter].path.clone();
}
} else {
break PathBuf::from("");
}
counter += 1;
};
drop(full_path_mutex);
let inodes = self.inodes.lock().await;
let inode = inodes.get(op.ino()).ok_or_else(no_entry)?;
let inode = inode.lock().await;
let content = match inode.kind {
INodeKind::RegularFile(ref content) => content,
_ => return Err(io::Error::from_raw_os_error(libc::EINVAL)),

View File

@ -7,6 +7,20 @@ use std::{
thread::sleep,
time::{Duration, SystemTime},
};
use chrono::{DateTime};
use reqwest::{Client, Error, header};
static APP_USER_AGENT: &str = concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION"),
);
#[derive(Default, Debug, Clone)]
pub struct HTTP {
client: Client,
server: String,
}
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
pub struct RemoteEntry {
@ -23,35 +37,42 @@ impl RemoteEntry {
}
}
impl HTTP {
pub fn new(server: String, username: Option<String>, password: Option<String>, ) -> Self {
let mut headers = header::HeaderMap::new();
match username {
Some(username) => {
info!("HTTP credentials has been configured. Securing connection.");
let mut _buf = String::new();
_buf.push_str(format!("{}:{}", username, password.as_ref().unwrap()).as_str());
let creds = base64::encode(_buf);
use chrono::{DateTime};
headers.insert(header::AUTHORIZATION, header::HeaderValue::from_str(format!("Basic {}", creds).as_str()).unwrap());
pub async fn list_directory(
server: &std::string::String,
username: &Option<String>,
password: &Option<String>,
path: PathBuf,
) -> Result<Vec<RemoteEntry>, reqwest::Error> {
info!("Fetching path '{}/{}'", server, path.display());
let client = reqwest::Client::new();
let http_auth = match username {
Some(username) => {
// info!("Using Basic Auth");
let mut _buf = String::new();
_buf.push_str(format!("{}:{}", username, password.as_ref().unwrap()).as_str());
base64::encode(_buf)
}
None => {},
};
let client = reqwest::Client::builder()
.user_agent(APP_USER_AGENT)
.default_headers(headers)
// .gzip(true)
.build().unwrap();
Self {
client,
server
}
None => String::new(),
};
//info!("AUTH: {:?}", http_auth);
let resp = client
.get(format!("{}/{}", server, path.display()).as_str())
.header("Authorization", format!("Basic {}", http_auth))
.send()
.await?
.json::<Vec<RemoteEntry>>()
.await?;
info!("Found {} entries into '{}'", resp.len(), path.display());
Ok(resp)
}
pub async fn list(&self, path: PathBuf, ) -> Result<Vec<RemoteEntry>, Error> {
debug!("Fetching path '{}/{}'", self.server, path.display());
let mut client = &self.client;
let resp = client
.get(format!("{}/{}", self.server, path.display()).as_str())
.send()
.await?
.json::<Vec<RemoteEntry>>()
.await?;
info!("Found {} entries into '{}'", resp.len(), path.display());
Ok(resp)
}
}