#!/usr/bin/env python3
# Author: 'UltraDesu '
# Home: https://github.com/house-of-vanity/Wireguard-Peer-Manager
"""Telegram bot front-end for Wireguard Peer Manager.

Configuration is read at import time:

* ``TG_TOKEN`` - bot token (environment variable, required).
* ``TG_ADMIN`` - comma-separated Telegram usernames allowed to issue
  commands (environment variable, required).
* ``wpm.conf`` - optional INI file; ``[Interface] config`` selects the
  WireGuard interface name and defaults to ``wg0``.
"""

import configparser
import functools
import logging
import os
import sys
from subprocess import call

from hurry.filesize import size
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import (
    Updater,
    MessageHandler,
    CommandHandler,
    filters,
    CallbackQueryHandler,
    CallbackContext,
)

from gen import wg_json
from gen import add_peer as wg_add_peer
from gen import update_configs
from gen import list_peers as wg_list_peers
from gen import del_peer as wg_del_peer

# Maximum length of one text message; used to split long status reports.
tg_max_len = 4096

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)

token = os.environ.get('TG_TOKEN')
# Default to '' so an unset TG_ADMIN reaches the check below instead of
# raising AttributeError on None; empty entries are dropped so that an empty
# variable is detected as "not set" as well.
admin = [
    name
    for name in os.environ.get('TG_ADMIN', '')
    .replace('"', '')
    .replace(' ', '')
    .split(',')
    if name
]
if not token or not admin:
    log.error("Env var TG_TOKEN or TG_ADMIN aren't set.")
    sys.exit(1)

wpm_config = configparser.ConfigParser()
wpm_config.read('wpm.conf')
# The fallback covers a missing file, a missing [Interface] section and a
# missing 'config' option alike.
config = wpm_config.get('Interface', 'config', fallback='wg0')


def _help(update, context):
    """Reply with the list of supported commands."""
    update.message.reply_text(
        'Help:\n * /add peer name\n * /del peer name\n * /list [peer name]\n* /status - show status',
        parse_mode='HTML',
        disable_web_page_preview=True)


def auth(handler):
    """Decorate a handler so that only usernames listed in ``admin`` may run it.

    Other users get a refusal message and the wrapper returns ``False``;
    otherwise the wrapped handler's return value is passed through.
    """
    @functools.wraps(handler)
    def wrapper(update, context):
        if update.message.chat.username not in admin:
            update.message.reply_text(
                'You are not allowed to do that.',
                parse_mode='HTML',
                disable_web_page_preview=True)
            return False
        return handler(update, context)
    return wrapper


def _peer_name_from_command(update):
    """Return the command arguments joined with '_' ('' when there are none)."""
    return "_".join(update.message.text.split()[1:])


def _client_file(peer_name, suffix):
    """Return the path of a generated client artefact for ``peer_name``."""
    # NOTE(review): peer_name comes straight from the chat message and is not
    # sanitised before being used in a path. Callers are authenticated admins,
    # but consider rejecting names containing path separators.
    return f'/etc/wireguard/clients_{config}/{peer_name}{suffix}'


def _send_peer_config(update, peer_name):
    """Send the QR code and the config file of ``peer_name`` as replies.

    Raises:
        OSError: if the config or the QR image cannot be opened.
    """
    conf_path = _client_file(peer_name, '.conf')
    with open(conf_path, 'r') as conf_file:
        msg = conf_file.read()
    with open(_client_file(peer_name, '-qr.png'), 'rb') as qr_file:
        update.message.reply_photo(
            qr_file,
            parse_mode='HTML',
            filename=f'{peer_name} QR.png',
            quote=True,
            caption=f"Install Wireguard VPN app and scan or open config.\n{msg}")
    with open(conf_path, 'rb') as conf_file:
        update.message.reply_document(conf_file)


@auth
def list_peers(update, context):
    """Handle /list: show all peers, or send one peer's config and QR code.

    The optional argument is either a peer name or the 1-based index shown in
    the plain /list output.
    """
    if len(update.message.text.split()) == 1:
        rows = [
            f"{n} * {peer['ip']}: {peer['name']}\n"
            for n, peer in enumerate(wg_list_peers(), start=1)
        ]
        message = "Peers:\n" + "".join(rows)
        update.message.reply_text(
            f"{message}", parse_mode='HTML', disable_web_page_preview=True)
        return

    peer_name = _peer_name_from_command(update)
    # isdecimal() only accepts characters int() can parse; isnumeric() also
    # matches e.g. fractions, for which int() raises ValueError.
    if peer_name.isdecimal():
        wanted = int(peer_name)
        for n, peer in enumerate(wg_list_peers(), start=1):
            if n == wanted:
                peer_name = peer['name']
                break
    try:
        _send_peer_config(update, peer_name)
    except OSError:
        # Only a missing/unreadable client file means an unknown peer; other
        # failures propagate to the dispatcher's error handler.
        update.message.reply_text("Wrong client name.")


