Use Convex WS client in desktop chat runtime
This commit is contained in:
parent
988bf25010
commit
1d3580b187
4 changed files with 510 additions and 340 deletions
|
|
@ -4,12 +4,13 @@
|
|||
//! e clientes (Raven desktop). Usa Server-Sent Events (SSE) como metodo
|
||||
//! primario para atualizacoes em tempo real, com fallback para HTTP polling.
|
||||
|
||||
use convex::{ConvexClient, FunctionResult, Value};
|
||||
use futures_util::StreamExt;
|
||||
use once_cell::sync::Lazy;
|
||||
use parking_lot::Mutex;
|
||||
use reqwest::Client;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
|
@ -58,6 +59,7 @@ pub struct ChatAttachment {
|
|||
pub mime_type: Option<String>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ChatPollResponse {
|
||||
|
|
@ -66,6 +68,7 @@ pub struct ChatPollResponse {
|
|||
pub total_unread: u32,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ChatSessionSummary {
|
||||
|
|
@ -112,6 +115,7 @@ static CHAT_CLIENT: Lazy<Client> = Lazy::new(|| {
|
|||
// API FUNCTIONS
|
||||
// ============================================================================
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn poll_chat_updates(
|
||||
base_url: &str,
|
||||
token: &str,
|
||||
|
|
@ -399,59 +403,16 @@ pub async fn upload_file(
|
|||
Ok(data.storage_id)
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// SSE TYPES
|
||||
// ============================================================================
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct SseUpdateEvent {
|
||||
has_active_sessions: bool,
|
||||
sessions: Vec<ChatSessionSummary>,
|
||||
total_unread: u32,
|
||||
ts: i64,
|
||||
}
|
||||
|
||||
/// Parser de eventos SSE
|
||||
struct SseEvent {
|
||||
event: String,
|
||||
data: String,
|
||||
}
|
||||
|
||||
fn parse_sse_line(buffer: &mut String, line: &str) -> Option<SseEvent> {
|
||||
if line.starts_with("event:") {
|
||||
buffer.clear();
|
||||
let event_type = line.trim_start_matches("event:").trim();
|
||||
buffer.push_str(event_type);
|
||||
buffer.push('\0'); // Separador interno
|
||||
None
|
||||
} else if line.starts_with("data:") {
|
||||
let data = line.trim_start_matches("data:").trim();
|
||||
let parts: Vec<&str> = buffer.split('\0').collect();
|
||||
let event_type = if parts.len() >= 1 && !parts[0].is_empty() {
|
||||
parts[0].to_string()
|
||||
} else {
|
||||
"message".to_string()
|
||||
};
|
||||
Some(SseEvent {
|
||||
event: event_type,
|
||||
data: data.to_string(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// CHAT RUNTIME
|
||||
// ============================================================================
|
||||
|
||||
struct ChatPollerHandle {
|
||||
struct ChatRealtimeHandle {
|
||||
stop_flag: Arc<AtomicBool>,
|
||||
join_handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl ChatPollerHandle {
|
||||
impl ChatRealtimeHandle {
|
||||
fn stop(self) {
|
||||
self.stop_flag.store(true, Ordering::Relaxed);
|
||||
self.join_handle.abort();
|
||||
|
|
@ -460,10 +421,10 @@ impl ChatPollerHandle {
|
|||
|
||||
#[derive(Default, Clone)]
|
||||
pub struct ChatRuntime {
|
||||
inner: Arc<Mutex<Option<ChatPollerHandle>>>,
|
||||
inner: Arc<Mutex<Option<ChatRealtimeHandle>>>,
|
||||
last_sessions: Arc<Mutex<Vec<ChatSession>>>,
|
||||
last_unread_count: Arc<Mutex<u32>>,
|
||||
is_using_sse: Arc<AtomicBool>,
|
||||
is_connected: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl ChatRuntime {
|
||||
|
|
@ -472,20 +433,20 @@ impl ChatRuntime {
|
|||
inner: Arc::new(Mutex::new(None)),
|
||||
last_sessions: Arc::new(Mutex::new(Vec::new())),
|
||||
last_unread_count: Arc::new(Mutex::new(0)),
|
||||
is_using_sse: Arc::new(AtomicBool::new(false)),
|
||||
is_connected: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Retorna true se esta usando SSE, false se usando polling HTTP
|
||||
/// Retorna true se conexao WS Convex esta ativa
|
||||
pub fn is_using_sse(&self) -> bool {
|
||||
self.is_using_sse.load(Ordering::Relaxed)
|
||||
self.is_connected.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Inicia o sistema de atualizacoes de chat.
|
||||
/// Tenta SSE primeiro, com fallback automatico para HTTP polling.
|
||||
/// Inicia o sistema de atualizacoes de chat via WebSocket do Convex
|
||||
pub fn start_polling(
|
||||
&self,
|
||||
base_url: String,
|
||||
convex_url: String,
|
||||
token: String,
|
||||
app: tauri::AppHandle,
|
||||
) -> Result<(), String> {
|
||||
|
|
@ -493,6 +454,10 @@ impl ChatRuntime {
|
|||
if sanitized_base.is_empty() {
|
||||
return Err("URL base invalida".to_string());
|
||||
}
|
||||
let sanitized_convex = convex_url.trim().trim_end_matches('/').to_string();
|
||||
if sanitized_convex.is_empty() {
|
||||
return Err("URL do Convex inválida".to_string());
|
||||
}
|
||||
|
||||
// Para polling/SSE existente
|
||||
{
|
||||
|
|
@ -505,74 +470,90 @@ impl ChatRuntime {
|
|||
let stop_flag = Arc::new(AtomicBool::new(false));
|
||||
let stop_clone = stop_flag.clone();
|
||||
let base_clone = sanitized_base.clone();
|
||||
let convex_clone = sanitized_convex.clone();
|
||||
let token_clone = token.clone();
|
||||
let last_sessions = self.last_sessions.clone();
|
||||
let last_unread_count = self.last_unread_count.clone();
|
||||
let is_using_sse = self.is_using_sse.clone();
|
||||
let is_connected = self.is_connected.clone();
|
||||
|
||||
let join_handle = tauri::async_runtime::spawn(async move {
|
||||
crate::log_info!("Chat iniciando (tentando SSE primeiro)");
|
||||
crate::log_info!("Chat iniciando via Convex WebSocket");
|
||||
|
||||
// Loop principal com SSE + fallback para polling
|
||||
loop {
|
||||
// Verificar se deve parar
|
||||
let client_result = ConvexClient::new(&convex_clone).await;
|
||||
let mut client = match client_result {
|
||||
Ok(c) => c,
|
||||
Err(err) => {
|
||||
crate::log_warn!("Falha ao criar cliente Convex: {err:?}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut args = BTreeMap::new();
|
||||
args.insert("machineToken".to_string(), token_clone.clone().into());
|
||||
|
||||
let subscribe_result = client.subscribe("liveChat:checkMachineUpdates", args).await;
|
||||
let mut subscription = match subscribe_result {
|
||||
Ok(sub) => {
|
||||
is_connected.store(true, Ordering::Relaxed);
|
||||
sub
|
||||
}
|
||||
Err(err) => {
|
||||
crate::log_warn!("Falha ao assinar liveChat:checkMachineUpdates: {err:?}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
while let Some(next) = subscription.next().await {
|
||||
if stop_clone.load(Ordering::Relaxed) {
|
||||
crate::log_info!("Chat encerrado");
|
||||
break;
|
||||
}
|
||||
match next {
|
||||
FunctionResult::Value(Value::Object(obj)) => {
|
||||
let has_active = obj
|
||||
.get("hasActiveSessions")
|
||||
.and_then(|v| match v {
|
||||
Value::Boolean(b) => Some(*b),
|
||||
_ => None,
|
||||
})
|
||||
.unwrap_or(false);
|
||||
let total_unread = obj
|
||||
.get("totalUnread")
|
||||
.and_then(|v| match v {
|
||||
Value::Int64(i) => Some(*i as u32),
|
||||
Value::Float64(f) => Some(*f as u32),
|
||||
_ => None,
|
||||
})
|
||||
.unwrap_or(0);
|
||||
|
||||
// Tentar SSE primeiro
|
||||
let sse_result = run_sse_loop(
|
||||
&base_clone,
|
||||
&token_clone,
|
||||
&app,
|
||||
&last_sessions,
|
||||
&last_unread_count,
|
||||
&is_using_sse,
|
||||
&stop_clone,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Verificar se deve parar
|
||||
if stop_clone.load(Ordering::Relaxed) {
|
||||
crate::log_info!("Chat encerrado");
|
||||
break;
|
||||
}
|
||||
|
||||
match sse_result {
|
||||
Ok(()) => {
|
||||
// SSE encerrado normalmente (stop signal)
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
crate::log_warn!("SSE falhou: {e}. Usando polling HTTP...");
|
||||
is_using_sse.store(false, Ordering::Relaxed);
|
||||
|
||||
// Executar polling HTTP por 5 minutos, depois tentar SSE novamente
|
||||
let poll_duration = Duration::from_secs(300); // 5 minutos
|
||||
let poll_result = run_polling_loop(
|
||||
process_chat_update(
|
||||
&base_clone,
|
||||
&token_clone,
|
||||
&app,
|
||||
&last_sessions,
|
||||
&last_unread_count,
|
||||
&stop_clone,
|
||||
poll_duration,
|
||||
has_active,
|
||||
total_unread,
|
||||
)
|
||||
.await;
|
||||
|
||||
if poll_result.is_err() || stop_clone.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
crate::log_info!("Tentando reconectar SSE...");
|
||||
}
|
||||
FunctionResult::ConvexError(err) => {
|
||||
crate::log_warn!("Convex error em checkMachineUpdates: {err:?}");
|
||||
}
|
||||
FunctionResult::ErrorMessage(msg) => {
|
||||
crate::log_warn!("Erro em checkMachineUpdates: {msg}");
|
||||
}
|
||||
FunctionResult::Value(other) => {
|
||||
crate::log_warn!("Payload inesperado em checkMachineUpdates: {other:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
is_connected.store(false, Ordering::Relaxed);
|
||||
crate::log_info!("Chat encerrado (Convex WebSocket finalizado)");
|
||||
});
|
||||
|
||||
let mut guard = self.inner.lock();
|
||||
*guard = Some(ChatPollerHandle {
|
||||
*guard = Some(ChatRealtimeHandle {
|
||||
stop_flag,
|
||||
join_handle,
|
||||
});
|
||||
|
|
@ -585,7 +566,7 @@ impl ChatRuntime {
|
|||
if let Some(handle) = guard.take() {
|
||||
handle.stop();
|
||||
}
|
||||
self.is_using_sse.store(false, Ordering::Relaxed);
|
||||
self.is_connected.store(false, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn get_sessions(&self) -> Vec<ChatSession> {
|
||||
|
|
@ -593,232 +574,6 @@ impl ChatRuntime {
|
|||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// SSE LOOP
|
||||
// ============================================================================
|
||||
|
||||
/// Cliente HTTP para SSE com timeout mais longo (conexao persistente)
|
||||
static SSE_CLIENT: Lazy<Client> = Lazy::new(|| {
|
||||
Client::builder()
|
||||
.user_agent("raven-chat-sse/1.0")
|
||||
.timeout(Duration::from_secs(120)) // Timeout longo para SSE
|
||||
.connect_timeout(Duration::from_secs(15))
|
||||
.use_rustls_tls()
|
||||
.build()
|
||||
.expect("failed to build SSE http client")
|
||||
});
|
||||
|
||||
async fn run_sse_loop(
|
||||
base_url: &str,
|
||||
token: &str,
|
||||
app: &tauri::AppHandle,
|
||||
last_sessions: &Arc<Mutex<Vec<ChatSession>>>,
|
||||
last_unread_count: &Arc<Mutex<u32>>,
|
||||
is_using_sse: &Arc<AtomicBool>,
|
||||
stop_flag: &Arc<AtomicBool>,
|
||||
) -> Result<(), String> {
|
||||
let sse_url = format!("{}/api/machines/chat/stream?token={}", base_url, token);
|
||||
crate::log_info!("Conectando SSE: {}", sse_url);
|
||||
|
||||
// Iniciar request SSE
|
||||
let response = SSE_CLIENT
|
||||
.get(&sse_url)
|
||||
.header("Accept", "text/event-stream")
|
||||
.header("Cache-Control", "no-cache")
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| format!("Falha ao conectar SSE: {e}"))?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
return Err(format!("SSE falhou: status={}, body={}", status, body));
|
||||
}
|
||||
|
||||
is_using_sse.store(true, Ordering::Relaxed);
|
||||
crate::log_info!("SSE conectado com sucesso");
|
||||
|
||||
// Stream de bytes
|
||||
let mut stream = response.bytes_stream();
|
||||
let mut buffer = String::new();
|
||||
let mut line_buffer = String::new();
|
||||
|
||||
// Timeout para detectar conexao morta (60s sem dados = reconectar)
|
||||
let mut last_data_time = std::time::Instant::now();
|
||||
let max_silence = Duration::from_secs(60);
|
||||
|
||||
loop {
|
||||
if stop_flag.load(Ordering::Relaxed) {
|
||||
crate::log_info!("SSE encerrado por stop flag");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Verificar timeout de silencio
|
||||
if last_data_time.elapsed() > max_silence {
|
||||
crate::log_warn!("SSE: timeout de silencio ({}s sem dados)", max_silence.as_secs());
|
||||
return Err("SSE timeout - sem dados".to_string());
|
||||
}
|
||||
|
||||
// Aguardar proximo chunk com timeout
|
||||
let chunk_result = tokio::time::timeout(
|
||||
Duration::from_secs(35), // Heartbeat do servidor e a cada 30s
|
||||
stream.next()
|
||||
).await;
|
||||
|
||||
match chunk_result {
|
||||
Ok(Some(Ok(bytes))) => {
|
||||
last_data_time = std::time::Instant::now();
|
||||
|
||||
let text = String::from_utf8_lossy(&bytes);
|
||||
line_buffer.push_str(&text);
|
||||
|
||||
// Processar linhas completas
|
||||
while let Some(newline_pos) = line_buffer.find('\n') {
|
||||
let line = line_buffer[..newline_pos].trim_end_matches('\r').to_string();
|
||||
line_buffer = line_buffer[newline_pos + 1..].to_string();
|
||||
|
||||
// Linha vazia = fim do evento
|
||||
if line.is_empty() {
|
||||
buffer.clear();
|
||||
continue;
|
||||
}
|
||||
|
||||
// Parsear evento SSE
|
||||
if let Some(event) = parse_sse_line(&mut buffer, &line) {
|
||||
handle_sse_event(
|
||||
&event,
|
||||
base_url,
|
||||
token,
|
||||
app,
|
||||
last_sessions,
|
||||
last_unread_count,
|
||||
).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Some(Err(e))) => {
|
||||
crate::log_warn!("SSE erro de stream: {e}");
|
||||
return Err(format!("Erro SSE: {e}"));
|
||||
}
|
||||
Ok(None) => {
|
||||
crate::log_info!("SSE: stream encerrado pelo servidor");
|
||||
return Err("SSE encerrado".to_string());
|
||||
}
|
||||
Err(_) => {
|
||||
// Timeout aguardando chunk - verificar se conexao ainda viva
|
||||
if last_data_time.elapsed() > max_silence {
|
||||
return Err("SSE timeout".to_string());
|
||||
}
|
||||
// Caso contrario, continuar aguardando
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_sse_event(
|
||||
event: &SseEvent,
|
||||
base_url: &str,
|
||||
token: &str,
|
||||
app: &tauri::AppHandle,
|
||||
last_sessions: &Arc<Mutex<Vec<ChatSession>>>,
|
||||
last_unread_count: &Arc<Mutex<u32>>,
|
||||
) -> Result<(), String> {
|
||||
match event.event.as_str() {
|
||||
"connected" => {
|
||||
crate::log_info!("SSE: conectado");
|
||||
}
|
||||
"heartbeat" => {
|
||||
// noop - apenas mantem conexao viva
|
||||
}
|
||||
"update" => {
|
||||
let update: SseUpdateEvent = serde_json::from_str(&event.data)
|
||||
.map_err(|e| format!("Payload update inválido: {e}"))?;
|
||||
process_chat_update(
|
||||
base_url,
|
||||
token,
|
||||
app,
|
||||
last_sessions,
|
||||
last_unread_count,
|
||||
update.has_active_sessions,
|
||||
update.total_unread,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
"error" => {
|
||||
let error_data: Value = serde_json::from_str(&event.data).unwrap_or_default();
|
||||
let message = error_data
|
||||
.get("message")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or("Erro SSE");
|
||||
return Err(message.to_string());
|
||||
}
|
||||
_ => {
|
||||
crate::log_info!("SSE: evento desconhecido {}", event.event);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// HTTP POLLING LOOP (FALLBACK)
|
||||
// ============================================================================
|
||||
|
||||
async fn run_polling_loop(
|
||||
base_url: &str,
|
||||
token: &str,
|
||||
app: &tauri::AppHandle,
|
||||
last_sessions: &Arc<Mutex<Vec<ChatSession>>>,
|
||||
last_unread_count: &Arc<Mutex<u32>>,
|
||||
stop_flag: &Arc<AtomicBool>,
|
||||
max_duration: Duration,
|
||||
) -> Result<(), String> {
|
||||
crate::log_info!("Iniciando polling HTTP (fallback)");
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
let poll_interval = Duration::from_secs(1); // 1s para ser mais responsivo
|
||||
let mut last_checked_at: Option<i64> = None;
|
||||
|
||||
loop {
|
||||
// Verificar se deve parar ou se atingiu duracao maxima
|
||||
if stop_flag.load(Ordering::Relaxed) {
|
||||
crate::log_info!("Polling HTTP encerrado por stop flag");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if start.elapsed() >= max_duration {
|
||||
crate::log_info!("Polling HTTP: duracao maxima atingida");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
|
||||
// Verificar novamente apos sleep
|
||||
if stop_flag.load(Ordering::Relaxed) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match poll_chat_updates(base_url, token, last_checked_at).await {
|
||||
Ok(result) => {
|
||||
last_checked_at = Some(chrono::Utc::now().timestamp_millis());
|
||||
|
||||
process_chat_update(
|
||||
base_url,
|
||||
token,
|
||||
app,
|
||||
last_sessions,
|
||||
last_unread_count,
|
||||
result.has_active_sessions,
|
||||
result.total_unread,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(e) => {
|
||||
crate::log_warn!("Falha no polling de chat: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// SHARED UPDATE PROCESSING
|
||||
// ============================================================================
|
||||
|
|
|
|||
|
|
@ -23,6 +23,8 @@ use winreg::enums::*;
|
|||
#[cfg(target_os = "windows")]
|
||||
use winreg::RegKey;
|
||||
|
||||
const DEFAULT_CONVEX_URL: &str = "https://convex.esdrasrenan.com.br";
|
||||
|
||||
// ============================================================================
|
||||
// Sistema de Logging para Agente
|
||||
// ============================================================================
|
||||
|
|
@ -233,9 +235,11 @@ fn start_chat_polling(
|
|||
state: tauri::State<ChatRuntime>,
|
||||
app: tauri::AppHandle,
|
||||
base_url: String,
|
||||
convex_url: Option<String>,
|
||||
token: String,
|
||||
) -> Result<(), String> {
|
||||
state.start_polling(base_url, token, app)
|
||||
let url = convex_url.unwrap_or_else(|| DEFAULT_CONVEX_URL.to_string());
|
||||
state.start_polling(base_url, url, token, app)
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
|
|
@ -667,6 +671,11 @@ async fn try_start_background_agent(
|
|||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("https://tickets.esdrasrenan.com.br");
|
||||
|
||||
let convex_url = config
|
||||
.and_then(|c| c.get("convexUrl"))
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or(DEFAULT_CONVEX_URL);
|
||||
|
||||
let interval = config
|
||||
.and_then(|c| c.get("heartbeatIntervalSec"))
|
||||
.and_then(|v| v.as_u64())
|
||||
|
|
@ -688,14 +697,12 @@ async fn try_start_background_agent(
|
|||
.map_err(|e| format!("Falha ao iniciar heartbeat: {e}"))?;
|
||||
|
||||
// Iniciar sistema de chat (WebSocket + fallback HTTP polling)
|
||||
if let Err(e) = chat_runtime.start_polling(
|
||||
api_base_url.to_string(),
|
||||
token.to_string(),
|
||||
app.clone(),
|
||||
) {
|
||||
if let Err(e) =
|
||||
chat_runtime.start_polling(api_base_url.to_string(), convex_url.to_string(), token.to_string(), app.clone())
|
||||
{
|
||||
log_warn!("Falha ao iniciar chat em background: {e}");
|
||||
} else {
|
||||
log_info!("Chat iniciado com sucesso (WebSocket + fallback polling)");
|
||||
log_info!("Chat iniciado com sucesso (Convex WebSocket)");
|
||||
}
|
||||
|
||||
log_info!("Agente iniciado com sucesso em background");
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue