Fixed sub links generation

This commit is contained in:
AB from home.homenet
2025-08-05 01:40:10 +03:00
parent ea3d74ccbd
commit 1f7953a74c
5 changed files with 119 additions and 53 deletions

View File

@@ -739,38 +739,42 @@ class XrayCoreServer(Server):
if inbound.protocol == 'vless': if inbound.protocol == 'vless':
user_obj = VlessUser(email=client.email, uuid=str(client.uuid)) user_obj = VlessUser(email=client.email, uuid=str(client.uuid))
try: try:
ctx_link = self.client.generate_client_link(inbound.tag, user_obj) # Use protocol library with database parameters
# Replace hostname in the generated link from vpn.xray_api.protocols import VlessProtocol
if ctx_link and '://' in ctx_link: protocol_handler = VlessProtocol(inbound.port, inbound.tag, inbound.listen, inbound.network)
protocol, rest = ctx_link.split('://', 1)
if '@' in rest: # Get encryption setting from inbound
user_part, host_part = rest.split('@', 1) encryption = getattr(inbound, 'vless_encryption', 'none')
if ':' in host_part:
old_host, port_and_params = host_part.split(':', 1) ctx_link = protocol_handler.generate_client_link(
return f"{protocol}://{user_part}@{client_hostname}:{port_and_params}" user_obj,
# Fallback if link parsing fails client_hostname,
return self._generate_fallback_uri(inbound, client, client_hostname, inbound.port) network=inbound.network,
security=inbound.security,
encryption=encryption
)
return ctx_link
except Exception: except Exception:
return self._generate_fallback_uri(inbound, client, client_hostname, inbound.port) return self._generate_fallback_uri(inbound, client, client_hostname, inbound.port)
elif inbound.protocol == 'vmess': elif inbound.protocol == 'vmess':
user_obj = VmessUser(email=client.email, uuid=str(client.uuid), alter_id=client.alter_id or 0) user_obj = VmessUser(email=client.email, uuid=str(client.uuid), alter_id=client.alter_id or 0)
try: try:
ctx_link = self.client.generate_client_link(inbound.tag, user_obj) # Use protocol library with database parameters
# VMess uses base64 encoded JSON, need to decode and fix hostname from vpn.xray_api.protocols import VmessProtocol
if ctx_link and ctx_link.startswith('vmess://'): protocol_handler = VmessProtocol(inbound.port, inbound.tag, inbound.listen, inbound.network)
import base64, json
encoded_part = ctx_link[8:] # Remove vmess:// # Get encryption setting from inbound
try: encryption = getattr(inbound, 'vmess_encryption', 'auto')
decoded = base64.b64decode(encoded_part).decode('utf-8')
config = json.loads(decoded) ctx_link = protocol_handler.generate_client_link(
config['add'] = client_hostname # Fix hostname user_obj,
fixed_config = base64.b64encode(json.dumps(config).encode('utf-8')).decode('utf-8') client_hostname,
return f"vmess://{fixed_config}" network=inbound.network,
except Exception: security=inbound.security,
pass encryption=encryption
# Fallback if link parsing fails )
return self._generate_fallback_uri(inbound, client, client_hostname, inbound.port) return ctx_link
except Exception: except Exception:
return self._generate_fallback_uri(inbound, client, client_hostname, inbound.port) return self._generate_fallback_uri(inbound, client, client_hostname, inbound.port)
@@ -786,17 +790,17 @@ class XrayCoreServer(Server):
user_obj = TrojanUser(email=client.email, password=password) user_obj = TrojanUser(email=client.email, password=password)
try: try:
ctx_link = self.client.generate_client_link(inbound.tag, user_obj) # Use protocol library with database parameters
# Replace hostname in the generated link from vpn.xray_api.protocols import TrojanProtocol
if ctx_link and '://' in ctx_link: protocol_handler = TrojanProtocol(inbound.port, inbound.tag, inbound.listen, inbound.network)
protocol, rest = ctx_link.split('://', 1)
if '@' in rest: ctx_link = protocol_handler.generate_client_link(
password_part, host_part = rest.split('@', 1) user_obj,
if ':' in host_part: client_hostname,
old_host, port_and_params = host_part.split(':', 1) network=inbound.network,
return f"{protocol}://{password_part}@{client_hostname}:{port_and_params}" security=inbound.security
# Fallback if link parsing fails )
return self._generate_fallback_uri(inbound, client, client_hostname, inbound.port) return ctx_link
except Exception: except Exception:
return self._generate_fallback_uri(inbound, client, client_hostname, inbound.port) return self._generate_fallback_uri(inbound, client, client_hostname, inbound.port)
@@ -814,25 +818,45 @@ class XrayCoreServer(Server):
from urllib.parse import urlencode from urllib.parse import urlencode
if inbound.protocol == 'vless': if inbound.protocol == 'vless':
# VLESS format: vless://uuid@host:port?encryption=none&type=tcp#name # VLESS format: vless://uuid@host:port?encryption=none&type=network#name
# Use encryption and network from inbound settings
encryption = getattr(inbound, 'vless_encryption', 'none')
network = inbound.network or 'tcp'
security = inbound.security or 'none'
params = { params = {
'encryption': 'none', 'encryption': encryption,
'type': 'tcp' 'type': network
} }
# Add security if enabled
if security != 'none':
params['security'] = security
query_string = urlencode(params) query_string = urlencode(params)
return f"vless://{client.uuid}@{server_address}:{server_port}?{query_string}#{self.name}" return f"vless://{client.uuid}@{server_address}:{server_port}?{query_string}#{self.name}"
elif inbound.protocol == 'vmess': elif inbound.protocol == 'vmess':
# VMess format: vmess://uuid@host:port?encryption=auto&type=tcp#name # VMess format: vmess://uuid@host:port?encryption=method&type=network#name
# Use encryption and network from inbound settings
encryption = getattr(inbound, 'vmess_encryption', 'auto')
network = inbound.network or 'tcp'
security = inbound.security or 'none'
params = { params = {
'encryption': 'auto', 'encryption': encryption,
'type': 'tcp' 'type': network
} }
# Add security if enabled
if security != 'none':
params['security'] = security
query_string = urlencode(params) query_string = urlencode(params)
return f"vmess://{client.uuid}@{server_address}:{server_port}?{query_string}#{self.name}" return f"vmess://{client.uuid}@{server_address}:{server_port}?{query_string}#{self.name}"
elif inbound.protocol == 'trojan': elif inbound.protocol == 'trojan':
# Trojan format: trojan://password@host:port?type=tcp#name # Trojan format: trojan://password@host:port?type=network#name
password = getattr(client, 'password', None) password = getattr(client, 'password', None)
if not password: if not password:
# Generate and save password if not exists # Generate and save password if not exists
@@ -840,9 +864,17 @@ class XrayCoreServer(Server):
client.password = password client.password = password
client.save(update_fields=['password']) client.save(update_fields=['password'])
network = inbound.network or 'tcp'
security = inbound.security or 'none'
params = { params = {
'type': 'tcp' 'type': network
} }
# Add security if enabled
if security != 'none':
params['security'] = security
query_string = urlencode(params) query_string = urlencode(params)
return f"trojan://{password}@{server_address}:{server_port}?{query_string}#{self.name}" return f"trojan://{password}@{server_address}:{server_port}?{query_string}#{self.name}"

View File

@@ -32,7 +32,7 @@ class BaseProtocol(ABC):
pass pass
@abstractmethod @abstractmethod
def generate_client_link(self, user: User, hostname: str) -> str: def generate_client_link(self, user: User, hostname: str, network: str = None, security: str = None, **kwargs) -> str:
"""Generate client connection link.""" """Generate client connection link."""
pass pass

View File

@@ -63,9 +63,23 @@ class TrojanProtocol(BaseProtocol):
} }
} }
def generate_client_link(self, user: TrojanUser, hostname: str) -> str: def generate_client_link(self, user: TrojanUser, hostname: str, network: str = None, security: str = None, **kwargs) -> str:
"""Generate Trojan client link.""" """Generate Trojan client link."""
return f"trojan://{user.password}@{hostname}:{self.port}#{user.email}" from urllib.parse import urlencode
# Use provided parameters or defaults
network_type = network or self.network
params = {
'type': network_type
}
# Add security if provided
if security and security != 'none':
params['security'] = security
query_string = urlencode(params)
return f"trojan://{user.password}@{hostname}:{self.port}?{query_string}#{user.email}"
def get_client_note(self) -> str: def get_client_note(self) -> str:
"""Get note for client configuration when using self-signed certificates.""" """Get note for client configuration when using self-signed certificates."""

