Files
OutFleet/vpn/signals.py

352 lines
14 KiB
Python
Raw Normal View History

2025-08-08 06:50:04 +03:00
"""
Django signals for automatic Xray server synchronization
"""
import logging
from django.db.models.signals import post_save, post_delete, m2m_changed
from django.dispatch import receiver
from django.db import transaction
from celery import group
from .models_xray import (
Inbound,
SubscriptionGroup,
UserSubscription,
Certificate,
ServerInbound
)
from .server_plugins.xray_v2 import XrayServerV2
logger = logging.getLogger(__name__)
def get_active_xray_servers():
"""Get all active Xray servers"""
from .server_plugins import Server
return [
server.get_real_instance()
for server in Server.objects.all()
if hasattr(server.get_real_instance(), 'api_enabled') and
server.get_real_instance().api_enabled
]
def schedule_inbound_sync_for_servers(inbound, servers=None):
"""Schedule inbound deployment on servers"""
if servers is None:
servers = get_active_xray_servers()
if not servers:
logger.warning("No active Xray servers found for inbound sync")
return
logger.info(f"Scheduling inbound {inbound.name} deployment on {len(servers)} servers")
# Schedule deployment tasks
from .tasks import deploy_inbound_on_server
tasks = []
for server in servers:
task = deploy_inbound_on_server.s(server.id, inbound.id)
tasks.append(task)
# Execute all deployments in parallel
job = group(tasks)
result = job.apply_async()
logger.info(f"Scheduled inbound deployment tasks: {result}")
return result
def schedule_user_sync_for_servers(servers=None):
"""Schedule user sync on servers after inbound changes"""
if servers is None:
servers = get_active_xray_servers()
if not servers:
logger.warning("No active Xray servers found for user sync")
return
logger.info(f"Scheduling user sync on {len(servers)} servers")
# Schedule user sync tasks
from .tasks import sync_server_users
tasks = []
for server in servers:
task = sync_server_users.s(server.id)
tasks.append(task)
# Execute all user syncs in parallel with delay to allow inbound sync to complete
job = group(tasks)
result = job.apply_async(countdown=10) # 10 second delay
logger.info(f"Scheduled user sync tasks: {result}")
return result
@receiver(post_save, sender=Inbound)
def inbound_created_or_updated(sender, instance, created, **kwargs):
"""
When an inbound is created or updated, deploy it to all servers
where subscription groups contain this inbound
"""
if created:
logger.info(f"New inbound {instance.name} created, will deploy when added to groups")
else:
logger.info(f"Inbound {instance.name} updated, scheduling redeployment")
# Get all subscription groups that contain this inbound
groups = instance.subscriptiongroup_set.filter(is_active=True)
if groups.exists():
# Get all servers that should have this inbound
servers = get_active_xray_servers()
# Schedule redeployment
transaction.on_commit(lambda: schedule_inbound_sync_for_servers(instance, servers))
# Schedule user sync after inbound update
transaction.on_commit(lambda: schedule_user_sync_for_servers(servers))
@receiver(post_delete, sender=Inbound)
def inbound_deleted(sender, instance, **kwargs):
"""
When an inbound is deleted, remove it from all servers
"""
logger.info(f"Inbound {instance.name} deleted, scheduling removal from servers")
# Schedule removal from all servers
from .tasks import remove_inbound_from_server
servers = get_active_xray_servers()
tasks = []
for server in servers:
task = remove_inbound_from_server.s(server.id, instance.name)
tasks.append(task)
if tasks:
job = group(tasks)
transaction.on_commit(lambda: job.apply_async())
@receiver(m2m_changed, sender=SubscriptionGroup.inbounds.through)
def subscription_group_inbounds_changed(sender, instance, action, pk_set, **kwargs):
"""
When inbounds are added/removed from subscription groups,
automatically deploy/remove them on servers
"""
if action in ['post_add', 'post_remove']:
logger.info(f"Subscription group {instance.name} inbounds changed: {action}")
if action == 'post_add' and pk_set:
# Inbounds were added to the group - deploy them
inbounds = Inbound.objects.filter(pk__in=pk_set)
servers = get_active_xray_servers()
for inbound in inbounds:
logger.info(f"Deploying inbound {inbound.name} (added to group {instance.name})")
transaction.on_commit(
lambda inb=inbound: schedule_inbound_sync_for_servers(inb, servers)
)
# Schedule user sync after all inbounds are deployed
transaction.on_commit(lambda: schedule_user_sync_for_servers(servers))
elif action == 'post_remove' and pk_set:
# Inbounds were removed from the group
inbounds = Inbound.objects.filter(pk__in=pk_set)
for inbound in inbounds:
# Check if inbound is still used by other active groups
other_groups = inbound.subscriptiongroup_set.filter(is_active=True).exclude(id=instance.id)
if not other_groups.exists():
# Inbound is not used by any other group - remove from servers
logger.info(f"Removing inbound {inbound.name} from servers (no longer in any group)")
from .tasks import remove_inbound_from_server
servers = get_active_xray_servers()
tasks = []
for server in servers:
task = remove_inbound_from_server.s(server.id, inbound.name)
tasks.append(task)
if tasks:
job = group(tasks)
transaction.on_commit(lambda: job.apply_async())
2025-08-08 07:39:01 +03:00
@receiver(post_save, sender=ServerInbound)
def server_inbound_created_or_updated(sender, instance, created, **kwargs):
"""
When ServerInbound is created or updated, immediately deploy the inbound
template to the server (not wait for subscription group changes)
"""
logger.info(f"ServerInbound {instance.inbound.name} {'created' if created else 'updated'} for server {instance.server.name}")
if instance.active:
# Deploy inbound immediately
servers = [instance.server.get_real_instance()]
transaction.on_commit(
lambda: schedule_inbound_sync_for_servers(instance.inbound, servers)
)
# Schedule user sync after inbound deployment
transaction.on_commit(lambda: schedule_user_sync_for_servers(servers))
else:
# Remove inbound from server if deactivated
logger.info(f"Removing inbound {instance.inbound.name} from server {instance.server.name} (deactivated)")
from .tasks import remove_inbound_from_server
task = remove_inbound_from_server.s(instance.server.id, instance.inbound.name)
transaction.on_commit(lambda: task.apply_async())
@receiver(post_delete, sender=ServerInbound)
def server_inbound_deleted(sender, instance, **kwargs):
"""
When ServerInbound is deleted, remove the inbound from the server
"""
logger.info(f"ServerInbound {instance.inbound.name} deleted from server {instance.server.name}")
from .tasks import remove_inbound_from_server
task = remove_inbound_from_server.s(instance.server.id, instance.inbound.name)
transaction.on_commit(lambda: task.apply_async())
2025-08-08 06:50:04 +03:00
@receiver(post_save, sender=UserSubscription)
def user_subscription_created_or_updated(sender, instance, created, **kwargs):
"""
When user subscription is created or updated, sync the user to servers
"""
if created:
logger.info(f"New subscription created for user {instance.user.username} in group {instance.subscription_group.name}")
else:
logger.info(f"Subscription updated for user {instance.user.username} in group {instance.subscription_group.name}")
if instance.active:
# Schedule user sync on all servers
servers = get_active_xray_servers()
transaction.on_commit(lambda: schedule_user_sync_for_servers(servers))
@receiver(post_delete, sender=UserSubscription)
def user_subscription_deleted(sender, instance, **kwargs):
"""
When user subscription is deleted, remove user from servers if no other subscriptions
"""
logger.info(f"Subscription deleted for user {instance.user.username} in group {instance.subscription_group.name}")
# Check if user has other active subscriptions
other_subscriptions = UserSubscription.objects.filter(
user=instance.user,
active=True
).exclude(id=instance.id).exists()
if not other_subscriptions:
# User has no more subscriptions - remove from all servers
logger.info(f"User {instance.user.username} has no more subscriptions, removing from servers")
from .tasks import remove_user_from_server
servers = get_active_xray_servers()
tasks = []
for server in servers:
task = remove_user_from_server.s(server.id, instance.user.id)
tasks.append(task)
if tasks:
job = group(tasks)
transaction.on_commit(lambda: job.apply_async())
@receiver(post_save, sender=Certificate)
def certificate_updated(sender, instance, created, **kwargs):
"""
When certificate is updated, redeploy all inbounds that use it
"""
if not created and instance.certificate_pem: # Only on updates when cert is available
logger.info(f"Certificate {instance.domain} updated, redeploying dependent inbounds")
2025-08-08 09:08:18 +03:00
# Find all ServerInbound deployments that use this certificate
from vpn.models_xray import ServerInbound
server_inbounds = ServerInbound.objects.filter(certificate=instance).select_related('server', 'inbound')
# Group by server for efficient syncing
servers_to_sync = set()
for server_inbound in server_inbounds:
servers_to_sync.add(server_inbound.server)
2025-08-08 06:50:04 +03:00
2025-08-08 09:08:18 +03:00
# Schedule sync for each affected server
for server in servers_to_sync:
2025-08-08 06:50:04 +03:00
transaction.on_commit(
2025-08-08 09:08:18 +03:00
lambda srv=server: srv.sync_inbounds()
2025-08-08 06:50:04 +03:00
)
@receiver(post_save, sender=SubscriptionGroup)
def subscription_group_updated(sender, instance, created, **kwargs):
"""
When subscription group is created/updated, sync its state
"""
if created:
logger.info(f"New subscription group {instance.name} created")
else:
logger.info(f"Subscription group {instance.name} updated")
if not instance.is_active:
# Group was deactivated - remove its inbounds from servers if not used elsewhere
logger.info(f"Subscription group {instance.name} deactivated, checking inbounds")
for inbound in instance.inbounds.all():
# Check if inbound is used by other active groups
other_groups = inbound.subscriptiongroup_set.filter(is_active=True).exclude(id=instance.id)
if not other_groups.exists():
# Remove inbound from servers
logger.info(f"Removing inbound {inbound.name} from servers (group deactivated)")
from .tasks import remove_inbound_from_server
servers = get_active_xray_servers()
tasks = []
for server in servers:
task = remove_inbound_from_server.s(server.id, inbound.name)
tasks.append(task)
if tasks:
job = group(tasks)
2025-08-08 07:39:01 +03:00
transaction.on_commit(lambda: job.apply_async())
@receiver(post_save, sender=ServerInbound)
def server_inbound_created_or_updated(sender, instance, created, **kwargs):
"""
When ServerInbound is created or updated, immediately deploy the inbound
template to the server (not wait for subscription group changes)
"""
logger.info(f"ServerInbound {instance.inbound.name} {'created' if created else 'updated'} for server {instance.server.name}")
if instance.active:
# Deploy inbound immediately
servers = [instance.server.get_real_instance()]
transaction.on_commit(
lambda: schedule_inbound_sync_for_servers(instance.inbound, servers)
)
# Schedule user sync after inbound deployment
transaction.on_commit(lambda: schedule_user_sync_for_servers(servers))
else:
# Remove inbound from server if deactivated
logger.info(f"Removing inbound {instance.inbound.name} from server {instance.server.name} (deactivated)")
from .tasks import remove_inbound_from_server
task = remove_inbound_from_server.s(instance.server.id, instance.inbound.name)
transaction.on_commit(lambda: task.apply_async())
@receiver(post_delete, sender=ServerInbound)
def server_inbound_deleted(sender, instance, **kwargs):
"""
When ServerInbound is deleted, remove the inbound from the server
"""
logger.info(f"ServerInbound {instance.inbound.name} deleted from server {instance.server.name}")
from .tasks import remove_inbound_from_server
task = remove_inbound_from_server.s(instance.server.id, instance.inbound.name)
transaction.on_commit(lambda: task.apply_async())