#!/usr/bin/env python3
# Author: 'UltraDesu '
# Home: https://github.com/house-of-vanity/Wireguard-Peer-Manager
"""Wireguard Peer Manager.

Small command-line helper that adds and removes peers in a WireGuard
interface config, (re)generates per-client config files plus QR codes,
and reports the live interface state as JSON.

NOTE: command-line arguments and ``wpm.conf`` are parsed at import time and
the clients directory is created as a side effect of importing this module.
"""

import argparse
import configparser
import ipaddress
import json as _json
import logging
from base64 import b64encode, b64decode
from contextlib import suppress
from datetime import date
from ipaddress import ip_address
from os import path, mkdir, remove
from socket import getfqdn
from subprocess import call, Popen, PIPE
from typing import TypedDict

import wgconfig  # default iniparser cannot read WG configs.
from nacl.public import PrivateKey

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger('generator')

# Create the parser
my_parser = argparse.ArgumentParser()

# Supported command-line flags. "desc" feeds both argparse's --help and the
# short usage text printed when no action is requested.
arg_specs = {
    "--update": {
        "action": "store_true",
        "default": False,
        "desc": "Regenerate all client configs"
    },
    "--json": {
        "action": "store_true",
        "default": False,
        "desc": "Print all Wireguard statistics in JSON"
    },
    "--peer": {
        "action": "store",
        "default": None,
        "desc": "Add new peer"
    },
    "--delete": {
        "action": "store",
        "default": None,
        "desc": "Delete peer"
    },
    "--config": {
        "action": "store",
        # None means "not given on the command line": the value then falls
        # back to wpm.conf and finally to "wg0" (resolved below).
        "default": None,
        "desc": "Config to use, default wg0"
    },
}

for flag, spec in arg_specs.items():
    my_parser.add_argument(
        flag,
        action=spec['action'],
        default=spec['default'],
        help=spec['desc'])

help_msg = "".join(
    f" {flag}\t{spec['desc']}\n" for flag, spec in arg_specs.items())

## Reading config
# Execute the parse_args() method
args = my_parser.parse_args()

peer_name = args.peer
del_name = args.delete
is_update = args.update
json = args.json

wpm_config = configparser.ConfigParser()
# read() silently ignores a missing file; a file without an [Interface]
# section is treated the same way, so built-in defaults apply in both cases.
wpm_config.read('wpm.conf')
_iface_cfg = (wpm_config['Interface']
              if wpm_config.has_section('Interface') else {})

ips = _iface_cfg.get('allowed_ips', '0.0.0.0/0')
dns = _iface_cfg.get('dns', '8.8.8.8')
hostname = _iface_cfg.get('hostname', getfqdn())
# Precedence: --config on the command line, then wpm.conf, then "wg0".
config = args.config or _iface_cfg.get('config', 'wg0')
args.config = config  # keep the namespace in sync with the resolved value

log.debug('Using %s WG config file.', config)

client_dir = f"/etc/wireguard/clients_{config}"
if not path.isdir(client_dir):
    log.info("Creating clients directory %s", client_dir)
    mkdir(client_dir)


class WG_peer(TypedDict):
    """Runtime state of one peer as parsed from ``wg show <if> dump``."""
    # NOTE(review): per the wg(8) dump column order the value stored here is
    # the peer's *public* key; the field name is kept for compatibility.
    preshared_key: str
    endpoint: str
    latest_handshake: int
    transfer_rx: int
    transfer_tx: int
    persistent_keepalive: bool  # stored as the raw string printed by wg
    allowed_ips: list


class Interface(TypedDict):
    """Runtime state of one WireGuard interface."""
    name: str
    private_key: str
    public_key: str
    listen_port: int  # stored as the raw string printed by wg
    fwmark: str
    peers: list
    started: str  # systemd ActiveEnterTimestamp value, None if unavailable


class Wireguard(TypedDict):
    """Mapping of interface name to :class:`Interface`."""
    interfaces: dict


# Module-level cache filled (and returned) by wg_if_status().
wg_state = Wireguard({})


def _capture_stdout(cmd):
    """Run *cmd* (an argument list) and return its stdout as text."""
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    stdout, _stderr = proc.communicate()
    return stdout


