From 1d3580b1875c92fa052f6fed8e950c2042344c17 Mon Sep 17 00:00:00 2001 From: rever-tecnologia Date: Tue, 9 Dec 2025 15:31:08 -0300 Subject: [PATCH] Use Convex WS client in desktop chat runtime --- apps/desktop/src-tauri/Cargo.lock | 423 ++++++++++++++++++++++++++++- apps/desktop/src-tauri/Cargo.toml | 1 + apps/desktop/src-tauri/src/chat.rs | 405 ++++++--------------------- apps/desktop/src-tauri/src/lib.rs | 21 +- 4 files changed, 510 insertions(+), 340 deletions(-) diff --git a/apps/desktop/src-tauri/Cargo.lock b/apps/desktop/src-tauri/Cargo.lock index 3abe894..c73a23a 100644 --- a/apps/desktop/src-tauri/Cargo.lock +++ b/apps/desktop/src-tauri/Cargo.lock @@ -62,6 +62,7 @@ version = "0.1.0" dependencies = [ "base64 0.22.1", "chrono", + "convex", "futures-util", "get_if_addrs", "hostname", @@ -94,6 +95,12 @@ dependencies = [ "derive_arbitrary", ] +[[package]] +name = "archery" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e0a5f99dfebb87bb342d0f53bb92c81842e100bbb915223e38349580e5441d" + [[package]] name = "async-broadcast" version = "0.7.2" @@ -275,6 +282,12 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.7" @@ -302,6 +315,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bitmaps" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d084b0137aaa901caf9f1e8b21daa6aa24d41cd806e111335541eff9683bd6" + [[package]] name = "block-buffer" version = "0.10.4" @@ -571,6 +590,57 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "convert_case" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9" +dependencies = [ + "unicode-segmentation", +] + +[[package]] +name = "convex" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e7ab85cfc76e9a13d252da8a7933ab52f38b9c51de3f7bb8dbe4e2262bac04" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.13.1", + "bytes", + "convex_sync_types", + "futures", + "imbl", + "rand 0.9.2", + "serde_json", + "thiserror 2.0.17", + "tokio", + "tokio-stream", + "tokio-tungstenite", + "tracing", + "url", + "uuid", +] + +[[package]] +name = "convex_sync_types" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f819ce8fd4370f235f2f5e345499fa219d7d1827cd88923f1fa42942853604a0" +dependencies = [ + "anyhow", + "base64 0.13.1", + "bytes", + "derive_more 2.1.0", + "headers", + "rand 0.9.2", + "serde", + "serde_json", + "strum", + "uuid", +] + [[package]] name = "cookie" version = "0.18.1" @@ -581,6 +651,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation" version = "0.10.1" @@ -604,9 +684,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa95a34622365fa5bbf40b20b75dba8dfa8c94c734aea8ac9a5ca38af14316f1" dependencies = [ "bitflags 2.9.4", - "core-foundation", + "core-foundation 0.10.1", "core-graphics-types", - "foreign-types", + "foreign-types 0.5.0", "libc", ] @@ -617,7 +697,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d44a101f213f6c4cdc1853d4b78aef6db6bdfa3468798cc1d9912f4735013eb" dependencies = [ "bitflags 2.9.4", - "core-foundation", + "core-foundation 0.10.1", "libc", ] @@ -761,6 +841,12 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "data-encoding" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" + [[package]] name = "deranged" version = "0.5.4" @@ -788,13 +874,36 @@ version = "0.99.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6edb4b64a43d977b8e99788fe3a04d483834fba1215a7e02caa415b626497f7f" dependencies = [ - "convert_case", + "convert_case 0.4.0", "proc-macro2", "quote", "rustc_version", "syn 2.0.106", ] +[[package]] +name = "derive_more" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10b768e943bed7bf2cab53df09f4bc34bfd217cdb57d971e769874c9a6710618" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d286bfdaf75e988b4a78e013ecd79c581e06399ab53fbacd2d916c2f904f30b" +dependencies = [ + "convert_case 0.10.0", + "proc-macro2", + "quote", + "rustc_version", + "syn 2.0.106", + "unicode-xid", +] + [[package]] name = "digest" version = "0.10.7" @@ -1081,6 +1190,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared 0.1.1", +] + [[package]] name = "foreign-types" version = "0.5.0" @@ -1088,7 +1206,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" dependencies = [ "foreign-types-macros", - "foreign-types-shared", + "foreign-types-shared 0.3.1", ] [[package]] @@ -1102,6 +1220,12 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "foreign-types-shared" version = "0.3.1" @@ -1127,6 +1251,21 @@ dependencies = [ "new_debug_unreachable", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -1202,6 +1341,7 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -1569,6 +1709,30 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" +[[package]] +name = "headers" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3314d5adb5d94bcdf56771f2e50dbbc80bb4bdf88967526706205ac9eff24eb" +dependencies = [ + "base64 0.22.1", + "bytes", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http", +] + [[package]] name = "heck" version = "0.4.1" @@ -1656,6 +1820,12 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "hyper" version = "1.7.0" @@ -1865,6 +2035,29 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "imbl" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4308a675e4cfc1920f36a8f4d8fb62d5533b7da106844bd1ec51c6f1fa94a0c" +dependencies = [ + "archery", + "bitmaps", + "imbl-sized-chunks", + "rand_core 0.9.3", + "rand_xoshiro", + "version_check", +] + +[[package]] +name = "imbl-sized-chunks" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f4241005618a62f8d57b2febd02510fb96e0137304728543dfc5fd6f052c22d" +dependencies = [ + "bitmaps", +] + [[package]] name = "indexmap" version = "1.9.3" @@ -2257,6 +2450,23 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "ndk" version = "0.9.0" @@ -2675,6 +2885,60 @@ dependencies = [ "pathdiff", ] +[[package]] +name = "openssl" +version = "0.10.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" +dependencies = [ + "bitflags 2.9.4", + "cfg-if", + "foreign-types 0.3.2", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + +[[package]] +name = "openssl-src" +version = "300.5.4+3.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507b3792995dae9b0df8a1c1e3771e8418b7c2d9f0baeba32e6fe8b06c7cb72" +dependencies = [ + "cc", +] + +[[package]] +name = "openssl-sys" +version = "0.9.111" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82cab2d520aa75e3c58898289429321eb788c3106963d0dc886ec7a5f4adc321" +dependencies = [ + "cc", + "libc", + "openssl-src", + "pkg-config", + "vcpkg", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -3280,6 +3544,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core 0.9.3", +] + [[package]] name = "raw-window-handle" version = "0.6.2" @@ -3531,6 +3804,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "schemars" version = "0.8.22" @@ -3588,6 +3870,29 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags 2.9.4", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc1f0cbffaac4852523ce30d8bd3c5cdc873501d96ff467ca09b6767bb8cd5c0" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "selectors" version = "0.24.0" @@ -3596,7 +3901,7 @@ checksum = "0c37578180969d00692904465fb7f6b3d50b9a2b952b87c23d0e2e5cb5013416" dependencies = [ "bitflags 1.3.2", "cssparser", - "derive_more", + "derive_more 0.99.20", "fxhash", "log", "phf 0.8.0", @@ -3675,6 +3980,7 @@ version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ + "indexmap 2.11.4", "itoa", "memchr", "ryu", @@ -3786,6 +4092,17 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.9" @@ -3861,7 +4178,7 @@ dependencies = [ "bytemuck", "cfg_aliases", "core-graphics", - "foreign-types", + "foreign-types 0.5.0", "js-sys", "log", "objc2 0.5.2", @@ -3943,6 +4260,27 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af23d6f6c1a224baef9d3f61e287d2761385a5b88fdab4eb4c6f11aeb54c4bcf" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "subtle" version = "2.6.1" @@ -4037,7 +4375,7 @@ checksum = "959469667dbcea91e5485fc48ba7dd6023face91bb0f1a14681a70f99847c3f7" dependencies = [ "bitflags 2.9.4", "block2 0.6.2", - "core-foundation", + "core-foundation 0.10.1", "core-graphics", "crossbeam-channel", "dispatch", @@ -4601,7 +4939,9 @@ dependencies = [ "io-uring", "libc", "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "slab", "socket2", "tokio-macros", @@ -4619,6 +4959,16 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.4" @@ -4629,6 +4979,32 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a9daff607c6d2bf6c16fd681ccb7eecc83e4e2cdc1ca067ffaadfca5de7f084" +dependencies = [ + "futures-util", + "log", + "native-tls", + "tokio", + "tokio-native-tls", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.16" @@ -4842,6 +5218,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "native-tls", + "rand 0.9.2", + "sha1", + "thiserror 2.0.17", + "url", + "utf-8", +] + [[package]] name = "typeid" version = "1.0.3" @@ -4918,6 +5313,12 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "untrusted" version = "0.9.0" @@ -4972,6 +5373,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version-compare" version = "0.2.0" diff --git a/apps/desktop/src-tauri/Cargo.toml b/apps/desktop/src-tauri/Cargo.toml index 6999835..7732e3c 100644 --- a/apps/desktop/src-tauri/Cargo.toml +++ b/apps/desktop/src-tauri/Cargo.toml @@ -39,6 +39,7 @@ parking_lot = "0.12" hostname = "0.4" base64 = "0.22" sha2 = "0.10" +convex = "0.10.2" # SSE usa reqwest com stream, nao precisa de websocket [target.'cfg(windows)'.dependencies] diff --git a/apps/desktop/src-tauri/src/chat.rs b/apps/desktop/src-tauri/src/chat.rs index 80b3a56..a75c03d 100644 --- a/apps/desktop/src-tauri/src/chat.rs +++ b/apps/desktop/src-tauri/src/chat.rs @@ -4,12 +4,13 @@ //! e clientes (Raven desktop). Usa Server-Sent Events (SSE) como metodo //! primario para atualizacoes em tempo real, com fallback para HTTP polling. +use convex::{ConvexClient, FunctionResult, Value}; use futures_util::StreamExt; use once_cell::sync::Lazy; use parking_lot::Mutex; use reqwest::Client; use serde::{Deserialize, Serialize}; -use serde_json::Value; +use std::collections::BTreeMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -58,6 +59,7 @@ pub struct ChatAttachment { pub mime_type: Option, } +#[allow(dead_code)] #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ChatPollResponse { @@ -66,6 +68,7 @@ pub struct ChatPollResponse { pub total_unread: u32, } +#[allow(dead_code)] #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ChatSessionSummary { @@ -112,6 +115,7 @@ static CHAT_CLIENT: Lazy = Lazy::new(|| { // API FUNCTIONS // ============================================================================ +#[allow(dead_code)] pub async fn poll_chat_updates( base_url: &str, token: &str, @@ -399,59 +403,16 @@ pub async fn upload_file( Ok(data.storage_id) } -// ============================================================================ -// SSE TYPES -// ============================================================================ - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -struct SseUpdateEvent { - has_active_sessions: bool, - sessions: Vec, - total_unread: u32, - ts: i64, -} - -/// Parser de eventos SSE -struct SseEvent { - event: String, - data: String, -} - -fn parse_sse_line(buffer: &mut String, line: &str) -> Option { - if line.starts_with("event:") { - buffer.clear(); - let event_type = line.trim_start_matches("event:").trim(); - buffer.push_str(event_type); - buffer.push('\0'); // Separador interno - None - } else if line.starts_with("data:") { - let data = line.trim_start_matches("data:").trim(); - let parts: Vec<&str> = buffer.split('\0').collect(); - let event_type = if parts.len() >= 1 && !parts[0].is_empty() { - parts[0].to_string() - } else { - "message".to_string() - }; - Some(SseEvent { - event: event_type, - data: data.to_string(), - }) - } else { - None - } -} - // ============================================================================ // CHAT RUNTIME // ============================================================================ -struct ChatPollerHandle { +struct ChatRealtimeHandle { stop_flag: Arc, join_handle: JoinHandle<()>, } -impl ChatPollerHandle { +impl ChatRealtimeHandle { fn stop(self) { self.stop_flag.store(true, Ordering::Relaxed); self.join_handle.abort(); @@ -460,10 +421,10 @@ impl ChatPollerHandle { #[derive(Default, Clone)] pub struct ChatRuntime { - inner: Arc>>, + inner: Arc>>, last_sessions: Arc>>, last_unread_count: Arc>, - is_using_sse: Arc, + is_connected: Arc, } impl ChatRuntime { @@ -472,20 +433,20 @@ impl ChatRuntime { inner: Arc::new(Mutex::new(None)), last_sessions: Arc::new(Mutex::new(Vec::new())), last_unread_count: Arc::new(Mutex::new(0)), - is_using_sse: Arc::new(AtomicBool::new(false)), + is_connected: Arc::new(AtomicBool::new(false)), } } - /// Retorna true se esta usando SSE, false se usando polling HTTP + /// Retorna true se conexao WS Convex esta ativa pub fn is_using_sse(&self) -> bool { - self.is_using_sse.load(Ordering::Relaxed) + self.is_connected.load(Ordering::Relaxed) } - /// Inicia o sistema de atualizacoes de chat. - /// Tenta SSE primeiro, com fallback automatico para HTTP polling. + /// Inicia o sistema de atualizacoes de chat via WebSocket do Convex pub fn start_polling( &self, base_url: String, + convex_url: String, token: String, app: tauri::AppHandle, ) -> Result<(), String> { @@ -493,6 +454,10 @@ impl ChatRuntime { if sanitized_base.is_empty() { return Err("URL base invalida".to_string()); } + let sanitized_convex = convex_url.trim().trim_end_matches('/').to_string(); + if sanitized_convex.is_empty() { + return Err("URL do Convex inválida".to_string()); + } // Para polling/SSE existente { @@ -505,74 +470,90 @@ impl ChatRuntime { let stop_flag = Arc::new(AtomicBool::new(false)); let stop_clone = stop_flag.clone(); let base_clone = sanitized_base.clone(); + let convex_clone = sanitized_convex.clone(); let token_clone = token.clone(); let last_sessions = self.last_sessions.clone(); let last_unread_count = self.last_unread_count.clone(); - let is_using_sse = self.is_using_sse.clone(); + let is_connected = self.is_connected.clone(); let join_handle = tauri::async_runtime::spawn(async move { - crate::log_info!("Chat iniciando (tentando SSE primeiro)"); + crate::log_info!("Chat iniciando via Convex WebSocket"); - // Loop principal com SSE + fallback para polling - loop { - // Verificar se deve parar + let client_result = ConvexClient::new(&convex_clone).await; + let mut client = match client_result { + Ok(c) => c, + Err(err) => { + crate::log_warn!("Falha ao criar cliente Convex: {err:?}"); + return; + } + }; + + let mut args = BTreeMap::new(); + args.insert("machineToken".to_string(), token_clone.clone().into()); + + let subscribe_result = client.subscribe("liveChat:checkMachineUpdates", args).await; + let mut subscription = match subscribe_result { + Ok(sub) => { + is_connected.store(true, Ordering::Relaxed); + sub + } + Err(err) => { + crate::log_warn!("Falha ao assinar liveChat:checkMachineUpdates: {err:?}"); + return; + } + }; + + while let Some(next) = subscription.next().await { if stop_clone.load(Ordering::Relaxed) { - crate::log_info!("Chat encerrado"); break; } + match next { + FunctionResult::Value(Value::Object(obj)) => { + let has_active = obj + .get("hasActiveSessions") + .and_then(|v| match v { + Value::Boolean(b) => Some(*b), + _ => None, + }) + .unwrap_or(false); + let total_unread = obj + .get("totalUnread") + .and_then(|v| match v { + Value::Int64(i) => Some(*i as u32), + Value::Float64(f) => Some(*f as u32), + _ => None, + }) + .unwrap_or(0); - // Tentar SSE primeiro - let sse_result = run_sse_loop( - &base_clone, - &token_clone, - &app, - &last_sessions, - &last_unread_count, - &is_using_sse, - &stop_clone, - ) - .await; - - // Verificar se deve parar - if stop_clone.load(Ordering::Relaxed) { - crate::log_info!("Chat encerrado"); - break; - } - - match sse_result { - Ok(()) => { - // SSE encerrado normalmente (stop signal) - break; - } - Err(e) => { - crate::log_warn!("SSE falhou: {e}. Usando polling HTTP..."); - is_using_sse.store(false, Ordering::Relaxed); - - // Executar polling HTTP por 5 minutos, depois tentar SSE novamente - let poll_duration = Duration::from_secs(300); // 5 minutos - let poll_result = run_polling_loop( + process_chat_update( &base_clone, &token_clone, &app, &last_sessions, &last_unread_count, - &stop_clone, - poll_duration, + has_active, + total_unread, ) .await; - - if poll_result.is_err() || stop_clone.load(Ordering::Relaxed) { - break; - } - - crate::log_info!("Tentando reconectar SSE..."); + } + FunctionResult::ConvexError(err) => { + crate::log_warn!("Convex error em checkMachineUpdates: {err:?}"); + } + FunctionResult::ErrorMessage(msg) => { + crate::log_warn!("Erro em checkMachineUpdates: {msg}"); + } + FunctionResult::Value(other) => { + crate::log_warn!("Payload inesperado em checkMachineUpdates: {other:?}"); } } } + + is_connected.store(false, Ordering::Relaxed); + crate::log_info!("Chat encerrado (Convex WebSocket finalizado)"); }); let mut guard = self.inner.lock(); - *guard = Some(ChatPollerHandle { + *guard = Some(ChatRealtimeHandle { stop_flag, join_handle, }); @@ -585,7 +566,7 @@ impl ChatRuntime { if let Some(handle) = guard.take() { handle.stop(); } - self.is_using_sse.store(false, Ordering::Relaxed); + self.is_connected.store(false, Ordering::Relaxed); } pub fn get_sessions(&self) -> Vec { @@ -593,232 +574,6 @@ impl ChatRuntime { } } -// ============================================================================ -// SSE LOOP -// ============================================================================ - -/// Cliente HTTP para SSE com timeout mais longo (conexao persistente) -static SSE_CLIENT: Lazy = Lazy::new(|| { - Client::builder() - .user_agent("raven-chat-sse/1.0") - .timeout(Duration::from_secs(120)) // Timeout longo para SSE - .connect_timeout(Duration::from_secs(15)) - .use_rustls_tls() - .build() - .expect("failed to build SSE http client") -}); - -async fn run_sse_loop( - base_url: &str, - token: &str, - app: &tauri::AppHandle, - last_sessions: &Arc>>, - last_unread_count: &Arc>, - is_using_sse: &Arc, - stop_flag: &Arc, -) -> Result<(), String> { - let sse_url = format!("{}/api/machines/chat/stream?token={}", base_url, token); - crate::log_info!("Conectando SSE: {}", sse_url); - - // Iniciar request SSE - let response = SSE_CLIENT - .get(&sse_url) - .header("Accept", "text/event-stream") - .header("Cache-Control", "no-cache") - .send() - .await - .map_err(|e| format!("Falha ao conectar SSE: {e}"))?; - - if !response.status().is_success() { - let status = response.status(); - let body = response.text().await.unwrap_or_default(); - return Err(format!("SSE falhou: status={}, body={}", status, body)); - } - - is_using_sse.store(true, Ordering::Relaxed); - crate::log_info!("SSE conectado com sucesso"); - - // Stream de bytes - let mut stream = response.bytes_stream(); - let mut buffer = String::new(); - let mut line_buffer = String::new(); - - // Timeout para detectar conexao morta (60s sem dados = reconectar) - let mut last_data_time = std::time::Instant::now(); - let max_silence = Duration::from_secs(60); - - loop { - if stop_flag.load(Ordering::Relaxed) { - crate::log_info!("SSE encerrado por stop flag"); - return Ok(()); - } - - // Verificar timeout de silencio - if last_data_time.elapsed() > max_silence { - crate::log_warn!("SSE: timeout de silencio ({}s sem dados)", max_silence.as_secs()); - return Err("SSE timeout - sem dados".to_string()); - } - - // Aguardar proximo chunk com timeout - let chunk_result = tokio::time::timeout( - Duration::from_secs(35), // Heartbeat do servidor e a cada 30s - stream.next() - ).await; - - match chunk_result { - Ok(Some(Ok(bytes))) => { - last_data_time = std::time::Instant::now(); - - let text = String::from_utf8_lossy(&bytes); - line_buffer.push_str(&text); - - // Processar linhas completas - while let Some(newline_pos) = line_buffer.find('\n') { - let line = line_buffer[..newline_pos].trim_end_matches('\r').to_string(); - line_buffer = line_buffer[newline_pos + 1..].to_string(); - - // Linha vazia = fim do evento - if line.is_empty() { - buffer.clear(); - continue; - } - - // Parsear evento SSE - if let Some(event) = parse_sse_line(&mut buffer, &line) { - handle_sse_event( - &event, - base_url, - token, - app, - last_sessions, - last_unread_count, - ).await?; - } - } - } - Ok(Some(Err(e))) => { - crate::log_warn!("SSE erro de stream: {e}"); - return Err(format!("Erro SSE: {e}")); - } - Ok(None) => { - crate::log_info!("SSE: stream encerrado pelo servidor"); - return Err("SSE encerrado".to_string()); - } - Err(_) => { - // Timeout aguardando chunk - verificar se conexao ainda viva - if last_data_time.elapsed() > max_silence { - return Err("SSE timeout".to_string()); - } - // Caso contrario, continuar aguardando - } - } - } -} - -async fn handle_sse_event( - event: &SseEvent, - base_url: &str, - token: &str, - app: &tauri::AppHandle, - last_sessions: &Arc>>, - last_unread_count: &Arc>, -) -> Result<(), String> { - match event.event.as_str() { - "connected" => { - crate::log_info!("SSE: conectado"); - } - "heartbeat" => { - // noop - apenas mantem conexao viva - } - "update" => { - let update: SseUpdateEvent = serde_json::from_str(&event.data) - .map_err(|e| format!("Payload update inválido: {e}"))?; - process_chat_update( - base_url, - token, - app, - last_sessions, - last_unread_count, - update.has_active_sessions, - update.total_unread, - ) - .await; - } - "error" => { - let error_data: Value = serde_json::from_str(&event.data).unwrap_or_default(); - let message = error_data - .get("message") - .and_then(Value::as_str) - .unwrap_or("Erro SSE"); - return Err(message.to_string()); - } - _ => { - crate::log_info!("SSE: evento desconhecido {}", event.event); - } - } - Ok(()) -} - -// ============================================================================ -// HTTP POLLING LOOP (FALLBACK) -// ============================================================================ - -async fn run_polling_loop( - base_url: &str, - token: &str, - app: &tauri::AppHandle, - last_sessions: &Arc>>, - last_unread_count: &Arc>, - stop_flag: &Arc, - max_duration: Duration, -) -> Result<(), String> { - crate::log_info!("Iniciando polling HTTP (fallback)"); - - let start = std::time::Instant::now(); - let poll_interval = Duration::from_secs(1); // 1s para ser mais responsivo - let mut last_checked_at: Option = None; - - loop { - // Verificar se deve parar ou se atingiu duracao maxima - if stop_flag.load(Ordering::Relaxed) { - crate::log_info!("Polling HTTP encerrado por stop flag"); - return Ok(()); - } - - if start.elapsed() >= max_duration { - crate::log_info!("Polling HTTP: duracao maxima atingida"); - return Ok(()); - } - - tokio::time::sleep(poll_interval).await; - - // Verificar novamente apos sleep - if stop_flag.load(Ordering::Relaxed) { - return Ok(()); - } - - match poll_chat_updates(base_url, token, last_checked_at).await { - Ok(result) => { - last_checked_at = Some(chrono::Utc::now().timestamp_millis()); - - process_chat_update( - base_url, - token, - app, - last_sessions, - last_unread_count, - result.has_active_sessions, - result.total_unread, - ) - .await; - } - Err(e) => { - crate::log_warn!("Falha no polling de chat: {e}"); - } - } - } -} - // ============================================================================ // SHARED UPDATE PROCESSING // ============================================================================ diff --git a/apps/desktop/src-tauri/src/lib.rs b/apps/desktop/src-tauri/src/lib.rs index 3df3252..fc97ecb 100644 --- a/apps/desktop/src-tauri/src/lib.rs +++ b/apps/desktop/src-tauri/src/lib.rs @@ -23,6 +23,8 @@ use winreg::enums::*; #[cfg(target_os = "windows")] use winreg::RegKey; +const DEFAULT_CONVEX_URL: &str = "https://convex.esdrasrenan.com.br"; + // ============================================================================ // Sistema de Logging para Agente // ============================================================================ @@ -233,9 +235,11 @@ fn start_chat_polling( state: tauri::State, app: tauri::AppHandle, base_url: String, + convex_url: Option, token: String, ) -> Result<(), String> { - state.start_polling(base_url, token, app) + let url = convex_url.unwrap_or_else(|| DEFAULT_CONVEX_URL.to_string()); + state.start_polling(base_url, url, token, app) } #[tauri::command] @@ -667,6 +671,11 @@ async fn try_start_background_agent( .and_then(|v| v.as_str()) .unwrap_or("https://tickets.esdrasrenan.com.br"); + let convex_url = config + .and_then(|c| c.get("convexUrl")) + .and_then(|v| v.as_str()) + .unwrap_or(DEFAULT_CONVEX_URL); + let interval = config .and_then(|c| c.get("heartbeatIntervalSec")) .and_then(|v| v.as_u64()) @@ -688,14 +697,12 @@ async fn try_start_background_agent( .map_err(|e| format!("Falha ao iniciar heartbeat: {e}"))?; // Iniciar sistema de chat (WebSocket + fallback HTTP polling) - if let Err(e) = chat_runtime.start_polling( - api_base_url.to_string(), - token.to_string(), - app.clone(), - ) { + if let Err(e) = + chat_runtime.start_polling(api_base_url.to_string(), convex_url.to_string(), token.to_string(), app.clone()) + { log_warn!("Falha ao iniciar chat em background: {e}"); } else { - log_info!("Chat iniciado com sucesso (WebSocket + fallback polling)"); + log_info!("Chat iniciado com sucesso (Convex WebSocket)"); } log_info!("Agente iniciado com sucesso em background");