- New: conversationSummarizer.js (LLM summary every 3 turns, loadBestSummary, persistFinalSummary) - db/index.js: conversation_summaries table, upsertConversationSummary, getSessionSummary - redisClient.js: setSummary/getSummary (TTL 2h) - nativeVoiceGateway.js: _turnCount tracking, trigger summarization, persist the final summary on close - realtimeDialogRouting.js: inject summary context, reduce history from 5 to 3 rounds - Fix: the messages.source ENUM was missing 'search_knowledge', which caused chat DB writes to fail
1272 lines
57 KiB
JavaScript
1272 lines
57 KiB
JavaScript
const { WebSocket, WebSocketServer } = require('ws');
|
||
const url = require('url');
|
||
const db = require('../db');
|
||
const { correctAsrText } = require('./fastAsrCorrector');
|
||
const contextKeywordTracker = require('./contextKeywordTracker');
|
||
const { isBrandHarmful, getVoiceSafeReply, BRAND_HARMFUL_PATTERN, BRAND_POSITIVE_LEGALITY_PATTERN } = require('./contentSafeGuard');
|
||
const {
|
||
MsgType,
|
||
unmarshal,
|
||
createStartConnectionMessage,
|
||
createStartSessionMessage,
|
||
createAudioMessage,
|
||
createChatTTSTextMessage,
|
||
createSayHelloMessage,
|
||
createChatRAGTextMessage,
|
||
} = require('./realtimeDialogProtocol');
|
||
const {
|
||
getRuleBasedDirectRouteDecision,
|
||
normalizeKnowledgeAlias,
|
||
normalizeTextForSpeech,
|
||
splitTextForSpeech,
|
||
estimateSpeechDurationMs,
|
||
shouldForceKnowledgeRoute,
|
||
isPureChitchat,
|
||
resolveReply,
|
||
} = require('./realtimeDialogRouting');
|
||
const ToolExecutor = require('./toolExecutor');
|
||
const {
|
||
DEFAULT_VOICE_ASSISTANT_PROFILE,
|
||
DEFAULT_CONSULTANT_CONTACT,
|
||
resolveAssistantProfile,
|
||
getAssistantDisplayName,
|
||
buildVoiceSystemRole,
|
||
buildVoiceGreeting,
|
||
} = require('./assistantProfileConfig');
|
||
const { getAssistantProfile } = require('./assistantProfileService');
|
||
const redisClient = require('./redisClient');
|
||
const { checkProductLinkTrigger } = require('./productLinkTrigger');
|
||
const { triggerSummarizeIfNeeded, persistFinalSummary } = require('./conversationSummarizer');
|
||
|
||
// Registry of active voice sessions.
// NOTE(review): not read or written in the visible part of this file — confirm key/value shape at its use sites.
const sessions = new Map();

// Assistant phrases that refer the user to a human consultant. When an assistant
// utterance matches, persistAssistantSpeech pushes a `consultant_contact` card to the client.
const CONSULTANT_REFERRAL_PATTERN = /咨询(?:专业|你的)?顾问|健康管理顾问|联系顾问|一对一指导|咨询专业|咨询医生|咨询营养师|咨询专业人士|建议.*咨询|问问医生|问问.*营养师/;

// Inactivity window (5 minutes) after which resetIdleTimer notifies and closes the client.
const IDLE_TIMEOUT_MS = 5 * 60 * 1000;

// Period (20 seconds) between silent keepalive frames sent upstream by startAudioKeepalive.
const AUDIO_KEEPALIVE_INTERVAL_MS = 20 * 1000;

// 3200 zero bytes ≈ 66ms of silence at 24kHz s16le mono (a larger frame so S2S accepts it).
// Buffer.alloc zero-fills when no fill value is given.
const SILENT_AUDIO_FRAME = Buffer.alloc(3200);

// Display name of the default assistant profile.
// NOTE(review): buildStartSessionPayload falls back to a literal bot name instead of this constant.
const DEFAULT_VOICE_BOT_NAME = getAssistantDisplayName(DEFAULT_VOICE_ASSISTANT_PROFILE);

// Persona prompt used when a session does not supply its own systemRole.
const DEFAULT_VOICE_SYSTEM_ROLE = buildVoiceSystemRole();

// Speaking-style prompt used when a session does not supply its own speakingStyle.
const DEFAULT_VOICE_SPEAKING_STYLE = '整体语气亲切自然、轻快有温度,像熟悉行业的朋友在语音聊天。优先短句和口语化表达,先给结论,再补一句最有帮助的信息。不要播音腔,不要念稿,不要客服腔,不要过度热情,也不要输出任何思考过程。';

// Greeting spoken by sendGreeting when the session has no greetingText of its own.
const DEFAULT_VOICE_GREETING = buildVoiceGreeting();
|
||
|
||
/**
 * Restarts the session's idle countdown and records the time of this activity.
 *
 * When the countdown (IDLE_TIMEOUT_MS) elapses, the client receives an
 * `idle_timeout` message. About 2 seconds later its socket is closed, if still open.
 *
 * @param {object} session - Voice session; reads `sessionId`/`client`, writes `idleTimer`/`lastActivityAt`.
 */
function resetIdleTimer(session) {
  clearTimeout(session.idleTimer);
  session.lastActivityAt = Date.now();

  const handleIdle = () => {
    session.idleTimer = null;
    console.log(`[NativeVoice] idle timeout (${IDLE_TIMEOUT_MS / 1000}s) session=${session.sessionId}`);
    sendJson(session.client, { type: 'idle_timeout', timeout: IDLE_TIMEOUT_MS });

    // Short delay between the notice and the actual close.
    setTimeout(() => {
      const { client } = session;
      if (!client || client.readyState !== WebSocket.OPEN) {
        return;
      }
      client.close();
    }, 2000);
  };

  session.idleTimer = setTimeout(handleIdle, IDLE_TIMEOUT_MS);
}
|
||
|
||
/**
 * (Re)starts the periodic upstream keepalive for a session.
 *
 * Every AUDIO_KEEPALIVE_INTERVAL_MS a SILENT_AUDIO_FRAME is sent upstream. The
 * frame is sent only if the upstream socket is open and the session has been
 * marked `upstreamReady`; otherwise the tick is logged as skipped.
 *
 * @param {object} session - Voice session; the interval handle is stored in `_audioKeepaliveTimer`.
 */
function startAudioKeepalive(session) {
  clearInterval(session._audioKeepaliveTimer);

  const tick = () => {
    const { upstream } = session;
    const canSend = upstream && upstream.readyState === WebSocket.OPEN && session.upstreamReady;
    if (!canSend) {
      console.log(`[NativeVoice] audio keepalive skipped session=${session.sessionId} ready=${session.upstreamReady} wsState=${upstream ? upstream.readyState : 'null'}`);
      return;
    }
    upstream.send(createAudioMessage(session.sessionId, SILENT_AUDIO_FRAME));
    console.log(`[NativeVoice] audio keepalive sent session=${session.sessionId}`);
  };

  session._audioKeepaliveTimer = setInterval(tick, AUDIO_KEEPALIVE_INTERVAL_MS);
}
|
||
|
||
/**
 * Restarts the keepalive interval from now, but only if one is already running.
 *
 * @param {object} session - Voice session holding `_audioKeepaliveTimer`.
 */
function resetAudioKeepalive(session) {
  if (!session._audioKeepaliveTimer) {
    return;
  }
  clearInterval(session._audioKeepaliveTimer);
  startAudioKeepalive(session);
}
|
||
|
||
/**
 * Serializes `payload` as JSON and sends it over `ws`, silently doing nothing
 * when the socket is missing or not in the OPEN state.
 *
 * @param {WebSocket|null|undefined} ws - Target socket.
 * @param {*} payload - Value passed to JSON.stringify.
 */
function sendJson(ws, payload) {
  const isOpen = Boolean(ws) && ws.readyState === WebSocket.OPEN;
  if (!isOpen) {
    return;
  }
  ws.send(JSON.stringify(payload));
}
|
||
|
||
/**
 * Builds the StartSession payload for the upstream realtime dialog (S2S) service:
 * ASR hotword context, TTS voice and audio format, and dialog persona settings.
 *
 * A fixed "no thinking output" rule is prepended to the system role. Both the
 * system role and the speaking style are passed through normalizeTextForSpeech.
 *
 * @param {object} [options={}] - Per-session overrides; every field is optional.
 * @param {string} [options.systemRole] - Persona prompt; defaults to DEFAULT_VOICE_SYSTEM_ROLE.
 * @param {string} [options.speakingStyle] - Style prompt; defaults to DEFAULT_VOICE_SPEAKING_STYLE.
 * @param {string} [options.speaker] - TTS voice id; falls back to env VOLC_S2S_SPEAKER_ID, then a built-in voice.
 * @param {string} [options.botName] - Bot name; defaults to '大沃'.
 * @param {string} [options.modelVersion] - Dialog model; defaults to 'SC2.0'.
 * @returns {object} The payload as a plain object (not serialized).
 */
