import logging
import time
from datetime import datetime, timedelta

from celery import group, shared_task
from celery.exceptions import Retry

logger = logging.getLogger(__name__)


def create_task_log(task_id, task_name, action, status='STARTED', server=None, user=None, message='', execution_time=None):
    """Helper function to create a task execution log entry."""
    try:
        from .models import TaskExecutionLog

        TaskExecutionLog.objects.create(
            task_id=task_id,
            task_name=task_name,
            server=server,
            user=user,
            action=action,
            status=status,
            message=message,
            execution_time=execution_time,
        )
    except Exception as e:
        # Don't fail tasks if logging fails - just log to console
        logger.error(f"Failed to create task log (task_id: {task_id}, action: {action}): {e}")
        # If the table doesn't exist, continue without logging to the DB
        if "does not exist" in str(e):
            logger.info(f"TaskExecutionLog table not found - run migrations. Task: {task_name}, Action: {action}, Status: {status}")


@shared_task(name="cleanup_task_logs")
def cleanup_task_logs():
    """Clean up old task execution logs (older than 30 days)."""
    from .models import TaskExecutionLog
    from django.utils import timezone

    try:
        # Use an aware datetime so the comparison matches timezone-aware created_at values
        cutoff_date = timezone.now() - timedelta(days=30)
        old_logs = TaskExecutionLog.objects.filter(created_at__lt=cutoff_date)
        count = old_logs.count()
        if count > 0:
            old_logs.delete()
            logger.info(f"Cleaned up {count} old task execution logs")
            return f"Cleaned up {count} old task execution logs"
        else:
            logger.info("No old task execution logs to clean up")
            return "No old task execution logs to clean up"
    except Exception as e:
        logger.error(f"Error cleaning up task logs: {e}")
        return f"Error cleaning up task logs: {e}"


class TaskFailedException(Exception):
    def __init__(self, message=""):
        self.message = message
        super().__init__(self.message)


@shared_task(name="sync_all_servers", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
def sync_all_users(self):
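    """Dispatch a per-server user sync (sync_users) for every server that is currently reachable."""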
    from vpn.server_plugins import Server

    start_time = time.time()
    task_id = self.request.id

    create_task_log(task_id, "sync_all_servers", "Starting sync all servers", 'STARTED')

    try:
        servers = Server.objects.all()
        if not servers.exists():
            message = "No servers found for synchronization"
            logger.warning(message)
            create_task_log(task_id, "sync_all_servers", "No servers to sync", 'SUCCESS', message=message, execution_time=time.time() - start_time)
            return message

        # Filter out servers that might not exist anymore
        valid_servers = []
        for server in servers:
            try:
                # Test basic server access
                server.get_server_status()
                valid_servers.append(server)
            except Exception as e:
                logger.warning(f"Skipping server {server.name} (ID: {server.id}) due to connection issues: {e}")
                create_task_log(task_id, "sync_all_servers", f"Skipped server {server.name}", 'STARTED', server=server, message=f"Connection failed: {e}")

        # Log all servers that will be synced
        server_list = ", ".join([s.name for s in valid_servers])
        if valid_servers:
            create_task_log(task_id, "sync_all_servers", f"Found {len(valid_servers)} valid servers", 'STARTED', message=f"Servers: {server_list}")
            tasks = group(sync_users.s(server.id) for server in valid_servers)
            result = tasks.apply_async()
            success_message = f"Initiated sync for {len(valid_servers)} servers: {server_list}"
        else:
            success_message = "No valid servers found for synchronization"

        create_task_log(task_id, "sync_all_servers", "Sync initiated", 'SUCCESS', message=success_message, execution_time=time.time() - start_time)
        return success_message
    except Exception as e:
        error_message = f"Error initiating sync: {e}"
        logger.error(error_message)
        create_task_log(task_id, "sync_all_servers", "Sync failed", 'FAILURE', message=error_message, execution_time=time.time() - start_time)
        raise


@shared_task(name="sync_all_users_on_server", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
def sync_users(self, server_id):
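    """Sync all users that have an ACL on the given server, after checking that the server is reachable."""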
    from vpn.server_plugins import Server

    start_time = time.time()
    task_id = self.request.id
    server = None

    try:
        try:
            server = Server.objects.get(id=server_id)
        except Server.DoesNotExist:
            error_message = f"Server with id {server_id} not found - may have been deleted"
            logger.error(error_message)
            create_task_log(task_id, "sync_all_users_on_server", "Server not found", 'FAILURE', message=error_message, execution_time=time.time() - start_time)
            return error_message  # Don't raise exception for deleted servers

        # Test server connectivity before proceeding
        try:
            server.get_server_status()
        except Exception as e:
            error_message = f"Server {server.name} is not accessible: {e}"
            logger.warning(error_message)
            create_task_log(task_id, "sync_all_users_on_server", "Server not accessible", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
            return error_message  # Don't retry for connectivity issues

        create_task_log(task_id, "sync_all_users_on_server", f"Starting user sync for server {server.name}", 'STARTED', server=server)
        logger.info(f"Starting user sync for server {server.name}")

        # Get all users for this server
        from .models import ACL

        acls = ACL.objects.filter(server=server).select_related('user')
        user_count = acls.count()
        user_list = ", ".join([acl.user.username for acl in acls[:10]])  # First 10 users
        if user_count > 10:
            user_list += f" and {user_count - 10} more"

        create_task_log(task_id, "sync_all_users_on_server", f"Found {user_count} users to sync", 'STARTED', server=server, message=f"Users: {user_list}")

        sync_result = server.sync_users()
        if sync_result:
            success_message = f"Successfully synced {user_count} users for server {server.name}"
            logger.info(success_message)
            create_task_log(task_id, "sync_all_users_on_server", "User sync completed", 'SUCCESS', server=server, message=success_message, execution_time=time.time() - start_time)
            return success_message
        else:
            error_message = f"Sync failed for server {server.name}"
            create_task_log(task_id, "sync_all_users_on_server", "User sync failed", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
            raise TaskFailedException(error_message)
    except TaskFailedException:
        # Don't retry TaskFailedException
        raise
    except Exception as e:
        error_message = f"Error syncing users for server {server.name if server else server_id}: {e}"
        logger.error(error_message)

        if self.request.retries < 3:
            retry_message = f"Retrying sync for server {server.name if server else server_id} (attempt {self.request.retries + 1})"
            logger.info(retry_message)
            create_task_log(task_id, "sync_all_users_on_server", "Retrying user sync", 'RETRY', server=server, message=retry_message)
            raise self.retry(countdown=60)

        create_task_log(task_id, "sync_all_users_on_server", "User sync failed after retries", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
        raise TaskFailedException(error_message)


@shared_task(name="sync_server_info", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 30})
def sync_server(self, id):
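    """Refresh server information for a single server via its sync() method."""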
    from vpn.server_plugins import Server

    start_time = time.time()
    task_id = self.request.id
    server = None

    try:
        server = Server.objects.get(id=id)
        create_task_log(task_id, "sync_server_info", f"Starting server info sync for {server.name}", 'STARTED', server=server)
        logger.info(f"Starting server info sync for {server.name}")

        sync_result = server.sync()

        success_message = f"Successfully synced server info for {server.name}"
        result_details = f"Sync result: {sync_result}"
        logger.info(f"{success_message}. {result_details}")
        create_task_log(task_id, "sync_server_info", "Server info synced", 'SUCCESS', server=server, message=f"{success_message}. {result_details}", execution_time=time.time() - start_time)

        return {"status": sync_result, "server": server.name}
    except Server.DoesNotExist:
        error_message = f"Server with id {id} not found"
        logger.error(error_message)
        create_task_log(task_id, "sync_server_info", "Server not found", 'FAILURE', message=error_message, execution_time=time.time() - start_time)
        return {"error": error_message}
    except Exception as e:
        error_message = f"Error syncing server info for {server.name if server else id}: {e}"
        logger.error(error_message)

        if self.request.retries < 3:
            retry_message = f"Retrying server sync for {server.name if server else id} (attempt {self.request.retries + 1})"
            logger.info(retry_message)
            create_task_log(task_id, "sync_server_info", "Retrying server sync", 'RETRY', server=server, message=retry_message)
            raise self.retry(countdown=30)

        create_task_log(task_id, "sync_server_info", "Server sync failed after retries", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
        return {"error": error_message}


@shared_task(name="update_user_statistics", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
def update_user_statistics(self):
    """Update cached user statistics from AccessLog data."""
    from .models import User, AccessLog, UserStatistics, ACLLink
    from django.utils import timezone
    from django.db.models import Count, Q
    from django.db import transaction

    start_time = time.time()
    task_id = self.request.id

    create_task_log(task_id, "update_user_statistics", "Starting statistics update", 'STARTED')

    try:
        now = timezone.now()
        thirty_days_ago = now - timedelta(days=30)

        # Get all users with ACL links
        users_with_links = User.objects.filter(acl__isnull=False).distinct()
        total_users = users_with_links.count()

        create_task_log(task_id, "update_user_statistics", f"Found {total_users} users to process", 'STARTED')
        logger.info(f"Updating statistics for {total_users} users")

        updated_count = 0
        with transaction.atomic():
            for user in users_with_links:
                logger.debug(f"Processing user {user.username}")

                # Get all ACL links for this user
                acl_links = ACLLink.objects.filter(acl__user=user).select_related('acl__server')
                for link in acl_links:
                    server_name = link.acl.server.name

                    # Calculate total connections for this specific link (all time)
                    total_connections = AccessLog.objects.filter(
                        user=user.username,
                        server=server_name,
                        acl_link_id=link.link,
                        action='Success'
                    ).count()

                    # Calculate recent connections (last 30 days)
                    recent_connections = AccessLog.objects.filter(
                        user=user.username,
                        server=server_name,
                        acl_link_id=link.link,
                        action='Success',
                        timestamp__gte=thirty_days_ago
                    ).count()

                    # Generate daily usage data for the last 30 days (one count query per day)
                    daily_usage = []
                    max_daily = 0
                    for i in range(30):
                        day_start = (now - timedelta(days=29 - i)).replace(hour=0, minute=0, second=0, microsecond=0)
                        day_end = day_start + timedelta(days=1)
                        day_connections = AccessLog.objects.filter(
                            user=user.username,
                            server=server_name,
                            acl_link_id=link.link,
                            action='Success',
                            timestamp__gte=day_start,
                            timestamp__lt=day_end
                        ).count()
                        daily_usage.append(day_connections)
                        max_daily = max(max_daily, day_connections)

                    # Update or create statistics for this link
                    stats, created = UserStatistics.objects.update_or_create(
                        user=user,
                        server_name=server_name,
                        acl_link_id=link.link,
                        defaults={
                            'total_connections': total_connections,
                            'recent_connections': recent_connections,
                            'daily_usage': daily_usage,
                            'max_daily': max_daily,
                        }
                    )
                    action = "created" if created else "updated"
                    logger.debug(f"{action} stats for {user.username} on {server_name} (link: {link.link}): {total_connections} total, {recent_connections} recent")
                    updated_count += 1

                logger.debug(f"Completed processing user {user.username}")

        success_message = f"Successfully updated statistics for {updated_count} user-server-link combinations"
        logger.info(success_message)
        create_task_log(
            task_id,
            "update_user_statistics",
            "Statistics update completed",
            'SUCCESS',
            message=success_message,
            execution_time=time.time() - start_time
        )
        return success_message
    except Exception as e:
        error_message = f"Error updating user statistics: {e}"
        logger.error(error_message, exc_info=True)

        if self.request.retries < 3:
            retry_message = f"Retrying statistics update (attempt {self.request.retries + 1})"
            logger.info(retry_message)
            create_task_log(task_id, "update_user_statistics", "Retrying statistics update", 'RETRY', message=retry_message)
            raise self.retry(countdown=60)

        create_task_log(
            task_id,
            "update_user_statistics",
            "Statistics update failed after retries",
            'FAILURE',
            message=error_message,
            execution_time=time.time() - start_time
        )
        raise


@shared_task(name="sync_user_on_server", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 5, 'countdown': 30})
def sync_user(self, user_id, server_id):
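    """Add or remove a single user on a single server, depending on whether an ACL exists for that pair."""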
    from .models import User, ACL
    from vpn.server_plugins import Server

    start_time = time.time()
    task_id = self.request.id

    errors = {}
    result = {}
    user = None
    server = None

    try:
        user = User.objects.get(id=user_id)
        server = Server.objects.get(id=server_id)

        create_task_log(task_id, "sync_user_on_server", f"Starting user sync for {user.username} on {server.name}", 'STARTED', server=server, user=user)
        logger.info(f"Syncing user {user.username} on server {server.name}")

        # Check if ACL exists
        acl_exists = ACL.objects.filter(user=user, server=server).exists()
        if acl_exists:
            # User should exist on server
            action_message = f"Adding/updating user {user.username} on server {server.name}"
            create_task_log(task_id, "sync_user_on_server", action_message, 'STARTED', server=server, user=user)

            result[server.name] = server.add_user(user)

            success_message = f"Successfully added/updated user {user.username} on server {server.name}"
            logger.info(success_message)
            create_task_log(task_id, "sync_user_on_server", "User added/updated", 'SUCCESS', server=server, user=user, message=f"{success_message}. Result: {result[server.name]}", execution_time=time.time() - start_time)
        else:
            # User should be removed from server
            action_message = f"Removing user {user.username} from server {server.name}"
            create_task_log(task_id, "sync_user_on_server", action_message, 'STARTED', server=server, user=user)

            result[server.name] = server.delete_user(user)

            success_message = f"Successfully removed user {user.username} from server {server.name}"
            logger.info(success_message)
            create_task_log(task_id, "sync_user_on_server", "User removed", 'SUCCESS', server=server, user=user, message=f"{success_message}. Result: {result[server.name]}", execution_time=time.time() - start_time)
    except User.DoesNotExist:
        error_msg = f"User with id {user_id} not found"
        logger.error(error_msg)
        errors["user"] = error_msg
        create_task_log(task_id, "sync_user_on_server", "User not found", 'FAILURE', message=error_msg, execution_time=time.time() - start_time)
    except Server.DoesNotExist:
        error_msg = f"Server with id {server_id} not found"
        logger.error(error_msg)
        errors["server"] = error_msg
        create_task_log(task_id, "sync_user_on_server", "Server not found", 'FAILURE', message=error_msg, execution_time=time.time() - start_time)
    except Exception as e:
        error_msg = f"Error syncing user {user.username if user else user_id} on server {server.name if server else server_id}: {e}"
        logger.error(error_msg)
        errors[f"server_{server_id}"] = error_msg

        # Retry on failure unless it's a permanent error
        if self.request.retries < 5:
            retry_message = f"Retrying user sync for user {user.username if user else user_id} on server {server.name if server else server_id} (attempt {self.request.retries + 1})"
            logger.info(retry_message)
            create_task_log(task_id, "sync_user_on_server", "Retrying user sync", 'RETRY', server=server, user=user, message=retry_message)
            raise self.retry(countdown=30)

        create_task_log(task_id, "sync_user_on_server", "User sync failed after retries", 'FAILURE', server=server, user=user, message=error_msg, execution_time=time.time() - start_time)

    if errors:
        raise TaskFailedException(message=f"Errors during task: {errors}")
    return result