def wg_if_status(config="all"):
    """Collect live WireGuard state into ``wg_state`` and return it.

    Parses ``wg show <config> dump``. With ``config="all"`` every line is
    prefixed by the interface name (5 fields for an interface, 9 for a peer);
    for a single interface the prefix is absent (4 and 8 fields). Lines of any
    other shape are ignored. Each interface's ``started`` field is then filled
    from ``systemctl show wg-quick@<if> --property ActiveEnterTimestamp``.
    """
    dump = _capture_stdout(["/usr/bin/wg", "show", config, "dump"])
    for line in dump.split('\n'):
        fields = line.split('\t')
        if len(fields) in (5, 9):
            # "all" mode: strip the leading interface-name column.
            if_name, fields = fields[0], fields[1:]
        elif len(fields) in (4, 8):
            if_name = config
        else:
            continue

        if len(fields) == 4:
            wg_state[if_name] = Interface(
                name=if_name,
                private_key=fields[0],
                public_key=fields[1],
                listen_port=fields[2],
                fwmark=fields[3],
                started=None,
                peers=[])
        else:
            wg_state[if_name]['peers'].append(WG_peer(
                preshared_key=fields[0],
                endpoint=fields[2],
                latest_handshake=int(fields[4]),
                transfer_rx=int(fields[5]),
                transfer_tx=int(fields[6]),
                persistent_keepalive=fields[7],
                allowed_ips=fields[3].replace(' ', '').split(',')))

    for interface in list(wg_state):
        out = _capture_stdout(
            ["systemctl", "show", f"wg-quick@{interface}",
             "--property", "ActiveEnterTimestamp"])
        # Output looks like "ActiveEnterTimestamp=<value>"; if systemctl
        # printed nothing usable, leave the field as None instead of crashing.
        _key, sep, value = out.strip().partition("=")
        wg_state[interface]['started'] = value if sep else None
    return wg_state


class Peer:
    """A WireGuard peer, either loaded from the server config or brand new.

    Managed peers carry their private key and display name in a leading
    comment of the form ``# priv_key: <key> ; comment: <name>``.
    """

    def __init__(self, peer=None, allowed_ips=None, comment='None'):
        """Build a peer.

        peer: attribute dict of an existing peer as provided by wgconfig
            (needs 'PublicKey', 'AllowedIPs' and '_rawdata'); when omitted a
            new key pair is generated.
        allowed_ips: address for a new peer; defaults to the next free IP.
        comment: display name, also used for the client file names.
        """
        self.comment = comment
        self.managed = False
        self.priv_key = None
        if peer:
            self.pub_key = peer['PublicKey']
            self.allowed_ips = peer['AllowedIPs']
            try:
                data = [part.strip() for part in
                        peer['_rawdata'][0].replace('#', '').split(';')]
                if len(data) != 2:
                    # No metadata comment: the original private key is
                    # unknown, so a fresh key pair is generated.
                    # NOTE(review): this replaces pub_key with a key the
                    # server does not know - confirm this is intended.
                    self.priv_key = self.generate_key()
                    self.pub_key = self.public_key(self.priv_key)
                else:
                    self.priv_key = data[0].split(':')[1].strip()
                    # maxsplit=1 keeps names that contain ':' intact.
                    self.comment = data[1].split(':', 1)[1].strip()
            except (KeyError, IndexError):
                log.warning("Could not parse metadata comment of peer %s",
                            self.pub_key)
        else:
            self.priv_key = self.generate_key()
            self.pub_key = self.public_key(self.priv_key)
            self.allowed_ips = (allowed_ips if allowed_ips
                                else Helper(cfg_path=config).next_ip)
        # Built from self.comment so that it reflects the parsed name for
        # peers loaded from the config as well.
        self.full_comment = (
            f"# priv_key: {self.priv_key} ; comment: {self.comment}")

    @staticmethod
    def generate_key():
        """Generates a new private key"""
        private = PrivateKey.generate()
        return b64encode(bytes(private)).decode("ascii")

    @staticmethod
    def public_key(private_key):
        """Given a private key, returns the corresponding public key"""
        private = PrivateKey(b64decode(private_key))
        return b64encode(bytes(private.public_key)).decode("ascii")

    def gen_config(self, helper):
        """Generate peer config

        Writes ``<client_dir>/<name>.conf`` and renders it as a PNG and an
        ANSI-text QR code with ``qrencode``. Spaces in the peer name are
        replaced by underscores in the file names.
        """
        filename = f"{client_dir}/{self.comment.replace(' ', '_')}"
        if self.priv_key is None:
            log.warning("No private key known for %s, config not generated",
                        filename)
            return
        _wg = wgconfig.WGConfig(f"{filename}.conf")
        _wg.initialize_file()
        _wg.add_attr(None, 'Address', self.allowed_ips)
        _wg.add_attr(None, 'DNS', helper.dns)
        _wg.add_attr(None, 'PrivateKey', self.priv_key)
        server_key = helper.server_pub_key
        _wg.add_peer(server_key)
        _wg.add_attr(server_key, 'AllowedIPs', f'{helper.dns}/32, {ips}')
        _wg.add_attr(server_key, 'Endpoint', f"{helper.server_addr}")
        _wg.add_attr(server_key, 'PersistentKeepalive', 10)
        _wg.write_file()
        # Argument lists instead of shell strings: the file name is derived
        # from user input and must never be interpreted by a shell.
        call(['qrencode', '-r', f'{filename}.conf',
              '-o', f'{filename}-qr.png'])
        call(['qrencode', '-t', 'ansiutf8', '-r', f'{filename}.conf',
              '-o', f'{filename}-qr.txt'])
        log.info("Updated config for %s", filename)