function buildStartSessionPayload(options = {}) {
  // Tolerate callers passing null as well as omitting the argument entirely.
  const opts = options || {};
  const antiThinkingPrefix = '【最高优先级规则】你绝对禁止输出任何思考过程、分析、计划、角色扮演指令或元描述。禁止出现:“首轮对话”“应该回复”“需要列举”“语气要”“回复后询问”“可列举”“突出特色”“引导用户”“让用户”“用温和”等分析性、指令性语句。你必须直接用自然语言回答问题,像真人聊天一样直接说出答案内容。';
  const baseSystemRole = opts.systemRole || DEFAULT_VOICE_SYSTEM_ROLE;
  const baseSpeakingStyle = opts.speakingStyle || DEFAULT_VOICE_SPEAKING_STYLE;

  return {
    asr: {
      extra: {
        // Comma-separated domain vocabulary handed to ASR as recognition context.
        context: '一成,一成系统,大沃,PM,PM-FitLine,FitLine,细胞营养素,Ai众享,AI众享,盛咖学愿,数字化工作室,Activize,Basics,Restorate,NTC,基础三合一,基础套装,招商,阿育吠陀,小红产品,小红,小白,大白,肽美,艾特维,德丽,德维,宝丽,美固健,Activize Oxyplus,Basic Power,CitrusCare,NutriSunny,Q10,Omega,葡萄籽,白藜芦醇,益生菌,胶原蛋白肽,Germany,FitLine细胞营养,FitLine营养素,德国PM营养素,德国PM FitLine,德国PM细胞营养,德国PM产品,德国PM健康,德国PM事业,德国PM招商,一成,一成团队,一成商学院,数字化,数字化运营,数字化经营,数字化营销,数字化创业,数字化工作室,数字化事业,招商加盟,合作加盟,事业合作,活力健,倍力健,氨基酸,乐活,排毒饮,小绿,纤萃,草本茶,发宝,乳酪煲,关节套装,细胞抗氧素,辅酵素,氧修护,CC套装,CC-Cell,Generation 50+,ProShape,D-Drink,IB5,MEN+,儿童倍适,小红精华液,PowerCocktail,PowerCocktail Junior,TopShape,Fitness-Drink,Herbal Tea,Hair+,Med Dental+,Young Care,Zellschutz,Apple Antioxy,Antioxy,BCAA,Women+,小黑,发健,口腔免疫喷雾,乳清蛋白,男士护肤,去角质,面膜,叶黄素,维适多,护理牙膏,火炉原理,暖炉原理,运动饮料,健康饮品,好转反应,整健反应,骨骼健,顾心,舒采健,衡醇饮,小粉C,异黄酮,倍适,眼霜,洁面,爽肤水',
        boosting_table_id: 'ab4fde15-79b5-47e9-82b6-5125cca39f63',
        nbest: 1,
      },
    },
    tts: {
      speaker: opts.speaker || process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts',
      audio_config: {
        channel: 1,
        format: 'pcm_s16le',
        sample_rate: 24000,
      },
    },
    dialog: {
      dialog_id: '',
      bot_name: opts.botName || '大沃',
      system_role: normalizeTextForSpeech(`${antiThinkingPrefix} ${baseSystemRole}`),
      speaking_style: normalizeTextForSpeech(baseSpeakingStyle),
      extra: {
        input_mod: 'audio',
        model: opts.modelVersion || 'SC2.0',
        strict_audit: false,
        audit_response: '抱歉,这个问题我暂时无法回答。',
      },
    },
  };
}
|
||
|
||
/**
 * Decodes a protocol message's payload as UTF-8 JSON.
 *
 * @param {{payload: Buffer}} message - Unmarshalled protocol message.
 * @returns {*} The parsed value, or null when the payload is missing or not valid JSON.
 */
function parseJsonPayload(message) {
  let parsed = null;
  try {
    parsed = JSON.parse(message.payload.toString('utf8'));
  } catch {
    // Missing or malformed payload: report failure as null.
  }
  return parsed;
}
|
||
|
||
/**
 * Pulls the text out of an upstream JSON payload.
 *
 * The first truthy value wins, in this order:
 * `text`, `content`, `results[0].text`, `results[0].alternatives[0].text`.
 * The winner is stringified and trimmed.
 *
 * @param {object|null|undefined} jsonPayload
 * @returns {string} Trimmed text, or '' when none of the fields is set.
 */
function extractRawText(jsonPayload) {
  const firstResult = jsonPayload?.results?.[0];
  const candidates = [
    jsonPayload?.text,
    jsonPayload?.content,
    firstResult?.text,
    firstResult?.alternatives?.[0]?.text,
  ];
  const picked = candidates.find(Boolean) || '';
  return String(picked).trim();
}
|
||
|
||
/**
 * Extracts user speech text from an ASR payload and applies correctAsrText to it.
 *
 * When a session id is given, the alias-normalized text is also fed to
 * contextKeywordTracker for that session.
 *
 * @param {object} jsonPayload - ASR payload (see extractRawText for the fields read).
 * @param {string|null} [sessionId=null] - Session to update the keyword tracker for.
 * @returns {string} The corrected text (not alias-normalized).
 */
function extractUserText(jsonPayload, sessionId = null) {
  const correctedText = correctAsrText(extractRawText(jsonPayload));
  if (sessionId) {
    const normalizedForTracker = normalizeKnowledgeAlias(correctedText);
    contextKeywordTracker.updateSession(sessionId, normalizedForTracker);
  }
  return correctedText;
}
|
||
|
||
// Assistant text that STARTS (after trimming) with one of these prompt-analysis phrases is
// treated as leaked "thinking" output; sanitizeAssistantText drops such text by returning null.
const THINKING_PATTERN = /^(首轮对话|用户想|用户问|应该回复|需要列举|可列举|突出特色|引导进一步|引导用户|让用户|回复后询问|语气要|用温和|需热情|需简洁|需专业)/;

// Variant matching similar instruction-style phrases anywhere in the text (unanchored).
// NOTE(review): not referenced in the visible part of this file — confirm where it is used.
const THINKING_MID_PATTERN = /(?:需客观回复|应说明其|回复后询问|引导.*对话|用.*口吻回复|语气要.*热情|需要.*引导|应该.*回复|先.*再.*最后)/;
|
||
|
||
/**
 * Screens assistant text before it is persisted or shown.
 *
 * - Falsy input is returned unchanged.
 * - Brand-harmful content (per isBrandHarmful) is replaced with getVoiceSafeReply().
 * - Text that starts with a leaked "thinking" phrase (THINKING_PATTERN) is dropped (null).
 * - Anything else is returned as-is.
 *
 * @param {string} text
 * @returns {string|null|undefined}
 */
function sanitizeAssistantText(text) {
  if (!text) {
    return text;
  }

  const preview = () => JSON.stringify(text.slice(0, 200));

  if (isBrandHarmful(text)) {
    console.warn(`[NativeVoice][SafeGuard] blocked harmful content: ${preview()}`);
    return getVoiceSafeReply();
  }

  const looksLikeThinking = THINKING_PATTERN.test(text.trim());
  if (looksLikeThinking) {
    console.warn(`[NativeVoice][SafeGuard] blocked thinking output: ${preview()}`);
    return null;
  }

  return text;
}
|
||
|
||
/**
 * Reports whether an ASR payload represents a final (non-interim) result.
 *
 * The payload is final when `is_final` is exactly `true`, or when any entry
 * of `results` has `is_interim === false`.
 *
 * @param {object|null|undefined} jsonPayload
 * @returns {boolean}
 */
function isFinalUserPayload(jsonPayload) {
  if (jsonPayload?.is_final === true) {
    return true;
  }
  const results = jsonPayload?.results;
  return Array.isArray(results)
    && results.some((entry) => Boolean(entry) && entry.is_interim === false);
}
|
||
|
||
/**
 * Records one finalized user utterance.
 *
 * The same text repeated within 5 seconds is ignored. Otherwise the turn
 * counters are advanced and the idle timer is reset. The message is then
 * written to the DB and to Redis (both best-effort, not awaited), and echoed
 * to the client as a final `subtitle`.
 *
 * @param {object} session - Voice session state (mutated).
 * @param {string} text - Recognized user text.
 * @returns {boolean} true if recorded; false if empty or a recent duplicate.
 */
function persistUserSpeech(session, text) {
  const utterance = (text || '').trim();
  if (!utterance) {
    return false;
  }

  const timestamp = Date.now();
  const isRecentDuplicate = session.lastPersistedUserText === utterance
    && timestamp - (session.lastPersistedUserAt || 0) < 5000;
  if (isRecentDuplicate) {
    return false;
  }

  session.lastPersistedUserText = utterance;
  session.lastPersistedUserAt = timestamp;
  session.latestUserText = utterance;
  session.latestUserTurnSeq = (session.latestUserTurnSeq || 0) + 1;
  // Turn counter consumed by the periodic conversation summarizer.
  session._turnCount = (session._turnCount || 0) + 1;
  resetIdleTimer(session);

  db.addMessage(session.sessionId, 'user', utterance, 'voice_asr')
    .catch((e) => console.warn('[NativeVoice][DB] add user failed:', e.message));
  redisClient.pushMessage(session.sessionId, { role: 'user', content: utterance, source: 'voice_asr' })
    .catch(() => {});

  sendJson(session.client, {
    type: 'subtitle',
    role: 'user',
    text: utterance,
    isFinal: true,
    sequence: `native_user_${timestamp}`,
  });
  return true;
}
|
||
|
||
/**
 * Records one finalized assistant utterance.
 *
 * Steps:
 * - Sanitize the text (harmful text becomes a safe reply; leaked "thinking" output is dropped).
 * - Ignore the same text repeated within 5 seconds, and reset the idle timer.
 * - Optionally write the message to the DB and Redis.
 * - Push a final `subtitle` to the client.
 * - Schedule the periodic conversation summary.
 * - Push a `consultant_contact` card when the text refers the user to a consultant
 *   and contact details are available.
 *
 * @param {object} session - Voice session state (mutated).
 * @param {string} text - Raw assistant text.
 * @param {object} [options]
 * @param {string} [options.source='voice_bot'] - Message source stored with the record and echoed in the subtitle.
 * @param {string|null} [options.toolName=null] - Tool that produced the reply, if any.
 * @param {boolean} [options.persistToDb=true] - When false, skips the DB/Redis writes and the summary trigger.
 * @param {object|null} [options.meta=null] - Extra metadata passed to db.addMessage.
 * @returns {boolean} true if accepted; false if empty, dropped by sanitization, or a recent duplicate.
 */
function persistAssistantSpeech(session, text, { source = 'voice_bot', toolName = null, persistToDb = true, meta = null } = {}) {
  const cleanText = sanitizeAssistantText((text || '').trim());
  if (!cleanText) return false;

  const now = Date.now();
  if (session.lastPersistedAssistantText === cleanText && now - (session.lastPersistedAssistantAt || 0) < 5000) {
    return false;
  }
  session.lastPersistedAssistantText = cleanText;
  session.lastPersistedAssistantAt = now;
  resetIdleTimer(session);

  if (persistToDb) {
    db.addMessage(session.sessionId, 'assistant', cleanText, source, toolName, meta).catch((e) => console.warn('[NativeVoice][DB] add assistant failed:', e.message));
    redisClient.pushMessage(session.sessionId, { role: 'assistant', content: cleanText, source }).catch(() => {});
  }

  sendJson(session.client, {
    type: 'subtitle',
    role: 'assistant',
    text: cleanText,
    isFinal: true,
    source,
    toolName,
    sequence: `native_assistant_${now}`,
  });

  // Fire-and-forget summary check (every N turns). Guard against both a synchronous
  // throw and a rejected promise: either would otherwise abort this reply path or
  // surface as an unhandled rejection.
  if (persistToDb) {
    const logSummarizeFailure = (e) => console.warn('[NativeVoice] summarize trigger failed:', e?.message || e);
    try {
      Promise.resolve(triggerSummarizeIfNeeded(session, session.sessionId)).catch(logSummarizeFailure);
    } catch (e) {
      logSummarizeFailure(e);
    }
  }

  if (CONSULTANT_REFERRAL_PATTERN.test(cleanText)) {
    const contact = session.consultantContact || DEFAULT_CONSULTANT_CONTACT;
    // Only push a card when there is at least one way to reach the consultant.
    if (contact.mobile || contact.wx_qr_code || contact.wechat_id) {
      console.log(`[NativeVoice] consultant referral detected session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 80))}`);
      sendJson(session.client, {
        type: 'consultant_contact',
        name: contact.name || '大沃专业健康管理顾问',
        mobile: contact.mobile || '',
        wx_qr_code: contact.wx_qr_code || '',
        wechat_id: contact.wechat_id || '',
        message: '如需个性化健康建议,可联系大沃专业健康管理顾问',
      });
    }
  }

  return true;
}
|
||
|
||
/**
 * Appends a streamed assistant text chunk to the session's stream buffer.
 *
 * If the payload carries a `reply_id` that differs from the one currently
 * being buffered, the buffer is reset first so chunks from different replies
 * are not mixed.
 *
 * @param {object} session - Voice session; reads/writes `assistantStreamBuffer` and `assistantStreamReplyId`.
 * @param {object} payload - Upstream chunk payload (text read via extractRawText).
 * @returns {string} The accumulated buffer, or '' when the chunk has no text (buffer untouched).
 */
