This commit is contained in:
2025-12-24 02:46:22 +00:00
parent c14080ff4e
commit 016ba6583d
16 changed files with 1148 additions and 70 deletions

117
src/events/correlation.rs Normal file
View File

@@ -0,0 +1,117 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
/// Tracks issues by node to detect patterns and enable smart correlation
/// between individual pod failures and node-level problems.
///
/// Cloning is cheap: both maps are behind `Arc`, so all clones share the
/// same underlying state.
#[derive(Clone)]
pub struct CorrelationEngine {
    // node_name -> list of (pod_name, timestamp when the issue was recorded).
    // Entries older than 5 minutes are pruned on each insert in
    // `record_pod_issue`, keeping the per-node list bounded.
    node_issues: Arc<RwLock<HashMap<String, Vec<(String, Instant)>>>>,
    // node_name -> timestamp of last diagnosis (to avoid duplicate diagnoses
    // within the dedup window consulted by `is_mass_failure`).
    diagnosed_nodes: Arc<RwLock<HashMap<String, Instant>>>,
}
impl CorrelationEngine {
    /// Creates an empty engine with no recorded issues and no diagnosed nodes.
    pub fn new() -> Self {
        Self {
            node_issues: Arc::new(RwLock::new(HashMap::new())),
            diagnosed_nodes: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Records a pod issue associated with a node.
    ///
    /// Before appending, prunes that node's entries older than 5 minutes so
    /// the list stays bounded under a steady failure rate.
    pub async fn record_pod_issue(&self, node_name: &str, pod_name: String) {
        let mut issues = self.node_issues.write().await;
        // `or_default()` is the idiomatic form of `or_insert_with(Vec::new)`.
        let node_issues = issues.entry(node_name.to_string()).or_default();
        // Clean old entries (older than 5 minutes).
        node_issues.retain(|(_, timestamp)| timestamp.elapsed() < Duration::from_secs(300));
        // Add the new issue with the current timestamp.
        node_issues.push((pod_name, Instant::now()));
        debug!(node = %node_name, count = node_issues.len(), "Recorded pod issue on node");
    }

    /// Checks if there's a mass failure on a node (5 or more pods failing in
    /// the last 60 seconds — the code uses `>= 5`, so exactly 5 qualifies).
    /// Also checks if we already diagnosed this node recently (within the
    /// last 5 minutes) to avoid duplicate diagnoses.
    ///
    /// Returns `(is_mass_failure, affected_pod_names)`; the pod list is empty
    /// whenever the first element is `false`.
    pub async fn is_mass_failure(&self, node_name: &str) -> (bool, Vec<String>) {
        // First check if we already diagnosed this node recently.
        let diagnosed = self.diagnosed_nodes.read().await;
        if let Some(last_diagnosis) = diagnosed.get(node_name) {
            if last_diagnosis.elapsed() < Duration::from_secs(300) {
                debug!(
                    node = %node_name,
                    elapsed_secs = last_diagnosis.elapsed().as_secs(),
                    "Skipping duplicate diagnosis - node recently diagnosed"
                );
                return (false, vec![]);
            }
        }
        // Release the read lock before acquiring the next one.
        drop(diagnosed);
        let issues = self.node_issues.read().await;
        if let Some(node_issues) = issues.get(node_name) {
            // Single pass: collect the recent pod names, then threshold on the
            // count. (Previously the list was filtered twice with the same
            // 60-second predicate — once to count, once to collect.)
            let affected_pods: Vec<String> = node_issues
                .iter()
                .filter(|(_, timestamp)| timestamp.elapsed() < Duration::from_secs(60))
                .map(|(name, _)| name.clone())
                .collect();
            if affected_pods.len() >= 5 {
                warn!(
                    node = %node_name,
                    affected_pods = affected_pods.len(),
                    "Detected mass failure on node"
                );
                return (true, affected_pods);
            }
        }
        (false, vec![])
    }

    /// Marks a node as diagnosed (prevents duplicate diagnoses for the next
    /// 5 minutes, as enforced by `is_mass_failure`).
    pub async fn mark_node_diagnosed(&self, node_name: &str) {
        let mut diagnosed = self.diagnosed_nodes.write().await;
        diagnosed.insert(node_name.to_string(), Instant::now());
        debug!(node = %node_name, "Marked node as diagnosed");
    }

    /// Clears recorded issues for a node (call when node becomes healthy).
    /// Also removes the "already diagnosed" marker so a future failure on the
    /// same node can be diagnosed again immediately.
    pub async fn clear_node_issues(&self, node_name: &str) {
        let mut issues = self.node_issues.write().await;
        let mut diagnosed = self.diagnosed_nodes.write().await;
        if issues.remove(node_name).is_some() {
            info!(node = %node_name, "Cleared node issues - node recovered");
        }
        // Also clear the diagnosis marker.
        diagnosed.remove(node_name);
    }

    /// Gets the count of issues recorded on a node within the last 5 minutes.
    /// Returns 0 for unknown nodes.
    pub async fn get_recent_issue_count(&self, node_name: &str) -> usize {
        let issues = self.node_issues.read().await;
        issues
            .get(node_name)
            .map(|node_issues| {
                node_issues
                    .iter()
                    .filter(|(_, timestamp)| timestamp.elapsed() < Duration::from_secs(300))
                    .count()
            })
            .unwrap_or(0)
    }
}

// A type with an argument-free `new` should also implement `Default`
// (clippy::new_without_default).
impl Default for CorrelationEngine {
    fn default() -> Self {
        Self::new()
    }
}

86
src/events/formatter.rs Normal file
View File

@@ -0,0 +1,86 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use std::time::{Duration, Instant};
use tracing::debug;
/// Deduplicates and formats diagnosis results to avoid spam.
///
/// Cloning is cheap: the cache sits behind an `Arc`, so clones share state.
#[derive(Clone)]
pub struct DiagnosisFormatter {
    // Normalized hash of the diagnosis content -> (timestamp of the most
    // recent sighting, number of times seen within the current window).
    // Entries older than 10 minutes are pruned by `should_display`/`cleanup`.
    seen_diagnoses: Arc<RwLock<HashMap<String, (Instant, usize)>>>,
}
impl DiagnosisFormatter {
    /// Creates a formatter with an empty deduplication cache.
    pub fn new() -> Self {
        Self {
            seen_diagnoses: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Checks if this diagnosis is a duplicate and should be suppressed.
    ///
    /// Returns `(should_display, display_suffix)`; the suffix (e.g.
    /// " (seen 3 times in last 5min)") is only produced when the diagnosis is
    /// suppressed.
    ///
    /// NOTE(review): each suppressed sighting refreshes `last_seen`, so a
    /// diagnosis that recurs more often than every 5 minutes is suppressed
    /// indefinitely (sliding window) — confirm this is the intended behavior.
    pub async fn should_display(&self, diagnosis: &str) -> (bool, Option<String>) {
        let diagnosis_hash = Self::hash_diagnosis(diagnosis);
        let mut seen = self.seen_diagnoses.write().await;
        // Clean old entries (older than 10 minutes).
        seen.retain(|_, (timestamp, _)| timestamp.elapsed() < Duration::from_secs(600));
        if let Some((last_seen, count)) = seen.get_mut(&diagnosis_hash) {
            // Similar diagnosis seen recently.
            if last_seen.elapsed() < Duration::from_secs(300) {
                // Within 5 minutes - increment count and suppress.
                *count += 1;
                *last_seen = Instant::now();
                debug!(
                    hash = %diagnosis_hash,
                    count = *count,
                    "Suppressing duplicate diagnosis"
                );
                return (false, Some(format!(" (seen {} times in last 5min)", count)));
            } else {
                // More than 5 minutes - reset the window and show again.
                *last_seen = Instant::now();
                *count = 1;
            }
        } else {
            // First time seeing this diagnosis.
            seen.insert(diagnosis_hash.clone(), (Instant::now(), 1));
        }
        (true, None)
    }

    /// Creates a simplified hash of a diagnosis to detect duplicates.
    /// Focuses on root-cause phrases rather than resource names, so two
    /// diagnoses about different pods with the same cause collide.
    ///
    /// The hash is only compared in-process; `DefaultHasher` output is not
    /// guaranteed stable across Rust releases, so it must never be persisted.
    fn hash_diagnosis(diagnosis: &str) -> String {
        // Keep only the lines carrying the key phrases. Note: any line
        // containing "root cause:" also contains "cause:", so the former
        // check was redundant and has been dropped.
        let normalized = diagnosis
            .to_lowercase()
            .lines()
            .filter(|line| {
                line.contains("cause:")
                    || line.contains("problem:")
                    || line.contains("severity:")
            })
            .collect::<Vec<_>>()
            .join(" ");
        // Simple hash based on the normalized content.
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut hasher = DefaultHasher::new();
        normalized.hash(&mut hasher);
        format!("{:x}", hasher.finish())
    }

    /// Periodically cleans entries older than 10 minutes from the cache.
    pub async fn cleanup(&self) {
        let mut seen = self.seen_diagnoses.write().await;
        seen.retain(|_, (timestamp, _)| timestamp.elapsed() < Duration::from_secs(600));
        debug!("Cleaned up diagnosis cache, {} entries remain", seen.len());
    }
}

// A type with an argument-free `new` should also implement `Default`
// (clippy::new_without_default).
impl Default for DiagnosisFormatter {
    fn default() -> Self {
        Self::new()
    }
}

View File

@@ -1,23 +1,41 @@
use crate::ai::{AIClient, DiagnosticEngine};
use crate::k8s::{KubeClient, NodeEvent, NodeEventType, PodEvent, PodEventType};
use crate::telegram::TelegramNotifier;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tracing::{error, info, warn};
use super::correlation::CorrelationEngine;
use super::formatter::DiagnosisFormatter;
/// Handles node and pod events, correlating failures and triggering
/// AI-backed diagnoses with bounded concurrency.
#[derive(Clone)]
pub struct EventHandler {
    // Client for querying Kubernetes state (pod details, etc.).
    kube_client: KubeClient,
    // Client used to run AI diagnoses.
    ai_client: AIClient,
    // Semaphore to limit concurrent AI diagnoses; sized by the
    // `max_concurrent` constructor argument.
    diagnosis_semaphore: Arc<Semaphore>,
    // Correlation engine to detect patterns (e.g. mass pod failures on one node).
    correlation: CorrelationEngine,
    // Formatter to avoid emitting duplicate diagnoses.
    formatter: DiagnosisFormatter,
    // Optional Telegram notifier; when `None`, notifications are skipped.
    telegram: Option<TelegramNotifier>,
}
impl EventHandler {
pub fn new(kube_client: KubeClient, ai_client: AIClient, max_concurrent: usize) -> Self {
pub fn new(
kube_client: KubeClient,
ai_client: AIClient,
max_concurrent: usize,
telegram: Option<TelegramNotifier>,
) -> Self {
Self {
kube_client,
ai_client,
diagnosis_semaphore: Arc::new(Semaphore::new(max_concurrent)),
correlation: CorrelationEngine::new(),
formatter: DiagnosisFormatter::new(),
telegram,
}
}
@@ -47,13 +65,73 @@ impl EventHandler {
}
NodeEventType::BecameReady => {
info!(node = %event.node_name, "Node became Ready");
// Clear correlation data for this node since it recovered
self.correlation.clear_node_issues(&event.node_name).await;
// Mark as resolved in Telegram
if let Some(ref telegram) = self.telegram {
telegram.mark_node_resolved(&event.node_name).await;
}
}
}
}
/// Handle pod event and trigger AI diagnostics if needed
pub async fn handle_pod_event(&self, event: PodEvent) {
// First, get pod details to determine which node it's on
let node_name = match self
.kube_client
.get_pod_details(&event.namespace, &event.pod_name)
.await
{
Ok(details) => details.node_name,
Err(e) => {
warn!(
pod = %event.pod_name,
namespace = %event.namespace,
error = %e,
"Failed to get pod details for correlation"
);
None
}
};
// If pod is on a node, record the issue for correlation
if let Some(ref node) = node_name {
self.correlation
.record_pod_issue(node, format!("{}/{}", event.namespace, event.pod_name))
.await;
// Check if this is part of a mass failure
let (is_mass_failure, affected_pods) = self.correlation.is_mass_failure(node).await;
if is_mass_failure {
info!(
node = %node,
affected_pods = affected_pods.len(),
"Detected mass pod failure on node - diagnosing node instead"
);
// Diagnose the node with context about affected pods
self.diagnose_node_with_pods(node, affected_pods).await;
return; // Don't diagnose individual pod
}
}
// Build problem description with node context
let problem_description = match &event.event_type {
PodEventType::Recovered => {
info!(
pod = %event.pod_name,
namespace = %event.namespace,
"Pod recovered - marking as resolved"
);
// Mark as resolved in Telegram
if let Some(ref telegram) = self.telegram {
telegram.mark_pod_resolved(&event.namespace, &event.pod_name).await;
}
return; // No diagnosis needed for recovery
}
PodEventType::HighRestartCount { count } => {
warn!(
pod = %event.pod_name,
@@ -117,7 +195,7 @@ impl EventHandler {
}
};
self.diagnose_pod(&event.namespace, &event.pod_name, &problem_description)
self.diagnose_pod(&event.namespace, &event.pod_name, &problem_description, node_name.as_deref())
.await;
}
@@ -129,55 +207,138 @@ impl EventHandler {
let diagnostic_engine = DiagnosticEngine::new(self.kube_client.clone());
match diagnostic_engine
let diagnosis_opt = match diagnostic_engine
.diagnose_nodes(&self.ai_client, vec![node_name.to_string()])
.await
{
Ok(diagnosis) => {
info!(
node = %node_name,
diagnosis = %diagnosis,
"AI diagnosis completed"
"Node diagnosis completed:\n{}", diagnosis
);
Some(diagnosis)
}
Err(e) => {
let error_msg = e.to_string();
error!(
node = %node_name,
error = %e,
error = %error_msg,
"AI diagnosis failed"
);
None
}
};
// Send to Telegram if configured (after match is complete)
if let Some(ref telegram) = self.telegram {
if let Some(diagnosis) = diagnosis_opt {
telegram.send_node_diagnosis(node_name, &diagnosis).await;
}
}
// Permit is automatically released when _permit is dropped
}
async fn diagnose_pod(&self, namespace: &str, pod_name: &str, problem: &str) {
async fn diagnose_pod(&self, namespace: &str, pod_name: &str, problem: &str, node_name: Option<&str>) {
// Acquire semaphore permit to limit concurrency
let _permit = self.diagnosis_semaphore.acquire().await.unwrap();
info!(pod = %pod_name, namespace = %namespace, "Starting AI diagnosis (acquired permit)");
info!(pod = %pod_name, namespace = %namespace, node = ?node_name, "Starting AI diagnosis (acquired permit)");
let diagnostic_engine = DiagnosticEngine::new(self.kube_client.clone());
match diagnostic_engine
.diagnose_pod(&self.ai_client, namespace, pod_name, problem)
// Add node context to problem description if available
let full_problem = if let Some(node) = node_name {
format!("{} (Pod is on node: {})", problem, node)
} else {
problem.to_string()
};
let diagnosis_opt = match diagnostic_engine
.diagnose_pod(&self.ai_client, namespace, pod_name, &full_problem)
.await
{
Ok(diagnosis) => {
info!(
pod = %pod_name,
namespace = %namespace,
diagnosis = %diagnosis,
"AI diagnosis completed"
"AI diagnosis completed:\n{}", diagnosis
);
Some(diagnosis)
}
Err(e) => {
let error_msg = e.to_string();
error!(
pod = %pod_name,
namespace = %namespace,
error = %e,
error = %error_msg,
"AI diagnosis failed"
);
None
}
};
// Send to Telegram if configured (after match is complete)
if let Some(ref telegram) = self.telegram {
if let Some(diagnosis) = diagnosis_opt {
telegram.send_pod_diagnosis(namespace, pod_name, &diagnosis).await;
}
}
// Permit is automatically released when _permit is dropped
}
/// Diagnose a node with context about affected pods (mass failure scenario)
async fn diagnose_node_with_pods(&self, node_name: &str, affected_pods: Vec<String>) {
// Mark node as diagnosed to prevent duplicate diagnoses for subsequent pod events
self.correlation.mark_node_diagnosed(node_name).await;
// Acquire semaphore permit to limit concurrency
let _permit = self.diagnosis_semaphore.acquire().await.unwrap();
info!(
node = %node_name,
affected_pods = affected_pods.len(),
"Starting grouped AI diagnosis (acquired permit)"
);
let diagnostic_engine = DiagnosticEngine::new(self.kube_client.clone());
// Create a detailed problem description for mass failure
let problem_description = format!(
"Node {} has issues affecting {} pods. Affected pods: {}. \
This appears to be a node-level problem rather than individual pod issues. \
Analyze the node state and determine the root cause.",
node_name,
affected_pods.len(),
affected_pods.join(", ")
);
let diagnosis_opt = match diagnostic_engine
.diagnose_node_with_context(&self.ai_client, node_name, &problem_description)
.await
{
Ok(diagnosis) => {
info!(
node = %node_name,
affected_pods = affected_pods.len(),
"Grouped diagnosis completed:\n{}", diagnosis
);
Some(diagnosis)
}
Err(e) => {
let error_msg = e.to_string();
error!(
node = %node_name,
error = %error_msg,
"Grouped AI diagnosis failed"
);
None
}
};
// Send to Telegram with grouped context (after match is complete)
if let Some(ref telegram) = self.telegram {
if let Some(diagnosis) = diagnosis_opt {
telegram.send_grouped_diagnosis(node_name, affected_pods.len(), &diagnosis).await;
}
}
// Permit is automatically released when _permit is dropped

View File

@@ -1,3 +1,7 @@
//! Event-handling subsystem: correlation of pod/node failures,
//! deduplication/formatting of diagnoses, and the event handler itself.
mod correlation;
mod formatter;
mod handler;
// Re-export the public entry points so callers use `crate::events::*`
// without reaching into the submodules.
pub use correlation::CorrelationEngine;
pub use formatter::DiagnosisFormatter;
pub use handler::EventHandler;