""" User admin interface """ import shortuuid from django.contrib import admin from django.utils.safestring import mark_safe from django.utils.html import format_html from django.db.models import Count from django.shortcuts import get_object_or_404 from django.contrib import messages from django.urls import path, reverse from django.http import HttpResponseRedirect, JsonResponse from django.utils.timezone import localtime from mysite.settings import EXTERNAL_ADDRESS from vpn.models import User, ACL, ACLLink, Server, AccessLog, UserStatistics from vpn.forms import UserForm from .base import BaseVPNAdmin, format_bytes @admin.register(User) class UserAdmin(BaseVPNAdmin): form = UserForm list_display = ('username', 'comment', 'registration_date', 'hash_link', 'server_count') search_fields = ('username', 'hash', 'telegram_user_id', 'telegram_username') readonly_fields = ('hash_link', 'vpn_access_summary', 'user_statistics_summary', 'telegram_info_display', 'subscription_management_info') inlines = [] # Inlines will be added by subscription management function fieldsets = ( ('User Information', { 'fields': ('username', 'first_name', 'last_name', 'email', 'comment') }), ('Telegram Integration', { 'fields': ('telegram_username', 'telegram_info_display'), 'classes': ('collapse',), 'description': 'Link existing users to Telegram by setting telegram_username (without @)' }), ('Access Information', { 'fields': ('hash_link', 'is_active', 'vpn_access_summary') }), ('Statistics & Server Management', { 'fields': ('user_statistics_summary',), 'classes': ('wide',) }), ('Subscription Management', { 'fields': ('subscription_management_info',), 'classes': ('wide',), 'description': 'Manage user\'s Xray subscription groups. Use the "User\'s Subscription Groups" section below to add/remove subscriptions.' }), ) @admin.display(description='VPN Access Summary') def vpn_access_summary(self, obj): """Display summary of user's VPN access""" if not obj.pk: return "Save user first to see VPN access" # Get legacy VPN access acl_count = ACL.objects.filter(user=obj).count() legacy_links = ACLLink.objects.filter(acl__user=obj).count() # Get Xray access from vpn.models_xray import UserSubscription xray_subs = UserSubscription.objects.filter(user=obj, active=True).select_related('subscription_group') xray_groups = [sub.subscription_group.name for sub in xray_subs] html = '
' # Legacy VPN section html += '
' html += '

📡 Legacy VPN (Outline/Wireguard)

' if acl_count > 0: html += f'

✅ Access to {acl_count} server(s)

' html += f'

🔗 Total links: {legacy_links}

' else: html += '

No legacy VPN access

' html += '
' # Xray section html += '
' html += '

🚀 Xray VPN

' if xray_groups: html += f'

✅ Active subscriptions: {len(xray_groups)}

' html += '' # Try to get traffic statistics for this user try: from vpn.server_plugins.xray_v2 import XrayServerV2 traffic_total_up = 0 traffic_total_down = 0 servers_checked = set() # Get all Xray servers xray_servers = XrayServerV2.objects.filter(api_enabled=True, stats_enabled=True) for server in xray_servers: if server.name not in servers_checked: try: from vpn.xray_api_v2.client import XrayClient from vpn.xray_api_v2.stats import StatsManager client = XrayClient(server=server.api_address) stats_manager = StatsManager(client) # Get user stats (use email format: username@servername) user_email = f"{obj.username}@{server.name}" user_stats = stats_manager.get_user_stats(user_email) if user_stats: traffic_total_up += user_stats.get('uplink', 0) traffic_total_down += user_stats.get('downlink', 0) servers_checked.add(server.name) except Exception as e: import logging logger = logging.getLogger(__name__) logger.debug(f"Could not get user stats from server {server.name}: {e}") # Format traffic if we got any if traffic_total_up > 0 or traffic_total_down > 0: html += f'

📊 Traffic Statistics:

' html += f'

↑ Upload: {format_bytes(traffic_total_up)}

' html += f'

↓ Download: {format_bytes(traffic_total_down)}

' except Exception as e: import logging logger = logging.getLogger(__name__) logger.debug(f"Could not get traffic stats for user {obj.username}: {e}") else: html += '

No Xray subscriptions

