Added TG bot

This commit is contained in:
Ultradesu
2025-08-15 04:02:22 +03:00
parent 402e4d84fc
commit 36f9e495b5
52 changed files with 6376 additions and 2081 deletions

0
telegram_bot/__init__.py Normal file
View File

621
telegram_bot/admin.py Normal file
View File

@@ -0,0 +1,621 @@
from django.contrib import admin
from django.utils.html import format_html
from django.urls import path, reverse
from django.shortcuts import redirect
from django.contrib import messages
from django.utils import timezone
from .models import BotSettings, TelegramMessage, AccessRequest
from .localization import MessageLocalizer
import logging
logger = logging.getLogger(__name__)
@admin.register(BotSettings)
class BotSettingsAdmin(admin.ModelAdmin):
list_display = ('__str__', 'enabled', 'bot_token_display', 'updated_at')
fieldsets = (
('Bot Configuration', {
'fields': ('bot_token', 'enabled', 'bot_status_display'),
'description': 'Configure bot settings and view current status'
}),
('Connection Settings', {
'fields': ('api_base_url', 'connection_timeout', 'use_proxy', 'proxy_url'),
'classes': ('collapse',)
}),
('Timestamps', {
'fields': ('created_at', 'updated_at'),
'classes': ('collapse',)
}),
)
readonly_fields = ('created_at', 'updated_at', 'bot_status_display')
def bot_token_display(self, obj):
"""Mask bot token for security"""
if obj.bot_token:
token = obj.bot_token
if len(token) > 10:
return f"{token[:6]}...{token[-4:]}"
return "Token set"
return "No token set"
bot_token_display.short_description = "Bot Token"
def bot_status_display(self, obj):
"""Display bot status with control buttons"""
from .bot import TelegramBotManager
import os
from django.conf import settings as django_settings
manager = TelegramBotManager()
# Check if lock file exists - only reliable indicator
lock_dir = os.path.join(getattr(django_settings, 'BASE_DIR', '/tmp'), 'telegram_bot_locks')
lock_path = os.path.join(lock_dir, 'telegram_bot.lock')
is_running = os.path.exists(lock_path)
if is_running:
status_html = '<span style="color: green; font-weight: bold;">🟢 Bot is RUNNING</span>'
else:
status_html = '<span style="color: red; font-weight: bold;">🔴 Bot is STOPPED</span>'
# Add control buttons
status_html += '<br><br>'
if is_running:
status_html += f'<a class="button" href="{reverse("admin:telegram_bot_stop_bot")}">Stop Bot</a> '
status_html += f'<a class="button" href="{reverse("admin:telegram_bot_restart_bot")}">Restart Bot</a>'
else:
if obj.enabled and obj.bot_token:
status_html += f'<a class="button" href="{reverse("admin:telegram_bot_start_bot")}">Start Bot</a>'
else:
status_html += '<span style="color: gray;">Configure bot token and enable bot to start</span>'
return format_html(status_html)
bot_status_display.short_description = "Bot Status"
def get_urls(self):
urls = super().get_urls()
custom_urls = [
path('start-bot/', self.start_bot, name='telegram_bot_start_bot'),
path('stop-bot/', self.stop_bot, name='telegram_bot_stop_bot'),
path('restart-bot/', self.restart_bot, name='telegram_bot_restart_bot'),
]
return custom_urls + urls
def start_bot(self, request):
"""Start the telegram bot"""
try:
from .bot import TelegramBotManager
manager = TelegramBotManager()
manager.start()
messages.success(request, "Bot started successfully!")
except Exception as e:
messages.error(request, f"Failed to start bot: {e}")
logger.error(f"Failed to start bot: {e}")
return redirect('admin:telegram_bot_botsettings_change', object_id=1)
def stop_bot(self, request):
"""Stop the telegram bot"""
try:
from .bot import TelegramBotManager
manager = TelegramBotManager()
manager.stop()
messages.success(request, "Bot stopped successfully!")
except Exception as e:
messages.error(request, f"Failed to stop bot: {e}")
logger.error(f"Failed to stop bot: {e}")
return redirect('admin:telegram_bot_botsettings_change', object_id=1)
def restart_bot(self, request):
"""Restart the telegram bot"""
try:
from .bot import TelegramBotManager
manager = TelegramBotManager()
manager.restart()
messages.success(request, "Bot restarted successfully!")
except Exception as e:
messages.error(request, f"Failed to restart bot: {e}")
logger.error(f"Failed to restart bot: {e}")
return redirect('admin:telegram_bot_botsettings_change', object_id=1)
def has_add_permission(self, request):
# Prevent creating multiple instances
return not BotSettings.objects.exists()
def has_delete_permission(self, request, obj=None):
# Prevent deletion
return False
@admin.register(TelegramMessage)
class TelegramMessageAdmin(admin.ModelAdmin):
list_display = (
'created_at',
'direction_display',
'user_display',
'language_display',
'message_preview',
'linked_user'
)
list_filter = (
'direction',
'created_at',
('linked_user', admin.EmptyFieldListFilter),
)
search_fields = (
'telegram_username',
'telegram_first_name',
'telegram_last_name',
'message_text',
'telegram_user_id'
)
readonly_fields = (
'direction',
'telegram_user_id',
'telegram_username',
'telegram_first_name',
'telegram_last_name',
'chat_id',
'message_id',
'message_text',
'raw_data_display',
'created_at',
'linked_user',
'user_language'
)
fieldsets = (
('Message Info', {
'fields': (
'direction',
'message_text',
'created_at'
)
}),
('Telegram User', {
'fields': (
'telegram_user_id',
'telegram_username',
'telegram_first_name',
'telegram_last_name',
)
}),
('Technical Details', {
'fields': (
'chat_id',
'message_id',
'linked_user',
'raw_data_display'
),
'classes': ('collapse',)
})
)
ordering = ['-created_at']
list_per_page = 50
date_hierarchy = 'created_at'
def direction_display(self, obj):
"""Display direction with icon"""
if obj.direction == 'incoming':
return format_html('<span style="color: blue;">⬇️ Incoming</span>')
else:
return format_html('<span style="color: green;">⬆️ Outgoing</span>')
direction_display.short_description = "Direction"
def user_display(self, obj):
"""Display user info"""
display = obj.display_name
if obj.telegram_user_id:
display += f" (ID: {obj.telegram_user_id})"
return display
user_display.short_description = "Telegram User"
def language_display(self, obj):
"""Display user language"""
lang_map = {'ru': '🇷🇺 RU', 'en': '🇺🇸 EN'}
return lang_map.get(obj.user_language, obj.user_language or 'Unknown')
language_display.short_description = "Language"
def message_preview(self, obj):
"""Show message preview"""
if len(obj.message_text) > 100:
return obj.message_text[:100] + "..."
return obj.message_text
message_preview.short_description = "Message"
def raw_data_display(self, obj):
"""Display raw data as formatted JSON"""
import json
if obj.raw_data:
formatted = json.dumps(obj.raw_data, indent=2, ensure_ascii=False)
return format_html('<pre style="max-width: 800px; overflow: auto;">{}</pre>', formatted)
return "No raw data"
raw_data_display.short_description = "Raw Data"
def has_add_permission(self, request):
# Messages are created automatically by bot
return False
def has_change_permission(self, request, obj=None):
# Messages are read-only
return False
def has_delete_permission(self, request, obj=None):
# Allow deletion for cleanup
return request.user.is_superuser
def get_actions(self, request):
"""Add custom actions"""
actions = super().get_actions(request)
if not request.user.is_superuser:
# Remove delete action for non-superusers
if 'delete_selected' in actions:
del actions['delete_selected']
return actions
@admin.register(AccessRequest)
class AccessRequestAdmin(admin.ModelAdmin):
list_display = (
'created_at',
'user_display',
'approved_display',
'language_display',
'desired_username_display',
'message_preview',
'created_user',
'processed_by'
)
list_filter = (
'approved',
'created_at',
'processed_at',
('processed_by', admin.EmptyFieldListFilter),
)
search_fields = (
'telegram_username',
'telegram_first_name',
'telegram_last_name',
'telegram_user_id',
'message_text'
)
readonly_fields = (
'telegram_user_id',
'telegram_username',
'telegram_first_name',
'telegram_last_name',
'message_text',
'chat_id',
'created_at',
'first_message',
'processed_at',
'processed_by',
'created_user',
'user_language'
)
fieldsets = (
('Request Info', {
'fields': (
'approved',
'admin_comment',
'created_at',
'processed_at',
'processed_by'
)
}),
('User Creation', {
'fields': (
'desired_username',
),
'description': 'Edit username before approving the request'
}),
('Telegram User', {
'fields': (
'telegram_user_id',
'telegram_username',
'telegram_first_name',
'telegram_last_name',
)
}),
('Message Details', {
'fields': (
'message_text',
'chat_id',
'first_message'
),
'classes': ('collapse',)
}),
('Processing Results', {
'fields': (
'created_user',
)
})
)
ordering = ['-created_at']
list_per_page = 50
date_hierarchy = 'created_at'
actions = ['approve_requests']
def user_display(self, obj):
"""Display user info"""
return obj.display_name
user_display.short_description = "Telegram User"
def approved_display(self, obj):
"""Display approved status with colors"""
if obj.approved:
return format_html('<span style="color: green; font-weight: bold;">✅ Approved</span>')
else:
return format_html('<span style="color: orange; font-weight: bold;">🔄 Pending</span>')
approved_display.short_description = "Status"
def message_preview(self, obj):
"""Show message preview"""
if len(obj.message_text) > 100:
return obj.message_text[:100] + "..."
return obj.message_text
message_preview.short_description = "Message"
def desired_username_display(self, obj):
"""Display desired username"""
if obj.desired_username:
return obj.desired_username
else:
fallback = obj.telegram_username or obj.telegram_first_name or f"tg_{obj.telegram_user_id}"
return format_html('<span style="color: gray; font-style: italic;">{}</span>', fallback)
desired_username_display.short_description = "Desired Username"
def language_display(self, obj):
"""Display user language with flag"""
lang_map = {'ru': '🇷🇺 RU', 'en': '🇺🇸 EN'}
return lang_map.get(obj.user_language, obj.user_language or 'Unknown')
language_display.short_description = "Language"
def approve_requests(self, request, queryset):
"""Approve selected access requests"""
pending_requests = queryset.filter(approved=False)
count = 0
errors = []
for access_request in pending_requests:
try:
logger.info(f"Approving request {access_request.id} from user {access_request.telegram_user_id}")
user = self._create_user_from_request(access_request, request.user)
if user:
access_request.approved = True
access_request.processed_by = request.user
access_request.processed_at = timezone.now()
access_request.created_user = user
access_request.save()
logger.info(f"Successfully approved request {access_request.id}, created user {user.username}")
# Send notification to user
self._send_approval_notification(access_request)
count += 1
else:
errors.append(f"Failed to create user for {access_request.display_name}")
except Exception as e:
error_msg = f"Failed to approve request from {access_request.display_name}: {e}"
logger.error(error_msg)
errors.append(error_msg)
if count:
messages.success(request, f"Successfully approved {count} request(s)")
if errors:
for error in errors:
messages.error(request, error)
approve_requests.short_description = "✅ Approve selected requests"
def _create_user_from_request(self, access_request, admin_user):
"""Create User from AccessRequest or link to existing user"""
from vpn.models import User
import secrets
import string
try:
# Check if user already exists by telegram_user_id
existing_user = User.objects.filter(telegram_user_id=access_request.telegram_user_id).first()
if existing_user:
logger.info(f"User already exists: {existing_user.username}")
return existing_user
# Check if we can link to existing user by telegram_username
if access_request.telegram_username:
existing_user_by_username = User.objects.filter(
telegram_username__iexact=access_request.telegram_username,
telegram_user_id__isnull=True # Not yet linked to Telegram
).first()
if existing_user_by_username:
# Link telegram data to existing user
logger.info(f"Linking Telegram @{access_request.telegram_username} to existing user {existing_user_by_username.username}")
existing_user_by_username.telegram_user_id = access_request.telegram_user_id
existing_user_by_username.telegram_username = access_request.telegram_username
existing_user_by_username.telegram_first_name = access_request.telegram_first_name or ""
existing_user_by_username.telegram_last_name = access_request.telegram_last_name or ""
existing_user_by_username.save()
return existing_user_by_username
# Use desired_username if provided, otherwise fallback to Telegram data
username = access_request.desired_username
if not username:
# Fallback to telegram_username, first_name or user_id
username = access_request.telegram_username or access_request.telegram_first_name or f"tg_{access_request.telegram_user_id}"
# Clean username (remove special characters)
username = ''.join(c for c in username if c.isalnum() or c in '_-').lower()
if not username:
username = f"tg_{access_request.telegram_user_id}"
# Make sure username is unique
original_username = username
counter = 1
while User.objects.filter(username=username).exists():
username = f"{original_username}_{counter}"
counter += 1
# Create new user since no existing user found to link
# Generate random password
password = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(12))
logger.info(f"Creating new user with username: {username}")
# Create user
user = User.objects.create_user(
username=username,
password=password,
first_name=access_request.telegram_first_name or "",
last_name=access_request.telegram_last_name or "",
telegram_user_id=access_request.telegram_user_id,
telegram_username=access_request.telegram_username or "",
telegram_first_name=access_request.telegram_first_name or "",
telegram_last_name=access_request.telegram_last_name or "",
is_active=True
)
logger.info(f"Successfully created user {user.username} (ID: {user.id}) from Telegram request {access_request.id}")
return user
except Exception as e:
logger.error(f"Error creating user from request {access_request.id}: {e}")
raise
def _send_approval_notification(self, access_request):
"""Send approval notification via Telegram"""
try:
from .models import BotSettings
from telegram import Bot
import asyncio
settings = BotSettings.get_settings()
if not settings.enabled or not settings.bot_token:
logger.warning("Bot not configured, skipping notification")
return
# Create a simple Bot instance for sending notification
# This bypasses the need for the running bot manager
async def send_notification():
try:
# Create bot with custom request settings
from telegram.request import HTTPXRequest
request_kwargs = {
'connection_pool_size': 1,
'read_timeout': settings.connection_timeout,
'write_timeout': settings.connection_timeout,
'connect_timeout': settings.connection_timeout,
}
if settings.use_proxy and settings.proxy_url:
request_kwargs['proxy'] = settings.proxy_url
request = HTTPXRequest(**request_kwargs)
bot = Bot(token=settings.bot_token, request=request)
# Send localized approval message with new keyboard
from telegram import ReplyKeyboardMarkup, KeyboardButton
language = access_request.user_language or 'en'
# Get localized texts
message = MessageLocalizer.get_message('approval_notification', language)
access_btn_text = MessageLocalizer.get_button_text('access', language)
# Create keyboard with Access button
keyboard = [[KeyboardButton(access_btn_text)]]
reply_markup = ReplyKeyboardMarkup(
keyboard,
resize_keyboard=True,
one_time_keyboard=False
)
await bot.send_message(
chat_id=access_request.telegram_user_id,
text=message,
reply_markup=reply_markup
)
logger.info(f"Sent approval notification to {access_request.telegram_user_id}")
except Exception as e:
logger.error(f"Failed to send Telegram message: {e}")
finally:
try:
# Clean up bot connection
await request.shutdown()
except:
pass
# Run in thread to avoid blocking admin interface
import threading
def run_async_notification():
try:
asyncio.run(send_notification())
except Exception as e:
logger.error(f"Error in notification thread: {e}")
thread = threading.Thread(target=run_async_notification, daemon=True)
thread.start()
except Exception as e:
logger.error(f"Failed to send approval notification: {e}")
def has_add_permission(self, request):
# Requests are created by bot
return False
def has_change_permission(self, request, obj=None):
# Allow changing only status and comment
return True
def save_model(self, request, obj, form, change):
"""Automatically handle approval and user creation"""
# Check if this is a change to approved
was_approved = False
# If desired_username was changed and is empty, set default from Telegram data
if change and 'desired_username' in form.changed_data and not obj.desired_username:
obj.desired_username = obj.telegram_username or obj.telegram_first_name or f"tg_{obj.telegram_user_id}"
if change and 'approved' in form.changed_data and obj.approved:
# Set processed_by and processed_at
if not obj.processed_by:
obj.processed_by = request.user
if not obj.processed_at:
obj.processed_at = timezone.now()
was_approved = True
# If approved and no user created yet, create user
if was_approved and not obj.created_user:
try:
logger.info(f"Auto-creating user for approved request {obj.id}")
user = self._create_user_from_request(obj, request.user)
if user:
obj.created_user = user
messages.success(request, f"User '{user.username}' created successfully!")
logger.info(f"Auto-created user {user.username} for request {obj.id}")
# Send approval notification
self._send_approval_notification(obj)
else:
messages.error(request, f"Failed to create user for approved request {obj.id}")
except Exception as e:
messages.error(request, f"Error creating user: {e}")
logger.error(f"Error auto-creating user for request {obj.id}: {e}")
# Save the object
super().save_model(request, obj, form, change)

