const express = require('express');
const router = express.Router();
const cozeChatService = require('../services/cozeChatService');
const arkChatService = require('../services/arkChatService');
const ToolExecutor = require('../services/toolExecutor');
const contextKeywordTracker = require('../services/contextKeywordTracker');
const { shouldForceKnowledgeRoute } = require('../services/realtimeDialogRouting');
const { isBrandHarmful, getTextSafeReply } = require('../services/contentSafeGuard');
const db = require('../db');

// In-memory state of text chat sessions (sessionId -> session).
const chatSessions = new Map();

// A session is dropped after this much inactivity.
const SESSION_TTL_MS = 30 * 60 * 1000;
// How often the expiry sweep runs.
const SESSION_CLEANUP_INTERVAL_MS = 5 * 60 * 1000;

/**
 * Build the fallback user id used when the client did not supply one.
 * The id is coerced to a string first so a numeric sessionId does not throw on `.slice`.
 *
 * @param {*} sessionId - Session identifier from the request.
 * @returns {string} `user_` followed by the first 12 characters of the id.
 */
function buildFallbackUserId(sessionId) {
  return `user_${String(sessionId).slice(0, 12)}`;
}

/**
 * Collapse whitespace/newlines and duplicated punctuation in an assistant reply,
 * then run the result through the brand safety check.
 *
 * @param {*} text - Raw assistant text (null/undefined are treated as '').
 * @returns {string} The normalized text, or the safe reply from `getTextSafeReply()`
 *   when `isBrandHarmful` flags the normalized text.
 */
function normalizeAssistantText(text) {
  const result = String(text || '')
    .replace(/\r/g, ' ')
    .replace(/\n{2,}/g, '。')
    .replace(/\n/g, ' ')
    .replace(/。{2,}/g, '。')
    .replace(/([!?;,])\1+/g, '$1')
    .replace(/([。!?;,])\s*([。!?;,])/g, '$2')
    .replace(/\s+/g, ' ')
    .trim();

  if (isBrandHarmful(result)) {
    console.warn(`[Chat][SafeGuard] blocked harmful content: ${JSON.stringify(result.slice(0, 200))}`);
    return getTextSafeReply();
  }
  return result;
}

/**
 * Load the prior conversation to hand over to the text chat.
 * Prefers `db.getHistoryForLLM(sessionId, 20)`; when that yields nothing (or fails),
 * falls back to the last 10 client-supplied subtitles.
 *
 * @param {*} sessionId - Session identifier.
 * @param {Array<{role: string, text: string}>} [voiceSubtitles=[]] - Subtitles sent by the client.
 * @returns {Promise<Array<{role: string, content: *}>>} Messages in chat format (may be empty).
 */
async function loadHandoffMessages(sessionId, voiceSubtitles = []) {
  // A default parameter does not cover an explicit null, so normalize here.
  const subtitles = Array.isArray(voiceSubtitles) ? voiceSubtitles : [];
  let voiceMessages = [];

  try {
    const dbHistory = await db.getHistoryForLLM(sessionId, 20);
    if (dbHistory.length > 0) {
      voiceMessages = dbHistory;
      console.log(`[Chat] Loaded ${dbHistory.length} messages from DB for session ${sessionId}`);
    }
  } catch (e) {
    console.warn('[DB] getHistoryForLLM failed:', e.message);
  }

  if (voiceMessages.length === 0 && subtitles.length > 0) {
    const recentSubtitles = subtitles.slice(-10);
    for (const sub of recentSubtitles) {
      voiceMessages.push({
        role: sub.role === 'user' ? 'user' : 'assistant',
        content: sub.text,
      });
    }
  }

  return voiceMessages;
}

/**
 * Build a rule-based (no LLM call) summary from the last 8 non-empty user/assistant messages:
 * the latest user question, the previous distinct user question, and up to two
 * assistant replies truncated to 60 characters each.
 *
 * @param {Array<{role: string, content: *}>} [messages=[]] - Conversation messages.
 * @returns {string} The summary parts joined by ';', or '' when there is nothing to summarize.
 */