' html += '
' html += '
' return format_html(html) @admin.display(description='User Portal', ordering='hash') def hash_link(self, obj): portal_url = f"{EXTERNAL_ADDRESS}/u/{obj.hash}" json_url = f"{EXTERNAL_ADDRESS}/stat/{obj.hash}" return format_html( '
' + '🌐 Portal' + '📄 JSON' + '
', portal_url, json_url ) @admin.display(description='User Statistics Summary') def user_statistics_summary(self, obj): """Display user statistics with integrated server management""" try: from vpn.models import UserStatistics from django.db import models # Get statistics for this user user_stats = UserStatistics.objects.filter(user=obj).aggregate( total_connections=models.Sum('total_connections'), recent_connections=models.Sum('recent_connections'), total_links=models.Count('id'), max_daily_peak=models.Max('max_daily') ) # Get server breakdown server_breakdown = UserStatistics.objects.filter(user=obj).values('server_name').annotate( connections=models.Sum('total_connections'), links=models.Count('id') ).order_by('-connections') # Get all ACLs and links for this user user_acls = ACL.objects.filter(user=obj).select_related('server').prefetch_related('links') # Get available servers not yet assigned all_servers = Server.objects.all() assigned_server_ids = [acl.server.id for acl in user_acls] unassigned_servers = all_servers.exclude(id__in=assigned_server_ids) html = '
' # Overall Statistics html += '
' html += f'
' html += f'
Total Uses: {user_stats["total_connections"] or 0}
' html += f'
Recent (30d): {user_stats["recent_connections"] or 0}
' html += f'
Total Links: {user_stats["total_links"] or 0}
' if user_stats["max_daily_peak"]: html += f'
Daily Peak: {user_stats["max_daily_peak"]}
' html += f'
' html += '
' # Server Management if user_acls: html += '

🔗 Server Access & Links

' for acl in user_acls: server = acl.server links = list(acl.links.all()) # Server header (no slow server status checks) # Determine server type icon and label if server.server_type == 'Outline': type_icon = 'đŸ”ĩ' type_label = 'Outline' elif server.server_type == 'Wireguard': type_icon = 'đŸŸĸ' type_label = 'Wireguard' elif server.server_type in ['xray_core', 'xray_v2']: type_icon = 'đŸŸŖ' type_label = 'Xray' else: type_icon = '❓' type_label = server.server_type html += f'
' html += f'
{type_icon} {server.name} ({type_label})
' # Server stats server_stat = next((s for s in server_breakdown if s['server_name'] == server.name), None) if server_stat: html += f'' html += f'📊 {server_stat["connections"]} uses ({server_stat["links"]} links)' html += f'' html += f'
' html += '
' # Links display if links: for link in links: # Get link stats link_stats = UserStatistics.objects.filter( user=obj, server_name=server.name, acl_link_id=link.link ).first() html += '' # Add link button html += f'
' html += f'' html += f'
' html += '
' # End server-section # Add server access section if unassigned_servers: html += '
' html += '
➕ Available Servers
' html += '
' for server in unassigned_servers: # Determine server type icon and label if server.server_type == 'Outline': type_icon = 'đŸ”ĩ' type_label = 'Outline' elif server.server_type == 'Wireguard': type_icon = 'đŸŸĸ' type_label = 'Wireguard' elif server.server_type in ['xray_core', 'xray_v2']: type_icon = 'đŸŸŖ' type_label = 'Xray' else: type_icon = '❓' type_label = server.server_type html += f'' html += '
' html += '
' # End user-management-section return mark_safe(html) except Exception as e: return mark_safe(f'Error loading management interface: {e}') @admin.display(description='Recent Activity') def recent_activity_display(self, obj): """Display recent activity in compact admin-friendly format""" try: from datetime import timedelta from django.utils import timezone # Get recent access logs for this user (last 7 days, limited) seven_days_ago = timezone.now() - timedelta(days=7) recent_logs = AccessLog.objects.filter( user=obj.username, timestamp__gte=seven_days_ago ).order_by('-timestamp')[:15] # Limit to 15 most recent if not recent_logs: return mark_safe('
No recent activity (last 7 days)
') html = '
' # Header html += '
' html += f'📊 Recent Activity ({recent_logs.count()} entries, last 7 days)' html += '
' # Activity entries for i, log in enumerate(recent_logs): bg_color = '#ffffff' if i % 2 == 0 else '#f8f9fa' local_time = localtime(log.timestamp) # Status icon and color if log.action == 'Success': icon = '✅' status_color = '#28a745' elif log.action == 'Failed': icon = '❌' status_color = '#dc3545' else: icon = 'â„šī¸' status_color = '#6c757d' html += f'
' # Left side - server and link info html += f'
' html += f'{icon}' html += f'
' html += f'
{log.server}
' if log.acl_link_id: link_short = log.acl_link_id[:12] + '...' if len(log.acl_link_id) > 12 else log.acl_link_id html += f'
{link_short}
' html += f'
' # Right side - timestamp and status html += f'
' html += f'
{local_time.strftime("%m-%d %H:%M")}
' html += f'
{log.action}
' html += f'
' html += f'
' # Footer with summary if there are more entries total_recent = AccessLog.objects.filter( user=obj.username, timestamp__gte=seven_days_ago ).count() if total_recent > 15: html += f'
' html += f'Showing 15 of {total_recent} entries from last 7 days' html += f'
' html += '
' return mark_safe(html) except Exception as e: return mark_safe(f'Error loading activity: {e}') @admin.display(description='Telegram Account') def telegram_info_display(self, obj): """Display Telegram account information""" if not obj.telegram_user_id: if obj.telegram_username: return mark_safe(f'
' f'🔗 Ready to link: @{obj.telegram_username}
' f'User will be automatically linked when they message the bot
') else: return mark_safe('No Telegram account linked') html = '
' html += '

