fix(chat): melhora confiabilidade da deteccao de novas mensagens

- Implementa deteccao dual: timestamp (lastActivityAt) + contador
- Adiciona persistencia de estado em ~/.local/share/Raven/chat-state.json
- Corrige race condition no servidor com refetch antes do patch
- Adiciona campo lastAgentMessageAt no schema do Convex
- Adiciona logs de diagnostico detalhados

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
rever-tecnologia 2025-12-15 09:44:03 -03:00
parent c4664ab1c7
commit 2293a0275a
5 changed files with 310 additions and 30 deletions

View file

@ -11,9 +11,11 @@ use parking_lot::Mutex;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fs;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tauri::async_runtime::JoinHandle;
use tauri::{Emitter, Manager, WebviewWindowBuilder, WebviewUrl};
use tauri_plugin_notification::NotificationExt;
@ -100,6 +102,77 @@ pub struct SessionStartedEvent {
pub session: ChatSession,
}
// ============================================================================
// PERSISTENCIA DE ESTADO
// ============================================================================
/// Estado persistido do chat para sobreviver a restarts
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ChatPersistedState {
last_unread_count: u32,
sessions: Vec<ChatSession>,
saved_at: u64, // Unix timestamp em ms
}
const STATE_FILE_NAME: &str = "chat-state.json";
const STATE_MAX_AGE_MS: u64 = 3600_000; // 1 hora - ignorar estados mais antigos
fn get_state_file_path() -> Option<PathBuf> {
dirs::data_local_dir().map(|p| p.join("Raven").join(STATE_FILE_NAME))
}
fn save_chat_state(last_unread: u32, sessions: &[ChatSession]) {
let Some(path) = get_state_file_path() else {
return;
};
// Criar diretorio se nao existir
if let Some(parent) = path.parent() {
let _ = fs::create_dir_all(parent);
}
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let state = ChatPersistedState {
last_unread_count: last_unread,
sessions: sessions.to_vec(),
saved_at: now,
};
if let Ok(json) = serde_json::to_string_pretty(&state) {
let _ = fs::write(&path, json);
crate::log_info!("[CHAT] Estado persistido: unread={}, sessions={}", last_unread, sessions.len());
}
}
fn load_chat_state() -> Option<ChatPersistedState> {
let path = get_state_file_path()?;
let json = fs::read_to_string(&path).ok()?;
let state: ChatPersistedState = serde_json::from_str(&json).ok()?;
// Verificar se estado nao esta muito antigo
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
if now.saturating_sub(state.saved_at) > STATE_MAX_AGE_MS {
crate::log_info!("[CHAT] Estado persistido ignorado (muito antigo)");
return None;
}
crate::log_info!(
"[CHAT] Estado restaurado: unread={}, sessions={}",
state.last_unread_count, state.sessions.len()
);
Some(state)
}
// ============================================================================
// HTTP CLIENT
// ============================================================================
@ -462,10 +535,16 @@ pub struct ChatRuntime {
impl ChatRuntime {
pub fn new() -> Self {
// Tentar restaurar estado persistido
let (sessions, unread) = match load_chat_state() {
Some(state) => (state.sessions, state.last_unread_count),
None => (Vec::new(), 0),
};
Self {
inner: Arc::new(Mutex::new(None)),
last_sessions: Arc::new(Mutex::new(Vec::new())),
last_unread_count: Arc::new(Mutex::new(0)),
last_sessions: Arc::new(Mutex::new(sessions)),
last_unread_count: Arc::new(Mutex::new(unread)),
is_connected: Arc::new(AtomicBool::new(false)),
}
}
@ -510,7 +589,9 @@ impl ChatRuntime {
let is_connected = self.is_connected.clone();
let join_handle = tauri::async_runtime::spawn(async move {
crate::log_info!("Chat iniciando (Convex realtime + fallback por polling)");
crate::log_info!("[CHAT DEBUG] Iniciando sistema de chat");
crate::log_info!("[CHAT DEBUG] Convex URL: {}", convex_clone);
crate::log_info!("[CHAT DEBUG] API Base URL: {}", base_clone);
let mut backoff_ms: u64 = 1_000;
let max_backoff_ms: u64 = 30_000;
@ -522,12 +603,16 @@ impl ChatRuntime {
break;
}
crate::log_info!("[CHAT DEBUG] Tentando conectar ao Convex...");
let client_result = ConvexClient::new(&convex_clone).await;
let mut client = match client_result {
Ok(c) => c,
Ok(c) => {
crate::log_info!("[CHAT DEBUG] Cliente Convex criado com sucesso");
c
}
Err(err) => {
is_connected.store(false, Ordering::Relaxed);
crate::log_warn!("Falha ao criar cliente Convex: {err:?}");
crate::log_warn!("[CHAT DEBUG] FALHA ao criar cliente Convex: {err:?}");
if last_poll.elapsed() >= poll_interval {
poll_and_process_chat_update(
@ -550,16 +635,18 @@ impl ChatRuntime {
let mut args = BTreeMap::new();
args.insert("machineToken".to_string(), token_clone.clone().into());
crate::log_info!("[CHAT DEBUG] Assinando liveChat:checkMachineUpdates...");
let subscribe_result = client.subscribe("liveChat:checkMachineUpdates", args).await;
let mut subscription = match subscribe_result {
Ok(sub) => {
is_connected.store(true, Ordering::Relaxed);
backoff_ms = 1_000;
crate::log_info!("[CHAT DEBUG] CONECTADO ao Convex WebSocket com sucesso!");
sub
}
Err(err) => {
is_connected.store(false, Ordering::Relaxed);
crate::log_warn!("Falha ao assinar liveChat:checkMachineUpdates: {err:?}");
crate::log_warn!("[CHAT DEBUG] FALHA ao assinar checkMachineUpdates: {err:?}");
if last_poll.elapsed() >= poll_interval {
poll_and_process_chat_update(
@ -579,8 +666,12 @@ impl ChatRuntime {
}
};
crate::log_info!("[CHAT DEBUG] Entrando no loop de escuta WebSocket...");
let mut update_count: u64 = 0;
while let Some(next) = subscription.next().await {
update_count += 1;
if stop_clone.load(Ordering::Relaxed) {
crate::log_info!("[CHAT DEBUG] Stop flag detectado, saindo do loop");
break;
}
match next {
@ -601,6 +692,11 @@ impl ChatRuntime {
})
.unwrap_or(0);
crate::log_info!(
"[CHAT DEBUG] UPDATE #{} recebido via WebSocket: hasActive={}, totalUnread={}",
update_count, has_active, total_unread
);
process_chat_update(
&base_clone,
&token_clone,
@ -613,13 +709,13 @@ impl ChatRuntime {
.await;
}
FunctionResult::ConvexError(err) => {
crate::log_warn!("Convex error em checkMachineUpdates: {err:?}");
crate::log_warn!("[CHAT DEBUG] Convex error em checkMachineUpdates: {err:?}");
}
FunctionResult::ErrorMessage(msg) => {
crate::log_warn!("Erro em checkMachineUpdates: {msg}");
crate::log_warn!("[CHAT DEBUG] Erro em checkMachineUpdates: {msg}");
}
FunctionResult::Value(other) => {
crate::log_warn!("Payload inesperado em checkMachineUpdates: {other:?}");
crate::log_warn!("[CHAT DEBUG] Payload inesperado em checkMachineUpdates: {other:?}");
}
}
}
@ -627,10 +723,11 @@ impl ChatRuntime {
is_connected.store(false, Ordering::Relaxed);
if stop_clone.load(Ordering::Relaxed) {
crate::log_info!("[CHAT DEBUG] Stop flag detectado apos loop");
break;
}
crate::log_warn!("Chat realtime desconectado; aplicando fallback e tentando reconectar");
crate::log_warn!("[CHAT DEBUG] WebSocket DESCONECTADO! Aplicando fallback e tentando reconectar...");
if last_poll.elapsed() >= poll_interval {
poll_and_process_chat_update(
&base_clone,
@ -684,8 +781,13 @@ async fn poll_and_process_chat_update(
last_sessions: &Arc<Mutex<Vec<ChatSession>>>,
last_unread_count: &Arc<Mutex<u32>>,
) {
crate::log_info!("[CHAT DEBUG] Executando fallback HTTP polling...");
match poll_chat_updates(base_url, token, None).await {
Ok(result) => {
crate::log_info!(
"[CHAT DEBUG] Polling OK: hasActive={}, totalUnread={}",
result.has_active_sessions, result.total_unread
);
process_chat_update(
base_url,
token,
@ -698,7 +800,7 @@ async fn poll_and_process_chat_update(
.await;
}
Err(err) => {
crate::log_warn!("Chat fallback poll falhou: {err}");
crate::log_warn!("[CHAT DEBUG] Fallback poll FALHOU: {err}");
}
}
}
@ -712,10 +814,18 @@ async fn process_chat_update(
has_active_sessions: bool,
total_unread: u32,
) {
crate::log_info!(
"[CHAT DEBUG] process_chat_update: hasActive={}, totalUnread={}",
has_active_sessions, total_unread
);
// Buscar sessoes completas para ter dados corretos
let mut current_sessions = if has_active_sessions {
fetch_sessions(base_url, token).await.unwrap_or_default()
let sessions = fetch_sessions(base_url, token).await.unwrap_or_default();
crate::log_info!("[CHAT DEBUG] Buscou {} sessoes ativas", sessions.len());
sessions
} else {
crate::log_info!("[CHAT DEBUG] Sem sessoes ativas");
Vec::new()
};
@ -776,14 +886,58 @@ async fn process_chat_update(
}
}
// Atualizar cache de sessoes
*last_sessions.lock() = current_sessions.clone();
// =========================================================================
// DETECCAO ROBUSTA DE NOVAS MENSAGENS
// Usa DUAS estrategias: timestamp E contador (belt and suspenders)
// =========================================================================
// Verificar mensagens nao lidas
let prev_unread = *last_unread_count.lock();
let new_messages = total_unread > prev_unread;
// Estrategia 1: Detectar por lastActivityAt de cada sessao
// Se alguma sessao teve atividade mais recente E tem mensagens nao lidas -> nova mensagem
let mut detected_by_activity = false;
let mut activity_details = String::new();
for session in &current_sessions {
let prev_activity = prev_sessions
.iter()
.find(|s| s.session_id == session.session_id)
.map(|s| s.last_activity_at)
.unwrap_or(0);
// Se lastActivityAt aumentou E ha mensagens nao lidas -> nova mensagem do agente
if session.last_activity_at > prev_activity && session.unread_count > 0 {
detected_by_activity = true;
activity_details = format!(
"sessao={} activity: {} -> {} unread={}",
session.ticket_id, prev_activity, session.last_activity_at, session.unread_count
);
break;
}
}
// Estrategia 2: Fallback por contador total (metodo original)
let detected_by_count = total_unread > prev_unread;
// Nova mensagem se QUALQUER estrategia detectar
let new_messages = detected_by_activity || detected_by_count;
// Log detalhado para diagnostico
crate::log_info!(
"[CHAT] Deteccao: by_activity={} by_count={} (prev={} curr={}) resultado={}",
detected_by_activity, detected_by_count, prev_unread, total_unread, new_messages
);
if detected_by_activity {
crate::log_info!("[CHAT] Detectado por atividade: {}", activity_details);
}
// Atualizar caches APOS deteccao (importante: manter ordem)
*last_sessions.lock() = current_sessions.clone();
*last_unread_count.lock() = total_unread;
// Persistir estado para sobreviver a restarts
save_chat_state(total_unread, &current_sessions);
// Sempre emitir unread-update
let _ = app.emit(
"raven://chat/unread-update",
@ -795,9 +949,17 @@ async fn process_chat_update(
// Notificar novas mensagens - mostrar chat minimizado com badge
if new_messages && total_unread > 0 {
let new_count = total_unread - prev_unread;
let new_count = if total_unread > prev_unread {
total_unread - prev_unread
} else {
1 // Se detectou por activity mas contador nao mudou, assumir 1 nova
};
crate::log_info!("Chat: {} novas mensagens (total={})", new_count, total_unread);
crate::log_info!(
"[CHAT] NOVAS MENSAGENS! count={}, total={}, metodo={}",
new_count, total_unread,
if detected_by_activity { "activity" } else { "count" }
);
let _ = app.emit(
"raven://chat/new-message",
@ -885,6 +1047,16 @@ async fn process_chat_update(
.title(notification_title)
.body(&notification_body)
.show();
} else {
// Log para debug quando NAO ha novas mensagens
if total_unread == 0 {
crate::log_info!("[CHAT DEBUG] Sem mensagens nao lidas (total=0)");
} else if !new_messages {
crate::log_info!(
"[CHAT DEBUG] Sem novas mensagens (prev={} >= total={})",
prev_unread, total_unread
);
}
}
}