function appendAssistantStream(session, payload) {
  const chunk = extractRawText(payload);
  if (!chunk) {
    return '';
  }

  const incomingReplyId = payload?.reply_id || '';
  const startsNewReply = Boolean(incomingReplyId)
    && Boolean(session.assistantStreamReplyId)
    && session.assistantStreamReplyId !== incomingReplyId;
  if (startsNewReply) {
    session.assistantStreamBuffer = '';
  }

  session.assistantStreamReplyId = incomingReplyId || session.assistantStreamReplyId || '';
  session.assistantStreamBuffer = (session.assistantStreamBuffer || '') + chunk;
  return session.assistantStreamBuffer;
}
|
||
|
||
/**
 * Empties the session's stream buffer and persists its trimmed contents as one
 * assistant utterance (via persistAssistantSpeech).
 *
 * @param {object} session - Voice session; the buffer and reply id are always cleared.
 * @param {object} [options] - `source`, `toolName`, `meta` forwarded to persistAssistantSpeech.
 * @returns {boolean} false when the buffer was empty; otherwise persistAssistantSpeech's result.
 */
function flushAssistantStream(session, { source = 'voice_bot', toolName = null, meta = null } = {}) {
  const bufferedText = (session.assistantStreamBuffer || '').trim();
  session.assistantStreamBuffer = '';
  session.assistantStreamReplyId = '';
  return bufferedText
    ? persistAssistantSpeech(session, bufferedText, { source, toolName, meta })
    : false;
}
|
||
|
||
|
||
/**
 * Prepares `session.handoffSummary` from the last 10 stored messages of this session.
 *
 * On an empty history, or on any failure (which is logged), the summary is set
 * to ''. In every case `handoffSummaryUsed` is reset to false.
 *
 * @param {object} session - Voice session; reads `sessionId`.
 * @returns {Promise<void>}
 */
async function loadHandoffSummaryForVoice(session) {
  const clearSummary = () => {
    session.handoffSummary = '';
    session.handoffSummaryUsed = false;
  };

  try {
    const history = await db.getHistoryForLLM(session.sessionId, 10);
    if (!history.length) {
      clearSummary();
      return;
    }
    session.handoffSummary = buildDeterministicHandoffSummary(history);
    session.handoffSummaryUsed = false;
    console.log(`[NativeVoice] Handoff summary prepared for ${session.sessionId}: ${session.handoffSummary ? 'yes' : 'no'}`);
  } catch (error) {
    clearSummary();
    console.warn('[NativeVoice] loadHandoffSummaryForVoice failed:', error.message);
  }
}
|
||
|
||
/**
 * Builds a short, rule-based summary of recent conversation turns (no LLM involved).
 *
 * Only user/assistant messages with non-blank content are used, and only the
 * last 8 of those. The summary joins, with ';', whichever of these parts exist:
 * - the latest user question;
 * - the previous user question, if it differs from the latest;
 * - up to two of the latest assistant replies, each truncated to 60 characters.
 *
 * Truncation counts Unicode code points rather than UTF-16 code units, so a
 * surrogate pair (e.g. an emoji) is never split into a lone surrogate.
 *
 * @param {Array<{role: string, content: *}>} [messages=[]] - Chronological history; non-arrays are treated as empty.
 * @returns {string} The summary, or '' when there is nothing usable.
 */
function buildDeterministicHandoffSummary(messages = []) {
  // Truncate by code point so astral characters are not cut in half.
  const truncateChars = (value, maxChars) => Array.from(value).slice(0, maxChars).join('');

  const normalizedMessages = (Array.isArray(messages) ? messages : [])
    .filter((item) => item && (item.role === 'user' || item.role === 'assistant') && String(item.content || '').trim())
    .slice(-8);
  if (!normalizedMessages.length) {
    return '';
  }

  const userMessages = normalizedMessages.filter((item) => item.role === 'user');
  const currentQuestion = String(userMessages[userMessages.length - 1]?.content || '').trim();
  const previousQuestion = String(userMessages[userMessages.length - 2]?.content || '').trim();
  const assistantFacts = normalizedMessages
    .filter((item) => item.role === 'assistant')
    .slice(-2)
    .map((item) => String(item.content || '').trim())
    .filter(Boolean)
    .map((item) => truncateChars(item, 60))
    .join(';');

  const parts = [];
  if (currentQuestion) {
    parts.push(`当前问题:${currentQuestion}`);
  }
  if (previousQuestion && previousQuestion !== currentQuestion) {
    parts.push(`上一轮关注:${previousQuestion}`);
  }
  if (assistantFacts) {
    parts.push(`已给信息:${assistantFacts}`);
  }
  return parts.join(';');
}
|
||
|
||
/**
 * Streams `speechText` to the upstream service as chat-TTS text.
 *
 * The text is split with splitTextForSpeech. Each chunk is sent in order (the
 * first with start=true), followed by a closing empty message with end=true.
 * The session is marked as sending chat TTS until an estimated playback
 * deadline (`chatTTSUntil`); a timer clears the flag once that deadline passes.
 * Nothing happens when there are no chunks or the upstream socket is not open.
 *
 * @param {object} session - Voice session (mutated).
 * @param {string} speechText - Text to speak.
 * @returns {Promise<void>}
 */
async function sendSpeechText(session, speechText) {
  const chunks = splitTextForSpeech(speechText);
  const upstreamOpen = Boolean(session.upstream) && session.upstream.readyState === WebSocket.OPEN;
  if (!chunks.length || !upstreamOpen) {
    return;
  }

  const total = chunks.length;
  console.log(`[NativeVoice] sendSpeechText start session=${session.sessionId} chunks=${total} textLen=${speechText.length}`);

  session.currentSpeechText = speechText;
  session.isSendingChatTTSText = true;
  session.currentTtsType = 'chat_tts_text';
  session.chatTTSUntil = Date.now() + estimateSpeechDurationMs(speechText) + 800;

  clearTimeout(session.chatTTSTimer);
  const releaseChatTTS = () => {
    session.chatTTSTimer = null;
    // Only release if the deadline was not pushed further out in the meantime.
    if ((session.chatTTSUntil || 0) <= Date.now()) {
      session.isSendingChatTTSText = false;
    }
  };
  session.chatTTSTimer = setTimeout(releaseChatTTS, Math.max(200, session.chatTTSUntil - Date.now() + 50));

  sendJson(session.client, { type: 'tts_reset', ttsType: 'chat_tts_text' });

  chunks.forEach((chunk, position) => {
    const isFirst = position === 0;
    console.log(`[NativeVoice] sendSpeechText chunk session=${session.sessionId} index=${position + 1}/${total} len=${chunk.length} start=${isFirst} end=false text=${JSON.stringify(chunk.slice(0, 80))}`);
    session.upstream.send(createChatTTSTextMessage(session.sessionId, {
      start: isFirst,
      end: false,
      content: chunk,
    }));
  });

  console.log(`[NativeVoice] sendSpeechText end session=${session.sessionId}`);
  session.upstream.send(createChatTTSTextMessage(session.sessionId, {
    start: false,
    end: true,
    content: '',
  }));
}
|
||
|
||
/**
 * Sends the one-time `ready` message to the client; later calls are no-ops.
 *
 * @param {object} session - Voice session; `readySent` records that it was sent.
 */
function sendReady(session) {
  if (!session.readySent) {
    session.readySent = true;
    sendJson(session.client, { type: 'ready' });
  }
}
|
||
|
||
/**
 * Speaks the session greeting once via SayHello, then signals `ready` to the client.
 *
 * On the first call the greeting is persisted as an assistant message, the
 * greeting/ready timers are cleared, and a 2-second greeting protection window
 * is opened. If SayHello throws, the greeting flags are rolled back so a later
 * call may retry. Every call ends with sendReady (which itself sends only once).
 *
 * @param {object} session - Voice session (mutated).
 */
function sendGreeting(session) {
  if (!session.hasSentGreeting) {
    session.hasSentGreeting = true;
    const greetingText = session.greetingText || DEFAULT_VOICE_GREETING;
    console.log(`[NativeVoice] sendGreeting session=${session.sessionId} text=${JSON.stringify(greetingText.slice(0, 80))}`);
    persistAssistantSpeech(session, greetingText, { source: 'voice_bot' });

    clearTimeout(session.greetingTimer);
    clearTimeout(session.readyTimer);
    session.greetingSentAt = Date.now();
    session.greetingProtectionUntil = Date.now() + 2000;
    session.currentSpeechText = greetingText;

    try {
      session.upstream.send(createSayHelloMessage(session.sessionId, greetingText));
      console.log(`[NativeVoice] sendSayHello event=300 session=${session.sessionId}`);
    } catch (error) {
      session.hasSentGreeting = false;
      session.greetingProtectionUntil = 0;
      console.warn('[NativeVoice] SayHello failed:', error.message);
    }
  }

  sendReady(session);
}
|
||
|
||
/**
 * Re-sends the session's own greeting via SayHello.
 *
 * It is skipped when there is no greeting text, when the upstream socket is not
 * open, or when a greeting was sent less than 6 seconds ago. Before sending, it
 * stamps `greetingSentAt` and sets `directSpeakUntil` to the estimated playback
 * end; send failures are logged.
 *
 * @param {object} session - Voice session (mutated).
 * @returns {Promise<void>}
 */
