Xray init support

This commit is contained in:
AB from home.homenet
2025-08-05 01:23:07 +03:00
parent c5a94d17dc
commit ea3d74ccbd
29 changed files with 4309 additions and 294 deletions

23
vpn/xray_api/__init__.py Normal file
View File

@@ -0,0 +1,23 @@
"""
Xray Manager - Python library for managing Xray proxy server via gRPC API.
Supports VLESS, VMess, and Trojan protocols.
"""
from .client import XrayClient
from .models import User, VlessUser, VmessUser, TrojanUser, Stats
from .exceptions import XrayError, APIError, InboundNotFoundError, UserNotFoundError
__version__ = "1.0.0"
__all__ = [
"XrayClient",
"User",
"VlessUser",
"VmessUser",
"TrojanUser",
"Stats",
"XrayError",
"APIError",
"InboundNotFoundError",
"UserNotFoundError"
]

577
vpn/xray_api/client.py Normal file
View File

@@ -0,0 +1,577 @@
"""
Main Xray client for managing proxy server via gRPC API.
"""
import json
import logging
import subprocess
from typing import Any, Dict, List, Optional
from .exceptions import APIError, InboundNotFoundError, UserNotFoundError
from .models import Stats, TrojanUser, User, VlessUser, VmessUser
from .protocols import TrojanProtocol, VlessProtocol, VmessProtocol
logger = logging.getLogger(__name__)
class XrayClient:
"""Main client for Xray server management."""
def __init__(self, server: str):
"""
Initialize Xray client.
Args:
server: Xray gRPC API server address (host:port)
"""
self.server = server
self.hostname = server.split(':')[0] # Extract hostname for client links
# Protocol handlers
self._protocols = {}
# Inbound management
def add_vless_inbound(self, port: int, users: List[VlessUser], tag: Optional[str] = None,
listen: str = "0.0.0.0", network: str = "tcp") -> None:
"""Add VLESS inbound with users."""
protocol = VlessProtocol(port, tag, listen, network)
self._protocols[protocol.tag] = protocol
config = protocol.create_inbound_config(users)
self._add_inbound(config)
def add_vmess_inbound(self, port: int, users: List[VmessUser], tag: Optional[str] = None,
listen: str = "0.0.0.0", network: str = "tcp") -> None:
"""Add VMess inbound with users."""
protocol = VmessProtocol(port, tag, listen, network)
self._protocols[protocol.tag] = protocol
config = protocol.create_inbound_config(users)
self._add_inbound(config)
def add_trojan_inbound(self, port: int, users: List[TrojanUser], tag: Optional[str] = None,
listen: str = "0.0.0.0", network: str = "tcp",
cert_pem: Optional[str] = None, key_pem: Optional[str] = None,
hostname: Optional[str] = None) -> None:
"""Add Trojan inbound with users and optional custom certificates."""
hostname = hostname or self.hostname
protocol = TrojanProtocol(port, tag, listen, network, cert_pem, key_pem, hostname)
self._protocols[protocol.tag] = protocol
config = protocol.create_inbound_config(users)
self._add_inbound(config)
def remove_inbound(self, protocol_type_or_tag: str) -> None:
"""
Remove inbound by protocol type or tag.
Args:
protocol_type_or_tag: Protocol type ('vless', 'vmess', 'trojan') or specific tag
"""
# Try to find by protocol type first
tag_map = {
'vless': 'vless-inbound',
'vmess': 'vmess-inbound',
'trojan': 'trojan-inbound'
}
tag = tag_map.get(protocol_type_or_tag, protocol_type_or_tag)
config = {"tag": tag}
self._remove_inbound(config)
if tag in self._protocols:
del self._protocols[tag]
def list_inbounds(self) -> List[Dict[str, Any]]:
"""List all inbounds."""
return self._list_inbounds()
# User management
def add_user(self, protocol_type_or_tag: str, user: User) -> None:
"""
Add user to existing inbound by recreating it with updated users.
Args:
protocol_type_or_tag: Protocol type ('vless', 'vmess', 'trojan') or specific tag
user: User object matching the protocol type
"""
tag_map = {
'vless': 'vless-inbound',
'vmess': 'vmess-inbound',
'trojan': 'trojan-inbound'
}
# Check if it's a protocol type or direct tag
tag = tag_map.get(protocol_type_or_tag, protocol_type_or_tag)
# If protocol not registered, we need to get inbound info first
if tag not in self._protocols:
logger.debug(f"Protocol for tag '{tag}' not registered, attempting to retrieve inbound info")
# Try to get inbound info to determine protocol
try:
inbounds = self._list_inbounds()
if isinstance(inbounds, dict) and 'inbounds' in inbounds:
inbound_list = inbounds['inbounds']
else:
inbound_list = inbounds if isinstance(inbounds, list) else []
# Find the inbound by tag
for inbound in inbound_list:
if inbound.get('tag') == tag:
# Determine protocol from proxySettings
proxy_settings = inbound.get('proxySettings', {})
typed_message = proxy_settings.get('_TypedMessage_', '')
if 'vless' in typed_message.lower():
from .protocols import VlessProtocol
port = inbound.get('receiverSettings', {}).get('portList', 443)
listen = inbound.get('receiverSettings', {}).get('listen', '0.0.0.0')
network = inbound.get('receiverSettings', {}).get('streamSettings', {}).get('protocolName', 'tcp')
protocol = VlessProtocol(port, tag, listen, network)
self._protocols[tag] = protocol
elif 'vmess' in typed_message.lower():
from .protocols import VmessProtocol
port = inbound.get('receiverSettings', {}).get('portList', 443)
listen = inbound.get('receiverSettings', {}).get('listen', '0.0.0.0')
network = inbound.get('receiverSettings', {}).get('streamSettings', {}).get('protocolName', 'tcp')
protocol = VmessProtocol(port, tag, listen, network)
self._protocols[tag] = protocol
elif 'trojan' in typed_message.lower():
from .protocols import TrojanProtocol
port = inbound.get('receiverSettings', {}).get('portList', 443)
listen = inbound.get('receiverSettings', {}).get('listen', '0.0.0.0')
network = inbound.get('receiverSettings', {}).get('streamSettings', {}).get('protocolName', 'tcp')
protocol = TrojanProtocol(port, tag, listen, network, hostname=self.hostname)
self._protocols[tag] = protocol
break
except Exception as e:
logger.error(f"Failed to retrieve inbound info for tag '{tag}': {e}")
if tag not in self._protocols:
raise InboundNotFoundError(f"Inbound {protocol_type_or_tag} not found or could not determine protocol")
protocol = self._protocols[tag]
# Use the recreate method since direct API doesn't work reliably
self._recreate_inbound_with_user(protocol, user)
def remove_user(self, protocol_type_or_tag: str, email: str) -> None:
"""
Remove user from inbound by recreating it without the user.
Args:
protocol_type_or_tag: Protocol type ('vless', 'vmess', 'trojan') or specific tag
email: User email to remove
"""
tag_map = {
'vless': 'vless-inbound',
'vmess': 'vmess-inbound',
'trojan': 'trojan-inbound'
}
tag = tag_map.get(protocol_type_or_tag, protocol_type_or_tag)
# Use same logic as add_user to find/register protocol
if tag not in self._protocols:
logger.debug(f"Protocol for tag '{tag}' not registered, attempting to retrieve inbound info")
# Try to get inbound info to determine protocol
try:
inbounds = self._list_inbounds()
if isinstance(inbounds, dict) and 'inbounds' in inbounds:
inbound_list = inbounds['inbounds']
else:
inbound_list = inbounds if isinstance(inbounds, list) else []
# Find the inbound by tag
for inbound in inbound_list:
if inbound.get('tag') == tag:
# Determine protocol from proxySettings
proxy_settings = inbound.get('proxySettings', {})
typed_message = proxy_settings.get('_TypedMessage_', '')
if 'vless' in typed_message.lower():
from .protocols import VlessProtocol
port = inbound.get('receiverSettings', {}).get('portList', 443)
listen = inbound.get('receiverSettings', {}).get('listen', '0.0.0.0')
network = inbound.get('receiverSettings', {}).get('streamSettings', {}).get('protocolName', 'tcp')
protocol = VlessProtocol(port, tag, listen, network)
self._protocols[tag] = protocol
elif 'vmess' in typed_message.lower():
from .protocols import VmessProtocol
port = inbound.get('receiverSettings', {}).get('portList', 443)
listen = inbound.get('receiverSettings', {}).get('listen', '0.0.0.0')
network = inbound.get('receiverSettings', {}).get('streamSettings', {}).get('protocolName', 'tcp')
protocol = VmessProtocol(port, tag, listen, network)
self._protocols[tag] = protocol
elif 'trojan' in typed_message.lower():
from .protocols import TrojanProtocol
port = inbound.get('receiverSettings', {}).get('portList', 443)
listen = inbound.get('receiverSettings', {}).get('listen', '0.0.0.0')
network = inbound.get('receiverSettings', {}).get('streamSettings', {}).get('protocolName', 'tcp')
protocol = TrojanProtocol(port, tag, listen, network, hostname=self.hostname)
self._protocols[tag] = protocol
break
except Exception as e:
logger.error(f"Failed to retrieve inbound info for tag '{tag}': {e}")
if tag not in self._protocols:
raise InboundNotFoundError(f"Inbound {protocol_type_or_tag} not found or could not determine protocol")
protocol = self._protocols[tag]
# Use the recreate method
self._recreate_inbound_without_user(protocol, email)
# Client link generation
def generate_client_link(self, protocol_type_or_tag: str, user: User) -> str:
"""
Generate client connection link.
Args:
protocol_type_or_tag: Protocol type ('vless', 'vmess', 'trojan') or specific tag
user: User object
Returns:
Client connection link (vless://, vmess://, trojan://)
"""
# First try to find by protocol type
tag_map = {
'vless': 'vless-inbound',
'vmess': 'vmess-inbound',
'trojan': 'trojan-inbound'
}
# Check if it's a protocol type or direct tag
tag = tag_map.get(protocol_type_or_tag)
if tag and tag in self._protocols:
protocol = self._protocols[tag]
elif protocol_type_or_tag in self._protocols:
protocol = self._protocols[protocol_type_or_tag]
else:
# Try to find any protocol matching the type
for stored_tag, stored_protocol in self._protocols.items():
if protocol_type_or_tag in ['vless', 'vmess', 'trojan']:
protocol_class_name = f"{protocol_type_or_tag.title()}Protocol"
if stored_protocol.__class__.__name__ == protocol_class_name:
protocol = stored_protocol
break
else:
raise InboundNotFoundError(f"Protocol {protocol_type_or_tag} not configured")
return protocol.generate_client_link(user, self.hostname)
# Statistics
def get_server_stats(self) -> Dict[str, Any]:
"""Get server system statistics."""
return self._get_stats_sys()
def get_user_stats(self, protocol_type: str, email: str) -> Stats:
"""
Get user traffic statistics.
Args:
protocol_type: Protocol type
email: User email
Returns:
Stats object with uplink/downlink data
"""
# Implementation would require stats queries
# This is a placeholder for the interface
return Stats(uplink=0, downlink=0)
# Private API methods
def _add_inbound(self, config: Dict[str, Any]) -> None:
"""Add inbound via API."""
result = self._run_api_command("adi", stdin_data=json.dumps(config))
if "error" in result.get("stderr", "").lower():
raise APIError(f"Failed to add inbound: {result['stderr']}")
def _remove_inbound(self, config: Dict[str, Any]) -> None:
"""Remove inbound via API."""
tag = config.get("tag")
if tag:
# Use tag directly as argument instead of JSON
result = self._run_api_command("rmi", args=[tag])
else:
# Fallback to JSON if no tag
result = self._run_api_command("rmi", stdin_data=json.dumps(config))
if result["returncode"] != 0 and "no inbound to remove" not in result.get("stderr", ""):
raise APIError(f"Failed to remove inbound: {result['stderr']}")
def _list_inbounds(self) -> List[Dict[str, Any]]:
"""List inbounds via API."""
result = self._run_api_command("lsi")
if result["returncode"] != 0:
raise APIError(f"Failed to list inbounds: {result['stderr']}")
try:
return json.loads(result["stdout"])
except json.JSONDecodeError:
raise APIError("Invalid JSON response from API")
def _add_user(self, config: Dict[str, Any]) -> None:
"""Add user via API."""
logger.debug(f"Adding user with config: {json.dumps(config, indent=2)}")
result = self._run_api_command("adu", stdin_data=json.dumps(config))
if result["returncode"] != 0 or "error" in result.get("stderr", "").lower():
logger.error(f"Failed to add user. stdout: {result['stdout']}, stderr: {result['stderr']}")
raise APIError(f"Failed to add user: {result['stderr'] or result['stdout']}")
logger.info(f"User {config.get('email')} added successfully to inbound {config.get('tag')}")
def _remove_user(self, inbound_tag: str, email: str) -> None:
"""Remove user via API."""
result = self._run_api_command("rmu", args=[f"--email={email}", inbound_tag])
if "error" in result.get("stderr", "").lower():
raise APIError(f"Failed to remove user: {result['stderr']}")
def _get_stats_sys(self) -> Dict[str, Any]:
"""Get system stats via API."""
result = self._run_api_command("statssys")
if result["returncode"] != 0:
raise APIError(f"Failed to get stats: {result['stderr']}")
try:
return json.loads(result["stdout"])
except json.JSONDecodeError:
raise APIError("Invalid JSON response from API")
def _build_user_config(self, tag: str, user: User, protocol) -> Dict[str, Any]:
"""
Build user configuration for Xray API.
Args:
tag: Inbound tag
user: User object (VlessUser, VmessUser, or TrojanUser)
protocol: Protocol handler
Returns:
User configuration dict for Xray API
"""
from .models import VlessUser, VmessUser, TrojanUser
base_config = {
"tag": tag,
"email": user.email
}
if isinstance(user, VlessUser):
base_config["account"] = {
"_TypedMessage_": "xray.proxy.vless.Account",
"id": user.uuid
}
elif isinstance(user, VmessUser):
base_config["account"] = {
"_TypedMessage_": "xray.proxy.vmess.Account",
"id": user.uuid,
"alterId": getattr(user, 'alter_id', 0)
}
elif isinstance(user, TrojanUser):
base_config["account"] = {
"_TypedMessage_": "xray.proxy.trojan.Account",
"password": user.password
}
else:
raise ValueError(f"Unsupported user type: {type(user)}")
return base_config
def _recreate_inbound_without_user(self, protocol, email: str) -> None:
"""
Recreate inbound without specified user.
"""
# Get existing users from the inbound
existing_users = self._get_existing_users(protocol.tag)
# Filter out the user to remove
all_users = [user for user in existing_users if user.email != email]
if len(all_users) == len(existing_users):
logger.warning(f"User {email} not found in inbound {protocol.tag}")
return
# Remove existing inbound
try:
self.remove_inbound(protocol.tag)
except Exception as e:
logger.warning(f"Failed to remove inbound {protocol.tag}: {e}")
# Recreate inbound with remaining users
if hasattr(protocol, '__class__') and 'Vless' in protocol.__class__.__name__:
self.add_vless_inbound(
port=protocol.port,
users=all_users,
tag=protocol.tag,
listen=protocol.listen,
network=protocol.network
)
elif hasattr(protocol, '__class__') and 'Vmess' in protocol.__class__.__name__:
self.add_vmess_inbound(
port=protocol.port,
users=all_users,
tag=protocol.tag,
listen=protocol.listen,
network=protocol.network
)
elif hasattr(protocol, '__class__') and 'Trojan' in protocol.__class__.__name__:
self.add_trojan_inbound(
port=protocol.port,
users=all_users,
tag=protocol.tag,
listen=protocol.listen,
network=protocol.network,
hostname=getattr(protocol, 'hostname', 'localhost')
)
def _recreate_inbound_with_user(self, protocol, new_user: User) -> None:
"""
Recreate inbound with existing users plus new user.
This is a workaround since Xray API doesn't support reliable dynamic user addition.
"""
# Get existing users from the inbound
existing_users = self._get_existing_users(protocol.tag)
# Check if user already exists
for existing_user in existing_users:
if existing_user.email == new_user.email:
return # User already exists, no need to recreate
# Add new user to existing users list
all_users = existing_users + [new_user]
# Remove existing inbound
try:
self.remove_inbound(protocol.tag)
except Exception as e:
# If removal fails, log but continue - inbound might not exist
pass
# Recreate inbound with all users
if hasattr(protocol, '__class__') and 'Vless' in protocol.__class__.__name__:
self.add_vless_inbound(
port=protocol.port,
users=all_users,
tag=protocol.tag,
listen=protocol.listen,
network=protocol.network
)
elif hasattr(protocol, '__class__') and 'Vmess' in protocol.__class__.__name__:
self.add_vmess_inbound(
port=protocol.port,
users=all_users,
tag=protocol.tag,
listen=protocol.listen,
network=protocol.network
)
elif hasattr(protocol, '__class__') and 'Trojan' in protocol.__class__.__name__:
self.add_trojan_inbound(
port=protocol.port,
users=all_users,
tag=protocol.tag,
listen=protocol.listen,
network=protocol.network,
hostname=getattr(protocol, 'hostname', 'localhost')
)
def _get_existing_users(self, tag: str) -> List[User]:
"""
Get existing users from an inbound.
"""
from .models import VlessUser, VmessUser, TrojanUser
try:
# Use inbounduser API command to get existing users
result = self._run_api_command("inbounduser", args=[f"-tag={tag}"])
if result["returncode"] != 0:
return [] # No users or inbound doesn't exist
import json
user_data = json.loads(result["stdout"])
users = []
if "users" in user_data:
for user_info in user_data["users"]:
email = user_info.get("email", "")
account = user_info.get("account", {})
# Determine protocol based on account type
account_type = account.get("_TypedMessage_", "")
if "vless" in account_type.lower():
users.append(VlessUser(
email=email,
uuid=account.get("id", "")
))
elif "vmess" in account_type.lower():
users.append(VmessUser(
email=email,
uuid=account.get("id", ""),
alter_id=account.get("alterId", 0)
))
elif "trojan" in account_type.lower():
users.append(TrojanUser(
email=email,
password=account.get("password", "")
))
return users
except Exception as e:
# If we can't get existing users, return empty list
return []
def _run_api_command(self, command: str, args: List[str] = None, stdin_data: str = None) -> Dict[str, Any]:
"""
Run xray api command.
Args:
command: API command (adi, rmi, lsi, etc.)
args: Additional command arguments
stdin_data: Data to pass via stdin
Returns:
Dict with stdout, stderr, returncode
"""
cmd = ["xray", "api", command, f"--server={self.server}"]
if args:
cmd.extend(args)
logger.debug(f"Running command: {' '.join(cmd)}")
if stdin_data:
logger.debug(f"With stdin data: {stdin_data}")
try:
result = subprocess.run(
cmd,
input=stdin_data,
text=True,
capture_output=True,
timeout=30
)
logger.debug(f"Command result - returncode: {result.returncode}, stdout: {result.stdout}, stderr: {result.stderr}")
return {
"stdout": result.stdout,
"stderr": result.stderr,
"returncode": result.returncode
}
except subprocess.TimeoutExpired:
logger.error(f"API command timeout for: {' '.join(cmd)}")
raise APIError("API command timeout")
except FileNotFoundError:
logger.error("xray command not found in PATH")
raise APIError("xray command not found")
except Exception as e:
logger.error(f"Unexpected error running command: {e}")
raise APIError(f"Failed to run command: {e}")

