Files
Wireguard-Peer-Manager/gen.py

335 lines
10 KiB
Python
Raw Normal View History

2021-05-18 15:16:31 -07:00
#!/usr/bin/env python3
# Author: 'UltraDesu <ab@hexor.ru>'
# Home: https://github.com/house-of-vanity/Wireguard-Peer-Manager
2021-05-06 17:36:18 +03:00
import wgconfig # default iniparser cannot read WG configs.
import logging
2022-06-07 15:43:57 +00:00
import json as _json
2021-05-06 17:36:18 +03:00
import ipaddress
import argparse
2021-05-08 11:18:22 +03:00
import configparser
2022-06-07 15:43:57 +00:00
from typing import TypedDict
from subprocess import call, Popen, PIPE
from socket import getfqdn
2022-06-05 23:11:08 +00:00
from os import path, mkdir
2021-05-06 17:36:18 +03:00
from base64 import b64encode, b64decode
from nacl.public import PrivateKey
from ipaddress import ip_address
from datetime import date
# Root logging: INFO and above, each record prefixed with timestamp,
# logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Logger used by every function in this script.
log = logging.getLogger('generator')
2021-05-06 17:36:18 +03:00
# Create the parser
my_parser = argparse.ArgumentParser()

# Add the arguments.
# Each entry maps a command-line flag to its argparse action, its default
# value and a human-readable description.
# NOTE: the name `args` is rebound to the parsed namespace further down the
# file; it is kept here so the module-level names stay unchanged.
args = {
    "--update": {
        "action": "store_true",
        "default": False,
        "desc": "Regenerate all client configs",
    },
    "--json": {
        "action": "store_true",
        "default": False,
        "desc": "Print all Wireguard statistics in JSON",
    },
    "--peer": {
        "action": "store",
        "default": None,
        "desc": "Add new peer",
    },
    "--delete": {
        "action": "store",
        "default": None,
        "desc": "Delete peer",
    },
    "--config": {
        "action": "store",
        "default": "wg0",
        "desc": "Config to use, default wg0",
    },
}

for flag, spec in args.items():
    # Pass the description through as `help` so that `--help` shows it too.
    my_parser.add_argument(
        flag,
        action=spec['action'],
        default=spec['default'],
        help=spec['desc'],
    )

# Usage text printed by the script when no action was requested.
help_msg = "".join(f" {flag}\t{spec['desc']}\n" for flag, spec in args.items())
2021-05-06 17:36:18 +03:00
2021-05-08 11:18:22 +03:00
## Reading config
# Execute the parse_args() method
args = my_parser.parse_args()
peer_name = args.peer
del_name = args.delete
is_update = args.update
json = args.json

# Generated client files live in a per-interface directory.
client_dir = f"/etc/wireguard/clients_{args.config}"
if not path.isdir(client_dir):
    log.info("Creating clients directory %s", client_dir)
    mkdir(client_dir)

# Optional overrides come from the [Interface] section of ./wpm.conf.
# ConfigParser.read() silently skips a missing file, and get(...,
# fallback=...) covers both a missing section and a missing option, so a
# wpm.conf without an [Interface] section no longer raises KeyError.
wpm_config = configparser.ConfigParser()
wpm_config.read('wpm.conf')
ips = wpm_config.get('Interface', 'allowed_ips', fallback='0.0.0.0/0')
dns = wpm_config.get('Interface', 'dns', fallback='8.8.8.8')
hostname = wpm_config.get('Interface', 'hostname', fallback=getfqdn())
# NOTE(review): --config defaults to "wg0", so the value from wpm.conf is
# only consulted when an empty --config is passed explicitly.
config = args.config or wpm_config.get('Interface', 'config', fallback='wg0')

log.debug('Using %s WG config file.', config)
class WG_peer(TypedDict):
    """State of one peer as parsed from a ``wg show <interface> dump`` line."""
    public_key: str
    preshared_key: str
    endpoint: str
    latest_handshake: int
    transfer_rx: int
    transfer_tx: int
    # Stored exactly as printed in the dump (a string), not converted.
    persistent_keepalive: str
    allowed_ips: list


class Interface(TypedDict):
    """State of one interface as parsed from a ``wg show`` dump."""
    name: str
    private_key: str
    public_key: str
    # Stored exactly as printed in the dump (a string), not converted.
    listen_port: str
    fwmark: str
    # List of WG_peer entries attached to this interface.
    peers: list
    # None until wg_if_status() fills it from systemctl.
    started: str


