const express = require('express');
const db = require('../db');
const { buildSystemPrompt } = require('../systemPromptBuilder');
const { streamCompletion } = require('../lib/llm');
const { embedQuery, topK } = require('../lib/rag');
const { broadcastBuddy } = require('../lib/broadcast');

const router = express.Router();

// Fallback "the answer was wrong" detector, used only when the model emitted no
// exercise_logged JSON. `mal` is word-bounded so that words such as "normal" or
// "decimal" do not count as a wrong answer.
const WRONG_ANSWER_PATTERN = /incorrect|incorrecta|no es correcto|\bmal\b|error/i;

/**
 * Parse a list of raw ids into positive integers, dropping anything invalid.
 *
 * @param {unknown} rawIds - Value taken from the request body.
 * @returns {number[]} Positive integer ids; empty when rawIds is not an array.
 */
function toPositiveIntIds(rawIds) {
  if (!Array.isArray(rawIds)) return [];
  return rawIds
    .map((id) => parseInt(id, 10))
    .filter((id) => !Number.isNaN(id) && id > 0);
}

/**
 * Append the usable entries of a parsed `{ exercise_logged: ... }` payload.
 *
 * @param {*} payload - Result of JSON.parse on a candidate block.
 * @param {object[]} entries - Accumulator; entries that have a `topic` are pushed.
 * @returns {boolean} True when the payload carried a truthy `exercise_logged`.
 */
function collectExerciseEntries(payload, entries) {
  if (!payload || !payload.exercise_logged) return false;
  const logged = Array.isArray(payload.exercise_logged)
    ? payload.exercise_logged
    : [payload.exercise_logged];
  for (const entry of logged) {
    if (entry && entry.topic) entries.push(entry);
  }
  return true;
}

/**
 * Find `exercise_logged` JSON in the assistant text, both inside ```json fences
 * and as bare inline objects.
 *
 * @param {string} text - Full assistant response.
 * @returns {{ logs: object[], rawMatches: string[] }} `logs` is de-duplicated by
 *   topic + correct; `rawMatches` are the exact substrings to strip from the text.
 */
function extractExerciseLogs(text) {
  const entries = [];
  const rawMatches = [];

  const fenceRegex = /```json\s*([\s\S]*?)\s*```/g;
  for (const fenceMatch of text.matchAll(fenceRegex)) {
    try {
      if (collectExerciseEntries(JSON.parse(fenceMatch[1]), entries)) {
        rawMatches.push(fenceMatch[0]);
      }
    } catch (e) {
      console.error('[chat] exercise JSON parse error:', e.message);
    }
  }

  // Inline JSON without a fence. One level of nested braces is allowed after the
  // key so that `{"exercise_logged": {"topic": ...}}` (and arrays of such
  // objects) can match; a pattern with no nesting could never yield an entry
  // that has a `topic`.
  const inlineRegex = /\{[^{}]*"exercise_logged"(?:[^{}]|\{[^{}]*\})*\}/g;
  for (const inlineMatch of text.matchAll(inlineRegex)) {
    try {
      if (collectExerciseEntries(JSON.parse(inlineMatch[0]), entries)) {
        rawMatches.push(inlineMatch[0]);
      }
    } catch (e) {
      // Deliberately ignored: the regex only finds candidates, and many of them
      // are not valid JSON.
    }
  }

  // A fenced block is usually matched by the inline pattern as well, so
  // de-duplicate by topic + correct.
  const seen = new Set();
  const logs = [];
  for (const entry of entries) {
    const key = `${entry.topic}|${entry.correct}`;
    if (!seen.has(key)) {
      seen.add(key);
      logs.push(entry);
    }
  }

  return { logs, rawMatches };
}

/**
 * POST /api/chat/stream — SSE streaming endpoint.
 *
 * Request body: `conversation_id`, `message`, optional `pdf_ids` and
 * `attachment_texts`. Responds 400 (JSON) on invalid input; otherwise opens an
 * SSE stream and emits `data: {...}` events: `{ token }` per streamed token,
 * optional `{ auto_fork_suggest }` / `{ difficulty_changed }`, `{ error }` on
 * failure, and a final `{ done: true, full_text }`.
 *
 * Side effects visible here: inserts the user and assistant messages, deletes
 * duplicate consecutive user messages, upserts `progress` rows, updates the
 * conversation's `updated_at`, and calls broadcastBuddy when the conversation
 * has `buddy_meta`.
 */
router.post('/stream', async (req, res) => {
  const { conversation_id, message, pdf_ids = [], attachment_texts = [] } = req.body;

  if (!conversation_id || !message) {
    return res.status(400).json({ error: 'conversation_id and message are required' });
  }
  // A non-string message would otherwise fail much later (at message.split),
  // after the LLM call has already been made.
  if (typeof message !== 'string') {
    return res.status(400).json({ error: 'message must be a string' });
  }

  const convId = parseInt(conversation_id, 10);
  if (Number.isNaN(convId)) {
    return res.status(400).json({ error: 'Invalid conversation_id' });
  }

  // Set SSE headers immediately
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.setHeader('X-Accel-Buffering', 'no');
  res.flushHeaders();

  const sendEvent = (obj) => {
    // Skip writes once the response has been ended or torn down.
    if (res.writableEnded || res.destroyed) return;
    res.write(`data: ${JSON.stringify(obj)}\n\n`);
  };

  try {
    // 1. Fetch conversation + model + progress + PDF contents
    const conv = db.prepare('SELECT * FROM conversations WHERE id = ?').get(convId);
    if (!conv) {
      sendEvent({ error: 'Conversation not found' });
      res.end();
      return;
    }

    let model = null;
    if (conv.model_id) {
      model = db.prepare('SELECT * FROM models WHERE id = ?').get(conv.model_id);
    }
    if (!model) {
      model = db.prepare('SELECT * FROM models WHERE is_default_main = 1 LIMIT 1').get();
    }
    if (!model) {
      sendEvent({ error: 'No model configured' });
      res.end();
      return;
    }

    const progressRows = db.prepare('SELECT * FROM progress').all();

    // Parsed once and shared by the PDF lookup and the RAG search.
    const validPdfIds = toPositiveIntIds(pdf_ids);

    let pdfContents = [];
    if (validPdfIds.length > 0) {
      const placeholders = validPdfIds.map(() => '?').join(',');
      pdfContents = db
        .prepare(`SELECT * FROM pdfs WHERE id IN (${placeholders})`)
        .all(...validPdfIds);
    }

    // 2. Difficulty: first-topic match — picks the first progress row whose topic
    // appears in the message. Multi-topic messages match only the first one found.
    let difficulty = 'normal';
    try {
      const lowerMessage = message.toLowerCase();
      const activeProgress = progressRows.find(
        (r) => r.topic && lowerMessage.includes(r.topic.toLowerCase()),
      );
      difficulty = activeProgress?.difficulty_level || 'normal';
    } catch (err) {
      console.error('[chat] difficulty detection error:', err.message);
      difficulty = 'normal';
    }

    // 3. RAG: embed the user message and fetch the top-k chunks. Best effort: a
    // failure is logged and the prompt is built without chunks. The embedding is
    // skipped when there is no valid PDF id to search.
    let ragChunks = [];
    if (validPdfIds.length > 0) {
      try {
        const queryVec = await embedQuery(message);
        ragChunks = await topK(queryVec, validPdfIds, 3);
      } catch (ragErr) {
        console.warn('[chat] RAG failed:', ragErr.message);
      }
    }

    // 4. Build system prompt
    const systemPrompt = buildSystemPrompt(
      conv,
      progressRows,
      pdfContents,
      attachment_texts,
      ragChunks,
      difficulty,
    );

    // 5. Load existing messages, removing duplicates
    const deleteMessage = db.prepare('DELETE FROM messages WHERE id = ?');
    const rawMessages = db
      .prepare('SELECT id, role, content FROM messages WHERE conversation_id = ? ORDER BY id')
      .all(convId);

    // Only the VERY LAST message is removed, and only if it is a user message
    // identical to the current input (it is re-inserted below).
    const lastMsg = rawMessages[rawMessages.length - 1];
    if (lastMsg && lastMsg.role === 'user' && lastMsg.content === message) {
      rawMessages.pop();
      deleteMessage.run(lastMsg.id);
    }

    // Skip consecutive user messages, keeping only the last one of each run.
    const existingMessages = [];
    for (let i = 0; i < rawMessages.length; i++) {
      const curr = rawMessages[i];
      const next = rawMessages[i + 1];
      if (curr.role === 'user' && next && next.role === 'user') {
        deleteMessage.run(curr.id);
        continue;
      }
      existingMessages.push({ role: curr.role, content: curr.content });
    }

    const messages = [
      ...existingMessages.filter((m) => m.role === 'user' || m.role === 'assistant'),
      { role: 'user', content: message },
    ];

    // Save user message BEFORE streaming so it persists even if the server crashes
    const insertMessage = db.prepare(
      'INSERT INTO messages (conversation_id, role, content) VALUES (?, ?, ?)',
    );
    insertMessage.run(convId, 'user', message);

    // 6. Stream via llm.streamCompletion()
    let assistantText = '';
    let errorOccurred = false;
    for await (const chunk of streamCompletion(model, messages, systemPrompt)) {
      if (chunk.error) {
        sendEvent({ error: chunk.error });
        errorOccurred = true;
        break;
      }
      if (chunk.token) {
        assistantText += chunk.token;
        sendEvent({ token: chunk.token });
      }
      // Prefer the provider's final text, but keep the accumulated tokens when a
      // done chunk arrives without a string fullText.
      if (chunk.done && typeof chunk.fullText === 'string') {
        assistantText = chunk.fullText;
      }
    }

    if (errorOccurred) {
      sendEvent({ done: true, full_text: '' });
      res.end();
      return;
    }

    // 7. Parse exercise_logged JSON from the response
    const { logs: dedupedLogs, rawMatches } = extractExerciseLogs(assistantText);

    // 8. Upsert the progress table for each exercise + track streaks
    const selectProgress = db.prepare('SELECT * FROM progress WHERE topic = ?');
    let lastSuggestedTopic = null;
    let difficultyChanged = false;
    let newDifficulty = difficulty;
    let newGlobalStreak = 0;

    for (const exerciseLogged of dedupedLogs) {
      const topic = exerciseLogged.topic;
      const correct = exerciseLogged.correct === true ? 1 : 0;
      const isWrong = correct === 0;

      const existing = selectProgress.get(topic);
      if (existing) {
        const newWrongStreak = isWrong ? (existing.wrong_streak || 0) + 1 : 0;
        // A wrong answer raises the streak by one; a correct one lowers it by
        // one (never below zero) instead of resetting it.
        newGlobalStreak = isWrong
          ? (existing.global_wrong_streak || 0) + 1
          : Math.max(0, (existing.global_wrong_streak || 0) - 1);

        let newDiff = existing.difficulty_level || 'normal';

        // Difficulty adjustment based on the streak stored on this row
        if (newGlobalStreak >= 3 && newDiff !== 'easy') {
          newDiff = 'easy';
          difficultyChanged = true;
        } else if (
          newGlobalStreak === 0 &&
          existing.exercises_done > 0 &&
          existing.exercises_correct / existing.exercises_done >= 0.8 &&
          newDiff !== 'hard'
        ) {
          newDiff = 'hard';
          difficultyChanged = true;
        } else if (newGlobalStreak >= 1 && newGlobalStreak < 3 && newDiff === 'easy') {
          newDiff = 'normal';
          difficultyChanged = true;
        }
        newDifficulty = newDiff;

        db.prepare(`
          UPDATE progress SET
            exercises_done = exercises_done + 1,
            exercises_correct = exercises_correct + ?,
            last_session = datetime('now'),
            notes = ?,
            wrong_streak = ?,
            global_wrong_streak = ?,
            difficulty_level = ?
          WHERE topic = ?
        `).run(correct, existing.notes, newWrongStreak, newGlobalStreak, newDiff, topic);

        // Auto-fork suggest after 2 consecutive wrong answers on the same topic
        if (isWrong && newWrongStreak >= 2) {
          lastSuggestedTopic = topic;
        }
      } else {
        const newWrongStreak = isWrong ? 1 : 0;
        newGlobalStreak = isWrong ? 1 : 0;
        const newDiff = 'normal';

        db.prepare(`
          INSERT INTO progress
            (topic, exercises_done, exercises_correct, last_session, notes,
             wrong_streak, global_wrong_streak, difficulty_level)
          VALUES (?, 1, ?, datetime('now'), '[]', ?, ?, ?)
        `).run(topic, correct, newWrongStreak, newGlobalStreak, newDiff);

        // NOTE(review): a brand-new topic suggests a fork on the first wrong
        // answer, unlike the >= 2 rule for existing topics. Kept as-is; confirm
        // whether this is intended.
        if (isWrong) {
          lastSuggestedTopic = topic;
        }
      }
    }

    // Fallback when no exercise_logged was found: detect a wrong answer from the
    // response text itself.
    if (dedupedLogs.length === 0 && WRONG_ANSWER_PATTERN.test(assistantText)) {
      // Topic = first sentence of the user message, capped at 50 chars (fragile,
      // but only an existing progress row with that exact topic is touched).
      const topic = message.split(/[.!?\n]/)[0].slice(0, 50);
      const existing = selectProgress.get(topic);
      if (existing) {
        const newWrongStreak = (existing.wrong_streak || 0) + 1;
        const heuristicGlobalStreak = (existing.global_wrong_streak || 0) + 1;
        db.prepare(`
          UPDATE progress SET
            wrong_streak = ?,
            global_wrong_streak = ?,
            exercises_done = exercises_done + 1
          WHERE topic = ?
        `).run(newWrongStreak, heuristicGlobalStreak, topic);
        if (newWrongStreak >= 2) {
          lastSuggestedTopic = topic;
        }
      }
    }

    // Strip JSON blocks from the text before saving
    let cleanText = assistantText;
    for (const raw of rawMatches) {
      cleanText = cleanText.replace(raw, '');
    }
    cleanText = cleanText.trim();

    // Save assistant response
    const msgInfo = insertMessage.run(convId, 'assistant', cleanText);
    const assistantMsgId = msgInfo.lastInsertRowid;

    // Update conversation updated_at
    db.prepare("UPDATE conversations SET updated_at = datetime('now') WHERE id = ?").run(convId);

    // Emit SSE events for streak/difficulty
    if (lastSuggestedTopic) {
      // NOTE(review): wrong_streak is a fixed 2 rather than the actual streak;
      // left unchanged because clients may depend on the value.
      sendEvent({
        auto_fork_suggest: { topic: lastSuggestedTopic, parent_id: convId, wrong_streak: 2 },
      });
    }
    if (difficultyChanged) {
      sendEvent({
        difficulty_changed: { level: newDifficulty, global_wrong_streak: newGlobalStreak },
      });
    }

    // Broadcast buddy message if the conversation has buddy_meta. Best effort: a
    // broadcast failure must not fail the chat response, but it is logged.
    if (conv.buddy_meta) {
      try {
        broadcastBuddy({ type: 'buddy_msg', conv_id: convId, msg_id: assistantMsgId });
      } catch (e) {
        console.warn('[chat] buddy broadcast failed:', e.message);
      }
    }

    sendEvent({ done: true, full_text: cleanText });
    res.end();
  } catch (err) {
    console.error('[chat] stream error:', err.message);
    sendEvent({ error: err.message });
    if (!res.writableEnded) res.end();
  }
});

module.exports = router;