Added stat command

This commit is contained in:
root
2022-06-07 15:43:57 +00:00
parent abacb8acc2
commit a6cd4eabf3
3 changed files with 178 additions and 17 deletions

126
gen.py
View File

@@ -4,11 +4,12 @@
import wgconfig # default iniparser cannot read WG configs.
import logging
import json
import json as _json
import ipaddress
import argparse
import configparser
from subprocess import call
from typing import TypedDict
from subprocess import call, Popen, PIPE
from socket import getfqdn
from os import path, mkdir
from base64 import b64encode, b64decode
@@ -25,10 +26,39 @@ log = logging.getLogger('generator')
my_parser = argparse.ArgumentParser()
# Add the arguments
my_parser.add_argument('--update', action='store_true', default=False)
my_parser.add_argument('--peer', action='store', type=str)
my_parser.add_argument('--delete', action='store', type=str)
my_parser.add_argument('--config', action='store', default='wg0', type=str)
args = {
"--update": {
"action": "store_true",
"default": False,
"desc": "Regenerate all client configs"
},
"--json": {
"action": "store_true",
"default": False,
"desc": "Print all Wireguard statistics in JSON"
},
"--peer": {
"action": "store",
"default": None,
"desc": "Add new peer"
},
"--delete": {
"action": "store",
"default": None,
"desc": "Delete peer"
},
"--config": {
"action": "store",
"default": "wg0",
"desc": "Config to use, default wg0"
},
}
for arg in args.items():
my_parser.add_argument(arg[0], action=arg[1]['action'], default=arg[1]['default'])
help_msg = ""
for arg in args.items():
help_msg += f" {arg[0]}\t{arg[1]['desc']}\n"
## Reading config
# Execute the parse_args() method
@@ -36,6 +66,7 @@ args = my_parser.parse_args()
peer_name = args.peer
del_name = args.delete
is_update = args.update
json = args.json
wpm_config = configparser.ConfigParser()
client_dir = f"/etc/wireguard/clients_{args.config}"
if not path.isdir(client_dir):
@@ -51,9 +82,79 @@ else:
dns = '8.8.8.8'
hostname = getfqdn()
config = args.config
log.info('Using %s WG config file.', config)
log.debug('Using %s WG config file.', config)
class WG_peer(TypedDict):
preshared_key: str
endpoint: str
latest_handshake: int
transfer_rx: int
transfer_tx: int
persistent_keepalive: bool
allowed_ips: list
class Interface(TypedDict):
name: str
private_key: str
public_key: str
listen_port: int
fwmark: str
peers: list
started: str
class Wireguard(TypedDict):
interfaces: dict
wg_state = Wireguard({})
def wg_json():
cmd = ["/usr/bin/wg", "show", "all", "dump"]
proc = Popen(cmd,
stdout=PIPE,
stderr=PIPE,
universal_newlines=True
)
stdout, stderr = proc.communicate()
for v in stdout.split('\n'):
cmd = ["systemctl", "show", "wg-quick@wg0", "--property", "InactiveEnterTimestamp"]
proc = Popen(cmd,
stdout=PIPE,
stderr=PIPE,
universal_newlines=True
)
stdout, stderr = proc.communicate()
args = v.split('\t')
if len(args) == 5:
interface = Interface(
name=args[0],
private_key=args[1],
public_key=args[2],
listen_port=args[3],
fwmark=args[4],
started=stdout.strip().split("=")[1],
peers=[])
wg_state[interface['name']] = interface
elif len(args) == 9:
allowed_ips = args[4].replace(' ', '').split(',')
peer = WG_peer(
preshared_key=args[1],
endpoint=args[3],
latest_handshake=int(args[5]),
transfer_rx=int(args[6]),
transfer_tx=int(args[7]),
persistent_keepalive=args[8],
allowed_ips=allowed_ips)
wg_state[args[0]]['peers'].append(peer)
else:
pass
#return _json.dumps(wg_state)
return wg_state
class Peer:
def __init__(self, peer=None, allowed_ips=None, comment='None'):
@@ -201,9 +302,12 @@ def list_peers():
if __name__ == '__main__':
if del_name:
del_peer(del_name)
if not is_update and peer_name:
elif not is_update and peer_name:
add_peer(peer_name)
if is_update:
elif is_update:
update_configs()
elif json:
#print(_json.dumps(wg_json()))
print(wg_json()['wg0']['peers'][0])
else:
print(help_msg)