75
telegram_bot/apps.py Normal file
View File

@@ -0,0 +1,75 @@
from django.apps import AppConfig
import logging
logger = logging.getLogger(__name__)
class TelegramBotConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'telegram_bot'
def ready(self):
"""Called when Django starts - attempt to auto-start bot if enabled"""
import sys
import os
# Skip auto-start in various scenarios
skip_conditions = [
# Management commands
'migrate' in sys.argv,
'makemigrations' in sys.argv,
'collectstatic' in sys.argv,
'shell' in sys.argv,
'test' in sys.argv,
# Celery processes
'celery' in sys.argv,
'worker' in sys.argv,
'beat' in sys.argv,
# Environment variables that indicate worker/beat processes
os.environ.get('CELERY_WORKER_NAME'),
os.environ.get('CELERY_BEAT'),
# Process name detection
any('celery' in arg.lower() for arg in sys.argv),
any('worker' in arg.lower() for arg in sys.argv),
any('beat' in arg.lower() for arg in sys.argv),
]
if any(skip_conditions):
logger.info(f"Skipping Telegram bot auto-start in process: {' '.join(sys.argv)}")
return
# Additional process detection by checking if we're in main process
try:
# Check if this is the main Django process (not a worker)
current_process = os.environ.get('DJANGO_SETTINGS_MODULE')
if not current_process:
logger.info("Skipping bot auto-start: not in main Django process")
return
except:
pass
# Delay import to avoid circular imports
try:
from .bot import TelegramBotManager
import threading
import time
def delayed_autostart():
# Wait a bit for Django to fully initialize
time.sleep(2)
try:
manager = TelegramBotManager()
if manager.auto_start_if_enabled():
logger.info("Telegram bot auto-started successfully")
else:
logger.info("Telegram bot auto-start skipped (disabled or already running)")
except Exception as e:
logger.error(f"Failed to auto-start Telegram bot: {e}")
logger.info("Starting Telegram bot auto-start thread")
# Start in background thread to not block Django startup
thread = threading.Thread(target=delayed_autostart, daemon=True)
thread.start()
except Exception as e:
logger.error(f"Error setting up Telegram bot auto-start: {e}")

