Added usermanagement in TG admin

This commit is contained in:
AB from home.homenet
2025-10-24 18:11:34 +03:00
parent c6892b1a73
commit 78bf75b24e
89 changed files with 4389 additions and 2419 deletions

View File

@@ -1,12 +1,12 @@
use anyhow::{Result, anyhow};
use anyhow::{anyhow, Result};
use serde_json::Value;
use xray_core::Client;
use std::sync::Arc;
use tokio::time::{timeout, Duration};
use xray_core::Client;
// Import submodules from the same directory
use super::stats::StatsClient;
use super::inbounds::InboundClient;
use super::stats::StatsClient;
use super::users::UserClient;
/// Xray gRPC client wrapper
@@ -22,20 +22,17 @@ impl XrayClient {
pub async fn connect(endpoint: &str) -> Result<Self> {
// Apply a 5-second timeout to the connection attempt
let connect_future = Client::from_url(endpoint);
match timeout(Duration::from_secs(5), connect_future).await {
Ok(Ok(client)) => {
Ok(Self {
endpoint: endpoint.to_string(),
client: Arc::new(client),
})
},
Ok(Err(e)) => {
Err(anyhow!("Failed to connect to Xray at {}: {}", endpoint, e))
},
Err(_) => {
Err(anyhow!("Connection to Xray at {} timed out after 5 seconds", endpoint))
}
Ok(Ok(client)) => Ok(Self {
endpoint: endpoint.to_string(),
client: Arc::new(client),
}),
Ok(Err(e)) => Err(anyhow!("Failed to connect to Xray at {}: {}", endpoint, e)),
Err(_) => Err(anyhow!(
"Connection to Xray at {} timed out after 5 seconds",
endpoint
)),
}
}
@@ -52,7 +49,10 @@ impl XrayClient {
}
/// Restart Xray with new configuration
pub async fn restart_with_config(&self, config: &crate::services::xray::XrayConfig) -> Result<()> {
pub async fn restart_with_config(
&self,
config: &crate::services::xray::XrayConfig,
) -> Result<()> {
let inbound_client = InboundClient::new(self.endpoint.clone(), &*self.client);
inbound_client.restart_with_config(config).await
}
@@ -64,15 +64,30 @@ impl XrayClient {
}
/// Add inbound configuration with TLS certificate
pub async fn add_inbound_with_certificate(&self, inbound: &Value, cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> {
pub async fn add_inbound_with_certificate(
&self,
inbound: &Value,
cert_pem: Option<&str>,
key_pem: Option<&str>,
) -> Result<()> {
let inbound_client = InboundClient::new(self.endpoint.clone(), &*self.client);
inbound_client.add_inbound_with_certificate(inbound, None, cert_pem, key_pem).await
inbound_client
.add_inbound_with_certificate(inbound, None, cert_pem, key_pem)
.await
}
/// Add inbound configuration with users and TLS certificate
pub async fn add_inbound_with_users_and_certificate(&self, inbound: &Value, users: &[Value], cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> {
pub async fn add_inbound_with_users_and_certificate(
&self,
inbound: &Value,
users: &[Value],
cert_pem: Option<&str>,
key_pem: Option<&str>,
) -> Result<()> {
let inbound_client = InboundClient::new(self.endpoint.clone(), &*self.client);
inbound_client.add_inbound_with_certificate(inbound, Some(users), cert_pem, key_pem).await
inbound_client
.add_inbound_with_certificate(inbound, Some(users), cert_pem, key_pem)
.await
}
/// Remove inbound by tag
@@ -97,4 +112,4 @@ impl XrayClient {
pub fn endpoint(&self) -> &str {
&self.endpoint
}
}
}

View File

@@ -171,25 +171,26 @@ impl XrayConfig {
dns: None,
routing: Some(RoutingConfig {
domain_strategy: Some("IPIfNonMatch".to_string()),
rules: vec![
RoutingRule {
rule_type: "field".to_string(),
domain: None,
ip: Some(vec!["geoip:private".to_string()]),
port: None,
outbound_tag: "direct".to_string(),
}
],
rules: vec![RoutingRule {
rule_type: "field".to_string(),
domain: None,
ip: Some(vec!["geoip:private".to_string()]),
port: None,
outbound_tag: "direct".to_string(),
}],
}),
policy: Some(PolicyConfig {
levels: {
let mut levels = HashMap::new();
levels.insert("0".to_string(), PolicyLevel {
handshake_timeout: Some(4),
conn_idle: Some(300),
uplink_only: Some(2),
downlink_only: Some(5),
});
levels.insert(
"0".to_string(),
PolicyLevel {
handshake_timeout: Some(4),
conn_idle: Some(300),
uplink_only: Some(2),
downlink_only: Some(5),
},
);
levels
},
system: Some(SystemPolicy {
@@ -282,4 +283,4 @@ impl Default for XrayConfig {
fn default() -> Self {
Self::new()
}
}
}

View File

@@ -1,42 +1,44 @@
use anyhow::{Result, anyhow};
use anyhow::{anyhow, Result};
use prost::Message;
use serde_json::Value;
use uuid;
use xray_core::{
tonic::Request,
app::proxyman::command::{AddInboundRequest, RemoveInboundRequest},
core::InboundHandlerConfig,
common::serial::TypedMessage,
common::protocol::User,
app::proxyman::ReceiverConfig,
common::net::{PortList, PortRange, IpOrDomain, ip_or_domain::Address, Network},
transport::internet::StreamConfig,
transport::internet::tls::{Config as TlsConfig, Certificate as TlsCertificate},
common::net::{ip_or_domain::Address, IpOrDomain, Network, PortList, PortRange},
common::protocol::User,
common::serial::TypedMessage,
core::InboundHandlerConfig,
prost_types,
proxy::shadowsocks::ServerConfig as ShadowsocksServerConfig,
proxy::shadowsocks::{Account as ShadowsocksAccount, CipherType},
proxy::trojan::Account as TrojanAccount,
proxy::trojan::ServerConfig as TrojanServerConfig,
proxy::vless::inbound::Config as VlessInboundConfig,
proxy::vless::Account as VlessAccount,
proxy::vmess::inbound::Config as VmessInboundConfig,
proxy::vmess::Account as VmessAccount,
proxy::trojan::ServerConfig as TrojanServerConfig,
proxy::trojan::Account as TrojanAccount,
proxy::shadowsocks::ServerConfig as ShadowsocksServerConfig,
proxy::shadowsocks::{Account as ShadowsocksAccount, CipherType},
tonic::Request,
transport::internet::tls::{Certificate as TlsCertificate, Config as TlsConfig},
transport::internet::StreamConfig,
Client,
prost_types,
};
use prost::Message;
/// Convert PEM format to DER (x509) format
fn pem_to_der(pem_data: &str) -> Result<Vec<u8>> {
// Remove PEM headers and whitespace, then decode base64
let base64_data: String = pem_data.lines()
let base64_data: String = pem_data
.lines()
.filter(|line| !line.starts_with("-----") && !line.trim().is_empty())
.map(|line| line.trim())
.collect::<Vec<&str>>()
.join("");
tracing::debug!("PEM to DER conversion: {} bytes", base64_data.len());
use base64::{Engine as _, engine::general_purpose};
general_purpose::STANDARD.decode(&base64_data)
use base64::{engine::general_purpose, Engine as _};
general_purpose::STANDARD
.decode(&base64_data)
.map_err(|e| anyhow!("Failed to decode base64 PEM data: {}", e))
}
@@ -52,22 +54,32 @@ impl<'a> InboundClient<'a> {
/// Add inbound configuration
pub async fn add_inbound(&self, inbound: &Value) -> Result<()> {
self.add_inbound_with_certificate(inbound, None, None, None).await
self.add_inbound_with_certificate(inbound, None, None, None)
.await
}
/// Add inbound configuration with TLS certificate and users
pub async fn add_inbound_with_certificate(&self, inbound: &Value, users: Option<&[Value]>, cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> {
pub async fn add_inbound_with_certificate(
&self,
inbound: &Value,
users: Option<&[Value]>,
cert_pem: Option<&str>,
key_pem: Option<&str>,
) -> Result<()> {
let tag = inbound["tag"].as_str().unwrap_or("").to_string();
let port = inbound["port"].as_u64().unwrap_or(8080) as u32;
let protocol = inbound["protocol"].as_str().unwrap_or("vless");
let _user_count = users.map_or(0, |u| u.len());
tracing::info!(
"Adding inbound '{}' with protocol={}, port={}, has_cert={}, has_key={}",
tag, protocol, port, cert_pem.is_some(), key_pem.is_some()
"Adding inbound '{}' with protocol={}, port={}, has_cert={}, has_key={}",
tag,
protocol,
port,
cert_pem.is_some(),
key_pem.is_some()
);
// Create receiver configuration (port binding) - use simple port number
let port_list = PortList {
range: vec![PortRange {
@@ -80,39 +92,42 @@ impl<'a> InboundClient<'a> {
let stream_settings = if cert_pem.is_some() && key_pem.is_some() {
let cert_pem = cert_pem.unwrap();
let key_pem = key_pem.unwrap();
// Create TLS certificate exactly like working example - PEM content as bytes
let tls_cert = TlsCertificate {
certificate: cert_pem.as_bytes().to_vec(), // PEM content as bytes like working example
key: key_pem.as_bytes().to_vec(), // PEM content as bytes like working example
usage: 0,
ocsp_stapling: 3600, // From working example
ocsp_stapling: 3600, // From working example
one_time_loading: true, // From working example
build_chain: false,
certificate_path: "".to_string(), // Empty paths since we use content
key_path: "".to_string(), // Empty paths since we use content
key_path: "".to_string(), // Empty paths since we use content
};
// Create TLS config with proper fields like working example
let mut tls_config = TlsConfig::default();
tls_config.certificate = vec![tls_cert];
tls_config.next_protocol = vec!["h2".to_string(), "http/1.1".to_string()]; // From working example
tls_config.server_name = "localhost".to_string(); // From working example
tls_config.min_version = "1.2".to_string(); // From Marzban examples
// Create TypedMessage for TLS config
let tls_message = TypedMessage {
r#type: "xray.transport.internet.tls.Config".to_string(),
value: tls_config.encode_to_vec(),
};
tracing::debug!("TLS config: server_name={}, protocols={:?}",
tls_config.server_name, tls_config.next_protocol);
tracing::debug!(
"TLS config: server_name={}, protocols={:?}",
tls_config.server_name,
tls_config.next_protocol
);
// Create StreamConfig like working example
Some(StreamConfig {
address: None, // No address in streamSettings according to working example
port: 0, // No port in working example streamSettings
port: 0, // No port in working example streamSettings
protocol_name: "tcp".to_string(),
transport_settings: vec![],
security_type: "xray.transport.internet.tls.Config".to_string(), // Full type like working example
@@ -125,8 +140,8 @@ impl<'a> InboundClient<'a> {
let receiver_config = ReceiverConfig {
port_list: Some(port_list),
listen: Some(IpOrDomain {
address: Some(Address::Ip(vec![0, 0, 0, 0])) // "0.0.0.0" as IPv4 bytes
listen: Some(IpOrDomain {
address: Some(Address::Ip(vec![0, 0, 0, 0])), // "0.0.0.0" as IPv4 bytes
}),
allocation_strategy: None,
stream_settings: stream_settings,
@@ -138,7 +153,7 @@ impl<'a> InboundClient<'a> {
r#type: "xray.app.proxyman.ReceiverConfig".to_string(),
value: receiver_config.encode_to_vec(),
};
// Create proxy configuration based on protocol with users
let proxy_message = match protocol {
"vless" => {
@@ -148,7 +163,7 @@ impl<'a> InboundClient<'a> {
let user_id = user["id"].as_str().unwrap_or("").to_string();
let email = user["email"].as_str().unwrap_or("").to_string();
let level = user["level"].as_u64().unwrap_or(0) as u32;
if !user_id.is_empty() && !email.is_empty() {
let account = VlessAccount {
id: user_id,
@@ -166,7 +181,7 @@ impl<'a> InboundClient<'a> {
}
}
}
let vless_config = VlessInboundConfig {
clients,
decryption: "none".to_string(),
@@ -176,7 +191,7 @@ impl<'a> InboundClient<'a> {
r#type: "xray.proxy.vless.inbound.Config".to_string(),
value: vless_config.encode_to_vec(),
}
},
}
"vmess" => {
let mut vmess_users = vec![];
if let Some(users) = users {
@@ -184,18 +199,18 @@ impl<'a> InboundClient<'a> {
let user_id = user["id"].as_str().unwrap_or("").to_string();
let email = user["email"].as_str().unwrap_or("").to_string();
let level = user["level"].as_u64().unwrap_or(0) as u32;
// Validate required fields
if user_id.is_empty() || email.is_empty() {
tracing::warn!("Skipping VMess user: missing id or email");
continue;
}
// Validate UUID format
if uuid::Uuid::parse_str(&user_id).is_err() {
tracing::warn!("VMess user '{}' has invalid UUID format", user_id);
}
if !user_id.is_empty() && !email.is_empty() {
let account = VmessAccount {
id: user_id.clone(),
@@ -203,7 +218,7 @@ impl<'a> InboundClient<'a> {
tests_enabled: "".to_string(), // Keep empty as in examples
};
let account_bytes = account.encode_to_vec();
vmess_users.push(User {
email: email.clone(),
level,
@@ -215,7 +230,7 @@ impl<'a> InboundClient<'a> {
}
}
}
let vmess_config = VmessInboundConfig {
user: vmess_users,
default: None,
@@ -225,19 +240,21 @@ impl<'a> InboundClient<'a> {
r#type: "xray.proxy.vmess.inbound.Config".to_string(),
value: vmess_config.encode_to_vec(),
}
},
}
"trojan" => {
let mut trojan_users = vec![];
if let Some(users) = users {
for user in users {
let password = user["password"].as_str().or_else(|| user["id"].as_str()).unwrap_or("").to_string();
let password = user["password"]
.as_str()
.or_else(|| user["id"].as_str())
.unwrap_or("")
.to_string();
let email = user["email"].as_str().unwrap_or("").to_string();
let level = user["level"].as_u64().unwrap_or(0) as u32;
if !password.is_empty() && !email.is_empty() {
let account = TrojanAccount {
password,
};
let account = TrojanAccount { password };
trojan_users.push(User {
email,
level,
@@ -249,7 +266,7 @@ impl<'a> InboundClient<'a> {
}
}
}
let trojan_config = TrojanServerConfig {
users: trojan_users,
fallbacks: vec![],
@@ -258,21 +275,24 @@ impl<'a> InboundClient<'a> {
r#type: "xray.proxy.trojan.ServerConfig".to_string(),
value: trojan_config.encode_to_vec(),
}
},
}
"shadowsocks" => {
let mut ss_users = vec![];
if let Some(users) = users {
for user in users {
let password = user["password"].as_str().or_else(|| user["id"].as_str()).unwrap_or("").to_string();
let password = user["password"]
.as_str()
.or_else(|| user["id"].as_str())
.unwrap_or("")
.to_string();
let email = user["email"].as_str().unwrap_or("").to_string();
let level = user["level"].as_u64().unwrap_or(0) as u32;
if !password.is_empty() && !email.is_empty() {
let account = ShadowsocksAccount {
password,
cipher_type: CipherType::Aes256Gcm as i32, // Use AES-256-GCM cipher
iv_check: false, // Default IV check
iv_check: false, // Default IV check
};
ss_users.push(User {
email: email.clone(),
@@ -285,7 +305,7 @@ impl<'a> InboundClient<'a> {
}
}
}
let shadowsocks_config = ShadowsocksServerConfig {
users: ss_users,
network: vec![Network::Tcp as i32, Network::Udp as i32], // Support TCP and UDP
@@ -294,7 +314,7 @@ impl<'a> InboundClient<'a> {
r#type: "xray.proxy.shadowsocks.ServerConfig".to_string(),
value: shadowsocks_config.encode_to_vec(),
}
},
}
_ => {
return Err(anyhow!("Unsupported protocol: {}", protocol));
}
@@ -328,12 +348,12 @@ impl<'a> InboundClient<'a> {
let request = Request::new(RemoveInboundRequest {
tag: tag.to_string(),
});
match handler_client.remove_inbound(request).await {
Ok(_) => {
tracing::info!("Removed inbound '{}' from {}", tag, self.endpoint);
Ok(())
},
}
Err(e) => {
tracing::error!("Failed to remove inbound '{}': {}", tag, e);
Err(anyhow!("Failed to remove inbound: {}", e))
@@ -342,11 +362,17 @@ impl<'a> InboundClient<'a> {
}
/// Restart Xray with new configuration
pub async fn restart_with_config(&self, config: &crate::services::xray::XrayConfig) -> Result<()> {
tracing::debug!("Restarting Xray server at {} with new config", self.endpoint);
pub async fn restart_with_config(
&self,
config: &crate::services::xray::XrayConfig,
) -> Result<()> {
tracing::debug!(
"Restarting Xray server at {} with new config",
self.endpoint
);
// TODO: Implement restart with config using xray-core
// For now just return success
Ok(())
}
}
}

View File

@@ -1,16 +1,16 @@
use anyhow::Result;
use serde_json::Value;
use uuid::Uuid;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::time::{Duration, Instant, timeout};
use tokio::time::{timeout, Duration, Instant};
use tracing::{error, warn};
use uuid::Uuid;
pub mod client;
pub mod config;
pub mod stats;
pub mod inbounds;
pub mod stats;
pub mod users;
pub use client::XrayClient;
@@ -30,7 +30,7 @@ impl CachedConnection {
created_at: Instant::now(),
}
}
fn is_expired(&self, ttl: Duration) -> bool {
self.created_at.elapsed() > ttl
}
@@ -51,7 +51,7 @@ impl XrayService {
connection_ttl: Duration::from_secs(300), // 5 minutes TTL
}
}
/// Get or create cached client for endpoint
async fn get_or_create_client(&self, endpoint: &str) -> Result<XrayClient> {
// Check cache first
@@ -63,21 +63,20 @@ impl XrayService {
}
}
}
// Create new connection
let client = XrayClient::connect(endpoint).await?;
let cached_connection = CachedConnection::new(client.clone());
// Update cache
{
let mut cache = self.connection_cache.write().await;
cache.insert(endpoint.to_string(), cached_connection);
}
Ok(client)
}
/// Test connection to Xray server with timeout
pub async fn test_connection(&self, _server_id: Uuid, endpoint: &str) -> Result<bool> {
// Apply a 3-second timeout to the entire test operation
@@ -85,12 +84,12 @@ impl XrayService {
Ok(Ok(_client)) => {
// Connection successful
Ok(true)
},
}
Ok(Err(e)) => {
// Connection failed with error
warn!("Failed to connect to Xray at {}: {}", endpoint, e);
Ok(false)
},
}
Err(_) => {
// Operation timed out
warn!("Connection test to Xray at {} timed out", endpoint);
@@ -100,7 +99,12 @@ impl XrayService {
}
/// Apply full configuration to Xray server
pub async fn apply_config(&self, _server_id: Uuid, endpoint: &str, config: &XrayConfig) -> Result<()> {
pub async fn apply_config(
&self,
_server_id: Uuid,
endpoint: &str,
config: &XrayConfig,
) -> Result<()> {
let client = self.get_or_create_client(endpoint).await?;
client.restart_with_config(config).await
}
@@ -124,8 +128,9 @@ impl XrayService {
"settings": base_settings,
"streamSettings": stream_settings
});
self.add_inbound(_server_id, endpoint, &inbound_config).await
self.add_inbound(_server_id, endpoint, &inbound_config)
.await
}
/// Create inbound from template with TLS certificate
@@ -149,26 +154,51 @@ impl XrayService {
"settings": base_settings,
"streamSettings": stream_settings
});
self.add_inbound_with_certificate(_server_id, endpoint, &inbound_config, cert_pem, key_pem).await
self.add_inbound_with_certificate(_server_id, endpoint, &inbound_config, cert_pem, key_pem)
.await
}
/// Add inbound to running Xray instance
pub async fn add_inbound(&self, _server_id: Uuid, endpoint: &str, inbound: &Value) -> Result<()> {
pub async fn add_inbound(
&self,
_server_id: Uuid,
endpoint: &str,
inbound: &Value,
) -> Result<()> {
let client = self.get_or_create_client(endpoint).await?;
client.add_inbound(inbound).await
}
/// Add inbound with certificate to running Xray instance
pub async fn add_inbound_with_certificate(&self, _server_id: Uuid, endpoint: &str, inbound: &Value, cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> {
pub async fn add_inbound_with_certificate(
&self,
_server_id: Uuid,
endpoint: &str,
inbound: &Value,
cert_pem: Option<&str>,
key_pem: Option<&str>,
) -> Result<()> {
let client = self.get_or_create_client(endpoint).await?;
client.add_inbound_with_certificate(inbound, cert_pem, key_pem).await
client
.add_inbound_with_certificate(inbound, cert_pem, key_pem)
.await
}
/// Add inbound with users and certificate to running Xray instance
pub async fn add_inbound_with_users_and_certificate(&self, _server_id: Uuid, endpoint: &str, inbound: &Value, users: &[Value], cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> {
pub async fn add_inbound_with_users_and_certificate(
&self,
_server_id: Uuid,
endpoint: &str,
inbound: &Value,
users: &[Value],
cert_pem: Option<&str>,
key_pem: Option<&str>,
) -> Result<()> {
let client = self.get_or_create_client(endpoint).await?;
client.add_inbound_with_users_and_certificate(inbound, users, cert_pem, key_pem).await
client
.add_inbound_with_users_and_certificate(inbound, users, cert_pem, key_pem)
.await
}
/// Remove inbound from running Xray instance
@@ -178,15 +208,20 @@ impl XrayService {
}
/// Add user to inbound by recreating the inbound with updated user list
pub async fn add_user(&self, _server_id: Uuid, endpoint: &str, inbound_tag: &str, user: &Value) -> Result<()> {
pub async fn add_user(
&self,
_server_id: Uuid,
endpoint: &str,
inbound_tag: &str,
user: &Value,
) -> Result<()> {
// TODO: Implement inbound recreation approach:
// 1. Get current inbound configuration from database
// 2. Get existing users from database
// 2. Get existing users from database
// 3. Remove old inbound from xray
// 4. Create new inbound with all users (existing + new)
// For now, return error to indicate this needs to be implemented
Err(anyhow::anyhow!("User addition requires inbound recreation - not yet implemented. Use web interface to recreate inbound with users."))
}
@@ -204,7 +239,6 @@ impl XrayService {
cert_pem: Option<&str>,
key_pem: Option<&str>,
) -> Result<()> {
// Build inbound configuration with users
let mut inbound_config = serde_json::json!({
"tag": tag,
@@ -213,37 +247,53 @@ impl XrayService {
"settings": base_settings,
"streamSettings": stream_settings
});
// Add users to settings based on protocol
if !users.is_empty() {
let mut settings = inbound_config["settings"].clone();
match protocol {
"vless" | "vmess" => {
settings["clients"] = serde_json::Value::Array(users.to_vec());
},
}
"trojan" => {
settings["clients"] = serde_json::Value::Array(users.to_vec());
},
}
"shadowsocks" => {
// For shadowsocks, users are handled differently
if let Some(user) = users.first() {
settings["password"] = user["password"].clone();
}
},
}
_ => {
return Err(anyhow::anyhow!("Unsupported protocol for users: {}", protocol));
return Err(anyhow::anyhow!(
"Unsupported protocol for users: {}",
protocol
));
}
}
inbound_config["settings"] = settings;
}
// Use the new method with users support
self.add_inbound_with_users_and_certificate(_server_id, endpoint, &inbound_config, users, cert_pem, key_pem).await
self.add_inbound_with_users_and_certificate(
_server_id,
endpoint,
&inbound_config,
users,
cert_pem,
key_pem,
)
.await
}
/// Remove user from inbound
pub async fn remove_user(&self, _server_id: Uuid, endpoint: &str, inbound_tag: &str, email: &str) -> Result<()> {
pub async fn remove_user(
&self,
_server_id: Uuid,
endpoint: &str,
inbound_tag: &str,
email: &str,
) -> Result<()> {
let client = self.get_or_create_client(endpoint).await?;
client.remove_user(inbound_tag, email).await
}
@@ -255,11 +305,17 @@ impl XrayService {
}
/// Query specific statistics
pub async fn query_stats(&self, _server_id: Uuid, endpoint: &str, pattern: &str, reset: bool) -> Result<Value> {
pub async fn query_stats(
&self,
_server_id: Uuid,
endpoint: &str,
pattern: &str,
reset: bool,
) -> Result<Value> {
let client = self.get_or_create_client(endpoint).await?;
client.query_stats(pattern, reset).await
}
/// Sync entire server with batch operations using single client
pub async fn sync_server_inbounds_optimized(
&self,
@@ -269,21 +325,25 @@ impl XrayService {
) -> Result<()> {
// Get single client for all operations
let client = self.get_or_create_client(endpoint).await?;
// Perform all operations with the same client
for (tag, desired) in desired_inbounds {
// Always try to remove inbound first (ignore errors if it doesn't exist)
let _ = client.remove_inbound(tag).await;
// Create inbound with users
let users_json: Vec<Value> = desired.users.iter().map(|user| {
serde_json::json!({
"id": user.id,
"email": user.email,
"level": user.level
let users_json: Vec<Value> = desired
.users
.iter()
.map(|user| {
serde_json::json!({
"id": user.id,
"email": user.email,
"level": user.level
})
})
}).collect();
.collect();
// Build inbound config
let inbound_config = serde_json::json!({
"tag": desired.tag,
@@ -292,20 +352,23 @@ impl XrayService {
"settings": desired.settings,
"streamSettings": desired.stream_settings
});
match client.add_inbound_with_users_and_certificate(
&inbound_config,
&users_json,
desired.cert_pem.as_deref(),
desired.key_pem.as_deref(),
).await {
match client
.add_inbound_with_users_and_certificate(
&inbound_config,
&users_json,
desired.cert_pem.as_deref(),
desired.key_pem.as_deref(),
)
.await
{
Err(e) => {
error!("Failed to create inbound {}: {}", tag, e);
}
_ => {}
}
}
Ok(())
}
}
@@ -314,4 +377,4 @@ impl Default for XrayService {
fn default() -> Self {
Self::new()
}
}
}

View File

@@ -1,8 +1,8 @@
use anyhow::{Result, anyhow};
use anyhow::{anyhow, Result};
use serde_json::Value;
use xray_core::{
tonic::Request,
app::stats::command::{GetStatsRequest, QueryStatsRequest},
tonic::Request,
Client,
};
@@ -19,7 +19,7 @@ impl<'a> StatsClient<'a> {
/// Get server statistics
pub async fn get_stats(&self) -> Result<Value> {
tracing::info!("Getting stats from Xray server at {}", self.endpoint);
let request = Request::new(GetStatsRequest {
name: "".to_string(),
reset: false,
@@ -44,8 +44,13 @@ impl<'a> StatsClient<'a> {
/// Query specific statistics with pattern
pub async fn query_stats(&self, pattern: &str, reset: bool) -> Result<Value> {
tracing::info!("Querying stats with pattern '{}', reset: {} from {}", pattern, reset, self.endpoint);
tracing::info!(
"Querying stats with pattern '{}', reset: {} from {}",
pattern,
reset,
self.endpoint
);
let request = Request::new(QueryStatsRequest {
pattern: pattern.to_string(),
reset,
@@ -67,4 +72,4 @@ impl<'a> StatsClient<'a> {
}
}
}
}
}

View File

@@ -1,16 +1,16 @@
use anyhow::{Result, anyhow};
use anyhow::{anyhow, Result};
use prost::Message;
use serde_json::Value;
use xray_core::{
tonic::Request,
app::proxyman::command::{AlterInboundRequest, AddUserOperation, RemoveUserOperation},
common::serial::TypedMessage,
app::proxyman::command::{AddUserOperation, AlterInboundRequest, RemoveUserOperation},
common::protocol::User,
common::serial::TypedMessage,
proxy::trojan::Account as TrojanAccount,
proxy::vless::Account as VlessAccount,
proxy::vmess::Account as VmessAccount,
proxy::trojan::Account as TrojanAccount,
tonic::Request,
Client,
};
use prost::Message;
pub struct UserClient<'a> {
endpoint: String,
@@ -28,11 +28,11 @@ impl<'a> UserClient<'a> {
let user_id = user["id"].as_str().unwrap_or("").to_string();
let level = user["level"].as_u64().unwrap_or(0) as u32;
let protocol = user["protocol"].as_str().unwrap_or("vless");
if email.is_empty() || user_id.is_empty() {
return Err(anyhow!("User email and id are required"));
}
// Create user account based on protocol
let account_message = match protocol {
"vless" => {
@@ -45,7 +45,7 @@ impl<'a> UserClient<'a> {
r#type: "xray.proxy.vless.Account".to_string(),
value: account.encode_to_vec(),
}
},
}
"vmess" => {
let account = VmessAccount {
id: user_id,
@@ -56,7 +56,7 @@ impl<'a> UserClient<'a> {
r#type: "xray.proxy.vmess.Account".to_string(),
value: account.encode_to_vec(),
}
},
}
"trojan" => {
let account = TrojanAccount {
password: user_id, // For trojan, use password instead of UUID
@@ -65,36 +65,35 @@ impl<'a> UserClient<'a> {
r#type: "xray.proxy.trojan.Account".to_string(),
value: account.encode_to_vec(),
}
},
}
_ => {
return Err(anyhow!("Unsupported protocol for user: {}", protocol));
}
};
// Create user protobuf message
let user_proto = User {
level: level,
email: email.clone(),
account: Some(account_message),
};
// Build the AddUserOperation
let add_user_op = AddUserOperation {
user: Some(user_proto),
};
let typed_message = TypedMessage {
r#type: "xray.app.proxyman.command.AddUserOperation".to_string(),
value: add_user_op.encode_to_vec(),
};
// Build the AlterInboundRequest
let request = Request::new(AlterInboundRequest {
tag: inbound_tag.to_string(),
operation: Some(typed_message),
});
let mut handler_client = self.client.handler();
match handler_client.alter_inbound(request).await {
Ok(response) => {
@@ -102,40 +101,57 @@ impl<'a> UserClient<'a> {
Ok(())
}
Err(e) => {
tracing::error!("gRPC error adding user '{}' to inbound '{}': status={}, message={}",
email, inbound_tag, e.code(), e.message());
Err(anyhow!("Failed to add user '{}' to inbound '{}': {}", email, inbound_tag, e))
tracing::error!(
"gRPC error adding user '{}' to inbound '{}': status={}, message={}",
email,
inbound_tag,
e.code(),
e.message()
);
Err(anyhow!(
"Failed to add user '{}' to inbound '{}': {}",
email,
inbound_tag,
e
))
}
}
}
/// Remove user from inbound
pub async fn remove_user(&self, inbound_tag: &str, email: &str) -> Result<()> {
// Build the RemoveUserOperation
let remove_user_op = RemoveUserOperation {
email: email.to_string(),
};
let typed_message = TypedMessage {
r#type: "xray.app.proxyman.command.RemoveUserOperation".to_string(),
value: remove_user_op.encode_to_vec(),
};
let request = Request::new(AlterInboundRequest {
tag: inbound_tag.to_string(),
operation: Some(typed_message),
});
let mut handler_client = self.client.handler();
match handler_client.alter_inbound(request).await {
Ok(_) => {
Ok(())
}
Ok(_) => Ok(()),
Err(e) => {
tracing::error!("Failed to remove user '{}' from inbound '{}': {}", email, inbound_tag, e);
Err(anyhow!("Failed to remove user '{}' from inbound '{}': {}", email, inbound_tag, e))
tracing::error!(
"Failed to remove user '{}' from inbound '{}': {}",
email,
inbound_tag,
e
);
Err(anyhow!(
"Failed to remove user '{}' from inbound '{}': {}",
email,
inbound_tag,
e
))
}
}
}
}
}