function buildDeterministicHandoffSummary(messages = []) {
  const normalizedMessages = (Array.isArray(messages) ? messages : [])
    .filter((item) => item && (item.role === 'user' || item.role === 'assistant') && String(item.content || '').trim())
    .slice(-8);

  if (!normalizedMessages.length) {
    return '';
  }

  const userMessages = normalizedMessages.filter((item) => item.role === 'user');
  const currentQuestion = String(userMessages[userMessages.length - 1]?.content || '').trim();
  const previousQuestion = String(userMessages[userMessages.length - 2]?.content || '').trim();
  const assistantFacts = normalizedMessages
    .filter((item) => item.role === 'assistant')
    .slice(-2)
    .map((item) => String(item.content || '').trim())
    .filter(Boolean)
    .map((item) => item.slice(0, 60))
    .join(';');

  const parts = [];
  if (currentQuestion) {
    parts.push(`当前问题:${currentQuestion}`);
  }
  if (previousQuestion && previousQuestion !== currentQuestion) {
    parts.push(`上一轮关注:${previousQuestion}`);
  }
  if (assistantFacts) {
    parts.push(`已给信息:${assistantFacts}`);
  }
  return parts.join(';');
}

/**
 * Create the in-memory state object for a chat session.
 * Also feeds the last 6 prior user messages into `contextKeywordTracker`.
 *
 * @param {*} sessionId - Session identifier.
 * @param {Array} [voiceSubtitles=[]] - Subtitles sent by the client.
 * @param {?string} [userId=null] - Explicit user id; a fallback derived from sessionId is used when absent.
 * @returns {Promise<object>} The new session state.
 */
async function buildChatSessionState(sessionId, voiceSubtitles = [], userId = null) {
  const subtitles = Array.isArray(voiceSubtitles) ? voiceSubtitles : [];
  const voiceMessages = await loadHandoffMessages(sessionId, subtitles);

  voiceMessages
    .filter((item) => item.role === 'user')
    .slice(-6)
    .forEach((item) => contextKeywordTracker.updateSession(sessionId, item.content));

  const handoffSummary = buildDeterministicHandoffSummary(voiceMessages);
  const now = Date.now();

  return {
    userId: userId || buildFallbackUserId(sessionId),
    profileUserId: userId || null,
    conversationId: null,
    voiceMessages,
    handoffSummary,
    handoffSummaryUsed: false,
    createdAt: now,
    lastActiveAt: now,
    fromVoice: subtitles.length > 0 || voiceMessages.length > 0,
  };
}

/**
 * Build the extra context messages passed along with the first Coze request:
 * the handoff summary (if present and not yet used) followed by the last 6 prior messages.
 *
 * @param {object} session - Session state.
 * @returns {Array<{role: string, content: *}>} Context messages (may be empty).
 */
function buildInitialContextMessages(session) {
  const summary = String(session?.handoffSummary || '').trim();
  const extraMessages = [];

  if (summary && !session?.handoffSummaryUsed) {
    extraMessages.push({ role: 'assistant', content: `会话交接摘要:${summary}` });
  }
  if (Array.isArray(session?.voiceMessages) && session.voiceMessages.length > 0) {
    extraMessages.push(...session.voiceMessages.slice(-6));
  }
  return extraMessages;
}

/**
 * Build the context passed to knowledge routing and the knowledge tool from
 * `db.getRecentMessages(sessionId, 20)` (a DB failure yields an empty history).
 * Once the handoff summary has been used in a voice-originated session, messages
 * whose `source` starts with `voice_` are excluded. While the summary is still
 * unused it is prepended as an assistant message.
 *
 * @param {*} sessionId - Session identifier.
 * @param {object} session - Session state.
 * @returns {Promise<Array<{role: string, content: *}>>} Context messages.
 */
async function buildKnowledgeContextMessages(sessionId, session) {
  const recentMessages = await db.getRecentMessages(sessionId, 20).catch(() => []);
  const scopedMessages = session?.fromVoice && session?.handoffSummaryUsed
    ? recentMessages.filter((item) => !/^voice_/i.test(String(item?.source || '')))
    : recentMessages;

  const dbHistory = scopedMessages
    .filter((item) => item && (item.role === 'user' || item.role === 'assistant'))
    .map((item) => ({ role: item.role, content: item.content }));

  const summary = String(session?.handoffSummary || '').trim();
  if (!summary || session?.handoffSummaryUsed) {
    return dbHistory;
  }
  return [
    { role: 'assistant', content: `会话交接摘要:${summary}` },
    ...dbHistory,
  ];
}

/**
 * Turn a knowledge tool result into reply text.
 *
 * @param {*} result - Tool result: an object with a `results` array, an object with `error`, or a string.
 * @returns {string} Joined result contents, the error text, the string itself, or ''.
 */