View File

@@ -42,9 +42,25 @@ class VlessProtocol(BaseProtocol):
} }
} }
def generate_client_link(self, user: VlessUser, hostname: str) -> str: def generate_client_link(self, user: VlessUser, hostname: str, network: str = None, security: str = None, **kwargs) -> str:
"""Generate VLESS client link.""" """Generate VLESS client link."""
return f"vless://{user.uuid}@{hostname}:{self.port}?encryption=none&type={self.network}#{user.email}" from urllib.parse import urlencode
# Use provided parameters or defaults
network_type = network or self.network
encryption = kwargs.get('encryption', 'none')
params = {
'encryption': encryption,
'type': network_type
}
# Add security if provided
if security and security != 'none':
params['security'] = security
query_string = urlencode(params)
return f"vless://{user.uuid}@{hostname}:{self.port}?{query_string}#{user.email}"
def _user_to_client(self, user: VlessUser) -> Dict[str, Any]: def _user_to_client(self, user: VlessUser) -> Dict[str, Any]:
"""Convert VlessUser to client configuration.""" """Convert VlessUser to client configuration."""

View File

@@ -43,8 +43,12 @@ class VmessProtocol(BaseProtocol):
} }
} }
def generate_client_link(self, user: VmessUser, hostname: str) -> str: def generate_client_link(self, user: VmessUser, hostname: str, network: str = None, security: str = None, **kwargs) -> str:
"""Generate VMess client link.""" """Generate VMess client link."""
# Use provided parameters or defaults
network_type = network or self.network
encryption = kwargs.get('encryption', 'auto')
config = { config = {
"v": "2", "v": "2",
"ps": user.email, "ps": user.email,
@@ -52,11 +56,11 @@ class VmessProtocol(BaseProtocol):
"port": str(self.port), "port": str(self.port),
"id": user.uuid, "id": user.uuid,
"aid": str(user.alter_id), "aid": str(user.alter_id),
"net": self.network, "net": network_type,
"type": "none", "type": "none",
"host": "", "host": "",
"path": "", "path": "",
"tls": "" "tls": security if security and security != 'none' else ""
} }
config_json = json.dumps(config, separators=(',', ':')) config_json = json.dumps(config, separators=(',', ':'))