"""
User admin interface
"""
import shortuuid
from django.contrib import admin
from django.utils.safestring import mark_safe
from django.utils.html import format_html
from django.db.models import Count
from django.shortcuts import get_object_or_404
from django.contrib import messages
from django.urls import path, reverse
from django.http import HttpResponseRedirect, JsonResponse
from django.utils.timezone import localtime
from mysite.settings import EXTERNAL_ADDRESS
from vpn.models import User, ACL, ACLLink, Server, AccessLog, UserStatistics
from vpn.forms import UserForm
from .base import BaseVPNAdmin, format_bytes
@admin.register(User)
class UserAdmin(BaseVPNAdmin):
form = UserForm
list_display = ('username', 'comment', 'registration_date', 'hash_link', 'server_count')
search_fields = ('username', 'hash', 'telegram_user_id', 'telegram_username')
readonly_fields = ('hash_link', 'vpn_access_summary', 'user_statistics_summary', 'telegram_info_display', 'subscription_management_info')
inlines = [] # Inlines will be added by subscription management function
fieldsets = (
('User Information', {
'fields': ('username', 'first_name', 'last_name', 'email', 'comment')
}),
('Telegram Integration', {
'fields': ('telegram_username', 'telegram_info_display'),
'classes': ('collapse',),
'description': 'Link existing users to Telegram by setting telegram_username (without @)'
}),
('Access Information', {
'fields': ('hash_link', 'is_active', 'vpn_access_summary')
}),
('Statistics & Server Management', {
'fields': ('user_statistics_summary',),
'classes': ('wide',)
}),
('Subscription Management', {
'fields': ('subscription_management_info',),
'classes': ('wide',),
'description': 'Manage user\'s Xray subscription groups. Use the "User\'s Subscription Groups" section below to add/remove subscriptions.'
}),
)
@admin.display(description='VPN Access Summary')
def vpn_access_summary(self, obj):
"""Display summary of user's VPN access"""
if not obj.pk:
return "Save user first to see VPN access"
# Get legacy VPN access
acl_count = ACL.objects.filter(user=obj).count()
legacy_links = ACLLink.objects.filter(acl__user=obj).count()
# Get Xray access
from vpn.models_xray import UserSubscription
xray_subs = UserSubscription.objects.filter(user=obj, active=True).select_related('subscription_group')
xray_groups = [sub.subscription_group.name for sub in xray_subs]
html = '
'
# Legacy VPN section
html += '
'
html += '
đĄ Legacy VPN (Outline/Wireguard)
'
if acl_count > 0:
html += f'
â Access to {acl_count} server(s)
'
html += f'
đ Total links: {legacy_links}
'
else:
html += '
No legacy VPN access
'
html += '
'
# Xray section
html += '
'
html += '
đ Xray VPN
'
if xray_groups:
html += f'
â Active subscriptions: {len(xray_groups)}
'
html += '
'
for group in xray_groups:
html += f'
{group}
'
html += '
'
# Try to get traffic statistics for this user
try:
from vpn.server_plugins.xray_v2 import XrayServerV2
traffic_total_up = 0
traffic_total_down = 0
servers_checked = set()
# Get all Xray servers
xray_servers = XrayServerV2.objects.filter(api_enabled=True, stats_enabled=True)
for server in xray_servers:
if server.name not in servers_checked:
try:
from vpn.xray_api_v2.client import XrayClient
from vpn.xray_api_v2.stats import StatsManager
client = XrayClient(server=server.api_address)
stats_manager = StatsManager(client)
# Get user stats (use email format: username@servername)
user_email = f"{obj.username}@{server.name}"
user_stats = stats_manager.get_user_stats(user_email)
if user_stats:
traffic_total_up += user_stats.get('uplink', 0)
traffic_total_down += user_stats.get('downlink', 0)
servers_checked.add(server.name)
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.debug(f"Could not get user stats from server {server.name}: {e}")
# Format traffic if we got any
if traffic_total_up > 0 or traffic_total_down > 0:
html += f'
đ Traffic Statistics:
'
html += f'
â Upload: {format_bytes(traffic_total_up)}
'
html += f'
â Download: {format_bytes(traffic_total_down)}
'
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.debug(f"Could not get traffic stats for user {obj.username}: {e}")
else:
html += '
',
portal_url, json_url
)
@admin.display(description='User Statistics Summary')
def user_statistics_summary(self, obj):
"""Display user statistics with integrated server management"""
try:
from vpn.models import UserStatistics
from django.db import models
# Get statistics for this user
user_stats = UserStatistics.objects.filter(user=obj).aggregate(
total_connections=models.Sum('total_connections'),
recent_connections=models.Sum('recent_connections'),
total_links=models.Count('id'),
max_daily_peak=models.Max('max_daily')
)
# Get server breakdown
server_breakdown = UserStatistics.objects.filter(user=obj).values('server_name').annotate(
connections=models.Sum('total_connections'),
links=models.Count('id')
).order_by('-connections')
# Get all ACLs and links for this user
user_acls = ACL.objects.filter(user=obj).select_related('server').prefetch_related('links')
# Get available servers not yet assigned
all_servers = Server.objects.all()
assigned_server_ids = [acl.server.id for acl in user_acls]
unassigned_servers = all_servers.exclude(id__in=assigned_server_ids)
html = '
'
# Overall Statistics
html += '
'
html += f'
'
html += f'
Total Uses: {user_stats["total_connections"] or 0}
'
html += f'
Recent (30d): {user_stats["recent_connections"] or 0}
'
html += f'
Total Links: {user_stats["total_links"] or 0}
'
if user_stats["max_daily_peak"]:
html += f'
Daily Peak: {user_stats["max_daily_peak"]}
'
html += f'
'
html += '
'
# Server Management
if user_acls:
html += '
đ Server Access & Links
'
for acl in user_acls:
server = acl.server
links = list(acl.links.all())
# Server header (no slow server status checks)
# Determine server type icon and label
if server.server_type == 'Outline':
type_icon = 'đĩ'
type_label = 'Outline'
elif server.server_type == 'Wireguard':
type_icon = 'đĸ'
type_label = 'Wireguard'
elif server.server_type in ['xray_core', 'xray_v2']:
type_icon = 'đŖ'
type_label = 'Xray'
else:
type_icon = 'â'
type_label = server.server_type
html += f'
'
html += f'
{type_icon} {server.name} ({type_label})
'
# Server stats
server_stat = next((s for s in server_breakdown if s['server_name'] == server.name), None)
if server_stat:
html += f''
html += f'đ {server_stat["connections"]} uses ({server_stat["links"]} links)'
html += f''
html += f'
'
html += '
'
# Links display
if links:
for link in links:
# Get link stats
link_stats = UserStatistics.objects.filter(
user=obj, server_name=server.name, acl_link_id=link.link
).first()
html += '
'
html += f'
'
html += f'
'
html += f'{link.link[:16]}...' if len(link.link) > 16 else link.link
html += f'
'
if link.comment:
html += f'
{link.comment}
'
html += f'
'
# Link stats and actions
html += f'
'
if link_stats:
html += f''
html += f'⨠{link_stats.total_connections}'
html += f''
# Test link button
html += f'đ'
# Delete button
html += f''
# Last access
if link.last_access_time:
local_time = localtime(link.last_access_time)
html += f''
html += f'{local_time.strftime("%m-%d %H:%M")}'
html += f''
else:
html += f''
html += f'Never'
html += f''
html += f'
'
# Add link button
html += f'
'
html += f''
html += f'
'
html += '
' # End server-section
# Add server access section
if unassigned_servers:
html += '
'
html += '
â Available Servers
'
html += '
'
for server in unassigned_servers:
# Determine server type icon and label
if server.server_type == 'Outline':
type_icon = 'đĩ'
type_label = 'Outline'
elif server.server_type == 'Wireguard':
type_icon = 'đĸ'
type_label = 'Wireguard'
elif server.server_type in ['xray_core', 'xray_v2']:
type_icon = 'đŖ'
type_label = 'Xray'
else:
type_icon = 'â'
type_label = server.server_type
html += f''
html += '
'
html += '
' # End user-management-section
return mark_safe(html)
except Exception as e:
return mark_safe(f'Error loading management interface: {e}')
@admin.display(description='Recent Activity')
def recent_activity_display(self, obj):
"""Display recent activity in compact admin-friendly format"""
try:
from datetime import timedelta
from django.utils import timezone
# Get recent access logs for this user (last 7 days, limited)
seven_days_ago = timezone.now() - timedelta(days=7)
recent_logs = AccessLog.objects.filter(
user=obj.username,
timestamp__gte=seven_days_ago
).order_by('-timestamp')[:15] # Limit to 15 most recent
if not recent_logs:
return mark_safe('
No recent activity (last 7 days)
')
html = '
'
# Header
html += '
'
html += f'đ Recent Activity ({recent_logs.count()} entries, last 7 days)'
html += '
'
# Activity entries
for i, log in enumerate(recent_logs):
bg_color = '#ffffff' if i % 2 == 0 else '#f8f9fa'
local_time = localtime(log.timestamp)
# Status icon and color
if log.action == 'Success':
icon = 'â '
status_color = '#28a745'
elif log.action == 'Failed':
icon = 'â'
status_color = '#dc3545'
else:
icon = 'âšī¸'
status_color = '#6c757d'
html += f'
'
# Left side - server and link info
html += f'
'
html += f'{icon}'
html += f'
'
html += f'
{log.server}
'
if log.acl_link_id:
link_short = log.acl_link_id[:12] + '...' if len(log.acl_link_id) > 12 else log.acl_link_id
html += f'
{link_short}
'
html += f'
'
# Right side - timestamp and status
html += f'
'
html += f'
{local_time.strftime("%m-%d %H:%M")}
'
html += f'
{log.action}
'
html += f'
'
html += f'
'
# Footer with summary if there are more entries
total_recent = AccessLog.objects.filter(
user=obj.username,
timestamp__gte=seven_days_ago
).count()
if total_recent > 15:
html += f'
'
html += f'Showing 15 of {total_recent} entries from last 7 days'
html += f'
'
html += '
'
return mark_safe(html)
except Exception as e:
return mark_safe(f'Error loading activity: {e}')
@admin.display(description='Telegram Account')
def telegram_info_display(self, obj):
"""Display Telegram account information"""
if not obj.telegram_user_id:
if obj.telegram_username:
return mark_safe(f'
'
f'đ Ready to link: @{obj.telegram_username} '
f'User will be automatically linked when they message the bot
')
else:
return mark_safe('No Telegram account linked')
html = '
'
html += '
đą Telegram Account Information
'
# Telegram User ID
html += f'
User ID:{obj.telegram_user_id}
'
# Telegram Username
if obj.telegram_username:
html += f'
Username: @{obj.telegram_username}
'
# Telegram Names
name_parts = []
if obj.telegram_first_name:
name_parts.append(obj.telegram_first_name)
if obj.telegram_last_name:
name_parts.append(obj.telegram_last_name)
if name_parts:
full_name = ' '.join(name_parts)
html += f'
Name: {full_name}
'
# Telegram Phone (if available)
if obj.telegram_phone:
html += f'
Phone: {obj.telegram_phone}
'
# Access requests count (if any)
try:
from telegram_bot.models import AccessRequest
requests_count = AccessRequest.objects.filter(telegram_user_id=obj.telegram_user_id).count()
if requests_count > 0:
html += f'
đ Access Requests: {requests_count}
'
# Show latest request status
latest_request = AccessRequest.objects.filter(telegram_user_id=obj.telegram_user_id).order_by('-created_at').first()
if latest_request:
status_color = '#28a745' if latest_request.approved else '#ffc107'
status_text = 'Approved' if latest_request.approved else 'Pending'
html += f'
Latest: {status_text}
'
except:
pass # Telegram bot app might not be available
# Add unlink button
unlink_url = reverse('admin:vpn_user_unlink_telegram', args=[obj.pk])
html += f'
'
return mark_safe(html)
@admin.display(description='Subscription Management')
def subscription_management_info(self, obj):
"""Display subscription management information and quick access"""
if not obj.pk:
return "Save user first to manage subscriptions"
try:
from vpn.models_xray import UserSubscription, SubscriptionGroup
# Get user's current subscriptions
user_subscriptions = UserSubscription.objects.filter(user=obj).select_related('subscription_group')
active_subs = user_subscriptions.filter(active=True)
inactive_subs = user_subscriptions.filter(active=False)
# Get available subscription groups
all_groups = SubscriptionGroup.objects.filter(is_active=True)
subscribed_group_ids = user_subscriptions.values_list('subscription_group_id', flat=True)
available_groups = all_groups.exclude(id__in=subscribed_group_ids)
html = '
'
html += '
đ Xray Subscription Management
'
# Active subscriptions
if active_subs.exists():
html += '
'
html += '
â Active Subscriptions
'
for sub in active_subs:
html += f'
'
html += f'{sub.subscription_group.name}'
if sub.subscription_group.description:
html += f' - {sub.subscription_group.description[:50]}{"..." if len(sub.subscription_group.description) > 50 else ""}'
html += f''
html += f'Since: {sub.created_at.strftime("%Y-%m-%d")}'
html += f'
'
html += '
'
# Inactive subscriptions
if inactive_subs.exists():
html += '
'
html += '
â Inactive Subscriptions
'
for sub in inactive_subs:
html += f'
'
html += f'{sub.subscription_group.name}'
html += f'
'
html += '
'
# Available subscription groups
if available_groups.exists():
html += '
'
html += '
â Available Subscription Groups
'
html += '
'
for group in available_groups[:10]: # Limit to avoid clutter
html += f''
html += f'{group.name}'
html += f''
if available_groups.count() > 10:
html += f'+{available_groups.count() - 10} more...'
html += '
'
html += '
'
# Quick access links
html += '
'
html += '
đ Quick Access
'
html += '
'
# Link to standalone UserSubscription admin
subscription_admin_url = f"/admin/vpn/usersubscription/?user__id__exact={obj.id}"
html += f'đ Manage All Subscriptions'
# Link to add new subscription
add_subscription_url = f"/admin/vpn/usersubscription/add/?user={obj.id}"
html += f'â Add New Subscription'
# Link to subscription groups admin
groups_admin_url = "/admin/vpn/subscriptiongroup/"
html += f'âī¸ Manage Groups'
html += '
'
html += '
'
# Statistics
total_subs = user_subscriptions.count()
if total_subs > 0:
html += '
'
html += f'đ Total: {total_subs} subscription(s) | Active: {active_subs.count()} | Inactive: {inactive_subs.count()}'
html += '
'
html += '
'
return mark_safe(html)
except Exception as e:
return mark_safe(f'
â Error loading subscription management: {e}
')
@admin.display(description='Allowed servers', ordering='server_count')
def server_count(self, obj):
return obj.server_count
def get_queryset(self, request):
qs = super().get_queryset(request)
qs = qs.annotate(server_count=Count('acl'))
return qs
def get_urls(self):
"""Add custom URLs for link management"""
urls = super().get_urls()
custom_urls = [
path('/add-link/', self.admin_site.admin_view(self.add_link_view), name='user_add_link'),
path('/delete-link//', self.admin_site.admin_view(self.delete_link_view), name='user_delete_link'),
path('/add-server-access/', self.admin_site.admin_view(self.add_server_access_view), name='user_add_server_access'),
path('/unlink-telegram/', self.admin_site.admin_view(self.unlink_telegram_view), name='vpn_user_unlink_telegram'),
]
return custom_urls + urls
def add_link_view(self, request, user_id):
"""AJAX view to add a new link for user on specific server"""
if request.method == 'POST':
try:
user = User.objects.get(pk=user_id)
server_id = request.POST.get('server_id')
comment = request.POST.get('comment', '')
if not server_id:
return JsonResponse({'error': 'Server ID is required'}, status=400)
server = Server.objects.get(pk=server_id)
acl = ACL.objects.get(user=user, server=server)
# Create new link
new_link = ACLLink.objects.create(
acl=acl,
comment=comment,
link=shortuuid.ShortUUID().random(length=16)
)
return JsonResponse({
'success': True,
'link_id': new_link.id,
'link': new_link.link,
'comment': new_link.comment,
'url': f"{EXTERNAL_ADDRESS}/ss/{new_link.link}#{server.name}"
})
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
return JsonResponse({'error': 'Invalid request method'}, status=405)
def delete_link_view(self, request, user_id, link_id):
"""AJAX view to delete a specific link"""
if request.method == 'POST':
try:
user = User.objects.get(pk=user_id)
link = ACLLink.objects.get(pk=link_id, acl__user=user)
link.delete()
return JsonResponse({'success': True})
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
return JsonResponse({'error': 'Invalid request method'}, status=405)
def add_server_access_view(self, request, user_id):
"""AJAX view to add server access for user"""
if request.method == 'POST':
try:
user = User.objects.get(pk=user_id)
server_id = request.POST.get('server_id')
if not server_id:
return JsonResponse({'error': 'Server ID is required'}, status=400)
server = Server.objects.get(pk=server_id)
# Check if ACL already exists
if ACL.objects.filter(user=user, server=server).exists():
return JsonResponse({'error': 'User already has access to this server'}, status=400)
# Create new ACL (with default link)
acl = ACL.objects.create(user=user, server=server)
return JsonResponse({
'success': True,
'server_name': server.name,
'server_type': server.server_type,
'acl_id': acl.id
})
except Exception as e:
return JsonResponse({'error': str(e)}, status=500)
return JsonResponse({'error': 'Invalid request method'}, status=405)
def unlink_telegram_view(self, request, user_id):
"""Unlink Telegram account from user"""
user = get_object_or_404(User, pk=user_id)
if request.method == 'GET':
# Store original Telegram info for logging
telegram_info = {
'user_id': user.telegram_user_id,
'username': user.telegram_username,
'first_name': user.telegram_first_name,
'last_name': user.telegram_last_name
}
# Clear all Telegram fields
user.telegram_user_id = None
user.telegram_username = ""
user.telegram_first_name = ""
user.telegram_last_name = ""
user.telegram_phone = ""
user.save()
# Also clean up any related access requests
try:
from telegram_bot.models import AccessRequest
AccessRequest.objects.filter(telegram_user_id=telegram_info['user_id']).delete()
except:
pass # Telegram bot app might not be available
messages.success(
request,
f"Telegram account {'@' + telegram_info['username'] if telegram_info['username'] else telegram_info['user_id']} "
f"has been unlinked from user '{user.username}'"
)
return HttpResponseRedirect(reverse('admin:vpn_user_change', args=[user_id]))
def change_view(self, request, object_id, form_url='', extra_context=None):
"""Override change view to add user management data and fix layout"""
extra_context = extra_context or {}
if object_id:
try:
user = User.objects.get(pk=object_id)
extra_context.update({
'user_object': user,
'external_address': EXTERNAL_ADDRESS,
})
except User.DoesNotExist:
pass
return super().change_view(request, object_id, form_url, extra_context)