first commit

commit 98ba8846c3
Author: Ultradesu
Date: 2025-08-25 21:48:59 +03:00

25 changed files with 6400 additions and 0 deletions

254
src/agent.rs Normal file

@@ -0,0 +1,254 @@
use anyhow::Result;
use clap::Parser;
use futures_util::{SinkExt, StreamExt};
use network_interface::{NetworkInterface, NetworkInterfaceConfig};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tokio::time::sleep;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tracing::{error, info, warn, debug};
#[derive(Parser, Debug)]
#[command(
name = "yggman-agent",
about = "Yggdrasil network agent for automatic node configuration"
)]
struct Args {
/// Control plane server URL (e.g., ws://localhost:8080/ws/agent)
#[arg(short, long)]
server: String,
/// Node name (optional, will use hostname if not provided)
#[arg(short, long)]
name: Option<String>,
/// Log level (trace, debug, info, warn, error)
#[arg(long, default_value = "info")]
log_level: String,
/// Reconnect interval in seconds
#[arg(long, default_value = "5")]
reconnect_interval: u64,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
enum AgentMessage {
Register {
name: String,
addresses: Vec<String>,
},
Heartbeat,
UpdateAddresses {
addresses: Vec<String>,
},
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
enum ServerMessage {
Config {
node_id: String,
private_key: String,
listen: Vec<String>,
peers: Vec<String>,
allowed_public_keys: Vec<String>,
},
Update {
peers: Vec<String>,
allowed_public_keys: Vec<String>,
},
Error {
message: String,
},
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
// Initialize tracing
tracing_subscriber::fmt()
.with_max_level(args.log_level.parse::<tracing::Level>()?)
.init();
info!("Starting yggman-agent v{}", env!("CARGO_PKG_VERSION"));
info!("Connecting to control plane: {}", args.server);
// Main loop with reconnection logic
loop {
match run_agent(&args).await {
Ok(_) => {
info!("Agent connection closed normally");
}
Err(e) => {
error!("Agent error: {}", e);
}
}
info!(
"Reconnecting in {} seconds...",
args.reconnect_interval
);
sleep(Duration::from_secs(args.reconnect_interval)).await;
}
}
async fn run_agent(args: &Args) -> Result<()> {
// Get node name
let node_name = args.name.clone().unwrap_or_else(|| {
hostname::get()
.map(|h| h.to_string_lossy().to_string())
.unwrap_or_else(|_| "unknown".to_string())
});
// Discover network interfaces
let addresses = discover_addresses()?;
info!("Discovered addresses: {:?}", addresses);
// Connect to WebSocket
let (ws_stream, _) = connect_async(&args.server).await?;
info!("Connected to control plane");
let (mut write, mut read) = ws_stream.split();
// Send registration message
let register_msg = AgentMessage::Register {
name: node_name.clone(),
addresses: addresses.clone(),
};
let json = serde_json::to_string(&register_msg)?;
write.send(Message::Text(json)).await?;
info!("Sent registration for node: {}", node_name);
// Spawn heartbeat task
let (heartbeat_tx, mut heartbeat_rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
if heartbeat_tx.send(()).await.is_err() {
break;
}
}
});
// Main message loop
loop {
tokio::select! {
msg = read.next() => {
match msg {
Some(Ok(Message::Text(text))) => {
match serde_json::from_str::<ServerMessage>(&text) {
Ok(server_msg) => handle_server_message(server_msg).await?,
Err(e) => warn!("Failed to parse server message: {}", e),
}
}
Some(Ok(Message::Close(_))) => {
info!("Server closed connection");
break;
}
Some(Err(e)) => {
error!("WebSocket error: {}", e);
break;
}
None => {
info!("WebSocket stream ended");
break;
}
_ => {}
}
}
_ = heartbeat_rx.recv() => {
let heartbeat = serde_json::to_string(&AgentMessage::Heartbeat)?;
if let Err(e) = write.send(Message::Text(heartbeat)).await {
error!("Failed to send heartbeat: {}", e);
break;
}
debug!("Sent heartbeat");
}
}
}
Ok(())
}
async fn handle_server_message(msg: ServerMessage) -> Result<()> {
match msg {
ServerMessage::Config {
node_id,
private_key,
listen,
peers,
allowed_public_keys,
} => {
info!("Received initial configuration:");
info!(" Node ID: {}", node_id);
info!(" Private Key: {}...", &private_key[..16]);
info!(" Listen endpoints: {:?}", listen);
info!(" Peers: {} configured", peers.len());
for peer in &peers {
debug!(" - {}", peer);
}
info!(" Allowed keys: {} configured", allowed_public_keys.len());
// TODO: Apply configuration to Yggdrasil
info!("Configuration received (not yet applied)");
}
ServerMessage::Update {
peers,
allowed_public_keys,
} => {
info!("Received configuration update:");
info!(" Updated peers: {} configured", peers.len());
for peer in &peers {
debug!(" - {}", peer);
}
info!(" Updated allowed keys: {} configured", allowed_public_keys.len());
// TODO: Apply configuration update to Yggdrasil
info!("Configuration update received (not yet applied)");
}
ServerMessage::Error { message } => {
error!("Server error: {}", message);
}
}
Ok(())
}
fn discover_addresses() -> Result<Vec<String>> {
let interfaces = NetworkInterface::show()?;
let mut addresses = Vec::new();
for interface in interfaces {
// Skip loopback interfaces (matched by name; interface state is not checked)
if interface.name.starts_with("lo") {
continue;
}
for addr in interface.addr {
match addr {
network_interface::Addr::V4(v4) => {
let ip = v4.ip.to_string();
// Skip loopback and link-local addresses
// In production, you might want to be more selective (e.g. also filter private ranges)
if !ip.starts_with("127.") && !ip.starts_with("169.254.") {
addresses.push(ip);
}
}
network_interface::Addr::V6(v6) => {
let ip = v6.ip.to_string();
// Skip link-local and loopback IPv6
if !ip.starts_with("fe80:") && !ip.starts_with("::1") {
addresses.push(ip);
}
}
}
}
}
// If no addresses found, return empty vec (will use localhost)
Ok(addresses)
}
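
A minimal standalone sketch (not part of this commit) of the wire format produced by the internally tagged enums above, using a trimmed copy of AgentMessage and hypothetical values; it only assumes serde and serde_json, which the agent already depends on.

use serde::Serialize;

// Trimmed copy of the AgentMessage enum above, kept only to show the JSON shape.
#[derive(Serialize)]
#[serde(tag = "type")]
enum AgentMessage {
    Register { name: String, addresses: Vec<String> },
    Heartbeat,
}

fn main() {
    let register = AgentMessage::Register {
        name: "node-a".to_string(),                // hypothetical node name
        addresses: vec!["192.0.2.10".to_string()], // hypothetical address
    };
    // Struct variants are flattened next to the tag:
    // {"type":"Register","name":"node-a","addresses":["192.0.2.10"]}
    println!("{}", serde_json::to_string(&register).unwrap());
    // Unit variants carry only the tag: {"type":"Heartbeat"}
    println!("{}", serde_json::to_string(&AgentMessage::Heartbeat).unwrap());
}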

140
src/cli.rs Normal file

@@ -0,0 +1,140 @@
use clap::Parser;
use serde::{Deserialize, Serialize};
#[derive(Parser, Debug)]
#[command(
name = "yggman",
version = env!("CARGO_PKG_VERSION"),
about = "Yggdrasil Network Control Plane Manager",
long_about = "A centralized control plane for managing Yggdrasil node configurations and mesh topology"
)]
pub struct CliArgs {
/// Configuration file path
#[arg(short, long, default_value = "config.toml", env = "YGGMAN_CONFIG")]
pub config: String,
/// Server bind address
#[arg(long, env = "YGGMAN_BIND_ADDRESS")]
pub bind_address: Option<String>,
/// Server port
#[arg(short, long, env = "YGGMAN_PORT")]
pub port: Option<u16>,
/// Number of worker threads
#[arg(long, env = "YGGMAN_WORKERS")]
pub workers: Option<usize>,
/// Database URL (SQLite: sqlite://path.db, PostgreSQL: postgresql://user:pass@host:port/db)
#[arg(long, env = "YGGMAN_DATABASE_URL")]
pub database_url: Option<String>,
/// Maximum database connections
#[arg(long, env = "YGGMAN_MAX_DB_CONNECTIONS")]
pub max_db_connections: Option<u32>,
/// Maximum peers per node
#[arg(long, env = "YGGMAN_MAX_PEERS")]
pub max_peers: Option<usize>,
/// Topology update interval in seconds
#[arg(long, env = "YGGMAN_TOPOLOGY_UPDATE_INTERVAL")]
pub topology_update_interval: Option<u64>,
/// Log level (trace, debug, info, warn, error)
#[arg(long, default_value = "info", env = "YGGMAN_LOG_LEVEL")]
pub log_level: String,
/// Enable debug mode
#[arg(long, env = "YGGMAN_DEBUG")]
pub debug: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnvConfig {
#[serde(default)]
pub server: EnvServerConfig,
#[serde(default)]
pub database: EnvDatabaseConfig,
#[serde(default)]
pub nodes: EnvNodesConfig,
#[serde(default)]
pub log_level: Option<String>,
#[serde(default)]
pub debug: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnvServerConfig {
pub bind_address: Option<String>,
pub port: Option<u16>,
pub workers: Option<usize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnvDatabaseConfig {
pub url: Option<String>,
pub max_connections: Option<u32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnvNodesConfig {
pub max_peers_per_node: Option<usize>,
pub topology_update_interval: Option<u64>,
}
impl Default for EnvConfig {
fn default() -> Self {
Self {
server: EnvServerConfig::default(),
database: EnvDatabaseConfig::default(),
nodes: EnvNodesConfig::default(),
log_level: None,
debug: None,
}
}
}
impl Default for EnvServerConfig {
fn default() -> Self {
Self {
bind_address: None,
port: None,
workers: None,
}
}
}
impl Default for EnvDatabaseConfig {
fn default() -> Self {
Self {
url: None,
max_connections: None,
}
}
}
impl Default for EnvNodesConfig {
fn default() -> Self {
Self {
max_peers_per_node: None,
topology_update_interval: None,
}
}
}
impl CliArgs {
pub fn parse_args() -> Self {
Self::parse()
}
}
/// Load environment variables with YGGMAN_ prefix
pub fn load_env_config() -> Result<EnvConfig, envy::Error> {
envy::prefixed("YGGMAN_").from_env::<EnvConfig>()
}
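
A minimal sketch (not part of this commit) of how envy::prefixed maps YGGMAN_-prefixed variables onto flat fields such as log_level and debug; the struct and values here are hypothetical, and nested tables like EnvServerConfig would simply fall back to their #[serde(default)] values when no matching variables are set.

use serde::Deserialize;

// Hypothetical reduced struct, mirroring the flat Option fields of EnvConfig above.
#[derive(Debug, Deserialize)]
struct MiniEnv {
    log_level: Option<String>,
    debug: Option<bool>,
}

fn main() {
    // Run with e.g.: YGGMAN_LOG_LEVEL=debug YGGMAN_DEBUG=true cargo run
    let cfg = envy::prefixed("YGGMAN_").from_env::<MiniEnv>().unwrap();
    // With the variables above set, prints: MiniEnv { log_level: Some("debug"), debug: Some(true) }
    println!("{:?}", cfg);
}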

169
src/config/mod.rs Normal file

@@ -0,0 +1,169 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use arc_swap::ArcSwap;
use std::sync::Arc;
use crate::cli::{CliArgs, EnvConfig};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppConfig {
#[serde(default)]
pub server: ServerConfig,
#[serde(default)]
pub database: DatabaseConfig,
#[serde(default)]
pub nodes: NodesConfig,
#[serde(default)]
pub modules: HashMap<String, serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
pub bind_address: String,
pub port: u16,
pub workers: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseConfig {
pub url: String,
pub max_connections: u32,
pub connect_timeout: u64,
pub acquire_timeout: u64,
pub idle_timeout: u64,
pub max_lifetime: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodesConfig {
pub max_peers_per_node: usize,
pub topology_update_interval: u64,
pub default_listen_endpoints: Vec<String>,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
bind_address: "127.0.0.1".to_string(),
port: 8080,
workers: 4,
}
}
}
impl Default for DatabaseConfig {
fn default() -> Self {
Self {
url: "sqlite://yggman.db".to_string(),
max_connections: 10,
connect_timeout: 30,
acquire_timeout: 30,
idle_timeout: 600,
max_lifetime: 3600,
}
}
}
impl Default for NodesConfig {
fn default() -> Self {
Self {
max_peers_per_node: 3,
topology_update_interval: 60,
default_listen_endpoints: vec!["tcp://0.0.0.0:9001".to_string()],
}
}
}
impl Default for AppConfig {
fn default() -> Self {
Self {
server: ServerConfig::default(),
database: DatabaseConfig::default(),
nodes: NodesConfig::default(),
modules: HashMap::new(),
}
}
}
pub struct ConfigManager {
config: Arc<ArcSwap<AppConfig>>,
}
impl ConfigManager {
pub fn new(config: AppConfig) -> Self {
Self {
config: Arc::new(ArcSwap::from_pointee(config)),
}
}
pub fn get(&self) -> Arc<AppConfig> {
self.config.load_full()
}
/// Load configuration from multiple sources with precedence:
/// CLI args > Environment variables > Config file > Defaults
pub fn load_merged_config(cli_args: &CliArgs, env_config: &EnvConfig) -> Result<AppConfig, crate::error::AppError> {
// Start with default config
let mut config = AppConfig::default();
// Load from config file if it exists
if std::path::Path::new(&cli_args.config).exists() {
let content = std::fs::read_to_string(&cli_args.config)
.map_err(|e| crate::error::AppError::Config(format!("Failed to read config file: {}", e)))?;
config = toml::from_str(&content)
.map_err(|e| crate::error::AppError::Config(format!("Failed to parse config file: {}", e)))?;
}
// Override with environment variables
if let Some(bind_address) = &env_config.server.bind_address {
config.server.bind_address = bind_address.clone();
}
if let Some(port) = env_config.server.port {
config.server.port = port;
}
if let Some(workers) = env_config.server.workers {
config.server.workers = workers;
}
if let Some(db_url) = &env_config.database.url {
config.database.url = db_url.clone();
}
if let Some(max_connections) = env_config.database.max_connections {
config.database.max_connections = max_connections;
}
if let Some(max_peers) = env_config.nodes.max_peers_per_node {
config.nodes.max_peers_per_node = max_peers;
}
if let Some(topology_update) = env_config.nodes.topology_update_interval {
config.nodes.topology_update_interval = topology_update;
}
// Override with CLI arguments (highest priority)
if let Some(bind_address) = &cli_args.bind_address {
config.server.bind_address = bind_address.clone();
}
if let Some(port) = cli_args.port {
config.server.port = port;
}
if let Some(workers) = cli_args.workers {
config.server.workers = workers;
}
if let Some(db_url) = &cli_args.database_url {
config.database.url = db_url.clone();
}
if let Some(max_connections) = cli_args.max_db_connections {
config.database.max_connections = max_connections;
}
if let Some(max_peers) = cli_args.max_peers {
config.nodes.max_peers_per_node = max_peers;
}
if let Some(topology_update) = cli_args.topology_update_interval {
config.nodes.topology_update_interval = topology_update;
}
Ok(config)
}
}
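
A standalone sketch (not part of this commit, with assumed names) of the #[serde(default)] pattern used by AppConfig: a config.toml with no [server] table falls back to the custom Default impl, and an explicit table overrides it before env and CLI values are layered on top, as load_merged_config above does.

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct MiniConfig {
    #[serde(default)]
    server: MiniServer,
}

#[derive(Debug, Deserialize)]
struct MiniServer {
    bind_address: String,
    port: u16,
}

impl Default for MiniServer {
    fn default() -> Self {
        Self { bind_address: "127.0.0.1".to_string(), port: 8080 }
    }
}

fn main() {
    // No [server] table: the Default impl supplies 127.0.0.1:8080.
    let empty: MiniConfig = toml::from_str("").unwrap();
    assert_eq!(empty.server.port, 8080);

    // A [server] table from a hypothetical config.toml overrides the defaults.
    let from_file: MiniConfig =
        toml::from_str("[server]\nbind_address = \"0.0.0.0\"\nport = 9090").unwrap();
    assert_eq!(from_file.server.bind_address, "0.0.0.0");
    println!("{:?}", from_file);
}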

53
src/core/app.rs Normal file

@@ -0,0 +1,53 @@
use std::sync::Arc;
use crate::config::{AppConfig, ConfigManager};
use crate::core::context::AppContext;
use crate::core::module::ModuleManager;
use crate::error::Result;
use tokio::signal;
pub struct Application {
module_manager: ModuleManager,
}
impl Application {
pub fn new(config: AppConfig) -> Self {
let config_manager = Arc::new(ConfigManager::new(config));
let context = Arc::new(AppContext::new(config_manager));
let module_manager = ModuleManager::new(context);
Self {
module_manager,
}
}
pub fn register_module(&mut self, module: Box<dyn crate::core::module::Module>) {
self.module_manager.register(module);
}
pub async fn run(mut self) -> Result<()> {
tracing::info!("Starting application");
self.module_manager.init_all().await?;
self.module_manager.start_all().await?;
tokio::select! {
_ = signal::ctrl_c() => {
tracing::info!("Received SIGINT, shutting down");
}
}
self.shutdown().await?;
Ok(())
}
async fn shutdown(self) -> Result<()> {
tracing::info!("Shutting down application");
self.module_manager.stop_all().await?;
tracing::info!("Application shutdown complete");
Ok(())
}
}

14
src/core/context.rs Normal file

@@ -0,0 +1,14 @@
use std::sync::Arc;
use crate::config::ConfigManager;
pub struct AppContext {
pub config_manager: Arc<ConfigManager>,
}
impl AppContext {
pub fn new(config_manager: Arc<ConfigManager>) -> Self {
Self {
config_manager,
}
}
}

3
src/core/mod.rs Normal file

@@ -0,0 +1,3 @@
pub mod app;
pub mod context;
pub mod module;

57
src/core/module.rs Normal file

@@ -0,0 +1,57 @@
use async_trait::async_trait;
use std::sync::Arc;
use crate::core::context::AppContext;
use crate::error::Result;
#[async_trait]
pub trait Module: Send + Sync {
fn name(&self) -> &str;
async fn init(&mut self, context: Arc<AppContext>) -> Result<()>;
async fn start(&self) -> Result<()>;
async fn stop(&self) -> Result<()>;
}
pub struct ModuleManager {
modules: Vec<Box<dyn Module>>,
context: Arc<AppContext>,
}
impl ModuleManager {
pub fn new(context: Arc<AppContext>) -> Self {
Self {
modules: Vec::new(),
context,
}
}
pub fn register(&mut self, module: Box<dyn Module>) {
self.modules.push(module);
}
pub async fn init_all(&mut self) -> Result<()> {
for module in &mut self.modules {
tracing::info!("Initializing module: {}", module.name());
module.init(self.context.clone()).await?;
}
Ok(())
}
pub async fn start_all(&self) -> Result<()> {
for module in &self.modules {
tracing::info!("Starting module: {}", module.name());
module.start().await?;
}
Ok(())
}
pub async fn stop_all(&self) -> Result<()> {
for module in self.modules.iter().rev() {
tracing::info!("Stopping module: {}", module.name());
module.stop().await?;
}
Ok(())
}
}

64
src/database/connection.rs Normal file

@@ -0,0 +1,64 @@
use sea_orm::{Database, DatabaseConnection, DbErr, ConnectionTrait};
use sea_orm::{Schema, DbBackend, Statement};
use migration::prelude::{SqliteQueryBuilder, PostgresQueryBuilder, MysqlQueryBuilder};
use std::time::Duration;
use std::path::Path;
use crate::config::DatabaseConfig;
pub async fn create_connection(config: &DatabaseConfig) -> Result<DatabaseConnection, DbErr> {
// Create SQLite database file if it doesn't exist
if config.url.starts_with("sqlite://") {
let db_path = config.url.strip_prefix("sqlite://").unwrap_or(&config.url);
// Create parent directories if they don't exist
if let Some(parent) = Path::new(db_path).parent() {
if !parent.exists() {
std::fs::create_dir_all(parent)
.map_err(|e| DbErr::Custom(format!("Failed to create database directory: {}", e)))?;
}
}
// Create empty database file if it doesn't exist
if !Path::new(db_path).exists() {
std::fs::File::create(db_path)
.map_err(|e| DbErr::Custom(format!("Failed to create database file: {}", e)))?;
tracing::info!("Created SQLite database file: {}", db_path);
}
}
let mut options = sea_orm::ConnectOptions::new(&config.url);
options
.max_connections(config.max_connections)
.min_connections(1)
.connect_timeout(Duration::from_secs(config.connect_timeout))
.acquire_timeout(Duration::from_secs(config.acquire_timeout))
.idle_timeout(Duration::from_secs(config.idle_timeout))
.max_lifetime(Duration::from_secs(config.max_lifetime))
.sqlx_logging(true)
.sqlx_logging_level(tracing::log::LevelFilter::Debug);
Database::connect(options).await
}
pub async fn migrate_database(db: &DatabaseConnection) -> Result<(), DbErr> {
// Get the database backend
let backend = db.get_database_backend();
let schema = Schema::new(backend);
// Create nodes table if it doesn't exist
let mut create_table_stmt = schema.create_table_from_entity(crate::database::entities::node::Entity);
// Convert to SQL
let sql = match backend {
DbBackend::Sqlite => create_table_stmt.if_not_exists().to_string(SqliteQueryBuilder),
DbBackend::Postgres => create_table_stmt.if_not_exists().to_string(PostgresQueryBuilder),
DbBackend::MySql => create_table_stmt.if_not_exists().to_string(MysqlQueryBuilder),
};
// Execute the statement
db.execute(Statement::from_string(backend, sql)).await?;
tracing::info!("Database migration completed");
Ok(())
}

1
src/database/entities/mod.rs Normal file

@@ -0,0 +1 @@
pub mod node;

82
src/database/entities/node.rs Normal file

@@ -0,0 +1,82 @@
use sea_orm::entity::prelude::*;
use sea_orm::Set;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Serialize, Deserialize)]
#[sea_orm(table_name = "nodes")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub id: String,
pub name: String,
pub public_key: String,
pub private_key: String,
pub listen: String, // JSON array stored as string
pub addresses: String, // JSON array stored as string
pub created_at: DateTimeUtc,
pub updated_at: DateTimeUtc,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {
fn new() -> Self {
Self {
id: Set(uuid::Uuid::new_v4().to_string()),
created_at: Set(chrono::Utc::now()),
updated_at: Set(chrono::Utc::now()),
..ActiveModelTrait::default()
}
}
fn before_save<'life0, 'async_trait, C>(
mut self,
_db: &'life0 C,
_insert: bool,
) -> core::pin::Pin<Box<dyn core::future::Future<Output = Result<Self, DbErr>> + core::marker::Send + 'async_trait>>
where
'life0: 'async_trait,
C: 'async_trait + ConnectionTrait,
Self: 'async_trait,
{
Box::pin(async move {
self.updated_at = Set(chrono::Utc::now());
Ok(self)
})
}
}
// Conversion functions between database model and domain model
impl From<Model> for crate::yggdrasil::Node {
fn from(model: Model) -> Self {
let listen: Vec<String> = serde_json::from_str(&model.listen).unwrap_or_default();
let addresses: Vec<String> = serde_json::from_str(&model.addresses).unwrap_or_default();
crate::yggdrasil::Node {
id: model.id,
name: model.name,
public_key: model.public_key,
private_key: model.private_key,
listen,
addresses,
}
}
}
impl From<&crate::yggdrasil::Node> for ActiveModel {
fn from(node: &crate::yggdrasil::Node) -> Self {
let listen = serde_json::to_string(&node.listen).unwrap_or_default();
let addresses = serde_json::to_string(&node.addresses).unwrap_or_default();
ActiveModel {
id: Set(node.id.clone()),
name: Set(node.name.clone()),
public_key: Set(node.public_key.clone()),
private_key: Set(node.private_key.clone()),
listen: Set(listen),
addresses: Set(addresses),
created_at: Set(chrono::Utc::now()),
updated_at: Set(chrono::Utc::now()),
}
}
}

4
src/database/mod.rs Normal file

@@ -0,0 +1,4 @@
pub mod entities;
pub mod connection;
pub use connection::*;

15
src/error.rs Normal file

@@ -0,0 +1,15 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Configuration error: {0}")]
Config(String),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
}
pub type Result<T> = std::result::Result<T, AppError>;

63
src/main.rs Normal file

@@ -0,0 +1,63 @@
mod cli;
mod config;
mod core;
mod database;
mod error;
mod modules;
mod node_manager;
mod yggdrasil;
mod websocket_state;
use anyhow::Result;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[tokio::main]
async fn main() -> Result<()> {
// Parse command line arguments
let cli_args = cli::CliArgs::parse_args();
// Load environment variables with YGGMAN_ prefix
let env_config = cli::load_env_config()
.unwrap_or_else(|_| cli::EnvConfig::default());
// Initialize tracing with log level from CLI or env
let log_level = if cli_args.debug {
"debug"
} else {
&cli_args.log_level
};
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| format!("yggman={},info", log_level).into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
tracing::info!("Starting yggman v{}", env!("CARGO_PKG_VERSION"));
tracing::debug!("CLI args: {:?}", cli_args);
tracing::debug!("Environment config: {:?}", env_config);
// Load merged configuration
let config = config::ConfigManager::load_merged_config(&cli_args, &env_config)?;
tracing::info!("Configuration loaded from: CLI args, env vars, config file: {}", cli_args.config);
tracing::info!("Database URL: {}", config.database.url);
// Initialize database connection
let db = database::create_connection(&config.database).await
.map_err(|e| anyhow::anyhow!("Failed to connect to database: {}", e))?;
tracing::info!("Database connection established");
// Run migrations
database::migrate_database(&db).await
.map_err(|e| anyhow::anyhow!("Failed to migrate database: {}", e))?;
let mut app = core::app::Application::new(config);
app.register_module(Box::new(modules::web::WebModule::new(db)));
app.run().await?;
Ok(())
}

40
src/modules/example.rs Normal file

@@ -0,0 +1,40 @@
use async_trait::async_trait;
use std::sync::Arc;
use crate::core::context::AppContext;
use crate::core::module::Module;
use crate::error::Result;
pub struct ExampleModule {
name: String,
context: Option<Arc<AppContext>>,
}
#[async_trait]
impl Module for ExampleModule {
fn name(&self) -> &str {
&self.name
}
async fn init(&mut self, context: Arc<AppContext>) -> Result<()> {
self.context = Some(context);
tracing::debug!("Example module initialized");
Ok(())
}
async fn start(&self) -> Result<()> {
tracing::debug!("Example module started");
if let Some(ctx) = &self.context {
let config = ctx.config_manager.get();
tracing::debug!("Current config: {:?}", config);
}
Ok(())
}
async fn stop(&self) -> Result<()> {
tracing::debug!("Example module stopped");
Ok(())
}
}

3
src/modules/mod.rs Normal file

@@ -0,0 +1,3 @@
pub mod example;
pub mod web;
pub mod websocket;

273
src/modules/web.rs Normal file

@@ -0,0 +1,273 @@
use async_trait::async_trait;
use axum::{
extract::{State, Path, WebSocketUpgrade},
http::StatusCode,
response::{Html, Json, Response},
routing::{get, post, put, delete},
Router,
};
use std::sync::Arc;
use tower_http::cors::CorsLayer;
use sea_orm::DatabaseConnection;
use crate::core::context::AppContext;
use crate::core::module::Module;
use crate::error::Result;
use crate::node_manager::NodeManager;
use crate::yggdrasil::{Node, YggdrasilConfig};
pub struct WebModule {
name: String,
context: Option<Arc<AppContext>>,
node_manager: Arc<NodeManager>,
}
impl WebModule {
pub fn new(db: DatabaseConnection) -> Self {
Self {
name: "web".to_string(),
context: None,
node_manager: Arc::new(NodeManager::new(db)),
}
}
}
#[async_trait]
impl Module for WebModule {
fn name(&self) -> &str {
&self.name
}
async fn init(&mut self, context: Arc<AppContext>) -> Result<()> {
self.context = Some(context);
tracing::info!("Web module initialized");
Ok(())
}
async fn start(&self) -> Result<()> {
let context = self.context.as_ref().unwrap();
let config = context.config_manager.get();
let port = config.server.port;
tracing::info!("Starting web server on port {}", port);
let node_manager = self.node_manager.clone();
let app = Router::new()
.route("/", get(index_handler))
.route("/api/nodes", get(get_nodes_handler))
.route("/api/nodes", post(add_node_handler))
.route("/api/nodes/:id", get(get_node_handler))
.route("/api/nodes/:id", put(update_node_handler))
.route("/api/nodes/:id", delete(delete_node_handler))
.route("/api/configs", get(get_configs_handler))
.route("/api/nodes/:id/config", get(get_node_config_handler))
.route("/ws/agent", get(ws_agent_handler))
.layer(CorsLayer::permissive())
.with_state(node_manager);
let bind_addr = format!("{}:{}", config.server.bind_address, port);
let listener = tokio::net::TcpListener::bind(&bind_addr)
.await
.map_err(|e| crate::error::AppError::Io(e))?;
tokio::spawn(async move {
axum::serve(listener, app)
.await
.expect("Failed to run web server");
});
Ok(())
}
async fn stop(&self) -> Result<()> {
tracing::info!("Web module stopped");
Ok(())
}
}
async fn index_handler() -> Html<&'static str> {
Html(include_str!("../../static/index.html"))
}
#[derive(serde::Serialize)]
struct NodesResponse {
nodes: Vec<Node>,
}
async fn get_nodes_handler(
State(node_manager): State<Arc<NodeManager>>,
) -> Json<NodesResponse> {
let nodes = node_manager.get_all_nodes().await;
Json(NodesResponse { nodes })
}
#[derive(serde::Deserialize)]
struct AddNodeRequest {
name: String,
listen: Vec<String>,
addresses: Vec<String>,
}
#[derive(serde::Serialize)]
struct AddNodeResponse {
success: bool,
message: String,
}
async fn add_node_handler(
State(node_manager): State<Arc<NodeManager>>,
Json(payload): Json<AddNodeRequest>,
) -> Json<AddNodeResponse> {
match node_manager.add_node(payload.name, payload.listen, payload.addresses).await {
Ok(_) => {
// Broadcast update to all connected agents
crate::websocket_state::broadcast_configuration_update(&node_manager).await;
Json(AddNodeResponse {
success: true,
message: "Node added successfully".to_string(),
})
}
Err(e) => Json(AddNodeResponse {
success: false,
message: format!("Failed to add node: {}", e),
}),
}
}
#[derive(serde::Serialize)]
struct ConfigsResponse {
configs: Vec<NodeConfig>,
}
#[derive(serde::Serialize)]
struct NodeConfig {
node_id: String,
node_name: String,
node_addresses: Vec<String>,
config: YggdrasilConfig,
}
async fn get_configs_handler(
State(node_manager): State<Arc<NodeManager>>,
) -> Json<ConfigsResponse> {
let nodes = node_manager.get_all_nodes().await;
let configs_map = node_manager.generate_configs().await;
let mut configs = Vec::new();
for node in nodes {
if let Some(config) = configs_map.get(&node.id) {
configs.push(NodeConfig {
node_id: node.id.clone(),
node_name: node.name.clone(),
node_addresses: node.addresses.clone(),
config: config.clone(),
});
}
}
Json(ConfigsResponse { configs })
}
// Get single node handler
async fn get_node_handler(
State(node_manager): State<Arc<NodeManager>>,
Path(node_id): Path<String>,
) -> std::result::Result<Json<Node>, StatusCode> {
match node_manager.get_node_by_id(&node_id).await {
Some(node) => Ok(Json(node)),
None => Err(StatusCode::NOT_FOUND),
}
}
// Update node handler
async fn update_node_handler(
State(node_manager): State<Arc<NodeManager>>,
Path(node_id): Path<String>,
Json(payload): Json<AddNodeRequest>,
) -> std::result::Result<Json<AddNodeResponse>, StatusCode> {
match node_manager.update_node(&node_id, payload.name, payload.listen, payload.addresses).await {
Ok(_) => {
// Broadcast update to all connected agents
crate::websocket_state::broadcast_configuration_update(&node_manager).await;
Ok(Json(AddNodeResponse {
success: true,
message: "Node updated successfully".to_string(),
}))
}
Err(e) => {
if e.to_string().contains("Node not found") {
Err(StatusCode::NOT_FOUND)
} else {
Ok(Json(AddNodeResponse {
success: false,
message: format!("Failed to update node: {}", e),
}))
}
}
}
}
// Delete node handler
async fn delete_node_handler(
State(node_manager): State<Arc<NodeManager>>,
Path(node_id): Path<String>,
) -> std::result::Result<Json<AddNodeResponse>, StatusCode> {
match node_manager.remove_node(&node_id).await {
Ok(_) => {
// Broadcast update to all connected agents
crate::websocket_state::broadcast_configuration_update(&node_manager).await;
Ok(Json(AddNodeResponse {
success: true,
message: "Node deleted successfully".to_string(),
}))
}
Err(e) => {
if e.to_string().contains("Node not found") {
Err(StatusCode::NOT_FOUND)
} else {
Ok(Json(AddNodeResponse {
success: false,
message: format!("Failed to delete node: {}", e),
}))
}
}
}
}
// Get node configuration for agent
async fn get_node_config_handler(
State(node_manager): State<Arc<NodeManager>>,
Path(node_id): Path<String>,
) -> std::result::Result<Json<NodeConfig>, StatusCode> {
// Get the node
let node = match node_manager.get_node_by_id(&node_id).await {
Some(node) => node,
None => return Err(StatusCode::NOT_FOUND),
};
// Generate configurations for all nodes
let configs_map = node_manager.generate_configs().await;
// Get config for this specific node
match configs_map.get(&node_id) {
Some(config) => Ok(Json(NodeConfig {
node_id: node.id.clone(),
node_name: node.name.clone(),
node_addresses: node.addresses.clone(),
config: config.clone(),
})),
None => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
}
// WebSocket handler for agents
async fn ws_agent_handler(
ws: WebSocketUpgrade,
State(node_manager): State<Arc<NodeManager>>,
) -> Response {
ws.on_upgrade(move |socket| crate::modules::websocket::handle_agent_socket(socket, node_manager))
}
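
A minimal sketch (not part of this commit) of the JSON body that add_node_handler expects on POST /api/nodes, matching the AddNodeRequest fields above; the values are hypothetical and serde_json is already a dependency of this crate.

fn main() {
    let body = serde_json::json!({
        "name": "node-a",
        "listen": ["tcp://0.0.0.0:9001"],
        "addresses": ["203.0.113.5"]
    });
    // Send this as the request body with Content-Type: application/json;
    // the handler replies with an AddNodeResponse {"success":..., "message":...}.
    println!("{}", serde_json::to_string_pretty(&body).unwrap());
}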

164
src/modules/websocket.rs Normal file

@@ -0,0 +1,164 @@
use axum::extract::ws::{Message, WebSocket};
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::{debug, error, info, warn};
use crate::node_manager::NodeManager;
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AgentMessage {
Register {
name: String,
addresses: Vec<String>,
},
Heartbeat,
UpdateAddresses {
addresses: Vec<String>,
},
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ServerMessage {
Config {
node_id: String,
private_key: String,
listen: Vec<String>,
peers: Vec<String>,
allowed_public_keys: Vec<String>,
},
Update {
peers: Vec<String>,
allowed_public_keys: Vec<String>,
},
Error {
message: String,
},
}
pub async fn handle_agent_socket(
socket: WebSocket,
node_manager: Arc<NodeManager>,
) {
let (mut sender, mut receiver) = socket.split();
let (tx, mut rx) = tokio::sync::mpsc::channel::<ServerMessage>(100);
let mut node_id: Option<String> = None;
// Spawn task to forward messages from channel to WebSocket
let send_task = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if let Ok(json) = serde_json::to_string(&msg) {
if sender.send(Message::Text(json)).await.is_err() {
break;
}
}
}
});
// Handle incoming messages
while let Some(msg) = receiver.next().await {
if let Ok(Message::Text(text)) = msg {
match serde_json::from_str::<AgentMessage>(&text) {
Ok(agent_msg) => {
match agent_msg {
AgentMessage::Register { name, addresses } => {
info!("Agent registration: {} with addresses {:?}", name, addresses);
// Get default endpoints from config
// TODO: Get from actual config, for now use hardcoded
let default_listen = vec!["tcp://0.0.0.0:9001".to_string()];
// Check if node already exists
let node = if let Some(existing_node) = node_manager.get_node_by_name(&name).await {
info!("Reusing existing node: {} ({})", existing_node.name, existing_node.id);
// Update addresses for existing node
match node_manager.update_node(&existing_node.id, name.clone(), default_listen.clone(), addresses).await {
Ok(_) => {
// Get the updated node
node_manager.get_node_by_id(&existing_node.id).await
}
Err(e) => {
warn!("Failed to update existing node addresses: {}", e);
Some(existing_node)
}
}
} else {
// Create new node
info!("Creating new node: {}", name);
match node_manager.add_node(name.clone(), default_listen.clone(), addresses).await {
Ok(_) => {
// Get the newly created node
node_manager.get_node_by_name(&name).await
}
Err(e) => {
let error_msg = ServerMessage::Error {
message: format!("Failed to register node: {}", e),
};
let _ = tx.send(error_msg).await;
None
}
}
};
if let Some(node) = node {
node_id = Some(node.id.clone());
// Register connection
crate::websocket_state::register_agent_connection(node.id.clone(), tx.clone()).await;
// Generate config for this node
let configs = node_manager.generate_configs().await;
if let Some(config) = configs.get(&node.id) {
let peers: Vec<String> = config.peers.clone();
let allowed_keys: Vec<String> = config.allowed_public_keys.clone();
let response = ServerMessage::Config {
node_id: node.id.clone(),
private_key: node.private_key.clone(),
listen: default_listen,
peers,
allowed_public_keys: allowed_keys,
};
if let Err(e) = tx.send(response).await {
error!("Failed to send config to agent: {}", e);
}
// Notify other agents about node connection
crate::websocket_state::broadcast_configuration_update(&node_manager).await;
}
}
}
AgentMessage::Heartbeat => {
debug!("Heartbeat from {:?}", node_id);
}
AgentMessage::UpdateAddresses { addresses } => {
if let Some(id) = &node_id {
info!("Address update for {}: {:?}", id, addresses);
// TODO: Update node addresses in database
}
}
}
}
Err(e) => {
warn!("Failed to parse agent message: {}", e);
}
}
}
}
// Clean up
if let Some(id) = node_id {
crate::websocket_state::unregister_agent_connection(&id).await;
info!("Agent {} disconnected", id);
}
// Abort send task
send_task.abort();
}

222
src/node_manager.rs Normal file

@@ -0,0 +1,222 @@
use crate::yggdrasil::{Node, YggdrasilConfig};
use crate::database::entities::node as node_entity;
use ed25519_dalek::{SigningKey, VerifyingKey};
use sea_orm::{DatabaseConnection, EntityTrait, ActiveModelTrait};
use std::collections::HashMap;
pub struct NodeManager {
db: DatabaseConnection,
}
impl NodeManager {
pub fn new(db: DatabaseConnection) -> Self {
Self { db }
}
pub async fn add_node(&self, name: String, listen: Vec<String>, addresses: Vec<String>) -> Result<(), crate::error::AppError> {
let signing_key = SigningKey::from_bytes(&rand::random());
let verifying_key: VerifyingKey = signing_key.verifying_key();
let private_seed = signing_key.to_bytes();
let public_key_bytes = verifying_key.to_bytes();
// Yggdrasil expects a 64-byte private key (32-byte seed + 32-byte public key)
let mut full_private_key = Vec::with_capacity(64);
full_private_key.extend_from_slice(&private_seed);
full_private_key.extend_from_slice(&public_key_bytes);
let private_key = hex::encode(full_private_key);
let public_key = hex::encode(public_key_bytes);
let node = Node {
id: format!("node-{}", uuid_simple()),
name: name.clone(),
public_key: public_key.clone(),
private_key,
listen,
addresses,
};
// Save to database
let active_model = node_entity::ActiveModel::from(&node);
active_model.insert(&self.db).await
.map_err(|e| crate::error::AppError::Config(format!("Database error: {}", e)))?;
Ok(())
}
pub async fn update_node(&self, node_id: &str, name: String, listen: Vec<String>, addresses: Vec<String>) -> Result<(), crate::error::AppError> {
// Check if node exists
let existing_node = node_entity::Entity::find_by_id(node_id)
.one(&self.db)
.await
.map_err(|e| crate::error::AppError::Config(format!("Database error: {}", e)))?;
if existing_node.is_none() {
return Err(crate::error::AppError::Config("Node not found".to_string()));
}
// Update the node
let mut active_model: node_entity::ActiveModel = existing_node.unwrap().into();
active_model.name = sea_orm::Set(name);
active_model.listen = sea_orm::Set(serde_json::to_string(&listen).unwrap_or_default());
active_model.addresses = sea_orm::Set(serde_json::to_string(&addresses).unwrap_or_default());
active_model.update(&self.db).await
.map_err(|e| crate::error::AppError::Config(format!("Database error: {}", e)))?;
Ok(())
}
pub async fn remove_node(&self, node_id: &str) -> Result<(), crate::error::AppError> {
let result = node_entity::Entity::delete_by_id(node_id)
.exec(&self.db)
.await
.map_err(|e| crate::error::AppError::Config(format!("Database error: {}", e)))?;
if result.rows_affected == 0 {
return Err(crate::error::AppError::Config("Node not found".to_string()));
}
Ok(())
}
pub async fn get_node_by_id(&self, node_id: &str) -> Option<Node> {
match node_entity::Entity::find_by_id(node_id).one(&self.db).await {
Ok(Some(model)) => Some(Node::from(model)),
_ => None,
}
}
pub async fn get_node_by_name(&self, name: &str) -> Option<Node> {
use sea_orm::{ColumnTrait, QueryFilter};
match node_entity::Entity::find()
.filter(node_entity::Column::Name.eq(name))
.one(&self.db).await {
Ok(Some(model)) => Some(Node::from(model)),
_ => None,
}
}
pub async fn get_all_nodes(&self) -> Vec<Node> {
match node_entity::Entity::find().all(&self.db).await {
Ok(models) => models.into_iter().map(Node::from).collect(),
Err(e) => {
tracing::error!("Failed to fetch nodes from database: {}", e);
Vec::new()
}
}
}
pub async fn generate_configs(&self) -> HashMap<String, YggdrasilConfig> {
let nodes = self.get_all_nodes().await;
let mut configs = HashMap::new();
let all_public_keys: Vec<String> = nodes
.iter()
.map(|n| n.public_key.clone())
.collect();
for node in &nodes {
let mut config = YggdrasilConfig::default();
config.private_key = node.private_key.clone();
config.listen = node.listen.clone();
let mut other_keys = all_public_keys.clone();
other_keys.retain(|k| k != &node.public_key);
config.allowed_public_keys = other_keys;
// Build peers from other nodes' listen endpoints
let mut peers: Vec<String> = Vec::new();
for other_node in &nodes {
if other_node.id != node.id {
// For each listen endpoint, create peers for all node addresses
for listen_addr in &other_node.listen {
// If no addresses provided, use localhost
let addresses_to_use = if other_node.addresses.is_empty() {
vec!["127.0.0.1".to_string()]
} else {
other_node.addresses.clone()
};
for address in &addresses_to_use {
if let Some(peer_addr) = convert_listen_to_peer_with_address(listen_addr, &other_node.public_key, address) {
peers.push(peer_addr);
}
}
}
}
}
config.peers = peers;
let mut node_info = HashMap::new();
node_info.insert("name".to_string(), serde_json::Value::String(node.name.clone()));
config.node_info = node_info;
configs.insert(node.id.clone(), config);
}
configs
}
}
fn uuid_simple() -> String {
use rand::Rng;
let mut rng = rand::thread_rng();
let bytes: Vec<u8> = (0..16).map(|_| rng.r#gen()).collect();
hex::encode(bytes)
}
fn convert_listen_to_peer_with_address(listen_addr: &str, public_key: &str, address: &str) -> Option<String> {
// Parse the listen address and convert to peer format
// Listen format: tcp://[::]:1234 or tcp://0.0.0.0:1234
// Peer format: tcp://REAL_IP:1234?key=PUBLIC_KEY
if listen_addr.contains("unix://") {
// Unix sockets are local only, skip
return None;
}
// Extract protocol and port
let parts: Vec<&str> = listen_addr.split("://").collect();
if parts.len() != 2 {
return None;
}
let protocol = parts[0];
let addr_part = parts[1];
// Extract port from address
let port = if addr_part.contains("]:") {
// IPv6 format [::]:port
addr_part.split("]:").nth(1)
} else {
// IPv4 format 0.0.0.0:port
addr_part.split(':').nth(1)
};
let port = port?;
// Check for query parameters in the original listen address
let (port_clean, params) = if port.contains('?') {
let parts: Vec<&str> = port.split('?').collect();
(parts[0], Some(parts[1]))
} else {
(port, None)
};
// Build the peer address with the specified IP
let mut peer_addr = format!("{}://{}:{}?key={}", protocol, address, port_clean, public_key);
// Add any additional parameters from the listen address
if let Some(params) = params {
peer_addr.push('&');
peer_addr.push_str(params);
}
Some(peer_addr)
}
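
A unit-test sketch (not part of this commit) exercising convert_listen_to_peer_with_address with hypothetical keys and addresses; it assumes the test module is appended to src/node_manager.rs so the private function is reachable via super.

#[cfg(test)]
mod tests {
    use super::convert_listen_to_peer_with_address;

    #[test]
    fn wildcard_listen_becomes_concrete_peer() {
        // An IPv4 wildcard listener plus a real address and key becomes a dialable peer URI.
        let peer = convert_listen_to_peer_with_address("tcp://0.0.0.0:9001", "PUBKEY", "203.0.113.5");
        assert_eq!(peer.as_deref(), Some("tcp://203.0.113.5:9001?key=PUBKEY"));

        // IPv6 wildcard listeners are split on "]:" to recover the port.
        let peer6 = convert_listen_to_peer_with_address("tls://[::]:9002", "PUBKEY", "203.0.113.5");
        assert_eq!(peer6.as_deref(), Some("tls://203.0.113.5:9002?key=PUBKEY"));

        // Unix sockets are local-only and are skipped.
        assert!(convert_listen_to_peer_with_address("unix:///var/run/ygg.sock", "PUBKEY", "203.0.113.5").is_none());
    }
}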

72
src/websocket_state.rs Normal file

@@ -0,0 +1,72 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{info, warn};
use crate::modules::websocket::ServerMessage;
use crate::node_manager::NodeManager;
type ConnectionMap = Arc<RwLock<HashMap<String, tokio::sync::mpsc::Sender<ServerMessage>>>>;
lazy_static::lazy_static! {
static ref AGENT_CONNECTIONS: ConnectionMap = Arc::new(RwLock::new(HashMap::new()));
}
pub async fn register_agent_connection(node_id: String, tx: tokio::sync::mpsc::Sender<ServerMessage>) {
let mut connections = AGENT_CONNECTIONS.write().await;
connections.insert(node_id.clone(), tx);
info!("Registered agent connection for node: {}", node_id);
}
pub async fn unregister_agent_connection(node_id: &str) {
let mut connections = AGENT_CONNECTIONS.write().await;
connections.remove(node_id);
info!("Unregistered agent connection for node: {}", node_id);
}
pub async fn broadcast_configuration_update(node_manager: &Arc<NodeManager>) {
let mut connections = AGENT_CONNECTIONS.write().await;
let configs = node_manager.generate_configs().await;
info!("Broadcasting configuration update to {} connected agents", connections.len());
let mut failed_connections = Vec::new();
for (node_id, tx) in connections.iter() {
if let Some(config) = configs.get(node_id) {
let update = ServerMessage::Update {
peers: config.peers.clone(),
allowed_public_keys: config.allowed_public_keys.clone(),
};
if let Err(e) = tx.send(update).await {
warn!("Failed to send update to node {}: {}", node_id, e);
failed_connections.push(node_id.clone());
}
} else {
// Node was deleted, send empty configuration to disconnect agent gracefully
let update = ServerMessage::Update {
peers: vec![],
allowed_public_keys: vec![],
};
if let Err(e) = tx.send(update).await {
warn!("Failed to send final update to deleted node {}: {}", node_id, e);
failed_connections.push(node_id.clone());
} else {
info!("Sent final empty config to deleted node {}", node_id);
failed_connections.push(node_id.clone());
}
}
}
// Remove failed connections
for node_id in failed_connections {
connections.remove(&node_id);
info!("Removed failed connection for node: {}", node_id);
}
}
pub async fn get_connected_agents_count() -> usize {
AGENT_CONNECTIONS.read().await.len()
}

54
src/yggdrasil/mod.rs Normal file

@@ -0,0 +1,54 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct YggdrasilConfig {
#[serde(rename = "PrivateKey")]
pub private_key: String,
pub peers: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub listen: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub allowed_public_keys: Vec<String>,
#[serde(rename = "IfName")]
pub if_name: String,
#[serde(rename = "IfMTU")]
pub if_mtu: u16,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_info_privacy: Option<bool>,
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub node_info: HashMap<String, serde_json::Value>,
}
impl Default for YggdrasilConfig {
fn default() -> Self {
Self {
private_key: String::new(),
peers: Vec::new(),
listen: Vec::new(),
allowed_public_keys: Vec::new(),
if_name: "auto".to_string(),
if_mtu: 65535,
node_info_privacy: Some(false),
node_info: HashMap::new(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
pub id: String,
pub name: String,
pub public_key: String,
pub private_key: String,
pub listen: Vec<String>,
pub addresses: Vec<String>, // Real IP addresses of the node
}
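
A standalone sketch (not part of this commit, reduced from YggdrasilConfig above) showing the effect of the serde attributes: PascalCase keys, the explicit IfMTU rename, and empty collections omitted via skip_serializing_if; the key and peer strings are hypothetical.

use serde::Serialize;
use std::collections::HashMap;

#[derive(Serialize)]
#[serde(rename_all = "PascalCase")]
struct MiniYggConfig {
    #[serde(rename = "PrivateKey")]
    private_key: String,
    peers: Vec<String>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    listen: Vec<String>,
    #[serde(rename = "IfMTU")]
    if_mtu: u16,
    #[serde(skip_serializing_if = "HashMap::is_empty")]
    node_info: HashMap<String, serde_json::Value>,
}

fn main() {
    let cfg = MiniYggConfig {
        private_key: "00ff".to_string(),
        peers: vec!["tcp://203.0.113.5:9001?key=PUBKEY".to_string()],
        listen: Vec::new(),        // empty, so omitted from the output
        if_mtu: 65535,
        node_info: HashMap::new(), // empty, so omitted from the output
    };
    // Prints: {"PrivateKey":"00ff","Peers":["tcp://203.0.113.5:9001?key=PUBKEY"],"IfMTU":65535}
    println!("{}", serde_json::to_string(&cfg).unwrap());
}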