async function replayGreeting(session) {
  const greetingText = String(session.greetingText || '').trim();
  const { upstream } = session;
  if (!greetingText || !upstream || upstream.readyState !== WebSocket.OPEN) {
    return;
  }

  const lastSentAt = session.greetingSentAt;
  if (lastSentAt && Date.now() - lastSentAt < 6000) {
    console.log(`[NativeVoice] replayGreeting skipped (too soon) session=${session.sessionId}`);
    return;
  }

  console.log(`[NativeVoice] replayGreeting session=${session.sessionId} text=${JSON.stringify(greetingText.slice(0, 80))}`);
  session.greetingSentAt = Date.now();
  session.directSpeakUntil = Date.now() + estimateSpeechDurationMs(greetingText) + 800;

  try {
    upstream.send(createSayHelloMessage(session.sessionId, greetingText));
  } catch (error) {
    console.warn('[NativeVoice] replayGreeting SayHello failed:', error.message);
  }
}
|
||
|
||
/**
 * Sends knowledge items upstream as an external-RAG message (JSON-encoded array).
 *
 * Items without truthy `content` are dropped. Nothing is sent when the upstream
 * socket is not open or no items remain.
 *
 * @param {object} session - Voice session.
 * @param {Array<{content: *}>} items - Candidate RAG items; non-arrays are treated as empty.
 * @returns {Promise<void>}
 */
async function sendExternalRag(session, items) {
  const { upstream } = session;
  if (!upstream || upstream.readyState !== WebSocket.OPEN) {
    return;
  }

  const usableItems = (Array.isArray(items) ? items : []).filter((entry) => entry && entry.content);
  if (usableItems.length === 0) {
    return;
  }

  upstream.send(createChatRAGTextMessage(session.sessionId, JSON.stringify(usableItems)));
}
|
||
|
||
/**
 * Lifts every form of upstream-reply suppression for the session.
 *
 * It cancels the suppression timer, clears the pending-reply bookkeeping, and
 * unblocks upstream audio. It then tells the client the assistant is no longer pending.
 *
 * @param {object} session - Voice session (mutated).
 */
function clearUpstreamSuppression(session) {
  clearTimeout(session.suppressReplyTimer);
  Object.assign(session, {
    suppressReplyTimer: null,
    suppressUpstreamUntil: 0,
    awaitingUpstreamReply: false,
    pendingAssistantSource: null,
    pendingAssistantToolName: null,
    pendingAssistantMeta: null,
    _pendingEvidencePack: null,
    pendingAssistantTurnSeq: 0,
    blockUpstreamAudio: false,
  });
  sendJson(session.client, { type: 'assistant_pending', active: false });
}
|
||
|
||
/**
 * Marks the session as awaiting a reply and suppresses the upstream reply for
 * at least `durationMs` (minimum 1000ms).
 *
 * A timer (minimum 300ms) then calls clearUpstreamSuppression. If the deadline
 * was extended in the meantime, the timer does nothing and suppression stays in place.
 *
 * @param {object} session - Voice session (mutated).
 * @param {number} durationMs - Requested suppression window in milliseconds.
 */
function suppressUpstreamReply(session, durationMs) {
  clearTimeout(session.suppressReplyTimer);
  session.awaitingUpstreamReply = true;
  session.suppressUpstreamUntil = Date.now() + Math.max(1000, durationMs);

  const releaseIfExpired = () => {
    session.suppressReplyTimer = null;
    const stillSuppressed = (session.suppressUpstreamUntil || 0) > Date.now();
    if (!stillSuppressed) {
      clearUpstreamSuppression(session);
    }
  };

  session.suppressReplyTimer = setTimeout(releaseIfExpired, Math.max(300, session.suppressUpstreamUntil - Date.now()));
}
|
||
|
||
/**
 * Resolves and delivers the assistant reply for one finalized user utterance.
 *
 * Flow:
 * 1. Queue the text instead if a reply is already being processed, or if local
 *    speech is still playing (`directSpeakUntil`).
 * 2. For knowledge-base candidates (anything that is not pure chitchat), block
 *    upstream audio and suppress the S2S reply while the KB answer is resolved.
 * 3. Reuse a matching KB pre-query (started from partial ASR) when its result
 *    was a hit; otherwise resolve the reply with the full text.
 * 4. Deliver the reply in one of three ways:
 *    - `upstream_chat`: unblock and let the S2S model answer on its own;
 *    - `external_rag`: send KB items upstream so S2S voices an answer grounded in them;
 *    - otherwise: speak `speechText` locally via chat-TTS text.
 *
 * A reply for a turn that is no longer the latest is dropped. Text queued while
 * this ran is re-dispatched from the `finally` block.
 *
 * @param {object} session - Voice session state (mutated extensively).
 * @param {string} text - Final user utterance.
 * @param {number} [turnSeq] - Turn sequence of the utterance; defaults to the latest user turn.
 * @returns {Promise<void>}
 */