function extractKnowledgeReply(result) {
  if (result && result.results && Array.isArray(result.results)) {
    return result.results.map((item) => item.content || JSON.stringify(item)).join('\n');
  }
  if (result && result.error) {
    return result.error;
  }
  return typeof result === 'string' ? result : '';
}

/**
 * Return a canned greeting when the message is nothing but a greeting
 * (optionally followed by punctuation / modal particles), so no backend call is needed.
 *
 * @param {*} message - User message.
 * @returns {string} The canned reply, or '' when the message is not a pure greeting.
 */
function buildFastGreetingReply(message) {
  const text = String(message || '').trim();
  if (!/^(喂|你好|您好|嗨|哈喽|hello|hi|在吗|在不在|早上好|中午好|下午好|晚上好|早安|晚安)[,,!。??~~\s]*[啊呀吧呢哦嗯嘛哈的了]*[!。??~~]*$/i.test(text)) {
    return '';
  }
  return '你好😊!我是大沃智能助手。你可以直接问我一成系统、德国PM产品、招商合作、营养科普等问题,我会尽量快速给你准确回复。';
}

/**
 * Try to answer from the knowledge base. Runs the `search_knowledge` tool only when
 * `shouldForceKnowledgeRoute` accepts the message in its context.
 *
 * @param {*} sessionId - Session identifier.
 * @param {object} session - Session state.
 * @param {*} message - User message.
 * @returns {Promise<?{content: string, meta: object}>} The reply plus metadata for persistence,
 *   or null when the route does not apply, the tool reports no hit, or the reply is empty.
 */
async function tryKnowledgeReply(sessionId, session, message) {
  const text = String(message || '').trim();
  if (!text) return null;

  const context = await buildKnowledgeContextMessages(sessionId, session);
  if (!shouldForceKnowledgeRoute(text, context)) {
    return null;
  }

  const result = await ToolExecutor.execute('search_knowledge', {
    query: text,
    session_id: sessionId,
    _session: {
      userId: session?.userId || null,
      profileUserId: session?.profileUserId || null,
    },
  }, context);

  if (!result?.hit) {
    return null;
  }

  const content = normalizeAssistantText(extractKnowledgeReply(result));
  if (!content) {
    return null;
  }

  return {
    content,
    meta: {
      route: 'search_knowledge',
      original_text: text,
      tool_name: 'search_knowledge',
      tool_args: { query: text },
      source: result?.source || null,
      original_query: result?.original_query || text,
      rewritten_query: result?.rewritten_query || null,
      selected_dataset_ids: result?.selected_dataset_ids || null,
      selected_kb_routes: result?.selected_kb_routes || null,
      hit: typeof result?.hit === 'boolean' ? result.hit : null,
      reason: result?.reason || null,
      error_type: result?.errorType || null,
      // `??` rather than `||` so a measured latency of 0 is not recorded as null.
      latency_ms: result?.latency_ms ?? null,
    },
  };
}

/**
 * POST /api/chat/start
 * Create a text chat session; the client may pass the subtitles of a prior voice call.
 */
router.post('/start', async (req, res) => {
  const { sessionId, voiceSubtitles = [], userId = null } = req.body || {};

  if (!sessionId) {
    return res.status(400).json({ success: false, error: 'sessionId is required' });
  }
  if (!cozeChatService.isConfigured()) {
    return res.status(500).json({ success: false, error: 'Coze 智能体未配置,请设置 COZE_API_TOKEN 和 COZE_BOT_ID' });
  }

  try {
    const sessionState = await buildChatSessionState(sessionId, voiceSubtitles, userId);

    // Switch the persisted session mode to chat. Best-effort: a DB failure must not
    // block the chat, but it is logged instead of being silently dropped.
    try {
      await db.createSession(sessionId, userId || buildFallbackUserId(sessionId), 'chat');
    } catch (e) {
      console.warn('[DB] createSession failed:', e.message);
    }

    chatSessions.set(sessionId, sessionState);
    console.log(`[Chat] Session started: ${sessionId}, fromVoice: ${sessionState.fromVoice}, voiceMessages: ${sessionState.voiceMessages.length}, summary: ${sessionState.handoffSummary ? 'yes' : 'no'}`);

    res.json({
      success: true,
      data: {
        sessionId,
        messageCount: sessionState.voiceMessages.length,
        fromVoice: sessionState.fromVoice,
      },
    });
  } catch (error) {
    // Without this catch a rejection from the async handler would leave the request hanging.
    console.error('[Chat] Start failed:', error.message);
    res.status(500).json({ success: false, error: error.message });
  }
});

