Mirror of https://github.com/house-of-vanity/OutFleet.git (synced 2025-08-21 14:37:16 +00:00)

Commit: Fixed last release
@@ -14,10 +14,27 @@ app.conf.beat_schedule = {
        'task': 'sync_all_servers',
        'schedule': crontab(minute='*'),
    },
    'cleanup_old_task_logs': {
        'task': 'cleanup_task_logs',
        'schedule': crontab(hour=2, minute=0),  # Daily at 2 AM
    },
}

app.config_from_object('django.conf:settings', namespace='CELERY')

# Additional celery settings for better logging and performance
app.conf.update(
    # Keep detailed results for debugging
    result_expires=3600,  # 1 hour
    task_always_eager=False,
    task_eager_propagates=True,
    # Improve task tracking
    task_track_started=True,
    task_send_sent_event=True,
    # Clean up settings
    result_backend_cleanup_interval=300,  # Clean up every 5 minutes
)

app.autodiscover_tasks()
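Illustrative note (not part of the commit): the two crontab entries above mean "every minute" for sync_all_servers and "02:00 daily" for cleanup_task_logs. A minimal standalone sketch, assuming only the celery package is available, that builds the same schedule objects:

    # Sketch only; variable names are not from the repository.
    from celery.schedules import crontab

    every_minute = crontab(minute='*')        # cadence used by sync_all_servers
    daily_at_two = crontab(hour=2, minute=0)  # cadence used by cleanup_task_logs

    # Schedule objects expose methods such as is_due(), which celery beat
    # consults to decide when to enqueue the corresponding task.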
vpn/admin.py (175 changed lines)
@@ -12,7 +12,7 @@ from django.urls import path, reverse
from django.http import HttpResponseRedirect

from django.contrib.auth.admin import UserAdmin
from .models import User, AccessLog
from .models import User, AccessLog, TaskExecutionLog
from django.utils.timezone import localtime
from vpn.models import User, ACL, ACLLink
from vpn.forms import UserForm
@@ -24,6 +24,70 @@ from .server_plugins import (
    OutlineServer,
    OutlineServerAdmin)

@admin.register(TaskExecutionLog)
class TaskExecutionLogAdmin(admin.ModelAdmin):
    list_display = ('task_name_display', 'action', 'status_display', 'server', 'user', 'execution_time_display', 'created_at')
    list_filter = ('task_name', 'status', 'server', 'created_at')
    search_fields = ('task_id', 'task_name', 'action', 'user__username', 'server__name', 'message')
    readonly_fields = ('task_id', 'task_name', 'server', 'user', 'action', 'status', 'message_formatted', 'execution_time', 'created_at')
    ordering = ('-created_at',)
    list_per_page = 100
    date_hierarchy = 'created_at'

    fieldsets = (
        ('Task Information', {
            'fields': ('task_id', 'task_name', 'action', 'status')
        }),
        ('Related Objects', {
            'fields': ('server', 'user')
        }),
        ('Execution Details', {
            'fields': ('message_formatted', 'execution_time', 'created_at')
        }),
    )

    @admin.display(description='Task', ordering='task_name')
    def task_name_display(self, obj):
        task_names = {
            'sync_all_servers': '🔄 Sync All',
            'sync_all_users_on_server': '👥 Server Sync',
            'sync_server_info': '⚙️ Server Info',
            'sync_user_on_server': '👤 User Sync',
            'cleanup_task_logs': '🧹 Cleanup',
        }
        return task_names.get(obj.task_name, obj.task_name)

    @admin.display(description='Status', ordering='status')
    def status_display(self, obj):
        status_icons = {
            'STARTED': '🟡 Started',
            'SUCCESS': '✅ Success',
            'FAILURE': '❌ Failed',
            'RETRY': '🔄 Retry',
        }
        return status_icons.get(obj.status, obj.status)

    @admin.display(description='Time', ordering='execution_time')
    def execution_time_display(self, obj):
        if obj.execution_time:
            if obj.execution_time < 1:
                return f"{obj.execution_time*1000:.0f}ms"
            else:
                return f"{obj.execution_time:.2f}s"
        return '-'

    @admin.display(description='Message')
    def message_formatted(self, obj):
        if obj.message:
            return mark_safe(f"<pre style='white-space: pre-wrap; max-width: 800px;'>{obj.message}</pre>")
        return '-'

    def has_add_permission(self, request):
        return False

    def has_change_permission(self, request, obj=None):
        return False


admin.site.site_title = "VPN Manager"
admin.site.site_header = "VPN Manager"
@@ -425,7 +489,7 @@ class ACLAdmin(admin.ModelAdmin):
        return mark_safe('<br>'.join(formatted_links))

try:
    from django_celery_results.models import GroupResult
    from django_celery_results.models import GroupResult, TaskResult
    from django_celery_beat.models import (
        PeriodicTask,
        ClockedSchedule,
@@ -440,6 +504,113 @@ try:
    admin.site.unregister(CrontabSchedule)
    admin.site.unregister(IntervalSchedule)
    admin.site.unregister(SolarSchedule)
    admin.site.unregister(TaskResult)

except (ImportError, admin.sites.NotRegistered):
    pass

# Custom Celery admin interfaces
try:
    from django_celery_results.models import TaskResult
    from django_celery_beat.models import PeriodicTask

    @admin.register(TaskResult)
    class CustomTaskResultAdmin(admin.ModelAdmin):
        list_display = ('task_name_display', 'status', 'date_created', 'date_done', 'worker', 'result_display', 'traceback_display')
        list_filter = ('status', 'date_created', 'worker', 'task_name')
        search_fields = ('task_name', 'task_id', 'worker')
        readonly_fields = ('task_id', 'task_name', 'status', 'result_formatted', 'date_created', 'date_done', 'traceback', 'worker', 'task_args', 'task_kwargs', 'meta')
        ordering = ('-date_created',)
        list_per_page = 50

        fieldsets = (
            ('Task Information', {
                'fields': ('task_id', 'task_name', 'status', 'worker')
            }),
            ('Timing', {
                'fields': ('date_created', 'date_done')
            }),
            ('Result', {
                'fields': ('result_formatted',),
                'classes': ('collapse',)
            }),
            ('Arguments', {
                'fields': ('task_args', 'task_kwargs'),
                'classes': ('collapse',)
            }),
            ('Error Details', {
                'fields': ('traceback',),
                'classes': ('collapse',)
            }),
            ('Metadata', {
                'fields': ('meta',),
                'classes': ('collapse',)
            }),
        )

        @admin.display(description='Task Name', ordering='task_name')
        def task_name_display(self, obj):
            task_names = {
                'sync_all_servers': '🔄 Sync All Servers',
                'sync_all_users_on_server': '👥 Sync Users on Server',
                'sync_server_info': '⚙️ Sync Server Info',
                'sync_user_on_server': '👤 Sync User on Server',
                'cleanup_task_logs': '🧹 Cleanup Old Logs',
            }
            return task_names.get(obj.task_name, obj.task_name)

        @admin.display(description='Result')
        def result_display(self, obj):
            if obj.status == 'SUCCESS' and obj.result:
                try:
                    import json
                    result = json.loads(obj.result) if isinstance(obj.result, str) else obj.result
                    if isinstance(result, str):
                        return result[:100] + '...' if len(result) > 100 else result
                    elif isinstance(result, dict):
                        return ', '.join(f'{k}: {v}' for k, v in result.items())[:100]
                except:
                    return str(obj.result)[:100] if obj.result else '-'
            elif obj.status == 'FAILURE':
                return '❌ Failed'
            elif obj.status == 'PENDING':
                return '⏳ Pending'
            elif obj.status == 'RETRY':
                return '🔄 Retrying'
            return '-'

        @admin.display(description='Result Details')
        def result_formatted(self, obj):
            if obj.result:
                try:
                    import json
                    result = json.loads(obj.result) if isinstance(obj.result, str) else obj.result
                    formatted = json.dumps(result, indent=2)
                    return mark_safe(f"<pre>{formatted}</pre>")
                except:
                    return mark_safe(f"<pre>{obj.result}</pre>")
            return '-'

        @admin.display(description='Error Info')
        def traceback_display(self, obj):
            if obj.traceback:
                # Show first 200 chars of traceback
                short_tb = obj.traceback[:200] + '...' if len(obj.traceback) > 200 else obj.traceback
                return mark_safe(f"<pre style='color: red; font-size: 12px;'>{short_tb}</pre>")
            return '-'

        def has_add_permission(self, request):
            return False

        def has_change_permission(self, request, obj=None):
            return False

    @admin.register(PeriodicTask)
    class CustomPeriodicTaskAdmin(admin.ModelAdmin):
        list_display = ('name', 'task', 'enabled', 'last_run_at', 'total_run_count')
        list_filter = ('enabled', 'last_run_at')
        search_fields = ('name', 'task')
        readonly_fields = ('last_run_at', 'total_run_count')

except ImportError:
    pass
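Illustrative note (not part of the commit): the block above first unregisters the stock django_celery_results / django_celery_beat admins and then registers read-only replacements. A minimal sketch of that unregister/re-register pattern for a hypothetical model:

    # Sketch only; "myapp" and "Thing" are placeholder names.
    from django.contrib import admin
    from myapp.models import Thing

    try:
        admin.site.unregister(Thing)          # drop the default registration, if any
    except admin.sites.NotRegistered:
        pass

    @admin.register(Thing)
    class ReadOnlyThingAdmin(admin.ModelAdmin):
        # Same read-only approach as CustomTaskResultAdmin above.
        def has_add_permission(self, request):
            return False

        def has_change_permission(self, request, obj=None):
            return False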
@@ -11,6 +11,36 @@ from django.contrib.auth.models import AbstractUser

logger = logging.getLogger(__name__)

class TaskExecutionLog(models.Model):
    task_id = models.CharField(max_length=255, help_text="Celery task ID")
    task_name = models.CharField(max_length=100, help_text="Task name")
    server = models.ForeignKey('Server', on_delete=models.SET_NULL, null=True, blank=True)
    user = models.ForeignKey('User', on_delete=models.SET_NULL, null=True, blank=True)
    action = models.CharField(max_length=100, help_text="Action performed")
    status = models.CharField(max_length=20, choices=[
        ('STARTED', 'Started'),
        ('SUCCESS', 'Success'),
        ('FAILURE', 'Failure'),
        ('RETRY', 'Retry'),
    ], default='STARTED')
    message = models.TextField(help_text="Detailed execution message")
    execution_time = models.FloatField(null=True, blank=True, help_text="Execution time in seconds")
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ['-created_at']
        verbose_name = 'Task Execution Log'
        verbose_name_plural = 'Task Execution Logs'
        indexes = [
            models.Index(fields=['task_id']),
            models.Index(fields=['created_at']),
            models.Index(fields=['status']),
        ]

    def __str__(self):
        return f"{self.task_name} - {self.action} ({self.status})"


class AccessLog(models.Model):
    user = models.CharField(max_length=256, blank=True, null=True, editable=False)
    server = models.CharField(max_length=256, blank=True, null=True, editable=False)
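Illustrative note (not part of the commit): with the indexes on status and created_at, the new log table is cheap to filter from a shell or a dashboard. A small ORM sketch using only fields defined above:

    # Sketch only; run in a Django shell with the vpn app installed.
    from datetime import timedelta
    from django.utils import timezone
    from vpn.models import TaskExecutionLog

    # Failures from the last 24 hours (Meta.ordering already sorts by -created_at).
    recent_failures = TaskExecutionLog.objects.filter(
        status='FAILURE',
        created_at__gte=timezone.now() - timedelta(days=1),
    )

    # Ten slowest recorded executions.
    slowest = TaskExecutionLog.objects.exclude(execution_time=None).order_by('-execution_time')[:10]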
@@ -1,6 +1,5 @@
from polymorphic.models import PolymorphicModel
from django.db import models
from vpn.tasks import sync_server


class Server(PolymorphicModel):
@@ -18,8 +17,19 @@ class Server(PolymorphicModel):
        super().__init__(*args, **kwargs)

    def save(self, *args, **kwargs):
        sync_server.delay(self.id)
        # Only sync if the server actually exists and is valid
        is_new = self.pk is None
        super().save(*args, **kwargs)

        # Schedule sync task for existing servers only
        if not is_new:
            try:
                from vpn.tasks import sync_server
                sync_server.delay(self.id)
            except Exception as e:
                import logging
                logger = logging.getLogger(__name__)
                logger.error(f"Failed to schedule sync for server {self.name}: {e}")

    def get_server_status(self, *args, **kwargs):
        return {"name": self.name}
@@ -86,15 +86,34 @@ class OutlineServer(Server):
        from vpn.models import User, ACL
        logger = logging.getLogger(__name__)
        logger.debug(f"[{self.name}] Sync all users")
        keys = self.client.get_keys()

        try:
            keys = self.client.get_keys()
        except Exception as e:
            logger.error(f"[{self.name}] Failed to get keys from server: {e}")
            return False

        acls = ACL.objects.filter(server=self)
        acl_users = set(acl.user for acl in acls)

        # Log user synchronization details
        user_list = ", ".join([user.username for user in acl_users])
        logger.info(f"[{self.name}] Syncing {len(acl_users)} users: {user_list[:200]}{'...' if len(user_list) > 200 else ''}")

        for user in User.objects.all():
            if user in acl_users:
                self.add_user(user=user)
                try:
                    result = self.add_user(user=user)
                    logger.debug(f"[{self.name}] Added user {user.username}: {result}")
                except Exception as e:
                    logger.error(f"[{self.name}] Failed to add user {user.username}: {e}")
            else:
                self.delete_user(user=user)
                try:
                    result = self.delete_user(user=user)
                    if result and 'status' in result and 'deleted' in result['status']:
                        logger.debug(f"[{self.name}] Removed user {user.username}")
                except Exception as e:
                    logger.error(f"[{self.name}] Failed to remove user {user.username}: {e}")

        return True
vpn/tasks.py (223 changed lines)
@@ -1,10 +1,54 @@
import logging
import time
from datetime import datetime, timedelta
from celery import group, shared_task
from celery.exceptions import Retry

logger = logging.getLogger(__name__)

def create_task_log(task_id, task_name, action, status='STARTED', server=None, user=None, message='', execution_time=None):
    """Helper function to create task execution log"""
    try:
        from .models import TaskExecutionLog
        TaskExecutionLog.objects.create(
            task_id=task_id,
            task_name=task_name,
            server=server,
            user=user,
            action=action,
            status=status,
            message=message,
            execution_time=execution_time
        )
    except Exception as e:
        # Don't fail tasks if logging fails - just log to console
        logger.error(f"Failed to create task log (task_id: {task_id}, action: {action}): {e}")
        # If table doesn't exist, just continue without logging to DB
        if "does not exist" in str(e):
            logger.info(f"TaskExecutionLog table not found - run migrations. Task: {task_name}, Action: {action}, Status: {status}")

@shared_task(name="cleanup_task_logs")
def cleanup_task_logs():
    """Clean up old task execution logs (older than 30 days)"""
    from .models import TaskExecutionLog

    try:
        cutoff_date = datetime.now() - timedelta(days=30)
        old_logs = TaskExecutionLog.objects.filter(created_at__lt=cutoff_date)
        count = old_logs.count()

        if count > 0:
            old_logs.delete()
            logger.info(f"Cleaned up {count} old task execution logs")
            return f"Cleaned up {count} old task execution logs"
        else:
            logger.info("No old task execution logs to clean up")
            return "No old task execution logs to clean up"

    except Exception as e:
        logger.error(f"Error cleaning up task logs: {e}")
        return f"Error cleaning up task logs: {e}"

class TaskFailedException(Exception):
    def __init__(self, message=""):
        self.message = message
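Illustrative note (not part of the commit): because cleanup_task_logs is registered as a named shared task, it can also be run outside the beat schedule, which is handy when testing the new logging table. A quick sketch:

    # Sketch only; assumes Django settings are configured and a worker handles .delay().
    from vpn.tasks import cleanup_task_logs

    cleanup_task_logs.delay()     # enqueue for a Celery worker
    print(cleanup_task_logs())    # or call synchronously in a shell; returns a summary string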
@@ -14,77 +58,180 @@ class TaskFailedException(Exception):
@shared_task(name="sync_all_servers", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
def sync_all_users(self):
    from vpn.server_plugins import Server

    servers = Server.objects.all()

    if not servers.exists():
        logger.warning("No servers found for synchronization")
        return "No servers to sync"
    start_time = time.time()
    task_id = self.request.id

    tasks = group(sync_users.s(server.id) for server in servers)
    result = tasks.apply_async()
    create_task_log(task_id, "sync_all_servers", "Starting sync all servers", 'STARTED')

    return f"Initiated sync for {servers.count()} servers"
    try:
        servers = Server.objects.all()

        if not servers.exists():
            message = "No servers found for synchronization"
            logger.warning(message)
            create_task_log(task_id, "sync_all_servers", "No servers to sync", 'SUCCESS', message=message, execution_time=time.time() - start_time)
            return message

        # Filter out servers that might not exist anymore
        valid_servers = []
        for server in servers:
            try:
                # Test basic server access
                server.get_server_status()
                valid_servers.append(server)
            except Exception as e:
                logger.warning(f"Skipping server {server.name} (ID: {server.id}) due to connection issues: {e}")
                create_task_log(task_id, "sync_all_servers", f"Skipped server {server.name}", 'STARTED', server=server, message=f"Connection failed: {e}")

        # Log all servers that will be synced
        server_list = ", ".join([s.name for s in valid_servers])
        if valid_servers:
            create_task_log(task_id, "sync_all_servers", f"Found {len(valid_servers)} valid servers", 'STARTED', message=f"Servers: {server_list}")

            tasks = group(sync_users.s(server.id) for server in valid_servers)
            result = tasks.apply_async()

            success_message = f"Initiated sync for {len(valid_servers)} servers: {server_list}"
        else:
            success_message = "No valid servers found for synchronization"

        create_task_log(task_id, "sync_all_servers", "Sync initiated", 'SUCCESS', message=success_message, execution_time=time.time() - start_time)

        return success_message

    except Exception as e:
        error_message = f"Error initiating sync: {e}"
        logger.error(error_message)
        create_task_log(task_id, "sync_all_servers", "Sync failed", 'FAILURE', message=error_message, execution_time=time.time() - start_time)
        raise

@shared_task(name="sync_all_users_on_server", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
def sync_users(self, server_id):
    from vpn.server_plugins import Server

    start_time = time.time()
    task_id = self.request.id
    server = None

    try:
        server = Server.objects.get(id=server_id)
        try:
            server = Server.objects.get(id=server_id)
        except Server.DoesNotExist:
            error_message = f"Server with id {server_id} not found - may have been deleted"
            logger.error(error_message)
            create_task_log(task_id, "sync_all_users_on_server", "Server not found", 'FAILURE', message=error_message, execution_time=time.time() - start_time)
            return error_message  # Don't raise exception for deleted servers

        # Test server connectivity before proceeding
        try:
            server.get_server_status()
        except Exception as e:
            error_message = f"Server {server.name} is not accessible: {e}"
            logger.warning(error_message)
            create_task_log(task_id, "sync_all_users_on_server", "Server not accessible", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
            return error_message  # Don't retry for connectivity issues

        create_task_log(task_id, "sync_all_users_on_server", f"Starting user sync for server {server.name}", 'STARTED', server=server)
        logger.info(f"Starting user sync for server {server.name}")

        # Get all users for this server
        from .models import ACL
        acls = ACL.objects.filter(server=server).select_related('user')
        user_count = acls.count()
        user_list = ", ".join([acl.user.username for acl in acls[:10]])  # First 10 users
        if user_count > 10:
            user_list += f" and {user_count - 10} more"

        create_task_log(task_id, "sync_all_users_on_server", f"Found {user_count} users to sync", 'STARTED', server=server, message=f"Users: {user_list}")

        sync_result = server.sync_users()

        if sync_result:
            logger.info(f"Successfully synced users for server {server.name}")
            return f"Successfully synced users for server {server.name}"
            success_message = f"Successfully synced {user_count} users for server {server.name}"
            logger.info(success_message)
            create_task_log(task_id, "sync_all_users_on_server", "User sync completed", 'SUCCESS', server=server, message=success_message, execution_time=time.time() - start_time)
            return success_message
        else:
            raise TaskFailedException(f"Sync failed for server {server.name}")

    except Server.DoesNotExist:
        logger.error(f"Server with id {server_id} not found")
        raise TaskFailedException(f"Server with id {server_id} not found")
            error_message = f"Sync failed for server {server.name}"
            create_task_log(task_id, "sync_all_users_on_server", "User sync failed", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
            raise TaskFailedException(error_message)

    except TaskFailedException:
        # Don't retry TaskFailedException
        raise
    except Exception as e:
        logger.error(f"Error syncing users for server id {server_id}: {e}")
        error_message = f"Error syncing users for server {server.name if server else server_id}: {e}"
        logger.error(error_message)

        if self.request.retries < 3:
            logger.info(f"Retrying sync for server id {server_id} (attempt {self.request.retries + 1})")
            retry_message = f"Retrying sync for server {server.name if server else server_id} (attempt {self.request.retries + 1})"
            logger.info(retry_message)
            create_task_log(task_id, "sync_all_users_on_server", "Retrying user sync", 'RETRY', server=server, message=retry_message)
            raise self.retry(countdown=60)
        raise TaskFailedException(f"Error syncing users for server id {server_id}: {e}")

        create_task_log(task_id, "sync_all_users_on_server", "User sync failed after retries", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
        raise TaskFailedException(error_message)

@shared_task(name="sync_server_info", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 30})
def sync_server(self, id):
    from vpn.server_plugins import Server

    start_time = time.time()
    task_id = self.request.id
    server = None

    try:
        server = Server.objects.get(id=id)

        create_task_log(task_id, "sync_server_info", f"Starting server info sync for {server.name}", 'STARTED', server=server)
        logger.info(f"Starting server info sync for {server.name}")

        sync_result = server.sync()

        success_message = f"Successfully synced server info for {server.name}"
        result_details = f"Sync result: {sync_result}"
        logger.info(f"{success_message}. {result_details}")

        create_task_log(task_id, "sync_server_info", "Server info synced", 'SUCCESS', server=server, message=f"{success_message}. {result_details}", execution_time=time.time() - start_time)

        return {"status": sync_result, "server": server.name}

    except Server.DoesNotExist:
        logger.error(f"Server with id {id} not found")
        return {"error": f"Server with id {id} not found"}
        error_message = f"Server with id {id} not found"
        logger.error(error_message)
        create_task_log(task_id, "sync_server_info", "Server not found", 'FAILURE', message=error_message, execution_time=time.time() - start_time)
        return {"error": error_message}
    except Exception as e:
        logger.error(f"Error syncing server info for id {id}: {e}")
        error_message = f"Error syncing server info for {server.name if server else id}: {e}"
        logger.error(error_message)

        if self.request.retries < 3:
            logger.info(f"Retrying server sync for id {id} (attempt {self.request.retries + 1})")
            retry_message = f"Retrying server sync for {server.name if server else id} (attempt {self.request.retries + 1})"
            logger.info(retry_message)
            create_task_log(task_id, "sync_server_info", "Retrying server sync", 'RETRY', server=server, message=retry_message)
            raise self.retry(countdown=30)
        return {"error": f"Error syncing server info: {e}"}

        create_task_log(task_id, "sync_server_info", "Server sync failed after retries", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
        return {"error": error_message}

@shared_task(name="sync_user_on_server", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 5, 'countdown': 30})
def sync_user(self, user_id, server_id):
    from .models import User, ACL
    from vpn.server_plugins import Server

    start_time = time.time()
    task_id = self.request.id
    errors = {}
    result = {}
    user = None
    server = None

    try:
        user = User.objects.get(id=user_id)
        server = Server.objects.get(id=server_id)

        create_task_log(task_id, "sync_user_on_server", f"Starting user sync for {user.username} on {server.name}", 'STARTED', server=server, user=user)
        logger.info(f"Syncing user {user.username} on server {server.name}")

        # Check if ACL exists
@@ -92,30 +239,48 @@ def sync_user(self, user_id, server_id):
        if acl_exists:
            # User should exist on server
            action_message = f"Adding/updating user {user.username} on server {server.name}"
            create_task_log(task_id, "sync_user_on_server", action_message, 'STARTED', server=server, user=user)

            result[server.name] = server.add_user(user)
            logger.info(f"Added/updated user {user.username} on server {server.name}")

            success_message = f"Successfully added/updated user {user.username} on server {server.name}"
            logger.info(success_message)
            create_task_log(task_id, "sync_user_on_server", "User added/updated", 'SUCCESS', server=server, user=user, message=f"{success_message}. Result: {result[server.name]}", execution_time=time.time() - start_time)
        else:
            # User should be removed from server
            action_message = f"Removing user {user.username} from server {server.name}"
            create_task_log(task_id, "sync_user_on_server", action_message, 'STARTED', server=server, user=user)

            result[server.name] = server.delete_user(user)
            logger.info(f"Removed user {user.username} from server {server.name}")

            success_message = f"Successfully removed user {user.username} from server {server.name}"
            logger.info(success_message)
            create_task_log(task_id, "sync_user_on_server", "User removed", 'SUCCESS', server=server, user=user, message=f"{success_message}. Result: {result[server.name]}", execution_time=time.time() - start_time)

    except User.DoesNotExist:
        error_msg = f"User with id {user_id} not found"
        logger.error(error_msg)
        errors["user"] = error_msg
        create_task_log(task_id, "sync_user_on_server", "User not found", 'FAILURE', message=error_msg, execution_time=time.time() - start_time)
    except Server.DoesNotExist:
        error_msg = f"Server with id {server_id} not found"
        logger.error(error_msg)
        errors["server"] = error_msg
        create_task_log(task_id, "sync_user_on_server", "Server not found", 'FAILURE', message=error_msg, execution_time=time.time() - start_time)
    except Exception as e:
        error_msg = f"Error syncing user {user_id} on server {server_id}: {e}"
        error_msg = f"Error syncing user {user.username if user else user_id} on server {server.name if server else server_id}: {e}"
        logger.error(error_msg)
        errors[f"server_{server_id}"] = error_msg

        # Retry on failure unless it's a permanent error
        if self.request.retries < 5:
            logger.info(f"Retrying user sync for user {user_id} on server {server_id} (attempt {self.request.retries + 1})")
            retry_message = f"Retrying user sync for user {user.username if user else user_id} on server {server.name if server else server_id} (attempt {self.request.retries + 1})"
            logger.info(retry_message)
            create_task_log(task_id, "sync_user_on_server", "Retrying user sync", 'RETRY', server=server, user=user, message=retry_message)
            raise self.retry(countdown=30)

        create_task_log(task_id, "sync_user_on_server", "User sync failed after retries", 'FAILURE', server=server, user=user, message=error_msg, execution_time=time.time() - start_time)

    if errors:
        raise TaskFailedException(message=f"Errors during task: {errors}")
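Illustrative note (not part of the commit): the per-user task follows the same pattern as the others, log START, do the work, log SUCCESS or FAILURE, and retry on transient errors. A hedged dispatch sketch (the object lookups here are placeholders):

    # Sketch only; 'alice' is a hypothetical username.
    from vpn.models import User
    from vpn.server_plugins import Server
    from vpn.tasks import sync_user

    user = User.objects.get(username='alice')
    server = Server.objects.first()            # any configured server
    if server is not None:
        sync_user.delay(user.id, server.id)    # picked up by a Celery worker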