async function processReply(session, text, turnSeq = session.latestUserTurnSeq || 0) {
  const cleanText = (text || '').trim();
  if (!cleanText) return;

  if (session.processingReply) {
    session.queuedUserText = cleanText;
    session.queuedUserTurnSeq = turnSeq;
    console.log(`[NativeVoice] processReply queued(busy) session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 80))}`);
    return;
  }

  const now = Date.now();
  if (session.directSpeakUntil && now < session.directSpeakUntil) {
    session.queuedUserText = cleanText;
    session.queuedUserTurnSeq = turnSeq;
    console.log(`[NativeVoice] processReply queued(speaking) session=${session.sessionId} waitMs=${session.directSpeakUntil - now} text=${JSON.stringify(cleanText.slice(0, 80))}`);
    return;
  }

  const activeTurnSeq = turnSeq || session.latestUserTurnSeq || 0;
  session.processingReply = true;
  sendJson(session.client, { type: 'assistant_pending', active: true });

  // KB-first: every non-chitchat query is a KB candidate, so S2S audio is blocked until the KB result is known.
  const isKnowledgeCandidate = !isPureChitchat(cleanText);
  if (isKnowledgeCandidate) {
    session.blockUpstreamAudio = true;
    suppressUpstreamReply(session, 30000);
    sendJson(session.client, { type: 'tts_reset', reason: 'processing' });
    // Filler phrases were removed (KB latency was reduced to ~2.6s), so make sure none is marked active.
    session._fillerActive = false;
  }

  console.log(`[NativeVoice] processReply start session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 120))} blocked=${session.blockUpstreamAudio} kbCandidate=${isKnowledgeCandidate}`);

  try {
    // Consume the KB pre-query cache: if a pre-query was started from partial ASR and its text matches, reuse its result.
    let resolveResult = null;
    if (isKnowledgeCandidate && session.pendingKbPrequery && session._kbPrequeryText) {
      const preText = (session._kbPrequeryText || '').replace(/[啊哦嗯呢呀哎诶额,。!?、,.\s]/g, '');
      const finalText = cleanText.replace(/[啊哦嗯呢呀哎诶额,。!?、,.\s]/g, '');
      // Relaxed similarity: substring containment in either direction, or at least 45%
      // of the shorter string's characters also occur in the longer one.
      let isSimilar = preText && finalText && (finalText.includes(preText) || preText.includes(finalText));
      if (!isSimilar && preText && finalText) {
        const shorter = preText.length <= finalText.length ? preText : finalText;
        const longer = preText.length <= finalText.length ? finalText : preText;
        let overlap = 0;
        for (let i = 0; i < shorter.length; i++) {
          if (longer.includes(shorter[i])) overlap++;
        }
        isSimilar = overlap / shorter.length >= 0.45;
      }
      if (isSimilar) {
        // A rejected pre-query must not abort this turn (and, because it would
        // otherwise stay cached, later turns): treat it as a no-hit and re-query.
        let prequeryResult = null;
        try {
          prequeryResult = await session.pendingKbPrequery;
        } catch (prequeryError) {
          console.warn(`[NativeVoice] KB prequery failed, re-querying session=${session.sessionId}:`, prequeryError?.message || prequeryError);
        }
        // Reuse hit results only; a no-hit may come from incomplete partial-text routing, so re-search with the full text.
        if (prequeryResult && prequeryResult.delivery !== 'upstream_chat') {
          console.log(`[NativeVoice] using KB prequery cache (hit) session=${session.sessionId} preText=${JSON.stringify(session._kbPrequeryText.slice(0, 60))}`);
          resolveResult = prequeryResult;
        } else {
          console.log(`[NativeVoice] prequery no-hit, re-searching with full text session=${session.sessionId} preText=${JSON.stringify((session._kbPrequeryText || '').slice(0, 40))} finalText=${JSON.stringify(cleanText.slice(0, 40))}`);
        }
      } else {
        console.log(`[NativeVoice] KB prequery text mismatch, re-querying session=${session.sessionId} pre=${JSON.stringify(preText.slice(0, 40))} final=${JSON.stringify(finalText.slice(0, 40))}`);
      }
    }
    session.pendingKbPrequery = null;
    session._kbPrequeryText = '';
    session._kbPrequeryStartedAt = 0;

    if (!resolveResult) {
      resolveResult = await resolveReply(session.sessionId, session, cleanText);
    }
    const { delivery, speechText, ragItems, source, toolName, routeDecision, responseMeta, evidencePack } = resolveResult;

    // Product-link trigger: push the matching product link when the user asks to see product details.
    const productLinkResult = checkProductLinkTrigger(cleanText);
    if (productLinkResult.triggered && productLinkResult.product) {
      console.log(`[NativeVoice] product link triggered session=${session.sessionId} product=${productLinkResult.product.name}`);
      sendJson(session.client, {
        type: 'product_link',
        product: productLinkResult.product.name,
        link: productLinkResult.product.link,
        description: productLinkResult.product.description,
      });
    }

    if (activeTurnSeq !== (session.latestUserTurnSeq || 0)) {
      console.log(`[NativeVoice] stale reply ignored session=${session.sessionId} activeTurn=${activeTurnSeq} latestTurn=${session.latestUserTurnSeq || 0}`);
      clearUpstreamSuppression(session);
      return;
    }

    if (delivery === 'upstream_chat') {
      // KB candidate but S2S did not call a tool: release S2S to answer naturally.
      // Relies on (1) the system prompt's brand-protection instructions steering S2S
      // to tools and (2) isBrandHarmful intercepting streamed text as a fallback.
      if (isKnowledgeCandidate) {
        console.log(`[NativeVoice] processReply kbCandidate+upstream_chat, unblock S2S session=${session.sessionId}`);
      }
      session.blockUpstreamAudio = false;
      session._lastPartialAt = 0;
      session._pendingEvidencePack = null;
      session.awaitingUpstreamReply = true;
      session.pendingAssistantSource = 'voice_bot';
      session.pendingAssistantToolName = null;
      session.pendingAssistantMeta = responseMeta;
      session.pendingAssistantTurnSeq = activeTurnSeq;
      console.log(`[NativeVoice] processReply handoff session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=upstream_chat`);
      return;
    }

    if (delivery === 'external_rag') {
      if (!session.blockUpstreamAudio) {
        session.blockUpstreamAudio = true;
      }
      session.discardNextAssistantResponse = true;
      sendJson(session.client, { type: 'tts_reset', reason: 'knowledge_hit' });
      const ragContent = (ragItems || []).filter((item) => item && item.content && item.kind !== 'context');
      if (ragContent.length > 0) {
        console.log(`[NativeVoice] processReply sending external_rag to S2S session=${session.sessionId} route=${routeDecision?.route || 'unknown'} items=${ragContent.length}`);
        // KB topic memory: remember this turn's question and time for the protection window and follow-up enrichment.
        if (responseMeta?.hit !== false && responseMeta?.reason !== 'honest_fallback') {
          session._lastKbTopic = cleanText;
          session._lastKbHitAt = Date.now();
        }
        session._pendingEvidencePack = evidencePack || null;
        // Do not show the raw KB text as a subtitle up front: wait for the text S2S actually
        // speaks (event 351), so subtitle and audio stay consistent.
        session._pendingExternalRagReply = true;
        await sendExternalRag(session, ragContent);
        session.awaitingUpstreamReply = true;
        session.pendingAssistantSource = source;
        session.pendingAssistantToolName = toolName;
        session.pendingAssistantMeta = responseMeta;
        session.pendingAssistantTurnSeq = activeTurnSeq;
      } else {
        console.log(`[NativeVoice] processReply external_rag empty content, fallback to upstream session=${session.sessionId}`);
        session.blockUpstreamAudio = false;
        clearUpstreamSuppression(session);
      }
      return;
    }

    if (!speechText) {
      console.log(`[NativeVoice] processReply empty session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=${delivery || 'unknown'}`);
      session.isSendingChatTTSText = false;
      session.chatTTSUntil = 0;
      // Nothing will be spoken for this turn: lift the block/suppression installed above
      // (as the stale-turn and empty-RAG paths do) instead of leaving audio muted for up to 30s.
      clearUpstreamSuppression(session);
      return;
    }

    console.log(`[NativeVoice] processReply resolved session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=local_tts source=${source} tool=${toolName || 'chat'} speechLen=${speechText.length}`);
    session.directSpeakUntil = Date.now() + estimateSpeechDurationMs(speechText) + 800;
    suppressUpstreamReply(session, estimateSpeechDurationMs(speechText) + 1800);
    session.lastDeliveredAssistantTurnSeq = activeTurnSeq;
    persistAssistantSpeech(session, speechText, { source, toolName, meta: responseMeta });
    await sendSpeechText(session, speechText);
  } catch (error) {
    console.error('[NativeVoice] processReply failed:', error.message);
    // The suppression installed for a KB candidate sets awaitingUpstreamReply, which would
    // stop the `finally` below from unblocking; release it explicitly on failure.
    clearUpstreamSuppression(session);
    sendJson(session.client, { type: 'error', error: error.message });
  } finally {
    session.processingReply = false;
    if (!session.awaitingUpstreamReply) {
      session.blockUpstreamAudio = false;
      sendJson(session.client, { type: 'assistant_pending', active: false });
    }

    const pending = session.queuedUserText;
    const pendingTurnSeq = session.queuedUserTurnSeq || 0;
    session.queuedUserText = '';
    session.queuedUserTurnSeq = 0;
    if (pending && pendingTurnSeq && pendingTurnSeq !== activeTurnSeq && (!session.directSpeakUntil || Date.now() >= session.directSpeakUntil)) {
      setTimeout(() => {
        session.blockUpstreamAudio = true;
        processReply(session, pending, pendingTurnSeq).catch((err) => {
          console.error('[NativeVoice] queued processReply failed:', err.message);
        });
      }, 200);
    } else if (pending && pendingTurnSeq && pendingTurnSeq !== activeTurnSeq) {
      // Local speech is still playing: wait until it should have finished, then take the newest queued text.
      const waitMs = Math.max(200, session.directSpeakUntil - Date.now() + 200);
      clearTimeout(session.queuedReplyTimer);
      session.queuedReplyTimer = setTimeout(() => {
        session.queuedReplyTimer = null;
        const queuedText = session.queuedUserText || pending;
        const queuedTurnSeq = session.queuedUserTurnSeq || pendingTurnSeq;
        session.queuedUserText = '';
        session.queuedUserTurnSeq = 0;
        session.blockUpstreamAudio = true;
        processReply(session, queuedText, queuedTurnSeq).catch((err) => {
          console.error('[NativeVoice] delayed queued processReply failed:', err.message);
        });
      }, waitMs);
    }
  }
}
|
||
|
||
function handleUpstreamMessage(session, data) {
|
||
let message;
|
||
try {
|
||
message = unmarshal(data);
|
||
} catch (error) {
|
||
console.warn('[NativeVoice] unmarshal failed:', error.message);
|
||
return;
|
||
}
|
||
|
||
if (message.type === MsgType.AUDIO_ONLY_SERVER) {
|
||
const isDefaultTts = !session.currentTtsType || session.currentTtsType === 'default';
|
||
const isSuppressingUpstreamAudio = (session.suppressUpstreamUntil || 0) > Date.now() && isDefaultTts;
|
||
// 用户刚停止说话后短暂阻止默认TTS音频,给event 459的blockUpstreamAudio留时间生效
|
||
const isUserJustSpeaking = isDefaultTts && session._lastPartialAt && (Date.now() - session._lastPartialAt < 800);
|
||
// blockUpstreamAudio 生效时:仅放行 external_rag 和限时过渡语音频,其余全部阻断
|
||
// 修复:旧逻辑只阻断 isDefaultTts,导致 chat_tts_text 窗口期 S2S 自主回复音频泄漏
|
||
const isBlockPassthrough = session.currentTtsType === 'external_rag' ||
|
||
(session._fillerActive && (session.chatTTSUntil || 0) > Date.now());
|
||
if ((session.blockUpstreamAudio && !isBlockPassthrough) || isSuppressingUpstreamAudio || isUserJustSpeaking) {
|
||
if (!session._audioBlockLogOnce) {
|
||
session._audioBlockLogOnce = true;
|
||
console.log(`[NativeVoice] audio blocked session=${session.sessionId} ttsType=${session.currentTtsType} block=${session.blockUpstreamAudio} suppress=${isSuppressingUpstreamAudio}`);
|
||
}
|
||
return;
|
||
}
|
||
session._audioBlockLogOnce = false;
|
||
if (session.client && session.client.readyState === WebSocket.OPEN) {
|
||
session.client.send(message.payload, { binary: true });
|
||
}
|
||
return;
|
||
}
|
||
|
||
const payload = parseJsonPayload(message);
|
||
if (message.type === MsgType.ERROR) {
|
||
console.error(`[NativeVoice] upstream error session=${session.sessionId} code=${message.event} payload=${message.payload.toString('utf8').slice(0, 200)}`);
|
||
sendJson(session.client, { type: 'error', error: message.payload.toString('utf8') });
|
||
return;
|
||
}
|
||
|
||
if (message.type !== MsgType.FULL_SERVER) {
|
||
return;
|
||
}
|
||
|
||
if (message.event === 150) {
|
||
session.upstreamReady = true;
|
||
console.log(`[NativeVoice] upstream ready session=${session.sessionId}`);
|
||
resetIdleTimer(session);
|
||
startAudioKeepalive(session);
|
||
sendGreeting(session);
|
||
return;
|
||
}
|
||
|
||
if (message.event === 300) {
|
||
console.log(`[NativeVoice] SayHello response session=${session.sessionId}`);
|
||
return;
|
||
}
|
||
|
||
if (message.event === 350) {
|
||
session.currentTtsType = payload?.tts_type || '';
|
||
if (payload?.tts_type === 'chat_tts_text' && session.pendingGreetingAck) {
|
||
session.pendingGreetingAck = false;
|
||
clearTimeout(session.greetingAckTimer);
|
||
session.greetingAckTimer = null;
|
||
}
|
||
if (session.blockUpstreamAudio && payload?.tts_type === 'external_rag') {
|
||
session.blockUpstreamAudio = false;
|
||
session.suppressUpstreamUntil = 0;
|
||
clearTimeout(session.suppressReplyTimer);
|
||
session.suppressReplyTimer = null;
|
||
// 注意:不清除discardNextAssistantResponse,让它拦截S2S默认回复的残留event 351
|
||
// 该标记会在KB回复的event 550 chunks到达时自动清除
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
// 清除过渡语的chat TTS状态,确保external_rag回复不被isLocalChatTTSTextActive拦截
|
||
session.isSendingChatTTSText = false;
|
||
session.chatTTSUntil = 0;
|
||
session.currentSpeechText = '';
|
||
session._fillerActive = false;
|
||
clearTimeout(session.chatTTSTimer);
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'rag_response_start' });
|
||
console.log(`[NativeVoice] unblock for external_rag tts session=${session.sessionId}`);
|
||
} else if (session.blockUpstreamAudio && payload?.tts_type === 'chat_tts_text') {
|
||
console.log(`[NativeVoice] chat_tts_text started, keeping block for S2S default response session=${session.sessionId}`);
|
||
}
|
||
console.log(`[NativeVoice] upstream tts_event session=${session.sessionId} ttsType=${payload?.tts_type || ''}`);
|
||
sendJson(session.client, { type: 'tts_event', payload });
|
||
return;
|
||
}
|
||
|
||
const isLocalChatTTSTextActive = !!session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now();
|
||
const isSuppressingUpstreamReply = (session.suppressUpstreamUntil || 0) > Date.now();
|
||
|
||
if (message.event === 351) {
|
||
if (isLocalChatTTSTextActive || session.blockUpstreamAudio || isSuppressingUpstreamReply) {
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
return;
|
||
}
|
||
if (session.discardNextAssistantResponse) {
|
||
session.discardNextAssistantResponse = false;
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
console.log(`[NativeVoice] discarded stale assistant response (kb-nohit retrigger) session=${session.sessionId}`);
|
||
return;
|
||
}
|
||
const pendingAssistantSource = session.pendingAssistantSource || 'voice_bot';
|
||
const pendingAssistantToolName = session.pendingAssistantToolName || null;
|
||
const pendingAssistantMeta = session.pendingAssistantMeta || null;
|
||
const pendingAssistantTurnSeq = session.pendingAssistantTurnSeq || session.latestUserTurnSeq || 0;
|
||
session.awaitingUpstreamReply = false;
|
||
sendJson(session.client, { type: 'assistant_pending', active: false });
|
||
if (pendingAssistantTurnSeq && session.lastDeliveredAssistantTurnSeq === pendingAssistantTurnSeq) {
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
session.pendingAssistantSource = null;
|
||
session.pendingAssistantToolName = null;
|
||
session.pendingAssistantMeta = null;
|
||
session._pendingEvidencePack = null;
|
||
console.log(`[NativeVoice] duplicate assistant final ignored (351) session=${session.sessionId} turn=${pendingAssistantTurnSeq}`);
|
||
return;
|
||
}
|
||
const assistantText = extractRawText(payload);
|
||
if (assistantText) {
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
// 过渡语的event 351:不持久化,直接丢弃
|
||
if (session._fillerActive) {
|
||
console.log(`[NativeVoice] discarded filler assistant text session=${session.sessionId}`);
|
||
session._fillerActive = false;
|
||
return;
|
||
}
|
||
// 清除external_rag等待标记,KB回复已到达
|
||
if (session._pendingExternalRagReply) {
|
||
session._pendingExternalRagReply = false;
|
||
}
|
||
console.log(`[NativeVoice] upstream assistant session=${session.sessionId} text=${JSON.stringify(assistantText.slice(0, 120))}`);
|
||
session.lastDeliveredAssistantTurnSeq = pendingAssistantTurnSeq;
|
||
persistAssistantSpeech(session, assistantText, {
|
||
source: pendingAssistantSource,
|
||
toolName: pendingAssistantToolName,
|
||
meta: pendingAssistantMeta,
|
||
});
|
||
// KB回复完成后重新阻断音频,防止下一个问题的S2S默认回复在early block前泄露
|
||
if (session.currentTtsType === 'external_rag') {
|
||
session.blockUpstreamAudio = true;
|
||
console.log(`[NativeVoice] re-blocked after KB response session=${session.sessionId}`);
|
||
}
|
||
} else {
|
||
const didFlush = flushAssistantStream(session, {
|
||
source: pendingAssistantSource,
|
||
toolName: pendingAssistantToolName,
|
||
meta: pendingAssistantMeta,
|
||
});
|
||
if (didFlush) {
|
||
session.lastDeliveredAssistantTurnSeq = pendingAssistantTurnSeq;
|
||
}
|
||
}
|
||
session.pendingAssistantSource = null;
|
||
session.pendingAssistantToolName = null;
|
||
session.pendingAssistantMeta = null;
|
||
session._pendingEvidencePack = null;
|
||
return;
|
||
}
|
||
|
||
if (message.event === 550) {
|
||
// external_rag chunks到达时,清除discardNextAssistantResponse(默认回复的351已过或不会来)
|
||
if (session.discardNextAssistantResponse && session.currentTtsType === 'external_rag') {
|
||
session.discardNextAssistantResponse = false;
|
||
console.log(`[NativeVoice] cleared discardNextAssistantResponse for external_rag stream session=${session.sessionId}`);
|
||
}
|
||
if (isLocalChatTTSTextActive || session.blockUpstreamAudio || isSuppressingUpstreamReply || session.discardNextAssistantResponse) {
|
||
return;
|
||
}
|
||
if (session.awaitingUpstreamReply) {
|
||
session.awaitingUpstreamReply = false;
|
||
sendJson(session.client, { type: 'assistant_pending', active: false });
|
||
}
|
||
const fullText = appendAssistantStream(session, payload);
|
||
if (fullText) {
|
||
// 品牌安全检测:S2S模型输出传销等负面内容时,立即阻断音频并注入安全回复
|
||
if (fullText.length >= 4 && isBrandHarmful(fullText)) {
|
||
console.warn(`[NativeVoice][SafeGuard] harmful content detected in stream, blocking audio session=${session.sessionId} text=${JSON.stringify(fullText.slice(0, 120))}`);
|
||
session.blockUpstreamAudio = true;
|
||
session.discardNextAssistantResponse = true;
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'harmful_blocked' });
|
||
// 注入安全回复语音,替代有害内容
|
||
const safeReply = getVoiceSafeReply();
|
||
persistAssistantSpeech(session, safeReply, { source: 'voice_bot' });
|
||
sendSpeechText(session, safeReply).catch((err) => {
|
||
console.warn('[NativeVoice][SafeGuard] sendSpeechText failed:', err.message);
|
||
});
|
||
return;
|
||
}
|
||
// 检测思考模式:S2S模型输出分析/计划而非直接回答,立即阻断
|
||
if (fullText.length >= 10 && (THINKING_PATTERN.test(fullText.trim()) || THINKING_MID_PATTERN.test(fullText))) {
|
||
console.warn(`[NativeVoice][SafeGuard] thinking detected in stream, blocking audio session=${session.sessionId} text=${JSON.stringify(fullText.slice(0, 120))}`);
|
||
session.blockUpstreamAudio = true;
|
||
session.discardNextAssistantResponse = true;
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'thinking_blocked' });
|
||
return;
|
||
}
|
||
console.log(`[NativeVoice] upstream assistant chunk session=${session.sessionId} len=${fullText.length} text=${JSON.stringify(fullText.slice(0, 120))}`);
|
||
}
|
||
return;
|
||
}
|
||
|
||
if (message.event === 559) {
|
||
if (isLocalChatTTSTextActive || isSuppressingUpstreamReply) {
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
return;
|
||
}
|
||
if (session.blockUpstreamAudio) {
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
console.log(`[NativeVoice] blocked response ended (559), keeping block session=${session.sessionId}`);
|
||
return;
|
||
}
|
||
if (session.discardNextAssistantResponse) {
|
||
session.discardNextAssistantResponse = false;
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
console.log(`[NativeVoice] discarded stale stream end (559, kb-nohit retrigger) session=${session.sessionId}`);
|
||
return;
|
||
}
|
||
// external_rag流期间,阻止默认回复的559过早flush部分KB文本
|
||
if (session._pendingExternalRagReply) {
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
console.log(`[NativeVoice] suppressed 559 flush during external_rag flow session=${session.sessionId}`);
|
||
return;
|
||
}
|
||
const pendingAssistantTurnSeq = session.pendingAssistantTurnSeq || session.latestUserTurnSeq || 0;
|
||
if (pendingAssistantTurnSeq && session.lastDeliveredAssistantTurnSeq === pendingAssistantTurnSeq) {
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
session.pendingAssistantSource = null;
|
||
session.pendingAssistantToolName = null;
|
||
session.pendingAssistantMeta = null;
|
||
session._pendingEvidencePack = null;
|
||
console.log(`[NativeVoice] duplicate assistant final ignored (559) session=${session.sessionId} turn=${pendingAssistantTurnSeq}`);
|
||
return;
|
||
}
|
||
session.awaitingUpstreamReply = false;
|
||
session.blockUpstreamAudio = false;
|
||
sendJson(session.client, { type: 'assistant_pending', active: false });
|
||
const didFlush = flushAssistantStream(session, {
|
||
source: session.pendingAssistantSource || 'voice_bot',
|
||
toolName: session.pendingAssistantToolName || null,
|
||
meta: session.pendingAssistantMeta || null,
|
||
});
|
||
if (didFlush) {
|
||
session.lastDeliveredAssistantTurnSeq = pendingAssistantTurnSeq;
|
||
}
|
||
session.pendingAssistantSource = null;
|
||
session.pendingAssistantToolName = null;
|
||
session.pendingAssistantMeta = null;
|
||
session._pendingEvidencePack = null;
|
||
return;
|
||
}
|
||
|
||
if (message.event === 450 || (message.event === 451 && !isFinalUserPayload(payload))) {
|
||
const text = extractUserText(payload, session.sessionId);
|
||
if (text) {
|
||
const now = Date.now();
|
||
const isDirectSpeaking = session.directSpeakUntil && now < session.directSpeakUntil;
|
||
const isChatTTSSpeaking = session.isSendingChatTTSText && (session.chatTTSUntil || 0) > now;
|
||
// TTS回声检测:播放期间如果ASR识别文本是当前播放文本的子串,判定为回声,忽略
|
||
if ((isDirectSpeaking || isChatTTSSpeaking) && session.currentSpeechText) {
|
||
const normalizedPartial = text.replace(/[,。!?、,.\s]/g, '');
|
||
const normalizedSpeech = session.currentSpeechText.replace(/[,。!?、,.\s]/g, '');
|
||
if (normalizedPartial.length <= 3 || normalizedSpeech.includes(normalizedPartial)) {
|
||
if (!session._echoLogOnce) {
|
||
session._echoLogOnce = true;
|
||
console.log(`[NativeVoice] TTS echo detected, ignoring partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 80))}`);
|
||
}
|
||
return;
|
||
}
|
||
session._echoLogOnce = false;
|
||
} else {
|
||
session._echoLogOnce = false;
|
||
}
|
||
// Greeting保护窗口:发送问候语后短暂保护期内忽略barge-in
|
||
if (session.greetingProtectionUntil && now < session.greetingProtectionUntil) {
|
||
console.log(`[NativeVoice] greeting protection active, ignoring partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 80))}`);
|
||
return;
|
||
}
|
||
console.log(`[NativeVoice] upstream partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 120))}`);
|
||
const normalizedPartial = normalizeKnowledgeAlias(text);
|
||
session.latestUserText = normalizedPartial;
|
||
session._lastPartialAt = now;
|
||
// KB-First: 非闲聊文本一律提前阻断S2S音频,防止有害内容播出
|
||
if (normalizedPartial.length >= 6 && !session.blockUpstreamAudio && !isPureChitchat(normalizedPartial)) {
|
||
session.blockUpstreamAudio = true;
|
||
session.currentTtsType = 'default';
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'early_block' });
|
||
console.log(`[NativeVoice] early block: non-chitchat partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 80))}`);
|
||
// KB预查询:提前启动知识库查询,减少final ASR后的等待时间
|
||
const kbPrequeryDebounce = 600;
|
||
if (normalizedPartial.length >= 8 && (!session._kbPrequeryStartedAt || now - session._kbPrequeryStartedAt > kbPrequeryDebounce)) {
|
||
session._kbPrequeryStartedAt = now;
|
||
session._kbPrequeryText = normalizedPartial;
|
||
console.log(`[NativeVoice] KB prequery started session=${session.sessionId} text=${JSON.stringify(normalizedPartial.slice(0, 80))}`);
|
||
session.pendingKbPrequery = resolveReply(session.sessionId, session, normalizedPartial).catch((err) => {
|
||
console.warn(`[NativeVoice] KB prequery failed session=${session.sessionId}:`, err.message);
|
||
return null;
|
||
});
|
||
}
|
||
}
|
||
// 用户开口说话时立即打断所有 AI 播放(包括 S2S external_rag 音频)
|
||
const isS2SAudioPlaying = !session.blockUpstreamAudio && session.currentTtsType === 'external_rag';
|
||
if (isDirectSpeaking || isChatTTSSpeaking || isS2SAudioPlaying) {
|
||
console.log(`[NativeVoice] user barge-in (partial) session=${session.sessionId} direct=${isDirectSpeaking} chatTTS=${isChatTTSSpeaking} s2sRag=${isS2SAudioPlaying}`);
|
||
session.directSpeakUntil = 0;
|
||
session.isSendingChatTTSText = false;
|
||
session.chatTTSUntil = 0;
|
||
session.currentSpeechText = '';
|
||
clearTimeout(session.chatTTSTimer);
|
||
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
|
||
clearUpstreamSuppression(session);
|
||
}
|
||
// 阻断 S2S 音频转发,防止用户打断后仍听到残留音频
|
||
session.blockUpstreamAudio = true;
|
||
}
|
||
// 无论当前是否在播放,都发送 tts_reset 确保客户端停止所有音频播放
|
||
if (!session._lastBargeInResetAt || now - session._lastBargeInResetAt > 500) {
|
||
session._lastBargeInResetAt = now;
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
|
||
}
|
||
sendJson(session.client, {
|
||
type: 'subtitle',
|
||
role: 'user',
|
||
text: text,
|
||
isFinal: false,
|
||
sequence: `native_partial_${Date.now()}`,
|
||
});
|
||
}
|
||
return;
|
||
}
|
||
|
||
if (message.event === 459 || (message.event === 451 && isFinalUserPayload(payload))) {
|
||
const rawFinalText = extractUserText(payload, session.sessionId) || '';
|
||
const finalText = normalizeKnowledgeAlias(rawFinalText) || session.latestUserText || '';
|
||
const now459 = Date.now();
|
||
// 双事件去重:S2S可能同时发送event 459和event 451(is_final),用去标点归一化文本+时间窗口去重
|
||
const normalizedForDedup = finalText.replace(/[,。!?、,.?!\s]/g, '');
|
||
if (normalizedForDedup && session._lastFinalNormalized === normalizedForDedup && now459 - (session._lastFinalAt || 0) < 1500) {
|
||
console.log(`[NativeVoice] duplicate final ignored (event=${message.event}) session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 80))}`);
|
||
return;
|
||
}
|
||
session._lastFinalNormalized = normalizedForDedup;
|
||
session._lastFinalAt = now459;
|
||
// TTS回声检测(final级别):播放期间ASR最终识别文本如果是当前播放文本的子串,判定为回声
|
||
const isDirectSpeaking459 = session.directSpeakUntil && now459 < session.directSpeakUntil;
|
||
const isChatTTSSpeaking459 = session.isSendingChatTTSText && (session.chatTTSUntil || 0) > now459;
|
||
if ((isDirectSpeaking459 || isChatTTSSpeaking459) && session.currentSpeechText && finalText) {
|
||
const normalizedFinal = finalText.replace(/[,。!?、,.\s]/g, '');
|
||
const normalizedSpeech = session.currentSpeechText.replace(/[,。!?、,.\s]/g, '');
|
||
if (normalizedFinal.length <= 4 || normalizedSpeech.includes(normalizedFinal)) {
|
||
console.log(`[NativeVoice] TTS echo detected in final, ignoring session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 80))}`);
|
||
return;
|
||
}
|
||
}
|
||
// Greeting保护窗口
|
||
if (session.greetingProtectionUntil && now459 < session.greetingProtectionUntil && finalText) {
|
||
console.log(`[NativeVoice] greeting protection active, ignoring final session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 80))}`);
|
||
return;
|
||
}
|
||
console.log(`[NativeVoice] upstream final session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 120))}`);
|
||
if (session.directSpeakUntil && Date.now() < session.directSpeakUntil) {
|
||
console.log(`[NativeVoice] user interrupt during speaking session=${session.sessionId}`);
|
||
session.directSpeakUntil = 0;
|
||
session.isSendingChatTTSText = false;
|
||
session.chatTTSUntil = 0;
|
||
session.currentSpeechText = '';
|
||
clearTimeout(session.chatTTSTimer);
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
|
||
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
|
||
clearUpstreamSuppression(session);
|
||
}
|
||
} else if (session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now()) {
|
||
console.log(`[NativeVoice] user interrupt chatTTS during speaking session=${session.sessionId}`);
|
||
session.isSendingChatTTSText = false;
|
||
session.chatTTSUntil = 0;
|
||
session.currentSpeechText = '';
|
||
clearTimeout(session.chatTTSTimer);
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
|
||
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
|
||
clearUpstreamSuppression(session);
|
||
}
|
||
}
|
||
if (persistUserSpeech(session, rawFinalText || finalText)) {
|
||
session.blockUpstreamAudio = true;
|
||
session.currentTtsType = 'default';
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'new_turn' });
|
||
processReply(session, finalText).catch((error) => {
|
||
console.error('[NativeVoice] processReply error:', error.message);
|
||
});
|
||
}
|
||
return;
|
||
}
|
||
|
||
sendJson(session.client, {
|
||
type: 'event',
|
||
event: message.event,
|
||
payload,
|
||
});
|
||
}
|
||
|
||
/**
 * Wire the browser-facing WebSocket of a voice session to its handlers.
 *
 * - Binary frames are wrapped with createAudioMessage and forwarded upstream,
 *   but only while the upstream socket is OPEN and `session.upstreamReady` is set.
 * - Text frames must be JSON objects; `type` selects `start`, `stop`,
 *   `replay_greeting` or `text`. Other parsed objects are ignored.
 * - On close, session timers are cleared, the upstream socket is closed, the
 *   final conversation summary is persisted, and the session is removed from
 *   `sessions` (only if the map still points at this session).
 *
 * @param {object} session - Session record built by createSession; its
 *   `client` property is the browser WebSocket.
 */
