feat(chat): desktop usando Convex WS direto e fallback WS dedicado
This commit is contained in:
parent
8db7c3c810
commit
a8f5ff9d51
14 changed files with 735 additions and 458 deletions
|
|
@ -1,21 +1,23 @@
|
|||
//! Modulo de Chat em Tempo Real
|
||||
//!
|
||||
//! Este modulo implementa o sistema de chat entre agentes (dashboard web)
|
||||
//! e clientes (Raven desktop). Usa SSE (Server-Sent Events) como metodo
|
||||
//! e clientes (Raven desktop). Usa WebSocket como metodo
|
||||
//! primario para atualizacoes em tempo real, com fallback para HTTP polling.
|
||||
|
||||
use futures_util::StreamExt;
|
||||
use futures_util::{StreamExt, SinkExt};
|
||||
use once_cell::sync::Lazy;
|
||||
use parking_lot::Mutex;
|
||||
use reqwest::Client;
|
||||
use reqwest_eventsource::{Event, EventSource};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tauri::async_runtime::JoinHandle;
|
||||
use tauri::{Emitter, Manager, WebviewWindowBuilder, WebviewUrl};
|
||||
use tauri_plugin_notification::NotificationExt;
|
||||
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
|
||||
use url::Url;
|
||||
|
||||
// ============================================================================
|
||||
// TYPES
|
||||
|
|
@ -399,18 +401,24 @@ pub async fn upload_file(
|
|||
}
|
||||
|
||||
// ============================================================================
|
||||
// SSE (Server-Sent Events) TYPES
|
||||
// WebSocket TYPES
|
||||
// ============================================================================
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct SseUpdateEvent {
|
||||
struct WsUpdateEvent {
|
||||
has_active_sessions: bool,
|
||||
sessions: Vec<ChatSessionSummary>,
|
||||
total_unread: u32,
|
||||
ts: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct WsEnvelope {
|
||||
event: String,
|
||||
data: serde_json::Value,
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// CHAT RUNTIME
|
||||
// ============================================================================
|
||||
|
|
@ -432,7 +440,7 @@ pub struct ChatRuntime {
|
|||
inner: Arc<Mutex<Option<ChatPollerHandle>>>,
|
||||
last_sessions: Arc<Mutex<Vec<ChatSession>>>,
|
||||
last_unread_count: Arc<Mutex<u32>>,
|
||||
is_using_sse: Arc<AtomicBool>,
|
||||
is_using_ws: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl ChatRuntime {
|
||||
|
|
@ -441,17 +449,17 @@ impl ChatRuntime {
|
|||
inner: Arc::new(Mutex::new(None)),
|
||||
last_sessions: Arc::new(Mutex::new(Vec::new())),
|
||||
last_unread_count: Arc::new(Mutex::new(0)),
|
||||
is_using_sse: Arc::new(AtomicBool::new(false)),
|
||||
is_using_ws: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Retorna true se esta usando SSE, false se usando polling HTTP
|
||||
pub fn is_using_sse(&self) -> bool {
|
||||
self.is_using_sse.load(Ordering::Relaxed)
|
||||
/// Retorna true se esta usando WebSocket, false se usando polling HTTP
|
||||
pub fn is_using_ws(&self) -> bool {
|
||||
self.is_using_ws.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Inicia o sistema de atualizacoes de chat.
|
||||
/// Tenta SSE primeiro, com fallback automatico para HTTP polling.
|
||||
/// Tenta WebSocket primeiro, com fallback automatico para HTTP polling.
|
||||
pub fn start_polling(
|
||||
&self,
|
||||
base_url: String,
|
||||
|
|
@ -463,7 +471,7 @@ impl ChatRuntime {
|
|||
return Err("URL base invalida".to_string());
|
||||
}
|
||||
|
||||
// Para polling/SSE existente
|
||||
// Para polling/WS existente
|
||||
{
|
||||
let mut guard = self.inner.lock();
|
||||
if let Some(handle) = guard.take() {
|
||||
|
|
@ -477,12 +485,12 @@ impl ChatRuntime {
|
|||
let token_clone = token.clone();
|
||||
let last_sessions = self.last_sessions.clone();
|
||||
let last_unread_count = self.last_unread_count.clone();
|
||||
let is_using_sse = self.is_using_sse.clone();
|
||||
let is_using_ws = self.is_using_ws.clone();
|
||||
|
||||
let join_handle = tauri::async_runtime::spawn(async move {
|
||||
crate::log_info!("Chat iniciando (tentando SSE primeiro)");
|
||||
crate::log_info!("Chat iniciando (tentando WebSocket primeiro)");
|
||||
|
||||
// Loop principal com SSE + fallback para polling
|
||||
// Loop principal com WebSocket + fallback para polling
|
||||
loop {
|
||||
// Verificar se deve parar
|
||||
if stop_clone.load(Ordering::Relaxed) {
|
||||
|
|
@ -490,14 +498,14 @@ impl ChatRuntime {
|
|||
break;
|
||||
}
|
||||
|
||||
// Tentar SSE primeiro
|
||||
let sse_result = run_sse_loop(
|
||||
// Tentar WebSocket primeiro
|
||||
let ws_result = run_ws_loop(
|
||||
&base_clone,
|
||||
&token_clone,
|
||||
&app,
|
||||
&last_sessions,
|
||||
&last_unread_count,
|
||||
&is_using_sse,
|
||||
&is_using_ws,
|
||||
&stop_clone,
|
||||
)
|
||||
.await;
|
||||
|
|
@ -508,16 +516,16 @@ impl ChatRuntime {
|
|||
break;
|
||||
}
|
||||
|
||||
match sse_result {
|
||||
match ws_result {
|
||||
Ok(()) => {
|
||||
// SSE encerrado normalmente (stop signal)
|
||||
// WS encerrado normalmente (stop signal)
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
crate::log_warn!("SSE falhou: {e}. Usando polling HTTP...");
|
||||
is_using_sse.store(false, Ordering::Relaxed);
|
||||
crate::log_warn!("WebSocket falhou: {e}. Usando polling HTTP...");
|
||||
is_using_ws.store(false, Ordering::Relaxed);
|
||||
|
||||
// Executar polling HTTP por 5 minutos, depois tentar SSE novamente
|
||||
// Executar polling HTTP por 5 minutos, depois tentar WebSocket novamente
|
||||
let poll_duration = Duration::from_secs(300); // 5 minutos
|
||||
let poll_result = run_polling_loop(
|
||||
&base_clone,
|
||||
|
|
@ -534,7 +542,7 @@ impl ChatRuntime {
|
|||
break;
|
||||
}
|
||||
|
||||
crate::log_info!("Tentando reconectar SSE...");
|
||||
crate::log_info!("Tentando reconectar WebSocket...");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -554,7 +562,7 @@ impl ChatRuntime {
|
|||
if let Some(handle) = guard.take() {
|
||||
handle.stop();
|
||||
}
|
||||
self.is_using_sse.store(false, Ordering::Relaxed);
|
||||
self.is_using_ws.store(false, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn get_sessions(&self) -> Vec<ChatSession> {
|
||||
|
|
@ -563,90 +571,138 @@ impl ChatRuntime {
|
|||
}
|
||||
|
||||
// ============================================================================
|
||||
// SSE LOOP
|
||||
// WS LOOP
|
||||
// ============================================================================
|
||||
|
||||
async fn run_sse_loop(
|
||||
async fn run_ws_loop(
|
||||
base_url: &str,
|
||||
token: &str,
|
||||
app: &tauri::AppHandle,
|
||||
last_sessions: &Arc<Mutex<Vec<ChatSession>>>,
|
||||
last_unread_count: &Arc<Mutex<u32>>,
|
||||
is_using_sse: &Arc<AtomicBool>,
|
||||
is_using_ws: &Arc<AtomicBool>,
|
||||
stop_flag: &Arc<AtomicBool>,
|
||||
) -> Result<(), String> {
|
||||
let sse_url = format!("{}/api/machines/chat/stream?token={}", base_url, token);
|
||||
crate::log_info!("Conectando SSE: {}", sse_url);
|
||||
let ws_url = build_ws_url(base_url, token)?;
|
||||
crate::log_info!("Conectando WebSocket: {}", ws_url);
|
||||
|
||||
let request = CHAT_CLIENT.get(&sse_url);
|
||||
let mut es = EventSource::new(request).map_err(|e| format!("Falha ao criar EventSource: {e}"))?;
|
||||
let (ws_stream, _) = connect_async(ws_url)
|
||||
.await
|
||||
.map_err(|e| format!("Falha ao conectar WebSocket: {e}"))?;
|
||||
let (mut write, mut read) = ws_stream.split();
|
||||
|
||||
is_using_sse.store(true, Ordering::Relaxed);
|
||||
crate::log_info!("SSE conectado com sucesso");
|
||||
// Ativar ping periódico para manter conexão viva
|
||||
let mut heartbeat = tokio::time::interval(Duration::from_secs(45));
|
||||
|
||||
is_using_ws.store(true, Ordering::Relaxed);
|
||||
crate::log_info!("WebSocket conectado com sucesso");
|
||||
|
||||
loop {
|
||||
// Verificar stop flag periodicamente
|
||||
if stop_flag.load(Ordering::Relaxed) {
|
||||
crate::log_info!("SSE encerrado por stop flag");
|
||||
crate::log_info!("WebSocket encerrado por stop flag");
|
||||
let _ = write.send(Message::Close(None)).await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Usar timeout para poder verificar stop flag
|
||||
let event = tokio::time::timeout(Duration::from_secs(1), es.next()).await;
|
||||
|
||||
match event {
|
||||
Ok(Some(Ok(Event::Open))) => {
|
||||
crate::log_info!("SSE: conexao aberta");
|
||||
tokio::select! {
|
||||
_ = heartbeat.tick() => {
|
||||
let _ = write.send(Message::Ping(Vec::new())).await;
|
||||
}
|
||||
Ok(Some(Ok(Event::Message(msg)))) => {
|
||||
let event_type = msg.event.as_str();
|
||||
|
||||
match event_type {
|
||||
"connected" => {
|
||||
crate::log_info!("SSE: evento connected recebido");
|
||||
}
|
||||
"heartbeat" => {
|
||||
// Ignorar heartbeats silenciosamente
|
||||
}
|
||||
"update" => {
|
||||
// Processar update de chat
|
||||
if let Ok(update) = serde_json::from_str::<SseUpdateEvent>(&msg.data) {
|
||||
process_chat_update(
|
||||
msg = read.next() => {
|
||||
match msg {
|
||||
Some(Ok(Message::Text(text))) => {
|
||||
if let Ok(parsed) = serde_json::from_str::<WsEnvelope>(&text) {
|
||||
handle_ws_event(
|
||||
parsed,
|
||||
base_url,
|
||||
token,
|
||||
app,
|
||||
last_sessions,
|
||||
last_unread_count,
|
||||
update.has_active_sessions,
|
||||
update.total_unread,
|
||||
)
|
||||
.await;
|
||||
).await?;
|
||||
} else {
|
||||
crate::log_warn!("WebSocket: payload inválido");
|
||||
}
|
||||
}
|
||||
"error" => {
|
||||
crate::log_warn!("SSE: erro recebido do servidor: {}", msg.data);
|
||||
return Err(format!("Erro SSE do servidor: {}", msg.data));
|
||||
Some(Ok(Message::Close(_))) => {
|
||||
crate::log_info!("WebSocket: conexão encerrada pelo servidor");
|
||||
return Err("WebSocket fechado pelo servidor".to_string());
|
||||
}
|
||||
_ => {
|
||||
crate::log_info!("SSE: evento desconhecido: {}", event_type);
|
||||
Some(Ok(_)) => {
|
||||
// Ignorar outros frames
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
crate::log_warn!("WebSocket erro: {e}");
|
||||
return Err(format!("Erro WebSocket: {e}"));
|
||||
}
|
||||
None => {
|
||||
crate::log_info!("WebSocket: stream encerrado");
|
||||
return Err("WebSocket encerrado".to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Some(Err(e))) => {
|
||||
crate::log_warn!("SSE erro: {e}");
|
||||
return Err(format!("Erro SSE: {e}"));
|
||||
}
|
||||
Ok(None) => {
|
||||
crate::log_info!("SSE: stream encerrado");
|
||||
return Err("Stream SSE encerrado".to_string());
|
||||
}
|
||||
Err(_) => {
|
||||
// Timeout - continuar loop para verificar stop flag
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn build_ws_url(base_url: &str, token: &str) -> Result<Url, String> {
|
||||
let trimmed = base_url.trim_end_matches('/');
|
||||
let mut ws_url = if trimmed.starts_with("https://") {
|
||||
trimmed.replacen("https://", "wss://", 1)
|
||||
} else if trimmed.starts_with("http://") {
|
||||
trimmed.replacen("http://", "ws://", 1)
|
||||
} else {
|
||||
format!("wss://{}", trimmed)
|
||||
};
|
||||
ws_url.push_str("/chat-ws?token=");
|
||||
ws_url.push_str(token);
|
||||
Url::parse(&ws_url).map_err(|e| format!("URL WS inválida: {e}"))
|
||||
}
|
||||
|
||||
async fn handle_ws_event(
|
||||
envelope: WsEnvelope,
|
||||
base_url: &str,
|
||||
token: &str,
|
||||
app: &tauri::AppHandle,
|
||||
last_sessions: &Arc<Mutex<Vec<ChatSession>>>,
|
||||
last_unread_count: &Arc<Mutex<u32>>,
|
||||
) -> Result<(), String> {
|
||||
match envelope.event.as_str() {
|
||||
"connected" => {
|
||||
crate::log_info!("WebSocket: conectado");
|
||||
}
|
||||
"heartbeat" => {
|
||||
// noop
|
||||
}
|
||||
"update" => {
|
||||
let update: WsUpdateEvent = serde_json::from_value(envelope.data)
|
||||
.map_err(|e| format!("Payload update inválido: {e}"))?;
|
||||
process_chat_update(
|
||||
base_url,
|
||||
token,
|
||||
app,
|
||||
last_sessions,
|
||||
last_unread_count,
|
||||
update.has_active_sessions,
|
||||
update.total_unread,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
"error" => {
|
||||
let message = envelope
|
||||
.data
|
||||
.get("message")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or("Erro WebSocket");
|
||||
return Err(message.to_string());
|
||||
}
|
||||
_ => {
|
||||
crate::log_info!("WebSocket: evento desconhecido {}", envelope.event);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// HTTP POLLING LOOP (FALLBACK)
|
||||
// ============================================================================
|
||||
|
|
|
|||
|
|
@ -245,8 +245,8 @@ fn stop_chat_polling(state: tauri::State<ChatRuntime>) -> Result<(), String> {
|
|||
}
|
||||
|
||||
#[tauri::command]
|
||||
fn is_chat_using_sse(state: tauri::State<ChatRuntime>) -> bool {
|
||||
state.is_using_sse()
|
||||
fn is_chat_using_ws(state: tauri::State<ChatRuntime>) -> bool {
|
||||
state.is_using_ws()
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
|
|
@ -492,7 +492,7 @@ pub fn run() {
|
|||
// Chat commands
|
||||
start_chat_polling,
|
||||
stop_chat_polling,
|
||||
is_chat_using_sse,
|
||||
is_chat_using_ws,
|
||||
get_chat_sessions,
|
||||
fetch_chat_sessions,
|
||||
fetch_chat_messages,
|
||||
|
|
@ -632,7 +632,7 @@ fn setup_tray(app: &tauri::AppHandle) -> tauri::Result<()> {
|
|||
async fn try_start_background_agent(
|
||||
app: &tauri::AppHandle,
|
||||
agent_runtime: AgentRuntime,
|
||||
chat_runtime: ChatRuntime,
|
||||
_chat_runtime: ChatRuntime,
|
||||
) -> Result<(), String> {
|
||||
log_info!("Verificando credenciais salvas para iniciar agente...");
|
||||
|
||||
|
|
@ -687,18 +687,7 @@ async fn try_start_background_agent(
|
|||
)
|
||||
.map_err(|e| format!("Falha ao iniciar heartbeat: {e}"))?;
|
||||
|
||||
log_info!("Agente iniciado com sucesso em background");
|
||||
|
||||
// Iniciar chat polling
|
||||
if let Err(e) = chat_runtime.start_polling(
|
||||
api_base_url.to_string(),
|
||||
token.to_string(),
|
||||
app.clone(),
|
||||
) {
|
||||
log_warn!("Falha ao iniciar chat polling: {e}");
|
||||
} else {
|
||||
log_info!("Chat polling iniciado com sucesso");
|
||||
}
|
||||
log_info!("Agente iniciado com sucesso em background (chat via Convex WebSocket no frontend)");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue