feat: SSE para chat desktop, rate limiting, retry, testes e atualizacao de stack

- Implementa Server-Sent Events (SSE) para chat no desktop com fallback HTTP
- Adiciona rate limiting nas APIs de chat (poll, messages, sessions)
- Adiciona retry com backoff exponencial para mutations
- Cria testes para modulo liveChat (20 testes)
- Corrige testes de SMTP (unit tests para extractEnvelopeAddress)
- Adiciona indice by_status_lastActivity para cron de sessoes inativas
- Atualiza stack: Bun 1.3.4, React 19, recharts 3, noble/hashes 2, etc

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
esdrasrenan 2025-12-07 16:29:18 -03:00
parent 0e0bd9a49c
commit d01c37522f
19 changed files with 1465 additions and 443 deletions

View file

@ -62,11 +62,13 @@ version = "0.1.0"
dependencies = [
"base64 0.22.1",
"chrono",
"futures-util",
"get_if_addrs",
"hostname",
"once_cell",
"parking_lot",
"reqwest",
"reqwest-eventsource",
"serde",
"serde_json",
"sha2",
@ -985,6 +987,17 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "eventsource-stream"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab"
dependencies = [
"futures-core",
"nom",
"pin-project-lite",
]
[[package]]
name = "fastrand"
version = "2.3.0"
@ -1159,6 +1172,12 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-timer"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
[[package]]
name = "futures-util"
version = "0.3.31"
@ -2166,6 +2185,12 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "minisign-verify"
version = "0.2.4"
@ -2269,6 +2294,16 @@ version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb"
[[package]]
name = "nom"
version = "7.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
dependencies = [
"memchr",
"minimal-lexical",
]
[[package]]
name = "notify-rust"
version = "4.11.7"
@ -3364,6 +3399,22 @@ dependencies = [
"webpki-roots",
]
[[package]]
name = "reqwest-eventsource"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632c55746dbb44275691640e7b40c907c16a2dc1a5842aa98aaec90da6ec6bde"
dependencies = [
"eventsource-stream",
"futures-core",
"futures-timer",
"mime",
"nom",
"pin-project-lite",
"reqwest",
"thiserror 1.0.69",
]
[[package]]
name = "ring"
version = "0.17.14"

View file

@ -29,6 +29,8 @@ serde_json = "1"
sysinfo = { version = "0.31", default-features = false, features = ["multithread", "network", "system", "disk"] }
get_if_addrs = "0.5"
reqwest = { version = "0.12", features = ["json", "rustls-tls", "blocking"], default-features = false }
reqwest-eventsource = "0.6"
futures-util = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
once_cell = "1.19"
thiserror = "1.0"

View file

@ -1,19 +1,21 @@
//! Modulo de Chat em Tempo Real
//!
//! Este modulo implementa o sistema de chat entre agentes (dashboard web)
//! e clientes (Raven desktop). Inclui polling de mensagens, gerenciamento
//! de janelas de chat e emissao de eventos.
//! e clientes (Raven desktop). Usa SSE (Server-Sent Events) como metodo
//! primario para atualizacoes em tempo real, com fallback para HTTP polling.
use futures_util::StreamExt;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use reqwest::Client;
use reqwest_eventsource::{Event, EventSource};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tauri::async_runtime::JoinHandle;
use tauri::{Emitter, Manager, WebviewWindowBuilder, WebviewUrl};
use tauri_plugin_notification::NotificationExt;
use tokio::sync::Notify;
// ============================================================================
// TYPES
@ -396,18 +398,32 @@ pub async fn upload_file(
Ok(data.storage_id)
}
// ============================================================================
// SSE (Server-Sent Events) TYPES
// ============================================================================
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SseUpdateEvent {
has_active_sessions: bool,
sessions: Vec<ChatSessionSummary>,
total_unread: u32,
ts: i64,
}
// ============================================================================
// CHAT RUNTIME
// ============================================================================
struct ChatPollerHandle {
stop_signal: Arc<Notify>,
stop_flag: Arc<AtomicBool>,
join_handle: JoinHandle<()>,
is_using_sse: Arc<AtomicBool>,
}
impl ChatPollerHandle {
fn stop(self) {
self.stop_signal.notify_waiters();
self.stop_flag.store(true, Ordering::Relaxed);
self.join_handle.abort();
}
}
@ -417,6 +433,7 @@ pub struct ChatRuntime {
inner: Arc<Mutex<Option<ChatPollerHandle>>>,
last_sessions: Arc<Mutex<Vec<ChatSession>>>,
last_unread_count: Arc<Mutex<u32>>,
is_using_sse: Arc<AtomicBool>,
}
impl ChatRuntime {
@ -425,9 +442,17 @@ impl ChatRuntime {
inner: Arc::new(Mutex::new(None)),
last_sessions: Arc::new(Mutex::new(Vec::new())),
last_unread_count: Arc::new(Mutex::new(0)),
is_using_sse: Arc::new(AtomicBool::new(false)),
}
}
/// Retorna true se esta usando SSE, false se usando polling HTTP
pub fn is_using_sse(&self) -> bool {
self.is_using_sse.load(Ordering::Relaxed)
}
/// Inicia o sistema de atualizacoes de chat.
/// Tenta SSE primeiro, com fallback automatico para HTTP polling.
pub fn start_polling(
&self,
base_url: String,
@ -439,7 +464,7 @@ impl ChatRuntime {
return Err("URL base invalida".to_string());
}
// Para polling existente
// Para polling/SSE existente
{
let mut guard = self.inner.lock();
if let Some(handle) = guard.take() {
@ -447,207 +472,70 @@ impl ChatRuntime {
}
}
let stop_signal = Arc::new(Notify::new());
let stop_clone = stop_signal.clone();
let stop_flag = Arc::new(AtomicBool::new(false));
let stop_clone = stop_flag.clone();
let base_clone = sanitized_base.clone();
let token_clone = token.clone();
let last_sessions = self.last_sessions.clone();
let last_unread_count = self.last_unread_count.clone();
let is_using_sse = self.is_using_sse.clone();
let join_handle = tauri::async_runtime::spawn(async move {
crate::log_info!("Chat polling iniciado");
let mut last_checked_at: Option<i64> = None;
let poll_interval = Duration::from_secs(2); // Intervalo reduzido para maior responsividade
crate::log_info!("Chat iniciando (tentando SSE primeiro)");
// Loop principal com SSE + fallback para polling
loop {
tokio::select! {
_ = stop_clone.notified() => {
crate::log_info!("Chat polling encerrado");
// Verificar se deve parar
if stop_clone.load(Ordering::Relaxed) {
crate::log_info!("Chat encerrado");
break;
}
// Tentar SSE primeiro
let sse_result = run_sse_loop(
&base_clone,
&token_clone,
&app,
&last_sessions,
&last_unread_count,
&is_using_sse,
&stop_clone,
)
.await;
// Verificar se deve parar
if stop_clone.load(Ordering::Relaxed) {
crate::log_info!("Chat encerrado");
break;
}
match sse_result {
Ok(()) => {
// SSE encerrado normalmente (stop signal)
break;
}
_ = tokio::time::sleep(poll_interval) => {
match poll_chat_updates(&base_clone, &token_clone, last_checked_at).await {
Ok(result) => {
last_checked_at = Some(chrono::Utc::now().timestamp_millis());
Err(e) => {
crate::log_warn!("SSE falhou: {e}. Usando polling HTTP...");
is_using_sse.store(false, Ordering::Relaxed);
// DEBUG: Log do resultado do polling
crate::log_info!(
"[CHAT DEBUG] poll_chat_updates: has_active={}, total_unread={}, sessions_count={}",
result.has_active_sessions,
result.total_unread,
result.sessions.len()
);
// Executar polling HTTP por 5 minutos, depois tentar SSE novamente
let poll_duration = Duration::from_secs(300); // 5 minutos
let poll_result = run_polling_loop(
&base_clone,
&token_clone,
&app,
&last_sessions,
&last_unread_count,
&stop_clone,
poll_duration,
)
.await;
// Buscar sessoes completas para ter dados corretos
let current_sessions = if result.has_active_sessions {
let sessions = fetch_sessions(&base_clone, &token_clone).await.unwrap_or_default();
crate::log_info!(
"[CHAT DEBUG] fetch_sessions: {} sessoes encontradas",
sessions.len()
);
for s in &sessions {
crate::log_info!(
"[CHAT DEBUG] Sessao: id={}, ticket={}, unread={}",
s.session_id,
s.ticket_id,
s.unread_count
);
}
sessions
} else {
Vec::new()
};
// Verificar sessoes anteriores
let prev_sessions: Vec<ChatSession> = last_sessions.lock().clone();
let prev_session_ids: Vec<String> = prev_sessions.iter().map(|s| s.session_id.clone()).collect();
let current_session_ids: Vec<String> = current_sessions.iter().map(|s| s.session_id.clone()).collect();
// Detectar novas sessoes
for session in &current_sessions {
if !prev_session_ids.contains(&session.session_id) {
// Nova sessao! Emitir evento
crate::log_info!(
"Nova sessao de chat: ticket={}, session={}",
session.ticket_id,
session.session_id
);
let _ = app.emit(
"raven://chat/session-started",
SessionStartedEvent {
session: session.clone(),
},
);
// Enviar notificacao nativa do Windows
let notification_title = format!(
"Chat iniciado - Chamado #{}",
session.ticket_ref
);
let notification_body = format!(
"{} iniciou um chat de suporte.\nClique no icone do Raven para abrir.",
session.agent_name
);
if let Err(e) = app
.notification()
.builder()
.title(&notification_title)
.body(&notification_body)
.show()
{
crate::log_warn!(
"Falha ao enviar notificacao de nova sessao: {e}"
);
}
}
}
// Detectar sessoes encerradas
for prev_session in &prev_sessions {
if !current_session_ids.contains(&prev_session.session_id) {
// Sessao foi encerrada! Emitir evento
crate::log_info!(
"Sessao de chat encerrada: ticket={}, session={}",
prev_session.ticket_id,
prev_session.session_id
);
let _ = app.emit(
"raven://chat/session-ended",
serde_json::json!({
"sessionId": prev_session.session_id,
"ticketId": prev_session.ticket_id
}),
);
}
}
// Atualizar cache de sessoes
*last_sessions.lock() = current_sessions.clone();
// Verificar mensagens nao lidas
let prev_unread = *last_unread_count.lock();
let new_messages = result.total_unread > prev_unread;
*last_unread_count.lock() = result.total_unread;
// DEBUG: Log de unread count
crate::log_info!(
"[CHAT DEBUG] Unread check: prev={}, current={}, new_messages={}",
prev_unread,
result.total_unread,
new_messages
);
// Sempre emitir unread-update com sessoes completas
crate::log_info!(
"[CHAT DEBUG] Emitindo unread-update: totalUnread={}, sessions={}",
result.total_unread,
current_sessions.len()
);
let _ = app.emit(
"raven://chat/unread-update",
serde_json::json!({
"totalUnread": result.total_unread,
"sessions": current_sessions
}),
);
// Notificar novas mensagens (quando aumentou)
if new_messages && result.total_unread > 0 {
crate::log_info!("[CHAT DEBUG] NOVA MENSAGEM DETECTADA! Emitindo evento new-message");
let new_count = result.total_unread - prev_unread;
crate::log_info!(
"Chat: {} novas mensagens (total={})",
new_count,
result.total_unread
);
// Emitir evento para o frontend atualizar UI
let _ = app.emit(
"raven://chat/new-message",
serde_json::json!({
"totalUnread": result.total_unread,
"newCount": new_count,
"sessions": current_sessions
}),
);
// Abrir janela de chat automaticamente para a sessao com nova mensagem
if let Some(session) = current_sessions.first() {
crate::log_info!(
"[CHAT DEBUG] Abrindo janela de chat para ticket={}",
session.ticket_id
);
if let Err(e) = open_chat_window(&app, &session.ticket_id) {
crate::log_warn!("Falha ao abrir janela de chat: {e}");
}
}
// Enviar notificacao nativa do Windows
let notification_title = "Nova mensagem de suporte";
let notification_body = if new_count == 1 {
"Voce recebeu 1 nova mensagem no chat".to_string()
} else {
format!("Voce recebeu {} novas mensagens no chat", new_count)
};
if let Err(e) = app
.notification()
.builder()
.title(notification_title)
.body(&notification_body)
.show()
{
crate::log_warn!(
"Falha ao enviar notificacao de nova mensagem: {e}"
);
}
}
}
Err(e) => {
crate::log_warn!("Falha no polling de chat: {e}");
}
if poll_result.is_err() || stop_clone.load(Ordering::Relaxed) {
break;
}
crate::log_info!("Tentando reconectar SSE...");
}
}
}
@ -655,8 +543,9 @@ impl ChatRuntime {
let mut guard = self.inner.lock();
*guard = Some(ChatPollerHandle {
stop_signal,
stop_flag,
join_handle,
is_using_sse: self.is_using_sse.clone(),
});
Ok(())
@ -667,6 +556,7 @@ impl ChatRuntime {
if let Some(handle) = guard.take() {
handle.stop();
}
self.is_using_sse.store(false, Ordering::Relaxed);
}
pub fn get_sessions(&self) -> Vec<ChatSession> {
@ -674,6 +564,277 @@ impl ChatRuntime {
}
}
// ============================================================================
// SSE LOOP
// ============================================================================
async fn run_sse_loop(
base_url: &str,
token: &str,
app: &tauri::AppHandle,
last_sessions: &Arc<Mutex<Vec<ChatSession>>>,
last_unread_count: &Arc<Mutex<u32>>,
is_using_sse: &Arc<AtomicBool>,
stop_flag: &Arc<AtomicBool>,
) -> Result<(), String> {
let sse_url = format!("{}/api/machines/chat/stream?token={}", base_url, token);
crate::log_info!("Conectando SSE: {}", sse_url);
let request = CHAT_CLIENT.get(&sse_url);
let mut es = EventSource::new(request).map_err(|e| format!("Falha ao criar EventSource: {e}"))?;
is_using_sse.store(true, Ordering::Relaxed);
crate::log_info!("SSE conectado com sucesso");
loop {
// Verificar stop flag periodicamente
if stop_flag.load(Ordering::Relaxed) {
crate::log_info!("SSE encerrado por stop flag");
return Ok(());
}
// Usar timeout para poder verificar stop flag
let event = tokio::time::timeout(Duration::from_secs(1), es.next()).await;
match event {
Ok(Some(Ok(Event::Open))) => {
crate::log_info!("SSE: conexao aberta");
}
Ok(Some(Ok(Event::Message(msg)))) => {
let event_type = msg.event.as_str();
match event_type {
"connected" => {
crate::log_info!("SSE: evento connected recebido");
}
"heartbeat" => {
// Ignorar heartbeats silenciosamente
}
"update" => {
// Processar update de chat
if let Ok(update) = serde_json::from_str::<SseUpdateEvent>(&msg.data) {
process_chat_update(
base_url,
token,
app,
last_sessions,
last_unread_count,
update.has_active_sessions,
update.total_unread,
)
.await;
}
}
"error" => {
crate::log_warn!("SSE: erro recebido do servidor: {}", msg.data);
return Err(format!("Erro SSE do servidor: {}", msg.data));
}
_ => {
crate::log_info!("SSE: evento desconhecido: {}", event_type);
}
}
}
Ok(Some(Err(e))) => {
crate::log_warn!("SSE erro: {e}");
return Err(format!("Erro SSE: {e}"));
}
Ok(None) => {
crate::log_info!("SSE: stream encerrado");
return Err("Stream SSE encerrado".to_string());
}
Err(_) => {
// Timeout - continuar loop para verificar stop flag
}
}
}
}
// ============================================================================
// HTTP POLLING LOOP (FALLBACK)
// ============================================================================
async fn run_polling_loop(
base_url: &str,
token: &str,
app: &tauri::AppHandle,
last_sessions: &Arc<Mutex<Vec<ChatSession>>>,
last_unread_count: &Arc<Mutex<u32>>,
stop_flag: &Arc<AtomicBool>,
max_duration: Duration,
) -> Result<(), String> {
crate::log_info!("Iniciando polling HTTP (fallback)");
let start = std::time::Instant::now();
let poll_interval = Duration::from_secs(2);
let mut last_checked_at: Option<i64> = None;
loop {
// Verificar se deve parar ou se atingiu duracao maxima
if stop_flag.load(Ordering::Relaxed) {
crate::log_info!("Polling HTTP encerrado por stop flag");
return Ok(());
}
if start.elapsed() >= max_duration {
crate::log_info!("Polling HTTP: duracao maxima atingida");
return Ok(());
}
tokio::time::sleep(poll_interval).await;
// Verificar novamente apos sleep
if stop_flag.load(Ordering::Relaxed) {
return Ok(());
}
match poll_chat_updates(base_url, token, last_checked_at).await {
Ok(result) => {
last_checked_at = Some(chrono::Utc::now().timestamp_millis());
process_chat_update(
base_url,
token,
app,
last_sessions,
last_unread_count,
result.has_active_sessions,
result.total_unread,
)
.await;
}
Err(e) => {
crate::log_warn!("Falha no polling de chat: {e}");
}
}
}
}
// ============================================================================
// SHARED UPDATE PROCESSING
// ============================================================================
async fn process_chat_update(
base_url: &str,
token: &str,
app: &tauri::AppHandle,
last_sessions: &Arc<Mutex<Vec<ChatSession>>>,
last_unread_count: &Arc<Mutex<u32>>,
has_active_sessions: bool,
total_unread: u32,
) {
// Buscar sessoes completas para ter dados corretos
let current_sessions = if has_active_sessions {
fetch_sessions(base_url, token).await.unwrap_or_default()
} else {
Vec::new()
};
// Verificar sessoes anteriores
let prev_sessions: Vec<ChatSession> = last_sessions.lock().clone();
let prev_session_ids: Vec<String> = prev_sessions.iter().map(|s| s.session_id.clone()).collect();
let current_session_ids: Vec<String> = current_sessions.iter().map(|s| s.session_id.clone()).collect();
// Detectar novas sessoes
for session in &current_sessions {
if !prev_session_ids.contains(&session.session_id) {
crate::log_info!(
"Nova sessao de chat: ticket={}, session={}",
session.ticket_id,
session.session_id
);
let _ = app.emit(
"raven://chat/session-started",
SessionStartedEvent {
session: session.clone(),
},
);
// Notificacao nativa
let notification_title = format!("Chat iniciado - Chamado #{}", session.ticket_ref);
let notification_body = format!(
"{} iniciou um chat de suporte.\nClique no icone do Raven para abrir.",
session.agent_name
);
let _ = app
.notification()
.builder()
.title(&notification_title)
.body(&notification_body)
.show();
}
}
// Detectar sessoes encerradas
for prev_session in &prev_sessions {
if !current_session_ids.contains(&prev_session.session_id) {
crate::log_info!(
"Sessao de chat encerrada: ticket={}, session={}",
prev_session.ticket_id,
prev_session.session_id
);
let _ = app.emit(
"raven://chat/session-ended",
serde_json::json!({
"sessionId": prev_session.session_id,
"ticketId": prev_session.ticket_id
}),
);
}
}
// Atualizar cache de sessoes
*last_sessions.lock() = current_sessions.clone();
// Verificar mensagens nao lidas
let prev_unread = *last_unread_count.lock();
let new_messages = total_unread > prev_unread;
*last_unread_count.lock() = total_unread;
// Sempre emitir unread-update
let _ = app.emit(
"raven://chat/unread-update",
serde_json::json!({
"totalUnread": total_unread,
"sessions": current_sessions
}),
);
// Notificar novas mensagens
if new_messages && total_unread > 0 {
let new_count = total_unread - prev_unread;
crate::log_info!("Chat: {} novas mensagens (total={})", new_count, total_unread);
let _ = app.emit(
"raven://chat/new-message",
serde_json::json!({
"totalUnread": total_unread,
"newCount": new_count,
"sessions": current_sessions
}),
);
// Abrir janela de chat
if let Some(session) = current_sessions.first() {
let _ = open_chat_window(app, &session.ticket_id);
}
// Notificacao nativa
let notification_title = "Nova mensagem de suporte";
let notification_body = if new_count == 1 {
"Voce recebeu 1 nova mensagem no chat".to_string()
} else {
format!("Voce recebeu {} novas mensagens no chat", new_count)
};
let _ = app
.notification()
.builder()
.title(notification_title)
.body(&notification_body)
.show();
}
}
// ============================================================================
// WINDOW MANAGEMENT
// ============================================================================