function attachClientHandlers(session) {
  session.client.on('message', async (raw, isBinary) => {
    if (isBinary) {
      if (session.upstream && session.upstream.readyState === WebSocket.OPEN && session.upstreamReady) {
        session.upstream.send(createAudioMessage(session.sessionId, raw));
        resetAudioKeepalive(session);
      }
      return;
    }

    let parsed;
    try {
      parsed = JSON.parse(raw.toString('utf8'));
    } catch (error) {
      sendJson(session.client, { type: 'error', error: 'invalid client json' });
      return;
    }
    // JSON such as `null` or `42` parses fine but would throw on `parsed.type`
    // below, rejecting this async listener with nobody to catch it.
    if (parsed === null || typeof parsed !== 'object') {
      sendJson(session.client, { type: 'error', error: 'invalid client json' });
      return;
    }

    if (parsed.type === 'start') {
      const upstreamState = session.upstream ? session.upstream.readyState : WebSocket.CLOSED;
      // A second `start` while an upstream is connecting/open (or while the first
      // start is still awaiting the profile) would open a duplicate upstream
      // dialogue connection whose events also feed this session.
      if (session._startInFlight || upstreamState === WebSocket.CONNECTING || upstreamState === WebSocket.OPEN) {
        console.warn(`[NativeVoice] duplicate start ignored session=${session.sessionId}`);
        return;
      }
      session._startInFlight = true;
      try {
        session.userId = parsed.userId || session.userId || null;

        // The remote profile is only one override layer; if the profile service
        // fails, continue with the session/client-supplied profile instead of
        // letting the rejection escape this listener.
        let remoteProfile = {};
        try {
          const remoteProfileResult = await getAssistantProfile({ userId: session.userId });
          remoteProfile = remoteProfileResult?.profile || {};
        } catch (error) {
          console.warn(`[NativeVoice] getAssistantProfile failed, using local profile session=${session.sessionId}:`, error.message);
        }

        // The client may have disconnected during the await. Its close handler has
        // already run, so an upstream opened now would never be closed.
        if (session.client.readyState !== WebSocket.OPEN) {
          return;
        }

        // Later spreads win: client-supplied profile > session profile > remote profile.
        const assistantProfile = resolveAssistantProfile({
          ...remoteProfile,
          ...(session.assistantProfile || {}),
          ...((parsed.assistantProfile && typeof parsed.assistantProfile === 'object') ? parsed.assistantProfile : {}),
        });
        session.assistantProfile = assistantProfile;
        session.botName = parsed.botName || getAssistantDisplayName(assistantProfile) || DEFAULT_VOICE_BOT_NAME;
        session.systemRole = buildVoiceSystemRole(assistantProfile);
        session.speakingStyle = parsed.speakingStyle || session.speakingStyle || DEFAULT_VOICE_SPEAKING_STYLE;
        session.speaker = parsed.speaker || process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts';
        session.modelVersion = parsed.modelVersion || 'O';
        session.greetingText = buildVoiceGreeting(assistantProfile);

        // Send `ready` immediately instead of waiting for upstream event 150,
        // which greatly shortens how long the frontend waits.
        sendReady(session);
        session.upstream = createUpstreamConnection(session);
        loadHandoffSummaryForVoice(session).catch((error) => {
          console.warn('[NativeVoice] async loadHandoffSummaryForVoice failed:', error.message);
        });
      } catch (error) {
        console.error(`[NativeVoice] start failed session=${session.sessionId}:`, error.message);
        sendJson(session.client, { type: 'error', error: 'failed to start voice session' });
      } finally {
        session._startInFlight = false;
      }
      return;
    }

    if (parsed.type === 'stop') {
      session.client.close();
      return;
    }

    if (parsed.type === 'replay_greeting') {
      replayGreeting(session).catch((error) => {
        console.warn('[NativeVoice] replayGreeting failed:', error.message);
      });
      return;
    }

    if (parsed.type === 'text' && typeof parsed.text === 'string' && parsed.text) {
      if (persistUserSpeech(session, parsed.text)) {
        processReply(session, parsed.text, session.latestUserTurnSeq).catch((error) => {
          console.error('[NativeVoice] text processReply failed:', error.message);
        });
      }
    }
  });

  session.client.on('close', () => {
    clearTimeout(session.chatTTSTimer);
    clearTimeout(session.greetingTimer);
    clearTimeout(session.greetingAckTimer);
    clearTimeout(session.readyTimer);
    clearTimeout(session.suppressReplyTimer);
    clearTimeout(session.idleTimer);
    clearTimeout(session.queuedReplyTimer);
    clearInterval(session._audioKeepaliveTimer);

    // Also abort an upstream that is still handshaking; otherwise it would
    // finish connecting after the client is gone and never be closed.
    const { upstream } = session;
    if (upstream && (upstream.readyState === WebSocket.OPEN || upstream.readyState === WebSocket.CONNECTING)) {
      upstream.close();
    }

    // Persist the conversation summary to MySQL when the session ends.
    persistFinalSummary(session).catch((err) => {
      console.warn('[NativeVoice] persistFinalSummary failed:', err.message);
    });

    // A reconnect with the same sessionId may already have registered a new
    // session under this id; only unregister if the entry is still ours.
    if (sessions.get(session.sessionId) === session) {
      sessions.delete(session.sessionId);
    }
  });
}
|
||
|
||
/**
 * Open the upstream WebSocket to the realtime dialogue service for a session.
 *
 * Authentication headers come from environment variables. Once the socket
 * opens, a StartConnection and then a StartSession message (payload from
 * buildStartSessionPayload) are sent. Text frames are relayed to the client as
 * `server_text`. Binary frames are handed to handleUpstreamMessage. Socket
 * errors are reported to the client. On close, the session is marked not
 * ready, the audio keepalive is stopped, the client is told `upstream_closed`,
 * and the client socket is closed 3 s later if it is still open.
 *
 * @param {object} session - Session record; `sessionId` is used as the connect id.
 * @returns {WebSocket} The (connecting) upstream socket.
 */