📱 Telegram Account Information

' # Telegram User ID html += f'

User ID: {obj.telegram_user_id}

' # Telegram Username if obj.telegram_username: html += f'

Username: @{obj.telegram_username}

' # Telegram Names name_parts = [] if obj.telegram_first_name: name_parts.append(obj.telegram_first_name) if obj.telegram_last_name: name_parts.append(obj.telegram_last_name) if name_parts: full_name = ' '.join(name_parts) html += f'

Name: {full_name}

' # Telegram Phone (if available) if obj.telegram_phone: html += f'

Phone: {obj.telegram_phone}

' # Access requests count (if any) try: from telegram_bot.models import AccessRequest requests_count = AccessRequest.objects.filter(telegram_user_id=obj.telegram_user_id).count() if requests_count > 0: html += f'

📝 Access Requests: {requests_count}

' # Show latest request status latest_request = AccessRequest.objects.filter(telegram_user_id=obj.telegram_user_id).order_by('-created_at').first() if latest_request: status_color = '#28a745' if latest_request.approved else '#ffc107' status_text = 'Approved' if latest_request.approved else 'Pending' html += f'

Latest: {status_text}

' except: pass # Telegram bot app might not be available # Add unlink button unlink_url = reverse('admin:vpn_user_unlink_telegram', args=[obj.pk]) html += f'
' html += f'🔗đŸ’Ĩ Unlink Telegram Account' html += '
' html += '
' return mark_safe(html) @admin.display(description='Subscription Management') def subscription_management_info(self, obj): """Display subscription management information and quick access""" if not obj.pk: return "Save user first to manage subscriptions" try: from vpn.models_xray import UserSubscription, SubscriptionGroup # Get user's current subscriptions user_subscriptions = UserSubscription.objects.filter(user=obj).select_related('subscription_group') active_subs = user_subscriptions.filter(active=True) inactive_subs = user_subscriptions.filter(active=False) # Get available subscription groups all_groups = SubscriptionGroup.objects.filter(is_active=True) subscribed_group_ids = user_subscriptions.values_list('subscription_group_id', flat=True) available_groups = all_groups.exclude(id__in=subscribed_group_ids) html = '
' html += '

🚀 Xray Subscription Management