/**
 * POST /api/chat/send
 * Send a text message and return the reply in one response (non-streaming).
 * Order of attempts: canned greeting, knowledge base, Coze chat.
 */
router.post('/send', async (req, res) => {
  try {
    const { sessionId, message, userId = null } = req.body || {};

    if (!sessionId || !message) {
      return res.status(400).json({ success: false, error: 'sessionId and message are required' });
    }

    let session = chatSessions.get(sessionId);
    // Create the session on the fly when it does not exist yet.
    if (!session) {
      session = await buildChatSessionState(sessionId, [], userId);
      chatSessions.set(sessionId, session);
    }
    if (userId) {
      session.userId = userId;
      session.profileUserId = userId;
    }
    session.lastActiveAt = Date.now();
    contextKeywordTracker.updateSession(sessionId, message);

    console.log(`[Chat] User(${sessionId}): ${message}`);

    // Persist the user message (fire-and-forget; failures are only logged).
    db.addMessage(sessionId, 'user', message, 'chat_user').catch(e => console.warn('[DB] addMessage failed:', e.message));

    const fastGreetingReply = buildFastGreetingReply(message);
    if (fastGreetingReply) {
      db.addMessage(sessionId, 'assistant', fastGreetingReply, 'chat_bot').catch(e => console.warn('[DB] addMessage failed:', e.message));
      return res.json({
        success: true,
        data: {
          content: fastGreetingReply,
        },
      });
    }

    const knowledgeReply = await tryKnowledgeReply(sessionId, session, message);
    if (knowledgeReply) {
      session.handoffSummaryUsed = true;
      db.addMessage(sessionId, 'assistant', knowledgeReply.content, 'chat_bot', 'search_knowledge', knowledgeReply.meta).catch(e => console.warn('[DB] addMessage failed:', e.message));
      return res.json({
        success: true,
        data: {
          content: knowledgeReply.content,
        },
      });
    }

    // Inject the prior history as context only on the first Coze turn
    // (no conversationId stored yet); later turns send no extra messages.
    const extraMessages = !session.conversationId ? buildInitialContextMessages(session) : [];

    const result = await cozeChatService.chat(
      session.userId,
      message,
      session.conversationId,
      extraMessages
    );
    const normalizedContent = normalizeAssistantText(result.content);

    // Remember the conversationId returned by Coze for subsequent turns.
    session.conversationId = result.conversationId;
    session.handoffSummaryUsed = true;

    console.log(`[Chat] Assistant(${sessionId}): ${normalizedContent?.substring(0, 100)}`);

    // Persist the assistant reply.
    if (normalizedContent) {
      db.addMessage(sessionId, 'assistant', normalizedContent, 'chat_bot').catch(e => console.warn('[DB] addMessage failed:', e.message));
    }

    res.json({
      success: true,
      data: {
        content: normalizedContent,
      },
    });
  } catch (error) {
    console.error('[Chat] Send failed:', error.message);
    res.status(500).json({ success: false, error: error.message });
  }
});

/**
 * GET /api/chat/history/:sessionId
 * Return the in-memory session status (conversationId / fromVoice).
 * Responds with an empty array as `data` when the session is unknown.
 */
router.get('/history/:sessionId', (req, res) => {
  const session = chatSessions.get(req.params.sessionId);
  if (!session) {
    return res.json({ success: true, data: [] });
  }
  res.json({
    success: true,
    data: {
      conversationId: session.conversationId,
      fromVoice: session.fromVoice,
    },
  });
});

/**
 * POST /api/chat/send-stream
 * Send a text message and stream the reply as Server-Sent Events.
 * Events written: `chunk`, `stream_reset` (discard chunks received so far), `done`, `error`.
 */
