mirror of
https://github.com/house-of-vanity/yggman.git
synced 2025-12-16 19:17:56 +00:00
Added agent
This commit is contained in:
138
src/agent.rs
138
src/agent.rs
@@ -1,9 +1,11 @@
|
||||
use anyhow::Result;
|
||||
use anyhow::{Result, anyhow};
|
||||
use clap::Parser;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use network_interface::{NetworkInterface, NetworkInterfaceConfig};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::path::Path;
|
||||
use tokio::time::sleep;
|
||||
use tokio_tungstenite::{connect_async, tungstenite::Message};
|
||||
use tracing::{error, info, warn, debug};
|
||||
@@ -73,11 +75,18 @@ async fn main() -> Result<()> {
|
||||
.init();
|
||||
|
||||
info!("Starting yggman-agent v{}", env!("CARGO_PKG_VERSION"));
|
||||
|
||||
// Check for yggdrasil config file
|
||||
let ygg_config_path = find_yggdrasil_config().ok_or_else(|| {
|
||||
anyhow!("Yggdrasil config file not found. Please ensure yggdrasil.conf exists at /etc/yggdrasil.conf or /etc/yggdrasil/yggdrasil.conf")
|
||||
})?;
|
||||
info!("Found Yggdrasil config at: {}", ygg_config_path);
|
||||
|
||||
info!("Connecting to control plane: {}", args.server);
|
||||
|
||||
// Main loop with reconnection logic
|
||||
loop {
|
||||
match run_agent(&args).await {
|
||||
match run_agent(&args, &ygg_config_path).await {
|
||||
Ok(_) => {
|
||||
info!("Agent connection closed normally");
|
||||
}
|
||||
@@ -94,7 +103,7 @@ async fn main() -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_agent(args: &Args) -> Result<()> {
|
||||
async fn run_agent(args: &Args, ygg_config_path: &str) -> Result<()> {
|
||||
// Get node name
|
||||
let node_name = args.name.clone().unwrap_or_else(|| {
|
||||
hostname::get()
|
||||
@@ -133,6 +142,37 @@ async fn run_agent(args: &Args) -> Result<()> {
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Spawn address scanning task
|
||||
let (address_scan_tx, mut address_scan_rx) = tokio::sync::mpsc::channel(1);
|
||||
let current_addresses = Arc::new(tokio::sync::RwLock::new(addresses.clone()));
|
||||
let current_addresses_clone = current_addresses.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(60)); // Scan every minute
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
match discover_addresses() {
|
||||
Ok(new_addresses) => {
|
||||
let mut current = current_addresses_clone.write().await;
|
||||
|
||||
// Check if addresses have changed
|
||||
if *current != new_addresses {
|
||||
info!("Address change detected: {:?} -> {:?}", *current, new_addresses);
|
||||
*current = new_addresses.clone();
|
||||
|
||||
if address_scan_tx.send(new_addresses).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to scan addresses: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Main message loop
|
||||
loop {
|
||||
@@ -141,7 +181,7 @@ async fn run_agent(args: &Args) -> Result<()> {
|
||||
match msg {
|
||||
Some(Ok(Message::Text(text))) => {
|
||||
match serde_json::from_str::<ServerMessage>(&text) {
|
||||
Ok(server_msg) => handle_server_message(server_msg).await?,
|
||||
Ok(server_msg) => handle_server_message(server_msg, ygg_config_path).await?,
|
||||
Err(e) => warn!("Failed to parse server message: {}", e),
|
||||
}
|
||||
}
|
||||
@@ -168,13 +208,24 @@ async fn run_agent(args: &Args) -> Result<()> {
|
||||
}
|
||||
debug!("Sent heartbeat");
|
||||
}
|
||||
Some(new_addresses) = address_scan_rx.recv() => {
|
||||
let update_msg = AgentMessage::UpdateAddresses {
|
||||
addresses: new_addresses,
|
||||
};
|
||||
let json = serde_json::to_string(&update_msg)?;
|
||||
if let Err(e) = write.send(Message::Text(json)).await {
|
||||
error!("Failed to send address update: {}", e);
|
||||
break;
|
||||
}
|
||||
info!("Sent address update to control plane");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_server_message(msg: ServerMessage) -> Result<()> {
|
||||
async fn handle_server_message(msg: ServerMessage, ygg_config_path: &str) -> Result<()> {
|
||||
match msg {
|
||||
ServerMessage::Config {
|
||||
node_id,
|
||||
@@ -193,8 +244,11 @@ async fn handle_server_message(msg: ServerMessage) -> Result<()> {
|
||||
}
|
||||
info!(" Allowed keys: {} configured", allowed_public_keys.len());
|
||||
|
||||
// TODO: Apply configuration to Yggdrasil
|
||||
info!("Configuration received (not yet applied)");
|
||||
// Apply configuration to Yggdrasil
|
||||
match write_yggdrasil_config(ygg_config_path, &private_key, &listen, &peers, &allowed_public_keys).await {
|
||||
Ok(_) => info!("Configuration successfully applied to {}", ygg_config_path),
|
||||
Err(e) => error!("Failed to write Yggdrasil config: {}", e),
|
||||
}
|
||||
}
|
||||
ServerMessage::Update {
|
||||
peers,
|
||||
@@ -207,8 +261,12 @@ async fn handle_server_message(msg: ServerMessage) -> Result<()> {
|
||||
}
|
||||
info!(" Updated allowed keys: {} configured", allowed_public_keys.len());
|
||||
|
||||
// TODO: Apply configuration update to Yggdrasil
|
||||
info!("Configuration update received (not yet applied)");
|
||||
// Apply configuration update to Yggdrasil
|
||||
// For updates we need to read current config and update only peers/allowed keys
|
||||
match update_yggdrasil_config(ygg_config_path, &peers, &allowed_public_keys).await {
|
||||
Ok(_) => info!("Configuration update successfully applied to {}", ygg_config_path),
|
||||
Err(e) => error!("Failed to update Yggdrasil config: {}", e),
|
||||
}
|
||||
}
|
||||
ServerMessage::Error { message } => {
|
||||
error!("Server error: {}", message);
|
||||
@@ -251,4 +309,66 @@ fn discover_addresses() -> Result<Vec<String>> {
|
||||
|
||||
// If no addresses found, return empty vec (will use localhost)
|
||||
Ok(addresses)
|
||||
}
|
||||
|
||||
fn find_yggdrasil_config() -> Option<String> {
|
||||
let possible_paths = vec![
|
||||
"/etc/yggdrasil.conf",
|
||||
"/etc/yggdrasil/yggdrasil.conf",
|
||||
];
|
||||
|
||||
for path in possible_paths {
|
||||
if Path::new(path).exists() {
|
||||
return Some(path.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
async fn write_yggdrasil_config(
|
||||
config_path: &str,
|
||||
private_key: &str,
|
||||
listen: &[String],
|
||||
peers: &[String],
|
||||
allowed_public_keys: &[String]
|
||||
) -> Result<()> {
|
||||
use serde_json::json;
|
||||
|
||||
let config = json!({
|
||||
"PrivateKey": private_key,
|
||||
"Listen": listen,
|
||||
"Peers": peers,
|
||||
"AllowedPublicKeys": allowed_public_keys,
|
||||
"InterfacePeers": {},
|
||||
"NodeInfo": {},
|
||||
"NodeInfoPrivacy": false
|
||||
});
|
||||
|
||||
let config_json = serde_json::to_string_pretty(&config)?;
|
||||
tokio::fs::write(config_path, config_json).await?;
|
||||
|
||||
info!("Yggdrasil configuration written to {}", config_path);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_yggdrasil_config(
|
||||
config_path: &str,
|
||||
peers: &[String],
|
||||
allowed_public_keys: &[String]
|
||||
) -> Result<()> {
|
||||
// Read current config
|
||||
let current_config = tokio::fs::read_to_string(config_path).await?;
|
||||
let mut config: serde_json::Value = serde_json::from_str(¤t_config)?;
|
||||
|
||||
// Update only peers and allowed public keys
|
||||
config["Peers"] = serde_json::json!(peers);
|
||||
config["AllowedPublicKeys"] = serde_json::json!(allowed_public_keys);
|
||||
|
||||
// Write updated config back
|
||||
let updated_config = serde_json::to_string_pretty(&config)?;
|
||||
tokio::fs::write(config_path, updated_config).await?;
|
||||
|
||||
info!("Yggdrasil configuration updated in {}", config_path);
|
||||
Ok(())
|
||||
}
|
||||
@@ -102,6 +102,15 @@ impl ConfigManager {
|
||||
self.config.load_full()
|
||||
}
|
||||
|
||||
pub fn update_listen_template(&self, new_template: Vec<String>) {
|
||||
let current = self.config.load_full();
|
||||
let mut new_config = current.as_ref().clone();
|
||||
new_config.nodes.default_listen_endpoints = new_template;
|
||||
|
||||
self.config.store(Arc::new(new_config));
|
||||
tracing::info!("Listen template updated in memory");
|
||||
}
|
||||
|
||||
|
||||
/// Load configuration from multiple sources with precedence:
|
||||
/// CLI args > Environment variables > Config file > Defaults
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::config::{AppConfig, ConfigManager};
|
||||
use crate::core::context::AppContext;
|
||||
use crate::core::module::ModuleManager;
|
||||
use crate::error::Result;
|
||||
use crate::settings_manager::SettingsManager;
|
||||
use tokio::signal;
|
||||
|
||||
pub struct Application {
|
||||
@@ -10,9 +11,18 @@ pub struct Application {
|
||||
}
|
||||
|
||||
impl Application {
|
||||
pub fn new(config: AppConfig) -> Self {
|
||||
pub fn new(config: AppConfig, settings_manager: SettingsManager) -> Self {
|
||||
let config_manager = Arc::new(ConfigManager::new(config));
|
||||
let context = Arc::new(AppContext::new(config_manager));
|
||||
let context = Arc::new(AppContext::new(config_manager, Arc::new(settings_manager)));
|
||||
let module_manager = ModuleManager::new(context);
|
||||
|
||||
Self {
|
||||
module_manager,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_managers(config_manager: ConfigManager, settings_manager: SettingsManager) -> Self {
|
||||
let context = Arc::new(AppContext::new(Arc::new(config_manager), Arc::new(settings_manager)));
|
||||
let module_manager = ModuleManager::new(context);
|
||||
|
||||
Self {
|
||||
|
||||
@@ -1,14 +1,17 @@
|
||||
use std::sync::Arc;
|
||||
use crate::config::ConfigManager;
|
||||
use crate::settings_manager::SettingsManager;
|
||||
|
||||
pub struct AppContext {
|
||||
pub config_manager: Arc<ConfigManager>,
|
||||
pub settings_manager: Arc<SettingsManager>,
|
||||
}
|
||||
|
||||
impl AppContext {
|
||||
pub fn new(config_manager: Arc<ConfigManager>) -> Self {
|
||||
pub fn new(config_manager: Arc<ConfigManager>, settings_manager: Arc<SettingsManager>) -> Self {
|
||||
Self {
|
||||
config_manager,
|
||||
settings_manager,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -47,17 +47,30 @@ pub async fn migrate_database(db: &DatabaseConnection) -> Result<(), DbErr> {
|
||||
let schema = Schema::new(backend);
|
||||
|
||||
// Create nodes table if it doesn't exist
|
||||
let mut create_table_stmt = schema.create_table_from_entity(crate::database::entities::node::Entity);
|
||||
let mut create_nodes_stmt = schema.create_table_from_entity(crate::database::entities::node::Entity);
|
||||
|
||||
// Convert to SQL
|
||||
let sql = match backend {
|
||||
DbBackend::Sqlite => create_table_stmt.if_not_exists().to_string(SqliteQueryBuilder),
|
||||
DbBackend::Postgres => create_table_stmt.if_not_exists().to_string(PostgresQueryBuilder),
|
||||
DbBackend::MySql => create_table_stmt.if_not_exists().to_string(MysqlQueryBuilder),
|
||||
let nodes_sql = match backend {
|
||||
DbBackend::Sqlite => create_nodes_stmt.if_not_exists().to_string(SqliteQueryBuilder),
|
||||
DbBackend::Postgres => create_nodes_stmt.if_not_exists().to_string(PostgresQueryBuilder),
|
||||
DbBackend::MySql => create_nodes_stmt.if_not_exists().to_string(MysqlQueryBuilder),
|
||||
};
|
||||
|
||||
// Execute the statement
|
||||
db.execute(Statement::from_string(backend, sql)).await?;
|
||||
db.execute(Statement::from_string(backend, nodes_sql)).await?;
|
||||
|
||||
// Create settings table if it doesn't exist
|
||||
let mut create_settings_stmt = schema.create_table_from_entity(crate::database::entities::settings::Entity);
|
||||
|
||||
// Convert to SQL
|
||||
let settings_sql = match backend {
|
||||
DbBackend::Sqlite => create_settings_stmt.if_not_exists().to_string(SqliteQueryBuilder),
|
||||
DbBackend::Postgres => create_settings_stmt.if_not_exists().to_string(PostgresQueryBuilder),
|
||||
DbBackend::MySql => create_settings_stmt.if_not_exists().to_string(MysqlQueryBuilder),
|
||||
};
|
||||
|
||||
// Execute the statement
|
||||
db.execute(Statement::from_string(backend, settings_sql)).await?;
|
||||
|
||||
tracing::info!("Database migration completed");
|
||||
Ok(())
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
pub mod node;
|
||||
pub mod node;
|
||||
pub mod settings;
|
||||
47
src/database/entities/settings.rs
Normal file
47
src/database/entities/settings.rs
Normal file
@@ -0,0 +1,47 @@
|
||||
use sea_orm::entity::prelude::*;
|
||||
use sea_orm::Set;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
|
||||
#[sea_orm(table_name = "settings")]
|
||||
pub struct Model {
|
||||
#[sea_orm(primary_key, auto_increment = false)]
|
||||
pub key: String,
|
||||
pub value: String,
|
||||
pub created_at: DateTime,
|
||||
pub updated_at: DateTime,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
|
||||
pub enum Relation {}
|
||||
|
||||
impl ActiveModelBehavior for ActiveModel {}
|
||||
|
||||
impl Model {
|
||||
pub fn parse_json_value<T>(&self) -> Result<T, serde_json::Error>
|
||||
where
|
||||
T: for<'de> serde::Deserialize<'de>
|
||||
{
|
||||
serde_json::from_str(&self.value)
|
||||
}
|
||||
}
|
||||
|
||||
impl ActiveModel {
|
||||
pub fn new(key: String, value: &impl serde::Serialize) -> Result<Self, serde_json::Error> {
|
||||
let value_json = serde_json::to_string(value)?;
|
||||
let now = chrono::Utc::now().naive_utc();
|
||||
|
||||
Ok(Self {
|
||||
key: Set(key),
|
||||
value: Set(value_json),
|
||||
created_at: Set(now),
|
||||
updated_at: Set(now),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn update_value(&mut self, value: &impl serde::Serialize) -> Result<(), serde_json::Error> {
|
||||
let value_json = serde_json::to_string(value)?;
|
||||
self.value = Set(value_json);
|
||||
self.updated_at = Set(chrono::Utc::now().naive_utc());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
17
src/main.rs
17
src/main.rs
@@ -5,6 +5,7 @@ mod database;
|
||||
mod error;
|
||||
mod modules;
|
||||
mod node_manager;
|
||||
mod settings_manager;
|
||||
mod yggdrasil;
|
||||
mod websocket_state;
|
||||
|
||||
@@ -53,9 +54,21 @@ async fn main() -> Result<()> {
|
||||
database::migrate_database(&db).await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to migrate database: {}", e))?;
|
||||
|
||||
let mut app = core::app::Application::new(config);
|
||||
// Create settings manager and initialize defaults
|
||||
let settings_manager = settings_manager::SettingsManager::new(db.clone());
|
||||
settings_manager.initialize_defaults().await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to initialize settings: {}", e))?;
|
||||
|
||||
app.register_module(Box::new(modules::web::WebModule::new(db)));
|
||||
// Create config manager first
|
||||
let config_manager = config::ConfigManager::new(config);
|
||||
|
||||
// Load settings from database to config
|
||||
settings_manager.load_settings_to_config(&config_manager).await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to load settings to config: {}", e))?;
|
||||
|
||||
let mut app = core::app::Application::new_with_managers(config_manager, settings_manager.clone());
|
||||
|
||||
app.register_module(Box::new(modules::web::WebModule::new(db, settings_manager)));
|
||||
|
||||
app.run().await?;
|
||||
|
||||
|
||||
@@ -14,20 +14,29 @@ use crate::core::context::AppContext;
|
||||
use crate::core::module::Module;
|
||||
use crate::error::Result;
|
||||
use crate::node_manager::NodeManager;
|
||||
use crate::settings_manager::SettingsManager;
|
||||
use crate::yggdrasil::{Node, YggdrasilConfig};
|
||||
|
||||
#[derive(Clone)]
|
||||
struct AppState {
|
||||
node_manager: Arc<NodeManager>,
|
||||
context: Arc<AppContext>,
|
||||
}
|
||||
|
||||
pub struct WebModule {
|
||||
name: String,
|
||||
context: Option<Arc<AppContext>>,
|
||||
node_manager: Arc<NodeManager>,
|
||||
settings_manager: Arc<SettingsManager>,
|
||||
}
|
||||
|
||||
impl WebModule {
|
||||
pub fn new(db: DatabaseConnection) -> Self {
|
||||
pub fn new(db: DatabaseConnection, settings_manager: SettingsManager) -> Self {
|
||||
Self {
|
||||
name: "web".to_string(),
|
||||
context: None,
|
||||
node_manager: Arc::new(NodeManager::new(db)),
|
||||
settings_manager: Arc::new(settings_manager),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -51,10 +60,14 @@ impl Module for WebModule {
|
||||
|
||||
tracing::info!("Starting web server on port {}", port);
|
||||
|
||||
let node_manager = self.node_manager.clone();
|
||||
let app_state = AppState {
|
||||
node_manager: self.node_manager.clone(),
|
||||
context: context.clone(),
|
||||
};
|
||||
|
||||
let app = Router::new()
|
||||
.route("/", get(index_handler))
|
||||
.route("/edit/:id", get(edit_page_handler))
|
||||
.route("/api/nodes", get(get_nodes_handler))
|
||||
.route("/api/nodes", post(add_node_handler))
|
||||
.route("/api/nodes/:id", get(get_node_handler))
|
||||
@@ -62,9 +75,11 @@ impl Module for WebModule {
|
||||
.route("/api/nodes/:id", delete(delete_node_handler))
|
||||
.route("/api/configs", get(get_configs_handler))
|
||||
.route("/api/nodes/:id/config", get(get_node_config_handler))
|
||||
.route("/api/settings/listen-template", get(get_listen_template_handler))
|
||||
.route("/api/settings/listen-template", put(update_listen_template_handler))
|
||||
.route("/ws/agent", get(ws_agent_handler))
|
||||
.layer(CorsLayer::permissive())
|
||||
.with_state(node_manager);
|
||||
.with_state(app_state);
|
||||
|
||||
let bind_addr = format!("{}:{}", config.server.bind_address, port);
|
||||
let listener = tokio::net::TcpListener::bind(&bind_addr)
|
||||
@@ -96,9 +111,9 @@ struct NodesResponse {
|
||||
}
|
||||
|
||||
async fn get_nodes_handler(
|
||||
State(node_manager): State<Arc<NodeManager>>,
|
||||
State(app_state): State<AppState>,
|
||||
) -> Json<NodesResponse> {
|
||||
let nodes = node_manager.get_all_nodes().await;
|
||||
let nodes = app_state.node_manager.get_all_nodes().await;
|
||||
Json(NodesResponse { nodes })
|
||||
}
|
||||
|
||||
@@ -116,13 +131,13 @@ struct AddNodeResponse {
|
||||
}
|
||||
|
||||
async fn add_node_handler(
|
||||
State(node_manager): State<Arc<NodeManager>>,
|
||||
State(app_state): State<AppState>,
|
||||
Json(payload): Json<AddNodeRequest>,
|
||||
) -> Json<AddNodeResponse> {
|
||||
match node_manager.add_node(payload.name, payload.listen, payload.addresses).await {
|
||||
match app_state.node_manager.add_node(payload.name, payload.listen, payload.addresses).await {
|
||||
Ok(_) => {
|
||||
// Broadcast update to all connected agents
|
||||
crate::websocket_state::broadcast_configuration_update(&node_manager).await;
|
||||
crate::websocket_state::broadcast_configuration_update(&app_state.node_manager).await;
|
||||
|
||||
Json(AddNodeResponse {
|
||||
success: true,
|
||||
@@ -150,10 +165,10 @@ struct NodeConfig {
|
||||
}
|
||||
|
||||
async fn get_configs_handler(
|
||||
State(node_manager): State<Arc<NodeManager>>,
|
||||
State(app_state): State<AppState>,
|
||||
) -> Json<ConfigsResponse> {
|
||||
let nodes = node_manager.get_all_nodes().await;
|
||||
let configs_map = node_manager.generate_configs().await;
|
||||
let nodes = app_state.node_manager.get_all_nodes().await;
|
||||
let configs_map = app_state.node_manager.generate_configs().await;
|
||||
|
||||
let mut configs = Vec::new();
|
||||
for node in nodes {
|
||||
@@ -172,10 +187,10 @@ async fn get_configs_handler(
|
||||
|
||||
// Get single node handler
|
||||
async fn get_node_handler(
|
||||
State(node_manager): State<Arc<NodeManager>>,
|
||||
State(app_state): State<AppState>,
|
||||
Path(node_id): Path<String>,
|
||||
) -> std::result::Result<Json<Node>, StatusCode> {
|
||||
match node_manager.get_node_by_id(&node_id).await {
|
||||
match app_state.node_manager.get_node_by_id(&node_id).await {
|
||||
Some(node) => Ok(Json(node)),
|
||||
None => Err(StatusCode::NOT_FOUND),
|
||||
}
|
||||
@@ -183,14 +198,14 @@ async fn get_node_handler(
|
||||
|
||||
// Update node handler
|
||||
async fn update_node_handler(
|
||||
State(node_manager): State<Arc<NodeManager>>,
|
||||
State(app_state): State<AppState>,
|
||||
Path(node_id): Path<String>,
|
||||
Json(payload): Json<AddNodeRequest>,
|
||||
) -> std::result::Result<Json<AddNodeResponse>, StatusCode> {
|
||||
match node_manager.update_node(&node_id, payload.name, payload.listen, payload.addresses).await {
|
||||
match app_state.node_manager.update_node(&node_id, payload.name, payload.listen, payload.addresses).await {
|
||||
Ok(_) => {
|
||||
// Broadcast update to all connected agents
|
||||
crate::websocket_state::broadcast_configuration_update(&node_manager).await;
|
||||
crate::websocket_state::broadcast_configuration_update(&app_state.node_manager).await;
|
||||
|
||||
Ok(Json(AddNodeResponse {
|
||||
success: true,
|
||||
@@ -212,13 +227,13 @@ async fn update_node_handler(
|
||||
|
||||
// Delete node handler
|
||||
async fn delete_node_handler(
|
||||
State(node_manager): State<Arc<NodeManager>>,
|
||||
State(app_state): State<AppState>,
|
||||
Path(node_id): Path<String>,
|
||||
) -> std::result::Result<Json<AddNodeResponse>, StatusCode> {
|
||||
match node_manager.remove_node(&node_id).await {
|
||||
match app_state.node_manager.remove_node(&node_id).await {
|
||||
Ok(_) => {
|
||||
// Broadcast update to all connected agents
|
||||
crate::websocket_state::broadcast_configuration_update(&node_manager).await;
|
||||
crate::websocket_state::broadcast_configuration_update(&app_state.node_manager).await;
|
||||
|
||||
Ok(Json(AddNodeResponse {
|
||||
success: true,
|
||||
@@ -240,17 +255,17 @@ async fn delete_node_handler(
|
||||
|
||||
// Get node configuration for agent
|
||||
async fn get_node_config_handler(
|
||||
State(node_manager): State<Arc<NodeManager>>,
|
||||
State(app_state): State<AppState>,
|
||||
Path(node_id): Path<String>,
|
||||
) -> std::result::Result<Json<NodeConfig>, StatusCode> {
|
||||
// Get the node
|
||||
let node = match node_manager.get_node_by_id(&node_id).await {
|
||||
let node = match app_state.node_manager.get_node_by_id(&node_id).await {
|
||||
Some(node) => node,
|
||||
None => return Err(StatusCode::NOT_FOUND),
|
||||
};
|
||||
|
||||
// Generate configurations for all nodes
|
||||
let configs_map = node_manager.generate_configs().await;
|
||||
let configs_map = app_state.node_manager.generate_configs().await;
|
||||
|
||||
// Get config for this specific node
|
||||
match configs_map.get(&node_id) {
|
||||
@@ -267,7 +282,67 @@ async fn get_node_config_handler(
|
||||
// WebSocket handler for agents
|
||||
async fn ws_agent_handler(
|
||||
ws: WebSocketUpgrade,
|
||||
State(node_manager): State<Arc<NodeManager>>,
|
||||
State(app_state): State<AppState>,
|
||||
) -> Response {
|
||||
ws.on_upgrade(move |socket| crate::modules::websocket::handle_agent_socket(socket, node_manager))
|
||||
ws.on_upgrade(move |socket| crate::modules::websocket::handle_agent_socket(socket, app_state.node_manager, app_state.context))
|
||||
}
|
||||
|
||||
// Edit page handler
|
||||
async fn edit_page_handler(Path(node_id): Path<String>) -> Html<String> {
|
||||
let html = include_str!("../../static/edit.html");
|
||||
let content = html.replace("{{NODE_ID}}", &node_id);
|
||||
Html(content)
|
||||
}
|
||||
|
||||
// Listen template handlers
|
||||
#[derive(serde::Serialize, serde::Deserialize)]
|
||||
struct ListenTemplateResponse {
|
||||
template: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
struct UpdateListenTemplateRequest {
|
||||
template: Vec<String>,
|
||||
}
|
||||
|
||||
async fn get_listen_template_handler(
|
||||
State(app_state): State<AppState>,
|
||||
) -> Json<ListenTemplateResponse> {
|
||||
match app_state.context.settings_manager.get_listen_template().await {
|
||||
Ok(template) => Json(ListenTemplateResponse { template }),
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to get listen template from database: {}", e);
|
||||
// Return fallback default
|
||||
Json(ListenTemplateResponse {
|
||||
template: vec!["tcp://0.0.0.0:9001".to_string()],
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn update_listen_template_handler(
|
||||
State(app_state): State<AppState>,
|
||||
Json(payload): Json<UpdateListenTemplateRequest>,
|
||||
) -> Json<serde_json::Value> {
|
||||
tracing::info!("Listen template update request: {:?}", payload.template);
|
||||
|
||||
// Save to database
|
||||
match app_state.context.settings_manager.set_listen_template(payload.template.clone()).await {
|
||||
Ok(_) => {
|
||||
// Update in-memory config
|
||||
app_state.context.config_manager.update_listen_template(payload.template);
|
||||
|
||||
Json(serde_json::json!({
|
||||
"success": true,
|
||||
"message": "Listen template updated successfully"
|
||||
}))
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to save listen template: {}", e);
|
||||
Json(serde_json::json!({
|
||||
"success": false,
|
||||
"message": format!("Failed to save template: {}", e)
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@ use std::sync::Arc;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::node_manager::NodeManager;
|
||||
use crate::core::context::AppContext;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
@@ -42,6 +43,7 @@ pub enum ServerMessage {
|
||||
pub async fn handle_agent_socket(
|
||||
socket: WebSocket,
|
||||
node_manager: Arc<NodeManager>,
|
||||
context: Arc<AppContext>,
|
||||
) {
|
||||
let (mut sender, mut receiver) = socket.split();
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel::<ServerMessage>(100);
|
||||
@@ -68,9 +70,14 @@ pub async fn handle_agent_socket(
|
||||
AgentMessage::Register { name, addresses } => {
|
||||
info!("Agent registration: {} with addresses {:?}", name, addresses);
|
||||
|
||||
// Get default endpoints from config
|
||||
// TODO: Get from actual config, for now use hardcoded
|
||||
let default_listen = vec!["tcp://0.0.0.0:9001".to_string()];
|
||||
// Get default endpoints from settings database
|
||||
let default_listen = match context.settings_manager.get_listen_template().await {
|
||||
Ok(template) => template,
|
||||
Err(e) => {
|
||||
error!("Failed to get listen template from database: {}", e);
|
||||
vec!["tcp://0.0.0.0:9001".to_string()] // fallback
|
||||
}
|
||||
};
|
||||
|
||||
// Check if node already exists
|
||||
let node = if let Some(existing_node) = node_manager.get_node_by_name(&name).await {
|
||||
@@ -139,7 +146,28 @@ pub async fn handle_agent_socket(
|
||||
AgentMessage::UpdateAddresses { addresses } => {
|
||||
if let Some(id) = &node_id {
|
||||
info!("Address update for {}: {:?}", id, addresses);
|
||||
// TODO: Update node addresses in database
|
||||
|
||||
// Get current node information
|
||||
if let Some(current_node) = node_manager.get_node_by_id(id).await {
|
||||
// Update node with new addresses
|
||||
match node_manager.update_node(
|
||||
id,
|
||||
current_node.name.clone(),
|
||||
current_node.listen.clone(),
|
||||
addresses
|
||||
).await {
|
||||
Ok(_) => {
|
||||
info!("Updated addresses for node {}", id);
|
||||
// Broadcast configuration update to all agents
|
||||
crate::websocket_state::broadcast_configuration_update(&node_manager).await;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to update addresses for node {}: {}", id, e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("Cannot update addresses for unknown node: {}", id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
97
src/settings_manager.rs
Normal file
97
src/settings_manager.rs
Normal file
@@ -0,0 +1,97 @@
|
||||
use sea_orm::{DatabaseConnection, EntityTrait, QueryFilter, ColumnTrait};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::database::entities::settings::{Entity as SettingsEntity, ActiveModel};
|
||||
use crate::error::AppError;
|
||||
use crate::config::ConfigManager;
|
||||
|
||||
const LISTEN_TEMPLATE_KEY: &str = "listen_template";
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SettingsManager {
|
||||
db: Arc<DatabaseConnection>,
|
||||
}
|
||||
|
||||
impl SettingsManager {
|
||||
pub fn new(db: DatabaseConnection) -> Self {
|
||||
Self {
|
||||
db: Arc::new(db),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_listen_template(&self) -> Result<Vec<String>, AppError> {
|
||||
match SettingsEntity::find()
|
||||
.filter(crate::database::entities::settings::Column::Key.eq(LISTEN_TEMPLATE_KEY))
|
||||
.one(&*self.db)
|
||||
.await
|
||||
.map_err(|e| AppError::Config(format!("Database error: {}", e)))?
|
||||
{
|
||||
Some(setting) => {
|
||||
setting.parse_json_value::<Vec<String>>()
|
||||
.map_err(|e| AppError::Config(format!("Failed to parse listen template: {}", e)))
|
||||
},
|
||||
None => {
|
||||
// Return default template if not found
|
||||
Ok(vec!["tcp://0.0.0.0:9001".to_string()])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn set_listen_template(&self, template: Vec<String>) -> Result<(), AppError> {
|
||||
// Check if setting already exists
|
||||
let existing = SettingsEntity::find()
|
||||
.filter(crate::database::entities::settings::Column::Key.eq(LISTEN_TEMPLATE_KEY))
|
||||
.one(&*self.db)
|
||||
.await
|
||||
.map_err(|e| AppError::Config(format!("Database error: {}", e)))?;
|
||||
|
||||
if let Some(existing_setting) = existing {
|
||||
// Update existing setting
|
||||
let mut active_model: ActiveModel = existing_setting.into();
|
||||
active_model.update_value(&template)
|
||||
.map_err(|e| AppError::Config(format!("Failed to serialize template: {}", e)))?;
|
||||
|
||||
SettingsEntity::update(active_model)
|
||||
.exec(&*self.db)
|
||||
.await
|
||||
.map_err(|e| AppError::Config(format!("Database error: {}", e)))?;
|
||||
} else {
|
||||
// Create new setting
|
||||
let active_model = ActiveModel::new(LISTEN_TEMPLATE_KEY.to_string(), &template)
|
||||
.map_err(|e| AppError::Config(format!("Failed to serialize template: {}", e)))?;
|
||||
|
||||
SettingsEntity::insert(active_model)
|
||||
.exec(&*self.db)
|
||||
.await
|
||||
.map_err(|e| AppError::Config(format!("Database error: {}", e)))?;
|
||||
}
|
||||
|
||||
tracing::info!("Listen template saved to database: {:?}", template);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn initialize_defaults(&self) -> Result<(), AppError> {
|
||||
// Check if listen template exists, if not create default
|
||||
if SettingsEntity::find()
|
||||
.filter(crate::database::entities::settings::Column::Key.eq(LISTEN_TEMPLATE_KEY))
|
||||
.one(&*self.db)
|
||||
.await
|
||||
.map_err(|e| AppError::Config(format!("Database error: {}", e)))?
|
||||
.is_none()
|
||||
{
|
||||
let default_template = vec!["tcp://0.0.0.0:9001".to_string()];
|
||||
self.set_listen_template(default_template).await?;
|
||||
tracing::info!("Initialized default listen template");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn load_settings_to_config(&self, config_manager: &ConfigManager) -> Result<(), AppError> {
|
||||
// Load listen template from database and update config
|
||||
let template = self.get_listen_template().await?;
|
||||
config_manager.update_listen_template(template);
|
||||
tracing::info!("Loaded settings from database to config");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user