From dc6d170f08348433b787d6d226d5880f58ebdd85 Mon Sep 17 00:00:00 2001 From: Ultradesu Date: Sun, 20 Jul 2025 22:50:22 +0300 Subject: [PATCH] Fixed last release --- mysite/celery.py | 17 +++ vpn/admin.py | 175 +++++++++++++++++++++++++- vpn/models.py | 30 +++++ vpn/server_plugins/generic.py | 14 ++- vpn/server_plugins/outline.py | 25 +++- vpn/tasks.py | 223 +++++++++++++++++++++++++++++----- 6 files changed, 448 insertions(+), 36 deletions(-) diff --git a/mysite/celery.py b/mysite/celery.py index a3368af..958f38d 100644 --- a/mysite/celery.py +++ b/mysite/celery.py @@ -14,10 +14,27 @@ app.conf.beat_schedule = { 'task': 'sync_all_servers', 'schedule': crontab(minute='*'), }, + 'cleanup_old_task_logs': { + 'task': 'cleanup_task_logs', + 'schedule': crontab(hour=2, minute=0), # Daily at 2 AM + }, } app.config_from_object('django.conf:settings', namespace='CELERY') +# Additional celery settings for better logging and performance +app.conf.update( + # Keep detailed results for debugging + result_expires=3600, # 1 hour + task_always_eager=False, + task_eager_propagates=True, + # Improve task tracking + task_track_started=True, + task_send_sent_event=True, + # Clean up settings + result_backend_cleanup_interval=300, # Clean up every 5 minutes +) + app.autodiscover_tasks() diff --git a/vpn/admin.py b/vpn/admin.py index 2091b92..a7d8182 100644 --- a/vpn/admin.py +++ b/vpn/admin.py @@ -12,7 +12,7 @@ from django.urls import path, reverse from django.http import HttpResponseRedirect from django.contrib.auth.admin import UserAdmin -from .models import User, AccessLog +from .models import User, AccessLog, TaskExecutionLog from django.utils.timezone import localtime from vpn.models import User, ACL, ACLLink from vpn.forms import UserForm @@ -24,6 +24,70 @@ from .server_plugins import ( OutlineServer, OutlineServerAdmin) +@admin.register(TaskExecutionLog) +class TaskExecutionLogAdmin(admin.ModelAdmin): + list_display = ('task_name_display', 'action', 'status_display', 'server', 'user', 'execution_time_display', 'created_at') + list_filter = ('task_name', 'status', 'server', 'created_at') + search_fields = ('task_id', 'task_name', 'action', 'user__username', 'server__name', 'message') + readonly_fields = ('task_id', 'task_name', 'server', 'user', 'action', 'status', 'message_formatted', 'execution_time', 'created_at') + ordering = ('-created_at',) + list_per_page = 100 + date_hierarchy = 'created_at' + + fieldsets = ( + ('Task Information', { + 'fields': ('task_id', 'task_name', 'action', 'status') + }), + ('Related Objects', { + 'fields': ('server', 'user') + }), + ('Execution Details', { + 'fields': ('message_formatted', 'execution_time', 'created_at') + }), + ) + + @admin.display(description='Task', ordering='task_name') + def task_name_display(self, obj): + task_names = { + 'sync_all_servers': '๐Ÿ”„ Sync All', + 'sync_all_users_on_server': '๐Ÿ‘ฅ Server Sync', + 'sync_server_info': 'โš™๏ธ Server Info', + 'sync_user_on_server': '๐Ÿ‘ค User Sync', + 'cleanup_task_logs': '๐Ÿงน Cleanup', + } + return task_names.get(obj.task_name, obj.task_name) + + @admin.display(description='Status', ordering='status') + def status_display(self, obj): + status_icons = { + 'STARTED': '๐ŸŸก Started', + 'SUCCESS': 'โœ… Success', + 'FAILURE': 'โŒ Failed', + 'RETRY': '๐Ÿ”„ Retry', + } + return status_icons.get(obj.status, obj.status) + + @admin.display(description='Time', ordering='execution_time') + def execution_time_display(self, obj): + if obj.execution_time: + if obj.execution_time < 1: + return f"{obj.execution_time*1000:.0f}ms" + else: + return f"{obj.execution_time:.2f}s" + return '-' + + @admin.display(description='Message') + def message_formatted(self, obj): + if obj.message: + return mark_safe(f"
{obj.message}
") + return '-' + + def has_add_permission(self, request): + return False + + def has_change_permission(self, request, obj=None): + return False + admin.site.site_title = "VPN Manager" admin.site.site_header = "VPN Manager" @@ -425,7 +489,7 @@ class ACLAdmin(admin.ModelAdmin): return mark_safe('
'.join(formatted_links)) try: - from django_celery_results.models import GroupResult + from django_celery_results.models import GroupResult, TaskResult from django_celery_beat.models import ( PeriodicTask, ClockedSchedule, @@ -440,6 +504,113 @@ try: admin.site.unregister(CrontabSchedule) admin.site.unregister(IntervalSchedule) admin.site.unregister(SolarSchedule) + admin.site.unregister(TaskResult) except (ImportError, admin.sites.NotRegistered): pass + +# Custom Celery admin interfaces +try: + from django_celery_results.models import TaskResult + from django_celery_beat.models import PeriodicTask + + @admin.register(TaskResult) + class CustomTaskResultAdmin(admin.ModelAdmin): + list_display = ('task_name_display', 'status', 'date_created', 'date_done', 'worker', 'result_display', 'traceback_display') + list_filter = ('status', 'date_created', 'worker', 'task_name') + search_fields = ('task_name', 'task_id', 'worker') + readonly_fields = ('task_id', 'task_name', 'status', 'result_formatted', 'date_created', 'date_done', 'traceback', 'worker', 'task_args', 'task_kwargs', 'meta') + ordering = ('-date_created',) + list_per_page = 50 + + fieldsets = ( + ('Task Information', { + 'fields': ('task_id', 'task_name', 'status', 'worker') + }), + ('Timing', { + 'fields': ('date_created', 'date_done') + }), + ('Result', { + 'fields': ('result_formatted',), + 'classes': ('collapse',) + }), + ('Arguments', { + 'fields': ('task_args', 'task_kwargs'), + 'classes': ('collapse',) + }), + ('Error Details', { + 'fields': ('traceback',), + 'classes': ('collapse',) + }), + ('Metadata', { + 'fields': ('meta',), + 'classes': ('collapse',) + }), + ) + + @admin.display(description='Task Name', ordering='task_name') + def task_name_display(self, obj): + task_names = { + 'sync_all_servers': '๐Ÿ”„ Sync All Servers', + 'sync_all_users_on_server': '๐Ÿ‘ฅ Sync Users on Server', + 'sync_server_info': 'โš™๏ธ Sync Server Info', + 'sync_user_on_server': '๐Ÿ‘ค Sync User on Server', + 'cleanup_task_logs': '๐Ÿงน Cleanup Old Logs', + } + return task_names.get(obj.task_name, obj.task_name) + + @admin.display(description='Result') + def result_display(self, obj): + if obj.status == 'SUCCESS' and obj.result: + try: + import json + result = json.loads(obj.result) if isinstance(obj.result, str) else obj.result + if isinstance(result, str): + return result[:100] + '...' if len(result) > 100 else result + elif isinstance(result, dict): + return ', '.join(f'{k}: {v}' for k, v in result.items())[:100] + except: + return str(obj.result)[:100] if obj.result else '-' + elif obj.status == 'FAILURE': + return 'โŒ Failed' + elif obj.status == 'PENDING': + return 'โณ Pending' + elif obj.status == 'RETRY': + return '๐Ÿ”„ Retrying' + return '-' + + @admin.display(description='Result Details') + def result_formatted(self, obj): + if obj.result: + try: + import json + result = json.loads(obj.result) if isinstance(obj.result, str) else obj.result + formatted = json.dumps(result, indent=2) + return mark_safe(f"
{formatted}
") + except: + return mark_safe(f"
{obj.result}
") + return '-' + + @admin.display(description='Error Info') + def traceback_display(self, obj): + if obj.traceback: + # Show first 200 chars of traceback + short_tb = obj.traceback[:200] + '...' if len(obj.traceback) > 200 else obj.traceback + return mark_safe(f"
{short_tb}
") + return '-' + + def has_add_permission(self, request): + return False + + def has_change_permission(self, request, obj=None): + return False + + @admin.register(PeriodicTask) + class CustomPeriodicTaskAdmin(admin.ModelAdmin): + list_display = ('name', 'task', 'enabled', 'last_run_at', 'total_run_count') + list_filter = ('enabled', 'last_run_at') + search_fields = ('name', 'task') + readonly_fields = ('last_run_at', 'total_run_count') + +except ImportError: + pass diff --git a/vpn/models.py b/vpn/models.py index ae6584e..80dd8a5 100644 --- a/vpn/models.py +++ b/vpn/models.py @@ -11,6 +11,36 @@ from django.contrib.auth.models import AbstractUser logger = logging.getLogger(__name__) +class TaskExecutionLog(models.Model): + task_id = models.CharField(max_length=255, help_text="Celery task ID") + task_name = models.CharField(max_length=100, help_text="Task name") + server = models.ForeignKey('Server', on_delete=models.SET_NULL, null=True, blank=True) + user = models.ForeignKey('User', on_delete=models.SET_NULL, null=True, blank=True) + action = models.CharField(max_length=100, help_text="Action performed") + status = models.CharField(max_length=20, choices=[ + ('STARTED', 'Started'), + ('SUCCESS', 'Success'), + ('FAILURE', 'Failure'), + ('RETRY', 'Retry'), + ], default='STARTED') + message = models.TextField(help_text="Detailed execution message") + execution_time = models.FloatField(null=True, blank=True, help_text="Execution time in seconds") + created_at = models.DateTimeField(auto_now_add=True) + + class Meta: + ordering = ['-created_at'] + verbose_name = 'Task Execution Log' + verbose_name_plural = 'Task Execution Logs' + indexes = [ + models.Index(fields=['task_id']), + models.Index(fields=['created_at']), + models.Index(fields=['status']), + ] + + def __str__(self): + return f"{self.task_name} - {self.action} ({self.status})" + + class AccessLog(models.Model): user = models.CharField(max_length=256, blank=True, null=True, editable=False) server = models.CharField(max_length=256, blank=True, null=True, editable=False) diff --git a/vpn/server_plugins/generic.py b/vpn/server_plugins/generic.py index 57fbd11..67cc57f 100644 --- a/vpn/server_plugins/generic.py +++ b/vpn/server_plugins/generic.py @@ -1,6 +1,5 @@ from polymorphic.models import PolymorphicModel from django.db import models -from vpn.tasks import sync_server class Server(PolymorphicModel): @@ -18,8 +17,19 @@ class Server(PolymorphicModel): super().__init__(*args, **kwargs) def save(self, *args, **kwargs): - sync_server.delay(self.id) + # Only sync if the server actually exists and is valid + is_new = self.pk is None super().save(*args, **kwargs) + + # Schedule sync task for existing servers only + if not is_new: + try: + from vpn.tasks import sync_server + sync_server.delay(self.id) + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.error(f"Failed to schedule sync for server {self.name}: {e}") def get_server_status(self, *args, **kwargs): return {"name": self.name} diff --git a/vpn/server_plugins/outline.py b/vpn/server_plugins/outline.py index 8ed2c0f..f2af1a1 100644 --- a/vpn/server_plugins/outline.py +++ b/vpn/server_plugins/outline.py @@ -86,15 +86,34 @@ class OutlineServer(Server): from vpn.models import User, ACL logger = logging.getLogger(__name__) logger.debug(f"[{self.name}] Sync all users") - keys = self.client.get_keys() + + try: + keys = self.client.get_keys() + except Exception as e: + logger.error(f"[{self.name}] Failed to get keys from server: {e}") + return False + acls = ACL.objects.filter(server=self) acl_users = set(acl.user for acl in acls) + # Log user synchronization details + user_list = ", ".join([user.username for user in acl_users]) + logger.info(f"[{self.name}] Syncing {len(acl_users)} users: {user_list[:200]}{'...' if len(user_list) > 200 else ''}") + for user in User.objects.all(): if user in acl_users: - self.add_user(user=user) + try: + result = self.add_user(user=user) + logger.debug(f"[{self.name}] Added user {user.username}: {result}") + except Exception as e: + logger.error(f"[{self.name}] Failed to add user {user.username}: {e}") else: - self.delete_user(user=user) + try: + result = self.delete_user(user=user) + if result and 'status' in result and 'deleted' in result['status']: + logger.debug(f"[{self.name}] Removed user {user.username}") + except Exception as e: + logger.error(f"[{self.name}] Failed to remove user {user.username}: {e}") return True diff --git a/vpn/tasks.py b/vpn/tasks.py index 58d59b8..7419c5a 100644 --- a/vpn/tasks.py +++ b/vpn/tasks.py @@ -1,10 +1,54 @@ - import logging +import time +from datetime import datetime, timedelta from celery import group, shared_task from celery.exceptions import Retry logger = logging.getLogger(__name__) +def create_task_log(task_id, task_name, action, status='STARTED', server=None, user=None, message='', execution_time=None): + """Helper function to create task execution log""" + try: + from .models import TaskExecutionLog + TaskExecutionLog.objects.create( + task_id=task_id, + task_name=task_name, + server=server, + user=user, + action=action, + status=status, + message=message, + execution_time=execution_time + ) + except Exception as e: + # Don't fail tasks if logging fails - just log to console + logger.error(f"Failed to create task log (task_id: {task_id}, action: {action}): {e}") + # If table doesn't exist, just continue without logging to DB + if "does not exist" in str(e): + logger.info(f"TaskExecutionLog table not found - run migrations. Task: {task_name}, Action: {action}, Status: {status}") + +@shared_task(name="cleanup_task_logs") +def cleanup_task_logs(): + """Clean up old task execution logs (older than 30 days)""" + from .models import TaskExecutionLog + + try: + cutoff_date = datetime.now() - timedelta(days=30) + old_logs = TaskExecutionLog.objects.filter(created_at__lt=cutoff_date) + count = old_logs.count() + + if count > 0: + old_logs.delete() + logger.info(f"Cleaned up {count} old task execution logs") + return f"Cleaned up {count} old task execution logs" + else: + logger.info("No old task execution logs to clean up") + return "No old task execution logs to clean up" + + except Exception as e: + logger.error(f"Error cleaning up task logs: {e}") + return f"Error cleaning up task logs: {e}" + class TaskFailedException(Exception): def __init__(self, message=""): self.message = message @@ -14,77 +58,180 @@ class TaskFailedException(Exception): @shared_task(name="sync_all_servers", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60}) def sync_all_users(self): from vpn.server_plugins import Server - - servers = Server.objects.all() - if not servers.exists(): - logger.warning("No servers found for synchronization") - return "No servers to sync" + start_time = time.time() + task_id = self.request.id - tasks = group(sync_users.s(server.id) for server in servers) - result = tasks.apply_async() + create_task_log(task_id, "sync_all_servers", "Starting sync all servers", 'STARTED') - return f"Initiated sync for {servers.count()} servers" + try: + servers = Server.objects.all() + + if not servers.exists(): + message = "No servers found for synchronization" + logger.warning(message) + create_task_log(task_id, "sync_all_servers", "No servers to sync", 'SUCCESS', message=message, execution_time=time.time() - start_time) + return message + + # Filter out servers that might not exist anymore + valid_servers = [] + for server in servers: + try: + # Test basic server access + server.get_server_status() + valid_servers.append(server) + except Exception as e: + logger.warning(f"Skipping server {server.name} (ID: {server.id}) due to connection issues: {e}") + create_task_log(task_id, "sync_all_servers", f"Skipped server {server.name}", 'STARTED', server=server, message=f"Connection failed: {e}") + + # Log all servers that will be synced + server_list = ", ".join([s.name for s in valid_servers]) + if valid_servers: + create_task_log(task_id, "sync_all_servers", f"Found {len(valid_servers)} valid servers", 'STARTED', message=f"Servers: {server_list}") + + tasks = group(sync_users.s(server.id) for server in valid_servers) + result = tasks.apply_async() + + success_message = f"Initiated sync for {len(valid_servers)} servers: {server_list}" + else: + success_message = "No valid servers found for synchronization" + + create_task_log(task_id, "sync_all_servers", "Sync initiated", 'SUCCESS', message=success_message, execution_time=time.time() - start_time) + + return success_message + + except Exception as e: + error_message = f"Error initiating sync: {e}" + logger.error(error_message) + create_task_log(task_id, "sync_all_servers", "Sync failed", 'FAILURE', message=error_message, execution_time=time.time() - start_time) + raise @shared_task(name="sync_all_users_on_server", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60}) def sync_users(self, server_id): from vpn.server_plugins import Server + start_time = time.time() + task_id = self.request.id + server = None + try: - server = Server.objects.get(id=server_id) + try: + server = Server.objects.get(id=server_id) + except Server.DoesNotExist: + error_message = f"Server with id {server_id} not found - may have been deleted" + logger.error(error_message) + create_task_log(task_id, "sync_all_users_on_server", "Server not found", 'FAILURE', message=error_message, execution_time=time.time() - start_time) + return error_message # Don't raise exception for deleted servers + + # Test server connectivity before proceeding + try: + server.get_server_status() + except Exception as e: + error_message = f"Server {server.name} is not accessible: {e}" + logger.warning(error_message) + create_task_log(task_id, "sync_all_users_on_server", "Server not accessible", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time) + return error_message # Don't retry for connectivity issues + + create_task_log(task_id, "sync_all_users_on_server", f"Starting user sync for server {server.name}", 'STARTED', server=server) logger.info(f"Starting user sync for server {server.name}") + # Get all users for this server + from .models import ACL + acls = ACL.objects.filter(server=server).select_related('user') + user_count = acls.count() + user_list = ", ".join([acl.user.username for acl in acls[:10]]) # First 10 users + if user_count > 10: + user_list += f" and {user_count - 10} more" + + create_task_log(task_id, "sync_all_users_on_server", f"Found {user_count} users to sync", 'STARTED', server=server, message=f"Users: {user_list}") + sync_result = server.sync_users() if sync_result: - logger.info(f"Successfully synced users for server {server.name}") - return f"Successfully synced users for server {server.name}" + success_message = f"Successfully synced {user_count} users for server {server.name}" + logger.info(success_message) + create_task_log(task_id, "sync_all_users_on_server", "User sync completed", 'SUCCESS', server=server, message=success_message, execution_time=time.time() - start_time) + return success_message else: - raise TaskFailedException(f"Sync failed for server {server.name}") - - except Server.DoesNotExist: - logger.error(f"Server with id {server_id} not found") - raise TaskFailedException(f"Server with id {server_id} not found") + error_message = f"Sync failed for server {server.name}" + create_task_log(task_id, "sync_all_users_on_server", "User sync failed", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time) + raise TaskFailedException(error_message) + + except TaskFailedException: + # Don't retry TaskFailedException + raise except Exception as e: - logger.error(f"Error syncing users for server id {server_id}: {e}") + error_message = f"Error syncing users for server {server.name if server else server_id}: {e}" + logger.error(error_message) + if self.request.retries < 3: - logger.info(f"Retrying sync for server id {server_id} (attempt {self.request.retries + 1})") + retry_message = f"Retrying sync for server {server.name if server else server_id} (attempt {self.request.retries + 1})" + logger.info(retry_message) + create_task_log(task_id, "sync_all_users_on_server", "Retrying user sync", 'RETRY', server=server, message=retry_message) raise self.retry(countdown=60) - raise TaskFailedException(f"Error syncing users for server id {server_id}: {e}") + + create_task_log(task_id, "sync_all_users_on_server", "User sync failed after retries", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time) + raise TaskFailedException(error_message) @shared_task(name="sync_server_info", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 30}) def sync_server(self, id): from vpn.server_plugins import Server + start_time = time.time() + task_id = self.request.id + server = None + try: server = Server.objects.get(id=id) + + create_task_log(task_id, "sync_server_info", f"Starting server info sync for {server.name}", 'STARTED', server=server) logger.info(f"Starting server info sync for {server.name}") sync_result = server.sync() + + success_message = f"Successfully synced server info for {server.name}" + result_details = f"Sync result: {sync_result}" + logger.info(f"{success_message}. {result_details}") + + create_task_log(task_id, "sync_server_info", "Server info synced", 'SUCCESS', server=server, message=f"{success_message}. {result_details}", execution_time=time.time() - start_time) + return {"status": sync_result, "server": server.name} except Server.DoesNotExist: - logger.error(f"Server with id {id} not found") - return {"error": f"Server with id {id} not found"} + error_message = f"Server with id {id} not found" + logger.error(error_message) + create_task_log(task_id, "sync_server_info", "Server not found", 'FAILURE', message=error_message, execution_time=time.time() - start_time) + return {"error": error_message} except Exception as e: - logger.error(f"Error syncing server info for id {id}: {e}") + error_message = f"Error syncing server info for {server.name if server else id}: {e}" + logger.error(error_message) + if self.request.retries < 3: - logger.info(f"Retrying server sync for id {id} (attempt {self.request.retries + 1})") + retry_message = f"Retrying server sync for {server.name if server else id} (attempt {self.request.retries + 1})" + logger.info(retry_message) + create_task_log(task_id, "sync_server_info", "Retrying server sync", 'RETRY', server=server, message=retry_message) raise self.retry(countdown=30) - return {"error": f"Error syncing server info: {e}"} + + create_task_log(task_id, "sync_server_info", "Server sync failed after retries", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time) + return {"error": error_message} @shared_task(name="sync_user_on_server", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 5, 'countdown': 30}) def sync_user(self, user_id, server_id): from .models import User, ACL from vpn.server_plugins import Server + start_time = time.time() + task_id = self.request.id errors = {} result = {} + user = None + server = None try: user = User.objects.get(id=user_id) server = Server.objects.get(id=server_id) + create_task_log(task_id, "sync_user_on_server", f"Starting user sync for {user.username} on {server.name}", 'STARTED', server=server, user=user) logger.info(f"Syncing user {user.username} on server {server.name}") # Check if ACL exists @@ -92,30 +239,48 @@ def sync_user(self, user_id, server_id): if acl_exists: # User should exist on server + action_message = f"Adding/updating user {user.username} on server {server.name}" + create_task_log(task_id, "sync_user_on_server", action_message, 'STARTED', server=server, user=user) + result[server.name] = server.add_user(user) - logger.info(f"Added/updated user {user.username} on server {server.name}") + + success_message = f"Successfully added/updated user {user.username} on server {server.name}" + logger.info(success_message) + create_task_log(task_id, "sync_user_on_server", "User added/updated", 'SUCCESS', server=server, user=user, message=f"{success_message}. Result: {result[server.name]}", execution_time=time.time() - start_time) else: # User should be removed from server + action_message = f"Removing user {user.username} from server {server.name}" + create_task_log(task_id, "sync_user_on_server", action_message, 'STARTED', server=server, user=user) + result[server.name] = server.delete_user(user) - logger.info(f"Removed user {user.username} from server {server.name}") + + success_message = f"Successfully removed user {user.username} from server {server.name}" + logger.info(success_message) + create_task_log(task_id, "sync_user_on_server", "User removed", 'SUCCESS', server=server, user=user, message=f"{success_message}. Result: {result[server.name]}", execution_time=time.time() - start_time) except User.DoesNotExist: error_msg = f"User with id {user_id} not found" logger.error(error_msg) errors["user"] = error_msg + create_task_log(task_id, "sync_user_on_server", "User not found", 'FAILURE', message=error_msg, execution_time=time.time() - start_time) except Server.DoesNotExist: error_msg = f"Server with id {server_id} not found" logger.error(error_msg) errors["server"] = error_msg + create_task_log(task_id, "sync_user_on_server", "Server not found", 'FAILURE', message=error_msg, execution_time=time.time() - start_time) except Exception as e: - error_msg = f"Error syncing user {user_id} on server {server_id}: {e}" + error_msg = f"Error syncing user {user.username if user else user_id} on server {server.name if server else server_id}: {e}" logger.error(error_msg) errors[f"server_{server_id}"] = error_msg # Retry on failure unless it's a permanent error if self.request.retries < 5: - logger.info(f"Retrying user sync for user {user_id} on server {server_id} (attempt {self.request.retries + 1})") + retry_message = f"Retrying user sync for user {user.username if user else user_id} on server {server.name if server else server_id} (attempt {self.request.retries + 1})" + logger.info(retry_message) + create_task_log(task_id, "sync_user_on_server", "Retrying user sync", 'RETRY', server=server, user=user, message=retry_message) raise self.retry(countdown=30) + + create_task_log(task_id, "sync_user_on_server", "User sync failed after retries", 'FAILURE', server=server, user=user, message=error_msg, execution_time=time.time() - start_time) if errors: raise TaskFailedException(message=f"Errors during task: {errors}")