@auth
def del_peer(update, context):
    """Handle /del <peer name>: remove the named peer."""
    if len(update.message.text.split()) < 2:
        _help(update, context)
        return False
    peer_name = _peer_name_from_command(update)
    log.info("Deleting peer %s", peer_name)
    wg_del_peer(peer_name)
    update.message.reply_text("Done.")


def _pack_lines(lines, limit=tg_max_len):
    """Group ``lines`` into newline-joined messages of at most ``limit`` chars.

    A single line longer than ``limit`` is still returned as its own message.
    """
    chunks = []
    current = []
    current_len = 0
    for line in lines:
        # +1 accounts for the joining newline when the chunk is non-empty.
        added = len(line) + (1 if current else 0)
        if current and current_len + added > limit:
            chunks.append("\n".join(current))
            current = []
            current_len = 0
            added = len(line)
        current.append(line)
        current_len += added
    if current:
        chunks.append("\n".join(current))
    return chunks


# Example peer entry as returned by wg_json():
# {'preshared_key': 'kl0iFpC0jtqPwWaGbasVyQQsUgOYS9KyH917IwJ6Izs=', 'endpoint': '(none)', 'latest_handshake': 0, 'transfer_rx': 0, 'transfer_tx': 0, 'persistent_keepalive': 'off', 'allowed_ips': ['10.150.200.14/32']}
@auth
def status(update, context):
    """Handle /status: report per-interface traffic, busiest peers first.

    Peers with zero total traffic are omitted. The report is split into
    several messages when it exceeds ``tg_max_len``.
    """
    lines = []
    for if_name, if_data in wg_json().items():
        log.debug("Interface status: %s %s", if_name, if_data)
        lines.append(f"{if_name}\nStarted {if_data['started']}")
        peers = {}
        for peer in if_data['peers']:
            received = peer['transfer_rx']
            sent = peer['transfer_tx']
            # NOTE(review): rx/tx are swapped relative to the source fields,
            # presumably to show traffic from the client's point of view -
            # confirm this is intended.
            peers[peer['allowed_ips'][0]] = {
                "tx": received,
                "rx": sent,
                "total": received + sent,
            }
        active = sorted(
            (item for item in peers.items() if item[1]['total'] != 0),
            key=lambda item: item[1]['total'],
            reverse=True)
        for address, traffic in active:
            lines.append(
                f" * {address}\n Total {size(traffic['total'])} RX: {size(traffic['rx'])} TX: {size(traffic['tx'])}")
    lines.append("Clients without any activity are skipped.")
    for chunk in _pack_lines(lines):
        update.message.reply_text(f"{chunk}", parse_mode='HTML')


@auth
def restart(update, context):
    """Handle /restart: restart the wg-quick systemd unit of the interface."""
    unit = f"wg-quick@{config}.service"
    try:
        # Argument list instead of a shell string: the interface name is never
        # interpreted by a shell.
        return_code = call(["systemctl", "restart", unit])
    except OSError:
        log.exception("Could not run systemctl for %s", unit)
        return_code = -1
    if return_code != 0:
        update.message.reply_text(f"Failed to restart {config} interface.")
        return False
    update.message.reply_text(f"Restarted {config} interface.")


@auth
def add_peer(update, context):
    """Handle /add <peer name>: create the peer and send its config and QR."""
    if len(update.message.text.split()) < 2:
        _help(update, context)
        return False
    peer_name = _peer_name_from_command(update)
    log.info("Creating peer %s", peer_name)
    wg_add_peer(peer_name)
    _send_peer_config(update, peer_name)


def error(update, context):
    """Log an unhandled handler error and notify the user when possible."""
    log.error("Unhandled error while processing update %s", update,
              exc_info=context.error)
    # The update may be None or may not carry a message (e.g. polling errors),
    # so only reply when there is something to reply to.
    message = getattr(update, 'effective_message', None)
    if message is not None:
        message.reply_text("Something went wrong...")


def main():
    """Register the handlers and poll Telegram until interrupted."""
    updater = Updater(token, use_context=True)
    dispatcher = updater.dispatcher
    dispatcher.add_error_handler(error)
    dispatcher.add_handler(CommandHandler('add', add_peer))
    dispatcher.add_handler(CommandHandler('list', list_peers))
    dispatcher.add_handler(CommandHandler('del', del_peer))
    dispatcher.add_handler(CommandHandler('restart', restart))
    dispatcher.add_handler(CommandHandler('status', status))
    # Any other text message gets the help text.
    dispatcher.add_handler(MessageHandler(filters.Filters.text, _help))
    updater.start_polling()
    updater.idle()


if __name__ == '__main__':
    log = logging.getLogger('WireGuard-GenBot')
    main()