' # Active subscriptions if active_subs.exists(): html += '
' html += '
✅ Active Subscriptions
' for sub in active_subs: html += f'
' html += f'{sub.subscription_group.name}' if sub.subscription_group.description: html += f' - {sub.subscription_group.description[:50]}{"..." if len(sub.subscription_group.description) > 50 else ""}' html += f'' html += f'Since: {sub.created_at.strftime("%Y-%m-%d")}' html += f'
' html += '
' # Inactive subscriptions if inactive_subs.exists(): html += '
' html += '
❌ Inactive Subscriptions
' for sub in inactive_subs: html += f'
' html += f'{sub.subscription_group.name}' html += f'
' html += '
' # Available subscription groups if available_groups.exists(): html += '
' html += '
➕ Available Subscription Groups
' html += '
' for group in available_groups[:10]: # Limit to avoid clutter html += f'' html += f'{group.name}' html += f'' if available_groups.count() > 10: html += f'+{available_groups.count() - 10} more...' html += '
' html += '
' # Quick access links html += '
' html += '
🔗 Quick Access
' html += '
' # Link to standalone UserSubscription admin subscription_admin_url = f"/admin/vpn/usersubscription/?user__id__exact={obj.id}" html += f'📋 Manage All Subscriptions' # Link to add new subscription add_subscription_url = f"/admin/vpn/usersubscription/add/?user={obj.id}" html += f'➕ Add New Subscription' # Link to subscription groups admin groups_admin_url = "/admin/vpn/subscriptiongroup/" html += f'âš™ī¸ Manage Groups' html += '
' html += '
' # Statistics total_subs = user_subscriptions.count() if total_subs > 0: html += '
' html += f'📊 Total: {total_subs} subscription(s) | Active: {active_subs.count()} | Inactive: {inactive_subs.count()}' html += '
' html += '
' return mark_safe(html) except Exception as e: return mark_safe(f'
❌ Error loading subscription management: {e}
') @admin.display(description='Allowed servers', ordering='server_count') def server_count(self, obj): return obj.server_count def get_queryset(self, request): qs = super().get_queryset(request) qs = qs.annotate(server_count=Count('acl')) return qs def get_urls(self): """Add custom URLs for link management""" urls = super().get_urls() custom_urls = [ path('/add-link/', self.admin_site.admin_view(self.add_link_view), name='user_add_link'), path('/delete-link//', self.admin_site.admin_view(self.delete_link_view), name='user_delete_link'), path('/add-server-access/', self.admin_site.admin_view(self.add_server_access_view), name='user_add_server_access'), path('/unlink-telegram/', self.admin_site.admin_view(self.unlink_telegram_view), name='vpn_user_unlink_telegram'), ] return custom_urls + urls def add_link_view(self, request, user_id): """AJAX view to add a new link for user on specific server""" if request.method == 'POST': try: user = User.objects.get(pk=user_id) server_id = request.POST.get('server_id') comment = request.POST.get('comment', '') if not server_id: return JsonResponse({'error': 'Server ID is required'}, status=400) server = Server.objects.get(pk=server_id) acl = ACL.objects.get(user=user, server=server) # Create new link new_link = ACLLink.objects.create( acl=acl, comment=comment, link=shortuuid.ShortUUID().random(length=16) ) return JsonResponse({ 'success': True, 'link_id': new_link.id, 'link': new_link.link, 'comment': new_link.comment, 'url': f"{EXTERNAL_ADDRESS}/ss/{new_link.link}#{server.name}" }) except Exception as e: return JsonResponse({'error': str(e)}, status=500) return JsonResponse({'error': 'Invalid request method'}, status=405) def delete_link_view(self, request, user_id, link_id): """AJAX view to delete a specific link""" if request.method == 'POST': try: user = User.objects.get(pk=user_id) link = ACLLink.objects.get(pk=link_id, acl__user=user) link.delete() return JsonResponse({'success': True}) except Exception as e: return JsonResponse({'error': str(e)}, status=500) return JsonResponse({'error': 'Invalid request method'}, status=405) def add_server_access_view(self, request, user_id): """AJAX view to add server access for user""" if request.method == 'POST': try: user = User.objects.get(pk=user_id) server_id = request.POST.get('server_id') if not server_id: return JsonResponse({'error': 'Server ID is required'}, status=400) server = Server.objects.get(pk=server_id) # Check if ACL already exists if ACL.objects.filter(user=user, server=server).exists(): return JsonResponse({'error': 'User already has access to this server'}, status=400) # Create new ACL (with default link) acl = ACL.objects.create(user=user, server=server) return JsonResponse({ 'success': True, 'server_name': server.name, 'server_type': server.server_type, 'acl_id': acl.id }) except Exception as e: return JsonResponse({'error': str(e)}, status=500) return JsonResponse({'error': 'Invalid request method'}, status=405) def unlink_telegram_view(self, request, user_id): """Unlink Telegram account from user""" user = get_object_or_404(User, pk=user_id) if request.method == 'GET': # Store original Telegram info for logging telegram_info = { 'user_id': user.telegram_user_id, 'username': user.telegram_username, 'first_name': user.telegram_first_name, 'last_name': user.telegram_last_name } # Clear all Telegram fields user.telegram_user_id = None user.telegram_username = "" user.telegram_first_name = "" user.telegram_last_name = "" user.telegram_phone = "" user.save() # Also clean up any related access requests try: from telegram_bot.models import AccessRequest AccessRequest.objects.filter(telegram_user_id=telegram_info['user_id']).delete() except: pass # Telegram bot app might not be available messages.success( request, f"Telegram account {'@' + telegram_info['username'] if telegram_info['username'] else telegram_info['user_id']} " f"has been unlinked from user '{user.username}'" ) return HttpResponseRedirect(reverse('admin:vpn_user_change', args=[user_id])) def change_view(self, request, object_id, form_url='', extra_context=None): """Override change view to add user management data and fix layout""" extra_context = extra_context or {} if object_id: try: user = User.objects.get(pk=object_id) extra_context.update({ 'user_object': user, 'external_address': EXTERNAL_ADDRESS, }) except User.DoesNotExist: pass return super().change_view(request, object_id, form_url, extra_context)