Initial commit: StudyOS platform
This commit is contained in:
213
server/routes/chat.js
Normal file
213
server/routes/chat.js
Normal file
@@ -0,0 +1,213 @@
|
||||
const express = require('express');
|
||||
const db = require('../db');
|
||||
const { buildSystemPrompt } = require('../systemPromptBuilder');
|
||||
const { streamCompletion } = require('../lib/llm');
|
||||
const router = express.Router();
|
||||
|
||||
// POST /api/chat/stream — SSE streaming endpoint
|
||||
router.post('/stream', async (req, res) => {
|
||||
const { conversation_id, message, pdf_ids = [], attachment_texts = [] } = req.body;
|
||||
|
||||
if (!conversation_id || !message) {
|
||||
return res.status(400).json({ error: 'conversation_id and message are required' });
|
||||
}
|
||||
|
||||
const convId = parseInt(conversation_id, 10);
|
||||
if (Number.isNaN(convId)) {
|
||||
return res.status(400).json({ error: 'Invalid conversation_id' });
|
||||
}
|
||||
|
||||
// Set SSE headers immediately
|
||||
res.setHeader('Content-Type', 'text/event-stream');
|
||||
res.setHeader('Cache-Control', 'no-cache');
|
||||
res.setHeader('Connection', 'keep-alive');
|
||||
res.setHeader('X-Accel-Buffering', 'no');
|
||||
res.flushHeaders();
|
||||
|
||||
const sendEvent = (obj) => {
|
||||
res.write(`data: ${JSON.stringify(obj)}\n\n`);
|
||||
};
|
||||
|
||||
try {
|
||||
// 1. Fetch conversation + model + progress + PDF contents
|
||||
const conv = db.prepare('SELECT * FROM conversations WHERE id = ?').get(convId);
|
||||
if (!conv) {
|
||||
sendEvent({ error: 'Conversation not found' });
|
||||
res.end();
|
||||
return;
|
||||
}
|
||||
|
||||
let model = null;
|
||||
if (conv.model_id) {
|
||||
model = db.prepare('SELECT * FROM models WHERE id = ?').get(conv.model_id);
|
||||
}
|
||||
if (!model) {
|
||||
model = db.prepare('SELECT * FROM models WHERE is_default_main = 1 LIMIT 1').get();
|
||||
}
|
||||
if (!model) {
|
||||
sendEvent({ error: 'No model configured' });
|
||||
res.end();
|
||||
return;
|
||||
}
|
||||
|
||||
const progressRows = db.prepare('SELECT * FROM progress').all();
|
||||
|
||||
let pdfContents = [];
|
||||
if (pdf_ids.length > 0) {
|
||||
const validIds = pdf_ids.map(id => parseInt(id, 10)).filter(id => !isNaN(id) && id > 0);
|
||||
if (validIds.length > 0) {
|
||||
const placeholders = validIds.map(() => '?').join(',');
|
||||
pdfContents = db.prepare(`SELECT * FROM pdfs WHERE id IN (${placeholders})`).all(...validIds);
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Build system prompt
|
||||
const systemPrompt = buildSystemPrompt(conv, progressRows, pdfContents, attachment_texts);
|
||||
|
||||
// 3. Load existing messages, removing consecutive duplicates
|
||||
const rawMessages = db.prepare(
|
||||
'SELECT id, role, content FROM messages WHERE conversation_id = ? ORDER BY id'
|
||||
).all(convId);
|
||||
|
||||
// Fix: remove trailing user messages from failed streams (no assistant after)
|
||||
let delCount = 0;
|
||||
while (rawMessages.length > 0 && rawMessages[rawMessages.length - 1].role === 'user') {
|
||||
const last = rawMessages.pop();
|
||||
db.prepare('DELETE FROM messages WHERE id = ?').run(last.id);
|
||||
delCount++;
|
||||
}
|
||||
|
||||
// Filter: skip duplicate consecutive users (keep only the last in sequence)
|
||||
const existingMessages = [];
|
||||
for (let i = 0; i < rawMessages.length; i++) {
|
||||
const curr = rawMessages[i];
|
||||
if (curr.role === 'user' && i + 1 < rawMessages.length && rawMessages[i + 1].role === 'user') {
|
||||
db.prepare('DELETE FROM messages WHERE id = ?').run(curr.id);
|
||||
continue;
|
||||
}
|
||||
existingMessages.push({ role: curr.role, content: curr.content });
|
||||
}
|
||||
|
||||
const messages = [
|
||||
...existingMessages.filter(m => m.role === 'user' || m.role === 'assistant'),
|
||||
{ role: 'user', content: message },
|
||||
];
|
||||
|
||||
// 5. Stream via llm.streamCompletion()
|
||||
let assistantText = '';
|
||||
let errorOccurred = false;
|
||||
|
||||
for await (const chunk of streamCompletion(model, messages, systemPrompt)) {
|
||||
if (chunk.error) {
|
||||
sendEvent({ error: chunk.error });
|
||||
errorOccurred = true;
|
||||
break;
|
||||
}
|
||||
if (chunk.token) {
|
||||
assistantText += chunk.token;
|
||||
sendEvent({ token: chunk.token });
|
||||
}
|
||||
if (chunk.done) {
|
||||
assistantText = chunk.fullText;
|
||||
}
|
||||
}
|
||||
|
||||
if (errorOccurred) {
|
||||
sendEvent({ done: true, full_text: '' });
|
||||
res.end();
|
||||
return;
|
||||
}
|
||||
|
||||
// 6. Parse exercise_logged JSON from response
|
||||
const exerciseLogs = [];
|
||||
const rawMatches = [];
|
||||
const fenceRegex = /```json\s*([\s\S]*?)\s*```/g;
|
||||
const fenceMatches = [...assistantText.matchAll(fenceRegex)];
|
||||
for (const fenceMatch of fenceMatches) {
|
||||
try {
|
||||
const parsed = JSON.parse(fenceMatch[1]);
|
||||
if (parsed && parsed.exercise_logged) {
|
||||
const entries = Array.isArray(parsed.exercise_logged)
|
||||
? parsed.exercise_logged
|
||||
: [parsed.exercise_logged];
|
||||
for (const entry of entries) {
|
||||
if (entry && entry.topic) exerciseLogs.push(entry);
|
||||
}
|
||||
rawMatches.push(fenceMatch[0]);
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('[chat] exercise JSON parse error:', e.message);
|
||||
}
|
||||
}
|
||||
|
||||
// Alternative: inline JSON without fence
|
||||
const inlineRegex = /\{[^{}]*"exercise_logged"[^{}]*\}/g;
|
||||
const inlineMatches = [...assistantText.matchAll(inlineRegex)];
|
||||
for (const inlineMatch of inlineMatches) {
|
||||
try {
|
||||
const parsed = JSON.parse(inlineMatch[0]);
|
||||
if (parsed && parsed.exercise_logged) {
|
||||
const entries = Array.isArray(parsed.exercise_logged)
|
||||
? parsed.exercise_logged
|
||||
: [parsed.exercise_logged];
|
||||
for (const entry of entries) {
|
||||
if (entry && entry.topic) exerciseLogs.push(entry);
|
||||
}
|
||||
rawMatches.push(inlineMatch[0]);
|
||||
}
|
||||
} catch (e) {
|
||||
// silent
|
||||
}
|
||||
}
|
||||
|
||||
// 7. Upsert progress table for each exercise
|
||||
for (const exerciseLogged of exerciseLogs) {
|
||||
const topic = exerciseLogged.topic;
|
||||
const correct = exerciseLogged.correct === true ? 1 : 0;
|
||||
|
||||
const existing = db.prepare('SELECT * FROM progress WHERE topic = ?').get(topic);
|
||||
if (existing) {
|
||||
db.prepare(`
|
||||
UPDATE progress SET
|
||||
exercises_done = exercises_done + 1,
|
||||
exercises_correct = exercises_correct + ?,
|
||||
last_session = datetime('now'),
|
||||
notes = ?
|
||||
WHERE topic = ?
|
||||
`).run(correct, existing.notes, topic);
|
||||
} else {
|
||||
db.prepare(`
|
||||
INSERT INTO progress (topic, exercises_done, exercises_correct, last_session, notes)
|
||||
VALUES (?, 1, ?, datetime('now'), '[]')
|
||||
`).run(topic, correct);
|
||||
}
|
||||
}
|
||||
|
||||
// Strip JSON blocks from text before saving
|
||||
let cleanText = assistantText;
|
||||
for (const raw of rawMatches) {
|
||||
cleanText = cleanText.replace(raw, '');
|
||||
}
|
||||
cleanText = cleanText.trim();
|
||||
|
||||
// Save user message now that streaming succeeded
|
||||
db.prepare('INSERT INTO messages (conversation_id, role, content) VALUES (?, ?, ?)')
|
||||
.run(convId, 'user', message);
|
||||
|
||||
// Save assistant response
|
||||
db.prepare('INSERT INTO messages (conversation_id, role, content) VALUES (?, ?, ?)')
|
||||
.run(convId, 'assistant', cleanText);
|
||||
|
||||
// Update conversation updated_at
|
||||
db.prepare("UPDATE conversations SET updated_at = datetime('now') WHERE id = ?").run(convId);
|
||||
|
||||
sendEvent({ done: true, full_text: cleanText });
|
||||
res.end();
|
||||
} catch (err) {
|
||||
console.error('[chat] stream error:', err.message);
|
||||
sendEvent({ error: err.message });
|
||||
res.end();
|
||||
}
|
||||
});
|
||||
|
||||
module.exports = router;
|
||||
Reference in New Issue
Block a user