From f59ef73c12b66e5e0c5cf3dd596e6379482a82db Mon Sep 17 00:00:00 2001 From: Ultradesu Date: Fri, 19 Sep 2025 18:30:50 +0300 Subject: [PATCH] Useradd works --- Cargo.lock | 1 + Cargo.toml | 1 + src/database/entities/inbound_users.rs | 68 ++++++++++------ src/database/migrations/mod.rs | 2 + src/database/repository/inbound_users.rs | 24 +++--- src/database/repository/user.rs | 5 ++ src/main.rs | 20 +---- src/services/tasks.rs | 58 ++++---------- src/services/xray/inbounds.rs | 65 ++++++++-------- src/services/xray/mod.rs | 6 -- src/services/xray/users.rs | 9 --- src/web/handlers/servers.rs | 99 ++++++++++++++---------- 12 files changed, 175 insertions(+), 183 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8489557..ffe2989 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4083,6 +4083,7 @@ dependencies = [ "hyper", "log", "prost", + "rand", "rcgen", "sea-orm", "sea-orm-migration", diff --git a/Cargo.toml b/Cargo.toml index 5b68341..2c5c0d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,6 +42,7 @@ chrono = { version = "0.4", features = ["serde"] } async-trait = "0.1" log = "0.4" urlencoding = "2.1" +rand = "0.8" # Web server axum = { version = "0.7", features = ["macros", "json"] } diff --git a/src/database/entities/inbound_users.rs b/src/database/entities/inbound_users.rs index 9d288f0..595881d 100644 --- a/src/database/entities/inbound_users.rs +++ b/src/database/entities/inbound_users.rs @@ -9,14 +9,17 @@ pub struct Model { #[sea_orm(primary_key)] pub id: Uuid, + /// Reference to the actual user + pub user_id: Uuid, + pub server_inbound_id: Uuid, - pub username: String, - - pub email: String, - + /// Generated xray user ID (UUID for protocols like vmess/vless) pub xray_user_id: String, + /// Generated password for protocols like trojan/shadowsocks + pub password: Option, + pub level: i32, pub is_active: bool, @@ -28,6 +31,12 @@ pub struct Model { #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] pub enum Relation { + #[sea_orm( + belongs_to = "super::user::Entity", + from = "Column::UserId", + to = "super::user::Column::Id" + )] + User, #[sea_orm( belongs_to = "super::server_inbound::Entity", from = "Column::ServerInboundId", @@ -36,6 +45,12 @@ pub enum Relation { ServerInbound, } +impl Related for Entity { + fn to() -> RelationDef { + Relation::User.def() + } +} + impl Related for Entity { fn to() -> RelationDef { Relation::ServerInbound.def() @@ -74,41 +89,46 @@ impl ActiveModelBehavior for ActiveModel { /// Inbound user creation data transfer object #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CreateInboundUserDto { + pub user_id: Uuid, pub server_inbound_id: Uuid, - pub username: String, pub level: Option, } impl CreateInboundUserDto { - /// Generate email in format: username@OutFleet - pub fn generate_email(&self) -> String { - format!("{}@OutFleet", self.username) - } - - /// Generate UUID for xray user + /// Generate UUID for xray user (for vmess/vless) pub fn generate_xray_user_id(&self) -> String { Uuid::new_v4().to_string() } + + /// Generate random password (for trojan/shadowsocks) + pub fn generate_password(&self) -> String { + use rand::prelude::*; + use rand::distributions::Alphanumeric; + + thread_rng() + .sample_iter(&Alphanumeric) + .take(24) + .map(char::from) + .collect() + } } /// Inbound user update data transfer object #[derive(Debug, Clone, Serialize, Deserialize)] pub struct UpdateInboundUserDto { - pub username: Option, pub level: Option, pub is_active: Option, } impl From for ActiveModel { fn from(dto: CreateInboundUserDto) -> Self { - let email = dto.generate_email(); let xray_user_id = dto.generate_xray_user_id(); Self { + user_id: Set(dto.user_id), server_inbound_id: Set(dto.server_inbound_id), - username: Set(dto.username), - email: Set(email), xray_user_id: Set(xray_user_id), + password: Set(Some(dto.generate_password())), // Generate password for all protocols level: Set(dto.level.unwrap_or(0)), is_active: Set(true), ..Self::new() @@ -121,11 +141,6 @@ impl Model { pub fn apply_update(self, dto: UpdateInboundUserDto) -> ActiveModel { let mut active_model: ActiveModel = self.into(); - if let Some(username) = dto.username { - let new_email = format!("{}@OutFleet", username); - active_model.username = Set(username); - active_model.email = Set(new_email); - } if let Some(level) = dto.level { active_model.level = Set(level); } @@ -135,16 +150,21 @@ impl Model { active_model } + + /// Generate email for xray client based on user information + pub fn generate_client_email(&self, username: &str) -> String { + format!("{}@OutFleet", username) + } } /// Response model for inbound user #[derive(Debug, Clone, Serialize, Deserialize)] pub struct InboundUserResponse { pub id: Uuid, + pub user_id: Uuid, pub server_inbound_id: Uuid, - pub username: String, - pub email: String, pub xray_user_id: String, + pub password: Option, pub level: i32, pub is_active: bool, pub created_at: String, @@ -155,10 +175,10 @@ impl From for InboundUserResponse { fn from(model: Model) -> Self { Self { id: model.id, + user_id: model.user_id, server_inbound_id: model.server_inbound_id, - username: model.username, - email: model.email, xray_user_id: model.xray_user_id, + password: model.password, level: model.level, is_active: model.is_active, created_at: model.created_at.to_rfc3339(), diff --git a/src/database/migrations/mod.rs b/src/database/migrations/mod.rs index f7c7692..98c015e 100644 --- a/src/database/migrations/mod.rs +++ b/src/database/migrations/mod.rs @@ -7,6 +7,7 @@ mod m20241201_000004_create_servers_table; mod m20241201_000005_create_server_inbounds_table; mod m20241201_000006_create_user_access_table; mod m20241201_000007_create_inbound_users_table; +mod m20250919_000001_update_inbound_users_schema; pub struct Migrator; @@ -21,6 +22,7 @@ impl MigratorTrait for Migrator { Box::new(m20241201_000005_create_server_inbounds_table::Migration), Box::new(m20241201_000006_create_user_access_table::Migration), Box::new(m20241201_000007_create_inbound_users_table::Migration), + Box::new(m20250919_000001_update_inbound_users_schema::Migration), ] } } \ No newline at end of file diff --git a/src/database/repository/inbound_users.rs b/src/database/repository/inbound_users.rs index bc394f0..71e0efd 100644 --- a/src/database/repository/inbound_users.rs +++ b/src/database/repository/inbound_users.rs @@ -44,23 +44,23 @@ impl InboundUsersRepository { Ok(users) } - /// Find user by username and inbound (for uniqueness check) - pub async fn find_by_username_and_inbound(&self, username: &str, inbound_id: Uuid) -> Result> { + /// Find user by user_id and inbound (for uniqueness check - one user per inbound) + pub async fn find_by_user_and_inbound(&self, user_id: Uuid, inbound_id: Uuid) -> Result> { let user = Entity::find() - .filter(Column::Username.eq(username)) + .filter(Column::UserId.eq(user_id)) .filter(Column::ServerInboundId.eq(inbound_id)) .one(&self.db) .await?; Ok(user) } - /// Find user by email - pub async fn find_by_email(&self, email: &str) -> Result> { - let user = Entity::find() - .filter(Column::Email.eq(email)) - .one(&self.db) + /// Find all inbound access for a specific user + pub async fn find_by_user_id(&self, user_id: Uuid) -> Result> { + let users = Entity::find() + .filter(Column::UserId.eq(user_id)) + .all(&self.db) .await?; - Ok(user) + Ok(users) } pub async fn create(&self, dto: CreateInboundUserDto) -> Result { @@ -124,9 +124,9 @@ impl InboundUsersRepository { Ok(result.rows_affected) } - /// Check if username already exists on this inbound - pub async fn username_exists_on_inbound(&self, username: &str, inbound_id: Uuid) -> Result { - let exists = self.find_by_username_and_inbound(username, inbound_id).await?; + /// Check if user already has access to this inbound + pub async fn user_has_access_to_inbound(&self, user_id: Uuid, inbound_id: Uuid) -> Result { + let exists = self.find_by_user_and_inbound(user_id, inbound_id).await?; Ok(exists.is_some()) } } \ No newline at end of file diff --git a/src/database/repository/user.rs b/src/database/repository/user.rs index f535c73..0bf64fa 100644 --- a/src/database/repository/user.rs +++ b/src/database/repository/user.rs @@ -30,6 +30,11 @@ impl UserRepository { Ok(user) } + /// Find user by ID (alias for get_by_id) + pub async fn find_by_id(&self, id: Uuid) -> Result> { + self.get_by_id(id).await + } + /// Get user by telegram ID pub async fn get_by_telegram_id(&self, telegram_id: i64) -> Result> { let user = User::find() diff --git a/src/main.rs b/src/main.rs index 546dde3..20f2497 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,7 +18,6 @@ async fn main() -> Result<()> { // Initialize logging early with basic configuration init_logging(&args.log_level.as_deref().unwrap_or("info"))?; - tracing::info!("Starting Xray Admin Panel v{}", env!("CARGO_PKG_VERSION")); // Handle special flags if args.print_default_config { @@ -29,7 +28,6 @@ async fn main() -> Result<()> { // Load configuration let config = match AppConfig::load() { Ok(config) => { - tracing::info!("Configuration loaded successfully"); config } Err(e) => { @@ -37,14 +35,12 @@ async fn main() -> Result<()> { if args.validate_config { std::process::exit(1); } - tracing::warn!("Using default configuration"); AppConfig::default() } }; // Validate configuration if requested if args.validate_config { - tracing::info!("Configuration validation passed"); return Ok(()); } @@ -58,10 +54,8 @@ async fn main() -> Result<()> { // Initialize database connection - tracing::info!("Initializing database connection..."); let db = match DatabaseManager::new(&config.database).await { Ok(db) => { - tracing::info!("Database initialized successfully"); db } Err(e) => { @@ -72,19 +66,13 @@ async fn main() -> Result<()> { // Perform database health check match db.health_check().await { - Ok(true) => tracing::info!("Database health check passed"), Ok(false) => tracing::warn!("Database health check failed"), Err(e) => tracing::error!("Database health check error: {}", e), - } - - // Get schema version - if let Ok(Some(version)) = db.get_schema_version().await { - tracing::info!("Database schema version: {}", version); + _ => {} } // Initialize event bus first let event_receiver = crate::services::events::init_event_bus(); - tracing::info!("Event bus initialized"); // Initialize xray service let xray_service = XrayService::new(); @@ -92,24 +80,20 @@ async fn main() -> Result<()> { // Initialize and start task scheduler with dependencies let mut task_scheduler = TaskScheduler::new().await?; task_scheduler.start(db.clone(), xray_service).await?; - tracing::info!("Task scheduler started with xray sync"); // Start event-driven sync handler with the receiver TaskScheduler::start_event_handler(db.clone(), event_receiver).await; - tracing::info!("Event-driven sync handler started"); // Start web server with task scheduler - tracing::info!("Starting web server on {}:{}", config.web.host, config.web.port); tokio::select! { result = web::start_server(db, config.web.clone()) => { match result { - Ok(_) => tracing::info!("Web server stopped gracefully"), Err(e) => tracing::error!("Web server error: {}", e), + _ => {} } } _ = tokio::signal::ctrl_c() => { - tracing::info!("Shutdown signal received, stopping services..."); if let Err(e) = task_scheduler.shutdown().await { tracing::error!("Error shutting down task scheduler: {}", e); } diff --git a/src/services/tasks.rs b/src/services/tasks.rs index 29a9e92..b51f1cd 100644 --- a/src/services/tasks.rs +++ b/src/services/tasks.rs @@ -2,7 +2,7 @@ use anyhow::Result; use tokio_cron_scheduler::{JobScheduler, Job}; use tracing::{info, error, warn}; use crate::database::DatabaseManager; -use crate::database::repository::{ServerRepository, ServerInboundRepository, InboundTemplateRepository, InboundUsersRepository, CertificateRepository}; +use crate::database::repository::{ServerRepository, ServerInboundRepository, InboundTemplateRepository, InboundUsersRepository, CertificateRepository, UserRepository}; use crate::database::entities::inbound_users; use crate::services::XrayService; use crate::services::events::SyncEvent; @@ -60,17 +60,12 @@ impl TaskScheduler { let xray_service = XrayService::new(); tokio::spawn(async move { - info!("Starting event-driven sync handler"); while let Ok(event) = event_receiver.recv().await { match event { SyncEvent::InboundChanged(server_id) | SyncEvent::UserAccessChanged(server_id) => { - info!("Received sync event for server {}", server_id); - if let Err(e) = sync_single_server_by_id(&xray_service, &db, server_id).await { error!("Failed to sync server {} from event: {}", server_id, e); - } else { - info!("Successfully synced server {} from event", server_id); } } } @@ -79,7 +74,6 @@ impl TaskScheduler { } pub async fn start(&mut self, db: DatabaseManager, xray_service: XrayService) -> Result<()> { - info!("Starting task scheduler with database synchronization"); // Initialize task status { @@ -100,7 +94,6 @@ impl TaskScheduler { } // Run initial sync on startup - info!("Running initial xray synchronization on startup"); let start_time = Utc::now(); self.update_task_status("xray_sync", TaskState::Running, None); @@ -108,7 +101,6 @@ impl TaskScheduler { Ok(_) => { let duration = (Utc::now() - start_time).num_milliseconds() as u64; self.update_task_status("xray_sync", TaskState::Success, Some(duration)); - info!("Initial xray sync completed successfully"); }, Err(e) => { let duration = (Utc::now() - start_time).num_milliseconds() as u64; @@ -128,7 +120,6 @@ impl TaskScheduler { let task_status = task_status_clone.clone(); Box::pin(async move { - info!("Running scheduled xray synchronization"); let start_time = Utc::now(); // Update status to running @@ -152,7 +143,6 @@ impl TaskScheduler { task.last_duration_ms = Some(duration); task.last_error = None; } - info!("Scheduled xray sync completed successfully in {}ms", duration); }, Err(e) => { let duration = (Utc::now() - start_time).num_milliseconds() as u64; @@ -171,7 +161,6 @@ impl TaskScheduler { self.scheduler.add(sync_job).await?; - info!("Task scheduler started with sync job running every minute"); self.scheduler.start().await?; Ok(()) @@ -202,7 +191,6 @@ impl TaskScheduler { } pub async fn shutdown(&mut self) -> Result<()> { - info!("Shutting down task scheduler"); self.scheduler.shutdown().await?; Ok(()) } @@ -210,7 +198,6 @@ impl TaskScheduler { /// Synchronize xray server state with database state async fn sync_xray_state(db: DatabaseManager, xray_service: XrayService) -> Result<()> { - info!("Starting xray state synchronization"); let server_repo = ServerRepository::new(db.connection().clone()); let inbound_repo = ServerInboundRepository::new(db.connection().clone()); @@ -225,18 +212,13 @@ async fn sync_xray_state(db: DatabaseManager, xray_service: XrayService) -> Resu } }; - info!("Found {} servers to synchronize", servers.len()); for server in servers { - info!("Synchronizing server: {} ({}:{})", server.name, server.hostname, server.grpc_port); let endpoint = format!("{}:{}", server.hostname, server.grpc_port); // Test connection first match xray_service.test_connection(server.id, &endpoint).await { - Ok(true) => { - info!("Connection to server {} successful", server.name); - }, Ok(false) => { warn!("Cannot connect to server {} at {}, skipping", server.name, endpoint); continue; @@ -245,6 +227,7 @@ async fn sync_xray_state(db: DatabaseManager, xray_service: XrayService) -> Resu error!("Error testing connection to server {}: {}", server.name, e); continue; } + _ => {} } // Get desired inbounds from database @@ -256,7 +239,6 @@ async fn sync_xray_state(db: DatabaseManager, xray_service: XrayService) -> Resu } }; - info!("Server {}: desired={} inbounds", server.name, desired_inbounds.len()); // Synchronize inbounds if let Err(e) = sync_server_inbounds( @@ -266,12 +248,9 @@ async fn sync_xray_state(db: DatabaseManager, xray_service: XrayService) -> Resu &desired_inbounds ).await { error!("Failed to sync inbounds for server {}: {}", server.name, e); - } else { - info!("Successfully synchronized server {}", server.name); } } - info!("Xray state synchronization completed"); Ok(()) } @@ -283,7 +262,6 @@ async fn get_desired_inbounds_from_db( inbound_repo: &ServerInboundRepository, template_repo: &InboundTemplateRepository, ) -> Result> { - info!("Getting desired inbounds for server {} from database", server.name); // Get all inbounds for this server let inbounds = inbound_repo.find_by_server_id(server.id).await?; @@ -302,7 +280,6 @@ async fn get_desired_inbounds_from_db( // Get users for this inbound let users = get_users_for_inbound(db, inbound.id).await?; - info!("Inbound {}: {} users found", inbound.tag, users.len()); // Get port from template or override let port = inbound.port_override.unwrap_or(template.default_port); @@ -334,7 +311,6 @@ async fn get_desired_inbounds_from_db( desired_inbounds.insert(inbound.tag.clone(), desired_inbound); } - info!("Found {} desired inbounds for server {}", desired_inbounds.len(), server.name); Ok(desired_inbounds) } @@ -344,13 +320,20 @@ async fn get_users_for_inbound(db: &DatabaseManager, inbound_id: Uuid) -> Result let inbound_users = inbound_users_repo.find_active_by_inbound_id(inbound_id).await?; - let users: Vec = inbound_users.into_iter().map(|user| { - XrayUser { - id: user.xray_user_id, - email: user.email, - level: user.level, + // Get user details to generate emails + let user_repo = UserRepository::new(db.connection().clone()); + + let mut users: Vec = Vec::new(); + for inbound_user in inbound_users { + if let Some(user) = user_repo.find_by_id(inbound_user.user_id).await? { + let email = inbound_user.generate_client_email(&user.name); + users.push(XrayUser { + id: inbound_user.xray_user_id, + email, + level: inbound_user.level, + }); } - }).collect(); + } Ok(users) } @@ -366,7 +349,6 @@ async fn load_certificate_from_db(db: &DatabaseManager, cert_id: Option) - match cert_repo.find_by_id(cert_id).await? { Some(cert) => { - info!("Loaded certificate: {}", cert.domain); Ok((Some(cert.certificate_pem()), Some(cert.private_key_pem()))) }, None => { @@ -387,13 +369,9 @@ async fn sync_server_inbounds( // Create or update inbounds // Since xray has no API to list inbounds, we always recreate them for (tag, desired) in desired_inbounds { - info!("Creating/updating inbound: {} with {} users", tag, desired.users.len()); // Always try to remove inbound first (ignore errors if it doesn't exist) - if let Err(e) = xray_service.remove_inbound(server_id, endpoint, tag).await { - // Log but don't fail - inbound might not exist - info!("Inbound {} removal result: {} (this is normal if inbound didn't exist)", tag, e); - } + let _ = xray_service.remove_inbound(server_id, endpoint, tag).await; // Create inbound with users let users_json: Vec = desired.users.iter().map(|user| { @@ -416,12 +394,10 @@ async fn sync_server_inbounds( desired.cert_pem.as_deref(), desired.key_pem.as_deref(), ).await { - Ok(_) => { - info!("Successfully created inbound {} with {} users", tag, desired.users.len()); - }, Err(e) => { error!("Failed to create inbound {}: {}", tag, e); } + _ => {} } } diff --git a/src/services/xray/inbounds.rs b/src/services/xray/inbounds.rs index 07ac2f1..80afdbc 100644 --- a/src/services/xray/inbounds.rs +++ b/src/services/xray/inbounds.rs @@ -1,5 +1,6 @@ use anyhow::{Result, anyhow}; use serde_json::Value; +use uuid; use xray_core::{ tonic::Request, app::proxyman::command::{AddInboundRequest, RemoveInboundRequest}, @@ -7,7 +8,7 @@ use xray_core::{ common::serial::TypedMessage, common::protocol::User, app::proxyman::ReceiverConfig, - common::net::{PortList, PortRange, IpOrDomain, ip_or_domain::Address}, + common::net::{PortList, PortRange, IpOrDomain, ip_or_domain::Address, Network}, transport::internet::StreamConfig, transport::internet::tls::{Config as TlsConfig, Certificate as TlsCertificate}, proxy::vless::inbound::Config as VlessInboundConfig, @@ -17,7 +18,7 @@ use xray_core::{ proxy::trojan::ServerConfig as TrojanServerConfig, proxy::trojan::Account as TrojanAccount, proxy::shadowsocks::ServerConfig as ShadowsocksServerConfig, - proxy::shadowsocks::Account as ShadowsocksAccount, + proxy::shadowsocks::{Account as ShadowsocksAccount, CipherType}, Client, prost_types, }; @@ -32,8 +33,7 @@ fn pem_to_der(pem_data: &str) -> Result> { .collect::>() .join(""); - tracing::debug!("Base64 data length: {}", base64_data.len()); - tracing::debug!("Base64 data: {}", &base64_data[..std::cmp::min(100, base64_data.len())]); + tracing::debug!("PEM to DER conversion: {} bytes", base64_data.len()); use base64::{Engine as _, engine::general_purpose}; general_purpose::STANDARD.decode(&base64_data) @@ -57,14 +57,11 @@ impl<'a> InboundClient<'a> { /// Add inbound configuration with TLS certificate and users pub async fn add_inbound_with_certificate(&self, inbound: &Value, users: Option<&[Value]>, cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> { - tracing::info!("Adding inbound to Xray server at {}", self.endpoint); - tracing::debug!("Inbound config: {}", serde_json::to_string_pretty(inbound)?); - let tag = inbound["tag"].as_str().unwrap_or("").to_string(); let port = inbound["port"].as_u64().unwrap_or(8080) as u32; let protocol = inbound["protocol"].as_str().unwrap_or("vless"); + let user_count = users.map_or(0, |u| u.len()); - tracing::debug!("Creating inbound: tag={}, port={}, protocol={}", tag, port, protocol); // Create receiver configuration (port binding) - use simple port number let port_list = PortList { @@ -79,8 +76,6 @@ impl<'a> InboundClient<'a> { let cert_pem = cert_pem.unwrap(); let key_pem = key_pem.unwrap(); - tracing::info!("Creating StreamConfig with TLS like working example"); - // Create TLS certificate exactly like working example - PEM content as bytes let tls_cert = TlsCertificate { certificate: cert_pem.as_bytes().to_vec(), // PEM content as bytes like working example @@ -106,7 +101,7 @@ impl<'a> InboundClient<'a> { value: tls_config.encode_to_vec(), }; - tracing::info!("Created TLS config with server_name: {}, next_protocol: {:?}", + tracing::debug!("TLS config: server_name={}, protocols={:?}", tls_config.server_name, tls_config.next_protocol); // Create StreamConfig like working example @@ -120,7 +115,6 @@ impl<'a> InboundClient<'a> { socket_settings: None, }) } else { - tracing::info!("No certificates provided, creating inbound without TLS"); None }; @@ -186,18 +180,31 @@ impl<'a> InboundClient<'a> { let email = user["email"].as_str().unwrap_or("").to_string(); let level = user["level"].as_u64().unwrap_or(0) as u32; + // Validate required fields + if user_id.is_empty() || email.is_empty() { + tracing::warn!("Skipping VMess user: missing id or email"); + continue; + } + + // Validate UUID format + if uuid::Uuid::parse_str(&user_id).is_err() { + tracing::warn!("VMess user '{}' has invalid UUID format", user_id); + } + if !user_id.is_empty() && !email.is_empty() { let account = VmessAccount { - id: user_id, + id: user_id.clone(), security_settings: None, - tests_enabled: "".to_string(), + tests_enabled: "".to_string(), // Keep empty as in examples }; + let account_bytes = account.encode_to_vec(); + vmess_users.push(User { - email, + email: email.clone(), level, account: Some(TypedMessage { r#type: "xray.proxy.vmess.Account".to_string(), - value: account.encode_to_vec(), + value: account_bytes, }), }); } @@ -255,14 +262,15 @@ impl<'a> InboundClient<'a> { let email = user["email"].as_str().unwrap_or("").to_string(); let level = user["level"].as_u64().unwrap_or(0) as u32; + if !password.is_empty() && !email.is_empty() { let account = ShadowsocksAccount { password, - cipher_type: 0, // Default cipher + cipher_type: CipherType::Aes256Gcm as i32, // Use AES-256-GCM cipher iv_check: false, // Default IV check }; ss_users.push(User { - email, + email: email.clone(), level, account: Some(TypedMessage { r#type: "xray.proxy.shadowsocks.Account".to_string(), @@ -275,7 +283,7 @@ impl<'a> InboundClient<'a> { let shadowsocks_config = ShadowsocksServerConfig { users: ss_users, - network: vec![], // Support all networks by default + network: vec![Network::Tcp as i32, Network::Udp as i32], // Support TCP and UDP }; TypedMessage { r#type: "xray.proxy.shadowsocks.ServerConfig".to_string(), @@ -293,21 +301,17 @@ impl<'a> InboundClient<'a> { proxy_settings: Some(proxy_message), }; - tracing::info!("Sending AddInboundRequest for '{}'", tag); - tracing::debug!("InboundConfig: {:?}", inbound_config); - let request = Request::new(AddInboundRequest { inbound: Some(inbound_config), }); let mut handler_client = self.client.handler(); match handler_client.add_inbound(request).await { - Ok(response) => { - let _response_inner = response.into_inner(); - tracing::info!("Successfully added inbound {}", tag); + Ok(_) => { + tracing::info!("Added {} inbound '{}' successfully", protocol, tag); Ok(()) } Err(e) => { - tracing::error!("Failed to add inbound {}: {}", tag, e); + tracing::error!("Failed to add {} inbound '{}': {}", protocol, tag, e); Err(anyhow!("Failed to add inbound {}: {}", tag, e)) } } @@ -315,8 +319,6 @@ impl<'a> InboundClient<'a> { /// Remove inbound by tag pub async fn remove_inbound(&self, tag: &str) -> Result<()> { - tracing::info!("Removing inbound '{}' from Xray server at {}", tag, self.endpoint); - let mut handler_client = self.client.handler(); let request = Request::new(RemoveInboundRequest { tag: tag.to_string(), @@ -324,11 +326,11 @@ impl<'a> InboundClient<'a> { match handler_client.remove_inbound(request).await { Ok(_) => { - tracing::info!("Successfully removed inbound"); + tracing::info!("Removed inbound '{}' from {}", tag, self.endpoint); Ok(()) }, Err(e) => { - tracing::error!("Failed to remove inbound: {}", e); + tracing::error!("Failed to remove inbound '{}': {}", tag, e); Err(anyhow!("Failed to remove inbound: {}", e)) } } @@ -336,8 +338,7 @@ impl<'a> InboundClient<'a> { /// Restart Xray with new configuration pub async fn restart_with_config(&self, config: &crate::services::xray::XrayConfig) -> Result<()> { - tracing::info!("Restarting Xray server at {} with new config", self.endpoint); - tracing::debug!("Config: {}", serde_json::to_string_pretty(&config.to_json())?); + tracing::debug!("Restarting Xray server at {} with new config", self.endpoint); // TODO: Implement restart with config using xray-core // For now just return success diff --git a/src/services/xray/mod.rs b/src/services/xray/mod.rs index 3eec4ca..87bb273 100644 --- a/src/services/xray/mod.rs +++ b/src/services/xray/mod.rs @@ -64,7 +64,6 @@ impl XrayService { "streamSettings": stream_settings }); - tracing::info!("Creating inbound with config: {}", inbound_config); self.add_inbound(_server_id, endpoint, &inbound_config).await } @@ -90,7 +89,6 @@ impl XrayService { "streamSettings": stream_settings }); - tracing::info!("Creating inbound with TLS certificate and config: {}", inbound_config); self.add_inbound_with_certificate(_server_id, endpoint, &inbound_config, cert_pem, key_pem).await } @@ -120,8 +118,6 @@ impl XrayService { /// Add user to inbound by recreating the inbound with updated user list pub async fn add_user(&self, _server_id: Uuid, endpoint: &str, inbound_tag: &str, user: &Value) -> Result<()> { - tracing::info!("XrayService::add_user called for server {} endpoint {} inbound_tag {}", _server_id, endpoint, inbound_tag); - tracing::warn!("Dynamic user addition via AlterInboundRequest doesn't work reliably - need to implement inbound recreation"); // TODO: Implement inbound recreation approach: // 1. Get current inbound configuration from database @@ -147,7 +143,6 @@ impl XrayService { cert_pem: Option<&str>, key_pem: Option<&str>, ) -> Result<()> { - tracing::info!("Creating inbound '{}' with {} users", tag, users.len()); // Build inbound configuration with users let mut inbound_config = serde_json::json!({ @@ -181,7 +176,6 @@ impl XrayService { inbound_config["settings"] = settings; } - tracing::info!("Creating inbound with users: {}", serde_json::to_string_pretty(&inbound_config)?); // Use the new method with users support self.add_inbound_with_users_and_certificate(_server_id, endpoint, &inbound_config, users, cert_pem, key_pem).await diff --git a/src/services/xray/users.rs b/src/services/xray/users.rs index eadc171..7d243f8 100644 --- a/src/services/xray/users.rs +++ b/src/services/xray/users.rs @@ -24,16 +24,11 @@ impl<'a> UserClient<'a> { /// Add user to inbound (simple version that works) pub async fn add_user(&self, inbound_tag: &str, user: &Value) -> Result<()> { - tracing::info!("Adding user to inbound '{}' on Xray server at {}", inbound_tag, self.endpoint); - tracing::debug!("User config: {}", serde_json::to_string_pretty(user)?); - let email = user["email"].as_str().unwrap_or("").to_string(); let user_id = user["id"].as_str().unwrap_or("").to_string(); let level = user["level"].as_u64().unwrap_or(0) as u32; let protocol = user["protocol"].as_str().unwrap_or("vless"); - tracing::info!("Parsed user data: email={}, id={}, level={}, protocol={}", email, user_id, level, protocol); - if email.is_empty() || user_id.is_empty() { return Err(anyhow!("User email and id are required")); } @@ -99,13 +94,11 @@ impl<'a> UserClient<'a> { operation: Some(typed_message), }); - tracing::info!("Sending AlterInboundRequest to add user '{}' to inbound '{}'", email, inbound_tag); let mut handler_client = self.client.handler(); match handler_client.alter_inbound(request).await { Ok(response) => { let _response_inner = response.into_inner(); - tracing::info!("Successfully added user '{}' to inbound '{}'", email, inbound_tag); Ok(()) } Err(e) => { @@ -118,7 +111,6 @@ impl<'a> UserClient<'a> { /// Remove user from inbound pub async fn remove_user(&self, inbound_tag: &str, email: &str) -> Result<()> { - tracing::info!("Removing user '{}' from inbound '{}' on Xray server at {}", email, inbound_tag, self.endpoint); // Build the RemoveUserOperation let remove_user_op = RemoveUserOperation { @@ -138,7 +130,6 @@ impl<'a> UserClient<'a> { let mut handler_client = self.client.handler(); match handler_client.alter_inbound(request).await { Ok(_) => { - tracing::info!("Successfully removed user '{}' from inbound '{}'", email, inbound_tag); Ok(()) } Err(e) => { diff --git a/src/web/handlers/servers.rs b/src/web/handlers/servers.rs index 8a07534..1c003dc 100644 --- a/src/web/handlers/servers.rs +++ b/src/web/handlers/servers.rs @@ -8,7 +8,7 @@ use uuid::Uuid; use crate::{ database::{ entities::{server, server_inbound}, - repository::{ServerRepository, ServerInboundRepository, InboundTemplateRepository, CertificateRepository, InboundUsersRepository}, + repository::{ServerRepository, ServerInboundRepository, InboundTemplateRepository, CertificateRepository, InboundUsersRepository, UserRepository}, }, web::AppState, }; @@ -183,7 +183,7 @@ pub async fn create_server_inbound( Path(server_id): Path, JsonExtractor(inbound_data): JsonExtractor, ) -> Result, StatusCode> { - tracing::info!("Creating server inbound for server {}: {:?}", server_id, inbound_data); + tracing::debug!("Creating server inbound for server {}", server_id); let server_repo = ServerRepository::new(app_state.db.connection().clone()); let inbound_repo = ServerInboundRepository::new(app_state.db.connection().clone()); @@ -223,11 +223,10 @@ pub async fn create_server_inbound( let (cert_pem, key_pem) = if let Some(cert_id) = inbound.certificate_id { match cert_repo.find_by_id(cert_id).await { Ok(Some(cert)) => { - tracing::info!("Using certificate {} for inbound {}", cert.domain, inbound.tag); (Some(cert.certificate_pem()), Some(cert.private_key_pem())) }, Ok(None) => { - tracing::warn!("Certificate {} not found, creating inbound without TLS", cert_id); + tracing::warn!("Certificate {} not found", cert_id); (None, None) }, Err(e) => { @@ -236,7 +235,6 @@ pub async fn create_server_inbound( } } } else { - tracing::info!("No certificate specified for inbound {}, creating without TLS", inbound.tag); (None, None) }; @@ -252,16 +250,16 @@ pub async fn create_server_inbound( key_pem.as_deref(), ).await { Ok(_) => { - tracing::info!("Successfully created inbound {} on xray server {}", inbound.tag, endpoint); + tracing::info!("Created inbound '{}' on {}", inbound.tag, endpoint); }, Err(e) => { - tracing::error!("Failed to create inbound on xray server {}: {}", endpoint, e); + tracing::error!("Failed to create inbound '{}' on {}: {}", inbound.tag, endpoint, e); // Note: We don't fail the request since the inbound is already in DB // The user can manually sync or retry later } } } else { - tracing::info!("Inbound {} created as inactive, skipping xray server creation", inbound.tag); + tracing::debug!("Inbound '{}' created as inactive", inbound.tag); } Ok(Json(inbound.into())) @@ -273,7 +271,7 @@ pub async fn update_server_inbound( Path((server_id, inbound_id)): Path<(Uuid, Uuid)>, JsonExtractor(inbound_data): JsonExtractor, ) -> Result, StatusCode> { - tracing::info!("Updating server inbound {} for server {}: {:?}", inbound_id, server_id, inbound_data); + tracing::debug!("Updating server inbound {} for server {}", inbound_id, server_id); let server_repo = ServerRepository::new(app_state.db.connection().clone()); let inbound_repo = ServerInboundRepository::new(app_state.db.connection().clone()); @@ -303,19 +301,17 @@ pub async fn update_server_inbound( // Handle xray server changes based on active status change if old_is_active && !new_is_active { // Becoming inactive - remove from xray server - tracing::info!("Inbound {} becoming inactive, removing from xray server {}", current_inbound.tag, endpoint); match app_state.xray_service.remove_inbound(server_id, &endpoint, ¤t_inbound.tag).await { Ok(_) => { - tracing::info!("Successfully removed inbound {} from xray server", current_inbound.tag); + tracing::info!("Deactivated inbound '{}' on {}", current_inbound.tag, endpoint); }, Err(e) => { - tracing::error!("Failed to remove inbound {} from xray server: {}", current_inbound.tag, e); + tracing::error!("Failed to deactivate inbound '{}': {}", current_inbound.tag, e); // Continue with database update even if xray removal fails } } } else if !old_is_active && new_is_active { // Becoming active - add to xray server - tracing::info!("Inbound {} becoming active, adding to xray server {}", current_inbound.tag, endpoint); // Get template info for recreation let template = match template_repo.find_by_id(current_inbound.template_id).await { @@ -332,11 +328,10 @@ pub async fn update_server_inbound( let (cert_pem, key_pem) = if let Some(cert_id) = certificate_id { match cert_repo.find_by_id(cert_id).await { Ok(Some(cert)) => { - tracing::info!("Using certificate {} for inbound {}", cert.domain, current_inbound.tag); (Some(cert.certificate_pem()), Some(cert.private_key_pem())) }, Ok(None) => { - tracing::warn!("Certificate {} not found, creating inbound without TLS", cert_id); + tracing::warn!("Certificate {} not found", cert_id); (None, None) }, Err(e) => { @@ -345,7 +340,6 @@ pub async fn update_server_inbound( } } } else { - tracing::info!("No certificate specified for inbound {}, creating without TLS", current_inbound.tag); (None, None) }; @@ -361,10 +355,10 @@ pub async fn update_server_inbound( key_pem.as_deref(), ).await { Ok(_) => { - tracing::info!("Successfully added inbound {} to xray server", current_inbound.tag); + tracing::info!("Activated inbound '{}' on {}", current_inbound.tag, endpoint); }, Err(e) => { - tracing::error!("Failed to add inbound {} to xray server: {}", current_inbound.tag, e); + tracing::error!("Failed to activate inbound '{}': {}", current_inbound.tag, e); // Continue with database update even if xray creation fails } } @@ -428,10 +422,10 @@ pub async fn delete_server_inbound( let endpoint = server.get_grpc_endpoint(); match app_state.xray_service.remove_inbound(server_id, &endpoint, &inbound.tag).await { Ok(_) => { - tracing::info!("Successfully removed inbound {} from xray server {}", inbound.tag, endpoint); + tracing::info!("Removed inbound '{}' from {}", inbound.tag, endpoint); }, Err(e) => { - tracing::error!("Failed to remove inbound from xray server {}: {}", endpoint, e); + tracing::error!("Failed to remove inbound '{}' from {}: {}", inbound.tag, endpoint, e); // Continue with database deletion even if xray removal fails } } @@ -450,16 +444,18 @@ pub async fn delete_server_inbound( } } -/// Add user to server inbound (database only - sync will apply changes) +/// Give user access to server inbound (database only - sync will apply changes) pub async fn add_user_to_inbound( State(app_state): State, Path((server_id, inbound_id)): Path<(Uuid, Uuid)>, JsonExtractor(user_data): JsonExtractor, ) -> Result { use crate::database::entities::inbound_users::CreateInboundUserDto; + use crate::database::entities::user::CreateUserDto; let server_repo = ServerRepository::new(app_state.db.connection().clone()); let inbound_repo = ServerInboundRepository::new(app_state.db.connection().clone()); + let user_repo = UserRepository::new(app_state.db.connection().clone()); // Get server and inbound to validate they exist let _server = match server_repo.find_by_id(server_id).await { @@ -479,54 +475,75 @@ pub async fn add_user_to_inbound( return Err(StatusCode::BAD_REQUEST); } - // Extract user data with better error handling - tracing::debug!("Received user_data: {}", serde_json::to_string_pretty(&user_data).unwrap_or_else(|_| "invalid json".to_string())); + // Extract user data - let username = user_data["username"].as_str() - .or_else(|| user_data["email"].as_str()) // Try email as fallback - .or_else(|| user_data["name"].as_str()) // Try name as fallback + let user_name = user_data["name"].as_str() + .or_else(|| user_data["username"].as_str()) + .or_else(|| user_data["email"].as_str()) .map(|s| s.to_string()) .unwrap_or_else(|| { - // Generate username if not provided format!("user_{}", Uuid::new_v4().to_string()[..8].to_string()) }); let level = user_data["level"].as_u64().unwrap_or(0) as i32; + let user_id = user_data["user_id"].as_str().and_then(|s| Uuid::parse_str(s).ok()); - tracing::info!("Creating user with username: '{}' and level: {}", username, level); + // Get or create user + let user = if let Some(uid) = user_id { + // Use existing user + match user_repo.find_by_id(uid).await { + Ok(Some(user)) => user, + Ok(None) => return Err(StatusCode::NOT_FOUND), // User not found + Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR), + } + } else { + // Create new user + let create_user_dto = CreateUserDto { + name: user_name.clone(), + comment: user_data["comment"].as_str().map(|s| s.to_string()), + telegram_id: user_data["telegram_id"].as_i64(), + }; + + match user_repo.create(create_user_dto).await { + Ok(user) => user, + Err(e) => { + tracing::error!("Failed to create user '{}': {}", user_name, e); + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } + } + }; // Create inbound user repository let inbound_users_repo = InboundUsersRepository::new(app_state.db.connection().clone()); - // Check if username already exists on this inbound - if inbound_users_repo.username_exists_on_inbound(&username, inbound_id).await.unwrap_or(false) { - tracing::error!("Username '{}' already exists on inbound {}", username, inbound_id); + // Check if user already has access to this inbound + if inbound_users_repo.user_has_access_to_inbound(user.id, inbound_id).await.unwrap_or(false) { + tracing::warn!("User '{}' already has access to inbound", user.name); return Err(StatusCode::CONFLICT); } - // Create inbound user DTO + // Create inbound access for user let inbound_user_dto = CreateInboundUserDto { + user_id: user.id, server_inbound_id: inbound_id, - username: username.clone(), level: Some(level), }; - // Create user in database + // Grant access in database match inbound_users_repo.create(inbound_user_dto).await { - Ok(created_user) => { - tracing::info!("Inbound user created: username={} email={} server={} inbound={}", - username, created_user.email, server_id, inbound_id); + Ok(created_access) => { + tracing::info!("Granted user '{}' access to inbound (xray_id={})", + user.name, created_access.xray_user_id); // Send sync event for immediate synchronization crate::services::events::send_sync_event( crate::services::events::SyncEvent::UserAccessChanged(server_id) ); - tracing::info!("User will be synced to xray server immediately via event"); Ok(StatusCode::CREATED) }, Err(e) => { - tracing::error!("Failed to create inbound user: {}", e); + tracing::error!("Failed to grant user '{}' access: {}", user.name, e); Err(StatusCode::INTERNAL_SERVER_ERROR) } } @@ -571,11 +588,11 @@ pub async fn remove_user_from_inbound( // Remove user from xray server match app_state.xray_service.remove_user(server_id, &format!("{}:{}", server.hostname, server.grpc_port), &inbound_tag, &email).await { Ok(_) => { - tracing::info!("Successfully removed user {} from server {} inbound {}", email, server_id, inbound_id); + tracing::info!("Removed user '{}' from inbound", email); Ok(StatusCode::NO_CONTENT) }, Err(e) => { - tracing::error!("Failed to remove user {} from server {} inbound {}: {}", email, server_id, inbound_id, e); + tracing::error!("Failed to remove user '{}' from inbound: {}", email, e); Err(StatusCode::INTERNAL_SERVER_ERROR) } }