class Wireguard(TypedDict):
    """Container type of ``wg_state``.

    NOTE(review): wg_if_status() keys ``wg_state`` directly by interface
    name; the declared ``interfaces`` key is never written.
    """
    interfaces: dict


# Module-level cache of everything wg_if_status() has collected so far.
wg_state = Wireguard({})


def _store_dump_line(line, config):
    """Parse one line of ``wg show ... dump`` output into ``wg_state``.

    Lines that match neither the interface nor the peer layout (for
    example the trailing empty line) are ignored.
    """
    fields = line.split('\t')
    # A single-interface dump has no leading interface-name column, while
    # "all" dumps do; add it back so both layouts are parsed by one path.
    if len(fields) in (4, 8):
        fields.insert(0, config)

    if len(fields) == 5:
        name, private_key, public_key, listen_port, fwmark = fields
        wg_state[name] = Interface(
            name=name,
            private_key=private_key,
            public_key=public_key,
            listen_port=listen_port,
            fwmark=fwmark,
            started=None,
            peers=[])
    elif len(fields) == 9:
        # Peer columns start with the peer's public key, followed by the
        # preshared key; the public key used to be stored under
        # 'preshared_key' and the real preshared key was dropped.
        (name, public_key, preshared_key, endpoint, allowed,
         handshake, rx, tx, keepalive) = fields
        peer = WG_peer(
            public_key=public_key,
            preshared_key=preshared_key,
            endpoint=endpoint,
            latest_handshake=int(handshake),
            transfer_rx=int(rx),
            transfer_tx=int(tx),
            persistent_keepalive=keepalive,
            allowed_ips=allowed.replace(' ', '').split(','))
        wg_state[name]['peers'].append(peer)


def _unit_started(interface):
    """Return the ActiveEnterTimestamp of ``wg-quick@<interface>``.

    Returns None when systemctl cannot be started or prints no
    ``key=value`` line, instead of raising.
    """
    cmd = ["systemctl", "show", f"wg-quick@{interface}",
           "--property", "ActiveEnterTimestamp"]
    try:
        proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    except OSError:
        return None
    stdout, _ = proc.communicate()
    _, sep, value = stdout.strip().partition("=")
    return value if sep else None