1060
telegram_bot/bot.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,207 @@
"""
Message localization for Telegram bot
"""
from typing import Optional, Dict, Any
import logging
logger = logging.getLogger(__name__)
# Translation dictionaries
MESSAGES = {
'en': {
'help_text': "📋 Welcome! Use buttons below to navigate.\n\n📊 Access - View your VPN subscriptions\n\nFor support contact administrator.",
'access_request_created': "Access request created, please wait.",
'new_user_welcome': "Welcome! To get access to VPN services, please request access using the button below.",
'pending_request_msg': "Your access request is pending approval. Please wait for administrator to review it.",
'choose_subscription': "**Choose subscription option:**",
'all_in_one_desc': "🌍 **All-in-one** - Get all subscriptions in one link",
'group_desc': "**Group** - Get specific group subscription",
'select_option': "Select an option below:",
'no_subscriptions': "❌ You don't have any active Xray subscriptions.\n\nPlease contact administrator for access.",
'group_subscription': "**Group: {group_name}**",
'subscription_link': "**🔗 Subscription Link:**",
'web_portal': "**🌐 Web Portal:**",
'tap_to_copy': "_Tap the subscription link to copy it. Use it in your Xray client._",
'all_in_one_subscription': "🌍 **All-in-one Subscription**",
'your_access_includes': "**Your Access Includes:**",
'universal_subscription_link': "**🔗 Universal Subscription Link:**",
'all_subscriptions_note': "_This link includes all your active subscriptions. Tap to copy._",
'error_loading_subscriptions': "❌ Error loading subscriptions. Please try again later.",
'error_loading_group': "❌ Error loading group subscription. Please try again later.",
'received_content': "Received your {message_type}. An administrator will review it.",
'approval_notification': "✅ Access approved!",
'content_types': {
'photo': 'photo',
'document': 'document',
'voice': 'voice',
'video': 'video',
'content': 'content'
},
'buttons': {
'access': "🌍 Get access",
'all_in_one': "🌍 All-in-one",
'back': "⬅️ Back",
'group_prefix': "Group: ",
'request_access': "🔑 Request Access"
}
},
'ru': {
'help_text': "📋 Добро пожаловать! Используйте кнопки ниже для навигации.\n\n📊 Доступ - Просмотр VPN подписок\n\nДля поддержки обратитесь к администратору.",
'access_request_created': "Запрос на доступ создан, ожидайте.",
'new_user_welcome': "Добро пожаловать! Для получения доступа к VPN сервисам, пожалуйста запросите доступ с помощью кнопки ниже.",
'pending_request_msg': "Ваш запрос на доступ ожидает одобрения. Пожалуйста, дождитесь рассмотрения администратором.",
'choose_subscription': "**Выберите вариант подписки:**",
'all_in_one_desc': "🌍 **Все в одном** - Получить все подписки в одной ссылке",
'group_desc': "**Группа** - Получить подписку на группу",
'select_option': "Выберите вариант ниже:",
'no_subscriptions': "У вас нет активных Xray подписок.\n\nОбратитесь к администратору для получения доступа.",
'group_subscription': "**Группа: {group_name}**",
'subscription_link': "**🔗 **",
'web_portal': "**🌐 Веб-портал пользователя:**",
'tap_to_copy': "_Нажмите на ссылку чтобы скопировать. Используйте в вашем Xray клиенте как подписку._",
'all_in_one_subscription': "🌍 **Подписка «Все в одном»**",
'your_access_includes': "**Ваш доступ включает:**",
'universal_subscription_link': "**🔗 Универсальная ссылка на подписку:**",
'all_subscriptions_note': "_Эта ссылка включает все ваши активные подписки. Нажмите чтобы скопировать._",
'error_loading_subscriptions': "❌ Ошибка загрузки подписок. Попробуйте позже.",
'error_loading_group': "❌ Ошибка загрузки подписки группы. Попробуйте позже.",
'received_content': "Получен ваш {message_type}. Администратор его рассмотрит.",
'approval_notification': "✅ Доступ одобрен!",
'content_types': {
'photo': 'фото',
'document': 'документ',
'voice': 'голосовое сообщение',
'video': 'видео',
'content': 'контент'
},
'buttons': {
'access': "🌍 Получить VPN",
'all_in_one': "🌍 Все в одном",
'back': "⬅️ Назад",
'group_prefix': "Группа: ",
'request_access': "🔑 Запросить доступ"
}
}
}
class MessageLocalizer:
"""Class for bot message localization"""
@staticmethod
def get_user_language(telegram_user) -> str:
"""
Determines user language from Telegram language_code
Args:
telegram_user: Telegram user object
Returns:
str: Language code ('ru' or 'en')
"""
if not telegram_user:
return 'en'
language_code = getattr(telegram_user, 'language_code', None)
if not language_code:
return 'en'
# Support Russian and English
if language_code.startswith('ru'):
return 'ru'
else:
return 'en'
@staticmethod
def get_message(key: str, language: str = 'en', **kwargs) -> str:
"""
Gets localized message
Args:
key: Message key
language: Language code
**kwargs: Formatting parameters
Returns:
str: Localized message
"""
try:
# Fallback to English if language not supported
if language not in MESSAGES:
language = 'en'
message = MESSAGES[language].get(key, MESSAGES['en'].get(key, f"Missing translation: {key}"))
# Format with parameters
if kwargs:
try:
message = message.format(**kwargs)
except (KeyError, ValueError) as e:
logger.warning(f"Error formatting message {key}: {e}")
return message
except Exception as e:
logger.error(f"Error getting message {key} for language {language}: {e}")
return f"Error: {key}"
@staticmethod
def get_button_text(button_key: str, language: str = 'en') -> str:
"""
Gets button text
Args:
button_key: Button key
language: Language code
Returns:
str: Button text
"""
try:
if language not in MESSAGES:
language = 'en'
buttons = MESSAGES[language].get('buttons', {})
return buttons.get(button_key, MESSAGES['en']['buttons'].get(button_key, button_key))
except Exception as e:
logger.error(f"Error getting button text {button_key} for language {language}: {e}")
return button_key
@staticmethod
def get_content_type_name(content_type: str, language: str = 'en') -> str:
"""
Gets localized content type name
Args:
content_type: Content type
language: Language code
Returns:
str: Localized name
"""
try:
if language not in MESSAGES:
language = 'en'
content_types = MESSAGES[language].get('content_types', {})
return content_types.get(content_type, content_type)
except Exception as e:
logger.error(f"Error getting content type {content_type} for language {language}: {e}")
return content_type
# Convenience functions for use in code
def get_localized_message(telegram_user, message_key: str, **kwargs) -> str:
"""Get localized message for user"""
language = MessageLocalizer.get_user_language(telegram_user)
return MessageLocalizer.get_message(message_key, language, **kwargs)
def get_localized_button(telegram_user, button_key: str) -> str:
"""Get localized button text for user"""
language = MessageLocalizer.get_user_language(telegram_user)
return MessageLocalizer.get_button_text(button_key, language)
def get_user_language(telegram_user) -> str:
"""Get user language"""
return MessageLocalizer.get_user_language(telegram_user)

View File

View File

@@ -0,0 +1,99 @@
import logging
import signal
import sys
import time
from django.core.management.base import BaseCommand
from django.utils import timezone
from telegram_bot.models import BotSettings, BotStatus
from telegram_bot.bot import TelegramBotManager
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'Run the Telegram bot'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.bot_manager = None
self.running = False
def add_arguments(self, parser):
parser.add_argument(
'--force',
action='store_true',
help='Force start even if bot is disabled in settings',
)
def handle(self, *args, **options):
"""Main command handler"""
# Set up signal handlers
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
# Check settings
settings = BotSettings.get_settings()
if not settings.bot_token:
self.stdout.write(
self.style.ERROR('Bot token is not configured. Please configure it in the admin panel.')
)
return
if not settings.enabled and not options['force']:
self.stdout.write(
self.style.WARNING('Bot is disabled in settings. Use --force to override.')
)
return
# Initialize bot manager
self.bot_manager = TelegramBotManager()
try:
# Start the bot
self.stdout.write(self.style.SUCCESS('Starting Telegram bot...'))
self.bot_manager.start()
self.running = True
self.stdout.write(
self.style.SUCCESS(f'Bot is running. Press Ctrl+C to stop.')
)
# Keep the main thread alive
while self.running:
time.sleep(1)
# Check if bot is still running
if not self.bot_manager.is_running:
self.stdout.write(
self.style.ERROR('Bot stopped unexpectedly. Check logs for errors.')
)
break
except KeyboardInterrupt:
self.stdout.write('\nReceived interrupt signal...')
except Exception as e:
self.stdout.write(
self.style.ERROR(f'Error running bot: {e}')
)
logger.error(f'Error running bot: {e}', exc_info=True)
# Update status
status = BotStatus.get_status()
status.is_running = False
status.last_error = str(e)
status.last_stopped = timezone.now()
status.save()
finally:
# Stop the bot
if self.bot_manager:
self.stdout.write('Stopping bot...')
self.bot_manager.stop()
self.stdout.write(self.style.SUCCESS('Bot stopped.'))
def signal_handler(self, signum, frame):
"""Handle shutdown signals"""
self.stdout.write('\nShutting down gracefully...')
self.running = False

View File

@@ -0,0 +1,112 @@
import logging
import os
from django.core.management.base import BaseCommand
from django.utils import timezone
from telegram_bot.models import BotSettings, BotStatus
from telegram_bot.bot import TelegramBotManager
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'Check Telegram bot status and optionally start it'
def add_arguments(self, parser):
parser.add_argument(
'--auto-start',
action='store_true',
help='Automatically start bot if enabled in settings',
)
parser.add_argument(
'--sync-status',
action='store_true',
help='Sync database status with real bot state',
)
def handle(self, *args, **options):
"""Check bot status"""
try:
manager = TelegramBotManager()
settings = BotSettings.get_settings()
status = BotStatus.get_status()
# Show current configuration
self.stdout.write(f"Bot Configuration:")
self.stdout.write(f" Enabled: {settings.enabled}")
self.stdout.write(f" Token configured: {'Yes' if settings.bot_token else 'No'}")
# Show status
real_running = manager.is_running
db_running = status.is_running
self.stdout.write(f"\nBot Status:")
self.stdout.write(f" Database status: {'Running' if db_running else 'Stopped'}")
self.stdout.write(f" Real status: {'Running' if real_running else 'Stopped'}")
# Check lock file status
from django.conf import settings as django_settings
lock_dir = os.path.join(getattr(django_settings, 'BASE_DIR', '/tmp'), 'telegram_bot_locks')
lock_path = os.path.join(lock_dir, 'telegram_bot.lock')
if os.path.exists(lock_path):
try:
with open(lock_path, 'r') as f:
lock_pid = f.read().strip()
self.stdout.write(f" Lock file: exists (PID: {lock_pid})")
except:
self.stdout.write(f" Lock file: exists (unreadable)")
else:
self.stdout.write(f" Lock file: not found")
if db_running != real_running:
self.stdout.write(
self.style.WARNING("⚠️ Status mismatch detected!")
)
if options['sync_status']:
status.is_running = real_running
if not real_running:
status.last_stopped = timezone.now()
status.save()
self.stdout.write(
self.style.SUCCESS("✅ Status synchronized")
)
# Show timestamps
if status.last_started:
self.stdout.write(f" Last started: {status.last_started}")
if status.last_stopped:
self.stdout.write(f" Last stopped: {status.last_stopped}")
if status.last_error:
self.stdout.write(f" Last error: {status.last_error}")
# Auto-start if requested
if options['auto_start']:
if not real_running and settings.enabled and settings.bot_token:
self.stdout.write("\nAttempting to start bot...")
try:
manager.start()
self.stdout.write(
self.style.SUCCESS("✅ Bot started successfully")
)
except Exception as e:
self.stdout.write(
self.style.ERROR(f"❌ Failed to start bot: {e}")
)
elif real_running:
self.stdout.write(
self.style.SUCCESS("✅ Bot is already running")
)
elif not settings.enabled:
self.stdout.write(
self.style.WARNING("⚠️ Bot is disabled in settings")
)
elif not settings.bot_token:
self.stdout.write(
self.style.ERROR("❌ Bot token not configured")
)
except Exception as e:
self.stdout.write(
self.style.ERROR(f"❌ Error checking bot status: {e}")
)

View File

@@ -0,0 +1,70 @@
# Generated by Django 5.1.7 on 2025-08-14 11:18
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='BotSettings',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('bot_token', models.CharField(help_text='Telegram Bot Token from @BotFather', max_length=255)),
('enabled', models.BooleanField(default=False, help_text='Enable/Disable the bot')),
('welcome_message', models.TextField(default='Hello! Your message has been received. An administrator will review it.', help_text='Message sent when user starts conversation')),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
],
options={
'verbose_name': 'Bot Settings',
'verbose_name_plural': 'Bot Settings',
},
),
migrations.CreateModel(
name='BotStatus',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('is_running', models.BooleanField(default=False)),
('last_started', models.DateTimeField(blank=True, null=True)),
('last_stopped', models.DateTimeField(blank=True, null=True)),
('last_error', models.TextField(blank=True)),
('last_update_id', models.BigIntegerField(blank=True, help_text='Last processed update ID from Telegram', null=True)),
],
options={
'verbose_name': 'Bot Status',
'verbose_name_plural': 'Bot Status',
},
),
migrations.CreateModel(
name='TelegramMessage',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('direction', models.CharField(choices=[('incoming', 'Incoming'), ('outgoing', 'Outgoing')], db_index=True, max_length=10)),
('telegram_user_id', models.BigIntegerField(db_index=True)),
('telegram_username', models.CharField(blank=True, db_index=True, max_length=255, null=True)),
('telegram_first_name', models.CharField(blank=True, max_length=255, null=True)),
('telegram_last_name', models.CharField(blank=True, max_length=255, null=True)),
('chat_id', models.BigIntegerField(db_index=True)),
('message_id', models.BigIntegerField(blank=True, null=True)),
('message_text', models.TextField(blank=True)),
('raw_data', models.JSONField(blank=True, default=dict, help_text='Full message data from Telegram')),
('created_at', models.DateTimeField(auto_now_add=True, db_index=True)),
('linked_user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='telegram_messages', to=settings.AUTH_USER_MODEL)),
],
options={
'verbose_name': 'Telegram Message',
'verbose_name_plural': 'Telegram Messages',
'ordering': ['-created_at'],
'indexes': [models.Index(fields=['-created_at', 'direction'], name='telegram_bo_created_19b81b_idx'), models.Index(fields=['telegram_user_id', '-created_at'], name='telegram_bo_telegra_f71f27_idx')],
},
),
]

View File

@@ -0,0 +1,33 @@
# Generated by Django 5.1.7 on 2025-08-14 12:09
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('telegram_bot', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='botsettings',
name='api_base_url',
field=models.URLField(blank=True, default='https://api.telegram.org', help_text='Telegram API base URL (change for local bot API server)'),
),
migrations.AddField(
model_name='botsettings',
name='connection_timeout',
field=models.IntegerField(default=30, help_text='Connection timeout in seconds'),
),
migrations.AddField(
model_name='botsettings',
name='proxy_url',
field=models.URLField(blank=True, help_text='Proxy URL (e.g., http://proxy:8080 or socks5://proxy:1080)'),
),
migrations.AddField(
model_name='botsettings',
name='use_proxy',
field=models.BooleanField(default=False, help_text='Enable proxy for Telegram API connections'),
),
]

View File

@@ -0,0 +1,42 @@
# Generated by Django 5.1.7 on 2025-08-14 12:24
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('telegram_bot', '0002_add_connection_settings'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='AccessRequest',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('telegram_user_id', models.BigIntegerField(db_index=True, help_text='Telegram user ID who made the request')),
('telegram_username', models.CharField(blank=True, help_text='Telegram username (without @)', max_length=255, null=True)),
('telegram_first_name', models.CharField(blank=True, help_text='First name from Telegram', max_length=255, null=True)),
('telegram_last_name', models.CharField(blank=True, help_text='Last name from Telegram', max_length=255, null=True)),
('message_text', models.TextField(help_text='The message sent by user when requesting access')),
('chat_id', models.BigIntegerField(help_text='Telegram chat ID for sending notifications')),
('status', models.CharField(choices=[('pending', 'Pending'), ('approved', 'Approved'), ('rejected', 'Rejected')], db_index=True, default='pending', max_length=20)),
('admin_comment', models.TextField(blank=True, help_text='Admin comment for approval/rejection')),
('created_at', models.DateTimeField(auto_now_add=True)),
('processed_at', models.DateTimeField(blank=True, null=True)),
('created_user', models.ForeignKey(blank=True, help_text='User created from this request (when approved)', null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL)),
('first_message', models.ForeignKey(blank=True, help_text='First message from this user', null=True, on_delete=django.db.models.deletion.SET_NULL, to='telegram_bot.telegrammessage')),
('processed_by', models.ForeignKey(blank=True, help_text='Admin who processed this request', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='processed_requests', to=settings.AUTH_USER_MODEL)),
],
options={
'verbose_name': 'Access Request',
'verbose_name_plural': 'Access Requests',
'ordering': ['-created_at'],
'indexes': [models.Index(fields=['telegram_user_id'], name='telegram_bo_telegra_e3429d_idx'), models.Index(fields=['status', '-created_at'], name='telegram_bo_status_cf9310_idx'), models.Index(fields=['-created_at'], name='telegram_bo_created_c82a74_idx')],
'constraints': [models.UniqueConstraint(fields=('telegram_user_id',), name='unique_telegram_user_request')],
},
),
]

View File

@@ -0,0 +1,37 @@
# Generated by Django 5.1.7 on 2025-08-14 13:49
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('telegram_bot', '0003_accessrequest'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.RemoveIndex(
model_name='accessrequest',
name='telegram_bo_status_cf9310_idx',
),
migrations.RemoveField(
model_name='accessrequest',
name='status',
),
migrations.AddField(
model_name='accessrequest',
name='approved',
field=models.BooleanField(db_index=True, default=False, help_text='Request approved by administrator'),
),
migrations.AlterField(
model_name='accessrequest',
name='admin_comment',
field=models.TextField(blank=True, help_text='Admin comment for approval'),
),
migrations.AddIndex(
model_name='accessrequest',
index=models.Index(fields=['approved', '-created_at'], name='telegram_bo_approve_7ae92d_idx'),
),
]

View File

@@ -0,0 +1,25 @@
# Generated by Django 5.1.7 on 2025-08-14 22:24
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('telegram_bot', '0004_remove_accessrequest_telegram_bo_status_cf9310_idx_and_more'),
]
operations = [
migrations.DeleteModel(
name='BotStatus',
),
migrations.RemoveField(
model_name='botsettings',
name='welcome_message',
),
migrations.AddField(
model_name='botsettings',
name='help_message',
field=models.TextField(default='📋 Available commands:\n/start - Start conversation\n📊 Access - View your VPN subscriptions\n\nFor support contact administrator.', help_text='Help message sent for unrecognized commands'),
),
]

View File

@@ -0,0 +1,18 @@
# Generated by Django 5.1.7 on 2025-08-14 22:30
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('telegram_bot', '0005_delete_botstatus_remove_botsettings_welcome_message_and_more'),
]
operations = [
migrations.AddField(
model_name='accessrequest',
name='desired_username',
field=models.CharField(blank=True, help_text='Desired username for VPN user (defaults to Telegram username)', max_length=150),
),
]

View File

@@ -0,0 +1,27 @@
# Generated by Django 5.1.7 on 2025-08-14 22:54
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('telegram_bot', '0006_accessrequest_desired_username'),
]
operations = [
migrations.RemoveField(
model_name='botsettings',
name='help_message',
),
migrations.AddField(
model_name='accessrequest',
name='user_language',
field=models.CharField(default='en', help_text="User's preferred language (en/ru)", max_length=10),
),
migrations.AddField(
model_name='telegrammessage',
name='user_language',
field=models.CharField(default='en', help_text="User's preferred language (en/ru)", max_length=10),
),
]

View File

292
telegram_bot/models.py Normal file
View File

