Improve hostname detection. Add /list and /del commands.

This commit is contained in:
root
2021-05-07 02:15:06 +03:00
parent fc57851c90
commit 5bf83e3ce0
3 changed files with 96 additions and 16 deletions

3
.gitignore vendored
View File

@ -4,3 +4,6 @@ server.key
server.pub
wg0.conf
wg0.conf_old
*.swp
*.swo
hostname

50
bot.py
View File

@ -5,6 +5,8 @@ from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import Updater, MessageHandler, CommandHandler, filters, CallbackQueryHandler, CallbackContext
from gen import add_peer as wg_add_peer
from gen import update_configs
from gen import list_peers as wg_list_peers
from gen import del_peer as wg_del_peer
logging.basicConfig(
@ -18,25 +20,63 @@ if not token or not admin:
sys.exit(1)
def _help(update, context):
update.message.reply_text('<b>Help:</b>\n <b>*</b> /add <i>peer name</i>', parse_mode='HTML', disable_web_page_preview=True)
update.message.reply_text('<b>Help:</b>\n <b>*</b> /add <i>peer name</i>\n <b>*</b> /del <i>peer name</i>\n <b>*</b> /list [<i>peer name</i>]', parse_mode='HTML', disable_web_page_preview=True)
def add_peer(update, context):
def auth(handler):
def wrapper(update, context):
if update.message.chat.username != admin:
update.message.reply_text('You are not allowed to do that.', parse_mode='HTML', disable_web_page_preview=True)
update.message.reply_text(
'You are not allowed to do that.', parse_mode='HTML', disable_web_page_preview=True)
return False
handler(update, context)
return wrapper
@auth
def list_peers(update, context):
if len(update.message.text.split()) == 1:
n = 1
message = "Peers:\n<code>"
for peer in wg_list_peers():
message += f"{n} * {peer['ip']}: {peer['name']}\n"
n += 1
update.message.reply_text(f"{message}</code>", parse_mode='HTML', disable_web_page_preview=True)
else:
peer_name = "_".join(update.message.text.split()[1:])
try:
update.message.reply_photo(
open(f'clients/{peer_name}-qr.png', 'rb'), filename=f'{peer_name} QR.png', quote=True, caption=open(f'clients/{peer_name}.conf', 'r').read())
except:
update.message.reply_text("Wrong client name.")
@auth
def del_peer(update, context):
if len(update.message.text.split()) < 2:
update.message.reply_text('Wrong usage.\n<b>Help:</b>\n <b>*</b> /add <i>peer name</i>', parse_mode='HTML', disable_web_page_preview=True)
_help(update, context)
return False
peer_name = "_".join(update.message.text.split()[1:])
log.info("Deleting peer %s", peer_name)
wg_del_peer(peer_name)
update.message.reply_text("Done.")
@auth
def add_peer(update, context):
if len(update.message.text.split()) < 2:
_help(update, context)
return False
log.info(update.message.chat.username)
peer_name = "_".join(update.message.text.split()[1:])
log.info("Creating peer %s", peer_name)
wg_add_peer(peer_name)
update.message.reply_photo(open(f'clients/{peer_name}-qr.png', 'rb'), filename=f'{peer_name} QR.png', quote=True, caption=open(f'clients/{peer_name}.conf', 'r').read())
def error(update, context):
update.message.reply_text("Something went wrong...")
def main():
updater = Updater(token, use_context=True)
updater.dispatcher.add_error_handler(error)
updater.dispatcher.add_handler(CommandHandler('add', add_peer))
updater.dispatcher.add_handler(CommandHandler('list', list_peers))
updater.dispatcher.add_handler(CommandHandler('del', del_peer))
updater.dispatcher.add_handler(MessageHandler(filters.Filters.text, _help))
updater.start_polling()
updater.idle()

55
gen.py
View File

@ -1,29 +1,37 @@
import wgconfig # default iniparser cannot read WG configs.
import logging
import json
import ipaddress
import argparse
from socket import getfqdn
from os import system
from base64 import b64encode, b64decode
from nacl.public import PrivateKey
from ipaddress import ip_address
from datetime import date
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger('generator')
# Create the parser
my_parser = argparse.ArgumentParser()
# Add the arguments
my_parser.add_argument('--update', action='store_true', default=False)
my_parser.add_argument('--peer', action='store', type=str)
my_parser.add_argument('--delete', action='store', type=str)
ips = "0.0.0.0/5, 8.0.0.0/7, 10.150.200.0/24, 11.0.0.0/8, 12.0.0.0/6, 16.0.0.0/4, 32.0.0.0/3, 64.0.0.0/2, 128.0.0.0/3, 160.0.0.0/5, 168.0.0.0/6, 172.0.0.0/12, 172.32.0.0/11, 172.64.0.0/10, 172.128.0.0/9, 173.0.0.0/8, 174.0.0.0/7, 176.0.0.0/4, 192.0.0.0/9, 192.128.0.0/11, 192.160.0.0/13, 192.169.0.0/16, 192.170.0.0/15, 192.172.0.0/14, 192.176.0.0/12, 192.192.0.0/10, 193.0.0.0/8, 194.0.0.0/7, 196.0.0.0/6, 200.0.0.0/5, 208.0.0.0/4"
# Execute the parse_args() method
args = my_parser.parse_args()
peer_name = args.peer
del_name = args.delete
is_update = args.update
class Peer:
def __init__(self, peer=None, allowed_ips=None, comment='None'):
self.comment = comment
@ -64,7 +72,7 @@ class Peer:
def gen_config(self, helper):
# if not self.managed:
# print(f"Unmanaged peer {self.comment}. Can't generate config.")
# log.info(f"Unmanaged peer {self.comment}. Can't generate config.")
# return False
filename = f"clients/{self.comment.replace(' ', '_')}"
_wg = wgconfig.WGConfig(f"{filename}.conf")
@ -79,17 +87,16 @@ class Peer:
_wg.write_file()
system(f'qrencode -r {filename}.conf -o {filename}-qr.png')
system(f'qrencode -t ansiutf8 -r {filename}.conf -o {filename}-qr.txt')
print(f"Updated config for {self.comment}")
log.info(f"Updated config for {self.comment}")
class Helper:
def __init__(
self,
cfg_path="/etc/wireguard/wg0.conf",
server_addr="vpn.hexor.ru:51820",
dns="8.8.8.8"):
self.cfg_path = cfg_path
self.server_addr = server_addr
self.server_addr = self.hostname
self.dns = dns
self.wg = wgconfig.WGConfig(cfg_path)
self.wg.read_file()
@ -125,6 +132,16 @@ class Helper:
ip_list.sort()
return ip_list
@property
def hostname(self):
try:
f = open('hostname', 'rb')
hostname = f.read().decode('utf-8').strip()
except OSError:
hostname = getfqdn()
return hostname
@property
def next_ip(self):
"""Return next free IP"""
@ -138,17 +155,32 @@ class Helper:
self.wg.add_attr(cl.pub_key, 'AllowedIPs', f"{self.ip_list[-1]+1}/32")
cl.gen_config(self)
def del_peer(self, name):
"""Delete given peer"""
try:
pub_key = list(filter(lambda peer: peer['name'] == name, list_peers()))[0]['pub_key']
except:
log.info("Couldn't find peer.")
return False
self.wg.del_peer(pub_key)
def add_peer(peer_name):
print('Generate a new peer config.')
log.info('Generate a new peer config.')
helper = Helper()
helper.add_peer(peer_name)
helper.wg.write_file()
system('systemctl restart wg-quick@wg0.service')
def update_configs():
print("Update all clients configs.")
def del_peer(peer_name):
log.info(f'Remove given peer {peer_name}.')
helper = Helper()
for peer in helper.peer_list:
helper.del_peer(peer_name)
helper.wg.write_file()
system('systemctl restart wg-quick@wg0.service')
def update_configs():
log.info("Update all clients configs.")
for peer in Helper().peer_list:
peer.gen_config(Helper())
if not is_update and peer_name:
@ -157,3 +189,8 @@ if not is_update and peer_name:
if is_update:
update_configs()
def list_peers():
return [{'name': p.comment, 'ip': p.allowed_ips, 'pub_key': p.pub_key} for p in Helper().peer_list]
if del_name:
del_peer(del_name)