From 016ba6583dee38b55ba2c6a7d8ef920aa00ef068 Mon Sep 17 00:00:00 2001 From: AB-UK Date: Wed, 24 Dec 2025 02:46:22 +0000 Subject: [PATCH] Init --- .gitignore | 1 + Cargo.lock | 494 ++++++++++++++++++++++++++++++++++---- Cargo.toml | 1 + config.toml | 10 - config.toml.example | 28 +++ src/ai/diagnostics.rs | 19 ++ src/config.rs | 4 +- src/events/correlation.rs | 117 +++++++++ src/events/formatter.rs | 86 +++++++ src/events/handler.rs | 187 ++++++++++++++- src/events/mod.rs | 4 + src/k8s/pod_watcher.rs | 27 +++ src/k8s/pods.rs | 16 ++ src/k8s/types.rs | 9 + src/main.rs | 14 ++ src/telegram.rs | 201 ++++++++++++++++ 16 files changed, 1148 insertions(+), 70 deletions(-) delete mode 100644 config.toml create mode 100644 config.toml.example create mode 100644 src/events/correlation.rs create mode 100644 src/events/formatter.rs create mode 100644 src/telegram.rs diff --git a/.gitignore b/.gitignore index 3ff8249..0494cae 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target .crush +config.toml diff --git a/Cargo.lock b/Cargo.lock index d669e5b..6d6981e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -59,14 +59,14 @@ checksum = "afb7051804e03daf32cd7e45e7a655bb6cea9283309d2253babfb38c09f4ea03" dependencies = [ "async-openai-macros", "backoff", - "base64", + "base64 0.22.1", "bytes", "derive_builder", "eventsource-stream", "futures", "getrandom 0.3.4", "rand 0.9.2", - "reqwest", + "reqwest 0.12.28", "reqwest-eventsource", "secrecy", "serde", @@ -150,12 +150,24 @@ dependencies = [ "tokio", ] +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.10.0" @@ -375,6 +387,7 @@ dependencies = [ "futures", "k8s-openapi", "kube", + "reqwest 0.11.27", "serde", "serde_json", "tokio", @@ -407,6 +420,15 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "enum-ordinalize" version = "4.3.2" @@ -433,6 +455,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + [[package]] name = "event-listener" version = "5.4.1" @@ -465,6 +497,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "find-msvc-tools" version = "0.1.5" @@ -483,6 +521,21 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -624,6 +677,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "h2" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.15.5" @@ -647,10 +719,10 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "headers-core", - "http", + "http 1.4.0", "httpdate", "mime", "sha1", @@ -662,7 +734,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" dependencies = [ - "http", + "http 1.4.0", ] [[package]] @@ -674,6 +746,17 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.4.0" @@ -684,6 +767,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -691,7 +785,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.4.0", ] [[package]] @@ -702,8 +796,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -719,6 +813,30 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2 0.5.10", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.8.1" @@ -729,8 +847,8 @@ dependencies = [ "bytes", "futures-channel", "futures-core", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "httparse", "itoa", "pin-project-lite", @@ -749,8 +867,8 @@ dependencies = [ "bytes", "futures-util", "headers", - "http", - "hyper", + "http 1.4.0", + "hyper 1.8.1", "hyper-rustls", "hyper-util", "pin-project-lite", @@ -766,8 +884,8 @@ version = "0.27.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ - "http", - "hyper", + "http 1.4.0", + "hyper 1.8.1", "hyper-util", "log", "rustls", @@ -784,32 +902,45 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper", + "hyper 1.8.1", "hyper-util", "pin-project-lite", "tokio", "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.32", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-util" version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-channel", "futures-core", "futures-util", - "http", - "http-body", - "hyper", + "http 1.4.0", + "http-body 1.0.1", + "hyper 1.8.1", "ipnet", "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.6.1", "tokio", "tower-service", "tracing", @@ -1039,7 +1170,7 @@ version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8847402328d8301354c94d605481f25a6bdc1ed65471fd96af8eca71141b13" dependencies = [ - "base64", + "base64 0.22.1", "chrono", "serde", "serde-value", @@ -1065,16 +1196,16 @@ version = "0.97.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d539b6493d162ae5ab691762be972b6a1c20f6d8ddafaae305c0e2111b589d99" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "chrono", "either", "futures", "home", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-http-proxy", "hyper-rustls", "hyper-timeout", @@ -1084,7 +1215,7 @@ dependencies = [ "kube-core", "pem", "rustls", - "rustls-pemfile", + "rustls-pemfile 2.2.0", "secrecy", "serde", "serde_json", @@ -1105,7 +1236,7 @@ checksum = "98a87cc0046cf6b62cbb63ae1fbc366ee8ba29269f575289679473754ff5d7a7" dependencies = [ "chrono", "form_urlencoded", - "http", + "http 1.4.0", "json-patch", "k8s-openapi", "schemars", @@ -1168,6 +1299,12 @@ version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" +[[package]] +name = "linux-raw-sys" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" + [[package]] name = "litemap" version = "0.8.1" @@ -1243,6 +1380,23 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework 2.11.1", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nom" version = "7.1.3" @@ -1277,12 +1431,50 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "openssl" +version = "0.10.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" +dependencies = [ + "bitflags 2.10.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "openssl-sys" +version = "0.9.111" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82cab2d520aa75e3c58898289429321eb788c3106963d0dc886ec7a5f4adc321" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -1327,7 +1519,7 @@ version = "3.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" dependencies = [ - "base64", + "base64 0.22.1", "serde_core", ] @@ -1412,6 +1604,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + [[package]] name = "potential_utf" version = "0.1.4" @@ -1452,7 +1650,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2", + "socket2 0.6.1", "thiserror 2.0.17", "tokio", "tracing", @@ -1489,7 +1687,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2", + "socket2 0.6.1", "tracing", "windows-sys 0.60.2", ] @@ -1574,7 +1772,7 @@ version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -1606,20 +1804,60 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +[[package]] +name = "reqwest" +version = "0.11.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +dependencies = [ + "base64 0.21.7", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.32", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile 1.0.4", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 0.1.2", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "reqwest" version = "0.12.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" dependencies = [ - "base64", + "base64 0.22.1", "bytes", "futures-core", "futures-util", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-rustls", "hyper-util", "js-sys", @@ -1634,7 +1872,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.2", "tokio", "tokio-rustls", "tokio-util", @@ -1660,7 +1898,7 @@ dependencies = [ "mime", "nom", "pin-project-lite", - "reqwest", + "reqwest 0.12.28", "thiserror 1.0.69", ] @@ -1684,6 +1922,19 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" +[[package]] +name = "rustix" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" +dependencies = [ + "bitflags 2.10.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustls" version = "0.23.35" @@ -1706,7 +1957,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 2.2.0", "rustls-pki-types", "schannel", "security-framework 2.11.1", @@ -1724,6 +1975,15 @@ dependencies = [ "security-framework 3.5.1", ] +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.7", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -1821,7 +2081,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -1834,7 +2094,7 @@ version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.10.1", "core-foundation-sys", "libc", @@ -2007,6 +2267,16 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "socket2" version = "0.6.1" @@ -2046,6 +2316,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "sync_wrapper" version = "1.0.2" @@ -2066,6 +2342,40 @@ dependencies = [ "syn", ] +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation 0.9.4", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "tempfile" +version = "3.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" +dependencies = [ + "fastrand", + "getrandom 0.3.4", + "once_cell", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -2152,7 +2462,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.6.1", "tokio-macros", "windows-sys 0.61.2", ] @@ -2168,6 +2478,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.4" @@ -2253,7 +2573,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project-lite", - "sync_wrapper", + "sync_wrapper 1.0.2", "tokio", "tokio-util", "tower-layer", @@ -2267,12 +2587,12 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ - "base64", - "bitflags", + "base64 0.22.1", + "bitflags 2.10.0", "bytes", "futures-util", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "iri-string", "mime", "pin-project-lite", @@ -2422,6 +2742,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" @@ -2602,6 +2928,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -2629,6 +2964,21 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -2662,6 +3012,12 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -2674,6 +3030,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -2686,6 +3048,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -2710,6 +3078,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -2722,6 +3096,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -2734,6 +3114,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -2746,6 +3132,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -2767,6 +3159,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "wit-bindgen" version = "0.46.0" diff --git a/Cargo.toml b/Cargo.toml index d5084b4..3998e11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,3 +15,4 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } futures = "0.3" chrono = "0.4" +reqwest = { version = "0.11", features = ["json"] } diff --git a/config.toml b/config.toml deleted file mode 100644 index 2462fb7..0000000 --- a/config.toml +++ /dev/null @@ -1,10 +0,0 @@ -# API Configuration -api_base = "http://localhost:11434/v1" -api_key = "ollama" -model = "qwen3-tools:latest" - -# AI concurrency settings -max_concurrent_diagnoses = 1 # Maximum parallel AI diagnosis requests - -# System prompt for the AI assistant -system_prompt = "You are a Kubernetes diagnostic system. Analyze issues using provided tools and output ONLY a brief technical diagnosis in plain text. Rules: 1) Call tools to gather data. 2) After getting data, output diagnosis as plain text - NO XML, NO function calls in output. 3) Format: 'Resource [name] - [problem]. Cause: [technical reason].' 4) Maximum 3 sentences. 5) If tool fails with error, diagnose based on error message. 6) Output must be plain text diagnosis, not tool calls." diff --git a/config.toml.example b/config.toml.example new file mode 100644 index 0000000..488a50e --- /dev/null +++ b/config.toml.example @@ -0,0 +1,28 @@ +# API Configuration +api_base = "http://localhost:11434/v1" +api_key = "ollama" +model = "devstral-small-2:latest" +#model = "qwen3-tools:latest" + +# AI concurrency settings +max_concurrent_diagnoses = 1 # Maximum parallel AI diagnosis requests + +# Telegram notifications (optional - leave empty to disable) +telegram_bot_token = "8339158626:AAG3O0hmFYsQdW43ikcGBwSa5RnYIl4axFE" +telegram_chat_id = "124317807" + +# System prompt for the AI assistant +system_prompt = """You are a Kubernetes diagnostic expert. Analyze issues using provided tools. + +OUTPUT FORMAT: +🔍 [Resource]: [name] +📋 Problem: [one sentence] +🔎 Root Cause: [1-2 short sentences with technical details] + +RULES: +1. Use tools to gather data first +2. If tool fails, diagnose from the error message +3. Be concise and technical +4. Focus on actionable root cause +5. For node issues affecting multiple pods, diagnose the node problem not individual pods +""" diff --git a/src/ai/diagnostics.rs b/src/ai/diagnostics.rs index b22dd3b..44bc49f 100644 --- a/src/ai/diagnostics.rs +++ b/src/ai/diagnostics.rs @@ -83,6 +83,25 @@ impl DiagnosticEngine { .await } + /// Diagnoses a node with custom context (e.g., mass pod failures) + pub async fn diagnose_node_with_context( + &self, + ai_client: &super::AIClient, + _node_name: &str, + context: &str, + ) -> Result> { + let full_description = format!( + "{}. Use the get_node_details tool to inspect the node if needed.", + context + ); + + let tools: Vec = + vec![ChatCompletionTools::Function(tools::get_node_details_tool())]; + + self.run_diagnosis(ai_client, full_description, tools) + .await + } + async fn run_diagnosis( &self, ai_client: &super::AIClient, diff --git a/src/config.rs b/src/config.rs index 2b01030..88b4d11 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,7 +1,7 @@ use serde::Deserialize; use std::fs; -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] pub struct Config { pub api_base: String, pub api_key: String, @@ -9,6 +9,8 @@ pub struct Config { pub system_prompt: String, #[serde(default = "default_max_concurrent_diagnoses")] pub max_concurrent_diagnoses: usize, + pub telegram_bot_token: Option, + pub telegram_chat_id: Option, } fn default_max_concurrent_diagnoses() -> usize { diff --git a/src/events/correlation.rs b/src/events/correlation.rs new file mode 100644 index 0000000..aa16fec --- /dev/null +++ b/src/events/correlation.rs @@ -0,0 +1,117 @@ +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use std::time::{Duration, Instant}; +use tracing::{debug, info, warn}; + +/// Tracks issues by node to detect patterns and enable smart correlation +#[derive(Clone)] +pub struct CorrelationEngine { + // node_name -> (pod_name, timestamp) + node_issues: Arc>>>, + // node_name -> timestamp of last diagnosis (to avoid duplicate diagnoses) + diagnosed_nodes: Arc>>, +} + +impl CorrelationEngine { + pub fn new() -> Self { + Self { + node_issues: Arc::new(RwLock::new(HashMap::new())), + diagnosed_nodes: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Records a pod issue associated with a node + pub async fn record_pod_issue(&self, node_name: &str, pod_name: String) { + let mut issues = self.node_issues.write().await; + let node_issues = issues.entry(node_name.to_string()).or_insert_with(Vec::new); + + // Clean old entries (older than 5 minutes) + node_issues.retain(|(_, timestamp)| timestamp.elapsed() < Duration::from_secs(300)); + + // Add new issue + node_issues.push((pod_name, Instant::now())); + + debug!(node = %node_name, count = node_issues.len(), "Recorded pod issue on node"); + } + + /// Checks if there's a mass failure on a node (>5 pods in last 60 seconds) + /// Also checks if we already diagnosed this node recently to avoid duplicates + pub async fn is_mass_failure(&self, node_name: &str) -> (bool, Vec) { + // First check if we already diagnosed this node recently (within last 5 minutes) + let diagnosed = self.diagnosed_nodes.read().await; + if let Some(last_diagnosis) = diagnosed.get(node_name) { + if last_diagnosis.elapsed() < Duration::from_secs(300) { + debug!( + node = %node_name, + elapsed_secs = last_diagnosis.elapsed().as_secs(), + "Skipping duplicate diagnosis - node recently diagnosed" + ); + return (false, vec![]); + } + } + drop(diagnosed); + + let issues = self.node_issues.read().await; + + if let Some(node_issues) = issues.get(node_name) { + let recent_count = node_issues + .iter() + .filter(|(_, timestamp)| timestamp.elapsed() < Duration::from_secs(60)) + .count(); + + if recent_count >= 5 { + let affected_pods: Vec = node_issues + .iter() + .filter(|(_, timestamp)| timestamp.elapsed() < Duration::from_secs(60)) + .map(|(name, _)| name.clone()) + .collect(); + + warn!( + node = %node_name, + affected_pods = recent_count, + "Detected mass failure on node" + ); + + return (true, affected_pods); + } + } + + (false, vec![]) + } + + /// Marks a node as diagnosed (prevents duplicate diagnoses) + pub async fn mark_node_diagnosed(&self, node_name: &str) { + let mut diagnosed = self.diagnosed_nodes.write().await; + diagnosed.insert(node_name.to_string(), Instant::now()); + debug!(node = %node_name, "Marked node as diagnosed"); + } + + /// Clears recorded issues for a node (call when node becomes healthy) + pub async fn clear_node_issues(&self, node_name: &str) { + let mut issues = self.node_issues.write().await; + let mut diagnosed = self.diagnosed_nodes.write().await; + + if issues.remove(node_name).is_some() { + info!(node = %node_name, "Cleared node issues - node recovered"); + } + + // Also clear diagnosis marker + diagnosed.remove(node_name); + } + + /// Gets count of recent issues on a node + pub async fn get_recent_issue_count(&self, node_name: &str) -> usize { + let issues = self.node_issues.read().await; + + issues + .get(node_name) + .map(|node_issues| { + node_issues + .iter() + .filter(|(_, timestamp)| timestamp.elapsed() < Duration::from_secs(300)) + .count() + }) + .unwrap_or(0) + } +} diff --git a/src/events/formatter.rs b/src/events/formatter.rs new file mode 100644 index 0000000..f13b226 --- /dev/null +++ b/src/events/formatter.rs @@ -0,0 +1,86 @@ +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use std::time::{Duration, Instant}; +use tracing::debug; + +/// Deduplicates and formats diagnosis results to avoid spam +#[derive(Clone)] +pub struct DiagnosisFormatter { + // Hash of diagnosis content -> (timestamp, count) + seen_diagnoses: Arc>>, +} + +impl DiagnosisFormatter { + pub fn new() -> Self { + Self { + seen_diagnoses: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Checks if this diagnosis is a duplicate and should be suppressed + /// Returns: (should_display, display_suffix) + pub async fn should_display(&self, diagnosis: &str) -> (bool, Option) { + let diagnosis_hash = Self::hash_diagnosis(diagnosis); + let mut seen = self.seen_diagnoses.write().await; + + // Clean old entries (older than 10 minutes) + seen.retain(|_, (timestamp, _)| timestamp.elapsed() < Duration::from_secs(600)); + + if let Some((last_seen, count)) = seen.get_mut(&diagnosis_hash) { + // Similar diagnosis seen recently + if last_seen.elapsed() < Duration::from_secs(300) { + // Within 5 minutes - increment count and suppress + *count += 1; + *last_seen = Instant::now(); + debug!( + hash = %diagnosis_hash, + count = *count, + "Suppressing duplicate diagnosis" + ); + return (false, Some(format!(" (seen {} times in last 5min)", count))); + } else { + // More than 5 minutes - reset and show + *last_seen = Instant::now(); + *count = 1; + } + } else { + // First time seeing this diagnosis + seen.insert(diagnosis_hash.clone(), (Instant::now(), 1)); + } + + (true, None) + } + + /// Creates a simplified hash of diagnosis to detect duplicates + /// Focuses on root cause rather than resource names + fn hash_diagnosis(diagnosis: &str) -> String { + // Extract key phrases from diagnosis + let normalized = diagnosis + .to_lowercase() + .lines() + .filter(|line| { + line.contains("root cause:") + || line.contains("cause:") + || line.contains("problem:") + || line.contains("severity:") + }) + .collect::>() + .join(" "); + + // Simple hash based on content + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + + let mut hasher = DefaultHasher::new(); + normalized.hash(&mut hasher); + format!("{:x}", hasher.finish()) + } + + /// Periodically clean old entries + pub async fn cleanup(&self) { + let mut seen = self.seen_diagnoses.write().await; + seen.retain(|_, (timestamp, _)| timestamp.elapsed() < Duration::from_secs(600)); + debug!("Cleaned up diagnosis cache, {} entries remain", seen.len()); + } +} diff --git a/src/events/handler.rs b/src/events/handler.rs index 3a0a770..d55c4a1 100644 --- a/src/events/handler.rs +++ b/src/events/handler.rs @@ -1,23 +1,41 @@ use crate::ai::{AIClient, DiagnosticEngine}; use crate::k8s::{KubeClient, NodeEvent, NodeEventType, PodEvent, PodEventType}; +use crate::telegram::TelegramNotifier; use std::sync::Arc; use tokio::sync::Semaphore; use tracing::{error, info, warn}; +use super::correlation::CorrelationEngine; +use super::formatter::DiagnosisFormatter; + #[derive(Clone)] pub struct EventHandler { kube_client: KubeClient, ai_client: AIClient, // Semaphore to limit concurrent AI diagnoses diagnosis_semaphore: Arc, + // Correlation engine to detect patterns + correlation: CorrelationEngine, + // Formatter to avoid duplicate diagnoses + formatter: DiagnosisFormatter, + // Optional Telegram notifier + telegram: Option, } impl EventHandler { - pub fn new(kube_client: KubeClient, ai_client: AIClient, max_concurrent: usize) -> Self { + pub fn new( + kube_client: KubeClient, + ai_client: AIClient, + max_concurrent: usize, + telegram: Option, + ) -> Self { Self { kube_client, ai_client, diagnosis_semaphore: Arc::new(Semaphore::new(max_concurrent)), + correlation: CorrelationEngine::new(), + formatter: DiagnosisFormatter::new(), + telegram, } } @@ -47,13 +65,73 @@ impl EventHandler { } NodeEventType::BecameReady => { info!(node = %event.node_name, "Node became Ready"); + // Clear correlation data for this node since it recovered + self.correlation.clear_node_issues(&event.node_name).await; + + // Mark as resolved in Telegram + if let Some(ref telegram) = self.telegram { + telegram.mark_node_resolved(&event.node_name).await; + } } } } /// Handle pod event and trigger AI diagnostics if needed pub async fn handle_pod_event(&self, event: PodEvent) { + // First, get pod details to determine which node it's on + let node_name = match self + .kube_client + .get_pod_details(&event.namespace, &event.pod_name) + .await + { + Ok(details) => details.node_name, + Err(e) => { + warn!( + pod = %event.pod_name, + namespace = %event.namespace, + error = %e, + "Failed to get pod details for correlation" + ); + None + } + }; + + // If pod is on a node, record the issue for correlation + if let Some(ref node) = node_name { + self.correlation + .record_pod_issue(node, format!("{}/{}", event.namespace, event.pod_name)) + .await; + + // Check if this is part of a mass failure + let (is_mass_failure, affected_pods) = self.correlation.is_mass_failure(node).await; + + if is_mass_failure { + info!( + node = %node, + affected_pods = affected_pods.len(), + "Detected mass pod failure on node - diagnosing node instead" + ); + + // Diagnose the node with context about affected pods + self.diagnose_node_with_pods(node, affected_pods).await; + return; // Don't diagnose individual pod + } + } + + // Build problem description with node context let problem_description = match &event.event_type { + PodEventType::Recovered => { + info!( + pod = %event.pod_name, + namespace = %event.namespace, + "Pod recovered - marking as resolved" + ); + // Mark as resolved in Telegram + if let Some(ref telegram) = self.telegram { + telegram.mark_pod_resolved(&event.namespace, &event.pod_name).await; + } + return; // No diagnosis needed for recovery + } PodEventType::HighRestartCount { count } => { warn!( pod = %event.pod_name, @@ -117,7 +195,7 @@ impl EventHandler { } }; - self.diagnose_pod(&event.namespace, &event.pod_name, &problem_description) + self.diagnose_pod(&event.namespace, &event.pod_name, &problem_description, node_name.as_deref()) .await; } @@ -129,55 +207,138 @@ impl EventHandler { let diagnostic_engine = DiagnosticEngine::new(self.kube_client.clone()); - match diagnostic_engine + let diagnosis_opt = match diagnostic_engine .diagnose_nodes(&self.ai_client, vec![node_name.to_string()]) .await { Ok(diagnosis) => { info!( node = %node_name, - diagnosis = %diagnosis, - "AI diagnosis completed" + "Node diagnosis completed:\n{}", diagnosis ); + Some(diagnosis) } Err(e) => { + let error_msg = e.to_string(); error!( node = %node_name, - error = %e, + error = %error_msg, "AI diagnosis failed" ); + None + } + }; + + // Send to Telegram if configured (after match is complete) + if let Some(ref telegram) = self.telegram { + if let Some(diagnosis) = diagnosis_opt { + telegram.send_node_diagnosis(node_name, &diagnosis).await; } } // Permit is automatically released when _permit is dropped } - async fn diagnose_pod(&self, namespace: &str, pod_name: &str, problem: &str) { + async fn diagnose_pod(&self, namespace: &str, pod_name: &str, problem: &str, node_name: Option<&str>) { // Acquire semaphore permit to limit concurrency let _permit = self.diagnosis_semaphore.acquire().await.unwrap(); - info!(pod = %pod_name, namespace = %namespace, "Starting AI diagnosis (acquired permit)"); + info!(pod = %pod_name, namespace = %namespace, node = ?node_name, "Starting AI diagnosis (acquired permit)"); let diagnostic_engine = DiagnosticEngine::new(self.kube_client.clone()); - match diagnostic_engine - .diagnose_pod(&self.ai_client, namespace, pod_name, problem) + // Add node context to problem description if available + let full_problem = if let Some(node) = node_name { + format!("{} (Pod is on node: {})", problem, node) + } else { + problem.to_string() + }; + + let diagnosis_opt = match diagnostic_engine + .diagnose_pod(&self.ai_client, namespace, pod_name, &full_problem) .await { Ok(diagnosis) => { info!( pod = %pod_name, namespace = %namespace, - diagnosis = %diagnosis, - "AI diagnosis completed" + "AI diagnosis completed:\n{}", diagnosis ); + Some(diagnosis) } Err(e) => { + let error_msg = e.to_string(); error!( pod = %pod_name, namespace = %namespace, - error = %e, + error = %error_msg, "AI diagnosis failed" ); + None + } + }; + + // Send to Telegram if configured (after match is complete) + if let Some(ref telegram) = self.telegram { + if let Some(diagnosis) = diagnosis_opt { + telegram.send_pod_diagnosis(namespace, pod_name, &diagnosis).await; + } + } + // Permit is automatically released when _permit is dropped + } + + /// Diagnose a node with context about affected pods (mass failure scenario) + async fn diagnose_node_with_pods(&self, node_name: &str, affected_pods: Vec) { + // Mark node as diagnosed to prevent duplicate diagnoses for subsequent pod events + self.correlation.mark_node_diagnosed(node_name).await; + + // Acquire semaphore permit to limit concurrency + let _permit = self.diagnosis_semaphore.acquire().await.unwrap(); + + info!( + node = %node_name, + affected_pods = affected_pods.len(), + "Starting grouped AI diagnosis (acquired permit)" + ); + + let diagnostic_engine = DiagnosticEngine::new(self.kube_client.clone()); + + // Create a detailed problem description for mass failure + let problem_description = format!( + "Node {} has issues affecting {} pods. Affected pods: {}. \ + This appears to be a node-level problem rather than individual pod issues. \ + Analyze the node state and determine the root cause.", + node_name, + affected_pods.len(), + affected_pods.join(", ") + ); + + let diagnosis_opt = match diagnostic_engine + .diagnose_node_with_context(&self.ai_client, node_name, &problem_description) + .await + { + Ok(diagnosis) => { + info!( + node = %node_name, + affected_pods = affected_pods.len(), + "Grouped diagnosis completed:\n{}", diagnosis + ); + Some(diagnosis) + } + Err(e) => { + let error_msg = e.to_string(); + error!( + node = %node_name, + error = %error_msg, + "Grouped AI diagnosis failed" + ); + None + } + }; + + // Send to Telegram with grouped context (after match is complete) + if let Some(ref telegram) = self.telegram { + if let Some(diagnosis) = diagnosis_opt { + telegram.send_grouped_diagnosis(node_name, affected_pods.len(), &diagnosis).await; } } // Permit is automatically released when _permit is dropped diff --git a/src/events/mod.rs b/src/events/mod.rs index 62033bf..1122132 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -1,3 +1,7 @@ +mod correlation; +mod formatter; mod handler; +pub use correlation::CorrelationEngine; +pub use formatter::DiagnosisFormatter; pub use handler::EventHandler; diff --git a/src/k8s/pod_watcher.rs b/src/k8s/pod_watcher.rs index 8ea0ece..d373612 100644 --- a/src/k8s/pod_watcher.rs +++ b/src/k8s/pod_watcher.rs @@ -24,6 +24,7 @@ pub enum PodEventType { CrashLoopBackOff, ImagePullError, ContainerCreating { duration_seconds: i64 }, + Recovered, // Pod returned to healthy state } pub struct PodWatcher { @@ -83,6 +84,24 @@ impl PodWatcher { .and_then(|s| s.phase.as_deref()) .unwrap_or("Unknown"); + // Check if pod is now healthy (Running with all containers ready) + let is_healthy = phase == "Running" && Self::all_containers_ready(&pod); + + // If pod was problematic and is now healthy - emit recovery event + if is_healthy && self.reported_issues.contains_key(&key) { + info!( + pod = %name, + namespace = %namespace, + "Pod recovered from previous issue" + ); + self.reported_issues.remove(&key); + return Some(PodEvent { + pod_name: name, + namespace, + event_type: PodEventType::Recovered, + }); + } + // Helper to check if we should report this issue let should_report = |event_type: &PodEventType| -> bool { match self.reported_issues.get(&key) { @@ -290,4 +309,12 @@ impl PodWatcher { .and_then(|s| s.message.clone()) }) } + + fn all_containers_ready(pod: &Pod) -> bool { + pod.status + .as_ref() + .and_then(|s| s.container_statuses.as_ref()) + .map(|cs| cs.iter().all(|c| c.ready)) + .unwrap_or(false) + } } diff --git a/src/k8s/pods.rs b/src/k8s/pods.rs index 6133157..07e91e6 100644 --- a/src/k8s/pods.rs +++ b/src/k8s/pods.rs @@ -73,6 +73,21 @@ impl KubeClient { ("Unknown".to_string(), None, None) }; + // Extract resource requests/limits from pod spec + let resources = pod.spec.as_ref().and_then(|spec| { + spec.containers.iter().find(|container| container.name == c.name).and_then(|container| { + container.resources.as_ref().map(|res| { + use super::types::ContainerResources; + ContainerResources { + requests_cpu: res.requests.as_ref().and_then(|r| r.get("cpu").map(|q| q.0.clone())), + requests_memory: res.requests.as_ref().and_then(|r| r.get("memory").map(|q| q.0.clone())), + limits_cpu: res.limits.as_ref().and_then(|l| l.get("cpu").map(|q| q.0.clone())), + limits_memory: res.limits.as_ref().and_then(|l| l.get("memory").map(|q| q.0.clone())), + } + }) + }) + }); + ContainerStatus { name: c.name.clone(), ready: c.ready, @@ -80,6 +95,7 @@ impl KubeClient { state, state_reason, state_message, + resources, } }) .collect() diff --git a/src/k8s/types.rs b/src/k8s/types.rs index 597e195..fe71526 100644 --- a/src/k8s/types.rs +++ b/src/k8s/types.rs @@ -45,6 +45,14 @@ pub struct PodCondition { pub message: Option, } +#[derive(Debug, Serialize)] +pub struct ContainerResources { + pub requests_cpu: Option, + pub requests_memory: Option, + pub limits_cpu: Option, + pub limits_memory: Option, +} + #[derive(Debug, Serialize)] pub struct ContainerStatus { pub name: String, @@ -53,6 +61,7 @@ pub struct ContainerStatus { pub state: String, pub state_reason: Option, pub state_message: Option, + pub resources: Option, } #[derive(Debug, Serialize)] diff --git a/src/main.rs b/src/main.rs index 258fc9f..439b6e6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ mod ai; mod config; mod events; mod k8s; +mod telegram; mod tools; use ai::AIClient; @@ -11,6 +12,7 @@ use futures::StreamExt; use k8s::{KubeClient, NodeWatcher, PodWatcher}; use tracing::{error, info}; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; +use telegram::create_notifier; #[tokio::main] async fn main() -> Result<(), Box> { @@ -37,11 +39,23 @@ async fn main() -> Result<(), Box> { "AI client initialized" ); + // Initialize Telegram notifier (optional) + let telegram = create_notifier( + cfg.telegram_bot_token.clone(), + cfg.telegram_chat_id.clone(), + ); + if telegram.is_some() { + info!("Telegram notifications enabled"); + } else { + info!("Telegram notifications disabled (not configured)"); + } + // Create event handler with concurrency limit let event_handler = EventHandler::new( kube_client.clone(), ai_client, cfg.max_concurrent_diagnoses, + telegram, ); // Start node watcher diff --git a/src/telegram.rs b/src/telegram.rs new file mode 100644 index 0000000..a88f1f5 --- /dev/null +++ b/src/telegram.rs @@ -0,0 +1,201 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::{debug, error, info, warn}; + +#[derive(Clone)] +pub struct TelegramNotifier { + bot_token: String, + chat_id: String, + client: reqwest::Client, + // Store message IDs for resources to mark them as resolved later + // Key format: "node:name" or "pod:namespace/name" + message_ids: Arc>>, +} + +#[derive(Serialize)] +struct SendMessageRequest { + chat_id: String, + text: String, + parse_mode: String, +} + +#[derive(Deserialize)] +struct SendMessageResponse { + ok: bool, + result: Option, +} + +#[derive(Deserialize)] +struct MessageResult { + message_id: i64, +} + +impl TelegramNotifier { + pub fn new(bot_token: String, chat_id: String) -> Self { + Self { + bot_token, + chat_id, + client: reqwest::Client::new(), + message_ids: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Sends a node diagnosis notification to Telegram and stores message_id + pub async fn send_node_diagnosis(&self, node: &str, diagnosis: &str) { + // Format message with Markdown + let message = format!("🔴 *Node Diagnosis*\n\n*Node:* `{}`\n\n```\n{}\n```", node, diagnosis); + + if let Ok(message_id) = self.send_message(&message).await { + debug!(node = %node, message_id = message_id, "Telegram node notification sent"); + // Store message_id for this node + let key = format!("node:{}", node); + self.message_ids.write().await.insert(key, message_id); + } else { + error!("Failed to send Telegram notification"); + } + } + + /// Sends a pod diagnosis notification and stores message_id + pub async fn send_pod_diagnosis(&self, namespace: &str, pod_name: &str, diagnosis: &str) { + let message = format!("🤖 *Pod Diagnosis*\n\n*Pod:* `{}/{}`\n\n```\n{}\n```", namespace, pod_name, diagnosis); + + if let Ok(message_id) = self.send_message(&message).await { + debug!(pod = %pod_name, namespace = %namespace, message_id = message_id, "Telegram pod notification sent"); + // Store message_id for this pod + let key = format!("pod:{}/{}", namespace, pod_name); + self.message_ids.write().await.insert(key, message_id); + } else { + error!("Failed to send Telegram notification"); + } + } + + /// Sends a grouped diagnosis notification for node issues + pub async fn send_grouped_diagnosis(&self, node: &str, affected_pods: usize, diagnosis: &str) { + let message = format!( + "🔴 *Node Issue Detected*\n\n*Node:* `{}`\n*Affected Pods:* {}\n\n```\n{}\n```", + node, affected_pods, diagnosis + ); + + if let Ok(message_id) = self.send_message(&message).await { + debug!(node = %node, message_id = message_id, "Telegram grouped notification sent"); + // Store message_id for this node + let key = format!("node:{}", node); + self.message_ids.write().await.insert(key, message_id); + } else { + error!("Failed to send Telegram notification"); + } + } + + /// Marks a node as resolved by editing the message + pub async fn mark_node_resolved(&self, node: &str) { + let key = format!("node:{}", node); + let message_ids = self.message_ids.read().await; + + if let Some(&message_id) = message_ids.get(&key) { + drop(message_ids); // Release lock before async call + + let resolved_text = format!("✅ *Node Recovered*\n\n*Node:* `{}`\n\n_Issue has been resolved_", node); + if let Err(e) = self.edit_message(message_id, &resolved_text).await { + warn!(node = %node, error = %e, "Failed to edit resolved message"); + } else { + info!(node = %node, "Marked node as resolved in Telegram"); + // Remove from tracking after marking resolved + self.message_ids.write().await.remove(&key); + } + } else { + debug!(node = %node, "No message_id found for node (might not have been diagnosed via Telegram)"); + } + } + + /// Marks a pod as resolved by editing the message + pub async fn mark_pod_resolved(&self, namespace: &str, pod_name: &str) { + let key = format!("pod:{}/{}", namespace, pod_name); + let message_ids = self.message_ids.read().await; + + if let Some(&message_id) = message_ids.get(&key) { + drop(message_ids); // Release lock before async call + + let resolved_text = format!("✅ *Pod Recovered*\n\n*Pod:* `{}/{}`\n\n_Issue has been resolved_", namespace, pod_name); + if let Err(e) = self.edit_message(message_id, &resolved_text).await { + warn!(pod = %pod_name, namespace = %namespace, error = %e, "Failed to edit resolved message"); + } else { + info!(pod = %pod_name, namespace = %namespace, "Marked pod as resolved in Telegram"); + // Remove from tracking after marking resolved + self.message_ids.write().await.remove(&key); + } + } else { + debug!(pod = %pod_name, namespace = %namespace, "No message_id found for pod"); + } + } + + async fn send_message(&self, text: &str) -> Result> { + let url = format!("https://api.telegram.org/bot{}/sendMessage", self.bot_token); + + let request = SendMessageRequest { + chat_id: self.chat_id.clone(), + text: text.to_string(), + parse_mode: "Markdown".to_string(), + }; + + let response = self.client + .post(&url) + .json(&request) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await?; + return Err(format!("Telegram API error {}: {}", status, body).into()); + } + + let response_data: SendMessageResponse = response.json().await?; + + if response_data.ok { + if let Some(result) = response_data.result { + Ok(result.message_id) + } else { + Err("No message_id in response".into()) + } + } else { + Err("Telegram API returned ok=false".into()) + } + } + + async fn edit_message(&self, message_id: i64, text: &str) -> Result<(), Box> { + let url = format!("https://api.telegram.org/bot{}/editMessageText", self.bot_token); + + let payload = serde_json::json!({ + "chat_id": self.chat_id, + "message_id": message_id, + "text": text, + "parse_mode": "Markdown" + }); + + let response = self.client + .post(&url) + .json(&payload) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await?; + return Err(format!("Telegram API error {}: {}", status, body).into()); + } + + Ok(()) + } +} + +/// Optional wrapper - returns None if Telegram not configured +pub fn create_notifier(bot_token: Option, chat_id: Option) -> Option { + match (bot_token, chat_id) { + (Some(token), Some(chat)) if !token.is_empty() && !chat.is_empty() => { + Some(TelegramNotifier::new(token, chat)) + } + _ => None, + } +}