commit c14080ff4e
Date: 2025-12-24 01:36:12 +00:00
21 changed files with 4531 additions and 0 deletions

src/ai/client.rs (new file, 57 lines)

@@ -0,0 +1,57 @@
use crate::config::Config;
use async_openai::{
config::OpenAIConfig,
types::chat::{
ChatCompletionRequestMessage, ChatCompletionRequestSystemMessageArgs,
ChatCompletionRequestUserMessageArgs,
},
Client,
};
#[derive(Clone)]
pub struct AIClient {
client: Client<OpenAIConfig>,
model: String,
system_prompt: String,
}
impl AIClient {
pub fn new(cfg: &Config) -> Self {
let config = OpenAIConfig::new()
.with_api_base(&cfg.api_base)
.with_api_key(&cfg.api_key);
let client = Client::with_config(config);
Self {
client,
model: cfg.model.clone(),
system_prompt: cfg.system_prompt.clone(),
}
}
pub fn client(&self) -> &Client<OpenAIConfig> {
&self.client
}
pub fn model(&self) -> &str {
&self.model
}
pub fn system_message(
&self,
) -> Result<ChatCompletionRequestMessage, Box<dyn std::error::Error>> {
Ok(ChatCompletionRequestSystemMessageArgs::default()
.content(self.system_prompt.clone())
.build()?
.into())
}
pub fn user_message(
&self,
content: String,
) -> Result<ChatCompletionRequestMessage, Box<dyn std::error::Error>> {
Ok(ChatCompletionRequestUserMessageArgs::default()
.content(content)
.build()?
.into())
}
}
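
A minimal usage sketch (the binding names and prompt string are illustrative, not part of this commit): load the config, build the client, and assemble the seed messages for the diagnosis loop below.

    let cfg = Config::load("config.toml")?;
    let ai = AIClient::new(&cfg);
    let messages = vec![
        ai.system_message()?,
        ai.user_message("Node worker-1 is NotReady".to_string())?,
    ];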

src/ai/diagnostics.rs (new file, 229 lines)

@@ -0,0 +1,229 @@
use crate::k8s::KubeClient;
use crate::tools;
use async_openai::types::chat::{
ChatCompletionMessageToolCalls, ChatCompletionRequestAssistantMessageArgs,
ChatCompletionRequestMessage, ChatCompletionRequestToolMessageArgs,
ChatCompletionTools, CreateChatCompletionRequestArgs,
};
use serde::Deserialize;
use std::time::Duration;
use tracing::{debug, warn};
pub struct DiagnosticEngine {
kube_client: KubeClient,
}
#[derive(Debug, Deserialize)]
struct NodeToolArgs {
node_name: String,
}
#[derive(Debug, Deserialize)]
struct PodDetailsArgs {
namespace: String,
pod_name: String,
}
#[derive(Debug, Deserialize)]
struct PodLogsArgs {
namespace: String,
pod_name: String,
container_name: Option<String>,
tail_lines: Option<i64>,
}
impl DiagnosticEngine {
pub fn new(kube_client: KubeClient) -> Self {
Self { kube_client }
}
/// Diagnoses NotReady nodes using AI with tool calling capability
pub async fn diagnose_nodes(
&self,
ai_client: &super::AIClient,
not_ready_nodes: Vec<String>,
) -> Result<String, Box<dyn std::error::Error>> {
let problem_description = format!(
"The following Kubernetes nodes are in NotReady state: {}. \
Analyze the issue and determine the root cause. \
Use the get_node_details tool to inspect nodes if needed.",
not_ready_nodes.join(", ")
);
let tools: Vec<ChatCompletionTools> =
vec![ChatCompletionTools::Function(tools::get_node_details_tool())];
self.run_diagnosis(ai_client, problem_description, tools)
.await
}
/// Diagnoses problematic pods using AI with tool calling capability
pub async fn diagnose_pod(
&self,
ai_client: &super::AIClient,
namespace: &str,
pod_name: &str,
problem_description: &str,
) -> Result<String, Box<dyn std::error::Error>> {
let full_description = format!(
"Pod {}/{} has the following issue: {}. \
Analyze the problem and determine the root cause. \
Use get_pod_details to inspect the pod state and get_pod_logs to examine recent logs.",
namespace, pod_name, problem_description
);
let tools: Vec<ChatCompletionTools> = vec![
ChatCompletionTools::Function(tools::get_pod_details_tool()),
ChatCompletionTools::Function(tools::get_pod_logs_tool()),
];
self.run_diagnosis(ai_client, full_description, tools)
.await
}
async fn run_diagnosis(
&self,
ai_client: &super::AIClient,
problem_description: String,
tools: Vec<ChatCompletionTools>,
) -> Result<String, Box<dyn std::error::Error>> {
// Add timeout for entire diagnosis
let diagnosis = tokio::time::timeout(
Duration::from_secs(60),
self.run_diagnosis_inner(ai_client, problem_description, tools),
)
.await;
match diagnosis {
Ok(result) => result,
Err(_) => {
warn!("AI diagnosis timed out after 60 seconds");
Err("Diagnosis timeout".into())
}
}
}
async fn run_diagnosis_inner(
&self,
ai_client: &super::AIClient,
problem_description: String,
tools: Vec<ChatCompletionTools>,
) -> Result<String, Box<dyn std::error::Error>> {
let mut messages: Vec<ChatCompletionRequestMessage> = vec![
ai_client.system_message()?,
ai_client.user_message(problem_description)?,
];
// Conversation loop
for iteration in 0..10 {
debug!(iteration = iteration + 1, "AI diagnosis iteration");
let request = CreateChatCompletionRequestArgs::default()
.model(ai_client.model())
.messages(messages.clone())
.tools(tools.clone())
.build()?;
debug!("Sending request to AI");
let response = ai_client.client().chat().create(request).await?;
debug!("Received response from AI");
let choice = response.choices.first().ok_or("No response from AI")?;
let assistant_message = choice.message.clone();
if let Some(tool_calls) = &assistant_message.tool_calls {
debug!(tool_count = tool_calls.len(), "AI calling tools");
messages.push(
ChatCompletionRequestAssistantMessageArgs::default()
.tool_calls(tool_calls.clone())
.build()?
.into(),
);
for tool_call_enum in tool_calls {
if let ChatCompletionMessageToolCalls::Function(tool_call) = tool_call_enum {
let tool_name = &tool_call.function.name;
let tool_args = &tool_call.function.arguments;
debug!(tool = %tool_name, args = %tool_args, "Executing tool");
// Execute tool with error handling
let result = match self.execute_tool(tool_name, tool_args).await {
Ok(data) => {
debug!(tool = %tool_name, "Tool executed successfully");
data
}
Err(e) => {
warn!(tool = %tool_name, error = %e, "Tool execution failed");
// Return the error as a tool result instead of failing the
// diagnosis; serde_json escapes quotes and control characters
serde_json::json!({
"error": format!("Tool execution failed: {}", e),
"available": false
})
.to_string()
}
};
messages.push(
ChatCompletionRequestToolMessageArgs::default()
.content(result)
.tool_call_id(tool_call.id.clone())
.build()?
.into(),
);
}
}
} else if let Some(content) = &assistant_message.content {
debug!("AI provided final diagnosis");
return Ok(content.clone());
} else {
warn!("AI returned no content and no tool calls");
return Err("AI returned no content and no tool calls".into());
}
}
warn!("Maximum iterations reached without final answer");
Err("Maximum iterations reached without final answer".into())
}
async fn execute_tool(
&self,
tool_name: &str,
arguments_json: &str,
) -> Result<String, Box<dyn std::error::Error>> {
match tool_name {
"get_node_details" => {
let args: NodeToolArgs = serde_json::from_str(arguments_json)?;
let details = self.kube_client.get_node_details(&args.node_name).await?;
Ok(serde_json::to_string_pretty(&details)?)
}
"get_pod_details" => {
let args: PodDetailsArgs = serde_json::from_str(arguments_json)?;
let details = self
.kube_client
.get_pod_details(&args.namespace, &args.pod_name)
.await?;
Ok(serde_json::to_string_pretty(&details)?)
}
"get_pod_logs" => {
let args: PodLogsArgs = serde_json::from_str(arguments_json)?;
let tail_lines = args.tail_lines.or(Some(50));
let logs = self
.kube_client
.get_pod_logs(
&args.namespace,
&args.pod_name,
args.container_name.as_deref(),
tail_lines,
)
.await?;
Ok(serde_json::to_string_pretty(&logs)?)
}
_ => Err(format!("Unknown tool: {}", tool_name).into()),
}
}
}
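
A sketch of driving the engine directly, mirroring what EventHandler does below (namespace and pod names are placeholders):

    let engine = DiagnosticEngine::new(kube_client.clone());
    let report = engine
        .diagnose_pod(
            &ai_client,
            "default",
            "my-pod",
            "Container is in CrashLoopBackOff state",
        )
        .await?;
    println!("{report}");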

src/ai/mod.rs (new file, 5 lines)

@@ -0,0 +1,5 @@
mod client;
mod diagnostics;
pub use client::AIClient;
pub use diagnostics::DiagnosticEngine;

src/config.rs (new file, 24 lines)

@@ -0,0 +1,24 @@
use serde::Deserialize;
use std::fs;
#[derive(Debug, Deserialize)]
pub struct Config {
pub api_base: String,
pub api_key: String,
pub model: String,
pub system_prompt: String,
#[serde(default = "default_max_concurrent_diagnoses")]
pub max_concurrent_diagnoses: usize,
}
fn default_max_concurrent_diagnoses() -> usize {
1
}
impl Config {
pub fn load(path: &str) -> Result<Self, Box<dyn std::error::Error>> {
let content = fs::read_to_string(path)?;
let config: Config = toml::from_str(&content)?;
Ok(config)
}
}
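
An example config.toml matching these fields (all values are placeholders; max_concurrent_diagnoses may be omitted and defaults to 1):

    api_base = "https://api.openai.com/v1"
    api_key = "sk-..."
    model = "gpt-4o-mini"
    system_prompt = "You are a Kubernetes SRE assistant. Diagnose the reported problem and state the most likely root cause."
    max_concurrent_diagnoses = 2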

src/events/handler.rs (new file, 185 lines)

@@ -0,0 +1,185 @@
use crate::ai::{AIClient, DiagnosticEngine};
use crate::k8s::{KubeClient, NodeEvent, NodeEventType, PodEvent, PodEventType};
use std::sync::Arc;
use tokio::sync::Semaphore;
use tracing::{error, info, warn};
#[derive(Clone)]
pub struct EventHandler {
kube_client: KubeClient,
ai_client: AIClient,
// Semaphore to limit concurrent AI diagnoses
diagnosis_semaphore: Arc<Semaphore>,
}
impl EventHandler {
pub fn new(kube_client: KubeClient, ai_client: AIClient, max_concurrent: usize) -> Self {
Self {
kube_client,
ai_client,
diagnosis_semaphore: Arc::new(Semaphore::new(max_concurrent)),
}
}
/// Handle node event and trigger AI diagnostics if needed
pub async fn handle_node_event(&self, event: NodeEvent) {
match event.event_type {
NodeEventType::BecameNotReady => {
warn!(
node = %event.node_name,
"Node became NotReady, starting AI diagnostics"
);
self.diagnose_node(&event.node_name).await;
}
NodeEventType::ConditionChanged {
ref condition_type,
ref status,
ref reason,
} => {
warn!(
node = %event.node_name,
condition = %condition_type,
status = %status,
reason = ?reason,
"Problematic condition detected, starting AI diagnostics"
);
self.diagnose_node(&event.node_name).await;
}
NodeEventType::BecameReady => {
info!(node = %event.node_name, "Node became Ready");
}
}
}
/// Handle pod event and trigger AI diagnostics if needed
pub async fn handle_pod_event(&self, event: PodEvent) {
let problem_description = match &event.event_type {
PodEventType::HighRestartCount { count } => {
warn!(
pod = %event.pod_name,
namespace = %event.namespace,
restart_count = count,
"Pod has high restart count"
);
format!("High restart count: {} restarts", count)
}
PodEventType::CrashLoopBackOff => {
warn!(
pod = %event.pod_name,
namespace = %event.namespace,
"Pod in CrashLoopBackOff"
);
"Container is in CrashLoopBackOff state".to_string()
}
PodEventType::ImagePullError => {
warn!(
pod = %event.pod_name,
namespace = %event.namespace,
"Pod has image pull error"
);
"Failed to pull container image".to_string()
}
PodEventType::Pending { reason } => {
warn!(
pod = %event.pod_name,
namespace = %event.namespace,
reason = ?reason,
"Pod stuck in Pending"
);
format!(
"Pod stuck in Pending state. Reason: {}",
reason.as_deref().unwrap_or("Unknown")
)
}
PodEventType::Failed { reason } => {
warn!(
pod = %event.pod_name,
namespace = %event.namespace,
reason = ?reason,
"Pod in Failed state"
);
format!(
"Pod in Failed state. Reason: {}",
reason.as_deref().unwrap_or("Unknown")
)
}
PodEventType::ContainerCreating { duration_seconds } => {
warn!(
pod = %event.pod_name,
namespace = %event.namespace,
duration_seconds = duration_seconds,
"Container creating for too long"
);
format!(
"Container has been creating for {} seconds",
duration_seconds
)
}
};
self.diagnose_pod(&event.namespace, &event.pod_name, &problem_description)
.await;
}
async fn diagnose_node(&self, node_name: &str) {
// Acquire semaphore permit to limit concurrency
let _permit = self.diagnosis_semaphore.acquire().await.unwrap();
info!(node = %node_name, "Starting AI diagnosis (acquired permit)");
let diagnostic_engine = DiagnosticEngine::new(self.kube_client.clone());
match diagnostic_engine
.diagnose_nodes(&self.ai_client, vec![node_name.to_string()])
.await
{
Ok(diagnosis) => {
info!(
node = %node_name,
diagnosis = %diagnosis,
"AI diagnosis completed"
);
}
Err(e) => {
error!(
node = %node_name,
error = %e,
"AI diagnosis failed"
);
}
}
// Permit is automatically released when _permit is dropped
}
async fn diagnose_pod(&self, namespace: &str, pod_name: &str, problem: &str) {
// Acquire semaphore permit to limit concurrency
let _permit = self.diagnosis_semaphore.acquire().await.unwrap();
info!(pod = %pod_name, namespace = %namespace, "Starting AI diagnosis (acquired permit)");
let diagnostic_engine = DiagnosticEngine::new(self.kube_client.clone());
match diagnostic_engine
.diagnose_pod(&self.ai_client, namespace, pod_name, problem)
.await
{
Ok(diagnosis) => {
info!(
pod = %pod_name,
namespace = %namespace,
diagnosis = %diagnosis,
"AI diagnosis completed"
);
}
Err(e) => {
error!(
pod = %pod_name,
namespace = %namespace,
error = %e,
"AI diagnosis failed"
);
}
}
// Permit is automatically released when _permit is dropped
}
}
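
The semaphore is what enforces max_concurrent_diagnoses: each diagnosis holds a permit for its whole duration, and further events queue on acquire(). A standalone sketch of the pattern:

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    let sem = Arc::new(Semaphore::new(1)); // one diagnosis at a time
    let permit = sem.acquire().await.unwrap(); // a second caller awaits here
    // ... run the diagnosis ...
    drop(permit); // releasing the permit unblocks the next waiter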

src/events/mod.rs (new file, 3 lines)

@@ -0,0 +1,3 @@
mod handler;
pub use handler::EventHandler;

src/k8s/client.rs (new file, 20 lines)

@@ -0,0 +1,20 @@
use kube::{Client, Config};
#[derive(Clone)]
pub struct KubeClient {
client: Client,
}
impl KubeClient {
/// Creates a client using standard credentials
/// (kubeconfig or service account from environment)
pub async fn new() -> Result<Self, Box<dyn std::error::Error>> {
let config = Config::infer().await?;
let client = Client::try_from(config)?;
Ok(Self { client })
}
pub fn inner(&self) -> &Client {
&self.client
}
}
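
kube's Config::infer() tries the local kubeconfig first and falls back to the in-cluster service account, so the same construction works on a workstation and inside a pod. A usage sketch (the Node import is only for the example):

    use k8s_openapi::api::core::v1::Node;
    use kube::Api;

    let kube = KubeClient::new().await?;
    let nodes: Api<Node> = Api::all(kube.inner().clone());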

src/k8s/mod.rs (new file, 10 lines)

@@ -0,0 +1,10 @@
mod client;
mod nodes;
mod pods;
mod pod_watcher;
mod types;
mod watcher;
pub use client::KubeClient;
pub use pod_watcher::{PodEvent, PodEventType, PodWatcher};
pub use watcher::{NodeEvent, NodeEventType, NodeWatcher};

src/k8s/nodes.rs (new file, 181 lines)

@@ -0,0 +1,181 @@
use super::client::KubeClient;
use super::types::{NodeCondition, NodeDetails, NodeResources, NodeStatus};
use k8s_openapi::api::core::v1::{Node, Pod};
use kube::{api::ListParams, Api};
impl KubeClient {
/// Gets list of all nodes and their statuses
pub async fn get_nodes(&self) -> Result<Vec<NodeStatus>, Box<dyn std::error::Error>> {
let nodes: Api<Node> = Api::all(self.inner().clone());
let node_list = nodes.list(&ListParams::default()).await?;
let mut result = Vec::new();
for node in node_list.items {
let name = node
.metadata
.name
.clone()
.unwrap_or_else(|| "unknown".to_string());
let ready = node
.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.and_then(|conditions| {
conditions
.iter()
.find(|c| c.type_ == "Ready")
.map(|c| c.status == "True")
})
.unwrap_or(false);
let version = node
.status
.as_ref()
.and_then(|s| s.node_info.as_ref())
.map(|info| info.kubelet_version.clone())
.unwrap_or_else(|| "unknown".to_string());
let internal_ip = node
.status
.as_ref()
.and_then(|s| s.addresses.as_ref())
.and_then(|addresses| {
addresses
.iter()
.find(|a| a.type_ == "InternalIP")
.map(|a| a.address.clone())
});
result.push(NodeStatus {
name,
ready,
version,
internal_ip,
});
}
Ok(result)
}
/// Gets detailed information about a specific node for diagnostics
pub async fn get_node_details(
&self,
node_name: &str,
) -> Result<NodeDetails, Box<dyn std::error::Error>> {
let nodes: Api<Node> = Api::all(self.inner().clone());
let node = nodes.get(node_name).await?;
// Conditions
let conditions = node
.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.map(|conds| {
conds
.iter()
.map(|c| NodeCondition {
type_: c.type_.clone(),
status: c.status.clone(),
reason: c.reason.clone(),
message: c.message.clone(),
})
.collect()
})
.unwrap_or_default();
// Resources - capacity
let capacity = node
.status
.as_ref()
.and_then(|s| s.capacity.as_ref())
.map(|c| NodeResources {
cpu: c.get("cpu").map(|q| q.0.clone()).unwrap_or_default(),
memory: c.get("memory").map(|q| q.0.clone()).unwrap_or_default(),
pods: c.get("pods").map(|q| q.0.clone()).unwrap_or_default(),
})
.unwrap_or(NodeResources {
cpu: "unknown".to_string(),
memory: "unknown".to_string(),
pods: "unknown".to_string(),
});
// Resources - allocatable
let allocatable = node
.status
.as_ref()
.and_then(|s| s.allocatable.as_ref())
.map(|a| NodeResources {
cpu: a.get("cpu").map(|q| q.0.clone()).unwrap_or_default(),
memory: a.get("memory").map(|q| q.0.clone()).unwrap_or_default(),
pods: a.get("pods").map(|q| q.0.clone()).unwrap_or_default(),
})
.unwrap_or(NodeResources {
cpu: "unknown".to_string(),
memory: "unknown".to_string(),
pods: "unknown".to_string(),
});
// Taints
let taints = node
.spec
.as_ref()
.and_then(|s| s.taints.as_ref())
.map(|t| {
t.iter()
.map(|taint| {
format!(
"{}={} (effect: {})",
taint.key,
taint.value.as_deref().unwrap_or(""),
taint.effect
)
})
.collect()
})
.unwrap_or_default();
// Labels
let labels = node
.metadata
.labels
.clone()
.unwrap_or_default()
.into_iter()
.collect::<Vec<_>>();
// Count pods on the node
let pods: Api<Pod> = Api::all(self.inner().clone());
let pod_list = pods.list(&ListParams::default()).await?;
let pod_count = pod_list
.items
.iter()
.filter(|pod| {
pod.spec
.as_ref()
.and_then(|s| s.node_name.as_ref())
.map(|n| n == node_name)
.unwrap_or(false)
})
.count();
let version = node
.status
.as_ref()
.and_then(|s| s.node_info.as_ref())
.map(|info| info.kubelet_version.clone())
.unwrap_or_else(|| "unknown".to_string());
Ok(NodeDetails {
name: node_name.to_string(),
conditions,
capacity,
allocatable,
taints,
labels,
pod_count,
version,
})
}
}
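
Note that the pod count above lists every pod in the cluster and filters client-side. A possible refinement (a sketch, not part of this commit) pushes the filter to the API server with a field selector:

    // spec.nodeName is a supported field selector for pods,
    // so only pods on this node are returned
    let lp = ListParams::default().fields(&format!("spec.nodeName={}", node_name));
    let pod_count = pods.list(&lp).await?.items.len();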

src/k8s/pod_watcher.rs (new file, 293 lines)

@@ -0,0 +1,293 @@
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube::{
runtime::{watcher, watcher::Config, WatchStreamExt},
Api, ResourceExt,
};
use std::collections::HashMap;
use tracing::{info, warn};
use super::client::KubeClient;
#[derive(Debug, Clone)]
pub struct PodEvent {
pub pod_name: String,
pub namespace: String,
pub event_type: PodEventType,
}
#[derive(Debug, Clone, PartialEq)]
pub enum PodEventType {
HighRestartCount { count: i32 },
Pending { reason: Option<String> },
Failed { reason: Option<String> },
CrashLoopBackOff,
ImagePullError,
ContainerCreating { duration_seconds: i64 },
}
pub struct PodWatcher {
api: Api<Pod>,
// Track restart counts to detect increases
restart_counts: HashMap<String, i32>,
// Track already reported issues to prevent spam
reported_issues: HashMap<String, PodEventType>,
}
impl PodWatcher {
pub fn new(kube_client: &KubeClient) -> Self {
let api = Api::all(kube_client.inner().clone());
Self {
api,
restart_counts: HashMap::new(),
reported_issues: HashMap::new(),
}
}
/// Start watching pod events
pub async fn watch(
mut self,
) -> Result<
impl futures::Stream<Item = Result<PodEvent, Box<dyn std::error::Error + Send + Sync>>>,
Box<dyn std::error::Error>,
> {
info!("Starting pod watcher");
// Initialize current state
let pods = self.api.list(&Default::default()).await?;
for pod in pods.items {
let key = Self::pod_key(&pod);
let restart_count = Self::get_restart_count(&pod);
self.restart_counts.insert(key, restart_count);
}
let stream = watcher(self.api.clone(), Config::default())
.applied_objects()
.try_filter_map(move |pod| {
let event = self.process_pod_event(pod);
futures::future::ready(Ok(event))
})
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
Ok(stream)
}
fn process_pod_event(&mut self, pod: Pod) -> Option<PodEvent> {
let name = pod.name_any();
let namespace = pod.namespace().unwrap_or_else(|| "default".to_string());
let key = format!("{}/{}", namespace, name);
let phase = pod
.status
.as_ref()
.and_then(|s| s.phase.as_deref())
.unwrap_or("Unknown");
// Helper to check if we should report this issue
let should_report = |event_type: &PodEventType| -> bool {
match self.reported_issues.get(&key) {
Some(prev) if prev == event_type => false, // Already reported same issue
_ => true,
}
};
// Check for high restart count
let current_restart_count = Self::get_restart_count(&pod);
let previous_restart_count = self.restart_counts.get(&key).copied().unwrap_or(0);
if current_restart_count > previous_restart_count && current_restart_count >= 3 {
let event_type = PodEventType::HighRestartCount {
count: current_restart_count,
};
if should_report(&event_type) {
warn!(
pod = %name,
namespace = %namespace,
restart_count = current_restart_count,
"Pod has high restart count"
);
self.restart_counts.insert(key.clone(), current_restart_count);
self.reported_issues.insert(key.clone(), event_type.clone());
return Some(PodEvent {
pod_name: name,
namespace,
event_type,
});
}
}
self.restart_counts.insert(key.clone(), current_restart_count);
// Check container states for specific errors
if let Some(container_statuses) = pod
.status
.as_ref()
.and_then(|s| s.container_statuses.as_ref())
{
for cs in container_statuses {
if let Some(waiting) = cs.state.as_ref().and_then(|s| s.waiting.as_ref()) {
let reason = waiting.reason.as_deref().unwrap_or("");
match reason {
"CrashLoopBackOff" => {
let event_type = PodEventType::CrashLoopBackOff;
if should_report(&event_type) {
warn!(
pod = %name,
namespace = %namespace,
container = %cs.name,
"Container in CrashLoopBackOff"
);
self.reported_issues.insert(key.clone(), event_type.clone());
return Some(PodEvent {
pod_name: name,
namespace,
event_type,
});
}
}
"ImagePullBackOff" | "ErrImagePull" => {
let event_type = PodEventType::ImagePullError;
if should_report(&event_type) {
warn!(
pod = %name,
namespace = %namespace,
container = %cs.name,
"Image pull error"
);
self.reported_issues.insert(key.clone(), event_type.clone());
return Some(PodEvent {
pod_name: name,
namespace,
event_type,
});
}
}
"ContainerCreating" => {
// Check how long it's been creating
if let Some(start_time) = pod
.status
.as_ref()
.and_then(|s| s.start_time.as_ref())
{
let duration = chrono::Utc::now()
.signed_duration_since(start_time.0)
.num_seconds();
if duration > 300 {
// 5 minutes
let event_type = PodEventType::ContainerCreating {
duration_seconds: duration,
};
if should_report(&event_type) {
warn!(
pod = %name,
namespace = %namespace,
duration_seconds = duration,
"Container creating for too long"
);
self.reported_issues.insert(key.clone(), event_type.clone());
return Some(PodEvent {
pod_name: name,
namespace,
event_type,
});
}
}
}
}
_ => {}
}
}
}
}
// Check pod phase
match phase {
"Pending" => {
if let Some(reason) = Self::get_pending_reason(&pod) {
let event_type = PodEventType::Pending {
reason: Some(reason.clone()),
};
if should_report(&event_type) {
warn!(
pod = %name,
namespace = %namespace,
reason = %reason,
"Pod stuck in Pending"
);
self.reported_issues.insert(key.clone(), event_type.clone());
return Some(PodEvent {
pod_name: name,
namespace,
event_type,
});
}
}
}
"Failed" => {
let reason = Self::get_failure_reason(&pod);
let event_type = PodEventType::Failed {
reason: reason.clone(),
};
if should_report(&event_type) {
warn!(
pod = %name,
namespace = %namespace,
reason = ?reason,
"Pod in Failed state"
);
self.reported_issues.insert(key, event_type.clone());
return Some(PodEvent {
pod_name: name,
namespace,
event_type,
});
}
}
_ => {}
}
None
}
fn pod_key(pod: &Pod) -> String {
format!(
"{}/{}",
pod.namespace().unwrap_or_else(|| "default".to_string()),
pod.name_any()
)
}
fn get_restart_count(pod: &Pod) -> i32 {
pod.status
.as_ref()
.and_then(|s| s.container_statuses.as_ref())
.map(|cs| cs.iter().map(|c| c.restart_count).sum())
.unwrap_or(0)
}
fn get_pending_reason(pod: &Pod) -> Option<String> {
pod.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.and_then(|conditions| {
conditions
.iter()
.find(|c| c.type_ == "PodScheduled" && c.status == "False")
.and_then(|c| c.reason.clone())
})
}
fn get_failure_reason(pod: &Pod) -> Option<String> {
pod.status
.as_ref()
.and_then(|s| s.reason.clone())
.or_else(|| {
pod.status
.as_ref()
.and_then(|s| s.message.clone())
})
}
}
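
Consuming the watcher stream looks like this (a sketch; main.rs below wires the same stream into a select! loop):

    use futures::StreamExt;

    let stream = PodWatcher::new(&kube_client).watch().await?;
    tokio::pin!(stream);
    while let Some(event) = stream.next().await {
        match event {
            Ok(ev) => println!("{}/{}: {:?}", ev.namespace, ev.pod_name, ev.event_type),
            Err(e) => eprintln!("watch error: {e}"),
        }
    }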

src/k8s/pods.rs (new file, 149 lines)

@@ -0,0 +1,149 @@
use super::client::KubeClient;
use super::types::{ContainerStatus, PodCondition, PodDetails, PodLogs};
use k8s_openapi::api::core::v1::Pod;
use kube::{api::LogParams, Api};
impl KubeClient {
/// Gets detailed information about a specific pod for diagnostics
pub async fn get_pod_details(
&self,
namespace: &str,
pod_name: &str,
) -> Result<PodDetails, Box<dyn std::error::Error>> {
let pods: Api<Pod> = Api::namespaced(self.inner().clone(), namespace);
let pod = pods.get(pod_name).await?;
// Extract phase
let phase = pod
.status
.as_ref()
.and_then(|s| s.phase.clone())
.unwrap_or_else(|| "Unknown".to_string());
// Extract node name
let node_name = pod.spec.as_ref().and_then(|s| s.node_name.clone());
// Extract conditions
let conditions = pod
.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.map(|conds| {
conds
.iter()
.map(|c| PodCondition {
type_: c.type_.clone(),
status: c.status.clone(),
reason: c.reason.clone(),
message: c.message.clone(),
})
.collect()
})
.unwrap_or_default();
// Extract container statuses
let container_statuses: Vec<ContainerStatus> = pod
.status
.as_ref()
.and_then(|s| s.container_statuses.as_ref())
.map(|cs| {
cs.iter()
.map(|c| {
let (state, state_reason, state_message) = if let Some(waiting) =
c.state.as_ref().and_then(|s| s.waiting.as_ref())
{
(
"Waiting".to_string(),
waiting.reason.clone(),
waiting.message.clone(),
)
} else if c.state.as_ref().and_then(|s| s.running.as_ref()).is_some() {
("Running".to_string(), None, None)
} else if let Some(terminated) =
c.state.as_ref().and_then(|s| s.terminated.as_ref())
{
(
"Terminated".to_string(),
terminated.reason.clone(),
terminated.message.clone(),
)
} else {
("Unknown".to_string(), None, None)
};
ContainerStatus {
name: c.name.clone(),
ready: c.ready,
restart_count: c.restart_count,
state,
state_reason,
state_message,
}
})
.collect()
})
.unwrap_or_default();
// Calculate total restart count
let restart_count: i32 = container_statuses.iter().map(|c| c.restart_count).sum();
// Extract start time
let start_time = pod
.status
.as_ref()
.and_then(|s| s.start_time.as_ref())
.map(|t| t.0.to_rfc3339());
Ok(PodDetails {
name: pod_name.to_string(),
namespace: namespace.to_string(),
phase,
node_name,
conditions,
container_statuses,
restart_count,
start_time,
})
}
/// Gets recent logs from a pod's container
pub async fn get_pod_logs(
&self,
namespace: &str,
pod_name: &str,
container_name: Option<&str>,
tail_lines: Option<i64>,
) -> Result<PodLogs, Box<dyn std::error::Error>> {
let pods: Api<Pod> = Api::namespaced(self.inner().clone(), namespace);
// Determine which container to get logs from
let container = if let Some(name) = container_name {
name.to_string()
} else {
// Get first container name
let pod = pods.get(pod_name).await?;
pod.spec
.as_ref()
.and_then(|s| s.containers.first())
.map(|c| c.name.clone())
.ok_or("No containers found in pod")?
};
let log_params = LogParams {
container: Some(container.clone()),
tail_lines,
..Default::default()
};
let logs = pods.logs(pod_name, &log_params).await?;
Ok(PodLogs {
pod_name: pod_name.to_string(),
namespace: namespace.to_string(),
container_name: container,
logs,
})
}
}
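
Example call (names are placeholders): fetch the last 50 lines from the pod's first container by passing None for the container.

    let logs = kube_client
        .get_pod_logs("default", "my-pod", None, Some(50))
        .await?;
    println!("{}", logs.logs);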

src/k8s/types.rs (new file, 76 lines)

@@ -0,0 +1,76 @@
use serde::Serialize;
// Node types
#[derive(Debug, Serialize)]
pub struct NodeStatus {
pub name: String,
pub ready: bool,
pub version: String,
pub internal_ip: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct NodeCondition {
pub type_: String,
pub status: String,
pub reason: Option<String>,
pub message: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct NodeResources {
pub cpu: String,
pub memory: String,
pub pods: String,
}
#[derive(Debug, Serialize)]
pub struct NodeDetails {
pub name: String,
pub conditions: Vec<NodeCondition>,
pub capacity: NodeResources,
pub allocatable: NodeResources,
pub taints: Vec<String>,
pub labels: Vec<(String, String)>,
pub pod_count: usize,
pub version: String,
}
// Pod types
#[derive(Debug, Serialize)]
pub struct PodCondition {
pub type_: String,
pub status: String,
pub reason: Option<String>,
pub message: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct ContainerStatus {
pub name: String,
pub ready: bool,
pub restart_count: i32,
pub state: String,
pub state_reason: Option<String>,
pub state_message: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct PodDetails {
pub name: String,
pub namespace: String,
pub phase: String,
pub node_name: Option<String>,
pub conditions: Vec<PodCondition>,
pub container_statuses: Vec<ContainerStatus>,
pub restart_count: i32,
pub start_time: Option<String>,
}
#[derive(Debug, Serialize)]
pub struct PodLogs {
pub pod_name: String,
pub namespace: String,
pub container_name: String,
pub logs: String,
}

src/k8s/watcher.rs (new file, 170 lines)

@@ -0,0 +1,170 @@
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Node;
use kube::{
runtime::{watcher, watcher::Config, WatchStreamExt},
Api, ResourceExt,
};
use std::collections::HashMap;
use tracing::{debug, info, warn};
use super::client::KubeClient;
#[derive(Debug, Clone)]
pub struct NodeEvent {
pub node_name: String,
pub ready: bool,
pub event_type: NodeEventType,
}
#[derive(Debug, Clone, PartialEq)]
pub enum NodeEventType {
BecameNotReady,
BecameReady,
ConditionChanged {
condition_type: String,
status: String,
reason: Option<String>,
},
}
pub struct NodeWatcher {
api: Api<Node>,
previous_states: HashMap<String, bool>,
}
impl NodeWatcher {
pub fn new(kube_client: &KubeClient) -> Self {
let api = Api::all(kube_client.inner().clone());
Self {
api,
previous_states: HashMap::new(),
}
}
/// Start watching node events
pub async fn watch(
mut self,
) -> Result<
impl futures::Stream<Item = Result<NodeEvent, Box<dyn std::error::Error + Send + Sync>>>,
Box<dyn std::error::Error>,
> {
info!("Starting node watcher");
// Initialize current state
let nodes = self.api.list(&Default::default()).await?;
for node in nodes.items {
let name = node.name_any();
let ready = Self::is_node_ready(&node);
self.previous_states.insert(name.clone(), ready);
debug!("Initial state: {} = Ready:{}", name, ready);
}
let stream = watcher(self.api.clone(), Config::default())
.applied_objects()
.try_filter_map(move |node| {
let event = self.process_node_event(node);
futures::future::ready(Ok(event))
})
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
Ok(stream)
}
fn process_node_event(&mut self, node: Node) -> Option<NodeEvent> {
let name = node.name_any();
let current_ready = Self::is_node_ready(&node);
let previous_ready = self.previous_states.get(&name).copied();
let event = match (previous_ready, current_ready) {
// Node became NotReady
(Some(true), false) | (None, false) => {
warn!("Node {} became NotReady", name);
Some(NodeEvent {
node_name: name.clone(),
ready: false,
event_type: NodeEventType::BecameNotReady,
})
}
// Node became Ready
(Some(false), true) => {
info!("Node {} became Ready", name);
Some(NodeEvent {
node_name: name.clone(),
ready: true,
event_type: NodeEventType::BecameReady,
})
}
// Check for condition changes even if Ready state is the same
_ => {
if let Some(problematic_condition) = Self::find_problematic_condition(&node) {
warn!(
"Node {} has problematic condition: {}",
name, problematic_condition.condition_type
);
Some(NodeEvent {
node_name: name.clone(),
ready: current_ready,
event_type: NodeEventType::ConditionChanged {
condition_type: problematic_condition.condition_type,
status: problematic_condition.status,
reason: problematic_condition.reason,
},
})
} else {
None
}
}
};
// Update state
self.previous_states.insert(name, current_ready);
event
}
fn is_node_ready(node: &Node) -> bool {
node.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.and_then(|conditions| {
conditions
.iter()
.find(|c| c.type_ == "Ready")
.map(|c| c.status == "True")
})
.unwrap_or(false)
}
fn find_problematic_condition(node: &Node) -> Option<ProblematicCondition> {
node.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.and_then(|conditions| {
for condition in conditions {
// Check for problematic conditions
match condition.type_.as_str() {
"Ready" if condition.status != "True" => {
return Some(ProblematicCondition {
condition_type: condition.type_.clone(),
status: condition.status.clone(),
reason: condition.reason.clone(),
});
}
"MemoryPressure" | "DiskPressure" | "PIDPressure"
if condition.status == "True" =>
{
return Some(ProblematicCondition {
condition_type: condition.type_.clone(),
status: condition.status.clone(),
reason: condition.reason.clone(),
});
}
_ => {}
}
}
None
})
}
}
#[derive(Debug)]
struct ProblematicCondition {
condition_type: String,
status: String,
reason: Option<String>,
}

src/main.rs (new file, 123 lines)

@@ -0,0 +1,123 @@
mod ai;
mod config;
mod events;
mod k8s;
mod tools;
use ai::AIClient;
use config::Config;
use events::EventHandler;
use futures::StreamExt;
use k8s::{KubeClient, NodeWatcher, PodWatcher};
use tracing::{error, info};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into()))
.init();
info!("Starting duty-ai-ops daemon");
// Load configuration
let cfg = Config::load("config.toml")?;
info!("Configuration loaded");
// Initialize clients
let kube_client = KubeClient::new().await?;
info!("Connected to Kubernetes cluster");
let ai_client = AIClient::new(&cfg);
info!(
model = %cfg.model,
max_concurrent = cfg.max_concurrent_diagnoses,
"AI client initialized"
);
// Create event handler with concurrency limit
let event_handler = EventHandler::new(
kube_client.clone(),
ai_client,
cfg.max_concurrent_diagnoses,
);
// Start node watcher
let node_watcher = NodeWatcher::new(&kube_client);
let node_stream = node_watcher.watch().await?;
tokio::pin!(node_stream);
// Start pod watcher
let pod_watcher = PodWatcher::new(&kube_client);
let pod_stream = pod_watcher.watch().await?;
tokio::pin!(pod_stream);
info!("Node and Pod watchers started, monitoring for events");
// Setup graceful shutdown
let ctrl_c = tokio::signal::ctrl_c();
tokio::pin!(ctrl_c);
// Event loop
loop {
tokio::select! {
// Handle shutdown signal
_ = &mut ctrl_c => {
info!("Received shutdown signal, exiting gracefully");
break;
}
// Handle node events
Some(event_result) = node_stream.next() => {
match event_result {
Ok(event) => {
let handler = event_handler.clone();
tokio::spawn(async move {
handler.handle_node_event(event).await;
});
}
Err(e) => {
error!(error = %e, "Error processing node watch event");
}
}
}
// Handle pod events
Some(event_result) = pod_stream.next() => {
match event_result {
Ok(event) => {
let handler = event_handler.clone();
tokio::spawn(async move {
handler.handle_pod_event(event).await;
});
}
Err(e) => {
error!(error = %e, "Error processing pod watch event");
}
}
}
// If streams end unexpectedly
else => {
error!("Watch streams ended unexpectedly, restarting...");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
// Recreate watchers and streams
let node_watcher = NodeWatcher::new(&kube_client);
let new_node_stream = node_watcher.watch().await?;
node_stream.set(new_node_stream);
let pod_watcher = PodWatcher::new(&kube_client);
let new_pod_stream = pod_watcher.watch().await?;
pod_stream.set(new_pod_stream);
info!("Watchers restarted");
}
}
}
info!("Daemon stopped");
Ok(())
}

src/tools/mod.rs (new file, 5 lines)

@@ -0,0 +1,5 @@
mod node_details;
mod pod_tools;
pub use node_details::get_node_details_tool;
pub use pod_tools::{get_pod_details_tool, get_pod_logs_tool};

src/tools/node_details.rs (new file, 25 lines)

@@ -0,0 +1,25 @@
use async_openai::types::chat::{ChatCompletionTool, FunctionObject};
use serde_json::json;
/// Creates tool definition for getting detailed node information
pub fn get_node_details_tool() -> ChatCompletionTool {
ChatCompletionTool {
function: FunctionObject {
name: "get_node_details".to_string(),
description: Some(
"Get detailed diagnostic information about a specific Kubernetes node including conditions, resources, taints, labels, and pod count".to_string(),
),
parameters: Some(json!({
"type": "object",
"properties": {
"node_name": {
"type": "string",
"description": "The name of the node to get details for"
}
},
"required": ["node_name"]
})),
strict: None,
},
}
}

src/tools/pod_tools.rs (new file, 64 lines)

@@ -0,0 +1,64 @@
use async_openai::types::chat::{ChatCompletionTool, FunctionObject};
use serde_json::json;
/// Creates tool definition for getting detailed pod information
pub fn get_pod_details_tool() -> ChatCompletionTool {
ChatCompletionTool {
function: FunctionObject {
name: "get_pod_details".to_string(),
description: Some(
"Get detailed diagnostic information about a specific Kubernetes pod including phase, conditions, container statuses, and restart counts".to_string(),
),
parameters: Some(json!({
"type": "object",
"properties": {
"namespace": {
"type": "string",
"description": "The namespace of the pod"
},
"pod_name": {
"type": "string",
"description": "The name of the pod to get details for"
}
},
"required": ["namespace", "pod_name"]
})),
strict: None,
},
}
}
/// Creates tool definition for getting pod logs
pub fn get_pod_logs_tool() -> ChatCompletionTool {
ChatCompletionTool {
function: FunctionObject {
name: "get_pod_logs".to_string(),
description: Some(
"Get recent logs from a pod's container to diagnose runtime issues and errors".to_string(),
),
parameters: Some(json!({
"type": "object",
"properties": {
"namespace": {
"type": "string",
"description": "The namespace of the pod"
},
"pod_name": {
"type": "string",
"description": "The name of the pod to get logs from"
},
"container_name": {
"type": "string",
"description": "Optional: specific container name. If not provided, gets logs from first container"
},
"tail_lines": {
"type": "integer",
"description": "Number of recent log lines to retrieve. Default is 50 lines"
}
},
"required": ["namespace", "pod_name"]
})),
strict: None,
},
}
}
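
The JSON arguments the model supplies for these tools are deserialized by the argument structs in src/ai/diagnostics.rs; optional fields may simply be omitted. A sketch of the expected shape:

    // mirrors PodLogsArgs in diagnostics.rs; both optional fields absent here
    let args: PodLogsArgs =
        serde_json::from_str(r#"{"namespace": "default", "pod_name": "my-pod"}"#)?;
    assert!(args.container_name.is_none() && args.tail_lines.is_none());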