@@ -0,0 +1,292 @@
from django.db import models
from django.contrib.auth import get_user_model
from django.utils import timezone
import json
User = get_user_model()
class BotSettings(models.Model):
"""Singleton model for bot settings"""
bot_token = models.CharField(
max_length=255,
help_text="Telegram Bot Token from @BotFather"
)
enabled = models.BooleanField(
default=False,
help_text="Enable/Disable the bot"
)
use_proxy = models.BooleanField(
default=False,
help_text="Enable proxy for Telegram API connections"
)
proxy_url = models.URLField(
blank=True,
help_text="Proxy URL (e.g., http://proxy:8080 or socks5://proxy:1080)"
)
api_base_url = models.URLField(
blank=True,
default="https://api.telegram.org",
help_text="Telegram API base URL (change for local bot API server)"
)
connection_timeout = models.IntegerField(
default=30,
help_text="Connection timeout in seconds"
)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
verbose_name = "Bot Settings"
verbose_name_plural = "Bot Settings"
def save(self, *args, **kwargs):
# Ensure only one instance exists
self.pk = 1
super().save(*args, **kwargs)
def delete(self, *args, **kwargs):
# Prevent deletion
pass
@classmethod
def get_settings(cls):
"""Get or create singleton settings"""
obj, created = cls.objects.get_or_create(pk=1)
return obj
def __str__(self):
return f"Bot Settings ({'Enabled' if self.enabled else 'Disabled'})"
class TelegramMessage(models.Model):
"""Store all telegram messages"""
DIRECTION_CHOICES = [
('incoming', 'Incoming'),
('outgoing', 'Outgoing'),
]
direction = models.CharField(
max_length=10,
choices=DIRECTION_CHOICES,
db_index=True
)
# Telegram user info
telegram_user_id = models.BigIntegerField(db_index=True)
telegram_username = models.CharField(
max_length=255,
blank=True,
null=True,
db_index=True
)
telegram_first_name = models.CharField(
max_length=255,
blank=True,
null=True
)
telegram_last_name = models.CharField(
max_length=255,
blank=True,
null=True
)
user_language = models.CharField(
max_length=10,
default='en',
help_text="User's preferred language (en/ru)"
)
# Message info
chat_id = models.BigIntegerField(db_index=True)
message_id = models.BigIntegerField(null=True, blank=True)
message_text = models.TextField(blank=True)
# Additional data
raw_data = models.JSONField(
default=dict,
blank=True,
help_text="Full message data from Telegram"
)
# Timestamps
created_at = models.DateTimeField(auto_now_add=True, db_index=True)
# Optional link to VPN user if identified
linked_user = models.ForeignKey(
User,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name='telegram_messages'
)
class Meta:
verbose_name = "Telegram Message"
verbose_name_plural = "Telegram Messages"
ordering = ['-created_at']
indexes = [
models.Index(fields=['-created_at', 'direction']),
models.Index(fields=['telegram_user_id', '-created_at']),
]
def __str__(self):
username = self.telegram_username or f"ID:{self.telegram_user_id}"
direction_icon = "⬇️" if self.direction == 'incoming' else "⬆️"
text_preview = self.message_text[:50] + "..." if len(self.message_text) > 50 else self.message_text
return f"{direction_icon} {username}: {text_preview}"
@property
def full_name(self):
"""Get full name of telegram user"""
parts = []
if self.telegram_first_name:
parts.append(self.telegram_first_name)
if self.telegram_last_name:
parts.append(self.telegram_last_name)
return " ".join(parts) if parts else f"User {self.telegram_user_id}"
@property
def display_name(self):
"""Get best available display name"""
if self.telegram_username:
return f"@{self.telegram_username}"
return self.full_name
class AccessRequest(models.Model):
"""Access requests from Telegram users"""
# Telegram user information
telegram_user_id = models.BigIntegerField(
db_index=True,
help_text="Telegram user ID who made the request"
)
telegram_username = models.CharField(
max_length=255,
blank=True,
null=True,
help_text="Telegram username (without @)"
)
telegram_first_name = models.CharField(
max_length=255,
blank=True,
null=True,
help_text="First name from Telegram"
)
telegram_last_name = models.CharField(
max_length=255,
blank=True,
null=True,
help_text="Last name from Telegram"
)
# Request details
message_text = models.TextField(
help_text="The message sent by user when requesting access"
)
chat_id = models.BigIntegerField(
help_text="Telegram chat ID for sending notifications"
)
# Username for VPN user creation
desired_username = models.CharField(
max_length=150,
blank=True,
help_text="Desired username for VPN user (defaults to Telegram username)"
)
# User language
user_language = models.CharField(
max_length=10,
default='en',
help_text="User's preferred language (en/ru)"
)
# Status and processing
approved = models.BooleanField(
default=False,
db_index=True,
help_text="Request approved by administrator"
)
admin_comment = models.TextField(
blank=True,
help_text="Admin comment for approval"
)
# Related objects
created_user = models.ForeignKey(
User,
null=True,
blank=True,
on_delete=models.SET_NULL,
help_text="User created from this request (when approved)"
)
processed_by = models.ForeignKey(
User,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name='processed_requests',
help_text="Admin who processed this request"
)
first_message = models.ForeignKey(
TelegramMessage,
null=True,
blank=True,
on_delete=models.SET_NULL,
help_text="First message from this user"
)
# Timestamps
created_at = models.DateTimeField(auto_now_add=True)
processed_at = models.DateTimeField(null=True, blank=True)
class Meta:
verbose_name = "Access Request"
verbose_name_plural = "Access Requests"
ordering = ['-created_at']
indexes = [
models.Index(fields=['telegram_user_id']),
models.Index(fields=['approved', '-created_at']),
models.Index(fields=['-created_at']),
]
constraints = [
models.UniqueConstraint(
fields=['telegram_user_id'],
name='unique_telegram_user_request'
)
]
def __str__(self):
username = self.telegram_username or f"ID:{self.telegram_user_id}"
status = "Approved" if self.approved else "Pending"
return f"Request from @{username} ({status})"
@property
def display_name(self):
"""Get best available display name"""
if self.telegram_username:
return f"@{self.telegram_username}"
name_parts = []
if self.telegram_first_name:
name_parts.append(self.telegram_first_name)
if self.telegram_last_name:
name_parts.append(self.telegram_last_name)
if name_parts:
return " ".join(name_parts)
return f"User {self.telegram_user_id}"
@property
def full_name(self):
"""Get full name of telegram user"""
parts = []
if self.telegram_first_name:
parts.append(self.telegram_first_name)
if self.telegram_last_name:
parts.append(self.telegram_last_name)
return " ".join(parts) if parts else None

3
telegram_bot/tests.py Normal file
View File

@@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

3
telegram_bot/views.py Normal file
View File

@@ -0,0 +1,3 @@
from django.shortcuts import render
# Create your views here.