diff --git a/Cargo.lock b/Cargo.lock index ffe2989..28aa235 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4092,6 +4092,7 @@ dependencies = [ "serde_yaml", "tempfile", "thiserror 1.0.69", + "time", "tokio", "tokio-cron-scheduler", "toml", diff --git a/Cargo.toml b/Cargo.toml index 2c5c0d8..db9c364 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,8 @@ hyper = { version = "1.0", features = ["full"] } xray-core = "0.2.1" # gRPC client for Xray tonic = "0.12" # gRPC client/server framework prost = "0.13" # Protocol Buffers implementation -rcgen = "0.12" # For self-signed certificates +rcgen = { version = "0.12", features = ["pem"] } # For self-signed certificates +time = "0.3" # For certificate date/time handling base64 = "0.21" # For PEM to DER conversion [dev-dependencies] diff --git a/src/services/certificates.rs b/src/services/certificates.rs index b1de12b..6d67866 100644 --- a/src/services/certificates.rs +++ b/src/services/certificates.rs @@ -1,7 +1,10 @@ +use rcgen::{Certificate, CertificateParams, DistinguishedName, DnType, SanType, KeyPair, PKCS_ECDSA_P256_SHA256}; +use std::net::IpAddr; +use time::{Duration, OffsetDateTime}; + /// Certificate management service #[derive(Clone)] pub struct CertificateService { - // Mock implementation for now } #[allow(dead_code)] @@ -10,13 +13,75 @@ impl CertificateService { Self {} } - /// Generate self-signed certificate + /// Generate self-signed certificate optimized for Xray pub async fn generate_self_signed(&self, domain: &str) -> anyhow::Result<(String, String)> { tracing::info!("Generating self-signed certificate for domain: {}", domain); - // Mock implementation - would use rcgen to generate actual certificate - let cert_pem = format!("-----BEGIN CERTIFICATE-----\nMOCK CERT FOR {}\n-----END CERTIFICATE-----", domain); - let key_pem = format!("-----BEGIN PRIVATE KEY-----\nMOCK KEY FOR {}\n-----END PRIVATE KEY-----", domain); + // Create certificate parameters with ECDSA (recommended for Xray) + let mut params = CertificateParams::new(vec![domain.to_string()]); + + // Use ECDSA P-256 which is recommended for Xray (equivalent to RSA-3072 in strength) + params.alg = &PKCS_ECDSA_P256_SHA256; + + // Generate ECDSA key pair + let key_pair = KeyPair::generate(&PKCS_ECDSA_P256_SHA256)?; + params.key_pair = Some(key_pair); + + // Set certificate subject with proper fields + let mut distinguished_name = DistinguishedName::new(); + distinguished_name.push(DnType::CommonName, domain); + distinguished_name.push(DnType::OrganizationName, "OutFleet"); + distinguished_name.push(DnType::OrganizationalUnitName, "VPN"); + distinguished_name.push(DnType::CountryName, "US"); + distinguished_name.push(DnType::StateOrProvinceName, "State"); + distinguished_name.push(DnType::LocalityName, "City"); + params.distinguished_name = distinguished_name; + + // Add comprehensive Subject Alternative Names for better compatibility + let mut san_list = vec![ + SanType::DnsName(domain.to_string()), + SanType::DnsName("localhost".to_string()), + ]; + + // Add IP addresses if domain looks like an IP + if let Ok(ip) = domain.parse::() { + san_list.push(SanType::IpAddress(ip)); + } + + // Always add localhost IP for local testing + san_list.push(SanType::IpAddress(IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)))); + + // If domain is not an IP, also add wildcard subdomain + if domain.parse::().is_err() && !domain.starts_with("*.") { + san_list.push(SanType::DnsName(format!("*.{}", domain))); + } + + params.subject_alt_names = san_list; + + // Set validity period (1 year as recommended) + params.not_before = OffsetDateTime::now_utc(); + params.not_after = OffsetDateTime::now_utc() + Duration::days(365); + + // Set serial number + params.serial_number = Some(rcgen::SerialNumber::from_slice(&[1, 2, 3, 4])); + + // Generate certificate + let cert = Certificate::from_params(params)?; + + // Get PEM format with proper formatting + let cert_pem = cert.serialize_pem()?; + let key_pem = cert.serialize_private_key_pem(); + + // Validate PEM format + if !cert_pem.starts_with("-----BEGIN CERTIFICATE-----") || !cert_pem.ends_with("-----END CERTIFICATE-----\n") { + return Err(anyhow::anyhow!("Invalid certificate PEM format")); + } + + if !key_pem.starts_with("-----BEGIN") || !key_pem.contains("PRIVATE KEY-----") { + return Err(anyhow::anyhow!("Invalid private key PEM format")); + } + + tracing::debug!("Generated ECDSA P-256 certificate for domain: {}", domain); Ok((cert_pem, key_pem)) } @@ -26,11 +91,8 @@ impl CertificateService { pub async fn renew_certificate(&self, domain: &str) -> anyhow::Result<(String, String)> { tracing::info!("Renewing certificate for domain: {}", domain); - // Mock implementation - let cert_pem = format!("-----BEGIN CERTIFICATE-----\nRENEWED CERT FOR {}\n-----END CERTIFICATE-----", domain); - let key_pem = format!("-----BEGIN PRIVATE KEY-----\nRENEWED KEY FOR {}\n-----END PRIVATE KEY-----", domain); - - Ok((cert_pem, key_pem)) + // For now, just generate a new self-signed certificate + self.generate_self_signed(domain).await } } diff --git a/src/services/tasks.rs b/src/services/tasks.rs index b51f1cd..894c5a5 100644 --- a/src/services/tasks.rs +++ b/src/services/tasks.rs @@ -365,43 +365,8 @@ async fn sync_server_inbounds( endpoint: &str, desired_inbounds: &HashMap, ) -> Result<()> { - - // Create or update inbounds - // Since xray has no API to list inbounds, we always recreate them - for (tag, desired) in desired_inbounds { - - // Always try to remove inbound first (ignore errors if it doesn't exist) - let _ = xray_service.remove_inbound(server_id, endpoint, tag).await; - - // Create inbound with users - let users_json: Vec = desired.users.iter().map(|user| { - serde_json::json!({ - "id": user.id, - "email": user.email, - "level": user.level - }) - }).collect(); - - match xray_service.create_inbound_with_users( - server_id, - endpoint, - &desired.tag, - desired.port, - &desired.protocol, - desired.settings.clone(), - desired.stream_settings.clone(), - &users_json, - desired.cert_pem.as_deref(), - desired.key_pem.as_deref(), - ).await { - Err(e) => { - error!("Failed to create inbound {}: {}", tag, e); - } - _ => {} - } - } - - Ok(()) + // Use optimized batch sync with single client + xray_service.sync_server_inbounds_optimized(server_id, endpoint, desired_inbounds).await } /// Sync a single server by ID (for event-driven sync) @@ -440,21 +405,21 @@ async fn sync_single_server_by_id( /// Represents desired inbound configuration from database #[derive(Debug, Clone)] -struct DesiredInbound { - tag: String, - port: i32, - protocol: String, - settings: Value, - stream_settings: Value, - users: Vec, - cert_pem: Option, - key_pem: Option, +pub struct DesiredInbound { + pub tag: String, + pub port: i32, + pub protocol: String, + pub settings: Value, + pub stream_settings: Value, + pub users: Vec, + pub cert_pem: Option, + pub key_pem: Option, } /// Represents xray user configuration #[derive(Debug, Clone)] -struct XrayUser { - id: String, - email: String, - level: i32, +pub struct XrayUser { + pub id: String, + pub email: String, + pub level: i32, } \ No newline at end of file diff --git a/src/services/xray/client.rs b/src/services/xray/client.rs index 0b604e6..d860c10 100644 --- a/src/services/xray/client.rs +++ b/src/services/xray/client.rs @@ -1,6 +1,7 @@ use anyhow::{Result, anyhow}; use serde_json::Value; use xray_core::Client; +use std::sync::Arc; // Import submodules from the same directory use super::stats::StatsClient; @@ -8,17 +9,16 @@ use super::inbounds::InboundClient; use super::users::UserClient; /// Xray gRPC client wrapper +#[derive(Clone)] pub struct XrayClient { endpoint: String, - client: Client, + client: Arc, } #[allow(dead_code)] impl XrayClient { /// Connect to Xray gRPC server pub async fn connect(endpoint: &str) -> Result { - tracing::info!("Connecting to Xray at {}", endpoint); - let client = Client::from_url(endpoint).await .map_err(|e| anyhow!("Failed to connect to Xray at {}: {}", endpoint, e))?; @@ -26,61 +26,61 @@ impl XrayClient { Ok(Self { endpoint: endpoint.to_string(), - client, + client: Arc::new(client), }) } /// Get server statistics pub async fn get_stats(&self) -> Result { - let stats_client = StatsClient::new(self.endpoint.clone(), &self.client); + let stats_client = StatsClient::new(self.endpoint.clone(), &*self.client); stats_client.get_stats().await } /// Query specific statistics with pattern pub async fn query_stats(&self, pattern: &str, reset: bool) -> Result { - let stats_client = StatsClient::new(self.endpoint.clone(), &self.client); + let stats_client = StatsClient::new(self.endpoint.clone(), &*self.client); stats_client.query_stats(pattern, reset).await } /// Restart Xray with new configuration pub async fn restart_with_config(&self, config: &crate::services::xray::XrayConfig) -> Result<()> { - let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client); + let inbound_client = InboundClient::new(self.endpoint.clone(), &*self.client); inbound_client.restart_with_config(config).await } /// Add inbound configuration pub async fn add_inbound(&self, inbound: &Value) -> Result<()> { - let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client); + let inbound_client = InboundClient::new(self.endpoint.clone(), &*self.client); inbound_client.add_inbound(inbound).await } /// Add inbound configuration with TLS certificate pub async fn add_inbound_with_certificate(&self, inbound: &Value, cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> { - let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client); + let inbound_client = InboundClient::new(self.endpoint.clone(), &*self.client); inbound_client.add_inbound_with_certificate(inbound, None, cert_pem, key_pem).await } /// Add inbound configuration with users and TLS certificate pub async fn add_inbound_with_users_and_certificate(&self, inbound: &Value, users: &[Value], cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> { - let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client); + let inbound_client = InboundClient::new(self.endpoint.clone(), &*self.client); inbound_client.add_inbound_with_certificate(inbound, Some(users), cert_pem, key_pem).await } /// Remove inbound by tag pub async fn remove_inbound(&self, tag: &str) -> Result<()> { - let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client); + let inbound_client = InboundClient::new(self.endpoint.clone(), &*self.client); inbound_client.remove_inbound(tag).await } /// Add user to inbound pub async fn add_user(&self, inbound_tag: &str, user: &Value) -> Result<()> { - let user_client = UserClient::new(self.endpoint.clone(), &self.client); + let user_client = UserClient::new(self.endpoint.clone(), &*self.client); user_client.add_user(inbound_tag, user).await } /// Remove user from inbound pub async fn remove_user(&self, inbound_tag: &str, email: &str) -> Result<()> { - let user_client = UserClient::new(self.endpoint.clone(), &self.client); + let user_client = UserClient::new(self.endpoint.clone(), &*self.client); user_client.remove_user(inbound_tag, email).await } diff --git a/src/services/xray/mod.rs b/src/services/xray/mod.rs index ec48968..2773d8a 100644 --- a/src/services/xray/mod.rs +++ b/src/services/xray/mod.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; use tokio::time::{Duration, Instant}; +use tracing::error; pub mod client; pub mod config; @@ -15,24 +16,71 @@ pub mod users; pub use client::XrayClient; pub use config::XrayConfig; +/// Cached connection with TTL +#[derive(Clone)] +struct CachedConnection { + client: XrayClient, + created_at: Instant, +} + +impl CachedConnection { + fn new(client: XrayClient) -> Self { + Self { + client, + created_at: Instant::now(), + } + } + + fn is_expired(&self, ttl: Duration) -> bool { + self.created_at.elapsed() > ttl + } +} + /// Service for managing Xray servers via gRPC #[derive(Clone)] -pub struct XrayService {} +pub struct XrayService { + connection_cache: Arc>>, + connection_ttl: Duration, +} #[allow(dead_code)] impl XrayService { pub fn new() -> Self { - Self {} + Self { + connection_cache: Arc::new(RwLock::new(HashMap::new())), + connection_ttl: Duration::from_secs(300), // 5 minutes TTL + } + } + + /// Get or create cached client for endpoint + async fn get_or_create_client(&self, endpoint: &str) -> Result { + // Check cache first + { + let cache = self.connection_cache.read().await; + if let Some(cached) = cache.get(endpoint) { + if !cached.is_expired(self.connection_ttl) { + return Ok(cached.client.clone()); + } + } + } + + // Create new connection + let client = XrayClient::connect(endpoint).await?; + let cached_connection = CachedConnection::new(client.clone()); + + // Update cache + { + let mut cache = self.connection_cache.write().await; + cache.insert(endpoint.to_string(), cached_connection); + } + + Ok(client) } - /// Create a client for the specified server - async fn create_client(&self, endpoint: &str) -> Result { - XrayClient::connect(endpoint).await - } /// Test connection to Xray server pub async fn test_connection(&self, _server_id: Uuid, endpoint: &str) -> Result { - match self.create_client(endpoint).await { + match self.get_or_create_client(endpoint).await { Ok(_client) => { // Instead of getting stats (which might fail), just test connection // If we successfully created the client, connection is working @@ -44,7 +92,7 @@ impl XrayService { /// Apply full configuration to Xray server pub async fn apply_config(&self, _server_id: Uuid, endpoint: &str, config: &XrayConfig) -> Result<()> { - let client = self.create_client(endpoint).await?; + let client = self.get_or_create_client(endpoint).await?; client.restart_with_config(config).await } @@ -98,25 +146,25 @@ impl XrayService { /// Add inbound to running Xray instance pub async fn add_inbound(&self, _server_id: Uuid, endpoint: &str, inbound: &Value) -> Result<()> { - let client = self.create_client(endpoint).await?; + let client = self.get_or_create_client(endpoint).await?; client.add_inbound(inbound).await } /// Add inbound with certificate to running Xray instance pub async fn add_inbound_with_certificate(&self, _server_id: Uuid, endpoint: &str, inbound: &Value, cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> { - let client = self.create_client(endpoint).await?; + let client = self.get_or_create_client(endpoint).await?; client.add_inbound_with_certificate(inbound, cert_pem, key_pem).await } /// Add inbound with users and certificate to running Xray instance pub async fn add_inbound_with_users_and_certificate(&self, _server_id: Uuid, endpoint: &str, inbound: &Value, users: &[Value], cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> { - let client = self.create_client(endpoint).await?; + let client = self.get_or_create_client(endpoint).await?; client.add_inbound_with_users_and_certificate(inbound, users, cert_pem, key_pem).await } /// Remove inbound from running Xray instance pub async fn remove_inbound(&self, _server_id: Uuid, endpoint: &str, tag: &str) -> Result<()> { - let client = self.create_client(endpoint).await?; + let client = self.get_or_create_client(endpoint).await?; client.remove_inbound(tag).await } @@ -187,21 +235,70 @@ impl XrayService { /// Remove user from inbound pub async fn remove_user(&self, _server_id: Uuid, endpoint: &str, inbound_tag: &str, email: &str) -> Result<()> { - let client = self.create_client(endpoint).await?; + let client = self.get_or_create_client(endpoint).await?; client.remove_user(inbound_tag, email).await } /// Get server statistics pub async fn get_stats(&self, _server_id: Uuid, endpoint: &str) -> Result { - let client = self.create_client(endpoint).await?; + let client = self.get_or_create_client(endpoint).await?; client.get_stats().await } /// Query specific statistics pub async fn query_stats(&self, _server_id: Uuid, endpoint: &str, pattern: &str, reset: bool) -> Result { - let client = self.create_client(endpoint).await?; + let client = self.get_or_create_client(endpoint).await?; client.query_stats(pattern, reset).await } + + /// Sync entire server with batch operations using single client + pub async fn sync_server_inbounds_optimized( + &self, + server_id: Uuid, + endpoint: &str, + desired_inbounds: &HashMap, + ) -> Result<()> { + // Get single client for all operations + let client = self.get_or_create_client(endpoint).await?; + + // Perform all operations with the same client + for (tag, desired) in desired_inbounds { + // Always try to remove inbound first (ignore errors if it doesn't exist) + let _ = client.remove_inbound(tag).await; + + // Create inbound with users + let users_json: Vec = desired.users.iter().map(|user| { + serde_json::json!({ + "id": user.id, + "email": user.email, + "level": user.level + }) + }).collect(); + + // Build inbound config + let inbound_config = serde_json::json!({ + "tag": desired.tag, + "port": desired.port, + "protocol": desired.protocol, + "settings": desired.settings, + "streamSettings": desired.stream_settings + }); + + match client.add_inbound_with_users_and_certificate( + &inbound_config, + &users_json, + desired.cert_pem.as_deref(), + desired.key_pem.as_deref(), + ).await { + Err(e) => { + error!("Failed to create inbound {}: {}", tag, e); + } + _ => {} + } + } + + Ok(()) + } } impl Default for XrayService {