"""Outline VPN server model and its Django admin integration."""

import json
import logging
# NOTE(review): this looks like an accidental IDE auto-import. It is kept so
# that anything importing ``logger`` from this module keeps working, but the
# code below deliberately uses ``OutlineServer.logger`` instead.
from venv import logger

import requests
from django.contrib import admin
from django.db import models
from django.db.models import Count
from django.utils.html import format_html
from django.utils.safestring import mark_safe
from outline_vpn.outline_vpn import OutlineVPN, OutlineLibraryException
from polymorphic.admin import PolymorphicChildModelAdmin
from urllib3 import PoolManager

from .generic import Server


class OutlineConnectionError(Exception):
    """Raised when the Outline management API cannot be used.

    Attributes:
        original_exception: The lower-level exception that triggered this
            error, or ``None`` when there is none.
    """

    def __init__(self, message, original_exception=None):
        super().__init__(message)
        self.original_exception = original_exception


class _FingerprintAdapter(requests.adapters.HTTPAdapter):
    """
    This adapter injected into the requests session will check that the
    fingerprint for the certificate matches for every request
    """

    def __init__(self, fingerprint=None, **kwargs):
        # Keep ``None`` as ``None``: ``str(None)`` would become the literal
        # fingerprint "None", which can never match a certificate.
        self.fingerprint = None if fingerprint is None else str(fingerprint)
        # The attribute must exist before the base initializer runs, because
        # HTTPAdapter.__init__ calls init_poolmanager().
        super().__init__(**kwargs)

    def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
        """Create the pool manager with certificate fingerprint pinning."""
        self.poolmanager = PoolManager(
            num_pools=connections,
            maxsize=maxsize,
            block=block,
            assert_fingerprint=self.fingerprint,
            **pool_kwargs,
        )


class OutlineServer(Server):
    """A server managed through the Outline management API."""

    logger = logging.getLogger(__name__)

    admin_url = models.URLField()
    admin_access_cert = models.CharField(max_length=255)
    client_server_name = models.CharField(max_length=255)
    client_hostname = models.CharField(max_length=255)
    client_port = models.CharField(max_length=5)

    class Meta:
        verbose_name = 'Outline'
        verbose_name_plural = 'Outline'

    def save(self, *args, **kwargs):
        """Force ``server_type`` to 'Outline' and save the model."""
        self.server_type = 'Outline'
        super().save(*args, **kwargs)

    @property
    def status(self):
        """Raw server information (see ``get_server_status(raw=True)``)."""
        return self.get_server_status(raw=True)

    @property
    def client(self):
        """Build a new ``OutlineVPN`` client from the stored URL and cert.

        A fresh client is created on every access, so methods that make
        several API calls bind it to a local variable first.
        """
        return OutlineVPN(api_url=self.admin_url, cert_sha256=self.admin_access_cert)

    def __str__(self):
        return f"{self.name} ({self.client_hostname}:{self.client_port})"

    def get_server_status(self, raw=False):
        """Fetch server information from the Outline API.

        Args:
            raw: When true, return the API response object itself; otherwise
                return a new dict populated from it.

        Returns:
            The server information, or ``{"error": <exception>}`` when the
            request fails for any reason.
        """
        status = {}
        try:
            info = self.client.get_server_information()
            if raw:
                status = info
            else:
                status.update(info)
        except Exception as e:
            # Best effort: callers (e.g. the admin list) check for 'error'.
            status.update({"error": e})
        return status

    def sync(self):
        """Push name, hostname and port to the server where they differ.

        Every setting is checked independently, so a single call brings all
        of them in line.

        Returns:
            A dict describing each change as ``"old -> new"``, or
            ``{"status": "Nothing to do"}`` when nothing differed.

        Raises:
            OutlineConnectionError: If an ``AttributeError`` is raised while
                talking to the API.
        """
        status = {}
        try:
            client = self.client
            state = client.get_server_information()

            if state["name"] != self.name:
                client.set_server_name(self.name)
                status["name"] = f"{state['name']} -> {self.name}"

            if state["hostnameForAccessKeys"] != self.client_hostname:
                client.set_hostname(self.client_hostname)
                status["hostnameForAccessKeys"] = (
                    f"{state['hostnameForAccessKeys']} -> {self.client_hostname}"
                )

            if int(state["portForNewAccessKeys"]) != int(self.client_port):
                client.set_port_new_for_access_keys(int(self.client_port))
                status["portForNewAccessKeys"] = (
                    f"{state['portForNewAccessKeys']} -> {self.client_port}"
                )

            if not status:
                status = {"status": "Nothing to do"}
            return status
        except AttributeError as e:
            raise OutlineConnectionError(
                "Client error. Can't connect.", original_exception=e
            ) from e

    def _get_key(self, user):
        """Return the access key whose id is ``user.hash``, or ``None``.

        Any failure (missing key, connection problem, ...) is logged as a
        warning and reported as ``None``.
        """
        try:
            return self.client.get_key(user.hash)
        except Exception as e:
            self.logger.warning("sync error: %s", e)
            return None

    def get_user(self, user, raw=False):
        """Return the user's access key.

        Args:
            user: Object exposing ``hash`` (used as the key id).
            raw: When true, return the key object (or ``None``) unchanged.

        Returns:
            With ``raw=False``, a dict of the key's public attributes (names
            not starting with ``_``); an empty dict when the key is missing.
        """
        user_info = self._get_key(user)
        if raw:
            return user_info
        if user_info is None:
            # No key on the server: nothing to expose.
            return {}

        masked_fields = ()  # fields to mask
        return {
            key: value
            for key, value in user_info.__dict__.items()
            if not key.startswith('_') and key not in masked_fields
        }

    def add_user(self, user):
        """Create (or re-create) the user's access key on the server.

        An existing key is deleted and re-created with the same method and
        port. A new key leaves method and port unset so the server defaults
        apply. In both cases the key id is ``user.hash``, so that later
        lookups by hash find the key.

        Returns:
            A dict with the created key's fields, or ``{"error": "..."}``
            when the returned key object cannot be read.
        """
        client = self.client
        server_user = self._get_key(user)
        self.logger.debug("[%s] existing key for %s: %s", self.name, user.name, server_user)

        create_kwargs = {
            "key_id": user.hash,
            "name": user.name,
            "password": user.hash,
            "data_limit": None,
        }
        if server_user:
            client.delete_key(user.hash)
            create_kwargs["method"] = server_user.method
            create_kwargs["port"] = server_user.port

        key = client.create_key(**create_kwargs)

        try:
            result = {
                'key_id': key.key_id,
                'name': key.name,
                'method': key.method,
                'password': key.password,
                'data_limit': key.data_limit,
                'port': key.port,
            }
        except Exception as e:
            result = {"error": str(e)}
        return result

    def delete_user(self, user):
        """Delete the user's access key if it exists.

        Returns:
            A dict with a human-readable ``status`` message.
        """
        server_user = self._get_key(user)
        if not server_user:
            return {"status": "User absent, nothing to do."}

        self.client.delete_key(server_user.key_id)
        self.logger.info(f"[{self.name}] User deleted: {user.name} on server {self.name}")
        return {"status": "User was deleted"}


