mirror of
https://github.com/house-of-vanity/fesmoo_perdoliq.git
synced 2025-07-06 05:44:06 +00:00
major UI improvement.
This commit is contained in:
302
bot_keys.py
302
bot_keys.py
@ -13,6 +13,9 @@ from telegram import *
|
||||
from telegram.ext import Updater, CommandHandler, CallbackQueryHandler
|
||||
from telegram.error import NetworkError, Unauthorized
|
||||
from time import sleep
|
||||
from functools import wraps
|
||||
|
||||
from pprint import pprint
|
||||
try:
|
||||
TOKEN = os.environ["TG_TOKEN"]
|
||||
except:
|
||||
@ -24,146 +27,217 @@ try:
|
||||
except:
|
||||
from main import Perdoliq
|
||||
|
||||
update_id = None
|
||||
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
level=logging.INFO)
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
auth = dict()
|
||||
|
||||
cached_tests = dict()
|
||||
|
||||
def main():
|
||||
"""Run the bot."""
|
||||
global update_id
|
||||
# Telegram Bot Authorization Token
|
||||
bot = telegram.Bot(TOKEN)
|
||||
"""Run bot."""
|
||||
updater = Updater(TOKEN, use_context=True)
|
||||
dp = updater.dispatcher
|
||||
dp.add_handler(CallbackQueryHandler(button_handler))
|
||||
dp.add_handler(CommandHandler("login", login,
|
||||
pass_args=True,
|
||||
pass_job_queue=True,
|
||||
pass_chat_data=True))
|
||||
dp.add_handler(CommandHandler("list", list_,
|
||||
pass_args=True,
|
||||
pass_job_queue=True,
|
||||
pass_chat_data=True))
|
||||
# log all errors
|
||||
dp.add_error_handler(error)
|
||||
updater.start_polling()
|
||||
updater.idle()
|
||||
|
||||
# get the first pending update_id,
|
||||
# this is so we can skip over it in case
|
||||
# we get an "Unauthorized" exception.
|
||||
def error(update, context):
|
||||
"""Log Errors caused by Updates."""
|
||||
LOG.warning('Update "%s" caused error "%s"', update, context.error)
|
||||
|
||||
def is_authorized(id_):
|
||||
def decorator(f):
|
||||
@wraps(f)
|
||||
def wrapped(*args, **kwargs):
|
||||
if args[0].effective_user.id not in auth:
|
||||
LOG.info(
|
||||
"User %s (%s) is new user. Redirecting to login.",
|
||||
args[0].effective_user.first_name,
|
||||
args[0].effective_user.id)
|
||||
return login_first(*args, **kwargs)
|
||||
ret = f(*args, **kwargs)
|
||||
return ret
|
||||
return wrapped
|
||||
return decorator
|
||||
|
||||
def login_first(update, context):
|
||||
msg = "Необходимо авторизоваться. "\
|
||||
"Авторизация будет сохранена "\
|
||||
"до момента перезагрузки бота. Пример: /login <user> <pass>"
|
||||
try:
|
||||
update_id = bot.get_updates()[0].update_id
|
||||
except IndexError:
|
||||
update_id = None
|
||||
update.message.reply_text(msg)
|
||||
except:
|
||||
query = update.callback_query
|
||||
query.edit_message_text(msg)
|
||||
LOG.info("Going to login")
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
while True:
|
||||
try:
|
||||
handle_update(bot)
|
||||
except NetworkError:
|
||||
sleep(1)
|
||||
except Unauthorized:
|
||||
# The user has removed or blocked the bot.
|
||||
update_id += 1
|
||||
def login(update, context):
|
||||
user_id = update.effective_user.id
|
||||
message = update.message.text.split()
|
||||
if len(message) == 3:
|
||||
login = message[1]
|
||||
passwd = message[2]
|
||||
auth[user_id] = dict()
|
||||
auth[user_id]["login"] = login
|
||||
auth[user_id]["passwd"] = passwd
|
||||
update.message.reply_text("Учетные данные сохранены.")
|
||||
elif len(message) == 2 and message[1] == 'status':
|
||||
if user_id in auth:
|
||||
update.message.reply_text(
|
||||
"Вы авторизованы с данными: {} : {}***\n"\
|
||||
"Для обновления данных авторизуйтесь повторно.".format(
|
||||
auth[user_id]["login"],
|
||||
auth[user_id]["passwd"][0:2]))
|
||||
else:
|
||||
update.message.reply_text("Вы еще не авторизованы.")
|
||||
else:
|
||||
update.message.reply_text("Запрет! Попытка взлома! Делай так: "\
|
||||
"/login <user> <pass> или /login status")
|
||||
|
||||
|
||||
def perdoliq(username, password, subj, test, acc, is_delayed):
|
||||
def perdoliq(username, password, subj, test, acc, submit=True, is_delayed=False):
|
||||
try:
|
||||
app = Perdoliq(username, password)
|
||||
app.auth()
|
||||
app.resolve(subj, test, acc, is_delayed=int(is_delayed))
|
||||
app.resolve(subj, test, acc, submit, is_delayed)
|
||||
except Exception as e:
|
||||
return "Exception: " + str(e)
|
||||
|
||||
|
||||
def list_test(username, password):
|
||||
if username in cached_tests:
|
||||
return cached_tests[username]
|
||||
try:
|
||||
app = Perdoliq(username, password)
|
||||
app.auth()
|
||||
return (app.get_tests())
|
||||
tests = app.get_tests()
|
||||
cached_tests[username] = tests
|
||||
return tests
|
||||
except Exception as e:
|
||||
return "Exception: " + str(e)
|
||||
|
||||
@is_authorized('list_')
|
||||
def list_(update, context):
|
||||
query = update.callback_query
|
||||
user_id = update.effective_user.id
|
||||
tests = list_test(auth[user_id]["login"], auth[user_id]["passwd"])
|
||||
print(tests)
|
||||
keyboard = list()
|
||||
i = 1
|
||||
for subj in tests:
|
||||
tests_count = len(tests[subj])
|
||||
#msg = msg + (" [%s] %s (%s tests)\n" % (i, subj, tests_count))
|
||||
keyboard.append([InlineKeyboardButton(f"{i}. {subj}", callback_data=f"s_{i}")])
|
||||
i += 1
|
||||
keyboard.append([InlineKeyboardButton("Close", callback_data="subj")])
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
update.message.reply_text('Чо будем хакать?', reply_markup=reply_markup)
|
||||
|
||||
def handle_update(bot):
|
||||
"""Echo the message the user sent."""
|
||||
global update_id
|
||||
# Request updates after the last update_id
|
||||
for update in bot.get_updates(offset=update_id, timeout=10):
|
||||
update_id = update.update_id + 1
|
||||
|
||||
if update.message:
|
||||
# if there is and update, process it in threads
|
||||
t = threading.Thread(target=do_action, args=(update,))
|
||||
t.start()
|
||||
|
||||
def do_action(update):
|
||||
try:
|
||||
s = update.message.text.split()
|
||||
except:
|
||||
return 1
|
||||
if s[0] == '/resolve':
|
||||
if len(s) != 7:
|
||||
update.message.reply_markdown("Missing operand... Try again")
|
||||
update.message.reply_markdown("Usage: */resolve <user[text]> "\
|
||||
"<pass[text]> <subj[int]> <test[int]> "\
|
||||
"<accuracy[0-100]> <commit[1/0]>*")
|
||||
return False
|
||||
|
||||
msg = "Please wait. If you have chosen commit=1, so test "\
|
||||
"going to be resolved in about 20 minutes and will "\
|
||||
"be commited automatically, otherwise it will take "\
|
||||
"about a 2 minutes and you have to "\
|
||||
"commit it by yourself. Just wait. PS you have "\
|
||||
"chosen subj %s "\
|
||||
"test %s and accuracy %s" % (s[3], s[4], s[5])
|
||||
update.message.reply_text(msg)
|
||||
update.message.reply_text("You cannot resolve more than "\
|
||||
"one test in the same time."\
|
||||
"Одновременно решать более одного теста невозможно,"\
|
||||
" вы испортите результаты обоих тестов.")
|
||||
perdoliq(s[1], s[2], s[3], s[4], s[5], s[6])
|
||||
update.message.reply_text("It's done. Check your test because "\
|
||||
"i disclaim any responsibility.")
|
||||
elif s[0] == '/list':
|
||||
try:
|
||||
if len(s) == 3:
|
||||
update.message.reply_text("Fetching subjects...")
|
||||
sleep(1)
|
||||
tests = list_test(s[1], s[2])
|
||||
msg = "Here is an available subjects:\n```"
|
||||
i = 0
|
||||
for subj in tests:
|
||||
tests_count = len(tests[subj])
|
||||
msg = msg + (" [%s] %s (%s tests)\n" % (i, subj, tests_count))
|
||||
i += 1
|
||||
# j = 0
|
||||
# for test in tests[subj]:
|
||||
# msg = msg + (" [%s] %s\n" % (j, test))
|
||||
# j += 1
|
||||
update.message.reply_markdown(msg + "```\n Pay attention to "\
|
||||
"numbers in brackets \[..] *Here is subj or test numbers*")
|
||||
elif len(s) == 4:
|
||||
update.message.reply_text("Fetching tests...")
|
||||
sleep(1)
|
||||
tests = list_test(s[1], s[2])
|
||||
i = int(s[3])
|
||||
msg = "Here is an available tests:\n"
|
||||
j = 0
|
||||
for subj in tests:
|
||||
if j == i:
|
||||
msg = msg + ("``` *** %s ***\n----------\n" % subj)
|
||||
k = 0
|
||||
for test in tests[subj]:
|
||||
msg = msg + ("[%s] %s\n" % (k, test))
|
||||
k += 1
|
||||
j += 1
|
||||
update.message.reply_markdown(msg + "```\n Pay attention to "\
|
||||
"numbers in brackets \[..] *Here is subj or test numbers*")
|
||||
@is_authorized('button_handler')
|
||||
def button_handler(update, context):
|
||||
query = update.callback_query
|
||||
user_id = update.effective_user.id
|
||||
tests = list_test(auth[user_id]["login"], auth[user_id]["passwd"])
|
||||
if query.data.split('_')[0] == 'close':
|
||||
query.edit_message_text('Пака братишка')
|
||||
if query.data.split('_')[0] == 's': # subj
|
||||
data = query.data
|
||||
subj_num = query.data.split('_')[1]
|
||||
keyboard = list()
|
||||
subj_name = None
|
||||
j = 1 # subj number
|
||||
i = 1 # test number
|
||||
for subj in tests:
|
||||
LOG.debug("Checking: %s. %s is %s", subj, j, subj_num)
|
||||
if j != int(subj_num):
|
||||
j += 1
|
||||
continue
|
||||
else:
|
||||
update.message.reply_markdown("Usage: */list <user[text]>"\
|
||||
" <pass[text]>*")
|
||||
return False
|
||||
except:
|
||||
update.message.reply_markdown("Usage: */list <user[text]>"\
|
||||
" <pass[text]>*")
|
||||
return False
|
||||
else:
|
||||
update.message.reply_markdown("Possible commands: */resolve, /list*")
|
||||
update.message.reply_markdown("Usage: */resolve <user[text]> "\
|
||||
"<pass[text]> <subj[int]> <test[int]> "\
|
||||
"<accuracy[0-100]> <commit[1/0]>*")
|
||||
update.message.reply_markdown("Usage: */list <user[text]> "\
|
||||
"<pass[text]>*")
|
||||
LOG.info("Found needed subj: %s", subj)
|
||||
subj_name = subj
|
||||
for test in tests[subj]:
|
||||
LOG.info("Found test: %s", test)
|
||||
keyboard.append([
|
||||
InlineKeyboardButton(f"{i}. {test}",
|
||||
callback_data=f"t_{i}_{data}")])
|
||||
i += 1
|
||||
break
|
||||
keyboard.append([InlineKeyboardButton("Закрыть", callback_data=f"close")])
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
query.edit_message_text(
|
||||
f"Ok, решаем {subj_name}, а какой тест?",
|
||||
reply_markup=reply_markup)
|
||||
|
||||
if query.data.split('_')[0] == 't': # test
|
||||
data = query.data
|
||||
keyboard = [[], []]
|
||||
for p in range(0, 51, 5):
|
||||
keyboard[0].append(InlineKeyboardButton(f"{p}", callback_data=f"p_{p}_{data}"))
|
||||
for p in range(50, 101, 5):
|
||||
keyboard[1].append(InlineKeyboardButton(f"{p}", callback_data=f"p_{p}_{data}"))
|
||||
keyboard.append([InlineKeyboardButton("Закрыть", callback_data=f"close")])
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
query.edit_message_text("Ok, а какая точность?", reply_markup=reply_markup)
|
||||
|
||||
if query.data.split('_')[0] == 'm': # autosubmit
|
||||
data = query.data
|
||||
keyboard = [[
|
||||
InlineKeyboardButton(f"Быстро", callback_data=f"d_0_{data}"),
|
||||
InlineKeyboardButton(f"С задержками", callback_data=f"d_1_{data}"),]]
|
||||
keyboard.append([InlineKeyboardButton("Закрыть", callback_data=f"close")])
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
query.edit_message_text(
|
||||
"Решить тест быстро или с задержками, типа я думал над ответом.",
|
||||
reply_markup=reply_markup)
|
||||
|
||||
if query.data.split('_')[0] == 'p': # precision
|
||||
data = query.data
|
||||
keyboard = [[
|
||||
InlineKeyboardButton(f"Завершить", callback_data=f"m_1_{data}"),
|
||||
InlineKeyboardButton(f"Не завершать", callback_data=f"m_0_{data}"),]]
|
||||
keyboard.append([InlineKeyboardButton("Close", callback_data=f"close")])
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
query.edit_message_text(
|
||||
"Завершить тест автоматически после решения или оставить для проверки?",
|
||||
reply_markup=reply_markup)
|
||||
|
||||
if query.data.split('_')[0] == 'd': # is_delayed
|
||||
data = query.data
|
||||
LOG.info(data)
|
||||
# d_0_m_1_p_100_t_1_s_10
|
||||
subj = query.data.split('_')[9]
|
||||
test = query.data.split('_')[7]
|
||||
precision = query.data.split('_')[5]
|
||||
submit = bool(int(query.data.split('_')[3]))
|
||||
delay = bool(int(query.data.split('_')[1]))
|
||||
query.edit_message_text(
|
||||
"Опять работа... "\
|
||||
"Пока я решаю тест ЗАПРЕЩЕНО "\
|
||||
"пользоваться сайтом с тестами. "\
|
||||
"Резултат будет испорчен.")
|
||||
perdoliq(
|
||||
auth[user_id]["login"],
|
||||
auth[user_id]["passwd"],
|
||||
int(subj)-1,
|
||||
int(test)-1,
|
||||
precision,
|
||||
submit,
|
||||
delay)
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
main()
|
||||
except:
|
||||
except Exception as e:
|
||||
LOG.error("%s", e)
|
||||
pass
|
||||
|
6
main.py
6
main.py
@ -224,8 +224,10 @@ class Perdoliq:
|
||||
data=settings.scam_data_10,
|
||||
cookies={'ASP.NET_SessionId': self.SessionId})
|
||||
|
||||
def resolve(self, subj, test, accuracy, is_delayed=False):
|
||||
def resolve(self, subj, test, accuracy=100, submit=False, is_delayed=False):
|
||||
# renew auth
|
||||
logging.info(
|
||||
f"Resolving subj {subj}, test {test}, acc {accuracy}, submit {submit}, delay {is_delayed}")
|
||||
self.auth()
|
||||
# get count of questions
|
||||
q_count = self.start_test(subj, test)
|
||||
@ -256,5 +258,5 @@ class Perdoliq:
|
||||
self.answer(i, prediction)
|
||||
|
||||
# make autocommit
|
||||
if is_delayed == True:
|
||||
if submit == True:
|
||||
self.finish_test()
|
||||
|
Reference in New Issue
Block a user