fix k8s things

This commit is contained in:
2024-03-19 01:11:08 +02:00
parent 77b78ec751
commit f82631b174
3 changed files with 99 additions and 46 deletions

64
k8s.py
View File

@@ -1,9 +1,12 @@
import base64 import base64
import json import json
import uuid
import yaml import yaml
import logging import logging
import threading import threading
import time import time
import lib
from kubernetes import client, config as kube_config from kubernetes import client, config as kube_config
from kubernetes.client.rest import ApiException from kubernetes.client.rest import ApiException
@@ -22,12 +25,54 @@ formatter = logging.Formatter(
file_handler.setFormatter(formatter) file_handler.setFormatter(formatter)
log.addHandler(file_handler) log.addHandler(file_handler)
def discovery_servers():
global CONFIG
interval = 10
log = logging.getLogger("OutFleet.discovery")
with lib.lock:
while True:
pods = V1.list_namespaced_pod(NAMESPACE, label_selector="app=shadowbox")
log.debug(f"Started discovery thread every {interval}")
for pod in pods.items:
log.debug(f"Found Outline server pod {pod.metadata.name}")
container_log = V1.read_namespaced_pod_log(name=pod.metadata.name, namespace=NAMESPACE, container='manager-config-json')
secret = json.loads(container_log.replace('\'', '\"'))
config = lib.get_config()
config_servers = find_server(secret, config["servers"])
#log.info(f"config_servers {config_servers}")
if len(config_servers) > 0:
log.debug(f"Already exist")
pass
else:
config["servers"][str(uuid.uuid4())] = {
"cert": secret["certSha256"],
"name": f"{pod.metadata.name}",
"comment": f"{pod.spec.node_name}",
"url": secret["apiUrl"],
}
write_config(config)
log.info(f"Added discovered server")
time.sleep(interval)
def find_server(search_data, servers):
found_servers = {}
for server_id, server_info in servers.items():
if server_info["url"] == search_data["apiUrl"] and server_info["cert"] == search_data["certSha256"]:
found_servers[server_id] = server_info
return found_servers
def write_config(config): def write_config(config):
config_map = client.V1ConfigMap( config_map = client.V1ConfigMap(
api_version="v1", api_version="v1",
kind="ConfigMap", kind="ConfigMap",
metadata=client.V1ObjectMeta( metadata=client.V1ObjectMeta(
name=f"config-outfleet", name=f"config-outfleet-bkp",
labels={ labels={
"app": "outfleet", "app": "outfleet",
} }
@@ -41,11 +86,11 @@ def write_config(config):
) )
except ApiException as e: except ApiException as e:
api_response = V1.patch_namespaced_config_map( api_response = V1.patch_namespaced_config_map(
name="config-outfleet", name="config-outfleet-bkp",
namespace=NAMESPACE, namespace=NAMESPACE,
body=config_map, body=config_map,
) )
log.info("Updated config in Kubernetes ConfigMap [config-outfleet]") log.info("Updated config in Kubernetes ConfigMap [config-outfleet-bkp]")
NAMESPACE = False NAMESPACE = False
SERVERS = list() SERVERS = list()
@@ -55,8 +100,8 @@ V1 = None
def reload_config(): def reload_config():
global CONFIG global CONFIG
while True: while True:
CONFIG = yaml.safe_load(V1.read_namespaced_config_map(name="config-outfleet", namespace=NAMESPACE).data['config.yaml']) CONFIG = yaml.safe_load(V1.read_namespaced_config_map(name="config-outfleet-bkp", namespace=NAMESPACE).data['config.yaml'])
log.debug(f"Synced system config with ConfigMap [config-outfleet].") log.debug(f"Synced system config with ConfigMap [config-outfleet-bkp].")
time.sleep(30) time.sleep(30)
@@ -68,17 +113,18 @@ try:
NAMESPACE = f.read().strip() NAMESPACE = f.read().strip()
log.info(f"Found Kubernetes environment. Deployed to namespace '{NAMESPACE}'") log.info(f"Found Kubernetes environment. Deployed to namespace '{NAMESPACE}'")
try: try:
CONFIG = yaml.safe_load(V1.read_namespaced_config_map(name="config-outfleet", namespace=NAMESPACE).data['config.yaml']) CONFIG = yaml.safe_load(V1.read_namespaced_config_map(name="config-outfleet-bkp", namespace=NAMESPACE).data['config.yaml'])
log.info(f"ConfigMap loaded from Kubernetes API. Servers: {len(CONFIG['servers'])}, Clients: {len(CONFIG['clients'])}. Started monitoring for changes every minute.") log.info(f"ConfigMap loaded from Kubernetes API. Servers: {len(CONFIG['servers'])}, Clients: {len(CONFIG['clients'])}. Started monitoring for changes every minute.")
except Exception as e: except Exception as e:
try: try:
write_config({"clients": [], "servers": {}, "ui_hostname": "accessible-address.com"}) write_config({"clients": [], "servers": {}, "ui_hostname": "accessible-address.com"})
CONFIG = yaml.safe_load(V1.read_namespaced_config_map(name="config-outfleet", namespace=NAMESPACE).data['config.yaml']) CONFIG = yaml.safe_load(V1.read_namespaced_config_map(name="config-outfleet-bkp", namespace=NAMESPACE).data['config.yaml'])
log.info("Created new ConfigMap [config-outfleet]. Started monitoring for changes every minute.") log.info("Created new ConfigMap [config-outfleet-bkp]. Started monitoring for changes every minute.")
except Exception as e: except Exception as e:
log.info(f"Failed to create new ConfigMap [config-outfleet] {e}") log.info(f"Failed to create new ConfigMap [config-outfleet-bkp] {e}")
thread = threading.Thread(target=reload_config) thread = threading.Thread(target=reload_config)
thread.start() thread.start()
except: except:
log.info("Kubernetes environment not detected") log.info("Kubernetes environment not detected")
except: except:

4
lib.py
View File

@@ -1,5 +1,6 @@
import argparse import argparse
import logging import logging
import threading
from typing import TypedDict, List from typing import TypedDict, List
from outline_vpn.outline_vpn import OutlineKey, OutlineVPN from outline_vpn.outline_vpn import OutlineKey, OutlineVPN
import yaml import yaml
@@ -21,6 +22,9 @@ parser.add_argument(
help="Config file location", help="Config file location",
) )
lock = threading.Lock()
args = parser.parse_args() args = parser.parse_args()
def get_config(): def get_config():
if k8s.CONFIG: if k8s.CONFIG:

15
main.py
View File

@@ -12,7 +12,7 @@ import uuid
import k8s import k8s
from flask import Flask, render_template, request, url_for, redirect from flask import Flask, render_template, request, url_for, redirect
from flask_cors import CORS from flask_cors import CORS
from lib import Server, write_config, get_config, args from lib import Server, write_config, get_config, args, lock
logging.getLogger("werkzeug").setLevel(logging.ERROR) logging.getLogger("werkzeug").setLevel(logging.ERROR)
@@ -32,7 +32,6 @@ formatter = logging.Formatter(
file_handler.setFormatter(formatter) file_handler.setFormatter(formatter)
log.addHandler(file_handler) log.addHandler(file_handler)
CFG_PATH = args.config CFG_PATH = args.config
NAMESPACE = k8s.NAMESPACE NAMESPACE = k8s.NAMESPACE
SERVERS = list() SERVERS = list()
@@ -57,6 +56,7 @@ def random_string(length=64):
def update_state(): def update_state():
while True: while True:
with lock:
global SERVERS global SERVERS
global CLIENTS global CLIENTS
global BROKEN_SERVERS global BROKEN_SERVERS
@@ -75,11 +75,11 @@ def update_state():
server = Server( server = Server(
url=server_config["url"], url=server_config["url"],
cert=server_config["cert"], cert=server_config["cert"],
comment=server_config["comment"], comment=server_config.get("comment", ''),
local_server_id=local_server_id, local_server_id=local_server_id,
) )
SERVERS.append(server) SERVERS.append(server)
log.debug( log.info(
"Server state updated: %s, [%s]", "Server state updated: %s, [%s]",
server.info()["name"], server.info()["name"],
local_server_id, local_server_id,
@@ -400,6 +400,9 @@ def sync():
if __name__ == "__main__": if __name__ == "__main__":
thread = threading.Thread(target=update_state) update_state_thread = threading.Thread(target=update_state)
thread.start() update_state_thread.start()
discovery_servers_thread = threading.Thread(target=k8s.discovery_servers)
discovery_servers_thread.start()
app.run(host="0.0.0.0") app.run(host="0.0.0.0")