def wg_if_status(config="all"):
    """Collect live WireGuard state into ``wg_state`` and return it.

    Runs ``/usr/bin/wg show <config> dump``, records every interface and
    peer found in its output, then asks systemctl when each known
    ``wg-quick@<interface>`` unit became active.

    Args:
        config: interface name to query, or "all" for every interface.

    Returns:
        The module-level ``wg_state`` mapping (interface name -> Interface).
        It is updated in place, so entries from earlier calls persist.
    """
    cmd = ["/usr/bin/wg", "show", config, "dump"]
    proc = Popen(cmd, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    stdout, _ = proc.communicate()
    for line in stdout.split('\n'):
        _store_dump_line(line, config)

    # Iterate over a snapshot of the keys; the entries are updated in place.
    for interface in list(wg_state):
        wg_state[interface]['started'] = _unit_started(interface)
    return wg_state
2021-05-18 15:16:31 -07:00
2021-05-06 17:36:18 +03:00
class Peer:
    """A WireGuard peer: key pair, tunnel address and a display name.

    Peers created by this tool are stored in the server config with a
    leading comment of the form ``# priv_key: <key> ; comment: <name>``,
    which lets the client config be regenerated later.
    """

    def __init__(self, peer=None, allowed_ips=None, comment='None'):
        """Build a peer from a parsed config section or create a new one.

        Args:
            peer: mapping for an existing peer; 'PublicKey', 'AllowedIPs'
                and '_rawdata' are read from it. When falsy, a brand new
                peer with a fresh key pair is created.
            allowed_ips: address for a new peer; when falsy the next free
                address of the server config is used.
            comment: display name of the peer.
        """
        self.comment = comment
        self.managed = False
        if peer:
            self.pub_key = peer['PublicKey']
            self.allowed_ips = peer['AllowedIPs']
            priv_key, name = self._parse_meta(peer)
            if priv_key is None:
                # No usable metadata comment: fall back to a fresh key
                # pair, as is done for peers without such a comment.
                # A malformed comment used to leave priv_key unset, which
                # made the full_comment line below raise AttributeError.
                self.priv_key = self.generate_key()
                self.pub_key = self.public_key(self.priv_key)
            else:
                self.priv_key = priv_key
                if name is not None:
                    self.comment = name
        else:
            self.priv_key = self.generate_key()
            self.pub_key = self.public_key(self.priv_key)
            self.allowed_ips = allowed_ips if allowed_ips else Helper(cfg_path=config).next_ip
        # Use the resolved name, so a peer loaded from the config file
        # round-trips to the same comment line it was parsed from.
        self.full_comment = "# priv_key: " + " ; ".join([self.priv_key, "comment: " + self.comment])

    @staticmethod
    def _parse_meta(peer):
        """Extract ``(priv_key, comment)`` from the peer's first raw line.

        Returns ``(None, None)`` when the line is absent or is not of the
        ``priv_key: ... ; comment: ...`` form; ``comment`` alone is None
        when only its part lacks a colon.
        """
        try:
            raw = peer['_rawdata'][0]
        except (KeyError, IndexError, TypeError):
            return None, None
        if not isinstance(raw, str):
            return None, None
        parts = [part.strip() for part in raw.replace('#', '').split(';')]
        if len(parts) != 2:
            return None, None
        _, key_sep, key_value = parts[0].partition(':')
        if not key_sep:
            return None, None
        # partition() splits on the first colon only, so a name containing
        # ':' is kept whole instead of being cut at the second colon.
        _, name_sep, name_value = parts[1].partition(':')
        return key_value.strip(), (name_value.strip() if name_sep else None)

    def generate_key(self):
        """Generates a new private key"""
        private = PrivateKey.generate()
        return b64encode(bytes(private)).decode("ascii")

    def public_key(self, private_key):
        """Given a private key, returns the corresponding public key"""
        private = PrivateKey(b64decode(private_key))
        return b64encode(bytes(private.public_key)).decode("ascii")

    def gen_config(self, helper):
        """Generate peer config

        Writes ``<client_dir>/<name>.conf`` (spaces in the name become
        underscores) and renders it with qrencode into ``<name>-qr.png``
        and ``<name>-qr.txt``.

        Args:
            helper: Helper supplying the server public key, endpoint
                address and DNS server.
        """
        filename = f"{client_dir}/{self.comment.replace(' ', '_')}"
        _wg = wgconfig.WGConfig(f"{filename}.conf")
        _wg.initialize_file()
        _wg.add_attr(None, 'Address', self.allowed_ips)
        _wg.add_attr(None, 'DNS', helper.dns)
        _wg.add_attr(None, 'PrivateKey', self.priv_key)
        _wg.add_peer(helper.server_pub_key)
        _wg.add_attr(helper.server_pub_key, 'AllowedIPs', f'{helper.dns}/32, {ips}')
        _wg.add_attr(helper.server_pub_key, 'Endpoint', f"{helper.server_addr}")
        _wg.add_attr(helper.server_pub_key, 'PersistentKeepalive', 10)
        _wg.write_file()
        # Argument lists instead of a shell string: the peer name is part
        # of the file name and must never be interpreted by a shell.
        call(['qrencode', '-r', f'{filename}.conf', '-o', f'{filename}-qr.png'])
        call(['qrencode', '-t', 'ansiutf8', '-r', f'{filename}.conf', '-o', f'{filename}-qr.txt'])
        log.info("Updated config for %s", filename)
2021-05-06 17:36:18 +03:00
class Helper:
    """Wrapper around one WireGuard server config used to manage its peers."""

    def __init__(
            self,
            cfg_path):
        """Read the WireGuard config identified by *cfg_path*.

        The endpoint address and DNS server handed to clients are taken
        from the module-level ``hostname`` and ``dns`` settings.
        """
        self.cfg_path = cfg_path
        self.server_addr = hostname
        self.dns = dns
        self.wg = wgconfig.WGConfig(cfg_path)
        self.wg.read_file()

    @property
    def server_pub_key(self):
        """Return server public key"""
        return Peer().public_key(self.wg.interface['PrivateKey'])

    @property
    def peer_list(self):
        """Return list of WG peers"""
        return [Peer(peer=section) for section in self.wg.peers.values()]

    @staticmethod
    def _first_address(raw):
        """Return the first address of *raw* as an ip_address, or None.

        *raw* may be a single ``addr/mask`` string or a list of them;
        anything else, or an unparsable address, yields None.
        """
        if isinstance(raw, list):
            raw = raw[0] if raw else None
        if not isinstance(raw, str):
            return None
        try:
            return ipaddress.ip_address(raw.split('/')[0].strip())
        except ValueError:
            return None

    @property
    def ip_list(self):
        """Return list of IPs

        The sorted list holds the interface address followed by the first
        AllowedIPs entry of every peer. Peers without a usable address, or
        with one of a different IP version than the interface (which
        cannot be ordered against it), are skipped.
        """
        # Use this helper's own parsed config rather than re-reading the
        # globally selected one.
        server_ip = self._first_address(self.wg.interface['Address'])
        if server_ip is None:
            raise ValueError("Interface Address is missing or not a valid IP")
        ip_list = [server_ip]
        for section in self.wg.peers.values():
            ip = self._first_address(section.get('AllowedIPs', None))
            if ip is not None and ip.version == server_ip.version:
                ip_list.append(ip)
        ip_list.sort()
        return ip_list

    @property
    def next_ip(self):
        """Return next free IP"""
        return self.ip_list[-1] + 1

    def add_peer(self, comment):
        """Generate a new peer"""
        # Compute the address once so the server entry and the client
        # config are guaranteed to carry the same one.
        address = self.next_ip
        cl = Peer(comment=comment, allowed_ips=address)
        self.wg.add_peer(cl.pub_key, cl.full_comment)
        self.wg.add_attr(cl.pub_key, 'AllowedIPs', f"{address}/32")
        cl.gen_config(self)

    def del_peer(self, name):
        """Delete given peer

        Returns False when no peer named *name* exists; otherwise removes
        the peer from the loaded config, deletes its generated client
        files and returns None.
        """
        found = next((p for p in self.peer_list if p.comment == name), None)
        if found is None:
            log.info("Couldn't find peer.")
            return False
        self.wg.del_peer(found.pub_key)
        filename = f"{client_dir}/{name.replace(' ', '_')}"
        # Remove exactly the files gen_config() creates. The previous
        # `rm -f {filename}*` glob also deleted the files of every peer
        # whose name merely starts with *name*, and went through a shell.
        call(["rm", "-f", f"{filename}.conf", f"{filename}-qr.png", f"{filename}-qr.txt"])
2021-05-06 17:36:18 +03:00
def add_peer(peer_name):
    """Add a peer named *peer_name* and apply the change to the interface.

    The peer is added through a Helper for the selected config, the server
    config is written back, and ``wg syncconf`` is then run on the
    stripped config.
    """
    log.info('Generate a new peer config.')
    wg_helper = Helper(cfg_path=config)
    wg_helper.add_peer(peer_name)
    wg_helper.wg.write_file()
    # bash is required for the <(...) process substitution.
    sync_cmd = f"bash -c 'wg syncconf {config} <(wg-quick strip {config})'"
    call(sync_cmd, shell=True)
2021-05-06 17:36:18 +03:00
def del_peer(peer_name):
    """Remove the peer named *peer_name* and apply the change.

    When the peer does not exist nothing is written and the interface is
    not re-synced; previously the config was rewritten and ``wg syncconf``
    was run even though nothing had changed.
    """
    log.info('Remove given peer %s.', peer_name)
    helper = Helper(cfg_path=config)
    # Helper.del_peer() returns False only when the peer was not found.
    if helper.del_peer(peer_name) is False:
        return
    helper.wg.write_file()
    # bash is required for the <(...) process substitution.
    call(f"bash -c 'wg syncconf {config} <(wg-quick strip {config})'", shell=True)
def update_configs():
    """Regenerate the client config and QR codes of every peer.

    The server config is parsed once and the same Helper is reused for all
    peers instead of being re-read from disk on every iteration.
    """
    log.info("Update all clients configs.")
    helper = Helper(cfg_path=config)
    for peer in helper.peer_list:
        peer.gen_config(helper)
2021-05-06 17:36:18 +03:00
def list_peers():
    """Return one dict per peer of the selected config.

    Each dict carries the peer's ``name`` (its comment), ``ip`` (its
    allowed IPs) and ``pub_key``.
    """
    summaries = []
    for entry in Helper(cfg_path=config).peer_list:
        summaries.append({
            'name': entry.comment,
            'ip': entry.allowed_ips,
            'pub_key': entry.pub_key,
        })
    return summaries
2021-05-08 16:44:02 +03:00
# Command-line dispatch. Precedence: --delete, then --update, then --peer,
# then --json; with no action the usage text is printed.
if __name__ == '__main__':
    if del_name:
        del_peer(del_name)
    elif is_update:
        update_configs()
    elif peer_name:
        add_peer(peer_name)
    elif json:
        # Emit the state of every interface as real JSON. The previous code
        # printed the Python repr of the first peer of a hard-coded "wg0"
        # and raised KeyError/IndexError when that interface or peer did
        # not exist.
        # NOTE(review): the dump includes private and preshared keys, the
        # same as `wg show all dump`; treat the output as sensitive.
        print(_json.dumps(wg_if_status()))
    else:
        print(help_msg)