function createUpstreamConnection(session) {
  const upstream = new WebSocket('wss://openspeech.bytedance.com/api/v3/realtime/dialogue', {
    headers: {
      'X-Api-Resource-Id': 'volc.speech.dialog',
      'X-Api-Access-Key': process.env.VOLC_S2S_TOKEN,
      // NOTE(review): a literal app key is committed as the fallback; consider
      // requiring VOLC_DIALOG_APP_KEY to be set instead.
      'X-Api-App-Key': process.env.VOLC_DIALOG_APP_KEY || 'PlgvMymc7f3tQnJ6',
      'X-Api-App-ID': process.env.VOLC_S2S_APP_ID,
      'X-Api-Connect-Id': session.sessionId,
    },
  });

  // Normalize the payload shapes ws can deliver (Buffer, ArrayBuffer, Buffer[]).
  const toBuffer = (data) => {
    if (Buffer.isBuffer(data)) return data;
    if (Array.isArray(data)) return Buffer.concat(data);
    return Buffer.from(data);
  };

  upstream.on('open', () => {
    upstream.send(createStartConnectionMessage());
    upstream.send(createStartSessionMessage(session.sessionId, buildStartSessionPayload(session)));
  });

  upstream.on('message', (data, isBinary) => {
    // ws >= 8 delivers text frames as Buffers with isBinary === false, so a
    // `typeof data === 'string'` test alone never matches there. Older ws
    // passes text frames as strings without an isBinary flag.
    if (isBinary === false || typeof data === 'string') {
      const text = typeof data === 'string' ? data : toBuffer(data).toString('utf8');
      sendJson(session.client, { type: 'server_text', text });
      return;
    }
    // An exception escaping a socket listener would take down the whole
    // process (and every other voice session); contain it to this frame.
    try {
      handleUpstreamMessage(session, toBuffer(data));
    } catch (error) {
      console.error(`[NativeVoice] upstream message handling failed session=${session.sessionId}:`, error.message);
    }
  });

  upstream.on('error', (error) => {
    console.error(`[NativeVoice] upstream ws error session=${session.sessionId}:`, error.message);
    sendJson(session.client, { type: 'error', error: `语音服务连接异常: ${error.message}` });
  });

  upstream.on('close', (code) => {
    console.log(`[NativeVoice] upstream closed session=${session.sessionId} code=${code}`);
    session.upstreamReady = false;
    clearInterval(session._audioKeepaliveTimer);
    session._audioKeepaliveTimer = null;
    sendJson(session.client, { type: 'upstream_closed', code });
    // Give the client a moment to react to `upstream_closed` before closing it.
    setTimeout(() => {
      if (session.client && session.client.readyState === WebSocket.OPEN) {
        session.client.close();
      }
    }, 3000);
  });

  return upstream;
}
|
||
|
||
/**
 * Create and register the in-memory state for one voice connection.
 *
 * The record starts from the default assistant profile
 * (resolveAssistantProfile() with no overrides); a later client `start`
 * message may replace the profile, bot name, system role, speaker and greeting.
 * The record is stored in `sessions` under `sessionId` (Map#set replaces any
 * existing entry with that id), and the client WebSocket handlers are attached
 * before it is returned.
 *
 * @param {WebSocket} client - Browser-facing WebSocket for this session.
 * @param {string} sessionId - Session identifier from the connection URL.
 * @returns {object} The newly registered session record.
 */
