init rust. WIP: tls for inbounds

This commit is contained in:
Ultradesu
2025-09-18 02:56:59 +03:00
parent 777af49ebf
commit 8aff8f2fb5
206 changed files with 14301 additions and 21560 deletions

View File

@@ -0,0 +1,41 @@
/// Certificate management service
#[derive(Clone)]
pub struct CertificateService {
// Mock implementation for now
}
#[allow(dead_code)]
impl CertificateService {
pub fn new() -> Self {
Self {}
}
/// Generate self-signed certificate
pub async fn generate_self_signed(&self, domain: &str) -> anyhow::Result<(String, String)> {
tracing::info!("Generating self-signed certificate for domain: {}", domain);
// Mock implementation - would use rcgen to generate actual certificate
let cert_pem = format!("-----BEGIN CERTIFICATE-----\nMOCK CERT FOR {}\n-----END CERTIFICATE-----", domain);
let key_pem = format!("-----BEGIN PRIVATE KEY-----\nMOCK KEY FOR {}\n-----END PRIVATE KEY-----", domain);
Ok((cert_pem, key_pem))
}
/// Renew certificate
pub async fn renew_certificate(&self, domain: &str) -> anyhow::Result<(String, String)> {
tracing::info!("Renewing certificate for domain: {}", domain);
// Mock implementation
let cert_pem = format!("-----BEGIN CERTIFICATE-----\nRENEWED CERT FOR {}\n-----END CERTIFICATE-----", domain);
let key_pem = format!("-----BEGIN PRIVATE KEY-----\nRENEWED KEY FOR {}\n-----END PRIVATE KEY-----", domain);
Ok((cert_pem, key_pem))
}
}
impl Default for CertificateService {
fn default() -> Self {
Self::new()
}
}

30
src/services/events.rs Normal file
View File

@@ -0,0 +1,30 @@
use std::sync::OnceLock;
use tokio::sync::broadcast;
use uuid::Uuid;
#[derive(Clone, Debug)]
pub enum SyncEvent {
InboundChanged(Uuid), // server_id
UserAccessChanged(Uuid), // server_id
}
static EVENT_SENDER: OnceLock<broadcast::Sender<SyncEvent>> = OnceLock::new();
/// Initialize the event bus and return a receiver
pub fn init_event_bus() -> broadcast::Receiver<SyncEvent> {
let (tx, rx) = broadcast::channel(100);
EVENT_SENDER.set(tx).expect("Event bus already initialized");
rx
}
/// Send a sync event (non-blocking)
pub fn send_sync_event(event: SyncEvent) {
if let Some(sender) = EVENT_SENDER.get() {
match sender.send(event.clone()) {
Ok(_) => tracing::info!("Event sent: {:?}", event),
Err(_) => tracing::warn!("No event receivers"),
}
} else {
tracing::error!("Event bus not initialized");
}
}

7
src/services/mod.rs Normal file
View File

@@ -0,0 +1,7 @@
pub mod xray;
pub mod certificates;
pub mod events;
pub mod tasks;
pub use xray::XrayService;
pub use tasks::TaskScheduler;

484
src/services/tasks.rs Normal file
View File

@@ -0,0 +1,484 @@
use anyhow::Result;
use tokio_cron_scheduler::{JobScheduler, Job};
use tracing::{info, error, warn};
use crate::database::DatabaseManager;
use crate::database::repository::{ServerRepository, ServerInboundRepository, InboundTemplateRepository, InboundUsersRepository, CertificateRepository};
use crate::database::entities::inbound_users;
use crate::services::XrayService;
use crate::services::events::SyncEvent;
use sea_orm::{EntityTrait, ColumnTrait, QueryFilter, RelationTrait, JoinType};
use uuid::Uuid;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use chrono::{DateTime, Utc};
use serde::{Serialize, Deserialize};
pub struct TaskScheduler {
scheduler: JobScheduler,
task_status: Arc<RwLock<HashMap<String, TaskStatus>>>,
}
/// Status of a background task
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskStatus {
pub name: String,
pub description: String,
pub schedule: String,
pub status: TaskState,
pub last_run: Option<DateTime<Utc>>,
pub next_run: Option<DateTime<Utc>>,
pub total_runs: u64,
pub success_count: u64,
pub error_count: u64,
pub last_error: Option<String>,
pub last_duration_ms: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TaskState {
Idle,
Running,
Success,
Error,
}
impl TaskScheduler {
pub async fn new() -> Result<Self> {
let scheduler = JobScheduler::new().await?;
let task_status = Arc::new(RwLock::new(HashMap::new()));
Ok(Self { scheduler, task_status })
}
/// Get current status of all tasks
pub fn get_task_status(&self) -> HashMap<String, TaskStatus> {
self.task_status.read().unwrap().clone()
}
/// Start event-driven sync handler
pub async fn start_event_handler(db: DatabaseManager, mut event_receiver: tokio::sync::broadcast::Receiver<SyncEvent>) {
let xray_service = XrayService::new();
tokio::spawn(async move {
info!("Starting event-driven sync handler");
while let Ok(event) = event_receiver.recv().await {
match event {
SyncEvent::InboundChanged(server_id) | SyncEvent::UserAccessChanged(server_id) => {
info!("Received sync event for server {}", server_id);
if let Err(e) = sync_single_server_by_id(&xray_service, &db, server_id).await {
error!("Failed to sync server {} from event: {}", server_id, e);
} else {
info!("Successfully synced server {} from event", server_id);
}
}
}
}
});
}
pub async fn start(&mut self, db: DatabaseManager, xray_service: XrayService) -> Result<()> {
info!("Starting task scheduler with database synchronization");
// Initialize task status
{
let mut status = self.task_status.write().unwrap();
status.insert("xray_sync".to_string(), TaskStatus {
name: "Xray Synchronization".to_string(),
description: "Synchronizes database state with xray servers".to_string(),
schedule: "0 * * * * * (every minute)".to_string(),
status: TaskState::Idle,
last_run: None,
next_run: Some(Utc::now() + chrono::Duration::minutes(1)),
total_runs: 0,
success_count: 0,
error_count: 0,
last_error: None,
last_duration_ms: None,
});
}
// Run initial sync on startup
info!("Running initial xray synchronization on startup");
let start_time = Utc::now();
self.update_task_status("xray_sync", TaskState::Running, None);
match sync_xray_state(db.clone(), xray_service.clone()).await {
Ok(_) => {
let duration = (Utc::now() - start_time).num_milliseconds() as u64;
self.update_task_status("xray_sync", TaskState::Success, Some(duration));
info!("Initial xray sync completed successfully");
},
Err(e) => {
let duration = (Utc::now() - start_time).num_milliseconds() as u64;
self.update_task_status_with_error("xray_sync", e.to_string(), Some(duration));
error!("Initial xray sync failed: {}", e);
}
}
// Add synchronization task that runs every minute
let db_clone = db.clone();
let xray_service_clone = xray_service.clone();
let task_status_clone = self.task_status.clone();
let sync_job = Job::new_async("0 */5 * * * *", move |_uuid, _l| {
let db = db_clone.clone();
let xray_service = xray_service_clone.clone();
let task_status = task_status_clone.clone();
Box::pin(async move {
info!("Running scheduled xray synchronization");
let start_time = Utc::now();
// Update status to running
{
let mut status = task_status.write().unwrap();
if let Some(task) = status.get_mut("xray_sync") {
task.status = TaskState::Running;
task.last_run = Some(start_time);
task.total_runs += 1;
task.next_run = Some(start_time + chrono::Duration::minutes(1));
}
}
match sync_xray_state(db, xray_service).await {
Ok(_) => {
let duration = (Utc::now() - start_time).num_milliseconds() as u64;
let mut status = task_status.write().unwrap();
if let Some(task) = status.get_mut("xray_sync") {
task.status = TaskState::Success;
task.success_count += 1;
task.last_duration_ms = Some(duration);
task.last_error = None;
}
info!("Scheduled xray sync completed successfully in {}ms", duration);
},
Err(e) => {
let duration = (Utc::now() - start_time).num_milliseconds() as u64;
let mut status = task_status.write().unwrap();
if let Some(task) = status.get_mut("xray_sync") {
task.status = TaskState::Error;
task.error_count += 1;
task.last_duration_ms = Some(duration);
task.last_error = Some(e.to_string());
}
error!("Scheduled xray sync failed: {}", e);
}
}
})
})?;
self.scheduler.add(sync_job).await?;
info!("Task scheduler started with sync job running every minute");
self.scheduler.start().await?;
Ok(())
}
fn update_task_status(&self, task_id: &str, state: TaskState, duration_ms: Option<u64>) {
let mut status = self.task_status.write().unwrap();
if let Some(task) = status.get_mut(task_id) {
task.status = state;
task.last_run = Some(Utc::now());
task.total_runs += 1;
task.success_count += 1;
task.last_duration_ms = duration_ms;
task.last_error = None;
}
}
fn update_task_status_with_error(&self, task_id: &str, error: String, duration_ms: Option<u64>) {
let mut status = self.task_status.write().unwrap();
if let Some(task) = status.get_mut(task_id) {
task.status = TaskState::Error;
task.last_run = Some(Utc::now());
task.total_runs += 1;
task.error_count += 1;
task.last_duration_ms = duration_ms;
task.last_error = Some(error);
}
}
pub async fn shutdown(&mut self) -> Result<()> {
info!("Shutting down task scheduler");
self.scheduler.shutdown().await?;
Ok(())
}
}
/// Synchronize xray server state with database state
async fn sync_xray_state(db: DatabaseManager, xray_service: XrayService) -> Result<()> {
info!("Starting xray state synchronization");
let server_repo = ServerRepository::new(db.connection().clone());
let inbound_repo = ServerInboundRepository::new(db.connection().clone());
let template_repo = InboundTemplateRepository::new(db.connection().clone());
// Get all servers from database
let servers = match server_repo.find_all().await {
Ok(servers) => servers,
Err(e) => {
error!("Failed to fetch servers: {}", e);
return Err(e.into());
}
};
info!("Found {} servers to synchronize", servers.len());
for server in servers {
info!("Synchronizing server: {} ({}:{})", server.name, server.hostname, server.grpc_port);
let endpoint = format!("{}:{}", server.hostname, server.grpc_port);
// Test connection first
match xray_service.test_connection(server.id, &endpoint).await {
Ok(true) => {
info!("Connection to server {} successful", server.name);
},
Ok(false) => {
warn!("Cannot connect to server {} at {}, skipping", server.name, endpoint);
continue;
},
Err(e) => {
error!("Error testing connection to server {}: {}", server.name, e);
continue;
}
}
// Get desired inbounds from database
let desired_inbounds = match get_desired_inbounds_from_db(&db, &server, &inbound_repo, &template_repo).await {
Ok(inbounds) => inbounds,
Err(e) => {
error!("Failed to get desired inbounds for server {}: {}", server.name, e);
continue;
}
};
info!("Server {}: desired={} inbounds", server.name, desired_inbounds.len());
// Synchronize inbounds
if let Err(e) = sync_server_inbounds(
&xray_service,
server.id,
&endpoint,
&desired_inbounds
).await {
error!("Failed to sync inbounds for server {}: {}", server.name, e);
} else {
info!("Successfully synchronized server {}", server.name);
}
}
info!("Xray state synchronization completed");
Ok(())
}
/// Get desired inbounds configuration from database
async fn get_desired_inbounds_from_db(
db: &DatabaseManager,
server: &crate::database::entities::server::Model,
inbound_repo: &ServerInboundRepository,
template_repo: &InboundTemplateRepository,
) -> Result<HashMap<String, DesiredInbound>> {
info!("Getting desired inbounds for server {} from database", server.name);
// Get all inbounds for this server
let inbounds = inbound_repo.find_by_server_id(server.id).await?;
let mut desired_inbounds = HashMap::new();
for inbound in inbounds {
// Get template for this inbound
let template = match template_repo.find_by_id(inbound.template_id).await? {
Some(template) => template,
None => {
warn!("Template {} not found for inbound {}, skipping", inbound.template_id, inbound.tag);
continue;
}
};
// Get users for this inbound
let users = get_users_for_inbound(db, inbound.id).await?;
info!("Inbound {}: {} users found", inbound.tag, users.len());
// Get port from template or override
let port = inbound.port_override.unwrap_or(template.default_port);
// Get certificate if specified
let (cert_pem, key_pem) = if let Some(_cert_id) = inbound.certificate_id {
match load_certificate_from_db(db, inbound.certificate_id).await {
Ok((cert, key)) => (cert, key),
Err(e) => {
warn!("Failed to load certificate for inbound {}: {}", inbound.tag, e);
(None, None)
}
}
} else {
(None, None)
};
let desired_inbound = DesiredInbound {
tag: inbound.tag.clone(),
port,
protocol: template.protocol.clone(),
settings: template.base_settings.clone(),
stream_settings: template.stream_settings.clone(),
users,
cert_pem,
key_pem,
};
desired_inbounds.insert(inbound.tag.clone(), desired_inbound);
}
info!("Found {} desired inbounds for server {}", desired_inbounds.len(), server.name);
Ok(desired_inbounds)
}
/// Get users for specific inbound from database
async fn get_users_for_inbound(db: &DatabaseManager, inbound_id: Uuid) -> Result<Vec<XrayUser>> {
let inbound_users_repo = InboundUsersRepository::new(db.connection().clone());
let inbound_users = inbound_users_repo.find_active_by_inbound_id(inbound_id).await?;
let users: Vec<XrayUser> = inbound_users.into_iter().map(|user| {
XrayUser {
id: user.xray_user_id,
email: user.email,
level: user.level,
}
}).collect();
Ok(users)
}
/// Load certificate from database
async fn load_certificate_from_db(db: &DatabaseManager, cert_id: Option<Uuid>) -> Result<(Option<String>, Option<String>)> {
let cert_id = match cert_id {
Some(id) => id,
None => return Ok((None, None)),
};
let cert_repo = CertificateRepository::new(db.connection().clone());
match cert_repo.find_by_id(cert_id).await? {
Some(cert) => {
info!("Loaded certificate: {}", cert.domain);
Ok((Some(cert.certificate_pem()), Some(cert.private_key_pem())))
},
None => {
warn!("Certificate {} not found", cert_id);
Ok((None, None))
}
}
}
/// Synchronize inbounds for a single server
async fn sync_server_inbounds(
xray_service: &XrayService,
server_id: Uuid,
endpoint: &str,
desired_inbounds: &HashMap<String, DesiredInbound>,
) -> Result<()> {
// Create or update inbounds
// Since xray has no API to list inbounds, we always recreate them
for (tag, desired) in desired_inbounds {
info!("Creating/updating inbound: {} with {} users", tag, desired.users.len());
// Always try to remove inbound first (ignore errors if it doesn't exist)
if let Err(e) = xray_service.remove_inbound(server_id, endpoint, tag).await {
// Log but don't fail - inbound might not exist
info!("Inbound {} removal result: {} (this is normal if inbound didn't exist)", tag, e);
}
// Create inbound with users
let users_json: Vec<Value> = desired.users.iter().map(|user| {
serde_json::json!({
"id": user.id,
"email": user.email,
"level": user.level
})
}).collect();
match xray_service.create_inbound_with_users(
server_id,
endpoint,
&desired.tag,
desired.port,
&desired.protocol,
desired.settings.clone(),
desired.stream_settings.clone(),
&users_json,
desired.cert_pem.as_deref(),
desired.key_pem.as_deref(),
).await {
Ok(_) => {
info!("Successfully created inbound {} with {} users", tag, desired.users.len());
},
Err(e) => {
error!("Failed to create inbound {}: {}", tag, e);
}
}
}
Ok(())
}
/// Sync a single server by ID (for event-driven sync)
async fn sync_single_server_by_id(
xray_service: &XrayService,
db: &DatabaseManager,
server_id: Uuid,
) -> Result<()> {
let server_repo = ServerRepository::new(db.connection().clone());
let inbound_repo = ServerInboundRepository::new(db.connection().clone());
let template_repo = InboundTemplateRepository::new(db.connection().clone());
// Get server
let server = match server_repo.find_by_id(server_id).await? {
Some(server) => server,
None => {
warn!("Server {} not found for sync", server_id);
return Ok(());
}
};
// For now, sync all servers (can add active/inactive flag later)
// Get desired inbounds from database
let desired_inbounds = get_desired_inbounds_from_db(db, &server, &inbound_repo, &template_repo).await?;
// Build endpoint
let endpoint = format!("{}:{}", server.hostname, server.grpc_port);
// Sync server
sync_server_inbounds(xray_service, server_id, &endpoint, &desired_inbounds).await?;
Ok(())
}
/// Represents desired inbound configuration from database
#[derive(Debug, Clone)]
struct DesiredInbound {
tag: String,
port: i32,
protocol: String,
settings: Value,
stream_settings: Value,
users: Vec<XrayUser>,
cert_pem: Option<String>,
key_pem: Option<String>,
}
/// Represents xray user configuration
#[derive(Debug, Clone)]
struct XrayUser {
id: String,
email: String,
level: i32,
}

View File

@@ -0,0 +1,91 @@
use anyhow::{Result, anyhow};
use serde_json::Value;
use xray_core::Client;
// Import submodules from the same directory
use super::stats::StatsClient;
use super::inbounds::InboundClient;
use super::users::UserClient;
/// Xray gRPC client wrapper
pub struct XrayClient {
endpoint: String,
client: Client,
}
#[allow(dead_code)]
impl XrayClient {
/// Connect to Xray gRPC server
pub async fn connect(endpoint: &str) -> Result<Self> {
tracing::info!("Connecting to Xray at {}", endpoint);
let client = Client::from_url(endpoint).await
.map_err(|e| anyhow!("Failed to connect to Xray at {}: {}", endpoint, e))?;
// Don't clone - we'll use &self.client when calling methods
Ok(Self {
endpoint: endpoint.to_string(),
client,
})
}
/// Get server statistics
pub async fn get_stats(&self) -> Result<Value> {
let stats_client = StatsClient::new(self.endpoint.clone(), &self.client);
stats_client.get_stats().await
}
/// Query specific statistics with pattern
pub async fn query_stats(&self, pattern: &str, reset: bool) -> Result<Value> {
let stats_client = StatsClient::new(self.endpoint.clone(), &self.client);
stats_client.query_stats(pattern, reset).await
}
/// Restart Xray with new configuration
pub async fn restart_with_config(&self, config: &crate::services::xray::XrayConfig) -> Result<()> {
let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client);
inbound_client.restart_with_config(config).await
}
/// Add inbound configuration
pub async fn add_inbound(&self, inbound: &Value) -> Result<()> {
let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client);
inbound_client.add_inbound(inbound).await
}
/// Add inbound configuration with TLS certificate
pub async fn add_inbound_with_certificate(&self, inbound: &Value, cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> {
let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client);
inbound_client.add_inbound_with_certificate(inbound, None, cert_pem, key_pem).await
}
/// Add inbound configuration with users and TLS certificate
pub async fn add_inbound_with_users_and_certificate(&self, inbound: &Value, users: &[Value], cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> {
let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client);
inbound_client.add_inbound_with_certificate(inbound, Some(users), cert_pem, key_pem).await
}
/// Remove inbound by tag
pub async fn remove_inbound(&self, tag: &str) -> Result<()> {
let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client);
inbound_client.remove_inbound(tag).await
}
/// Add user to inbound
pub async fn add_user(&self, inbound_tag: &str, user: &Value) -> Result<()> {
let user_client = UserClient::new(self.endpoint.clone(), &self.client);
user_client.add_user(inbound_tag, user).await
}
/// Remove user from inbound
pub async fn remove_user(&self, inbound_tag: &str, email: &str) -> Result<()> {
let user_client = UserClient::new(self.endpoint.clone(), &self.client);
user_client.remove_user(inbound_tag, email).await
}
/// Get connection endpoint
pub fn endpoint(&self) -> &str {
&self.endpoint
}
}

