From 538bc2f65eaa67ed8e0fc0d1b0c5ed26bc02381a Mon Sep 17 00:00:00 2001 From: A B Date: Mon, 21 Oct 2024 13:22:03 +0000 Subject: [PATCH] Fixed tasks --- .gitignore | 3 +- mysite/celery.py | 16 +++--- mysite/settings.py | 1 + requirements.txt | 3 +- vpn/models.py | 8 +-- vpn/server_plugins/generic.py | 3 ++ vpn/server_plugins/outline.py | 93 +++++++++++++++++++++++++---------- vpn/tasks.py | 65 +++++++++++++++--------- vpn/views.py | 12 ++--- 9 files changed, 133 insertions(+), 71 deletions(-) diff --git a/.gitignore b/.gitignore index 48bffb6..9df6e2f 100755 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ debug.log *.pyc staticfiles/ *.__pycache__.* -vpn/migrations/ \ No newline at end of file +vpn/migrations/ +celerybeat-schedule diff --git a/mysite/celery.py b/mysite/celery.py index 2cd6ae9..a3368af 100644 --- a/mysite/celery.py +++ b/mysite/celery.py @@ -3,19 +3,21 @@ import os from celery import Celery from celery import shared_task +from celery.schedules import crontab - -# Set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings') logger = logging.getLogger(__name__) app = Celery('mysite') -# Using a string here means the worker doesn't have to serialize -# the configuration object to child processes. -# - namespace='CELERY' means all celery-related configuration keys -# should have a `CELERY_` prefix. +app.conf.beat_schedule = { + 'periodical_servers_sync': { + 'task': 'sync_all_servers', + 'schedule': crontab(minute='*'), + }, +} + + app.config_from_object('django.conf:settings', namespace='CELERY') -# Load task modules from all registered Django apps. app.autodiscover_tasks() diff --git a/mysite/settings.py b/mysite/settings.py index eaad95c..99f1b30 100644 --- a/mysite/settings.py +++ b/mysite/settings.py @@ -103,6 +103,7 @@ INSTALLED_APPS = [ 'polymorphic', 'corsheaders', 'django_celery_results', + 'django_celery_beat', 'vpn', ] diff --git a/requirements.txt b/requirements.txt index d4940a9..a8b6c11 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,5 @@ whitenoise==6.7.0 psycopg2-binary==2.9.10 setuptools==75.2.0 shortuuid==1.0.13 -django-celery-results==2.5.1 \ No newline at end of file +django-celery-results==2.5.1 +django-celery-beat==1.0.0 \ No newline at end of file diff --git a/vpn/models.py b/vpn/models.py index 6b2cbe2..29509fc 100644 --- a/vpn/models.py +++ b/vpn/models.py @@ -20,7 +20,6 @@ class User(models.Model): def save(self, *args, **kwargs): if not self.hash: self.hash = shortuuid.ShortUUID().random(length=16) - sync_user.delay_on_commit(self.id) super().save(*args, **kwargs) def __str__(self): @@ -49,11 +48,8 @@ class ACL(models.Model): @receiver(post_save, sender=ACL) def acl_created_or_updated(sender, instance, created, **kwargs): - if created: - sync_user.delay(instance.user.id) - else: - pass + sync_user.delay_on_commit(instance.user.id, instance.server.id) @receiver(pre_delete, sender=ACL) def acl_deleted(sender, instance, **kwargs): - sync_user.delay(instance.user.id) \ No newline at end of file + sync_user.delay_on_commit(instance.user.id, instance.server.id) \ No newline at end of file diff --git a/vpn/server_plugins/generic.py b/vpn/server_plugins/generic.py index 4b10611..e648628 100644 --- a/vpn/server_plugins/generic.py +++ b/vpn/server_plugins/generic.py @@ -27,6 +27,9 @@ class Server(PolymorphicModel): def sync(self, *args, **kwargs): pass + def sync_users(self, *args, **kwargs): + pass + def add_user(self, *args, **kwargs): pass diff --git a/vpn/server_plugins/outline.py b/vpn/server_plugins/outline.py index 52abed4..b57f8de 100644 --- a/vpn/server_plugins/outline.py +++ b/vpn/server_plugins/outline.py @@ -4,7 +4,7 @@ import requests from django.db import models from .generic import Server from urllib3 import PoolManager -from outline_vpn.outline_vpn import OutlineVPN, OutlineLibraryException +from outline_vpn.outline_vpn import OutlineVPN, OutlineServerErrorException from polymorphic.admin import PolymorphicChildModelAdmin from django.contrib import admin from django.utils.safestring import mark_safe @@ -78,6 +78,22 @@ class OutlineServer(Server): status.update({f"error": e}) return status + def sync_users(self): + from vpn.models import User, ACL + logger.debug(f"[{self.name}] Sync all users") + keys = self.client.get_keys() + acls = ACL.objects.filter(server=self) + acl_users = set(acl.user for acl in acls) + + for user in User.objects.all(): + if user in acl_users: + self.add_user(user=user) + else: + self.delete_user(user=user) + + return True + + def sync(self): status = {} try: @@ -98,11 +114,7 @@ class OutlineServer(Server): raise OutlineConnectionError("Client error. Can't connect.", original_exception=e) def _get_key(self, user): - try: - return self.client.get_key(user.hash) - except Exception as e: - logger.warning(f"sync error: {e}") - return None + return self.client.get_key(user.hash) def get_user(self, user, raw=False): user_info = self._get_key(user) @@ -120,29 +132,55 @@ class OutlineServer(Server): def add_user(self, user): - server_user = self._get_key(user) - logger.warning(server_user) + try: + server_user = self._get_key(user) + except OutlineServerErrorException as e: + server_user = None + logger.debug(f"[{self.name}] User {str(server_user)}") + result = {} key = None if server_user: - self.client.delete_key(user.hash) - key = self.client.create_key( - name=user.name, - method=server_user.method, - password=user.hash, - data_limit=None, - port=server_user.port - ) + if server_user.method != "chacha20-ietf-poly1305" or \ + server_user.port != int(self.client_port) or \ + server_user.name != user.name or \ + server_user.password != user.hash or \ + self.client.delete_key(user.hash): + + self.delete_user(user) + key = self.client.create_key( + key_id=user.hash, + name=user.name, + method=server_user.method, + password=user.hash, + data_limit=None, + port=server_user.port + ) + logger.debug(f"[{self.name}] User {user.name} updated") else: - key = self.client.create_key( - key_id=user.hash, - name=user.name, - method=server_user.method, - password=user.hash, - data_limit=None, - port=server_user.port - ) + try: + key = self.client.create_key( + key_id=user.hash, + name=user.name, + method="chacha20-ietf-poly1305", + password=user.hash, + data_limit=None, + port=int(self.client_port) + ) + logger.info(f"[{self.name}] User {user.name} created") + except OutlineServerErrorException as e: + error_message = str(e) + if "code\":\"Conflict" in error_message: + logger.warning(f"[{self.name}] Conflict for User {user.name}, trying to force sync. {error_message}") + for key in self.client.get_keys(): + logger.warning(f"[{self.name}] hash: {user.hash}, password: {key.password}") + if key.password == user.hash: + self.client.delete_key(key.key_id) + logger.warning(f"[{self.name}] Removed orphan key{str(key)}") + return self.add_user(user) + else: + raise OutlineConnectionError("API Error", original_exception=e) try: result['key_id'] = key.key_id result['name'] = key.name @@ -155,16 +193,17 @@ class OutlineServer(Server): return result def delete_user(self, user): - server_user = self._get_key(user) result = None + try: + server_user = self._get_key(user) + except OutlineServerErrorException as e: + return {"status": "User not found on server. Nothing to do."} if server_user: self.logger.info(f"[{self.name}] TEST") self.client.delete_key(server_user.key_id) result = {"status": "User was deleted"} self.logger.info(f"[{self.name}] User deleted: {user.name} on server {self.name}") - else: - result = {"status": "User absent, nothing to do."} return result diff --git a/vpn/tasks.py b/vpn/tasks.py index 77fe0a6..efb37f4 100644 --- a/vpn/tasks.py +++ b/vpn/tasks.py @@ -1,6 +1,6 @@ import logging -from celery import shared_task +from celery import group, shared_task #from django_celery_results.models import TaskResult from outline_vpn.outline_vpn import OutlineServerErrorException @@ -13,7 +13,32 @@ class TaskFailedException(Exception): super().__init__(f"{self.message}") -@shared_task(name="sync.server") +@shared_task(name="sync_all_servers") +def sync_all_users(): + from .models import User, ACL + from vpn.server_plugins import Server + + servers = Server.objects.all() + + tasks = group(sync_users.s(server.id) for server in servers) + + result = tasks.apply_async() + + return result + +@shared_task(name="sync_all_users_on_server") +def sync_users(server_id): + from .models import Server + + try: + server = Server.objects.get(id=server_id) + server.sync_users() + logger.info(f"Successfully synced users for server {server.name}") + except Exception as e: + logger.error(f"Error syncing users for server {server.name}: {e}") + raise TaskFailedException(message=f"Error syncing users for server {server.name}") + +@shared_task(name="sync_server_info") def sync_server(id): from vpn.server_plugins import Server # task_result = TaskResult.objects.get_task(self.request.id) @@ -21,30 +46,26 @@ def sync_server(id): # task_result.save() return {"status": Server.objects.get(id=id).sync()} -@shared_task(name="sync.user") -def sync_user(id): +@shared_task(name="sync_user_on_server") +def sync_user(user_id, server_id): from .models import User, ACL from vpn.server_plugins import Server errors = {} result = {} - user = User.objects.get(id=id) + user = User.objects.get(id=user_id) acls = ACL.objects.filter(user=user) - servers = Server.objects.all() - - for server in servers: - try: - if acls.filter(server=server).exists(): - result[server.name] = server.add_user(user) - else: - result[server.name] = server.delete_user(user) - except Exception as e: - errors[server.name] = {"error": e} - finally: - if errors: - logger.error("ERROR ERROR") - raise TaskFailedException(message=f"Errors during taks: {errors}") - else: - logger.error(f"PUK PUEK. {errors}") - return result \ No newline at end of file + server = Server.objects.get(id=server_id) + + try: + if acls.filter(server=server).exists(): + result[server.name] = server.add_user(user) + else: + result[server.name] = server.delete_user(user) + except Exception as e: + errors[server.name] = {"error": e} + finally: + if errors: + raise TaskFailedException(message=f"Errors during taks: {errors}") + return result \ No newline at end of file diff --git a/vpn/views.py b/vpn/views.py index 7d55e17..a92c42c 100644 --- a/vpn/views.py +++ b/vpn/views.py @@ -1,17 +1,15 @@ -from django.shortcuts import render - - -# views.py - from django.shortcuts import get_object_or_404 from django.http import JsonResponse -from django.utils import timezone def shadowsocks(request, link): from .models import ACL acl = get_object_or_404(ACL, link=link) - server_user = acl.server.get_user(acl.user, raw=True) + try: + server_user = acl.server.get_user(acl.user, raw=True) + except: + return JsonResponse({"error": "Couldn't get credentials from server."}) + config = { "info": "Managed by OutFleet_v2 [github.com/house-of-vanity/OutFleet/]", "password": server_user.password,