From eb473aeddc0936062e4507799f36ce3249ba3958 Mon Sep 17 00:00:00 2001 From: AB Date: Mon, 7 Dec 2020 15:55:25 +0300 Subject: [PATCH] Add realtions, words save feature. --- src/commands.rs | 8 ++-- src/db.rs | 124 +++++++++++++++++++++++++++++++++--------------- src/errors.rs | 7 +-- src/main.rs | 14 +++--- src/utils.rs | 40 ++++++++-------- 5 files changed, 123 insertions(+), 70 deletions(-) diff --git a/src/commands.rs b/src/commands.rs index 22bb781..0eccab0 100644 --- a/src/commands.rs +++ b/src/commands.rs @@ -1,8 +1,7 @@ use crate::db; use html_escape::encode_text; use telegram_bot::prelude::*; -use telegram_bot::{Api, Error, Message, MessageKind, ParseMode, UpdateKind}; -use tokio::time::delay_for; +use telegram_bot::{Api, Error, Message, ParseMode, }; pub(crate) async fn here(api: Api, message: Message) -> Result<(), Error> { let members: Vec = db::get_members(message.chat.id()).unwrap(); @@ -22,7 +21,10 @@ pub(crate) async fn here(api: Api, message: Message) -> Result<(), Error> { msg = format!("{} {}", msg, mention); } - match api.send(message.text_reply(msg).parse_mode(ParseMode::Html)).await { + match api + .send(message.text_reply(msg).parse_mode(ParseMode::Html)) + .await + { Ok(_) => debug!("@here command sent to {}", message.from.id), Err(_) => warn!("@here command sent failed to {}", message.from.id), } diff --git a/src/db.rs b/src/db.rs index 841e20d..c690da4 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,7 +1,6 @@ use crate::errors; use crate::utils; -use rusqlite::{named_params, params, Connection, Error, Result}; -use sha1::{Digest, Sha1}; +use rusqlite::{params, Connection, Error, Result}; use std::time::SystemTime; use telegram_bot::*; @@ -20,7 +19,7 @@ pub(crate) fn open() -> Result { pub(crate) fn update_scheme() -> Result<()> { let conn = open()?; - conn.execute(scheme, params![])?; + conn.execute(SCHEME, params![])?; info!("Scheme updated."); Ok(()) } @@ -57,7 +56,6 @@ pub(crate) fn get_conf(id: telegram_bot::ChatId) -> Result let mut rows = stmt.query_named(&[(":id", &id.to_string())])?; let mut confs = Vec::new(); - while let Some(row) = rows.next()? { confs.push(Conf { id: telegram_bot::ChatId::new(row.get(0)?), @@ -65,8 +63,6 @@ pub(crate) fn get_conf(id: telegram_bot::ChatId) -> Result date: row.get(2)?, }) } - //println!("Confs: {:?}", confs); - if confs.len() == 0 { Err(errors::Error::ConfNotFound) } else { @@ -74,6 +70,7 @@ pub(crate) fn get_conf(id: telegram_bot::ChatId) -> Result } } +/* pub(crate) fn get_confs() -> Result> { let conn = open()?; let mut stmt = conn.prepare("SELECT id, title, date FROM conf")?; @@ -92,6 +89,7 @@ pub(crate) fn get_confs() -> Result> { Ok(confs) } + */ pub(crate) fn get_members(id: telegram_bot::ChatId) -> Result> { let conn = open()?; @@ -118,7 +116,6 @@ pub(crate) fn get_members(id: telegram_bot::ChatId) -> Result Result<(), Error> { let title = utils::get_title(&message); match get_conf(message.chat.id()) { - Ok(conf) => { + Ok(_) => { let update = Conf { id: message.chat.id(), title, @@ -143,16 +140,13 @@ pub(crate) async fn add_conf(message: Message) -> Result<(), Error> { stmt.execute_named(&[(":id", &update.id.to_string()), (":title", &update.title)])?; //println!("Conf {:?} updated: {:?}", update.title, get_conf(update.id)); } - Err(e) => { + Err(_) => { let update = Conf { id: message.chat.id(), title, date: 0, }; - let unix_time = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() as i64; + let unix_time = utils::unixtime().await; let mut stmt = conn.prepare( "UPDATE conf @@ -169,7 +163,6 @@ pub(crate) async fn add_conf(message: Message) -> Result<(), Error> { ])?; //println!("Conf {:?} added: {:?}", update.title, get_conf(update.id)); } - _ => {} } Ok(()) } @@ -177,7 +170,7 @@ pub(crate) async fn add_conf(message: Message) -> Result<(), Error> { pub(crate) async fn add_user(message: Message) -> Result<(), Error> { let conn = open()?; match get_user(message.from.id) { - Ok(user) => { + Ok(_) => { let update = telegram_bot::User { id: message.from.id, first_name: message.from.first_name, @@ -203,7 +196,7 @@ pub(crate) async fn add_user(message: Message) -> Result<(), Error> { ])?; //println!("User {} updated: {:?}", update.first_name, get_user(user.id)); } - Err(e) => { + Err(_) => { let unix_time = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() @@ -230,7 +223,6 @@ pub(crate) async fn add_user(message: Message) -> Result<(), Error> { ])?; //println!("User added: {:?}", user); } - _ => {} } Ok(()) } @@ -255,44 +247,102 @@ pub(crate) async fn add_file( Ok(()) } -pub(crate) async fn get_file(file_id: String) -> Result<(), errors::Error> { +pub(crate) async fn get_file(file_id: String) -> Result { let conn = open()?; - let mut stmt = conn.prepare("SELECT path FROM file WHERE file_id = :file_id")?; - let mut rows = stmt.query_named(&[(":file_id", &file_id)])?; - let mut files = Vec::new(); + let file_rowid = match { conn.prepare("SELECT rowid FROM file WHERE file_id = :file_id")? } + .query_row(params![file_id], |row| row.get(0)) { + Ok(id) => Ok(id), + Err(_) => { + Err(errors::Error::FileNotFound) + } + }; - while let Some(row) = rows.next()? { - files.push("should be rewritten"); + file_rowid +} + +async fn add_word(word: &String) -> Result { + match get_stop_word(&word).await { + Err(_) => return Err(errors::Error::WordInStopList), + _ => {} } - if files.len() > 0 { - Ok(()) - } else { - Err(errors::Error::FileNotFound) + let conn = open()?; + let word_rowid = match { conn.prepare("INSERT OR IGNORE INTO word('word') VALUES (:word)")? } + .insert(params![word]) + { + Ok(id) => id, + Err(_) => { conn.prepare("SELECT rowid FROM word WHERE word = (:word)")? } + .query_row(params![word], |row| row.get(0))?, + }; + Ok(word_rowid) +} + +async fn get_stop_word(stop_word: &String) -> Result<(), errors::Error> { + let conn = open()?; + match conn.execute_named( + "SELECT rowid FROM stop_words WHERE word = (:stop_word)", + &[(":stop_word", &stop_word)], + ) { + Ok(i) => match i { + 0 => Ok(()), + _ => Err(errors::Error::WordNotFound), + }, + Err(e) => Err(errors::Error::SQLITE3Error(e)), } } - -async fn add_word(word: String) -> Result<(), errors::Error> { +async fn add_relation(word_id: i64, msg_id: i64, message: &Message) -> Result { + let user_id = i64::from(message.from.id); + let conf_id = i64::from(message.chat.id()); + let unix_time = utils::unixtime().await; let conn = open()?; - conn.execute_named("INSERT OR IGNORE INTO word('word') VALUES (:word)", &[(":word", &word)]); - debug!("Added word {}", word); - Ok(()) + let rowid = match { + conn.prepare( + "INSERT OR IGNORE INTO + relations('word_id', 'user_id', 'conf_id', 'msg_id', 'date') + VALUES (:word_id, :user_id, :conf_id, :msg_id, :date)", + )? + } + .insert(params![word_id, user_id, conf_id, msg_id, unix_time]) + { + Ok(id) => id, + Err(e) => return Err(errors::Error::SQLITE3Error(e)), + }; + Ok(rowid) } - - pub(crate) async fn add_sentence(message: &telegram_bot::Message) -> Result<(), errors::Error> { + let text = message.text().unwrap(); + let conn = open()?; + + // Save sentence + let msg_rowid = match { conn.prepare("INSERT OR IGNORE INTO messages('text') VALUES (:text)")? } + .insert(params![text]) + { + Ok(id) => id, + Err(_) => { conn.prepare("SELECT rowid FROM messages WHERE text = (:text)")? } + .query_row(params![text], |row| row.get(0))?, + }; + + // Save stemmed words let words = utils::stemming(message).await?; - //let conn = open()?; for word in words { - add_word(word).await?; + match add_word(&word).await { + Ok(id) => { + debug!("Added {}: rowid: {}", &word, id); + match add_relation(id, msg_rowid, message).await { + Ok(_) => {}, + Err(e) => panic!("SQLITE3 Error: Relations failed: {:?}", e) + } + } + Err(_) => debug!("Word {} is in stop list.", &word), + } } Ok(()) } // SCHEME -static scheme: &str = " +static SCHEME: &str = " PRAGMA foreign_keys = off; BEGIN TRANSACTION; diff --git a/src/errors.rs b/src/errors.rs index dad5740..cff1ad0 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,9 +1,8 @@ use reqwest::Error as reqwest_error; use rusqlite::Error as sqlite_error; -use rusqlite::{named_params, params, Connection, Result}; -use std::{fmt, io, io::Error as io_error}; -use telegram_bot::Error as tg_error; use serde_json::Error as serde_error; +use std::{fmt, io::Error as io_error}; +use telegram_bot::Error as tg_error; #[derive(Debug)] pub(crate) enum Error { @@ -12,6 +11,8 @@ pub(crate) enum Error { TelegramError(tg_error), ReqwestError(reqwest_error), ConfNotFound, + WordNotFound, + WordInStopList, IOError(io_error), FileNotFound, JsonParseError(serde_error), diff --git a/src/main.rs b/src/main.rs index f349b65..7aa43e2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,6 @@ use std::{env, process}; use futures::StreamExt; -use reqwest; -use telegram_bot::types::chat::MessageChat; use telegram_bot::*; #[macro_use] extern crate log; @@ -33,7 +31,6 @@ async fn handler(api: Api, message: Message, token: String) -> Result<(), errors } MessageKind::Photo { ref caption, - ref data, .. } => { let title = utils::get_title(&message); @@ -61,7 +58,7 @@ async fn handler(api: Api, message: Message, token: String) -> Result<(), errors utils::get_files(api, message, token).await?; } - MessageKind::Sticker { ref data, .. } => { + MessageKind::Sticker { .. } => { let title = utils::get_title(&message); info!( "<{}({})>[{}({})]: *STICKER*", @@ -115,8 +112,11 @@ async fn handler(api: Api, message: Message, token: String) -> Result<(), errors #[tokio::main] async fn main() -> Result<(), errors::Error> { - env_logger::from_env(Env::default().default_filter_or("debug")).init(); - db::update_scheme(); + env_logger::from_env(Env::default().default_filter_or("info")).init(); + match db::update_scheme() { + Ok(_) => {}, + Err(e) => panic!("Database error: {:?}", e) + } let token = match env::var("TELEGRAM_BOT_TOKEN") { Ok(token) => token, Err(_) => { @@ -125,7 +125,6 @@ async fn main() -> Result<(), errors::Error> { } }; let api = Api::new(token.clone()); - // Fetch new updates via long poll method let mut stream = api.stream(); while let Some(update) = stream.next().await { @@ -134,7 +133,6 @@ async fn main() -> Result<(), errors::Error> { if let UpdateKind::Message(message) = update.kind { db::add_user(message.clone()).await?; db::add_conf(message.clone()).await?; - handler(api.clone(), message, token.clone()).await?; } } diff --git a/src/utils.rs b/src/utils.rs index 69f0072..5e5f223 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,31 +1,21 @@ -use reqwest::Client; use sha1::Sha1; use std::fs::{create_dir as fs_create_dir, File}; use std::io::prelude::*; -use std::path::Path; -use std::{env, io}; +use std::time::SystemTime; use telegram_bot::*; use uuid::Uuid; use crate::db; use crate::errors; extern crate reqwest; -use ascii::AsciiChar::{LineFeed, EOT}; -use subprocess::{Exec, Popen, PopenConfig, Redirection}; use serde_json::Value; -//use serde::{Deserialize, Serialize}; - -//#[derive(Serialize, Deserialize)] -struct StemWord { - -} - +use subprocess::{Exec, }; pub(crate) fn get_title(message: &Message) -> String { match &message.chat { MessageChat::Supergroup(chat) => chat.title.clone(), MessageChat::Group(chat) => chat.title.clone(), - MessageChat::Private(chat) => "PRIVATE".to_string(), + MessageChat::Private(_) => format!("PRIVATE"), _ => "PRIVATE".to_string(), } } @@ -37,6 +27,13 @@ pub(crate) async fn create_dir(dir: &String) -> () { } } +pub(crate) async fn unixtime() -> i64 { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() as i64 +} + pub(crate) async fn get_files( api: Api, message: Message, @@ -66,7 +63,7 @@ pub(crate) async fn get_files( token, api_response.file_path.unwrap() ); - let mut file_response = reqwest::get(&url).await?; + let file_response = reqwest::get(&url).await?; let ext = { file_response .url() @@ -85,16 +82,18 @@ pub(crate) async fn get_files( let file_hash = hasher.digest().to_string(); match db::get_file(file_hash.clone()).await { Ok(_) => { - debug!("File {} exist", file_hash); } Err(_) => { let mut dest = File::create(path.clone())?; - dest.write(&content); + match dest.write(&content) { + Ok(_) => {}, + Err(e) => panic!("IO Error: Couldn't save file: {:?}", e) + } } }; db::add_file(&message, path, file_hash).await?; } - Err(e) => warn!("Couldn't get file: {}", e), + Err(e) => error!("Couldn't get file: {}", e), } } }; @@ -113,12 +112,15 @@ pub(crate) async fn stemming(message: &Message) -> Result, errors::E .0 { Some(line) => { - let mut v: Vec = serde_json::from_str(line.as_str())?; + let v: Vec = match serde_json::from_str(line.as_str()) { + Ok(val) => val, + Err(_) => return Ok(vec![]), + }; for i in v { words.push(i["analysis"][0]["lex"].to_string().replace("\"", "")); } words.retain(|x| x != "null"); - info!("{:?}", words); + //debug!("Parsed words: {}.", words.join(", ")); } None => return Ok(vec![]), };