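"""Celery tasks for VPN server, user, and certificate synchronization.

Each task records its progress in TaskExecutionLog via the create_task_log()
helper. Tasks are normally invoked asynchronously, e.g. sync_all_users.delay()
or sync_server.delay(server_id).
"""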
import logging
import time
from datetime import timedelta

from celery import group, shared_task
from django.utils import timezone

logger = logging.getLogger(__name__)


def create_task_log(task_id, task_name, action, status='STARTED', server=None, user=None, message='', execution_time=None):
    """Helper function to create a task execution log entry."""
    try:
        from .models import TaskExecutionLog
        TaskExecutionLog.objects.create(
            task_id=task_id,
            task_name=task_name,
            server=server,
            user=user,
            action=action,
            status=status,
            message=message,
            execution_time=execution_time
        )
    except Exception as e:
        # Don't fail tasks if logging fails - just log to console
        logger.error(f"Failed to create task log (task_id: {task_id}, action: {action}): {e}")
        # If the table doesn't exist yet, just continue without logging to the DB
        if "does not exist" in str(e):
            logger.info(f"TaskExecutionLog table not found - run migrations. Task: {task_name}, Action: {action}, Status: {status}")


@shared_task(name="cleanup_task_logs")
def cleanup_task_logs():
    """Clean up old task execution logs (older than 30 days)."""
    from .models import TaskExecutionLog
    try:
        # Use a timezone-aware now() so the comparison works with an aware created_at field
        cutoff_date = timezone.now() - timedelta(days=30)
        old_logs = TaskExecutionLog.objects.filter(created_at__lt=cutoff_date)
        count = old_logs.count()
        if count > 0:
            old_logs.delete()
            logger.info(f"Cleaned up {count} old task execution logs")
            return f"Cleaned up {count} old task execution logs"
        else:
            logger.info("No old task execution logs to clean up")
            return "No old task execution logs to clean up"
    except Exception as e:
        logger.error(f"Error cleaning up task logs: {e}")
        return f"Error cleaning up task logs: {e}"


@shared_task(name="sync_xray_inbounds", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 30})
def sync_xray_inbounds(self, server_id):
    """Stage 1: Sync inbounds for an Xray server."""
    from vpn.server_plugins import Server
    from vpn.server_plugins.xray_v2 import XrayServerV2

    start_time = time.time()
    task_id = self.request.id
    server = None
    try:
        server = Server.objects.get(id=server_id)
        if not isinstance(server.get_real_instance(), XrayServerV2):
            error_message = f"Server {server.name} is not an Xray server"
            logger.error(error_message)
            create_task_log(task_id, "sync_xray_inbounds", "Wrong server type", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
            return {"error": error_message}
        create_task_log(task_id, "sync_xray_inbounds", f"Starting inbound sync for {server.name}", 'STARTED', server=server)
        logger.info(f"Starting inbound sync for Xray server {server.name}")
        real_server = server.get_real_instance()
        inbound_result = real_server.sync_inbounds()
        success_message = f"Successfully synced inbounds for {server.name}"
        logger.info(f"{success_message}. Result: {inbound_result}")
        create_task_log(task_id, "sync_xray_inbounds", "Inbound sync completed", 'SUCCESS', server=server, message=f"{success_message}. Result: {inbound_result}", execution_time=time.time() - start_time)
        return inbound_result
    except Server.DoesNotExist:
        error_message = f"Server with id {server_id} not found"
        logger.error(error_message)
        create_task_log(task_id, "sync_xray_inbounds", "Server not found", 'FAILURE', message=error_message, execution_time=time.time() - start_time)
        return {"error": error_message}
    except Exception as e:
        error_message = f"Error syncing inbounds for {server.name if server else server_id}: {e}"
        logger.error(error_message)
        if self.request.retries < 3:
            retry_message = f"Retrying inbound sync for {server.name if server else server_id} (attempt {self.request.retries + 1})"
            logger.info(retry_message)
            create_task_log(task_id, "sync_xray_inbounds", "Retrying inbound sync", 'RETRY', server=server, message=retry_message)
            raise self.retry(countdown=30)
        create_task_log(task_id, "sync_xray_inbounds", "Inbound sync failed after retries", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
        return {"error": error_message}


@shared_task(name="sync_xray_users", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 30})
def sync_xray_users(self, server_id):
    """Stage 2: Sync users for an Xray server."""
    from vpn.server_plugins import Server
    from vpn.server_plugins.xray_v2 import XrayServerV2

    start_time = time.time()
    task_id = self.request.id
    server = None
    try:
        server = Server.objects.get(id=server_id)
        if not isinstance(server.get_real_instance(), XrayServerV2):
            error_message = f"Server {server.name} is not an Xray server"
            logger.error(error_message)
            create_task_log(task_id, "sync_xray_users", "Wrong server type", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
            return {"error": error_message}
        create_task_log(task_id, "sync_xray_users", f"Starting user sync for {server.name}", 'STARTED', server=server)
        logger.info(f"Starting user sync for Xray server {server.name}")
        real_server = server.get_real_instance()
        user_result = real_server.sync_users()
        success_message = f"Successfully synced {user_result.get('users_added', 0)} users for {server.name}"
        logger.info(f"{success_message}. Result: {user_result}")
        create_task_log(task_id, "sync_xray_users", "User sync completed", 'SUCCESS', server=server, message=f"{success_message}. Result: {user_result}", execution_time=time.time() - start_time)
        return user_result
    except Server.DoesNotExist:
        error_message = f"Server with id {server_id} not found"
        logger.error(error_message)
        create_task_log(task_id, "sync_xray_users", "Server not found", 'FAILURE', message=error_message, execution_time=time.time() - start_time)
        return {"error": error_message}
    except Exception as e:
        error_message = f"Error syncing users for {server.name if server else server_id}: {e}"
        logger.error(error_message)
        if self.request.retries < 3:
            retry_message = f"Retrying user sync for {server.name if server else server_id} (attempt {self.request.retries + 1})"
            logger.info(retry_message)
            create_task_log(task_id, "sync_xray_users", "Retrying user sync", 'RETRY', server=server, message=retry_message)
            raise self.retry(countdown=30)
        create_task_log(task_id, "sync_xray_users", "User sync failed after retries", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
        return {"error": error_message}


class TaskFailedException(Exception):
    """Raised when a sync task fails permanently (see the retry handlers below)."""

    def __init__(self, message=""):
        self.message = message
        super().__init__(self.message)


@shared_task(name="sync_all_servers", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
def sync_all_users(self):
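    """Fan out a per-server user sync (sync_users) for every reachable server."""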
    from vpn.server_plugins import Server

    start_time = time.time()
    task_id = self.request.id

    create_task_log(task_id, "sync_all_servers", "Starting sync all servers", 'STARTED')

    try:
        servers = Server.objects.all()
        if not servers.exists():
            message = "No servers found for synchronization"
            logger.warning(message)
            create_task_log(task_id, "sync_all_servers", "No servers to sync", 'SUCCESS', message=message, execution_time=time.time() - start_time)
            return message
        # Filter out servers that are unreachable or no longer exist
        valid_servers = []
        for server in servers:
            try:
                # Test basic server access
                server.get_server_status()
                valid_servers.append(server)
            except Exception as e:
                logger.warning(f"Skipping server {server.name} (ID: {server.id}) due to connection issues: {e}")
                create_task_log(task_id, "sync_all_servers", f"Skipped server {server.name}", 'STARTED', server=server, message=f"Connection failed: {e}")
        # Log all servers that will be synced
        server_list = ", ".join([s.name for s in valid_servers])
        if valid_servers:
            create_task_log(task_id, "sync_all_servers", f"Found {len(valid_servers)} valid servers", 'STARTED', message=f"Servers: {server_list}")
            tasks = group(sync_users.s(server.id) for server in valid_servers)
            tasks.apply_async()
            success_message = f"Initiated sync for {len(valid_servers)} servers: {server_list}"
        else:
            success_message = "No valid servers found for synchronization"
        create_task_log(task_id, "sync_all_servers", "Sync initiated", 'SUCCESS', message=success_message, execution_time=time.time() - start_time)
        return success_message
    except Exception as e:
        error_message = f"Error initiating sync: {e}"
        logger.error(error_message)
        create_task_log(task_id, "sync_all_servers", "Sync failed", 'FAILURE', message=error_message, execution_time=time.time() - start_time)
        raise


@shared_task(name="sync_all_users_on_server", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
def sync_users(self, server_id):
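    """Sync all users on a single server, using the Xray-specific path when applicable."""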
    from vpn.server_plugins import Server

    start_time = time.time()
    task_id = self.request.id
    server = None

    try:
        try:
            server = Server.objects.get(id=server_id)
        except Server.DoesNotExist:
            error_message = f"Server with id {server_id} not found - may have been deleted"
            logger.warning(error_message)
            create_task_log(task_id, "sync_all_users_on_server", "Server not found", 'SUCCESS', message=error_message, execution_time=time.time() - start_time)
            return error_message  # Don't raise an exception for deleted servers
        # Test server connectivity before proceeding
        try:
            server.get_server_status()
        except Exception as e:
            error_message = f"Server {server.name} is not accessible: {e}"
            logger.warning(error_message)
            create_task_log(task_id, "sync_all_users_on_server", "Server not accessible", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
            return error_message  # Don't retry for connectivity issues
        create_task_log(task_id, "sync_all_users_on_server", f"Starting user sync for server {server.name}", 'STARTED', server=server)
        logger.info(f"Starting user sync for server {server.name}")
        # Get all users for this server
        from .models import ACL
        acls = ACL.objects.filter(server=server).select_related('user')
        user_count = acls.count()
        user_list = ", ".join([acl.user.username for acl in acls[:10]])  # First 10 users
        if user_count > 10:
            user_list += f" and {user_count - 10} more"
        create_task_log(task_id, "sync_all_users_on_server", f"Found {user_count} users to sync", 'STARTED', server=server, message=f"Users: {user_list}")
        # For Xray servers, use the dedicated sync methods
        from vpn.server_plugins.xray_v2 import XrayServerV2
        if isinstance(server.get_real_instance(), XrayServerV2):
            logger.info(f"Using XrayServerV2 sync for server {server.name}")
            # The sync method schedules its own tasks asynchronously
            sync_result = server.sync_users()
            logger.info(f"Scheduled async sync for Xray server {server.name}")
        else:
            # For non-Xray servers, just sync users
            sync_result = server.sync_users()
        # Check if the sync was successful; sync_users() may return a bool,
        # a dict, or a status string, so spell the cases out explicitly
        if isinstance(sync_result, str):
            sync_successful = bool(sync_result) and "failed" not in sync_result.lower()
        elif isinstance(sync_result, dict):
            sync_successful = True
        else:
            sync_successful = sync_result is True
        if sync_successful:
            success_message = f"Successfully synced {user_count} users for server {server.name}"
            if isinstance(sync_result, (str, dict)):
                success_message += f". Details: {sync_result}"
            logger.info(success_message)
            create_task_log(task_id, "sync_all_users_on_server", "User sync completed", 'SUCCESS', server=server, message=success_message, execution_time=time.time() - start_time)
            return success_message
        else:
            error_message = f"Sync failed for server {server.name}. Result: {sync_result}"
            create_task_log(task_id, "sync_all_users_on_server", "User sync failed", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
            raise TaskFailedException(error_message)
    except TaskFailedException:
        # Re-raise explicit failures without the manual retry below
        raise
    except Exception as e:
        error_message = f"Error syncing users for server {server.name if server else server_id}: {e}"
        logger.error(error_message)
        if self.request.retries < 3:
            retry_message = f"Retrying sync for server {server.name if server else server_id} (attempt {self.request.retries + 1})"
            logger.info(retry_message)
            create_task_log(task_id, "sync_all_users_on_server", "Retrying user sync", 'RETRY', server=server, message=retry_message)
            raise self.retry(countdown=60)
        create_task_log(task_id, "sync_all_users_on_server", "User sync failed after retries", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
        raise TaskFailedException(error_message)


@shared_task(name="sync_server_info", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 30})
def sync_server(self, id):
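    """Sync server info for a single server via server.sync() and log the outcome."""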
    from vpn.server_plugins import Server

    start_time = time.time()
    task_id = self.request.id
    server = None

    try:
        server = Server.objects.get(id=id)
        create_task_log(task_id, "sync_server_info", f"Starting server info sync for {server.name}", 'STARTED', server=server)
        logger.info(f"Starting server info sync for {server.name}")
        sync_result = server.sync()
        success_message = f"Successfully synced server info for {server.name}"
        result_details = f"Sync result: {sync_result}"
        logger.info(f"{success_message}. {result_details}")
        create_task_log(task_id, "sync_server_info", "Server info synced", 'SUCCESS', server=server, message=f"{success_message}. {result_details}", execution_time=time.time() - start_time)
        return {"status": sync_result, "server": server.name}
    except Server.DoesNotExist:
        error_message = f"Server with id {id} not found"
        logger.error(error_message)
        create_task_log(task_id, "sync_server_info", "Server not found", 'FAILURE', message=error_message, execution_time=time.time() - start_time)
        return {"error": error_message}
    except Exception as e:
        error_message = f"Error syncing server info for {server.name if server else id}: {e}"
        logger.error(error_message)
        if self.request.retries < 3:
            retry_message = f"Retrying server sync for {server.name if server else id} (attempt {self.request.retries + 1})"
            logger.info(retry_message)
            create_task_log(task_id, "sync_server_info", "Retrying server sync", 'RETRY', server=server, message=retry_message)
            raise self.retry(countdown=30)
        create_task_log(task_id, "sync_server_info", "Server sync failed after retries", 'FAILURE', server=server, message=error_message, execution_time=time.time() - start_time)
        return {"error": error_message}


@shared_task(name="update_user_statistics", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
def update_user_statistics(self):
    """Update cached user statistics from AccessLog data."""
    from .models import User, AccessLog, UserStatistics, ACLLink
    from django.db import transaction

    start_time = time.time()
    task_id = self.request.id
    create_task_log(task_id, "update_user_statistics", "Starting statistics update", 'STARTED')
    try:
        now = timezone.now()
        thirty_days_ago = now - timedelta(days=30)
        # Get all users with ACL links
        users_with_links = User.objects.filter(acl__isnull=False).distinct()
        total_users = users_with_links.count()
        create_task_log(task_id, "update_user_statistics", f"Found {total_users} users to process", 'STARTED')
        logger.info(f"Updating statistics for {total_users} users")
        updated_count = 0
        with transaction.atomic():
            for user in users_with_links:
                logger.debug(f"Processing user {user.username}")
                # Get all ACL links for this user
                acl_links = ACLLink.objects.filter(acl__user=user).select_related('acl__server')
                for link in acl_links:
                    server_name = link.acl.server.name
                    # Calculate total connections for this specific link (all time)
                    total_connections = AccessLog.objects.filter(
                        user=user.username,
                        server=server_name,
                        acl_link_id=link.link,
                        action='Success'
                    ).count()
                    # Calculate recent connections (last 30 days)
                    recent_connections = AccessLog.objects.filter(
                        user=user.username,
                        server=server_name,
                        acl_link_id=link.link,
                        action='Success',
                        timestamp__gte=thirty_days_ago
                    ).count()
                    # Generate daily usage data for the last 30 days (oldest day first)
                    daily_usage = []
                    max_daily = 0
                    for i in range(30):
                        day_start = (now - timedelta(days=29 - i)).replace(hour=0, minute=0, second=0, microsecond=0)
                        day_end = day_start + timedelta(days=1)
                        day_connections = AccessLog.objects.filter(
                            user=user.username,
                            server=server_name,
                            acl_link_id=link.link,
                            action='Success',
                            timestamp__gte=day_start,
                            timestamp__lt=day_end
                        ).count()
                        daily_usage.append(day_connections)
                        max_daily = max(max_daily, day_connections)
                    # Update or create statistics for this link
                    stats, created = UserStatistics.objects.update_or_create(
                        user=user,
                        server_name=server_name,
                        acl_link_id=link.link,
                        defaults={
                            'total_connections': total_connections,
                            'recent_connections': recent_connections,
                            'daily_usage': daily_usage,
                            'max_daily': max_daily,
                        }
                    )
                    action = "created" if created else "updated"
                    logger.debug(f"{action} stats for {user.username} on {server_name} (link: {link.link}): {total_connections} total, {recent_connections} recent")
                    updated_count += 1
                logger.debug(f"Completed processing user {user.username}")
        success_message = f"Successfully updated statistics for {updated_count} user-server-link combinations"
        logger.info(success_message)
        create_task_log(
            task_id,
            "update_user_statistics",
            "Statistics update completed",
            'SUCCESS',
            message=success_message,
            execution_time=time.time() - start_time
        )
        return success_message
    except Exception as e:
        error_message = f"Error updating user statistics: {e}"
        logger.error(error_message, exc_info=True)
        if self.request.retries < 3:
            retry_message = f"Retrying statistics update (attempt {self.request.retries + 1})"
            logger.info(retry_message)
            create_task_log(task_id, "update_user_statistics", "Retrying statistics update", 'RETRY', message=retry_message)
            raise self.retry(countdown=60)
        create_task_log(
            task_id,
            "update_user_statistics",
            "Statistics update failed after retries",
            'FAILURE',
            message=error_message,
            execution_time=time.time() - start_time
        )
        raise


@shared_task(name="sync_user_on_server", bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 5, 'countdown': 30})
def sync_user(self, user_id, server_id):
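    """Add or remove a single user on a single server, depending on whether an ACL exists."""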
    from .models import User, ACL
    from vpn.server_plugins import Server

    start_time = time.time()
    task_id = self.request.id
    errors = {}
    result = {}
    user = None
    server = None

    try:
        user = User.objects.get(id=user_id)
        server = Server.objects.get(id=server_id)
        create_task_log(task_id, "sync_user_on_server", f"Starting user sync for {user.username} on {server.name}", 'STARTED', server=server, user=user)
        logger.info(f"Syncing user {user.username} on server {server.name}")
        # Check if an ACL exists
        acl_exists = ACL.objects.filter(user=user, server=server).exists()
        if acl_exists:
            # User should exist on the server
            action_message = f"Adding/updating user {user.username} on server {server.name}"
            create_task_log(task_id, "sync_user_on_server", action_message, 'STARTED', server=server, user=user)
            result[server.name] = server.add_user(user)
            success_message = f"Successfully added/updated user {user.username} on server {server.name}"
            logger.info(success_message)
            create_task_log(task_id, "sync_user_on_server", "User added/updated", 'SUCCESS', server=server, user=user, message=f"{success_message}. Result: {result[server.name]}", execution_time=time.time() - start_time)
        else:
            # User should be removed from the server
            action_message = f"Removing user {user.username} from server {server.name}"
            create_task_log(task_id, "sync_user_on_server", action_message, 'STARTED', server=server, user=user)
            result[server.name] = server.delete_user(user)
            success_message = f"Successfully removed user {user.username} from server {server.name}"
            logger.info(success_message)
            create_task_log(task_id, "sync_user_on_server", "User removed", 'SUCCESS', server=server, user=user, message=f"{success_message}. Result: {result[server.name]}", execution_time=time.time() - start_time)
    except User.DoesNotExist:
        error_msg = f"User with id {user_id} not found"
        logger.error(error_msg)
        errors["user"] = error_msg
        create_task_log(task_id, "sync_user_on_server", "User not found", 'FAILURE', message=error_msg, execution_time=time.time() - start_time)
    except Server.DoesNotExist:
        error_msg = f"Server with id {server_id} not found"
        logger.error(error_msg)
        errors["server"] = error_msg
        create_task_log(task_id, "sync_user_on_server", "Server not found", 'FAILURE', message=error_msg, execution_time=time.time() - start_time)
    except Exception as e:
        error_msg = f"Error syncing user {user.username if user else user_id} on server {server.name if server else server_id}: {e}"
        logger.error(error_msg)
        errors[f"server_{server_id}"] = error_msg
        # Retry on failure unless it's a permanent error
        if self.request.retries < 5:
            retry_message = f"Retrying user sync for user {user.username if user else user_id} on server {server.name if server else server_id} (attempt {self.request.retries + 1})"
            logger.info(retry_message)
            create_task_log(task_id, "sync_user_on_server", "Retrying user sync", 'RETRY', server=server, user=user, message=retry_message)
            raise self.retry(countdown=30)
        create_task_log(task_id, "sync_user_on_server", "User sync failed after retries", 'FAILURE', server=server, user=user, message=error_msg, execution_time=time.time() - start_time)
    if errors:
        raise TaskFailedException(message=f"Errors during task: {errors}")
    return result


@shared_task(name="sync_user_xray_access", bind=True)
def sync_user_xray_access(self, user_id, server_id):
    """
    Sync a user's Xray access based on subscription groups.
    Creates inbounds on the server if needed and adds the user to them.
    """
    import uuid

    from .models import User, Server
    from .models_xray import SubscriptionGroup, Inbound
    from vpn.xray_api_v2.client import XrayClient
    from vpn.server_plugins.xray_v2 import XrayServerV2

    start_time = time.time()
    task_id = self.request.id
    try:
        user = User.objects.get(id=user_id)
        server = Server.objects.get(id=server_id)
        # Get the server instance
        real_server = server.get_real_instance()
        if not isinstance(real_server, XrayServerV2):
            raise ValueError(f"Server {server.name} is not an Xray v2 server")
        create_task_log(
            task_id, "sync_user_xray_access",
            f"Starting Xray sync for {user.username} on {server.name}",
            'STARTED', server=server, user=user
        )
        # Get the user's active subscription groups
        user_groups = SubscriptionGroup.objects.filter(
            usersubscription__user=user,
            usersubscription__active=True,
            is_active=True
        ).prefetch_related('inbounds')
        if not user_groups.exists():
            logger.info(f"User {user.username} has no active subscriptions")
            return {"status": "No active subscriptions"}
        # Collect all inbounds from the user's groups
        user_inbounds = Inbound.objects.filter(
            subscriptiongroup__in=user_groups
        ).distinct()
        logger.info(f"User {user.username} has access to {user_inbounds.count()} inbounds")
        # Connect to the Xray server
        client = XrayClient(real_server.api_address)
        # Get existing inbounds on the server
        try:
            existing_result = client.execute_command('lsi')  # List inbounds
            existing_inbounds = existing_result.get('inbounds', []) if existing_result else []
            existing_tags = {ib.get('tag') for ib in existing_inbounds if ib.get('tag')}
        except Exception as e:
            logger.warning(f"Failed to list existing inbounds: {e}")
            existing_tags = set()
        results = {
            'inbounds_created': [],
            'users_added': [],
            'errors': []
        }
        # Process each inbound
        for inbound in user_inbounds:
            try:
                # Check if the inbound exists on the server
                if inbound.name not in existing_tags:
                    logger.info(f"Creating inbound {inbound.name} on server")
                    # Build the inbound configuration
                    if not inbound.full_config:
                        inbound.build_config()
                        inbound.save()
                    # Add the inbound to the server
                    client.execute_command('adi', json_files=[inbound.full_config])
                    results['inbounds_created'].append(inbound.name)
                # Add the user to the inbound
                logger.info(f"Adding user {user.username} to inbound {inbound.name}")
                # Generate a deterministic user UUID from the username and inbound name;
                # uuid5 ensures repeated syncs produce the same ID for the same pair
                user_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{user.username}-{inbound.name}"))
                # Create the user config based on the protocol
                if inbound.protocol == 'vless':
                    user_config = {
                        "email": f"{user.username}@{server.name}",
                        "id": user_uuid,
                        "level": 0
                    }
                elif inbound.protocol == 'vmess':
                    user_config = {
                        "email": f"{user.username}@{server.name}",
                        "id": user_uuid,
                        "level": 0,
                        "alterId": 0
                    }
                elif inbound.protocol == 'trojan':
                    user_config = {
                        "email": f"{user.username}@{server.name}",
                        "password": user_uuid,
                        "level": 0
                    }
                else:
                    logger.warning(f"Unsupported protocol: {inbound.protocol}")
                    continue
                # Add the user to the inbound
                add_request = {
                    "inboundTag": inbound.name,
                    "user": user_config
                }
                client.execute_command('adu', json_files=[add_request])
                results['users_added'].append(f"{user.username} -> {inbound.name}")
            except Exception as e:
                error_msg = f"Error processing inbound {inbound.name}: {e}"
                logger.error(error_msg)
                results['errors'].append(error_msg)
        # Log the results
        success_msg = (
            f"Xray sync completed for {user.username}: "
            f"Created {len(results['inbounds_created'])} inbounds, "
            f"Added user to {len(results['users_added'])} inbounds"
        )
        create_task_log(
            task_id, "sync_user_xray_access",
            "Xray sync completed", 'SUCCESS',
            server=server, user=user,
            message=success_msg,
            execution_time=time.time() - start_time
        )
        return results
    except Exception as e:
        error_msg = f"Error in Xray sync: {e}"
        logger.error(error_msg, exc_info=True)
        create_task_log(
            task_id, "sync_user_xray_access",
            "Xray sync failed", 'FAILURE',
            message=error_msg,
            execution_time=time.time() - start_time
        )
        raise


@shared_task(name="sync_server_users", bind=True)
def sync_server_users(self, server_id):
    """
    Sync all users for a specific Xray server.
    This is called by XrayServerV2.sync_users().
    """
    from vpn.server_plugins import Server
    from vpn.models import User

    try:
        server = Server.objects.get(id=server_id)
        real_server = server.get_real_instance()
        # Get all users who should have access to this server.
        # For Xray v2, users get access through subscription groups.
        users_to_sync = User.objects.filter(
            xray_subscriptions__active=True,
            xray_subscriptions__subscription_group__is_active=True
        ).distinct()
        logger.info(f"Syncing {users_to_sync.count()} users for Xray server {server.name}")
        added_count = 0
        for user in users_to_sync:
            try:
                if real_server.add_user(user):
                    added_count += 1
            except Exception as e:
                logger.error(f"Failed to sync user {user.username} on server {server.name}: {e}")
        logger.info(f"Successfully synced {added_count} users for server {server.name}")
        return {"users_added": added_count, "total_users": users_to_sync.count()}
    except Exception as e:
        logger.error(f"Error syncing users for server {server_id}: {e}")
        raise


@shared_task(name="sync_server_inbounds", bind=True)
def sync_server_inbounds(self, server_id, auto_sync_users=True):
    """
    Sync all inbounds for a specific Xray server.
    This is called by XrayServerV2.sync_inbounds().
    """
    from vpn.server_plugins import Server
    from vpn.models_xray import SubscriptionGroup, ServerInbound, UserSubscription

    try:
        server = Server.objects.get(id=server_id)
        real_server = server.get_real_instance()
        # Get all active subscription groups
        groups = SubscriptionGroup.objects.filter(is_active=True).prefetch_related('inbounds')
        deployed_count = 0
        for group in groups:
            for inbound in group.inbounds.all():
                try:
                    # Get the users for this inbound
                    users_with_access = []
                    group_users = [
                        sub.user for sub in
                        UserSubscription.objects.filter(
                            subscription_group=group,
                            active=True
                        ).select_related('user')
                    ]
                    users_with_access.extend(group_users)
                    # Remove duplicates
                    users_with_access = list(set(users_with_access))
                    # Deploy the inbound with its users
                    if real_server.deploy_inbound(inbound, users=users_with_access):
                        deployed_count += 1
                        # Mark as deployed
                        ServerInbound.objects.update_or_create(
                            server=server,
                            inbound=inbound,
                            defaults={'active': True}
                        )
                        logger.info(f"Deployed inbound {inbound.name} with {len(users_with_access)} users on server {server.name}")
                    else:
                        logger.error(f"Failed to deploy inbound {inbound.name} on server {server.name}")
                except Exception as e:
                    logger.error(f"Failed to deploy inbound {inbound.name} on server {server.name}: {e}")
        logger.info(f"Successfully deployed {deployed_count} inbounds on server {server.name}")
        # Automatically sync users after inbound deployment if requested
        if auto_sync_users and deployed_count > 0:
            logger.info(f"Scheduling user sync for server {server.name} after inbound deployment")
            sync_server_users.apply_async(args=[server_id], countdown=5)  # 5 second delay
        return {"inbounds_deployed": deployed_count, "auto_sync_users": auto_sync_users}
    except Exception as e:
        logger.error(f"Error syncing inbounds for server {server_id}: {e}")
        raise


@shared_task(name="deploy_inbound_on_server", bind=True)
def deploy_inbound_on_server(self, server_id, inbound_id):
    """
    Deploy a specific inbound on a specific server.
    """
    from vpn.server_plugins import Server
    from vpn.models_xray import Inbound, ServerInbound, UserSubscription

    try:
        server = Server.objects.get(id=server_id)
        real_server = server.get_real_instance()
        inbound = Inbound.objects.get(id=inbound_id)
        logger.info(f"Deploying inbound {inbound.name} on server {server.name}")
        # Get all users that should have access to this inbound
        users_with_access = []
        # Find users through subscription groups
        for group in inbound.subscriptiongroup_set.filter(is_active=True):
            group_users = [
                sub.user for sub in
                UserSubscription.objects.filter(
                    subscription_group=group,
                    active=True
                ).select_related('user')
            ]
            users_with_access.extend(group_users)
        # Remove duplicates
        users_with_access = list(set(users_with_access))
        logger.info(f"Deploying inbound {inbound.name} with {len(users_with_access)} users")
        # Deploy the inbound with its users
        if real_server.deploy_inbound(inbound, users=users_with_access):
            # Mark as deployed
            ServerInbound.objects.update_or_create(
                server=server,
                inbound=inbound,
                defaults={'active': True}
            )
            logger.info(f"Successfully deployed inbound {inbound.name} on server {server.name}")
            return {"success": True, "inbound": inbound.name, "server": server.name, "users": len(users_with_access)}
        else:
            logger.error(f"Failed to deploy inbound {inbound.name} on server {server.name}")
            return {"success": False, "inbound": inbound.name, "server": server.name, "error": "Deployment failed"}
    except Exception as e:
        logger.error(f"Error deploying inbound {inbound_id} on server {server_id}: {e}")
        return {"success": False, "error": str(e)}


@shared_task(name="remove_inbound_from_server", bind=True)
def remove_inbound_from_server(self, server_id, inbound_name):
    """
    Remove a specific inbound from a specific server.
    """
    from vpn.server_plugins import Server
    from vpn.xray_api_v2.client import XrayClient
    from vpn.models_xray import ServerInbound, Inbound

    try:
        server = Server.objects.get(id=server_id)
        real_server = server.get_real_instance()
        logger.info(f"Removing inbound {inbound_name} from server {server.name}")
        # Remove the inbound using the Xray API
        client = XrayClient(server=real_server.api_address)
        result = client.remove_inbound(inbound_name)
        # Remove it from ServerInbound tracking
        try:
            inbound = Inbound.objects.get(name=inbound_name)
            ServerInbound.objects.filter(server=server, inbound=inbound).delete()
        except Inbound.DoesNotExist:
            pass  # The inbound was already deleted from Django
        logger.info(f"Successfully removed inbound {inbound_name} from server {server.name}")
        return {"success": True, "inbound": inbound_name, "server": server.name}
    except Exception as e:
        logger.error(f"Error removing inbound {inbound_name} from server {server_id}: {e}")
        return {"success": False, "error": str(e)}


@shared_task(name="remove_user_from_server", bind=True)
def remove_user_from_server(self, server_id, user_id):
    """
    Remove a specific user from a specific server.
    """
    from vpn.server_plugins import Server
    from vpn.models import User

    try:
        server = Server.objects.get(id=server_id)
        real_server = server.get_real_instance()
        user = User.objects.get(id=user_id)
        logger.info(f"Removing user {user.username} from server {server.name}")
        result = real_server.delete_user(user)
        if result:
            logger.info(f"Successfully removed user {user.username} from server {server.name}")
            return {"success": True, "user": user.username, "server": server.name}
        else:
            logger.warning(f"Failed to remove user {user.username} from server {server.name}")
            return {"success": False, "user": user.username, "server": server.name, "error": "Removal failed"}
    except Exception as e:
        logger.error(f"Error removing user {user_id} from server {server_id}: {e}")
        return {"success": False, "error": str(e)}


@shared_task(name="generate_certificate_task", bind=True)
def generate_certificate_task(self, certificate_id):
    """
    Generate a Let's Encrypt certificate for a domain.
    """
    from .models_xray import Certificate
    from vpn.letsencrypt.letsencrypt_dns import get_certificate_for_domain

    start_time = time.time()
    task_id = self.request.id
    try:
        cert = Certificate.objects.get(id=certificate_id)
        create_task_log(
            task_id, "generate_certificate_task",
            f"Starting certificate generation for {cert.domain}",
            'STARTED'
        )
        # Check that we have credentials
        if not cert.credentials:
            raise ValueError(f"No credentials configured for {cert.domain}")
        # Get the Cloudflare token from the credentials
        cf_token = cert.credentials.get_credential('api_token')
        if not cf_token:
            raise ValueError(f"No Cloudflare API token found for {cert.domain}")
        logger.info(f"Generating certificate for {cert.domain} using email {cert.acme_email}")
        # Request the certificate using the library function
        cert_pem, key_pem = get_certificate_for_domain(
            domain=cert.domain,
            email=cert.acme_email,
            cloudflare_token=cf_token,
            staging=False  # Production certificate
        )
        # Update the certificate object
        cert.certificate_pem = cert_pem
        cert.private_key_pem = key_pem
        cert.expires_at = timezone.now() + timedelta(days=90)  # Let's Encrypt certs are valid for 90 days
        cert.last_renewed = timezone.now()
        cert.save()
        success_msg = f"Certificate for {cert.domain} generated successfully"
        logger.info(success_msg)
        create_task_log(
            task_id, "generate_certificate_task",
            "Certificate generated", 'SUCCESS',
            message=success_msg,
            execution_time=time.time() - start_time
        )
        return {"status": "success", "domain": cert.domain}
    except Certificate.DoesNotExist:
        error_msg = f"Certificate with id {certificate_id} not found"
        logger.error(error_msg)
        create_task_log(
            task_id, "generate_certificate_task",
            "Certificate not found", 'FAILURE',
            message=error_msg,
            execution_time=time.time() - start_time
        )
        raise
    except Exception as e:
        error_msg = f"Failed to generate certificate: {e}"
        logger.error(error_msg, exc_info=True)
        create_task_log(
            task_id, "generate_certificate_task",
            "Certificate generation failed", 'FAILURE',
            message=error_msg,
            execution_time=time.time() - start_time
        )
        raise


@shared_task(name="renew_certificates", bind=True)
def renew_certificates(self):
    """
    Check and renew certificates that are about to expire.
    """
    from .models_xray import Certificate
    from .letsencrypt import get_certificate_for_domain

    start_time = time.time()
    task_id = self.request.id
    create_task_log(task_id, "renew_certificates", "Starting certificate renewal check", 'STARTED')
    try:
        # Get certificates eligible for renewal
        certs_to_renew = Certificate.objects.filter(
            auto_renew=True,
            cert_type='letsencrypt'
        )
        renewed_count = 0
        errors = []
        for cert in certs_to_renew:
            if not cert.needs_renewal:
                continue
            try:
                logger.info(f"Renewing certificate for {cert.domain}")
                # Check that we have credentials
                if not cert.credentials:
                    logger.warning(f"No credentials configured for {cert.domain}")
                    continue
                # Get the Cloudflare token from the credentials
                cf_token = cert.credentials.get_credential('api_token')
                cf_email = cert.credentials.get_credential('email', 'admin@example.com')
                if not cf_token:
                    logger.error(f"No Cloudflare API token found for {cert.domain}")
                    continue
                # Renew the certificate
                cert_pem, key_pem = get_certificate_for_domain(
                    domain=cert.domain,
                    email=cf_email,
                    cloudflare_token=cf_token,
                    staging=False  # Production certificate
                )
                # Update the certificate
                cert.certificate_pem = cert_pem
                cert.private_key_pem = key_pem
                cert.last_renewed = timezone.now()
                cert.expires_at = timezone.now() + timedelta(days=90)  # Let's Encrypt certs are valid for 90 days
                cert.save()
                renewed_count += 1
                logger.info(f"Successfully renewed certificate for {cert.domain}")
            except Exception as e:
                error_msg = f"Failed to renew certificate for {cert.domain}: {e}"
                logger.error(error_msg)
                errors.append(error_msg)
        # Summary
        if renewed_count > 0 or errors:
            summary = f"Renewed {renewed_count} certificates"
            if errors:
                summary += f", {len(errors)} errors"
            create_task_log(
                task_id, "renew_certificates",
                "Certificate renewal completed",
                'SUCCESS' if not errors else 'PARTIAL',
                message=summary,
                execution_time=time.time() - start_time
            )
        else:
            create_task_log(
                task_id, "renew_certificates",
                "No certificates need renewal",
                'SUCCESS',
                execution_time=time.time() - start_time
            )
        return {
            'renewed': renewed_count,
            'errors': errors
        }
    except Exception as e:
        error_msg = f"Certificate renewal task failed: {e}"
        logger.error(error_msg, exc_info=True)
        create_task_log(
            task_id, "renew_certificates",
            "Certificate renewal failed",
            'FAILURE',
            message=error_msg,
            execution_time=time.time() - start_time
        )
        raise