class OutlineServerAdmin(PolymorphicChildModelAdmin):
    """Admin for ``OutlineServer`` as a polymorphic child model."""

    base_model = OutlineServer
    show_in_index = False  # Do not show in the main admin index
    list_display = (
        'name',
        'admin_url',
        'admin_access_cert',
        'client_server_name',
        'client_hostname',
        'client_port',
        'server_status_inline',
        'user_count',
        'registration_date',
    )
    readonly_fields = ('server_status_full', )
    exclude = ('server_type',)

    @admin.display(description='Clients', ordering='user_count')
    def user_count(self, obj):
        """Return the ``user_count`` annotation added in ``get_queryset``."""
        return obj.user_count

    def get_queryset(self, request):
        """Annotate each server with ``user_count`` (Count of 'acl__user')."""
        qs = super().get_queryset(request)
        return qs.annotate(user_count=Count('acl__user'))

    @staticmethod
    def _render_status(status):
        """Render a status dict as HTML, escaping all remote-supplied data."""
        if 'error' in status:
            return format_html("Error: {}", status['error'])
        # Pretty-print the JSON.
        # NOTE(review): the <pre> wrapper is assumed to match the original
        # markup, which appears to have been lost - confirm.
        pretty_status = json.dumps(status, indent=4)
        return format_html("<pre>{}</pre>", pretty_status)

    @admin.display(description="Status")
    def server_status_inline(self, obj):
        """Status column for the change list (queries the server per row)."""
        return self._render_status(obj.get_server_status())

    @admin.display(description="Server Status")
    def server_status_full(self, obj):
        """Read-only status field; "N/A" for objects not saved yet."""
        if obj and obj.pk:
            return self._render_status(obj.get_server_status())
        return "N/A"

    def get_model_perms(self, request):
        """It disables display for sub-model"""
        return {}


admin.site.register(OutlineServer, OutlineServerAdmin)