fix(chat): correções de SSE, inicialização e área de clique

- Substituir WebSocket por SSE para real-time (chat.rs)
- Corrigir inicialização do chat runtime em lib.rs
- Iniciar unreadByMachine em 0 ao criar sessão (liveChat.ts)
- Corrigir área de clique do chip minimizado (pointer-events)
- Corrigir roteamento SPA no Tauri (index.html?view=chat)
- Corrigir estado inicial isMinimized como true

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
rever-tecnologia 2025-12-09 11:51:33 -03:00
parent 3aee1a6694
commit 3c2d1824fb
7 changed files with 213 additions and 256 deletions

View file

@ -1,10 +1,10 @@
//! Modulo de Chat em Tempo Real
//!
//! Este modulo implementa o sistema de chat entre agentes (dashboard web)
//! e clientes (Raven desktop). Usa WebSocket como metodo
//! e clientes (Raven desktop). Usa Server-Sent Events (SSE) como metodo
//! primario para atualizacoes em tempo real, com fallback para HTTP polling.
use futures_util::{StreamExt, SinkExt};
use futures_util::StreamExt;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use reqwest::Client;
@ -16,8 +16,6 @@ use std::time::Duration;
use tauri::async_runtime::JoinHandle;
use tauri::{Emitter, Manager, WebviewWindowBuilder, WebviewUrl};
use tauri_plugin_notification::NotificationExt;
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use url::Url;
// ============================================================================
// TYPES
@ -401,22 +399,46 @@ pub async fn upload_file(
}
// ============================================================================
// WebSocket TYPES
// SSE TYPES
// ============================================================================
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct WsUpdateEvent {
struct SseUpdateEvent {
has_active_sessions: bool,
sessions: Vec<ChatSessionSummary>,
total_unread: u32,
ts: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WsEnvelope {
/// Parser de eventos SSE
struct SseEvent {
event: String,
data: serde_json::Value,
data: String,
}
fn parse_sse_line(buffer: &mut String, line: &str) -> Option<SseEvent> {
if line.starts_with("event:") {
buffer.clear();
let event_type = line.trim_start_matches("event:").trim();
buffer.push_str(event_type);
buffer.push('\0'); // Separador interno
None
} else if line.starts_with("data:") {
let data = line.trim_start_matches("data:").trim();
let parts: Vec<&str> = buffer.split('\0').collect();
let event_type = if parts.len() >= 1 && !parts[0].is_empty() {
parts[0].to_string()
} else {
"message".to_string()
};
Some(SseEvent {
event: event_type,
data: data.to_string(),
})
} else {
None
}
}
// ============================================================================
@ -440,7 +462,7 @@ pub struct ChatRuntime {
inner: Arc<Mutex<Option<ChatPollerHandle>>>,
last_sessions: Arc<Mutex<Vec<ChatSession>>>,
last_unread_count: Arc<Mutex<u32>>,
is_using_ws: Arc<AtomicBool>,
is_using_sse: Arc<AtomicBool>,
}
impl ChatRuntime {
@ -449,17 +471,17 @@ impl ChatRuntime {
inner: Arc::new(Mutex::new(None)),
last_sessions: Arc::new(Mutex::new(Vec::new())),
last_unread_count: Arc::new(Mutex::new(0)),
is_using_ws: Arc::new(AtomicBool::new(false)),
is_using_sse: Arc::new(AtomicBool::new(false)),
}
}
/// Retorna true se esta usando WebSocket, false se usando polling HTTP
pub fn is_using_ws(&self) -> bool {
self.is_using_ws.load(Ordering::Relaxed)
/// Retorna true se esta usando SSE, false se usando polling HTTP
pub fn is_using_sse(&self) -> bool {
self.is_using_sse.load(Ordering::Relaxed)
}
/// Inicia o sistema de atualizacoes de chat.
/// Tenta WebSocket primeiro, com fallback automatico para HTTP polling.
/// Tenta SSE primeiro, com fallback automatico para HTTP polling.
pub fn start_polling(
&self,
base_url: String,
@ -471,7 +493,7 @@ impl ChatRuntime {
return Err("URL base invalida".to_string());
}
// Para polling/WS existente
// Para polling/SSE existente
{
let mut guard = self.inner.lock();
if let Some(handle) = guard.take() {
@ -485,12 +507,12 @@ impl ChatRuntime {
let token_clone = token.clone();
let last_sessions = self.last_sessions.clone();
let last_unread_count = self.last_unread_count.clone();
let is_using_ws = self.is_using_ws.clone();
let is_using_sse = self.is_using_sse.clone();
let join_handle = tauri::async_runtime::spawn(async move {
crate::log_info!("Chat iniciando (tentando WebSocket primeiro)");
crate::log_info!("Chat iniciando (tentando SSE primeiro)");
// Loop principal com WebSocket + fallback para polling
// Loop principal com SSE + fallback para polling
loop {
// Verificar se deve parar
if stop_clone.load(Ordering::Relaxed) {
@ -498,14 +520,14 @@ impl ChatRuntime {
break;
}
// Tentar WebSocket primeiro
let ws_result = run_ws_loop(
// Tentar SSE primeiro
let sse_result = run_sse_loop(
&base_clone,
&token_clone,
&app,
&last_sessions,
&last_unread_count,
&is_using_ws,
&is_using_sse,
&stop_clone,
)
.await;
@ -516,16 +538,16 @@ impl ChatRuntime {
break;
}
match ws_result {
match sse_result {
Ok(()) => {
// WS encerrado normalmente (stop signal)
// SSE encerrado normalmente (stop signal)
break;
}
Err(e) => {
crate::log_warn!("WebSocket falhou: {e}. Usando polling HTTP...");
is_using_ws.store(false, Ordering::Relaxed);
crate::log_warn!("SSE falhou: {e}. Usando polling HTTP...");
is_using_sse.store(false, Ordering::Relaxed);
// Executar polling HTTP por 5 minutos, depois tentar WebSocket novamente
// Executar polling HTTP por 5 minutos, depois tentar SSE novamente
let poll_duration = Duration::from_secs(300); // 5 minutos
let poll_result = run_polling_loop(
&base_clone,
@ -542,7 +564,7 @@ impl ChatRuntime {
break;
}
crate::log_info!("Tentando reconectar WebSocket...");
crate::log_info!("Tentando reconectar SSE...");
}
}
}
@ -562,7 +584,7 @@ impl ChatRuntime {
if let Some(handle) = guard.take() {
handle.stop();
}
self.is_using_ws.store(false, Ordering::Relaxed);
self.is_using_sse.store(false, Ordering::Relaxed);
}
pub fn get_sessions(&self) -> Vec<ChatSession> {
@ -571,111 +593,144 @@ impl ChatRuntime {
}
// ============================================================================
// WS LOOP
// SSE LOOP
// ============================================================================
async fn run_ws_loop(
/// Cliente HTTP para SSE com timeout mais longo (conexao persistente)
static SSE_CLIENT: Lazy<Client> = Lazy::new(|| {
Client::builder()
.user_agent("raven-chat-sse/1.0")
.timeout(Duration::from_secs(120)) // Timeout longo para SSE
.connect_timeout(Duration::from_secs(15))
.use_rustls_tls()
.build()
.expect("failed to build SSE http client")
});
async fn run_sse_loop(
base_url: &str,
token: &str,
app: &tauri::AppHandle,
last_sessions: &Arc<Mutex<Vec<ChatSession>>>,
last_unread_count: &Arc<Mutex<u32>>,
is_using_ws: &Arc<AtomicBool>,
is_using_sse: &Arc<AtomicBool>,
stop_flag: &Arc<AtomicBool>,
) -> Result<(), String> {
let ws_url = build_ws_url(base_url, token)?;
crate::log_info!("Conectando WebSocket: {}", ws_url);
let sse_url = format!("{}/api/machines/chat/stream?token={}", base_url, token);
crate::log_info!("Conectando SSE: {}", sse_url);
let (ws_stream, _) = connect_async(ws_url)
// Iniciar request SSE
let response = SSE_CLIENT
.get(&sse_url)
.header("Accept", "text/event-stream")
.header("Cache-Control", "no-cache")
.send()
.await
.map_err(|e| format!("Falha ao conectar WebSocket: {e}"))?;
let (mut write, mut read) = ws_stream.split();
.map_err(|e| format!("Falha ao conectar SSE: {e}"))?;
// Ativar ping periódico para manter conexão viva
let mut heartbeat = tokio::time::interval(Duration::from_secs(45));
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(format!("SSE falhou: status={}, body={}", status, body));
}
is_using_ws.store(true, Ordering::Relaxed);
crate::log_info!("WebSocket conectado com sucesso");
is_using_sse.store(true, Ordering::Relaxed);
crate::log_info!("SSE conectado com sucesso");
// Stream de bytes
let mut stream = response.bytes_stream();
let mut buffer = String::new();
let mut line_buffer = String::new();
// Timeout para detectar conexao morta (60s sem dados = reconectar)
let mut last_data_time = std::time::Instant::now();
let max_silence = Duration::from_secs(60);
loop {
if stop_flag.load(Ordering::Relaxed) {
crate::log_info!("WebSocket encerrado por stop flag");
let _ = write.send(Message::Close(None)).await;
crate::log_info!("SSE encerrado por stop flag");
return Ok(());
}
tokio::select! {
_ = heartbeat.tick() => {
let _ = write.send(Message::Ping(Vec::new())).await;
}
msg = read.next() => {
match msg {
Some(Ok(Message::Text(text))) => {
if let Ok(parsed) = serde_json::from_str::<WsEnvelope>(&text) {
handle_ws_event(
parsed,
base_url,
token,
app,
last_sessions,
last_unread_count,
).await?;
} else {
crate::log_warn!("WebSocket: payload inválido");
}
// Verificar timeout de silencio
if last_data_time.elapsed() > max_silence {
crate::log_warn!("SSE: timeout de silencio ({}s sem dados)", max_silence.as_secs());
return Err("SSE timeout - sem dados".to_string());
}
// Aguardar proximo chunk com timeout
let chunk_result = tokio::time::timeout(
Duration::from_secs(35), // Heartbeat do servidor e a cada 30s
stream.next()
).await;
match chunk_result {
Ok(Some(Ok(bytes))) => {
last_data_time = std::time::Instant::now();
let text = String::from_utf8_lossy(&bytes);
line_buffer.push_str(&text);
// Processar linhas completas
while let Some(newline_pos) = line_buffer.find('\n') {
let line = line_buffer[..newline_pos].trim_end_matches('\r').to_string();
line_buffer = line_buffer[newline_pos + 1..].to_string();
// Linha vazia = fim do evento
if line.is_empty() {
buffer.clear();
continue;
}
Some(Ok(Message::Close(_))) => {
crate::log_info!("WebSocket: conexão encerrada pelo servidor");
return Err("WebSocket fechado pelo servidor".to_string());
}
Some(Ok(_)) => {
// Ignorar outros frames
}
Some(Err(e)) => {
crate::log_warn!("WebSocket erro: {e}");
return Err(format!("Erro WebSocket: {e}"));
}
None => {
crate::log_info!("WebSocket: stream encerrado");
return Err("WebSocket encerrado".to_string());
// Parsear evento SSE
if let Some(event) = parse_sse_line(&mut buffer, &line) {
handle_sse_event(
&event,
base_url,
token,
app,
last_sessions,
last_unread_count,
).await?;
}
}
}
Ok(Some(Err(e))) => {
crate::log_warn!("SSE erro de stream: {e}");
return Err(format!("Erro SSE: {e}"));
}
Ok(None) => {
crate::log_info!("SSE: stream encerrado pelo servidor");
return Err("SSE encerrado".to_string());
}
Err(_) => {
// Timeout aguardando chunk - verificar se conexao ainda viva
if last_data_time.elapsed() > max_silence {
return Err("SSE timeout".to_string());
}
// Caso contrario, continuar aguardando
}
}
}
}
fn build_ws_url(base_url: &str, token: &str) -> Result<Url, String> {
let trimmed = base_url.trim_end_matches('/');
let mut ws_url = if trimmed.starts_with("https://") {
trimmed.replacen("https://", "wss://", 1)
} else if trimmed.starts_with("http://") {
trimmed.replacen("http://", "ws://", 1)
} else {
format!("wss://{}", trimmed)
};
ws_url.push_str("/chat-ws?token=");
ws_url.push_str(token);
Url::parse(&ws_url).map_err(|e| format!("URL WS inválida: {e}"))
}
async fn handle_ws_event(
envelope: WsEnvelope,
async fn handle_sse_event(
event: &SseEvent,
base_url: &str,
token: &str,
app: &tauri::AppHandle,
last_sessions: &Arc<Mutex<Vec<ChatSession>>>,
last_unread_count: &Arc<Mutex<u32>>,
) -> Result<(), String> {
match envelope.event.as_str() {
match event.event.as_str() {
"connected" => {
crate::log_info!("WebSocket: conectado");
crate::log_info!("SSE: conectado");
}
"heartbeat" => {
// noop
// noop - apenas mantem conexao viva
}
"update" => {
let update: WsUpdateEvent = serde_json::from_value(envelope.data)
let update: SseUpdateEvent = serde_json::from_str(&event.data)
.map_err(|e| format!("Payload update inválido: {e}"))?;
process_chat_update(
base_url,
@ -689,15 +744,15 @@ async fn handle_ws_event(
.await;
}
"error" => {
let message = envelope
.data
let error_data: Value = serde_json::from_str(&event.data).unwrap_or_default();
let message = error_data
.get("message")
.and_then(Value::as_str)
.unwrap_or("Erro WebSocket");
.unwrap_or("Erro SSE");
return Err(message.to_string());
}
_ => {
crate::log_info!("WebSocket: evento desconhecido {}", envelope.event);
crate::log_info!("SSE: evento desconhecido {}", event.event);
}
}
Ok(())
@ -924,7 +979,8 @@ fn open_chat_window_internal(app: &tauri::AppHandle, ticket_id: &str) -> Result<
(100.0, 100.0)
};
let url_path = format!("/chat?ticketId={}", ticket_id);
// Usar query param ao inves de path para compatibilidade com SPA
let url_path = format!("index.html?view=chat&ticketId={}", ticket_id);
WebviewWindowBuilder::new(
app,