285
src/services/xray/config.rs Normal file
View File

@@ -0,0 +1,285 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// Xray configuration structure
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct XrayConfig {
pub log: LogConfig,
pub api: ApiConfig,
pub dns: Option<DnsConfig>,
pub routing: Option<RoutingConfig>,
pub policy: Option<PolicyConfig>,
pub inbounds: Vec<InboundConfig>,
pub outbounds: Vec<OutboundConfig>,
pub transport: Option<TransportConfig>,
pub stats: Option<StatsConfig>,
pub reverse: Option<ReverseConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogConfig {
pub access: Option<String>,
pub error: Option<String>,
#[serde(rename = "loglevel")]
pub log_level: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiConfig {
pub tag: String,
pub listen: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DnsConfig {
pub servers: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingConfig {
#[serde(rename = "domainStrategy")]
pub domain_strategy: Option<String>,
pub rules: Vec<RoutingRule>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingRule {
#[serde(rename = "type")]
pub rule_type: String,
pub domain: Option<Vec<String>>,
pub ip: Option<Vec<String>>,
pub port: Option<String>,
#[serde(rename = "outboundTag")]
pub outbound_tag: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PolicyConfig {
pub levels: HashMap<String, PolicyLevel>,
pub system: Option<SystemPolicy>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PolicyLevel {
#[serde(rename = "handshakeTimeout")]
pub handshake_timeout: Option<u32>,
#[serde(rename = "connIdle")]
pub conn_idle: Option<u32>,
#[serde(rename = "uplinkOnly")]
pub uplink_only: Option<u32>,
#[serde(rename = "downlinkOnly")]
pub downlink_only: Option<u32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemPolicy {
#[serde(rename = "statsInboundUplink")]
pub stats_inbound_uplink: Option<bool>,
#[serde(rename = "statsInboundDownlink")]
pub stats_inbound_downlink: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboundConfig {
pub tag: String,
pub port: u16,
pub listen: Option<String>,
pub protocol: String,
pub settings: Value,
#[serde(rename = "streamSettings")]
pub stream_settings: Option<Value>,
pub sniffing: Option<SniffingConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboundConfig {
pub tag: String,
pub protocol: String,
pub settings: Value,
#[serde(rename = "streamSettings")]
pub stream_settings: Option<Value>,
pub mux: Option<MuxConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SniffingConfig {
pub enabled: bool,
#[serde(rename = "destOverride")]
pub dest_override: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MuxConfig {
pub enabled: bool,
pub concurrency: Option<i32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportConfig {
#[serde(rename = "tcpSettings")]
pub tcp_settings: Option<Value>,
#[serde(rename = "kcpSettings")]
pub kcp_settings: Option<Value>,
#[serde(rename = "wsSettings")]
pub ws_settings: Option<Value>,
#[serde(rename = "httpSettings")]
pub http_settings: Option<Value>,
#[serde(rename = "dsSettings")]
pub ds_settings: Option<Value>,
#[serde(rename = "quicSettings")]
pub quic_settings: Option<Value>,
#[serde(rename = "grpcSettings")]
pub grpc_settings: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatsConfig {}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReverseConfig {
pub bridges: Option<Vec<BridgeConfig>>,
pub portals: Option<Vec<PortalConfig>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BridgeConfig {
pub tag: String,
pub domain: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PortalConfig {
pub tag: String,
pub domain: String,
}
#[allow(dead_code)]
impl XrayConfig {
/// Create a new basic Xray configuration
pub fn new() -> Self {
Self {
log: LogConfig {
access: Some("/var/log/xray/access.log".to_string()),
error: Some("/var/log/xray/error.log".to_string()),
log_level: "warning".to_string(),
},
api: ApiConfig {
tag: "api".to_string(),
listen: "127.0.0.1:2053".to_string(),
},
dns: None,
routing: Some(RoutingConfig {
domain_strategy: Some("IPIfNonMatch".to_string()),
rules: vec![
RoutingRule {
rule_type: "field".to_string(),
domain: None,
ip: Some(vec!["geoip:private".to_string()]),
port: None,
outbound_tag: "direct".to_string(),
}
],
}),
policy: Some(PolicyConfig {
levels: {
let mut levels = HashMap::new();
levels.insert("0".to_string(), PolicyLevel {
handshake_timeout: Some(4),
conn_idle: Some(300),
uplink_only: Some(2),
downlink_only: Some(5),
});
levels
},
system: Some(SystemPolicy {
stats_inbound_uplink: Some(true),
stats_inbound_downlink: Some(true),
}),
}),
inbounds: vec![],
outbounds: vec![
OutboundConfig {
tag: "direct".to_string(),
protocol: "freedom".to_string(),
settings: serde_json::json!({}),
stream_settings: None,
mux: None,
},
OutboundConfig {
tag: "blocked".to_string(),
protocol: "blackhole".to_string(),
settings: serde_json::json!({
"response": {
"type": "http"
}
}),
stream_settings: None,
mux: None,
},
],
transport: None,
stats: Some(StatsConfig {}),
reverse: None,
}
}
/// Add inbound to configuration
pub fn add_inbound(&mut self, inbound: InboundConfig) {
self.inbounds.push(inbound);
}
/// Remove inbound by tag
pub fn remove_inbound(&mut self, tag: &str) -> bool {
let initial_len = self.inbounds.len();
self.inbounds.retain(|inbound| inbound.tag != tag);
self.inbounds.len() != initial_len
}
/// Find inbound by tag
pub fn find_inbound(&self, tag: &str) -> Option<&InboundConfig> {
self.inbounds.iter().find(|inbound| inbound.tag == tag)
}
/// Find inbound by tag (mutable)
pub fn find_inbound_mut(&mut self, tag: &str) -> Option<&mut InboundConfig> {
self.inbounds.iter_mut().find(|inbound| inbound.tag == tag)
}
/// Convert to JSON Value
pub fn to_json(&self) -> Value {
serde_json::to_value(self).unwrap_or(Value::Null)
}
/// Create from JSON Value
pub fn from_json(value: &Value) -> Result<Self, serde_json::Error> {
serde_json::from_value(value.clone())
}
/// Validate configuration
pub fn validate(&self) -> Result<(), String> {
// Check for duplicate inbound tags
let mut tags = std::collections::HashSet::new();
for inbound in &self.inbounds {
if !tags.insert(&inbound.tag) {
return Err(format!("Duplicate inbound tag: {}", inbound.tag));
}
}
// Check for duplicate outbound tags
tags.clear();
for outbound in &self.outbounds {
if !tags.insert(&outbound.tag) {
return Err(format!("Duplicate outbound tag: {}", outbound.tag));
}
}
Ok(())
}
}
impl Default for XrayConfig {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,325 @@
use anyhow::{Result, anyhow};
use serde_json::Value;
use xray_core::{
tonic::Request,
app::proxyman::command::{AddInboundRequest, RemoveInboundRequest},
core::InboundHandlerConfig,
common::serial::TypedMessage,
common::protocol::User,
app::proxyman::ReceiverConfig,
common::net::{PortList, PortRange},
transport::internet::StreamConfig,
transport::internet::tls::{Config as TlsConfig, Certificate as TlsCertificate},
proxy::vless::inbound::Config as VlessInboundConfig,
proxy::vless::Account as VlessAccount,
proxy::vmess::inbound::Config as VmessInboundConfig,
proxy::vmess::Account as VmessAccount,
proxy::trojan::ServerConfig as TrojanServerConfig,
proxy::trojan::Account as TrojanAccount,
proxy::shadowsocks::ServerConfig as ShadowsocksServerConfig,
proxy::shadowsocks::Account as ShadowsocksAccount,
Client,
prost_types,
};
use prost::Message;
pub struct InboundClient<'a> {
endpoint: String,
client: &'a Client,
}
impl<'a> InboundClient<'a> {
pub fn new(endpoint: String, client: &'a Client) -> Self {
Self { endpoint, client }
}
/// Add inbound configuration
pub async fn add_inbound(&self, inbound: &Value) -> Result<()> {
self.add_inbound_with_certificate(inbound, None, None, None).await
}
/// Add inbound configuration with TLS certificate and users
pub async fn add_inbound_with_certificate(&self, inbound: &Value, users: Option<&[Value]>, cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> {
tracing::info!("Adding inbound to Xray server at {}", self.endpoint);
tracing::debug!("Inbound config: {}", serde_json::to_string_pretty(inbound)?);
let tag = inbound["tag"].as_str().unwrap_or("").to_string();
let port = inbound["port"].as_u64().unwrap_or(8080) as u32;
let protocol = inbound["protocol"].as_str().unwrap_or("vless");
tracing::debug!("Creating inbound: tag={}, port={}, protocol={}", tag, port, protocol);
// Create receiver configuration (port binding) - use simple port number
let port_list = PortList {
range: vec![PortRange {
from: port,
to: port,
}],
};
// Create stream settings with TLS if certificates are provided
let stream_settings = if cert_pem.is_some() && key_pem.is_some() {
let cert_pem = cert_pem.unwrap();
let key_pem = key_pem.unwrap();
tracing::info!("Creating TLS stream settings for inbound");
tracing::debug!("Certificate length: {}, Key length: {}", cert_pem.len(), key_pem.len());
// Create TLS certificate with OneTimeLoading = true
// Convert PEM strings to byte vectors (certificate should be raw bytes, not PEM string)
let tls_cert = TlsCertificate {
certificate: cert_pem.as_bytes().to_vec(), // PEM as bytes
key: key_pem.as_bytes().to_vec(), // PEM key as bytes
usage: 0, // Default usage
ocsp_stapling: 0, // Default OCSP
one_time_loading: true, // OneTimeLoading = true as in example
build_chain: false,
certificate_path: "".to_string(),
key_path: "".to_string(),
};
// Create TLS config using Default and set only necessary fields
let mut tls_config = TlsConfig::default();
tls_config.certificate = vec![tls_cert];
// Create TLS security settings using prost_types::Any instead of TypedMessage
let tls_any = prost_types::Any::from_msg(&tls_config)
.map_err(|e| anyhow!("Failed to serialize TLS config: {}", e))?;
let tls_message = TypedMessage {
r#type: tls_any.type_url,
value: tls_any.value,
};
// Create stream config with TLS security settings
Some(StreamConfig {
address: None,
port: port,
protocol_name: "tcp".to_string(),
transport_settings: vec![],
security_type: "tls".to_string(),
security_settings: vec![tls_message],
socket_settings: None,
})
} else {
tracing::info!("No certificates provided, creating inbound without TLS");
None
};
let receiver_config = ReceiverConfig {
port_list: Some(port_list),
listen: None,
allocation_strategy: None,
stream_settings: stream_settings,
receive_original_destination: false,
sniffing_settings: None,
};
let receiver_message = TypedMessage {
r#type: "xray.app.proxyman.ReceiverConfig".to_string(),
value: receiver_config.encode_to_vec(),
};
// Create proxy configuration based on protocol with users
let proxy_message = match protocol {
"vless" => {
let mut clients = vec![];
if let Some(users) = users {
for user in users {
let user_id = user["id"].as_str().unwrap_or("").to_string();
let email = user["email"].as_str().unwrap_or("").to_string();
let level = user["level"].as_u64().unwrap_or(0) as u32;
if !user_id.is_empty() && !email.is_empty() {
let account = VlessAccount {
id: user_id,
encryption: "none".to_string(),
flow: "".to_string(),
};
clients.push(User {
email,
level,
account: Some(TypedMessage {
r#type: "xray.proxy.vless.Account".to_string(),
value: account.encode_to_vec(),
}),
});
}
}
}
let vless_config = VlessInboundConfig {
clients,
decryption: "none".to_string(),
fallbacks: vec![],
};
TypedMessage {
r#type: "xray.proxy.vless.inbound.Config".to_string(),
value: vless_config.encode_to_vec(),
}
},
"vmess" => {
let mut vmess_users = vec![];
if let Some(users) = users {
for user in users {
let user_id = user["id"].as_str().unwrap_or("").to_string();
let email = user["email"].as_str().unwrap_or("").to_string();
let level = user["level"].as_u64().unwrap_or(0) as u32;
if !user_id.is_empty() && !email.is_empty() {
let account = VmessAccount {
id: user_id,
security_settings: None,
tests_enabled: "".to_string(),
};
vmess_users.push(User {
email,
level,
account: Some(TypedMessage {
r#type: "xray.proxy.vmess.Account".to_string(),
value: account.encode_to_vec(),
}),
});
}
}
}
let vmess_config = VmessInboundConfig {
user: vmess_users,
default: None,
detour: None,
};
TypedMessage {
r#type: "xray.proxy.vmess.inbound.Config".to_string(),
value: vmess_config.encode_to_vec(),
}
},
"trojan" => {
let mut trojan_users = vec![];
if let Some(users) = users {
for user in users {
let password = user["password"].as_str().or_else(|| user["id"].as_str()).unwrap_or("").to_string();
let email = user["email"].as_str().unwrap_or("").to_string();
let level = user["level"].as_u64().unwrap_or(0) as u32;
if !password.is_empty() && !email.is_empty() {
let account = TrojanAccount {
password,
};
trojan_users.push(User {
email,
level,
account: Some(TypedMessage {
r#type: "xray.proxy.trojan.Account".to_string(),
value: account.encode_to_vec(),
}),
});
}
}
}
let trojan_config = TrojanServerConfig {
users: trojan_users,
fallbacks: vec![],
};
TypedMessage {
r#type: "xray.proxy.trojan.ServerConfig".to_string(),
value: trojan_config.encode_to_vec(),
}
},
"shadowsocks" => {
let mut ss_users = vec![];
if let Some(users) = users {
for user in users {
let password = user["password"].as_str().or_else(|| user["id"].as_str()).unwrap_or("").to_string();
let email = user["email"].as_str().unwrap_or("").to_string();
let level = user["level"].as_u64().unwrap_or(0) as u32;
if !password.is_empty() && !email.is_empty() {
let account = ShadowsocksAccount {
password,
cipher_type: 0, // Default cipher
iv_check: false, // Default IV check
};
ss_users.push(User {
email,
level,
account: Some(TypedMessage {
r#type: "xray.proxy.shadowsocks.Account".to_string(),
value: account.encode_to_vec(),
}),
});
}
}
}
let shadowsocks_config = ShadowsocksServerConfig {
users: ss_users,
network: vec![], // Support all networks by default
};
TypedMessage {
r#type: "xray.proxy.shadowsocks.ServerConfig".to_string(),
value: shadowsocks_config.encode_to_vec(),
}
},
_ => {
return Err(anyhow!("Unsupported protocol: {}", protocol));
}
};
let inbound_config = InboundHandlerConfig {
tag: tag.clone(),
receiver_settings: Some(receiver_message),
proxy_settings: Some(proxy_message),
};
let request = Request::new(AddInboundRequest {
inbound: Some(inbound_config),
});
tracing::info!("Sending AddInboundRequest for '{}'", tag);
let mut handler_client = self.client.handler();
match handler_client.add_inbound(request).await {
Ok(response) => {
let _response_inner = response.into_inner();
tracing::info!("Successfully added inbound {}", tag);
Ok(())
}
Err(e) => {
tracing::error!("Failed to add inbound {}: {}", tag, e);
Err(anyhow!("Failed to add inbound {}: {}", tag, e))
}
}
}
/// Remove inbound by tag
pub async fn remove_inbound(&self, tag: &str) -> Result<()> {
tracing::info!("Removing inbound '{}' from Xray server at {}", tag, self.endpoint);
let mut handler_client = self.client.handler();
let request = Request::new(RemoveInboundRequest {
tag: tag.to_string(),
});
match handler_client.remove_inbound(request).await {
Ok(_) => {
tracing::info!("Successfully removed inbound");
Ok(())
},
Err(e) => {
tracing::error!("Failed to remove inbound: {}", e);
Err(anyhow!("Failed to remove inbound: {}", e))
}
}
}
/// Restart Xray with new configuration
pub async fn restart_with_config(&self, config: &crate::services::xray::XrayConfig) -> Result<()> {
tracing::info!("Restarting Xray server at {} with new config", self.endpoint);
tracing::debug!("Config: {}", serde_json::to_string_pretty(&config.to_json())?);
// TODO: Implement restart with config using xray-core
// For now just return success
Ok(())
}
}

213
src/services/xray/mod.rs Normal file
View File

@@ -0,0 +1,213 @@
use anyhow::Result;
use serde_json::Value;
use uuid::Uuid;
pub mod client;
pub mod config;
pub mod stats;
pub mod inbounds;
pub mod users;
pub use client::XrayClient;
pub use config::XrayConfig;
/// Service for managing Xray servers via gRPC
#[derive(Clone)]
pub struct XrayService {}
#[allow(dead_code)]
impl XrayService {
pub fn new() -> Self {
Self {}
}
/// Create a client for the specified server
async fn create_client(&self, endpoint: &str) -> Result<XrayClient> {
XrayClient::connect(endpoint).await
}
/// Test connection to Xray server
pub async fn test_connection(&self, _server_id: Uuid, endpoint: &str) -> Result<bool> {
match self.create_client(endpoint).await {
Ok(_client) => {
// Instead of getting stats (which might fail), just test connection
// If we successfully created the client, connection is working
Ok(true)
},
Err(_) => Ok(false),
}
}
/// Apply full configuration to Xray server
pub async fn apply_config(&self, _server_id: Uuid, endpoint: &str, config: &XrayConfig) -> Result<()> {
let client = self.create_client(endpoint).await?;
client.restart_with_config(config).await
}
/// Create inbound from template
pub async fn create_inbound(
&self,
_server_id: Uuid,
endpoint: &str,
tag: &str,
port: i32,
protocol: &str,
base_settings: Value,
stream_settings: Value,
) -> Result<()> {
// Build inbound configuration from template
let inbound_config = serde_json::json!({
"tag": tag,
"port": port,
"protocol": protocol,
"settings": base_settings,
"streamSettings": stream_settings
});
tracing::info!("Creating inbound with config: {}", inbound_config);
self.add_inbound(_server_id, endpoint, &inbound_config).await
}
/// Create inbound from template with TLS certificate
pub async fn create_inbound_with_certificate(
&self,
_server_id: Uuid,
endpoint: &str,
tag: &str,
port: i32,
protocol: &str,
base_settings: Value,
stream_settings: Value,
cert_pem: Option<&str>,
key_pem: Option<&str>,
) -> Result<()> {
// Build inbound configuration from template
let inbound_config = serde_json::json!({
"tag": tag,
"port": port,
"protocol": protocol,
"settings": base_settings,
"streamSettings": stream_settings
});
tracing::info!("Creating inbound with TLS certificate and config: {}", inbound_config);
self.add_inbound_with_certificate(_server_id, endpoint, &inbound_config, cert_pem, key_pem).await
}
/// Add inbound to running Xray instance
pub async fn add_inbound(&self, _server_id: Uuid, endpoint: &str, inbound: &Value) -> Result<()> {
let client = self.create_client(endpoint).await?;
client.add_inbound(inbound).await
}
/// Add inbound with certificate to running Xray instance
pub async fn add_inbound_with_certificate(&self, _server_id: Uuid, endpoint: &str, inbound: &Value, cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> {
let client = self.create_client(endpoint).await?;
client.add_inbound_with_certificate(inbound, cert_pem, key_pem).await
}
/// Add inbound with users and certificate to running Xray instance
pub async fn add_inbound_with_users_and_certificate(&self, _server_id: Uuid, endpoint: &str, inbound: &Value, users: &[Value], cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> {
let client = self.create_client(endpoint).await?;
client.add_inbound_with_users_and_certificate(inbound, users, cert_pem, key_pem).await
}
/// Remove inbound from running Xray instance
pub async fn remove_inbound(&self, _server_id: Uuid, endpoint: &str, tag: &str) -> Result<()> {
let client = self.create_client(endpoint).await?;
client.remove_inbound(tag).await
}
/// Add user to inbound by recreating the inbound with updated user list
pub async fn add_user(&self, _server_id: Uuid, endpoint: &str, inbound_tag: &str, user: &Value) -> Result<()> {
tracing::info!("XrayService::add_user called for server {} endpoint {} inbound_tag {}", _server_id, endpoint, inbound_tag);
tracing::warn!("Dynamic user addition via AlterInboundRequest doesn't work reliably - need to implement inbound recreation");
// TODO: Implement inbound recreation approach:
// 1. Get current inbound configuration from database
// 2. Get existing users from database
// 3. Remove old inbound from xray
// 4. Create new inbound with all users (existing + new)
// For now, return error to indicate this needs to be implemented
Err(anyhow::anyhow!("User addition requires inbound recreation - not yet implemented. Use web interface to recreate inbound with users."))
}
/// Create inbound with users list (for inbound recreation approach)
pub async fn create_inbound_with_users(
&self,
_server_id: Uuid,
endpoint: &str,
tag: &str,
port: i32,
protocol: &str,
base_settings: Value,
stream_settings: Value,
users: &[Value],
cert_pem: Option<&str>,
key_pem: Option<&str>,
) -> Result<()> {
tracing::info!("Creating inbound '{}' with {} users", tag, users.len());
// Build inbound configuration with users
let mut inbound_config = serde_json::json!({
"tag": tag,
"port": port,
"protocol": protocol,
"settings": base_settings,
"streamSettings": stream_settings
});
// Add users to settings based on protocol
if !users.is_empty() {
let mut settings = inbound_config["settings"].clone();
match protocol {
"vless" | "vmess" => {
settings["clients"] = serde_json::Value::Array(users.to_vec());
},
"trojan" => {
settings["clients"] = serde_json::Value::Array(users.to_vec());
},
"shadowsocks" => {
// For shadowsocks, users are handled differently
if let Some(user) = users.first() {
settings["password"] = user["password"].clone();
}
},
_ => {
return Err(anyhow::anyhow!("Unsupported protocol for users: {}", protocol));
}
}
inbound_config["settings"] = settings;
}
tracing::info!("Creating inbound with users: {}", serde_json::to_string_pretty(&inbound_config)?);
// Use the new method with users support
self.add_inbound_with_users_and_certificate(_server_id, endpoint, &inbound_config, users, cert_pem, key_pem).await
}
/// Remove user from inbound
pub async fn remove_user(&self, _server_id: Uuid, endpoint: &str, inbound_tag: &str, email: &str) -> Result<()> {
let client = self.create_client(endpoint).await?;
client.remove_user(inbound_tag, email).await
}
/// Get server statistics
pub async fn get_stats(&self, _server_id: Uuid, endpoint: &str) -> Result<Value> {
let client = self.create_client(endpoint).await?;
client.get_stats().await
}
/// Query specific statistics
pub async fn query_stats(&self, _server_id: Uuid, endpoint: &str, pattern: &str, reset: bool) -> Result<Value> {
let client = self.create_client(endpoint).await?;
client.query_stats(pattern, reset).await
}
}
impl Default for XrayService {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,70 @@
use anyhow::{Result, anyhow};
use serde_json::Value;
use xray_core::{
tonic::Request,
app::stats::command::{GetStatsRequest, QueryStatsRequest},
Client,
};
pub struct StatsClient<'a> {
endpoint: String,
client: &'a Client,
}
impl<'a> StatsClient<'a> {
pub fn new(endpoint: String, client: &'a Client) -> Self {
Self { endpoint, client }
}
/// Get server statistics
pub async fn get_stats(&self) -> Result<Value> {
tracing::info!("Getting stats from Xray server at {}", self.endpoint);
let request = Request::new(GetStatsRequest {
name: "".to_string(),
reset: false,
});
let mut stats_client = self.client.stats();
match stats_client.get_stats(request).await {
Ok(response) => {
let stats = response.into_inner();
tracing::debug!("Stats: {:?}", stats);
let stats_json = serde_json::json!({
"stats": format!("{:?}", stats.stat)
});
Ok(stats_json)
}
Err(e) => {
tracing::error!("Failed to get stats: {}", e);
Err(anyhow!("Failed to get stats: {}", e))
}
}
}
/// Query specific statistics with pattern
pub async fn query_stats(&self, pattern: &str, reset: bool) -> Result<Value> {
tracing::info!("Querying stats with pattern '{}', reset: {} from {}", pattern, reset, self.endpoint);
let request = Request::new(QueryStatsRequest {
pattern: pattern.to_string(),
reset,
});
let mut stats_client = self.client.stats();
match stats_client.query_stats(request).await {
Ok(response) => {
let stats = response.into_inner();
tracing::debug!("Query stats: {:?}", stats);
let stats_json = serde_json::json!({
"stat": format!("{:?}", stats.stat)
});
Ok(stats_json)
}
Err(e) => {
tracing::error!("Failed to query stats: {}", e);
Err(anyhow!("Failed to query stats: {}", e))
}
}
}
}

150
src/services/xray/users.rs Normal file
View File

@@ -0,0 +1,150 @@
use anyhow::{Result, anyhow};
use serde_json::Value;
use xray_core::{
tonic::Request,
app::proxyman::command::{AlterInboundRequest, AddUserOperation, RemoveUserOperation},
common::serial::TypedMessage,
common::protocol::User,
proxy::vless::Account as VlessAccount,
proxy::vmess::Account as VmessAccount,
proxy::trojan::Account as TrojanAccount,
Client,
};
use prost::Message;
pub struct UserClient<'a> {
endpoint: String,
client: &'a Client,
}
impl<'a> UserClient<'a> {
pub fn new(endpoint: String, client: &'a Client) -> Self {
Self { endpoint, client }
}
/// Add user to inbound (simple version that works)
pub async fn add_user(&self, inbound_tag: &str, user: &Value) -> Result<()> {
tracing::info!("Adding user to inbound '{}' on Xray server at {}", inbound_tag, self.endpoint);
tracing::debug!("User config: {}", serde_json::to_string_pretty(user)?);
let email = user["email"].as_str().unwrap_or("").to_string();
let user_id = user["id"].as_str().unwrap_or("").to_string();
let level = user["level"].as_u64().unwrap_or(0) as u32;
let protocol = user["protocol"].as_str().unwrap_or("vless");
tracing::info!("Parsed user data: email={}, id={}, level={}, protocol={}", email, user_id, level, protocol);
if email.is_empty() || user_id.is_empty() {
return Err(anyhow!("User email and id are required"));
}
// Create user account based on protocol
let account_message = match protocol {
"vless" => {
let account = VlessAccount {
id: user_id.clone(),
encryption: "none".to_string(),
flow: "".to_string(), // Empty flow for basic VLESS
};
TypedMessage {
r#type: "xray.proxy.vless.Account".to_string(),
value: account.encode_to_vec(),
}
},
"vmess" => {
let account = VmessAccount {
id: user_id,
security_settings: None,
tests_enabled: "".to_string(),
};
TypedMessage {
r#type: "xray.proxy.vmess.Account".to_string(),
value: account.encode_to_vec(),
}
},
"trojan" => {
let account = TrojanAccount {
password: user_id, // For trojan, use password instead of UUID
};
TypedMessage {
r#type: "xray.proxy.trojan.Account".to_string(),
value: account.encode_to_vec(),
}
},
_ => {
return Err(anyhow!("Unsupported protocol for user: {}", protocol));
}
};
// Create user protobuf message
let user_proto = User {
level: level,
email: email.clone(),
account: Some(account_message),
};
// Build the AddUserOperation
let add_user_op = AddUserOperation {
user: Some(user_proto),
};
let typed_message = TypedMessage {
r#type: "xray.app.proxyman.command.AddUserOperation".to_string(),
value: add_user_op.encode_to_vec(),
};
// Build the AlterInboundRequest
let request = Request::new(AlterInboundRequest {
tag: inbound_tag.to_string(),
operation: Some(typed_message),
});
tracing::info!("Sending AlterInboundRequest to add user '{}' to inbound '{}'", email, inbound_tag);
let mut handler_client = self.client.handler();
match handler_client.alter_inbound(request).await {
Ok(response) => {
let _response_inner = response.into_inner();
tracing::info!("Successfully added user '{}' to inbound '{}'", email, inbound_tag);
Ok(())
}
Err(e) => {
tracing::error!("gRPC error adding user '{}' to inbound '{}': status={}, message={}",
email, inbound_tag, e.code(), e.message());
Err(anyhow!("Failed to add user '{}' to inbound '{}': {}", email, inbound_tag, e))
}
}
}
/// Remove user from inbound
pub async fn remove_user(&self, inbound_tag: &str, email: &str) -> Result<()> {
tracing::info!("Removing user '{}' from inbound '{}' on Xray server at {}", email, inbound_tag, self.endpoint);
// Build the RemoveUserOperation
let remove_user_op = RemoveUserOperation {
email: email.to_string(),
};
let typed_message = TypedMessage {
r#type: "xray.app.proxyman.command.RemoveUserOperation".to_string(),
value: remove_user_op.encode_to_vec(),
};
let request = Request::new(AlterInboundRequest {
tag: inbound_tag.to_string(),
operation: Some(typed_message),
});
let mut handler_client = self.client.handler();
match handler_client.alter_inbound(request).await {
Ok(_) => {
tracing::info!("Successfully removed user '{}' from inbound '{}'", email, inbound_tag);
Ok(())
}
Err(e) => {
tracing::error!("Failed to remove user '{}' from inbound '{}': {}", email, inbound_tag, e);
Err(anyhow!("Failed to remove user '{}' from inbound '{}': {}", email, inbound_tag, e))
}
}
}
}