From e818d63cada17d48852d6da9d56c8ef7aa191e3b Mon Sep 17 00:00:00 2001 From: Alexandr Bogomyakov Date: Tue, 19 Mar 2024 02:47:29 +0200 Subject: [PATCH] fix k8s things --- k8s.py | 38 +++++++++++++++-------------- lib.py | 2 +- main.py | 76 ++++++++++++++++++++++++++++++--------------------------- 3 files changed, 61 insertions(+), 55 deletions(-) diff --git a/k8s.py b/k8s.py index 9dae1e2..035fc54 100755 --- a/k8s.py +++ b/k8s.py @@ -27,23 +27,24 @@ log.addHandler(file_handler) def discovery_servers(): global CONFIG - interval = 10 + interval = 60 log = logging.getLogger("OutFleet.discovery") - with lib.lock: - while True: - pods = V1.list_namespaced_pod(NAMESPACE, label_selector="app=shadowbox") - log.debug(f"Started discovery thread every {interval}") - for pod in pods.items: - log.debug(f"Found Outline server pod {pod.metadata.name}") - container_log = V1.read_namespaced_pod_log(name=pod.metadata.name, namespace=NAMESPACE, container='manager-config-json') - secret = json.loads(container_log.replace('\'', '\"')) - config = lib.get_config() - config_servers = find_server(secret, config["servers"]) - #log.info(f"config_servers {config_servers}") - if len(config_servers) > 0: - log.debug(f"Already exist") - pass - else: + + while True: + pods = V1.list_namespaced_pod(NAMESPACE, label_selector="app=shadowbox") + log.debug(f"Started discovery thread every {interval}") + for pod in pods.items: + log.debug(f"Found Outline server pod {pod.metadata.name}") + container_log = V1.read_namespaced_pod_log(name=pod.metadata.name, namespace=NAMESPACE, container='manager-config-json') + secret = json.loads(container_log.replace('\'', '\"')) + config = lib.get_config() + config_servers = find_server(secret, config["servers"]) + #log.info(f"config_servers {config_servers}") + if len(config_servers) > 0: + log.debug(f"Already exist") + pass + else: + with lib.lock: config["servers"][str(uuid.uuid4())] = { "cert": secret["certSha256"], "name": f"{pod.metadata.name}", @@ -51,7 +52,7 @@ def discovery_servers(): "url": secret["apiUrl"], } write_config(config) - log.info(f"Added discovered server") + log.info(f"Added discovered server") time.sleep(interval) @@ -100,7 +101,8 @@ V1 = None def reload_config(): global CONFIG while True: - CONFIG = yaml.safe_load(V1.read_namespaced_config_map(name="config-outfleet", namespace=NAMESPACE).data['config.yaml']) + with lib.lock: + CONFIG = yaml.safe_load(V1.read_namespaced_config_map(name="config-outfleet", namespace=NAMESPACE).data['config.yaml']) log.debug(f"Synced system config with ConfigMap [config-outfleet].") time.sleep(30) diff --git a/lib.py b/lib.py index 41265ca..1c40580 100755 --- a/lib.py +++ b/lib.py @@ -125,7 +125,7 @@ class Server: else: return True - def apply_config(self, config, CFG_PATH): + def apply_config(self, config): if config.get("name"): self.client.set_server_name(config.get("name")) self.log.info( diff --git a/main.py b/main.py index d7381ac..e94282a 100755 --- a/main.py +++ b/main.py @@ -24,6 +24,7 @@ logging.basicConfig( ) log = logging.getLogger("OutFleet") +log.setLevel(logging.INFO) file_handler = logging.FileHandler("sync.log") file_handler.setLevel(logging.DEBUG) formatter = logging.Formatter( @@ -54,22 +55,20 @@ def random_string(length=64): -def update_state(): +def update_state(timer=40): + while True: with lock: global SERVERS global CLIENTS global BROKEN_SERVERS global HOSTNAME - - SERVERS = list() - BROKEN_SERVERS = list() - CLIENTS = dict() config = get_config() if config: HOSTNAME = config.get("ui_hostname", "my-own-SSL-ENABLED-domain.com") servers = config.get("servers", dict()) + _SERVERS = list() for local_server_id, server_config in servers.items(): try: server = Server( @@ -78,8 +77,8 @@ def update_state(): comment=server_config.get("comment", ''), local_server_id=local_server_id, ) - SERVERS.append(server) - log.info( + _SERVERS.append(server) + log.debug( "Server state updated: %s, [%s]", server.info()["name"], local_server_id, @@ -91,8 +90,10 @@ def update_state(): "id": local_server_id }) log.warning("Can't access server: %s - %s", server_config["url"], e) - + SERVERS = _SERVERS CLIENTS = config.get("clients", dict()) + if timer == 0: + break time.sleep(40) @@ -118,8 +119,8 @@ def index(): server = next( (item for item in SERVERS if item.info()["local_server_id"] == server), None ) - server.apply_config(request.form, CFG_PATH) - update_state() + server.apply_config(request.form) + update_state(timer=0) return redirect( url_for( "index", @@ -173,7 +174,7 @@ def add_server(): config["servers"] = servers write_config(config) log.info("Added server: %s", new_server.data["name"]) - update_state() + update_state(timer=0) return redirect(url_for("index", nt="Added Outline VPN Server")) except Exception as e: return redirect( @@ -200,7 +201,7 @@ def del_server(): pass write_config(config) log.info("Deleting server %s [%s]", server_name, request.form.get("local_server_id")) - update_state() + update_state(timer=0) return redirect(url_for("index", nt=f"Server {server_name} has been deleted")) @@ -265,7 +266,7 @@ def add_client(): request.form.get("name"), server.data["name"], ) - update_state() + update_state(timer=0) return redirect( url_for( "clients", @@ -299,7 +300,7 @@ def del_client(): config["clients"].pop(user_id) write_config(config) log.info("Deleting client %s", request.form.get("name")) - update_state() + update_state(timer=0) return redirect(url_for("clients", nt="User has been deleted")) @@ -370,39 +371,42 @@ def sync(): ) file_handler.setFormatter(formatter) log.addHandler(file_handler) - if request.form.get("wipe") == 'all': - for server in SERVERS: - log.info("Wiping all keys on [%s]", server.data["name"]) - for client in server.data['keys']: - server.delete_key(client.key_id) + with lock: + if request.form.get("wipe") == 'all': + for server in SERVERS: + log.info("Wiping all keys on [%s]", server.data["name"]) + for client in server.data['keys']: + server.delete_key(client.key_id) server_hash = {} - for server in SERVERS: - server_hash[server.data["local_server_id"]] = server - for key, client in CLIENTS.items(): - for u_server_id in client["servers"]: - if u_server_id in server_hash: - if not server_hash[u_server_id].check_client(client["name"]): - log.warning( - f"Client {client['name']} absent on {server_hash[u_server_id].data['name']}" - ) - server_hash[u_server_id].create_key(client["name"]) + with lock: + for server in SERVERS: + server_hash[server.data["local_server_id"]] = server + with lock: + for key, client in CLIENTS.items(): + for u_server_id in client["servers"]: + if u_server_id in server_hash: + if not server_hash[u_server_id].check_client(client["name"]): + log.warning( + f"Client {client['name']} absent on {server_hash[u_server_id].data['name']}" + ) + server_hash[u_server_id].create_key(client["name"]) + else: + log.info( + f"Client {client['name']} already present on {server_hash[u_server_id].data['name']}" + ) else: log.info( - f"Client {client['name']} already present on {server_hash[u_server_id].data['name']}" + f"Client {client['name']} incorrect server_id {u_server_id}" ) - else: - log.info( - f"Client {client['name']} incorrect server_id {u_server_id}" - ) - update_state() + update_state(timer=0) return redirect(url_for("sync")) if __name__ == "__main__": update_state_thread = threading.Thread(target=update_state) update_state_thread.start() + discovery_servers_thread = threading.Thread(target=k8s.discovery_servers) discovery_servers_thread.start() - app.run(host="0.0.0.0")