View File

@@ -0,0 +1,33 @@
"""
Custom exceptions for Xray Manager.
"""
class XrayError(Exception):
"""Base exception for all Xray-related errors."""
pass
class APIError(XrayError):
"""Error occurred during API communication."""
pass
class InboundNotFoundError(XrayError):
"""Inbound with specified tag not found."""
pass
class UserNotFoundError(XrayError):
"""User with specified email not found."""
pass
class ConfigurationError(XrayError):
"""Error in Xray configuration."""
pass
class CertificateError(XrayError):
"""Error related to TLS certificates."""
pass

93
vpn/xray_api/models.py Normal file
View File

@@ -0,0 +1,93 @@
"""
Data models for Xray Manager.
"""
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
from .utils import generate_uuid
@dataclass
class User:
"""Base user model."""
email: str
level: int = 0
def to_dict(self) -> Dict[str, Any]:
"""Convert user to dictionary representation."""
return {
"email": self.email,
"level": self.level
}
@dataclass
class VlessUser(User):
"""VLESS protocol user."""
uuid: str = field(default_factory=generate_uuid)
def to_dict(self) -> Dict[str, Any]:
base = super().to_dict()
base.update({
"id": self.uuid
})
return base
@dataclass
class VmessUser(User):
"""VMess protocol user."""
uuid: str = field(default_factory=generate_uuid)
alter_id: int = 0
def to_dict(self) -> Dict[str, Any]:
base = super().to_dict()
base.update({
"id": self.uuid,
"alterId": self.alter_id
})
return base
@dataclass
class TrojanUser(User):
"""Trojan protocol user."""
password: str = ""
def to_dict(self) -> Dict[str, Any]:
base = super().to_dict()
base.update({
"password": self.password
})
return base
@dataclass
class Inbound:
"""Inbound configuration."""
tag: str
protocol: str
port: int
listen: str = "0.0.0.0"
def to_dict(self) -> Dict[str, Any]:
return {
"tag": self.tag,
"protocol": self.protocol,
"port": self.port,
"listen": self.listen
}
@dataclass
class Stats:
"""Statistics data."""
uplink: int = 0
downlink: int = 0
@property
def total(self) -> int:
"""Total traffic (uplink + downlink)."""
return self.uplink + self.downlink

