Files
OutFleet/vpn/admin_xray.py

833 lines
34 KiB
Python
Raw Normal View History

2025-08-08 05:46:36 +03:00
"""
Admin interface for new Xray models.
"""
import json
from django.contrib import admin, messages
from django.utils.safestring import mark_safe
from django.utils.html import format_html
from django.db import models
from django.forms import CheckboxSelectMultiple, Textarea
from django.shortcuts import render, redirect
from django.urls import path, reverse
from django.http import JsonResponse, HttpResponseRedirect
from .models_xray import (
2025-08-08 06:50:04 +03:00
Credentials, Certificate,
2025-08-08 05:46:36 +03:00
Inbound, SubscriptionGroup, UserSubscription, ServerInbound
)
2025-08-08 06:50:04 +03:00
# Credentials admin available through direct URL but not in main menu
2025-08-08 05:46:36 +03:00
class CredentialsAdmin(admin.ModelAdmin):
2025-08-08 06:50:04 +03:00
"""Admin for credentials management (accessible via direct URL only)"""
2025-08-08 05:46:36 +03:00
list_display = ('name', 'cred_type', 'description', 'created_at')
list_filter = ('cred_type',)
search_fields = ('name', 'description')
fieldsets = (
('Basic Information', {
'fields': ('name', 'cred_type', 'description')
}),
('Credentials Data', {
'fields': ('credentials_help', 'credentials'),
2025-08-08 06:50:04 +03:00
'description': 'Enter credentials as JSON'
2025-08-08 05:46:36 +03:00
}),
('Preview', {
'fields': ('credentials_display',),
'classes': ('collapse',),
}),
('Timestamps', {
'fields': ('created_at', 'updated_at'),
'classes': ('collapse',)
})
)
readonly_fields = ('credentials_display', 'credentials_help', 'created_at', 'updated_at')
formfield_overrides = {
2025-08-08 06:50:04 +03:00
models.JSONField: {'widget': Textarea(attrs={'rows': 6, 'style': 'font-family: monospace;'})},
2025-08-08 05:46:36 +03:00
}
def credentials_help(self, obj):
2025-08-08 06:50:04 +03:00
"""Display help for different credential formats"""
2025-08-08 05:46:36 +03:00
examples = {
'cloudflare': {
2025-08-08 06:50:04 +03:00
'api_token': 'your_cloudflare_api_token_here'
2025-08-08 05:46:36 +03:00
},
2025-08-08 06:50:04 +03:00
'digitalocean': {
'token': 'your_digitalocean_token_here'
2025-08-08 05:46:36 +03:00
},
2025-08-08 06:50:04 +03:00
'aws_route53': {
'access_key_id': 'your_access_key_id',
'secret_access_key': 'your_secret_access_key',
'region': 'us-east-1'
2025-08-08 05:46:36 +03:00
}
}
html = '<div style="background: #f8f9fa; padding: 15px; border-radius: 4px; margin-bottom: 10px;">'
html += '<h4>JSON Examples:</h4>'
for cred_type, example in examples.items():
html += '<div style="margin: 10px 0;">'
html += '<strong>' + cred_type.title() + ':</strong>'
json_str = json.dumps(example, indent=2)
html += '<pre style="background: white; padding: 8px; border-radius: 3px; font-size: 12px;">' + json_str + '</pre>'
html += '</div>'
html += '<p><strong>Note:</strong> Make sure your JSON is valid. Use double quotes for strings.</p>'
html += '</div>'
return mark_safe(html)
credentials_help.short_description = 'Credentials Format Help'
def credentials_display(self, obj):
"""Display credentials in a safe format"""
if obj.credentials:
# Hide sensitive values
safe_creds = {}
for key, value in obj.credentials.items():
if any(sensitive in key.lower() for sensitive in ['token', 'key', 'password', 'secret']):
safe_creds[key] = '*' * 8
else:
safe_creds[key] = value
return format_html(
'<pre style="background: #f4f4f4; padding: 10px; border-radius: 4px;">{}</pre>',
json.dumps(safe_creds, indent=2)
)
return '-'
credentials_display.short_description = 'Credentials (Preview)'
2025-08-08 07:47:23 +03:00
# Register Credentials admin but hide from main menu for Django + button functionality
@admin.register(Credentials)
class CredentialsHiddenAdmin(CredentialsAdmin):
"""Credentials admin hidden from main menu but available for + button"""
def has_module_permission(self, request):
"""Hide this model from the main admin index"""
return False
2025-08-08 06:50:04 +03:00
2025-08-08 05:46:36 +03:00
@admin.register(Certificate)
class CertificateAdmin(admin.ModelAdmin):
"""Admin for certificate management"""
list_display = (
'domain', 'cert_type', 'status_display',
'expires_at', 'auto_renew', 'action_buttons'
)
list_filter = ('cert_type', 'auto_renew')
search_fields = ('domain',)
2025-08-08 06:50:04 +03:00
actions = ['rotate_selected_certificates']
2025-08-08 05:46:36 +03:00
fieldsets = (
('Certificate Request', {
2025-08-08 06:50:04 +03:00
'fields': ('domain', 'cert_type', 'acme_email', 'auto_renew'),
'description': 'For Let\'s Encrypt certificates, provide email for ACME registration and select/create credentials below.'
}),
('API Credentials', {
'fields': ('credentials',),
'description': 'Select API credentials for automatic Let\'s Encrypt certificate generation'
2025-08-08 05:46:36 +03:00
}),
2025-08-08 06:50:04 +03:00
('Certificate Generation Status', {
'fields': ('generation_help',),
2025-08-08 05:46:36 +03:00
'classes': ('wide',)
}),
('Certificate Data', {
2025-08-08 06:50:04 +03:00
'fields': ('certificate_info', 'certificate_pem', 'private_key_pem'),
2025-08-08 05:46:36 +03:00
'classes': ('collapse',),
2025-08-08 06:50:04 +03:00
'description': 'Detailed certificate information'
2025-08-08 05:46:36 +03:00
}),
('Renewal Settings', {
'fields': ('last_renewed',),
'classes': ('collapse',)
}),
('Timestamps', {
'fields': ('created_at', 'updated_at'),
'classes': ('collapse',)
})
)
readonly_fields = (
2025-08-08 06:50:04 +03:00
'certificate_info', 'status_display', 'generation_help',
2025-08-08 05:46:36 +03:00
'expires_at', 'last_renewed', 'created_at', 'updated_at'
)
def generation_help(self, obj):
"""Show help text for certificate generation"""
if not obj.pk:
return mark_safe('<div style="background: #e3f2fd; padding: 10px; border-radius: 4px;">'
'<p><strong>How it works:</strong></p>'
'<ol>'
'<li>Fill in the domain name</li>'
'<li>Select certificate type (Let\'s Encrypt recommended)</li>'
'<li>For Let\'s Encrypt: provide email for ACME account registration</li>'
'<li>Select credentials with Cloudflare API token</li>'
'<li>Save - certificate will be generated automatically</li>'
'</ol>'
'</div>')
if obj.cert_type == 'letsencrypt' and not obj.certificate_pem:
return mark_safe('<div style="background: #fff3e0; padding: 10px; border-radius: 4px;">'
'<p><strong>⏳ Certificate not generated yet</strong></p>'
'<p>Certificate will be generated automatically using Let\'s Encrypt DNS-01 challenge.</p>'
'</div>')
if obj.certificate_pem:
days = obj.days_until_expiration if obj.days_until_expiration is not None else 'Unknown'
return mark_safe('<div style="background: #e8f5e8; padding: 10px; border-radius: 4px;">'
'<p><strong>✅ Certificate generated successfully</strong></p>'
f'<p>Expires: {obj.expires_at}</p>'
f'<p>Days remaining: {days}</p>'
'</div>')
return '-'
generation_help.short_description = 'Certificate Generation Status'
def status_display(self, obj):
"""Display certificate status"""
if obj.is_expired:
return format_html(
'<span style="color: red;">❌ Expired</span>'
)
elif obj.needs_renewal:
return format_html(
'<span style="color: orange;">⚠️ Needs renewal ({} days)</span>',
obj.days_until_expiration
)
else:
return format_html(
'<span style="color: green;">✅ Valid ({} days)</span>',
obj.days_until_expiration
)
status_display.short_description = 'Status'
def certificate_preview(self, obj):
"""Preview certificate info"""
if obj.certificate_pem:
lines = obj.certificate_pem.strip().split('\n')
preview = '\n'.join(lines[:5] + ['...'] + lines[-3:])
return format_html(
'<pre style="background: #f4f4f4; padding: 10px; font-family: monospace; font-size: 12px;">{}</pre>',
preview
)
return '-'
certificate_preview.short_description = 'Certificate Preview'
def action_buttons(self, obj):
"""Action buttons for certificate"""
buttons = []
if obj.needs_renewal and obj.auto_renew:
renew_url = reverse('admin:certificate_renew', args=[obj.pk])
buttons.append(
f'<a href="{renew_url}" class="button" style="background: #ff9800;">🔄 Renew Now</a>'
)
if obj.cert_type == 'self_signed':
regenerate_url = reverse('admin:certificate_regenerate', args=[obj.pk])
buttons.append(
f'<a href="{regenerate_url}" class="button">🔄 Regenerate</a>'
)
return format_html(' '.join(buttons)) if buttons else '-'
action_buttons.short_description = 'Actions'
def get_urls(self):
urls = super().get_urls()
custom_urls = [
path('<int:cert_id>/renew/',
self.admin_site.admin_view(self.renew_certificate_view),
name='certificate_renew'),
path('<int:cert_id>/regenerate/',
self.admin_site.admin_view(self.regenerate_certificate_view),
name='certificate_regenerate'),
]
return custom_urls + urls
def renew_certificate_view(self, request, cert_id):
"""Renew Let's Encrypt certificate"""
try:
cert = Certificate.objects.get(pk=cert_id)
# TODO: Implement renewal logic
messages.success(request, f'Certificate for {cert.domain} renewed successfully!')
except Exception as e:
messages.error(request, f'Failed to renew certificate: {e}')
return redirect('admin:vpn_certificate_change', cert_id)
def regenerate_certificate_view(self, request, cert_id):
"""Regenerate self-signed certificate"""
try:
cert = Certificate.objects.get(pk=cert_id)
# TODO: Implement regeneration logic
messages.success(request, f'Certificate for {cert.domain} regenerated successfully!')
except Exception as e:
messages.error(request, f'Failed to regenerate certificate: {e}')
return redirect('admin:vpn_certificate_change', cert_id)
def save_model(self, request, obj, form, change):
"""Auto-generate certificate for Let's Encrypt after saving"""
super().save_model(request, obj, form, change)
# Auto-generate Let's Encrypt certificate if needed
if obj.cert_type == 'letsencrypt' and not obj.certificate_pem:
try:
self.generate_letsencrypt_certificate(obj, request)
except Exception as e:
messages.warning(request, f'Certificate saved but auto-generation failed: {e}')
def generate_letsencrypt_certificate(self, cert_obj, request):
"""Generate Let's Encrypt certificate using DNS-01 challenge"""
if not cert_obj.credentials:
messages.error(request, 'Credentials required for Let\'s Encrypt certificate generation')
return
if not cert_obj.acme_email:
messages.error(request, 'ACME email address required for Let\'s Encrypt certificate generation')
return
try:
from vpn.letsencrypt.letsencrypt_dns import get_certificate_for_domain
from datetime import datetime, timedelta
from django.utils import timezone
# Get Cloudflare credentials
api_token = cert_obj.credentials.get_credential('api_token')
if not api_token:
messages.error(request, 'Cloudflare API token not found in credentials')
return
messages.info(request, f'🔄 Generating Let\'s Encrypt certificate for {cert_obj.domain} using {cert_obj.acme_email}...')
# Schedule certificate generation via Celery
from vpn.tasks import generate_certificate_task
task = generate_certificate_task.delay(cert_obj.id)
messages.success(
request,
f'🔄 Certificate generation scheduled for {cert_obj.domain}. Task ID: {task.id}'
)
except ImportError:
messages.warning(request, 'Let\'s Encrypt DNS challenge library not available')
except Exception as e:
messages.error(request, f'Failed to generate certificate: {str(e)}')
# Log the full error for debugging
import logging
logger = logging.getLogger(__name__)
logger.error(f'Certificate generation failed for {cert_obj.domain}: {e}', exc_info=True)
2025-08-08 06:50:04 +03:00
def certificate_info(self, obj):
"""Display detailed certificate information"""
if not obj.pk:
return "Save certificate to see details"
if not obj.certificate_pem:
return "Certificate not generated yet"
html = '<div style="background: #f8f9fa; padding: 15px; border-radius: 4px;">'
# Import here to avoid circular imports
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
# Parse certificate
cert = x509.load_pem_x509_certificate(obj.certificate_pem.encode(), default_backend())
# Basic info
html += '<h4>📜 Certificate Information</h4>'
html += '<table style="width: 100%; font-size: 12px;">'
html += f'<tr><td><strong>Subject:</strong></td><td>{cert.subject.rfc4514_string()}</td></tr>'
html += f'<tr><td><strong>Issuer:</strong></td><td>{cert.issuer.rfc4514_string()}</td></tr>'
html += f'<tr><td><strong>Serial Number:</strong></td><td>{cert.serial_number}</td></tr>'
# Use UTC versions to avoid deprecation warnings
try:
# Try new UTC properties first (cryptography >= 42.0.0)
valid_from = cert.not_valid_before_utc
valid_until = cert.not_valid_after_utc
cert_not_after = valid_until
except AttributeError:
# Fall back to old properties for older cryptography versions
valid_from = cert.not_valid_before
valid_until = cert.not_valid_after
cert_not_after = cert.not_valid_after
if cert_not_after.tzinfo is None:
cert_not_after = cert_not_after.replace(tzinfo=timezone.utc)
html += f'<tr><td><strong>Valid From:</strong></td><td>{valid_from}</td></tr>'
html += f'<tr><td><strong>Valid Until:</strong></td><td>{valid_until}</td></tr>'
# Status
from datetime import datetime, timezone
now = datetime.now(timezone.utc)
days_until_expiry = (cert_not_after - now).days
if days_until_expiry < 0:
status = f'<span style="color: red;">❌ Expired {abs(days_until_expiry)} days ago</span>'
elif days_until_expiry < 30:
status = f'<span style="color: orange;">⚠️ Expires in {days_until_expiry} days</span>'
else:
status = f'<span style="color: green;">✅ Valid for {days_until_expiry} days</span>'
html += f'<tr><td><strong>Status:</strong></td><td>{status}</td></tr>'
# Extensions
try:
san = cert.extensions.get_extension_for_oid(x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
domains = [name.value for name in san.value]
html += f'<tr><td><strong>Domains:</strong></td><td>{", ".join(domains)}</td></tr>'
except:
# No SAN extension or other error
pass
html += '</table>'
except ImportError:
html += '<p>⚠️ Install cryptography package to see detailed certificate information</p>'
except Exception as e:
html += f'<p>❌ Error parsing certificate: {e}</p>'
html += '</div>'
return mark_safe(html)
certificate_info.short_description = 'Certificate Details'
def rotate_selected_certificates(self, request, queryset):
"""Admin action to rotate selected certificates"""
from vpn.tasks import generate_certificate_task
# Filter only Let's Encrypt certificates
valid_certs = queryset.filter(cert_type='letsencrypt')
if not valid_certs.exists():
self.message_user(request, "No Let's Encrypt certificates selected. Only Let's Encrypt certificates can be rotated.", level='ERROR')
return
# Check for certificates without credentials
certs_without_creds = valid_certs.filter(credentials__isnull=True)
if certs_without_creds.exists():
domains = ', '.join(certs_without_creds.values_list('domain', flat=True))
self.message_user(request, f"The following certificates have no credentials configured and will be skipped: {domains}", level='WARNING')
# Filter certificates that have credentials
certs_to_rotate = valid_certs.filter(credentials__isnull=False)
if not certs_to_rotate.exists():
self.message_user(request, "No certificates with valid credentials found.", level='ERROR')
return
# Launch rotation tasks
rotated_count = 0
task_ids = []
for certificate in certs_to_rotate:
try:
task = generate_certificate_task.delay(certificate.id)
task_ids.append(task.id)
rotated_count += 1
except Exception as e:
self.message_user(request, f"Failed to start rotation for {certificate.domain}: {str(e)}", level='ERROR')
if rotated_count > 0:
domains = ', '.join(certs_to_rotate.values_list('domain', flat=True))
task_list = ', '.join(task_ids)
self.message_user(
request,
f'Successfully initiated certificate rotation for {rotated_count} certificate(s): {domains}. '
f'Task IDs: {task_list}. Certificates will be automatically redeployed to all servers once generated.',
level='SUCCESS'
)
rotate_selected_certificates.short_description = "🔄 Rotate selected Let's Encrypt certificates"
2025-08-08 05:46:36 +03:00
@admin.register(Inbound)
class InboundAdmin(admin.ModelAdmin):
2025-08-08 07:39:01 +03:00
"""Admin for inbound template management"""
2025-08-08 05:46:36 +03:00
list_display = (
'name', 'protocol', 'port', 'network',
2025-08-08 08:35:47 +03:00
'security', 'group_count'
2025-08-08 05:46:36 +03:00
)
list_filter = ('protocol', 'network', 'security')
2025-08-08 07:39:01 +03:00
search_fields = ('name',)
2025-08-08 05:46:36 +03:00
fieldsets = (
('Basic Configuration', {
2025-08-08 07:39:01 +03:00
'fields': ('name', 'protocol', 'port'),
2025-08-08 08:35:47 +03:00
'description': 'Certificates are configured per-server in Server admin → Inbound Templates tab'
2025-08-08 05:46:36 +03:00
}),
('Transport & Security', {
2025-08-08 08:35:47 +03:00
'fields': ('network', 'security', 'listen_address')
2025-08-08 05:46:36 +03:00
}),
('Advanced Settings', {
'fields': ('enable_sniffing', 'full_config_display'),
'classes': ('collapse',),
'description': 'Configuration is auto-generated based on basic settings above'
}),
('Timestamps', {
'fields': ('created_at', 'updated_at'),
'classes': ('collapse',)
})
)
readonly_fields = ('full_config_display', 'created_at', 'updated_at')
def group_count(self, obj):
"""Number of groups this inbound belongs to"""
return obj.subscriptiongroup_set.count()
group_count.short_description = 'Groups'
def full_config_display(self, obj):
"""Display full config in formatted JSON"""
if obj.full_config:
return format_html(
'<pre style="background: #f4f4f4; padding: 10px; border-radius: 4px; max-height: 400px; overflow-y: auto;">{}</pre>',
json.dumps(obj.full_config, indent=2)
)
return 'Not generated yet'
full_config_display.short_description = 'Configuration Preview'
def save_model(self, request, obj, form, change):
"""Generate config on save"""
try:
# Always regenerate config to reflect any changes
obj.build_config()
2025-08-08 06:50:04 +03:00
if change:
messages.success(request, f'✅ Inbound "{obj.name}" updated. Changes will be automatically deployed to servers.')
else:
messages.success(request, f'✅ Inbound "{obj.name}" created. It will be deployed when added to subscription groups.')
2025-08-08 05:46:36 +03:00
except Exception as e:
messages.warning(request, f'Inbound saved but config generation failed: {e}')
# Set empty dict if generation fails
if not obj.full_config:
obj.full_config = {}
super().save_model(request, obj, form, change)
class InboundInline(admin.TabularInline):
"""Inline for inbounds in subscription groups"""
model = SubscriptionGroup.inbounds.through
extra = 1
verbose_name = "Inbound"
verbose_name_plural = "Inbounds in this group"
class SubscriptionGroupAdmin(admin.ModelAdmin):
"""Admin for subscription groups"""
list_display = ('name', 'is_active', 'inbound_count', 'user_count', 'created_at')
list_filter = ('is_active',)
search_fields = ('name', 'description')
filter_horizontal = ('inbounds',)
fieldsets = (
('Group Information', {
'fields': ('name', 'description', 'is_active')
}),
('Inbounds', {
'fields': ('inbounds',),
2025-08-08 06:50:04 +03:00
'description': 'Select inbounds to include in this group. ' +
'<br><strong>🚀 Auto-sync enabled:</strong> Changes will be automatically deployed to servers!'
2025-08-08 05:46:36 +03:00
}),
('Statistics', {
'fields': ('group_statistics',),
'classes': ('collapse',)
})
)
readonly_fields = ('group_statistics',)
2025-08-08 06:50:04 +03:00
def save_model(self, request, obj, form, change):
"""Override save to notify about auto-sync"""
super().save_model(request, obj, form, change)
if change:
messages.success(
request,
f'Subscription group "{obj.name}" updated. Changes will be automatically synchronized to all Xray servers.'
)
else:
messages.success(
request,
f'Subscription group "{obj.name}" created. Inbounds will be automatically deployed when you add them to this group.'
)
2025-08-08 05:46:36 +03:00
def group_statistics(self, obj):
"""Display group statistics"""
if obj.pk:
stats = {
'Total Inbounds': obj.inbound_count,
'Active Users': obj.user_count,
'Protocols': list(obj.inbounds.values_list('protocol', flat=True).distinct()),
'Ports': list(obj.inbounds.values_list('port', flat=True).distinct())
}
html = '<div style="background: #f4f4f4; padding: 10px; border-radius: 4px;">'
for key, value in stats.items():
if isinstance(value, list):
value = ', '.join(map(str, value))
html += f'<div><strong>{key}:</strong> {value}</div>'
html += '</div>'
return format_html(html)
return 'Save to see statistics'
group_statistics.short_description = 'Group Statistics'
2025-08-08 07:39:01 +03:00
2025-08-08 05:46:36 +03:00
class UserSubscriptionInline(admin.TabularInline):
"""Inline for user subscriptions"""
model = UserSubscription
extra = 0
fields = ('subscription_group', 'active', 'created_at')
readonly_fields = ('created_at',)
verbose_name = "Subscription Group"
verbose_name_plural = "User's Subscription Groups"
# Extension for User admin
def add_subscription_management_to_user(UserAdmin):
"""Add subscription management to existing User admin"""
2025-08-08 12:41:33 +03:00
# Add inline only - no fieldset or widget
2025-08-08 05:46:36 +03:00
if hasattr(UserAdmin, 'inlines'):
UserAdmin.inlines = list(UserAdmin.inlines) + [UserSubscriptionInline]
else:
UserAdmin.inlines = [UserSubscriptionInline]
2025-08-08 07:39:01 +03:00
# UserSubscription admin will be integrated into unified Subscriptions admin
2025-08-08 05:46:36 +03:00
class UserSubscriptionAdmin(admin.ModelAdmin):
2025-08-08 07:39:01 +03:00
"""Admin for user subscriptions (integrated into unified Subscriptions admin)"""
2025-08-08 05:46:36 +03:00
list_display = ('user', 'subscription_group', 'active', 'created_at')
list_filter = ('active', 'subscription_group')
search_fields = ('user__username', 'subscription_group__name')
date_hierarchy = 'created_at'
def has_add_permission(self, request):
2025-08-08 07:39:01 +03:00
return True # Allow adding subscriptions
2025-08-08 05:46:36 +03:00
2025-08-08 07:39:01 +03:00
# ServerInbound admin is integrated into XrayServerV2 admin, not shown in main menu
2025-08-08 05:46:36 +03:00
class ServerInboundAdmin(admin.ModelAdmin):
"""Admin for server-inbound deployment tracking"""
2025-08-08 08:35:47 +03:00
list_display = ('server', 'inbound', 'certificate_status', 'active', 'deployed_at', 'updated_at')
2025-08-08 05:46:36 +03:00
list_filter = ('active', 'inbound__protocol', 'deployed_at')
search_fields = ('server__name', 'inbound__name')
date_hierarchy = 'deployed_at'
fieldsets = (
2025-08-08 07:39:01 +03:00
('Template Deployment', {
2025-08-08 05:46:36 +03:00
'fields': ('server', 'inbound', 'active')
}),
2025-08-08 08:35:47 +03:00
('Certificate Configuration', {
'fields': ('certificate', 'certificate_info'),
'description': 'Certificate for TLS. If not specified, will be auto-selected by server hostname.'
}),
2025-08-08 05:46:36 +03:00
('Timestamps', {
'fields': ('deployed_at', 'updated_at'),
'classes': ('collapse',)
})
)
2025-08-08 08:35:47 +03:00
readonly_fields = ('deployed_at', 'updated_at', 'certificate_info')
def certificate_status(self, obj):
"""Display certificate status"""
if not obj.requires_certificate():
return format_html('<span style="color: gray;">-</span>')
cert = obj.get_certificate()
if cert:
if cert.is_expired:
return format_html('<span style="color: red;">❌ {}</span>', cert.domain)
else:
return format_html('<span style="color: green;">✅ {}</span>', cert.domain)
else:
return format_html('<span style="color: orange;">⚠️ No cert</span>')
certificate_status.short_description = 'Certificate'
def certificate_info(self, obj):
"""Display certificate information and selection logic"""
html = '<div style="background: #f8f9fa; padding: 10px; border-radius: 4px;">'
if not obj.requires_certificate():
html += '<p><strong>Certificate not required</strong> for this protocol/security combination.</p>'
else:
cert = obj.get_certificate()
if obj.certificate:
html += f'<p><strong>✅ Explicit certificate:</strong> {obj.certificate.domain}</p>'
elif cert:
html += f'<p><strong>🔄 Auto-selected:</strong> {cert.domain} (matches server hostname)</p>'
else:
server_hostname = getattr(obj.server.get_real_instance(), 'client_hostname', 'N/A')
html += f'<p><strong>⚠️ No certificate found</strong></p>'
html += f'<p>Server hostname: <code>{server_hostname}</code></p>'
html += f'<p>Consider creating certificate for this domain or select one manually above.</p>'
html += '</div>'
return format_html(html)
certificate_info.short_description = 'Certificate Selection Info'
2025-08-08 07:39:01 +03:00
# Unified Subscriptions Admin with tabs
@admin.register(SubscriptionGroup)
class UnifiedSubscriptionsAdmin(admin.ModelAdmin):
"""Unified admin for managing both Subscription Groups and User Subscriptions"""
2025-08-08 05:46:36 +03:00
2025-08-08 07:39:01 +03:00
# Use SubscriptionGroup as the base model but provide access to UserSubscription via tabs
list_display = ('name', 'is_active', 'inbound_count', 'user_count', 'created_at')
list_filter = ('is_active',)
search_fields = ('name', 'description')
filter_horizontal = ('inbounds',)
def get_urls(self):
"""Add custom URLs for user subscriptions tab"""
urls = super().get_urls()
custom_urls = [
path('user-subscriptions/',
self.admin_site.admin_view(self.user_subscriptions_view),
name='vpn_usersubscription_changelist_tab'),
]
return custom_urls + urls
def user_subscriptions_view(self, request):
"""Redirect to user subscriptions with tab navigation"""
from django.shortcuts import redirect
return redirect('/admin/vpn/usersubscription/')
def changelist_view(self, request, extra_context=None):
"""Override changelist to add tab navigation"""
extra_context = extra_context or {}
extra_context.update({
'show_tab_navigation': True,
'current_tab': 'subscription_groups'
})
return super().changelist_view(request, extra_context)
def change_view(self, request, object_id, form_url='', extra_context=None):
"""Override change view to add tab navigation"""
extra_context = extra_context or {}
extra_context.update({
'show_tab_navigation': True,
'current_tab': 'subscription_groups'
})
return super().change_view(request, object_id, form_url, extra_context)
def add_view(self, request, form_url='', extra_context=None):
"""Override add view to add tab navigation"""
extra_context = extra_context or {}
extra_context.update({
'show_tab_navigation': True,
'current_tab': 'subscription_groups'
})
return super().add_view(request, form_url, extra_context)
# Copy fieldsets and methods from SubscriptionGroupAdmin
fieldsets = (
('Group Information', {
'fields': ('name', 'description', 'is_active')
}),
('Inbounds', {
'fields': ('inbounds',),
'description': 'Select inbounds to include in this group. ' +
'<br><strong>🚀 Auto-sync enabled:</strong> Changes will be automatically deployed to servers!'
}),
('Statistics', {
'fields': ('group_statistics',),
'classes': ('collapse',)
})
)
readonly_fields = ('group_statistics',)
def save_model(self, request, obj, form, change):
"""Override save to notify about auto-sync"""
super().save_model(request, obj, form, change)
if change:
messages.success(
request,
f'Subscription group "{obj.name}" updated. Changes will be automatically synchronized to all Xray servers.'
2025-08-08 05:46:36 +03:00
)
2025-08-08 07:39:01 +03:00
else:
messages.success(
request,
f'Subscription group "{obj.name}" created. Inbounds will be automatically deployed when you add them to this group.'
)
def group_statistics(self, obj):
"""Display group statistics"""
if obj.pk:
stats = {
'Total Inbounds': obj.inbound_count,
'Active Users': obj.user_count,
'Protocols': list(obj.inbounds.values_list('protocol', flat=True).distinct()),
'Ports': list(obj.inbounds.values_list('port', flat=True).distinct())
}
html = '<div style="background: #f4f4f4; padding: 10px; border-radius: 4px;">'
for key, value in stats.items():
if isinstance(value, list):
value = ', '.join(map(str, value))
html += f'<div><strong>{key}:</strong> {value}</div>'
html += '</div>'
return format_html(html)
return 'Save to see statistics'
group_statistics.short_description = 'Group Statistics'
# UserSubscription admin with tab navigation (hidden from main menu)
@admin.register(UserSubscription)
class UserSubscriptionTabAdmin(UserSubscriptionAdmin):
"""UserSubscription admin with tab navigation"""
def has_module_permission(self, request):
"""Hide this model from the main admin index"""
return False
def has_view_permission(self, request, obj=None):
"""Allow viewing through direct URL access"""
return request.user.is_staff
def has_add_permission(self, request):
"""Allow adding through direct URL access"""
return request.user.is_staff
def has_change_permission(self, request, obj=None):
"""Allow changing through direct URL access"""
return request.user.is_staff
def has_delete_permission(self, request, obj=None):
"""Allow deleting through direct URL access"""
return request.user.is_staff
def changelist_view(self, request, extra_context=None):
"""Override changelist to add tab navigation"""
extra_context = extra_context or {}
extra_context.update({
'show_tab_navigation': True,
'current_tab': 'user_subscriptions'
})
return super().changelist_view(request, extra_context)
def change_view(self, request, object_id, form_url='', extra_context=None):
"""Override change view to add tab navigation"""
extra_context = extra_context or {}
extra_context.update({
'show_tab_navigation': True,
'current_tab': 'user_subscriptions'
})
return super().change_view(request, object_id, form_url, extra_context)
def add_view(self, request, form_url='', extra_context=None):
"""Override add view to add tab navigation"""
extra_context = extra_context or {}
extra_context.update({
'show_tab_navigation': True,
'current_tab': 'user_subscriptions'
})
return super().add_view(request, form_url, extra_context)