router.post('/send-stream', async (req, res) => {
  const { sessionId, message, userId = null } = req.body || {};

  if (!sessionId || !message) {
    return res.status(400).json({ success: false, error: 'sessionId and message are required' });
  }

  try {
    let session = chatSessions.get(sessionId);
    if (!session) {
      session = await buildChatSessionState(sessionId, [], userId);
      chatSessions.set(sessionId, session);
    }
    if (userId) {
      session.userId = userId;
      session.profileUserId = userId;
    }
    session.lastActiveAt = Date.now();
    contextKeywordTracker.updateSession(sessionId, message);

    console.log(`[Chat][SSE] User(${sessionId}): ${message}`);

    // Persist the user message (fire-and-forget; failures are only logged).
    db.addMessage(sessionId, 'user', message, 'chat_user').catch(e => console.warn('[DB] addMessage failed:', e.message));

    // SSE response headers.
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.setHeader('X-Accel-Buffering', 'no');
    res.flushHeaders();

    const fastGreetingReply = buildFastGreetingReply(message);
    if (fastGreetingReply) {
      db.addMessage(sessionId, 'assistant', fastGreetingReply, 'chat_bot').catch(e => console.warn('[DB] addMessage failed:', e.message));
      res.write(`data: ${JSON.stringify({ type: 'done', content: fastGreetingReply })}\n\n`);
      return res.end();
    }

    const knowledgeReply = await tryKnowledgeReply(sessionId, session, message);
    if (knowledgeReply) {
      session.handoffSummaryUsed = true;
      db.addMessage(sessionId, 'assistant', knowledgeReply.content, 'chat_bot', 'search_knowledge', knowledgeReply.meta).catch(e => console.warn('[DB] addMessage failed:', e.message));
      res.write(`data: ${JSON.stringify({ type: 'done', content: knowledgeReply.content })}\n\n`);
      return res.end();
    }

    // Inject the prior history as context only on the first Coze turn.
    const extraMessages = !session.conversationId ? buildInitialContextMessages(session) : [];

    // Accumulate streamed chunks so the safety check sees the whole text so far,
    // not just the latest fragment.
    let streamBuffer = '';
    let harmfulDetected = false;

    const result = await cozeChatService.chatStream(
      session.userId,
      message,
      session.conversationId,
      extraMessages,
      {
        onChunk: (text) => {
          // After an interception, drop every further chunk.
          if (harmfulDetected) return;
          streamBuffer += text;

          if (isBrandHarmful(streamBuffer)) {
            harmfulDetected = true;
            console.warn(`[Chat][SSE][SafeGuard] harmful content detected in stream, intercepting session=${sessionId} buffer=${JSON.stringify(streamBuffer.slice(0, 200))}`);
            // Tell the client to discard the chunks it has already received.
            res.write(`data: ${JSON.stringify({ type: 'stream_reset', reason: 'content_safety' })}\n\n`);
            return;
          }

          res.write(`data: ${JSON.stringify({ type: 'chunk', content: text })}\n\n`);
        },
        onDone: () => {},
      }
    );

    // When the stream was intercepted, the final content is the safe reply.
    const finalContent = harmfulDetected ? getTextSafeReply() : normalizeAssistantText(result.content);

    // Remember the conversationId returned by Coze for subsequent turns.
    session.conversationId = result.conversationId;
    session.handoffSummaryUsed = true;

    console.log(`[Chat][SSE] Assistant(${sessionId}): ${finalContent?.substring(0, 100)}${harmfulDetected ? ' [SAFE_REPLACED]' : ''}`);

    // Persist the assistant reply.
    if (finalContent) {
      db.addMessage(sessionId, 'assistant', finalContent, 'chat_bot').catch(e => console.warn('[DB] addMessage failed:', e.message));
    }

    res.write(`data: ${JSON.stringify({ type: 'done', content: finalContent })}\n\n`);
    res.end();
  } catch (error) {
    console.error('[Chat][SSE] Stream failed:', error.message);
    // A failure before the SSE headers went out can still be reported as a normal JSON error.
    if (!res.headersSent) {
      return res.status(500).json({ success: false, error: error.message });
    }
    res.write(`data: ${JSON.stringify({ type: 'error', error: error.message })}\n\n`);
    res.end();
  }
});

/**
 * DELETE /api/chat/:sessionId
 * Remove the in-memory chat session.
 */
router.delete('/:sessionId', (req, res) => {
  chatSessions.delete(req.params.sessionId);
  res.json({ success: true });
});

// Periodically drop sessions that have been inactive for longer than the TTL.
const sessionCleanupTimer = setInterval(() => {
  const now = Date.now();
  for (const [id, session] of chatSessions) {
    if (now - (session.lastActiveAt || session.createdAt) > SESSION_TTL_MS) {
      chatSessions.delete(id);
      console.log(`[Chat] Session expired and cleaned: ${id}`);
    }
  }
}, SESSION_CLEANUP_INTERVAL_MS);
// Do not let the housekeeping timer alone keep the process alive.
sessionCleanupTimer.unref();

module.exports = router;