2024-10-20 21:57:12 +00:00
import logging
2025-07-20 22:50:22 +03:00
import time
from datetime import datetime , timedelta
2024-10-21 13:22:03 +00:00
from celery import group , shared_task
2025-07-20 22:30:04 +03:00
from celery . exceptions import Retry
2024-10-20 21:57:12 +00:00
logger = logging . getLogger ( __name__ )
2025-07-20 22:50:22 +03:00
def create_task_log ( task_id , task_name , action , status = ' STARTED ' , server = None , user = None , message = ' ' , execution_time = None ) :
""" Helper function to create task execution log """
try :
from . models import TaskExecutionLog
TaskExecutionLog . objects . create (
task_id = task_id ,
task_name = task_name ,
server = server ,
user = user ,
action = action ,
status = status ,
message = message ,
execution_time = execution_time
)
except Exception as e :
# Don't fail tasks if logging fails - just log to console
logger . error ( f " Failed to create task log (task_id: { task_id } , action: { action } ): { e } " )
# If table doesn't exist, just continue without logging to DB
if " does not exist " in str ( e ) :
logger . info ( f " TaskExecutionLog table not found - run migrations. Task: { task_name } , Action: { action } , Status: { status } " )
@shared_task ( name = " cleanup_task_logs " )
def cleanup_task_logs ( ) :
""" Clean up old task execution logs (older than 30 days) """
from . models import TaskExecutionLog
try :
cutoff_date = datetime . now ( ) - timedelta ( days = 30 )
old_logs = TaskExecutionLog . objects . filter ( created_at__lt = cutoff_date )
count = old_logs . count ( )
if count > 0 :
old_logs . delete ( )
logger . info ( f " Cleaned up { count } old task execution logs " )
return f " Cleaned up { count } old task execution logs "
else :
logger . info ( " No old task execution logs to clean up " )
return " No old task execution logs to clean up "
except Exception as e :
logger . error ( f " Error cleaning up task logs: { e } " )
return f " Error cleaning up task logs: { e } "
2024-10-20 21:57:12 +00:00
class TaskFailedException ( Exception ) :
def __init__ ( self , message = " " ) :
self . message = message
super ( ) . __init__ ( f " { self . message } " )
2025-07-20 22:30:04 +03:00
@shared_task ( name = " sync_all_servers " , bind = True , autoretry_for = ( Exception , ) , retry_kwargs = { ' max_retries ' : 3 , ' countdown ' : 60 } )
def sync_all_users ( self ) :
2024-10-21 13:22:03 +00:00
from vpn . server_plugins import Server
2025-07-20 22:50:22 +03:00
start_time = time . time ( )
task_id = self . request . id
2025-07-20 22:30:04 +03:00
2025-07-20 22:50:22 +03:00
create_task_log ( task_id , " sync_all_servers " , " Starting sync all servers " , ' STARTED ' )
2024-10-21 13:22:03 +00:00
2025-07-20 22:50:22 +03:00
try :
servers = Server . objects . all ( )
if not servers . exists ( ) :
message = " No servers found for synchronization "
logger . warning ( message )
create_task_log ( task_id , " sync_all_servers " , " No servers to sync " , ' SUCCESS ' , message = message , execution_time = time . time ( ) - start_time )
return message
# Filter out servers that might not exist anymore
valid_servers = [ ]
for server in servers :
try :
# Test basic server access
server . get_server_status ( )
valid_servers . append ( server )
except Exception as e :
logger . warning ( f " Skipping server { server . name } (ID: { server . id } ) due to connection issues: { e } " )
create_task_log ( task_id , " sync_all_servers " , f " Skipped server { server . name } " , ' STARTED ' , server = server , message = f " Connection failed: { e } " )
# Log all servers that will be synced
server_list = " , " . join ( [ s . name for s in valid_servers ] )
if valid_servers :
create_task_log ( task_id , " sync_all_servers " , f " Found { len ( valid_servers ) } valid servers " , ' STARTED ' , message = f " Servers: { server_list } " )
tasks = group ( sync_users . s ( server . id ) for server in valid_servers )
result = tasks . apply_async ( )
success_message = f " Initiated sync for { len ( valid_servers ) } servers: { server_list } "
else :
success_message = " No valid servers found for synchronization "
create_task_log ( task_id , " sync_all_servers " , " Sync initiated " , ' SUCCESS ' , message = success_message , execution_time = time . time ( ) - start_time )
return success_message
except Exception as e :
error_message = f " Error initiating sync: { e } "
logger . error ( error_message )
create_task_log ( task_id , " sync_all_servers " , " Sync failed " , ' FAILURE ' , message = error_message , execution_time = time . time ( ) - start_time )
raise
2024-10-21 13:22:03 +00:00
2025-07-20 22:30:04 +03:00
@shared_task ( name = " sync_all_users_on_server " , bind = True , autoretry_for = ( Exception , ) , retry_kwargs = { ' max_retries ' : 3 , ' countdown ' : 60 } )
def sync_users ( self , server_id ) :
from vpn . server_plugins import Server
2025-07-20 22:50:22 +03:00
start_time = time . time ( )
task_id = self . request . id
server = None
2024-10-21 13:22:03 +00:00
try :
2025-07-20 22:50:22 +03:00
try :
server = Server . objects . get ( id = server_id )
except Server . DoesNotExist :
error_message = f " Server with id { server_id } not found - may have been deleted "
logger . error ( error_message )
create_task_log ( task_id , " sync_all_users_on_server " , " Server not found " , ' FAILURE ' , message = error_message , execution_time = time . time ( ) - start_time )
return error_message # Don't raise exception for deleted servers
# Test server connectivity before proceeding
try :
server . get_server_status ( )
except Exception as e :
error_message = f " Server { server . name } is not accessible: { e } "
logger . warning ( error_message )
create_task_log ( task_id , " sync_all_users_on_server " , " Server not accessible " , ' FAILURE ' , server = server , message = error_message , execution_time = time . time ( ) - start_time )
return error_message # Don't retry for connectivity issues
create_task_log ( task_id , " sync_all_users_on_server " , f " Starting user sync for server { server . name } " , ' STARTED ' , server = server )
2025-07-20 22:30:04 +03:00
logger . info ( f " Starting user sync for server { server . name } " )
2025-07-20 22:50:22 +03:00
# Get all users for this server
from . models import ACL
acls = ACL . objects . filter ( server = server ) . select_related ( ' user ' )
user_count = acls . count ( )
user_list = " , " . join ( [ acl . user . username for acl in acls [ : 10 ] ] ) # First 10 users
if user_count > 10 :
user_list + = f " and { user_count - 10 } more "
create_task_log ( task_id , " sync_all_users_on_server " , f " Found { user_count } users to sync " , ' STARTED ' , server = server , message = f " Users: { user_list } " )
2025-07-20 22:30:04 +03:00
sync_result = server . sync_users ( )
if sync_result :
2025-07-20 22:50:22 +03:00
success_message = f " Successfully synced { user_count } users for server { server . name } "
logger . info ( success_message )
create_task_log ( task_id , " sync_all_users_on_server " , " User sync completed " , ' SUCCESS ' , server = server , message = success_message , execution_time = time . time ( ) - start_time )
return success_message
2025-07-20 22:30:04 +03:00
else :
2025-07-20 22:50:22 +03:00
error_message = f " Sync failed for server { server . name } "
create_task_log ( task_id , " sync_all_users_on_server " , " User sync failed " , ' FAILURE ' , server = server , message = error_message , execution_time = time . time ( ) - start_time )
raise TaskFailedException ( error_message )
except TaskFailedException :
# Don't retry TaskFailedException
raise
2024-10-21 13:22:03 +00:00
except Exception as e :
2025-07-20 22:50:22 +03:00
error_message = f " Error syncing users for server { server . name if server else server_id } : { e } "
logger . error ( error_message )
2025-07-20 22:30:04 +03:00
if self . request . retries < 3 :
2025-07-20 22:50:22 +03:00
retry_message = f " Retrying sync for server { server . name if server else server_id } (attempt { self . request . retries + 1 } ) "
logger . info ( retry_message )
create_task_log ( task_id , " sync_all_users_on_server " , " Retrying user sync " , ' RETRY ' , server = server , message = retry_message )
2025-07-20 22:30:04 +03:00
raise self . retry ( countdown = 60 )
2025-07-20 22:50:22 +03:00
create_task_log ( task_id , " sync_all_users_on_server " , " User sync failed after retries " , ' FAILURE ' , server = server , message = error_message , execution_time = time . time ( ) - start_time )
raise TaskFailedException ( error_message )
2024-10-21 13:22:03 +00:00
2025-07-20 22:30:04 +03:00
@shared_task ( name = " sync_server_info " , bind = True , autoretry_for = ( Exception , ) , retry_kwargs = { ' max_retries ' : 3 , ' countdown ' : 30 } )
def sync_server ( self , id ) :
2024-10-20 21:57:12 +00:00
from vpn . server_plugins import Server
2025-07-20 22:30:04 +03:00
2025-07-20 22:50:22 +03:00
start_time = time . time ( )
task_id = self . request . id
server = None
2025-07-20 22:30:04 +03:00
try :
server = Server . objects . get ( id = id )
2025-07-20 22:50:22 +03:00
create_task_log ( task_id , " sync_server_info " , f " Starting server info sync for { server . name } " , ' STARTED ' , server = server )
2025-07-20 22:30:04 +03:00
logger . info ( f " Starting server info sync for { server . name } " )
sync_result = server . sync ( )
2025-07-20 22:50:22 +03:00
success_message = f " Successfully synced server info for { server . name } "
result_details = f " Sync result: { sync_result } "
logger . info ( f " { success_message } . { result_details } " )
create_task_log ( task_id , " sync_server_info " , " Server info synced " , ' SUCCESS ' , server = server , message = f " { success_message } . { result_details } " , execution_time = time . time ( ) - start_time )
2025-07-20 22:30:04 +03:00
return { " status " : sync_result , " server " : server . name }
except Server . DoesNotExist :
2025-07-20 22:50:22 +03:00
error_message = f " Server with id { id } not found "
logger . error ( error_message )
create_task_log ( task_id , " sync_server_info " , " Server not found " , ' FAILURE ' , message = error_message , execution_time = time . time ( ) - start_time )
return { " error " : error_message }
2025-07-20 22:30:04 +03:00
except Exception as e :
2025-07-20 22:50:22 +03:00
error_message = f " Error syncing server info for { server . name if server else id } : { e } "
logger . error ( error_message )
2025-07-20 22:30:04 +03:00
if self . request . retries < 3 :
2025-07-20 22:50:22 +03:00
retry_message = f " Retrying server sync for { server . name if server else id } (attempt { self . request . retries + 1 } ) "
logger . info ( retry_message )
create_task_log ( task_id , " sync_server_info " , " Retrying server sync " , ' RETRY ' , server = server , message = retry_message )
2025-07-20 22:30:04 +03:00
raise self . retry ( countdown = 30 )
2025-07-20 22:50:22 +03:00
create_task_log ( task_id , " sync_server_info " , " Server sync failed after retries " , ' FAILURE ' , server = server , message = error_message , execution_time = time . time ( ) - start_time )
return { " error " : error_message }
2024-10-20 21:57:12 +00:00
2025-07-20 22:30:04 +03:00
@shared_task ( name = " sync_user_on_server " , bind = True , autoretry_for = ( Exception , ) , retry_kwargs = { ' max_retries ' : 5 , ' countdown ' : 30 } )
def sync_user ( self , user_id , server_id ) :
2024-10-20 21:57:12 +00:00
from . models import User , ACL
from vpn . server_plugins import Server
2025-07-20 22:50:22 +03:00
start_time = time . time ( )
task_id = self . request . id
2024-10-20 21:57:12 +00:00
errors = { }
result = { }
2025-07-20 22:50:22 +03:00
user = None
server = None
2025-07-20 22:30:04 +03:00
2024-10-21 13:22:03 +00:00
try :
2025-07-20 22:30:04 +03:00
user = User . objects . get ( id = user_id )
server = Server . objects . get ( id = server_id )
2025-07-20 22:50:22 +03:00
create_task_log ( task_id , " sync_user_on_server " , f " Starting user sync for { user . username } on { server . name } " , ' STARTED ' , server = server , user = user )
2025-07-20 22:30:04 +03:00
logger . info ( f " Syncing user { user . username } on server { server . name } " )
# Check if ACL exists
acl_exists = ACL . objects . filter ( user = user , server = server ) . exists ( )
if acl_exists :
# User should exist on server
2025-07-20 22:50:22 +03:00
action_message = f " Adding/updating user { user . username } on server { server . name } "
create_task_log ( task_id , " sync_user_on_server " , action_message , ' STARTED ' , server = server , user = user )
2024-10-21 13:22:03 +00:00
result [ server . name ] = server . add_user ( user )
2025-07-20 22:50:22 +03:00
success_message = f " Successfully added/updated user { user . username } on server { server . name } "
logger . info ( success_message )
create_task_log ( task_id , " sync_user_on_server " , " User added/updated " , ' SUCCESS ' , server = server , user = user , message = f " { success_message } . Result: { result [ server . name ] } " , execution_time = time . time ( ) - start_time )
2024-10-21 13:22:03 +00:00
else :
2025-07-20 22:30:04 +03:00
# User should be removed from server
2025-07-20 22:50:22 +03:00
action_message = f " Removing user { user . username } from server { server . name } "
create_task_log ( task_id , " sync_user_on_server " , action_message , ' STARTED ' , server = server , user = user )
2024-10-21 13:22:03 +00:00
result [ server . name ] = server . delete_user ( user )
2025-07-20 22:50:22 +03:00
success_message = f " Successfully removed user { user . username } from server { server . name } "
logger . info ( success_message )
create_task_log ( task_id , " sync_user_on_server " , " User removed " , ' SUCCESS ' , server = server , user = user , message = f " { success_message } . Result: { result [ server . name ] } " , execution_time = time . time ( ) - start_time )
2025-07-20 22:30:04 +03:00
except User . DoesNotExist :
error_msg = f " User with id { user_id } not found "
logger . error ( error_msg )
errors [ " user " ] = error_msg
2025-07-20 22:50:22 +03:00
create_task_log ( task_id , " sync_user_on_server " , " User not found " , ' FAILURE ' , message = error_msg , execution_time = time . time ( ) - start_time )
2025-07-20 22:30:04 +03:00
except Server . DoesNotExist :
error_msg = f " Server with id { server_id } not found "
logger . error ( error_msg )
errors [ " server " ] = error_msg
2025-07-20 22:50:22 +03:00
create_task_log ( task_id , " sync_user_on_server " , " Server not found " , ' FAILURE ' , message = error_msg , execution_time = time . time ( ) - start_time )
2024-10-21 13:22:03 +00:00
except Exception as e :
2025-07-20 22:50:22 +03:00
error_msg = f " Error syncing user { user . username if user else user_id } on server { server . name if server else server_id } : { e } "
2025-07-20 22:30:04 +03:00
logger . error ( error_msg )
errors [ f " server_ { server_id } " ] = error_msg
# Retry on failure unless it's a permanent error
if self . request . retries < 5 :
2025-07-20 22:50:22 +03:00
retry_message = f " Retrying user sync for user { user . username if user else user_id } on server { server . name if server else server_id } (attempt { self . request . retries + 1 } ) "
logger . info ( retry_message )
create_task_log ( task_id , " sync_user_on_server " , " Retrying user sync " , ' RETRY ' , server = server , user = user , message = retry_message )
2025-07-20 22:30:04 +03:00
raise self . retry ( countdown = 30 )
2025-07-20 22:50:22 +03:00
create_task_log ( task_id , " sync_user_on_server " , " User sync failed after retries " , ' FAILURE ' , server = server , user = user , message = error_msg , execution_time = time . time ( ) - start_time )
2025-07-20 22:30:04 +03:00
if errors :
raise TaskFailedException ( message = f " Errors during task: { errors } " )
return result