View File

@@ -0,0 +1,15 @@
"""
Protocol-specific implementations for Xray Manager.
"""
from .base import BaseProtocol
from .vless import VlessProtocol
from .vmess import VmessProtocol
from .trojan import TrojanProtocol
__all__ = [
"BaseProtocol",
"VlessProtocol",
"VmessProtocol",
"TrojanProtocol"
]

View File

@@ -0,0 +1,45 @@
"""
Base protocol implementation.
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from ..models import User
class BaseProtocol(ABC):
"""Base class for all protocol implementations."""
def __init__(self, port: int, tag: Optional[str] = None, listen: str = "0.0.0.0", network: str = "tcp"):
self.port = port
self.tag = tag or self._default_tag()
self.listen = listen
self.network = network
@abstractmethod
def _default_tag(self) -> str:
"""Return default tag for this protocol."""
pass
@abstractmethod
def create_inbound_config(self, users: List[User]) -> Dict[str, Any]:
"""Create inbound configuration for this protocol."""
pass
@abstractmethod
def create_user_config(self, user: User) -> Dict[str, Any]:
"""Create user configuration for adding to existing inbound."""
pass
@abstractmethod
def generate_client_link(self, user: User, hostname: str) -> str:
"""Generate client connection link."""
pass
def _base_inbound_config(self) -> Dict[str, Any]:
"""Common inbound configuration."""
return {
"listen": self.listen,
"port": self.port,
"tag": self.tag
}

View File

@@ -0,0 +1,80 @@
"""
Trojan protocol implementation.
"""
from typing import List, Dict, Any, Optional
from .base import BaseProtocol
from ..models import User, TrojanUser
from ..utils import generate_self_signed_cert, pem_to_lines
from ..exceptions import CertificateError
class TrojanProtocol(BaseProtocol):
"""Trojan protocol handler."""
def __init__(self, port: int, tag: Optional[str] = None, listen: str = "0.0.0.0",
network: str = "tcp", cert_pem: Optional[str] = None,
key_pem: Optional[str] = None, hostname: str = "localhost"):
super().__init__(port, tag, listen, network)
self.hostname = hostname
if cert_pem and key_pem:
self.cert_pem = cert_pem
self.key_pem = key_pem
else:
# Generate self-signed certificate
self.cert_pem, self.key_pem = generate_self_signed_cert(hostname)
def _default_tag(self) -> str:
return "trojan-inbound"
def create_inbound_config(self, users: List[TrojanUser]) -> Dict[str, Any]:
"""Create Trojan inbound configuration."""
config = self._base_inbound_config()
config.update({
"protocol": "trojan",
"settings": {
"_TypedMessage_": "xray.proxy.trojan.Config",
"clients": [self._user_to_client(user) for user in users],
"fallbacks": [{"dest": 80}]
},
"streamSettings": {
"network": self.network,
"security": "tls",
"tlsSettings": {
"alpn": ["http/1.1"],
"certificates": [{
"certificate": pem_to_lines(self.cert_pem),
"key": pem_to_lines(self.key_pem),
"usage": "encipherment"
}]
}
}
})
return {"inbounds": [config]}
def create_user_config(self, user: TrojanUser) -> Dict[str, Any]:
"""Create user configuration for Trojan."""
return {
"inboundTag": self.tag,
"proxySettings": {
"_TypedMessage_": "xray.proxy.trojan.Config",
"clients": [self._user_to_client(user)]
}
}
def generate_client_link(self, user: TrojanUser, hostname: str) -> str:
"""Generate Trojan client link."""
return f"trojan://{user.password}@{hostname}:{self.port}#{user.email}"
def get_client_note(self) -> str:
"""Get note for client configuration when using self-signed certificates."""
return "Add 'allowInsecure: true' to TLS settings for self-signed certificates"
def _user_to_client(self, user: TrojanUser) -> Dict[str, Any]:
"""Convert TrojanUser to client configuration."""
return {
"password": user.password,
"level": user.level,
"email": user.email
}

View File

@@ -0,0 +1,55 @@
"""
VLESS protocol implementation.
"""
from typing import List, Dict, Any, Optional
from .base import BaseProtocol
from ..models import User, VlessUser
class VlessProtocol(BaseProtocol):
"""VLESS protocol handler."""
def __init__(self, port: int, tag: Optional[str] = None, listen: str = "0.0.0.0", network: str = "tcp"):
super().__init__(port, tag, listen, network)
def _default_tag(self) -> str:
return "vless-inbound"
def create_inbound_config(self, users: List[VlessUser]) -> Dict[str, Any]:
"""Create VLESS inbound configuration."""
config = self._base_inbound_config()
config.update({
"protocol": "vless",
"settings": {
"_TypedMessage_": "xray.proxy.vless.inbound.Config",
"clients": [self._user_to_client(user) for user in users],
"decryption": "none"
},
"streamSettings": {
"network": self.network
}
})
return {"inbounds": [config]}
def create_user_config(self, user: VlessUser) -> Dict[str, Any]:
"""Create user configuration for VLESS."""
return {
"inboundTag": self.tag,
"proxySettings": {
"_TypedMessage_": "xray.proxy.vless.inbound.Config",
"clients": [self._user_to_client(user)]
}
}
def generate_client_link(self, user: VlessUser, hostname: str) -> str:
"""Generate VLESS client link."""
return f"vless://{user.uuid}@{hostname}:{self.port}?encryption=none&type={self.network}#{user.email}"
def _user_to_client(self, user: VlessUser) -> Dict[str, Any]:
"""Convert VlessUser to client configuration."""
return {
"id": user.uuid,
"level": user.level,
"email": user.email
}

View File

@@ -0,0 +1,73 @@
"""
VMess protocol implementation.
"""
import json
import base64
from typing import List, Dict, Any, Optional
from .base import BaseProtocol
from ..models import User, VmessUser
class VmessProtocol(BaseProtocol):
"""VMess protocol handler."""
def __init__(self, port: int, tag: Optional[str] = None, listen: str = "0.0.0.0", network: str = "tcp"):
super().__init__(port, tag, listen, network)
def _default_tag(self) -> str:
return "vmess-inbound"
def create_inbound_config(self, users: List[VmessUser]) -> Dict[str, Any]:
"""Create VMess inbound configuration."""
config = self._base_inbound_config()
config.update({
"protocol": "vmess",
"settings": {
"_TypedMessage_": "xray.proxy.vmess.inbound.Config",
"clients": [self._user_to_client(user) for user in users]
},
"streamSettings": {
"network": self.network
}
})
return {"inbounds": [config]}
def create_user_config(self, user: VmessUser) -> Dict[str, Any]:
"""Create user configuration for VMess."""
return {
"inboundTag": self.tag,
"proxySettings": {
"_TypedMessage_": "xray.proxy.vmess.inbound.Config",
"clients": [self._user_to_client(user)]
}
}
def generate_client_link(self, user: VmessUser, hostname: str) -> str:
"""Generate VMess client link."""
config = {
"v": "2",
"ps": user.email,
"add": hostname,
"port": str(self.port),
"id": user.uuid,
"aid": str(user.alter_id),
"net": self.network,
"type": "none",
"host": "",
"path": "",
"tls": ""
}
config_json = json.dumps(config, separators=(',', ':'))
encoded = base64.b64encode(config_json.encode()).decode()
return f"vmess://{encoded}"
def _user_to_client(self, user: VmessUser) -> Dict[str, Any]:
"""Convert VmessUser to client configuration."""
return {
"id": user.uuid,
"alterId": user.alter_id,
"level": user.level,
"email": user.email
}

77
vpn/xray_api/utils.py Normal file
View File

@@ -0,0 +1,77 @@
"""
Utility functions for Xray Manager.
"""
import uuid
import base64
import secrets
from typing import List
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
import datetime
def generate_uuid() -> str:
"""Generate a random UUID for VLESS/VMess users."""
return str(uuid.uuid4())
def generate_self_signed_cert(hostname: str = "localhost") -> tuple[str, str]:
"""
Generate self-signed certificate for Trojan.
Args:
hostname: Common name for certificate
Returns:
Tuple of (certificate_pem, private_key_pem)
"""
# Generate private key
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048
)
# Create certificate
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
x509.NameAttribute(NameOID.COMMON_NAME, hostname),
])
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
private_key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.datetime.utcnow()
).not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=365)
).add_extension(
x509.SubjectAlternativeName([
x509.DNSName(hostname),
]),
critical=False,
).sign(private_key, hashes.SHA256())
# Convert to PEM format
cert_pem = cert.public_bytes(serialization.Encoding.PEM)
key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
return cert_pem.decode(), key_pem.decode()
def pem_to_lines(pem_data: str) -> List[str]:
"""Convert PEM data to list of lines for Xray JSON format."""
return pem_data.strip().split('\n')