mirror of
https://github.com/house-of-vanity/OutFleet.git
synced 2025-10-26 18:19:07 +00:00
Cert works
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
use anyhow::{Result, anyhow};
|
||||
use serde_json::Value;
|
||||
use xray_core::Client;
|
||||
use std::sync::Arc;
|
||||
|
||||
// Import submodules from the same directory
|
||||
use super::stats::StatsClient;
|
||||
@@ -8,17 +9,16 @@ use super::inbounds::InboundClient;
|
||||
use super::users::UserClient;
|
||||
|
||||
/// Xray gRPC client wrapper
|
||||
#[derive(Clone)]
|
||||
pub struct XrayClient {
|
||||
endpoint: String,
|
||||
client: Client,
|
||||
client: Arc<Client>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl XrayClient {
|
||||
/// Connect to Xray gRPC server
|
||||
pub async fn connect(endpoint: &str) -> Result<Self> {
|
||||
tracing::info!("Connecting to Xray at {}", endpoint);
|
||||
|
||||
let client = Client::from_url(endpoint).await
|
||||
.map_err(|e| anyhow!("Failed to connect to Xray at {}: {}", endpoint, e))?;
|
||||
|
||||
@@ -26,61 +26,61 @@ impl XrayClient {
|
||||
|
||||
Ok(Self {
|
||||
endpoint: endpoint.to_string(),
|
||||
client,
|
||||
client: Arc::new(client),
|
||||
})
|
||||
}
|
||||
|
||||
/// Get server statistics
|
||||
pub async fn get_stats(&self) -> Result<Value> {
|
||||
let stats_client = StatsClient::new(self.endpoint.clone(), &self.client);
|
||||
let stats_client = StatsClient::new(self.endpoint.clone(), &*self.client);
|
||||
stats_client.get_stats().await
|
||||
}
|
||||
|
||||
/// Query specific statistics with pattern
|
||||
pub async fn query_stats(&self, pattern: &str, reset: bool) -> Result<Value> {
|
||||
let stats_client = StatsClient::new(self.endpoint.clone(), &self.client);
|
||||
let stats_client = StatsClient::new(self.endpoint.clone(), &*self.client);
|
||||
stats_client.query_stats(pattern, reset).await
|
||||
}
|
||||
|
||||
/// Restart Xray with new configuration
|
||||
pub async fn restart_with_config(&self, config: &crate::services::xray::XrayConfig) -> Result<()> {
|
||||
let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client);
|
||||
let inbound_client = InboundClient::new(self.endpoint.clone(), &*self.client);
|
||||
inbound_client.restart_with_config(config).await
|
||||
}
|
||||
|
||||
/// Add inbound configuration
|
||||
pub async fn add_inbound(&self, inbound: &Value) -> Result<()> {
|
||||
let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client);
|
||||
let inbound_client = InboundClient::new(self.endpoint.clone(), &*self.client);
|
||||
inbound_client.add_inbound(inbound).await
|
||||
}
|
||||
|
||||
/// Add inbound configuration with TLS certificate
|
||||
pub async fn add_inbound_with_certificate(&self, inbound: &Value, cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> {
|
||||
let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client);
|
||||
let inbound_client = InboundClient::new(self.endpoint.clone(), &*self.client);
|
||||
inbound_client.add_inbound_with_certificate(inbound, None, cert_pem, key_pem).await
|
||||
}
|
||||
|
||||
/// Add inbound configuration with users and TLS certificate
|
||||
pub async fn add_inbound_with_users_and_certificate(&self, inbound: &Value, users: &[Value], cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> {
|
||||
let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client);
|
||||
let inbound_client = InboundClient::new(self.endpoint.clone(), &*self.client);
|
||||
inbound_client.add_inbound_with_certificate(inbound, Some(users), cert_pem, key_pem).await
|
||||
}
|
||||
|
||||
/// Remove inbound by tag
|
||||
pub async fn remove_inbound(&self, tag: &str) -> Result<()> {
|
||||
let inbound_client = InboundClient::new(self.endpoint.clone(), &self.client);
|
||||
let inbound_client = InboundClient::new(self.endpoint.clone(), &*self.client);
|
||||
inbound_client.remove_inbound(tag).await
|
||||
}
|
||||
|
||||
/// Add user to inbound
|
||||
pub async fn add_user(&self, inbound_tag: &str, user: &Value) -> Result<()> {
|
||||
let user_client = UserClient::new(self.endpoint.clone(), &self.client);
|
||||
let user_client = UserClient::new(self.endpoint.clone(), &*self.client);
|
||||
user_client.add_user(inbound_tag, user).await
|
||||
}
|
||||
|
||||
/// Remove user from inbound
|
||||
pub async fn remove_user(&self, inbound_tag: &str, email: &str) -> Result<()> {
|
||||
let user_client = UserClient::new(self.endpoint.clone(), &self.client);
|
||||
let user_client = UserClient::new(self.endpoint.clone(), &*self.client);
|
||||
user_client.remove_user(inbound_tag, email).await
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::time::{Duration, Instant};
|
||||
use tracing::error;
|
||||
|
||||
pub mod client;
|
||||
pub mod config;
|
||||
@@ -15,24 +16,71 @@ pub mod users;
|
||||
pub use client::XrayClient;
|
||||
pub use config::XrayConfig;
|
||||
|
||||
/// Cached connection with TTL
|
||||
#[derive(Clone)]
|
||||
struct CachedConnection {
|
||||
client: XrayClient,
|
||||
created_at: Instant,
|
||||
}
|
||||
|
||||
impl CachedConnection {
|
||||
fn new(client: XrayClient) -> Self {
|
||||
Self {
|
||||
client,
|
||||
created_at: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
fn is_expired(&self, ttl: Duration) -> bool {
|
||||
self.created_at.elapsed() > ttl
|
||||
}
|
||||
}
|
||||
|
||||
/// Service for managing Xray servers via gRPC
|
||||
#[derive(Clone)]
|
||||
pub struct XrayService {}
|
||||
pub struct XrayService {
|
||||
connection_cache: Arc<RwLock<HashMap<String, CachedConnection>>>,
|
||||
connection_ttl: Duration,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl XrayService {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
Self {
|
||||
connection_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
connection_ttl: Duration::from_secs(300), // 5 minutes TTL
|
||||
}
|
||||
}
|
||||
|
||||
/// Get or create cached client for endpoint
|
||||
async fn get_or_create_client(&self, endpoint: &str) -> Result<XrayClient> {
|
||||
// Check cache first
|
||||
{
|
||||
let cache = self.connection_cache.read().await;
|
||||
if let Some(cached) = cache.get(endpoint) {
|
||||
if !cached.is_expired(self.connection_ttl) {
|
||||
return Ok(cached.client.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create new connection
|
||||
let client = XrayClient::connect(endpoint).await?;
|
||||
let cached_connection = CachedConnection::new(client.clone());
|
||||
|
||||
// Update cache
|
||||
{
|
||||
let mut cache = self.connection_cache.write().await;
|
||||
cache.insert(endpoint.to_string(), cached_connection);
|
||||
}
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
/// Create a client for the specified server
|
||||
async fn create_client(&self, endpoint: &str) -> Result<XrayClient> {
|
||||
XrayClient::connect(endpoint).await
|
||||
}
|
||||
|
||||
/// Test connection to Xray server
|
||||
pub async fn test_connection(&self, _server_id: Uuid, endpoint: &str) -> Result<bool> {
|
||||
match self.create_client(endpoint).await {
|
||||
match self.get_or_create_client(endpoint).await {
|
||||
Ok(_client) => {
|
||||
// Instead of getting stats (which might fail), just test connection
|
||||
// If we successfully created the client, connection is working
|
||||
@@ -44,7 +92,7 @@ impl XrayService {
|
||||
|
||||
/// Apply full configuration to Xray server
|
||||
pub async fn apply_config(&self, _server_id: Uuid, endpoint: &str, config: &XrayConfig) -> Result<()> {
|
||||
let client = self.create_client(endpoint).await?;
|
||||
let client = self.get_or_create_client(endpoint).await?;
|
||||
client.restart_with_config(config).await
|
||||
}
|
||||
|
||||
@@ -98,25 +146,25 @@ impl XrayService {
|
||||
|
||||
/// Add inbound to running Xray instance
|
||||
pub async fn add_inbound(&self, _server_id: Uuid, endpoint: &str, inbound: &Value) -> Result<()> {
|
||||
let client = self.create_client(endpoint).await?;
|
||||
let client = self.get_or_create_client(endpoint).await?;
|
||||
client.add_inbound(inbound).await
|
||||
}
|
||||
|
||||
/// Add inbound with certificate to running Xray instance
|
||||
pub async fn add_inbound_with_certificate(&self, _server_id: Uuid, endpoint: &str, inbound: &Value, cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> {
|
||||
let client = self.create_client(endpoint).await?;
|
||||
let client = self.get_or_create_client(endpoint).await?;
|
||||
client.add_inbound_with_certificate(inbound, cert_pem, key_pem).await
|
||||
}
|
||||
|
||||
/// Add inbound with users and certificate to running Xray instance
|
||||
pub async fn add_inbound_with_users_and_certificate(&self, _server_id: Uuid, endpoint: &str, inbound: &Value, users: &[Value], cert_pem: Option<&str>, key_pem: Option<&str>) -> Result<()> {
|
||||
let client = self.create_client(endpoint).await?;
|
||||
let client = self.get_or_create_client(endpoint).await?;
|
||||
client.add_inbound_with_users_and_certificate(inbound, users, cert_pem, key_pem).await
|
||||
}
|
||||
|
||||
/// Remove inbound from running Xray instance
|
||||
pub async fn remove_inbound(&self, _server_id: Uuid, endpoint: &str, tag: &str) -> Result<()> {
|
||||
let client = self.create_client(endpoint).await?;
|
||||
let client = self.get_or_create_client(endpoint).await?;
|
||||
client.remove_inbound(tag).await
|
||||
}
|
||||
|
||||
@@ -187,21 +235,70 @@ impl XrayService {
|
||||
|
||||
/// Remove user from inbound
|
||||
pub async fn remove_user(&self, _server_id: Uuid, endpoint: &str, inbound_tag: &str, email: &str) -> Result<()> {
|
||||
let client = self.create_client(endpoint).await?;
|
||||
let client = self.get_or_create_client(endpoint).await?;
|
||||
client.remove_user(inbound_tag, email).await
|
||||
}
|
||||
|
||||
/// Get server statistics
|
||||
pub async fn get_stats(&self, _server_id: Uuid, endpoint: &str) -> Result<Value> {
|
||||
let client = self.create_client(endpoint).await?;
|
||||
let client = self.get_or_create_client(endpoint).await?;
|
||||
client.get_stats().await
|
||||
}
|
||||
|
||||
/// Query specific statistics
|
||||
pub async fn query_stats(&self, _server_id: Uuid, endpoint: &str, pattern: &str, reset: bool) -> Result<Value> {
|
||||
let client = self.create_client(endpoint).await?;
|
||||
let client = self.get_or_create_client(endpoint).await?;
|
||||
client.query_stats(pattern, reset).await
|
||||
}
|
||||
|
||||
/// Sync entire server with batch operations using single client
|
||||
pub async fn sync_server_inbounds_optimized(
|
||||
&self,
|
||||
server_id: Uuid,
|
||||
endpoint: &str,
|
||||
desired_inbounds: &HashMap<String, crate::services::tasks::DesiredInbound>,
|
||||
) -> Result<()> {
|
||||
// Get single client for all operations
|
||||
let client = self.get_or_create_client(endpoint).await?;
|
||||
|
||||
// Perform all operations with the same client
|
||||
for (tag, desired) in desired_inbounds {
|
||||
// Always try to remove inbound first (ignore errors if it doesn't exist)
|
||||
let _ = client.remove_inbound(tag).await;
|
||||
|
||||
// Create inbound with users
|
||||
let users_json: Vec<Value> = desired.users.iter().map(|user| {
|
||||
serde_json::json!({
|
||||
"id": user.id,
|
||||
"email": user.email,
|
||||
"level": user.level
|
||||
})
|
||||
}).collect();
|
||||
|
||||
// Build inbound config
|
||||
let inbound_config = serde_json::json!({
|
||||
"tag": desired.tag,
|
||||
"port": desired.port,
|
||||
"protocol": desired.protocol,
|
||||
"settings": desired.settings,
|
||||
"streamSettings": desired.stream_settings
|
||||
});
|
||||
|
||||
match client.add_inbound_with_users_and_certificate(
|
||||
&inbound_config,
|
||||
&users_json,
|
||||
desired.cert_pem.as_deref(),
|
||||
desired.key_pem.as_deref(),
|
||||
).await {
|
||||
Err(e) => {
|
||||
error!("Failed to create inbound {}: {}", tag, e);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for XrayService {
|
||||
|
||||
Reference in New Issue
Block a user