From d348989afba757c20cd78fa43848ea63a7eae979 Mon Sep 17 00:00:00 2001 From: AB Date: Thu, 26 Dec 2019 20:18:40 +0000 Subject: [PATCH] major UI improvement. --- bot_keys.py | 302 ++++++++++++++++++++++++++++++++-------------------- main.py | 6 +- reqs.txt | 9 +- 3 files changed, 195 insertions(+), 122 deletions(-) diff --git a/bot_keys.py b/bot_keys.py index f16f8f5..7d37091 100644 --- a/bot_keys.py +++ b/bot_keys.py @@ -13,6 +13,9 @@ from telegram import * from telegram.ext import Updater, CommandHandler, CallbackQueryHandler from telegram.error import NetworkError, Unauthorized from time import sleep +from functools import wraps + +from pprint import pprint try: TOKEN = os.environ["TG_TOKEN"] except: @@ -24,146 +27,217 @@ try: except: from main import Perdoliq -update_id = None +logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + level=logging.INFO) +LOG = logging.getLogger(__name__) +auth = dict() + +cached_tests = dict() def main(): - """Run the bot.""" - global update_id - # Telegram Bot Authorization Token - bot = telegram.Bot(TOKEN) + """Run bot.""" + updater = Updater(TOKEN, use_context=True) + dp = updater.dispatcher + dp.add_handler(CallbackQueryHandler(button_handler)) + dp.add_handler(CommandHandler("login", login, + pass_args=True, + pass_job_queue=True, + pass_chat_data=True)) + dp.add_handler(CommandHandler("list", list_, + pass_args=True, + pass_job_queue=True, + pass_chat_data=True)) + # log all errors + dp.add_error_handler(error) + updater.start_polling() + updater.idle() - # get the first pending update_id, - # this is so we can skip over it in case - # we get an "Unauthorized" exception. +def error(update, context): + """Log Errors caused by Updates.""" + LOG.warning('Update "%s" caused error "%s"', update, context.error) + +def is_authorized(id_): + def decorator(f): + @wraps(f) + def wrapped(*args, **kwargs): + if args[0].effective_user.id not in auth: + LOG.info( + "User %s (%s) is new user. Redirecting to login.", + args[0].effective_user.first_name, + args[0].effective_user.id) + return login_first(*args, **kwargs) + ret = f(*args, **kwargs) + return ret + return wrapped + return decorator + +def login_first(update, context): + msg = "Необходимо авторизоваться. "\ + "Авторизация будет сохранена "\ + "до момента перезагрузки бота. Пример: /login " try: - update_id = bot.get_updates()[0].update_id - except IndexError: - update_id = None + update.message.reply_text(msg) + except: + query = update.callback_query + query.edit_message_text(msg) + LOG.info("Going to login") - logging.basicConfig(level=logging.DEBUG) - while True: - try: - handle_update(bot) - except NetworkError: - sleep(1) - except Unauthorized: - # The user has removed or blocked the bot. - update_id += 1 +def login(update, context): + user_id = update.effective_user.id + message = update.message.text.split() + if len(message) == 3: + login = message[1] + passwd = message[2] + auth[user_id] = dict() + auth[user_id]["login"] = login + auth[user_id]["passwd"] = passwd + update.message.reply_text("Учетные данные сохранены.") + elif len(message) == 2 and message[1] == 'status': + if user_id in auth: + update.message.reply_text( + "Вы авторизованы с данными: {} : {}***\n"\ + "Для обновления данных авторизуйтесь повторно.".format( + auth[user_id]["login"], + auth[user_id]["passwd"][0:2])) + else: + update.message.reply_text("Вы еще не авторизованы.") + else: + update.message.reply_text("Запрет! Попытка взлома! Делай так: "\ + "/login или /login status") -def perdoliq(username, password, subj, test, acc, is_delayed): +def perdoliq(username, password, subj, test, acc, submit=True, is_delayed=False): try: app = Perdoliq(username, password) app.auth() - app.resolve(subj, test, acc, is_delayed=int(is_delayed)) + app.resolve(subj, test, acc, submit, is_delayed) except Exception as e: return "Exception: " + str(e) - def list_test(username, password): + if username in cached_tests: + return cached_tests[username] try: app = Perdoliq(username, password) app.auth() - return (app.get_tests()) + tests = app.get_tests() + cached_tests[username] = tests + return tests except Exception as e: return "Exception: " + str(e) +@is_authorized('list_') +def list_(update, context): + query = update.callback_query + user_id = update.effective_user.id + tests = list_test(auth[user_id]["login"], auth[user_id]["passwd"]) + print(tests) + keyboard = list() + i = 1 + for subj in tests: + tests_count = len(tests[subj]) + #msg = msg + (" [%s] %s (%s tests)\n" % (i, subj, tests_count)) + keyboard.append([InlineKeyboardButton(f"{i}. {subj}", callback_data=f"s_{i}")]) + i += 1 + keyboard.append([InlineKeyboardButton("Close", callback_data="subj")]) + reply_markup = InlineKeyboardMarkup(keyboard) + update.message.reply_text('Чо будем хакать?', reply_markup=reply_markup) -def handle_update(bot): - """Echo the message the user sent.""" - global update_id - # Request updates after the last update_id - for update in bot.get_updates(offset=update_id, timeout=10): - update_id = update.update_id + 1 - - if update.message: - # if there is and update, process it in threads - t = threading.Thread(target=do_action, args=(update,)) - t.start() - -def do_action(update): - try: - s = update.message.text.split() - except: - return 1 - if s[0] == '/resolve': - if len(s) != 7: - update.message.reply_markdown("Missing operand... Try again") - update.message.reply_markdown("Usage: */resolve "\ - " "\ - " *") - return False - - msg = "Please wait. If you have chosen commit=1, so test "\ - "going to be resolved in about 20 minutes and will "\ - "be commited automatically, otherwise it will take "\ - "about a 2 minutes and you have to "\ - "commit it by yourself. Just wait. PS you have "\ - "chosen subj %s "\ - "test %s and accuracy %s" % (s[3], s[4], s[5]) - update.message.reply_text(msg) - update.message.reply_text("You cannot resolve more than "\ - "one test in the same time."\ - "Одновременно решать более одного теста невозможно,"\ - " вы испортите результаты обоих тестов.") - perdoliq(s[1], s[2], s[3], s[4], s[5], s[6]) - update.message.reply_text("It's done. Check your test because "\ - "i disclaim any responsibility.") - elif s[0] == '/list': - try: - if len(s) == 3: - update.message.reply_text("Fetching subjects...") - sleep(1) - tests = list_test(s[1], s[2]) - msg = "Here is an available subjects:\n```" - i = 0 - for subj in tests: - tests_count = len(tests[subj]) - msg = msg + (" [%s] %s (%s tests)\n" % (i, subj, tests_count)) - i += 1 -# j = 0 -# for test in tests[subj]: -# msg = msg + (" [%s] %s\n" % (j, test)) -# j += 1 - update.message.reply_markdown(msg + "```\n Pay attention to "\ - "numbers in brackets \[..] *Here is subj or test numbers*") - elif len(s) == 4: - update.message.reply_text("Fetching tests...") - sleep(1) - tests = list_test(s[1], s[2]) - i = int(s[3]) - msg = "Here is an available tests:\n" - j = 0 - for subj in tests: - if j == i: - msg = msg + ("``` *** %s ***\n----------\n" % subj) - k = 0 - for test in tests[subj]: - msg = msg + ("[%s] %s\n" % (k, test)) - k += 1 - j += 1 - update.message.reply_markdown(msg + "```\n Pay attention to "\ - "numbers in brackets \[..] *Here is subj or test numbers*") +@is_authorized('button_handler') +def button_handler(update, context): + query = update.callback_query + user_id = update.effective_user.id + tests = list_test(auth[user_id]["login"], auth[user_id]["passwd"]) + if query.data.split('_')[0] == 'close': + query.edit_message_text('Пака братишка') + if query.data.split('_')[0] == 's': # subj + data = query.data + subj_num = query.data.split('_')[1] + keyboard = list() + subj_name = None + j = 1 # subj number + i = 1 # test number + for subj in tests: + LOG.debug("Checking: %s. %s is %s", subj, j, subj_num) + if j != int(subj_num): + j += 1 + continue else: - update.message.reply_markdown("Usage: */list "\ - " *") - return False - except: - update.message.reply_markdown("Usage: */list "\ - " *") - return False - else: - update.message.reply_markdown("Possible commands: */resolve, /list*") - update.message.reply_markdown("Usage: */resolve "\ - " "\ - " *") - update.message.reply_markdown("Usage: */list "\ - "*") + LOG.info("Found needed subj: %s", subj) + subj_name = subj + for test in tests[subj]: + LOG.info("Found test: %s", test) + keyboard.append([ + InlineKeyboardButton(f"{i}. {test}", + callback_data=f"t_{i}_{data}")]) + i += 1 + break + keyboard.append([InlineKeyboardButton("Закрыть", callback_data=f"close")]) + reply_markup = InlineKeyboardMarkup(keyboard) + query.edit_message_text( + f"Ok, решаем {subj_name}, а какой тест?", + reply_markup=reply_markup) + if query.data.split('_')[0] == 't': # test + data = query.data + keyboard = [[], []] + for p in range(0, 51, 5): + keyboard[0].append(InlineKeyboardButton(f"{p}", callback_data=f"p_{p}_{data}")) + for p in range(50, 101, 5): + keyboard[1].append(InlineKeyboardButton(f"{p}", callback_data=f"p_{p}_{data}")) + keyboard.append([InlineKeyboardButton("Закрыть", callback_data=f"close")]) + reply_markup = InlineKeyboardMarkup(keyboard) + query.edit_message_text("Ok, а какая точность?", reply_markup=reply_markup) + + if query.data.split('_')[0] == 'm': # autosubmit + data = query.data + keyboard = [[ + InlineKeyboardButton(f"Быстро", callback_data=f"d_0_{data}"), + InlineKeyboardButton(f"С задержками", callback_data=f"d_1_{data}"),]] + keyboard.append([InlineKeyboardButton("Закрыть", callback_data=f"close")]) + reply_markup = InlineKeyboardMarkup(keyboard) + query.edit_message_text( + "Решить тест быстро или с задержками, типа я думал над ответом.", + reply_markup=reply_markup) + + if query.data.split('_')[0] == 'p': # precision + data = query.data + keyboard = [[ + InlineKeyboardButton(f"Завершить", callback_data=f"m_1_{data}"), + InlineKeyboardButton(f"Не завершать", callback_data=f"m_0_{data}"),]] + keyboard.append([InlineKeyboardButton("Close", callback_data=f"close")]) + reply_markup = InlineKeyboardMarkup(keyboard) + query.edit_message_text( + "Завершить тест автоматически после решения или оставить для проверки?", + reply_markup=reply_markup) + + if query.data.split('_')[0] == 'd': # is_delayed + data = query.data + LOG.info(data) + # d_0_m_1_p_100_t_1_s_10 + subj = query.data.split('_')[9] + test = query.data.split('_')[7] + precision = query.data.split('_')[5] + submit = bool(int(query.data.split('_')[3])) + delay = bool(int(query.data.split('_')[1])) + query.edit_message_text( + "Опять работа... "\ + "Пока я решаю тест ЗАПРЕЩЕНО "\ + "пользоваться сайтом с тестами. "\ + "Резултат будет испорчен.") + perdoliq( + auth[user_id]["login"], + auth[user_id]["passwd"], + int(subj)-1, + int(test)-1, + precision, + submit, + delay) if __name__ == '__main__': try: main() - except: + except Exception as e: + LOG.error("%s", e) pass diff --git a/main.py b/main.py index c04745d..90e1fe1 100644 --- a/main.py +++ b/main.py @@ -224,8 +224,10 @@ class Perdoliq: data=settings.scam_data_10, cookies={'ASP.NET_SessionId': self.SessionId}) - def resolve(self, subj, test, accuracy, is_delayed=False): + def resolve(self, subj, test, accuracy=100, submit=False, is_delayed=False): # renew auth + logging.info( + f"Resolving subj {subj}, test {test}, acc {accuracy}, submit {submit}, delay {is_delayed}") self.auth() # get count of questions q_count = self.start_test(subj, test) @@ -256,5 +258,5 @@ class Perdoliq: self.answer(i, prediction) # make autocommit - if is_delayed == True: + if submit == True: self.finish_test() diff --git a/reqs.txt b/reqs.txt index d667be2..bfebfa5 100644 --- a/reqs.txt +++ b/reqs.txt @@ -1,7 +1,4 @@ -# python3 only -pip install requests - -pip install beautifulsoup4 - -pip install python-telegram-bot +requests +beautifulsoup4 +python-telegram-bot