class Helper:
    """Wrapper around one server-side WireGuard config file."""

    def __init__(self, cfg_path):
        """Load the config at *cfg_path* (a path or a bare interface name)."""
        self.cfg_path = cfg_path
        self.server_addr = hostname
        self.dns = dns
        self.wg = wgconfig.WGConfig(cfg_path)
        self.wg.read_file()

    @property
    def server_pub_key(self):
        """Return server public key"""
        return Peer.public_key(self.wg.interface['PrivateKey'])

    @property
    def peer_list(self):
        """Return list of WG peers"""
        return [Peer(peer=attrs) for attrs in self.wg.peers.values()]

    @property
    def ip_list(self):
        """Return list of IPs

        Sorted list holding the interface address followed by the first
        AllowedIPs entry of every peer. Peers without a parsable address
        are skipped.
        """
        addresses = [ipaddress.ip_address(
            self.wg.interface['Address'].split('/')[0])]
        for attrs in self.wg.peers.values():
            raw = attrs.get('AllowedIPs')
            if isinstance(raw, list):
                raw = raw[0] if raw else None
            if not isinstance(raw, str):
                continue  # peer has no AllowedIPs (yet)
            try:
                addresses.append(
                    ipaddress.ip_address(raw.split('/')[0].strip()))
            except ValueError:
                log.warning("Ignoring unparsable AllowedIPs value %r", raw)
        addresses.sort()
        return addresses

    @property
    def next_ip(self):
        """Return next free IP"""
        return self.ip_list[-1] + 1

    def add_peer(self, comment):
        """Generate a new peer"""
        # Resolve the address once, from the in-memory config, so the server
        # entry and the client config are guaranteed to agree.
        new_ip = self.next_ip
        cl = Peer(allowed_ips=new_ip, comment=comment)
        self.wg.add_peer(cl.pub_key, cl.full_comment)
        self.wg.add_attr(cl.pub_key, 'AllowedIPs', f"{new_ip}/32")
        cl.gen_config(self)

    def del_peer(self, name):
        """Delete given peer

        Returns True when the peer was found and removed, False otherwise.
        """
        pub_key = next(
            (p.pub_key for p in self.peer_list if p.comment == name), None)
        if pub_key is None:
            log.info("Couldn't find peer.")
            return False
        self.wg.del_peer(pub_key)
        filename = f"{client_dir}/{name.replace(' ', '_')}"
        # Remove exactly the files gen_config() creates; a prefix glob would
        # also delete files of peers whose name starts with this one.
        for suffix in ('.conf', '-qr.png', '-qr.txt'):
            with suppress(FileNotFoundError):
                remove(f"{filename}{suffix}")
        return True


def _sync_interface():
    """Apply the on-disk config to the running interface without a restart."""
    # bash is needed for process substitution; the interface name is passed
    # as a positional parameter so it is never spliced into the script text.
    return call(["bash", "-c",
                 'wg syncconf "$1" <(wg-quick strip "$1")',
                 "bash", config])


def add_peer(peer_name):
    """Add a peer named *peer_name*, write the config and sync the interface."""
    log.info('Generate a new peer config.')
    helper = Helper(cfg_path=config)
    helper.add_peer(peer_name)
    helper.wg.write_file()
    _sync_interface()


def del_peer(peer_name):
    """Remove the peer named *peer_name* and sync the interface."""
    log.info('Remove given peer %s.', peer_name)
    helper = Helper(cfg_path=config)
    if not helper.del_peer(peer_name):
        return  # nothing changed, nothing to write or sync
    helper.wg.write_file()
    _sync_interface()


def update_configs():
    """Regenerate the client config files of every peer."""
    log.info("Update all clients configs.")
    helper = Helper(cfg_path=config)  # read the server config only once
    for peer in helper.peer_list:
        peer.gen_config(helper)


def list_peers():
    """Return name, address and public key of every peer as a list of dicts."""
    return [{'name': p.comment, 'ip': p.allowed_ips, 'pub_key': p.pub_key}
            for p in Helper(cfg_path=config).peer_list]


if __name__ == '__main__':
    if del_name:
        del_peer(del_name)
    elif not is_update and peer_name:
        add_peer(peer_name)
    elif is_update:
        update_configs()
    elif json:
        print(_json.dumps(wg_if_status(), indent=2))
    else:
        print(help_msg)