function createSession(client, sessionId) {
  const assistantProfile = resolveAssistantProfile();
  const botName = getAssistantDisplayName(assistantProfile);
  const systemRole = buildVoiceSystemRole(assistantProfile);
  const speaker = process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts';
  const greetingText = buildVoiceGreeting(assistantProfile);

  const session = {
    // Sockets
    sessionId,
    client,
    upstream: null,
    upstreamReady: false,

    // User turns and reply processing
    isSendingChatTTSText: false,
    latestUserText: '',
    latestUserTurnSeq: 0,
    queuedUserText: '',
    queuedUserTurnSeq: 0,
    processingReply: false,
    blockUpstreamAudio: false,
    directSpeakUntil: 0,
    queuedReplyTimer: null,

    // Assistant output streaming
    lastPersistedAssistantText: '',
    lastPersistedAssistantAt: 0,
    assistantStreamBuffer: '',
    assistantStreamReplyId: '',
    currentTtsType: '',
    currentSpeechText: '',
    greetingProtectionUntil: 0,
    _echoLogOnce: false,
    _fillerActive: false,
    _pendingExternalRagReply: false,
    _pendingEvidencePack: null,
    _lastPartialAt: 0,

    // Knowledge-base prequery tracking
    pendingKbPrequery: null,
    _kbPrequeryText: '',
    _kbPrequeryStartedAt: 0,
    _lastKbTopic: '',
    _lastKbHitAt: 0,

    // Assistant identity and voice settings
    assistantProfile,
    botName,
    systemRole,
    speakingStyle: DEFAULT_VOICE_SPEAKING_STYLE,
    speaker,
    modelVersion: 'O',
    greetingText,

    // Greeting / ready handshake
    hasSentGreeting: false,
    greetingTimer: null,
    greetingAckTimer: null,
    pendingGreetingAck: false,
    greetingRetryCount: 0,
    readyTimer: null,
    readySent: false,
    handoffSummary: '',
    handoffSummaryUsed: false,

    // Pending assistant reply bookkeeping
    awaitingUpstreamReply: false,
    pendingAssistantSource: null,
    pendingAssistantToolName: null,
    pendingAssistantMeta: null,
    pendingAssistantTurnSeq: 0,
    lastDeliveredAssistantTurnSeq: 0,
    suppressReplyTimer: null,
    suppressUpstreamUntil: 0,

    // Liveness, dedup and summarization counters
    idleTimer: null,
    lastActivityAt: Date.now(),
    _lastBargeInResetAt: 0,
    _audioBlockLogOnce: false,
    _lastFinalNormalized: '',
    _lastFinalAt: 0,
    _audioKeepaliveTimer: null,
    _turnCount: 0,
  };

  sessions.set(sessionId, session);
  attachClientHandlers(session);
  return session;
}
|
||
|
||
/**
 * Read query parameters from the request-target of an HTTP upgrade request.
 *
 * Uses WHATWG URLSearchParams, so `.get()` returns the first value for a
 * repeated key. The legacy `url.parse(..., true)` produced an array in that
 * case. Any `#fragment` suffix is ignored. This never throws.
 *
 * @param {string|undefined} requestUrl - `req.url`, e.g. `/ws/realtime-dialog?sessionId=abc`.
 * @returns {URLSearchParams} Parsed query (empty when there is none).
 */
function readConnectionQuery(requestUrl) {
  const target = typeof requestUrl === 'string' ? requestUrl : '';
  const hashAt = target.indexOf('#');
  const withoutHash = hashAt === -1 ? target : target.slice(0, hashAt);
  const queryAt = withoutHash.indexOf('?');
  return new URLSearchParams(queryAt === -1 ? '' : withoutHash.slice(queryAt + 1));
}

/**
 * Attach the native realtime-dialog WebSocket endpoint to an HTTP server.
 *
 * Each connection on `/ws/realtime-dialog` must carry a `sessionId` query
 * parameter; `userId` is optional. A session record is created, a `voice`
 * session row is recorded through db.createSession (best-effort: failures are
 * only logged), and the client is then sent `{ type: 'connected', sessionId }`.
 *
 * @param {import('http').Server} server - HTTP server to attach to.
 * @returns {WebSocketServer} The created WebSocket server.
 */
function setupNativeVoiceGateway(server) {
  const wss = new WebSocketServer({ server, path: '/ws/realtime-dialog' });

  wss.on('connection', async (client, req) => {
    const query = readConnectionQuery(req.url);
    const sessionId = query.get('sessionId');
    if (!sessionId) {
      // 1008 = policy violation (RFC 6455 §7.4.1): the mandatory sessionId is missing.
      client.close(1008, 'sessionId required');
      return;
    }

    const userId = query.get('userId') || null;
    const session = createSession(client, sessionId);
    session.userId = userId;

    try {
      await db.createSession(sessionId, userId, 'voice');
    } catch (error) {
      console.warn('[NativeVoice][DB] createSession failed:', error.message);
    }

    sendJson(client, { type: 'connected', sessionId });
  });

  return wss;
}
|
||
|
||
// Public surface of this gateway module: only the server bootstrap is exported.
module.exports = { setupNativeVoiceGateway };
|