Files
bigwo/test2/server/services/nativeVoiceGateway.js
User fe25229de7 feat: conversation long-term memory + fix source ENUM bug
- New: conversationSummarizer.js (LLM summary every 3 turns, loadBestSummary, persistFinalSummary)
- db/index.js: conversation_summaries table, upsertConversationSummary, getSessionSummary
- redisClient.js: setSummary/getSummary (TTL 2h)
- nativeVoiceGateway.js: _turnCount tracking, trigger summarize, persist on close
- realtimeDialogRouting.js: inject summary context, reduce history 5->3 rounds
- Fix: messages source ENUM missing 'search_knowledge' causing chat DB writes to fail
2026-04-03 10:19:16 +08:00

1272 lines
57 KiB
JavaScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const { WebSocket, WebSocketServer } = require('ws');
const url = require('url');
const db = require('../db');
const { correctAsrText } = require('./fastAsrCorrector');
const contextKeywordTracker = require('./contextKeywordTracker');
const { isBrandHarmful, getVoiceSafeReply, BRAND_HARMFUL_PATTERN, BRAND_POSITIVE_LEGALITY_PATTERN } = require('./contentSafeGuard');
const {
MsgType,
unmarshal,
createStartConnectionMessage,
createStartSessionMessage,
createAudioMessage,
createChatTTSTextMessage,
createSayHelloMessage,
createChatRAGTextMessage,
} = require('./realtimeDialogProtocol');
const {
getRuleBasedDirectRouteDecision,
normalizeKnowledgeAlias,
normalizeTextForSpeech,
splitTextForSpeech,
estimateSpeechDurationMs,
shouldForceKnowledgeRoute,
isPureChitchat,
resolveReply,
} = require('./realtimeDialogRouting');
const ToolExecutor = require('./toolExecutor');
const {
DEFAULT_VOICE_ASSISTANT_PROFILE,
DEFAULT_CONSULTANT_CONTACT,
resolveAssistantProfile,
getAssistantDisplayName,
buildVoiceSystemRole,
buildVoiceGreeting,
} = require('./assistantProfileConfig');
const { getAssistantProfile } = require('./assistantProfileService');
const redisClient = require('./redisClient');
const { checkProductLinkTrigger } = require('./productLinkTrigger');
const { triggerSummarizeIfNeeded, persistFinalSummary } = require('./conversationSummarizer');
// Module-level session registry (populated outside the portion of the file shown here).
const sessions = new Map();
// Assistant replies matching this pattern (phrases that refer the user to a consultant,
// doctor or nutritionist) make persistAssistantSpeech push a `consultant_contact` card.
const CONSULTANT_REFERRAL_PATTERN = /咨询(?:专业|你的)?顾问|健康管理顾问|联系顾问|一对一指导|咨询专业|咨询医生|咨询营养师|咨询专业人士|建议.*咨询|问问医生|问问.*营养师/;
// Inactivity window used by resetIdleTimer before the client is told `idle_timeout` (5 minutes).
const IDLE_TIMEOUT_MS = 5 * 60 * 1000;
// Period of the silent-audio keepalive sent upstream by startAudioKeepalive (20 seconds).
const AUDIO_KEEPALIVE_INTERVAL_MS = 20 * 1000;
// 3200 bytes ≈ 66ms of silence at 24kHz s16le mono (larger frame to ensure S2S acceptance)
const SILENT_AUDIO_FRAME = Buffer.alloc(3200, 0);
// Defaults derived from the assistant profile helpers, evaluated once at module load.
// NOTE(review): DEFAULT_VOICE_BOT_NAME is not referenced in the visible part of this file;
// buildStartSessionPayload falls back to a string literal instead — confirm whether they should agree.
const DEFAULT_VOICE_BOT_NAME = getAssistantDisplayName(DEFAULT_VOICE_ASSISTANT_PROFILE);
const DEFAULT_VOICE_SYSTEM_ROLE = buildVoiceSystemRole();
// Default speaking-style prompt (runtime text sent to the model; kept in Chinese).
const DEFAULT_VOICE_SPEAKING_STYLE = '整体语气亲切自然、轻快有温度,像熟悉行业的朋友在语音聊天。优先短句和口语化表达,先给结论,再补一句最有帮助的信息。不要播音腔,不要念稿,不要客服腔,不要过度热情,也不要输出任何思考过程。';
const DEFAULT_VOICE_GREETING = buildVoiceGreeting();
/**
 * Restart the inactivity countdown for a session and stamp `lastActivityAt`.
 *
 * When the countdown elapses the client is sent an `idle_timeout` notice and,
 * two seconds later, its socket is closed if it is still open.
 *
 * @param {object} session - Voice session; reads `client`/`sessionId`, writes `idleTimer`/`lastActivityAt`.
 */
function resetIdleTimer(session) {
  // Deferred close: gives the client a moment to react to the idle notice.
  const closeClientIfStillOpen = () => {
    if (session.client && session.client.readyState === WebSocket.OPEN) {
      session.client.close();
    }
  };

  const handleIdle = () => {
    session.idleTimer = null;
    console.log(`[NativeVoice] idle timeout (${IDLE_TIMEOUT_MS / 1000}s) session=${session.sessionId}`);
    sendJson(session.client, { type: 'idle_timeout', timeout: IDLE_TIMEOUT_MS });
    setTimeout(closeClientIfStillOpen, 2000);
  };

  clearTimeout(session.idleTimer);
  session.lastActivityAt = Date.now();
  session.idleTimer = setTimeout(handleIdle, IDLE_TIMEOUT_MS);
}
/**
 * (Re)start the periodic silent-audio keepalive towards the upstream socket.
 *
 * Any previous keepalive interval on the session is cleared first. On every
 * tick a silent frame is sent if the upstream socket is open and marked ready;
 * otherwise the tick is only logged as skipped.
 *
 * @param {object} session - Voice session; writes `_audioKeepaliveTimer`.
 */
function startAudioKeepalive(session) {
  const onTick = () => {
    const { upstream } = session;
    const canSend = upstream && upstream.readyState === WebSocket.OPEN && session.upstreamReady;
    if (!canSend) {
      console.log(`[NativeVoice] audio keepalive skipped session=${session.sessionId} ready=${session.upstreamReady} wsState=${upstream ? upstream.readyState : 'null'}`);
      return;
    }
    upstream.send(createAudioMessage(session.sessionId, SILENT_AUDIO_FRAME));
    console.log(`[NativeVoice] audio keepalive sent session=${session.sessionId}`);
  };

  clearInterval(session._audioKeepaliveTimer);
  session._audioKeepaliveTimer = setInterval(onTick, AUDIO_KEEPALIVE_INTERVAL_MS);
}
/**
 * Restart the keepalive interval, but only if one is currently running.
 * Sessions without an active keepalive timer are left untouched.
 *
 * @param {object} session - Voice session; reads `_audioKeepaliveTimer`.
 */
function resetAudioKeepalive(session) {
  const activeTimer = session._audioKeepaliveTimer;
  if (!activeTimer) {
    return;
  }
  clearInterval(activeTimer);
  startAudioKeepalive(session);
}
/**
 * Serialize `payload` as JSON and send it over `ws`.
 * Silently does nothing when the socket is missing or not in the OPEN state.
 *
 * @param {object|null|undefined} ws - Socket-like object with `readyState` and `send`.
 * @param {*} payload - Value passed to JSON.stringify.
 */
function sendJson(ws, payload) {
  const isWritable = Boolean(ws) && ws.readyState === WebSocket.OPEN;
  if (!isWritable) {
    return;
  }
  ws.send(JSON.stringify(payload));
}
/**
 * Build the StartSession payload (ASR, TTS and dialog configuration).
 *
 * Every option is optional; a missing or null `options` object is treated as
 * "use all defaults" instead of throwing.
 *
 * @param {object} [options]
 * @param {string} [options.systemRole] - Overrides DEFAULT_VOICE_SYSTEM_ROLE.
 * @param {string} [options.speakingStyle] - Overrides DEFAULT_VOICE_SPEAKING_STYLE.
 * @param {string} [options.speaker] - TTS speaker id; falls back to env VOLC_S2S_SPEAKER_ID, then a built-in id.
 * @param {string} [options.botName] - Bot display name.
 * @param {string} [options.modelVersion] - Dialog model version.
 * @returns {{asr: object, tts: object, dialog: object}} Payload object.
 */
function buildStartSessionPayload(options) {
  // Tolerate callers that pass nothing (or null): all fields have fallbacks.
  const opts = options || {};
  // Highest-priority instruction prepended to the system role to stop the model
  // from speaking its own reasoning/planning (runtime prompt text, kept verbatim).
  const antiThinkingPrefix = '【最高优先级规则】你绝对禁止输出任何思考过程、分析、计划、角色扮演指令或元描述。禁止出现:“首轮对话”“应该回复”“需要列举”“语气要”“回复后询问”“可列举”“突出特色”“引导用户”“让用户”“用温和”等分析性、指令性语句。你必须直接用自然语言回答问题,像真人聊天一样直接说出答案内容。';
  const baseSystemRole = opts.systemRole || DEFAULT_VOICE_SYSTEM_ROLE;
  const baseSpeakingStyle = opts.speakingStyle || DEFAULT_VOICE_SPEAKING_STYLE;
  return {
    asr: {
      extra: {
        // Comma-separated domain vocabulary passed as ASR context.
        context: '一成,一成系统,大沃,PM,PM-FitLine,FitLine,细胞营养素,Ai众享,AI众享,盛咖学愿,数字化工作室,Activize,Basics,Restorate,NTC,基础三合一,基础套装,招商,阿育吠陀,小红产品,小红,小白,大白,肽美,艾特维,德丽,德维,宝丽,美固健,Activize Oxyplus,Basic Power,CitrusCare,NutriSunny,Q10,Omega,葡萄籽,白藜芦醇,益生菌,胶原蛋白肽,Germany,FitLine细胞营养,FitLine营养素,德国PM营养素,德国PM FitLine,德国PM细胞营养,德国PM产品,德国PM健康,德国PM事业,德国PM招商,一成,一成团队,一成商学院,数字化,数字化运营,数字化经营,数字化营销,数字化创业,数字化工作室,数字化事业,招商加盟,合作加盟,事业合作,活力健,倍力健,氨基酸,乐活,排毒饮,小绿,纤萃,草本茶,发宝,乳酪煲,关节套装,细胞抗氧素,辅酵素,氧修护,CC套装,CC-Cell,Generation 50+,ProShape,D-Drink,IB5,MEN+,儿童倍适,小红精华液,PowerCocktail,PowerCocktail Junior,TopShape,Fitness-Drink,Herbal Tea,Hair+,Med Dental+,Young Care,Zellschutz,Apple Antioxy,Antioxy,BCAA,Women+,小黑,发健,口腔免疫喷雾,乳清蛋白,男士护肤,去角质,面膜,叶黄素,维适多,护理牙膏,火炉原理,暖炉原理,运动饮料,健康饮品,好转反应,整健反应,骨骼健,顾心,舒采健,衡醇饮,小粉C,异黄酮,倍适,眼霜,洁面,爽肤水',
        boosting_table_id: 'ab4fde15-79b5-47e9-82b6-5125cca39f63',
        nbest: 1,
      },
    },
    tts: {
      speaker: opts.speaker || process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts',
      audio_config: {
        channel: 1,
        format: 'pcm_s16le',
        sample_rate: 24000,
      },
    },
    dialog: {
      dialog_id: '',
      // NOTE(review): falls back to a literal rather than DEFAULT_VOICE_BOT_NAME — confirm intent.
      bot_name: opts.botName || '大沃',
      system_role: normalizeTextForSpeech(`${antiThinkingPrefix} ${baseSystemRole}`),
      speaking_style: normalizeTextForSpeech(baseSpeakingStyle),
      extra: {
        input_mod: 'audio',
        model: opts.modelVersion || 'SC2.0',
        strict_audit: false,
        audit_response: '抱歉,这个问题我暂时无法回答。',
      },
    },
  };
}
/**
 * Decode `message.payload` as UTF-8 and parse it as JSON.
 *
 * @param {{payload: Buffer}} message - Message whose payload supports `toString('utf8')`.
 * @returns {*} The parsed value, or null when the payload is missing or not valid JSON.
 */
function parseJsonPayload(message) {
  let parsed = null;
  try {
    const rawText = message.payload.toString('utf8');
    parsed = JSON.parse(rawText);
  } catch {
    // Any failure (missing payload, malformed JSON) is reported as null.
    parsed = null;
  }
  return parsed;
}
/**
 * Pull the text out of a payload, trying in order: `text`, `content`,
 * `results[0].text`, `results[0].alternatives[0].text`.
 *
 * @param {object|null|undefined} jsonPayload
 * @returns {string} The first truthy candidate, stringified and trimmed; '' if none.
 */
function extractRawText(jsonPayload) {
  const firstResult = jsonPayload?.results?.[0];
  const candidates = [
    jsonPayload?.text,
    jsonPayload?.content,
    firstResult?.text,
    firstResult?.alternatives?.[0]?.text,
  ];
  const chosen = candidates.find(Boolean) || '';
  return String(chosen).trim();
}
/**
 * Extract the user's text from an ASR payload and run it through correctAsrText.
 * When a session id is given, the context keyword tracker is also updated with
 * the alias-normalized form of the corrected text.
 *
 * @param {object} jsonPayload - ASR payload.
 * @param {string|null} [sessionId=null] - Session whose keyword context should be updated.
 * @returns {string} The corrected text.
 */
function extractUserText(jsonPayload, sessionId = null) {
  const correctedText = correctAsrText(extractRawText(jsonPayload));
  if (!sessionId) {
    return correctedText;
  }
  const normalizedForTracking = normalizeKnowledgeAlias(correctedText);
  contextKeywordTracker.updateSession(sessionId, normalizedForTracking);
  return correctedText;
}
// Matches assistant text that STARTS with a planning/meta phrase (e.g. "the user wants...",
// "should reply...") — i.e. the model is describing how to answer instead of answering.
// Used by sanitizeAssistantText and by the streaming check in handleUpstreamMessage.
const THINKING_PATTERN = /^(首轮对话|用户想|用户问|应该回复|需要列举|可列举|突出特色|引导进一步|引导用户|让用户|回复后询问|语气要|用温和|需热情|需简洁|需专业)/;
// Matches the same kind of planning/meta phrasing ANYWHERE in the text.
// In the visible code it is only consulted by the streaming check, not by sanitizeAssistantText.
const THINKING_MID_PATTERN = /(?:需客观回复|应说明其|回复后询问|引导.*对话|用.*口吻回复|语气要.*热情|需要.*引导|应该.*回复|先.*再.*最后)/;
/**
 * Screen assistant text before it is persisted or shown.
 *
 * @param {string} text - Candidate assistant text.
 * @returns {string|null} The input itself when falsy or clean; the voice-safe
 *   reply when isBrandHarmful flags it; null when it starts with a "thinking"
 *   phrase (THINKING_PATTERN).
 */
function sanitizeAssistantText(text) {
  if (!text) {
    return text;
  }
  // Lazily built, truncated preview used in warning logs.
  const logPreview = () => JSON.stringify(text.slice(0, 200));

  if (isBrandHarmful(text)) {
    console.warn(`[NativeVoice][SafeGuard] blocked harmful content: ${logPreview()}`);
    return getVoiceSafeReply();
  }

  const startsWithThinking = THINKING_PATTERN.test(text.trim());
  if (!startsWithThinking) {
    return text;
  }
  console.warn(`[NativeVoice][SafeGuard] blocked thinking output: ${logPreview()}`);
  return null;
}
/**
 * Decide whether an ASR payload is a final (non-interim) result.
 *
 * @param {object|null|undefined} jsonPayload
 * @returns {boolean} true when `is_final === true`, or when `results` is an
 *   array containing at least one entry with `is_interim === false`.
 */
function isFinalUserPayload(jsonPayload) {
  if (jsonPayload?.is_final === true) {
    return true;
  }
  const results = jsonPayload?.results;
  return Array.isArray(results)
    && results.some((entry) => Boolean(entry) && entry.is_interim === false);
}
/**
 * Record a finalized user utterance: update per-session turn bookkeeping,
 * store the message (DB and Redis, both fire-and-forget) and push a final
 * user subtitle to the client.
 *
 * The same text repeated within 5 seconds is ignored.
 *
 * @param {object} session - Voice session (mutated).
 * @param {string} text - Recognized user text.
 * @returns {boolean} true if the utterance was recorded, false if empty or a recent duplicate.
 */
function persistUserSpeech(session, text) {
  const utterance = (text || '').trim();
  if (utterance === '') {
    return false;
  }

  const timestamp = Date.now();
  const isRecentDuplicate = session.lastPersistedUserText === utterance
    && timestamp - (session.lastPersistedUserAt || 0) < 5000;
  if (isRecentDuplicate) {
    return false;
  }

  Object.assign(session, {
    lastPersistedUserText: utterance,
    lastPersistedUserAt: timestamp,
    latestUserText: utterance,
    latestUserTurnSeq: (session.latestUserTurnSeq || 0) + 1,
    _turnCount: (session._turnCount || 0) + 1,
  });
  resetIdleTimer(session);

  // Storage is best-effort: DB failures are logged, Redis failures are ignored.
  db.addMessage(session.sessionId, 'user', utterance, 'voice_asr')
    .catch((e) => console.warn('[NativeVoice][DB] add user failed:', e.message));
  redisClient.pushMessage(session.sessionId, { role: 'user', content: utterance, source: 'voice_asr' })
    .catch(() => {});

  const subtitle = {
    type: 'subtitle',
    role: 'user',
    text: utterance,
    isFinal: true,
    sequence: `native_user_${timestamp}`,
  };
  sendJson(session.client, subtitle);
  return true;
}
/**
 * Record an assistant utterance: sanitize it, de-duplicate against the last
 * persisted text (5 s window), optionally store it (DB and Redis, both
 * fire-and-forget), push a final assistant subtitle to the client, kick the
 * conversation summarizer and, if the text refers the user to a consultant,
 * push a `consultant_contact` card.
 *
 * @param {object} session - Voice session (mutated).
 * @param {string} text - Assistant text before sanitizing.
 * @param {object} [options]
 * @param {string} [options.source='voice_bot'] - Message source tag.
 * @param {string|null} [options.toolName=null] - Tool that produced the reply, if any.
 * @param {boolean} [options.persistToDb=true] - When false, skip storage and the summarizer trigger.
 * @param {*} [options.meta=null] - Extra metadata passed through to db.addMessage.
 * @returns {boolean} true if the utterance was handled, false if it was empty,
 *   blocked by sanitizing, or a recent duplicate.
 */
function persistAssistantSpeech(session, text, { source = 'voice_bot', toolName = null, persistToDb = true, meta = null } = {}) {
  const cleanText = sanitizeAssistantText((text || '').trim());
  if (!cleanText) return false;
  const now = Date.now();
  if (session.lastPersistedAssistantText === cleanText && now - (session.lastPersistedAssistantAt || 0) < 5000) {
    return false;
  }
  session.lastPersistedAssistantText = cleanText;
  session.lastPersistedAssistantAt = now;
  resetIdleTimer(session);
  if (persistToDb) {
    // Storage is best-effort: DB failures are logged, Redis failures are ignored.
    db.addMessage(session.sessionId, 'assistant', cleanText, source, toolName, meta).catch((e) => console.warn('[NativeVoice][DB] add assistant failed:', e.message));
    redisClient.pushMessage(session.sessionId, { role: 'assistant', content: cleanText, source }).catch(() => {});
  }
  sendJson(session.client, {
    type: 'subtitle',
    role: 'assistant',
    text: cleanText,
    isFinal: true,
    source,
    toolName,
    sequence: `native_assistant_${now}`,
  });
  // Fire-and-forget summary check. It must never break the rest of this
  // function: a synchronous throw would skip the consultant-referral push and
  // escape to the caller, and a rejected promise would otherwise surface as an
  // unhandled rejection.
  if (persistToDb) {
    try {
      const pendingSummary = triggerSummarizeIfNeeded(session, session.sessionId);
      if (pendingSummary && typeof pendingSummary.catch === 'function') {
        pendingSummary.catch((e) => console.warn('[NativeVoice] summarize trigger failed:', e?.message));
      }
    } catch (e) {
      console.warn('[NativeVoice] summarize trigger failed:', e?.message);
    }
  }
  if (CONSULTANT_REFERRAL_PATTERN.test(cleanText)) {
    const contact = session.consultantContact || DEFAULT_CONSULTANT_CONTACT;
    // Only push the card when there is at least one way to reach the consultant.
    if (contact.mobile || contact.wx_qr_code || contact.wechat_id) {
      console.log(`[NativeVoice] consultant referral detected session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 80))}`);
      sendJson(session.client, {
        type: 'consultant_contact',
        name: contact.name || '大沃专业健康管理顾问',
        mobile: contact.mobile || '',
        wx_qr_code: contact.wx_qr_code || '',
        wechat_id: contact.wechat_id || '',
        message: '如需个性化健康建议,可联系大沃专业健康管理顾问',
      });
    }
  }
  return true;
}
/**
 * Append a streamed assistant chunk to the session's stream buffer.
 *
 * If the chunk carries a `reply_id` that differs from the one currently being
 * buffered, the buffer restarts from this chunk.
 *
 * @param {object} session - Voice session; reads/writes `assistantStreamBuffer` and `assistantStreamReplyId`.
 * @param {object} payload - Upstream chunk payload.
 * @returns {string} The accumulated buffer, or '' when the chunk carried no text
 *   (in which case the session is left untouched).
 */
function appendAssistantStream(session, payload) {
  const piece = extractRawText(payload);
  if (!piece) {
    return '';
  }

  const incomingReplyId = payload?.reply_id || '';
  const bufferedReplyId = session.assistantStreamReplyId;
  const startsNewReply = Boolean(incomingReplyId)
    && Boolean(bufferedReplyId)
    && bufferedReplyId !== incomingReplyId;
  const carriedOver = startsNewReply ? '' : (session.assistantStreamBuffer || '');

  session.assistantStreamReplyId = incomingReplyId || bufferedReplyId || '';
  session.assistantStreamBuffer = `${carriedOver}${piece}`;
  return session.assistantStreamBuffer;
}
/**
 * Drain the assistant stream buffer and persist whatever text it held.
 * The buffer and its reply id are always reset, even when nothing is persisted.
 *
 * @param {object} session - Voice session (mutated).
 * @param {object} [options] - Forwarded to persistAssistantSpeech.
 * @param {string} [options.source='voice_bot']
 * @param {string|null} [options.toolName=null]
 * @param {*} [options.meta=null]
 * @returns {boolean} false when the buffer was empty, otherwise the result of persistAssistantSpeech.
 */
function flushAssistantStream(session, { source = 'voice_bot', toolName = null, meta = null } = {}) {
  const bufferedText = (session.assistantStreamBuffer || '').trim();
  session.assistantStreamBuffer = '';
  session.assistantStreamReplyId = '';
  return bufferedText
    ? persistAssistantSpeech(session, bufferedText, { source, toolName, meta })
    : false;
}
/**
 * Prepare `session.handoffSummary` from the session's stored history.
 *
 * Loads up to 10 history messages via db.getHistoryForLLM and condenses them
 * with buildDeterministicHandoffSummary. With no history, or on any error,
 * the summary is set to ''. `handoffSummaryUsed` is always reset to false.
 *
 * @param {object} session - Voice session (mutated).
 * @returns {Promise<void>}
 */
async function loadHandoffSummaryForVoice(session) {
  const storeSummary = (summary) => {
    session.handoffSummary = summary;
    session.handoffSummaryUsed = false;
  };

  try {
    const history = await db.getHistoryForLLM(session.sessionId, 10);
    if (history.length) {
      storeSummary(buildDeterministicHandoffSummary(history));
      console.log(`[NativeVoice] Handoff summary prepared for ${session.sessionId}: ${session.handoffSummary ? 'yes' : 'no'}`);
    } else {
      storeSummary('');
    }
  } catch (error) {
    // History is optional context; failures degrade to "no summary".
    storeSummary('');
    console.warn('[NativeVoice] loadHandoffSummaryForVoice failed:', error.message);
  }
}
/**
 * Build a short, rule-based summary of recent conversation history.
 *
 * Only the last 8 user/assistant messages with non-blank content are
 * considered. The summary lists, in order and when available: the latest user
 * question, the previous (different) user question, and up to two most recent
 * assistant replies truncated to 60 characters each.
 *
 * Parts (and the individual assistant snippets) are separated by ';' so that
 * the labelled sections do not run into one another.
 *
 * @param {Array<{role: string, content: *}>} [messages=[]] - History, oldest first.
 * @returns {string} The summary, or '' when there is nothing usable.
 */
function buildDeterministicHandoffSummary(messages = []) {
  const SEPARATOR = ';';
  const normalizedMessages = (Array.isArray(messages) ? messages : [])
    .filter((item) => item && (item.role === 'user' || item.role === 'assistant') && String(item.content || '').trim())
    .slice(-8);
  if (!normalizedMessages.length) {
    return '';
  }
  const userMessages = normalizedMessages.filter((item) => item.role === 'user');
  const currentQuestion = String(userMessages[userMessages.length - 1]?.content || '').trim();
  const previousQuestion = String(userMessages[userMessages.length - 2]?.content || '').trim();
  const assistantFacts = normalizedMessages
    .filter((item) => item.role === 'assistant')
    .slice(-2)
    .map((item) => String(item.content || '').trim().slice(0, 60))
    .filter(Boolean)
    .join(SEPARATOR);
  const parts = [];
  if (currentQuestion) {
    parts.push(`当前问题:${currentQuestion}`);
  }
  // Skip the previous question when the user simply repeated themselves.
  if (previousQuestion && previousQuestion !== currentQuestion) {
    parts.push(`上一轮关注:${previousQuestion}`);
  }
  if (assistantFacts) {
    parts.push(`已给信息:${assistantFacts}`);
  }
  return parts.join(SEPARATOR);
}
/**
 * Speak `speechText` through the upstream socket as chat-TTS text.
 *
 * The text is split with splitTextForSpeech; each piece is sent as a non-final
 * frame (the first one flagged `start`), followed by an empty frame flagged
 * `end`. Before sending, the session is marked as "sending chat TTS text" for
 * the estimated speech duration plus 800 ms, and the client is told to reset
 * its TTS state. Does nothing when there is nothing to say or the upstream
 * socket is not open.
 *
 * @param {object} session - Voice session (mutated).
 * @param {string} speechText - Text to speak.
 * @returns {Promise<void>}
 */
async function sendSpeechText(session, speechText) {
  const pieces = splitTextForSpeech(speechText);
  const upstreamOpen = Boolean(session.upstream) && session.upstream.readyState === WebSocket.OPEN;
  if (!pieces.length || !upstreamOpen) {
    return;
  }
  console.log(`[NativeVoice] sendSpeechText start session=${session.sessionId} chunks=${pieces.length} textLen=${speechText.length}`);

  session.currentSpeechText = speechText;
  session.isSendingChatTTSText = true;
  session.currentTtsType = 'chat_tts_text';
  session.chatTTSUntil = Date.now() + estimateSpeechDurationMs(speechText) + 800;

  // Drop the "sending" flag once the estimated speech window has elapsed.
  const releaseIfElapsed = () => {
    session.chatTTSTimer = null;
    if ((session.chatTTSUntil || 0) <= Date.now()) {
      session.isSendingChatTTSText = false;
    }
  };
  clearTimeout(session.chatTTSTimer);
  const releaseDelayMs = Math.max(200, session.chatTTSUntil - Date.now() + 50);
  session.chatTTSTimer = setTimeout(releaseIfElapsed, releaseDelayMs);

  sendJson(session.client, { type: 'tts_reset', ttsType: 'chat_tts_text' });

  pieces.forEach((piece, index) => {
    console.log(`[NativeVoice] sendSpeechText chunk session=${session.sessionId} index=${index + 1}/${pieces.length} len=${piece.length} start=${index === 0} end=false text=${JSON.stringify(piece.slice(0, 80))}`);
    const frame = createChatTTSTextMessage(session.sessionId, {
      start: index === 0,
      end: false,
      content: piece,
    });
    session.upstream.send(frame);
  });

  console.log(`[NativeVoice] sendSpeechText end session=${session.sessionId}`);
  const closingFrame = createChatTTSTextMessage(session.sessionId, {
    start: false,
    end: true,
    content: '',
  });
  session.upstream.send(closingFrame);
}
/**
 * Tell the client the session is ready, at most once per session.
 *
 * @param {object} session - Voice session; reads/writes `readySent`.
 */
function sendReady(session) {
  if (!session.readySent) {
    session.readySent = true;
    sendJson(session.client, { type: 'ready' });
  }
}
/**
 * Send the opening greeting (once per session) and then signal readiness.
 *
 * On the first call the greeting text (session-specific or the default) is
 * persisted as an assistant utterance, greeting/ready timers are cleared, a
 * 2-second greeting protection window is opened and a SayHello message is sent
 * upstream. If sending throws, the "greeting sent" flag and the protection
 * window are rolled back. `sendReady` is invoked on every call.
 *
 * @param {object} session - Voice session (mutated).
 */
function sendGreeting(session) {
  if (!session.hasSentGreeting) {
    session.hasSentGreeting = true;
    const openingLine = session.greetingText || DEFAULT_VOICE_GREETING;
    console.log(`[NativeVoice] sendGreeting session=${session.sessionId} text=${JSON.stringify(openingLine.slice(0, 80))}`);
    persistAssistantSpeech(session, openingLine, { source: 'voice_bot' });

    clearTimeout(session.greetingTimer);
    clearTimeout(session.readyTimer);
    session.greetingSentAt = Date.now();
    session.greetingProtectionUntil = Date.now() + 2000;
    session.currentSpeechText = openingLine;

    try {
      const helloFrame = createSayHelloMessage(session.sessionId, openingLine);
      session.upstream.send(helloFrame);
      console.log(`[NativeVoice] sendSayHello event=300 session=${session.sessionId}`);
    } catch (error) {
      // Roll back so a later call may attempt the greeting again.
      session.hasSentGreeting = false;
      session.greetingProtectionUntil = 0;
      console.warn('[NativeVoice] SayHello failed:', error.message);
    }
  }
  sendReady(session);
}
/**
 * Speak the session's greeting again via SayHello.
 *
 * Skipped when the session has no greeting text, the upstream socket is not
 * open, or a greeting was sent less than 6 seconds ago. Opens a direct-speak
 * window covering the estimated speech duration plus 800 ms.
 *
 * NOTE(review): unlike sendGreeting, this uses only `session.greetingText`
 * and has no DEFAULT_VOICE_GREETING fallback — confirm that is intended.
 *
 * @param {object} session - Voice session (mutated).
 * @returns {Promise<void>}
 */
async function replayGreeting(session) {
  const openingLine = String(session.greetingText || '').trim();
  const upstreamOpen = Boolean(session.upstream) && session.upstream.readyState === WebSocket.OPEN;
  if (openingLine === '' || !upstreamOpen) {
    return;
  }

  const sentTooRecently = Boolean(session.greetingSentAt) && Date.now() - session.greetingSentAt < 6000;
  if (sentTooRecently) {
    console.log(`[NativeVoice] replayGreeting skipped (too soon) session=${session.sessionId}`);
    return;
  }

  console.log(`[NativeVoice] replayGreeting session=${session.sessionId} text=${JSON.stringify(openingLine.slice(0, 80))}`);
  session.greetingSentAt = Date.now();
  session.directSpeakUntil = Date.now() + estimateSpeechDurationMs(openingLine) + 800;
  try {
    const helloFrame = createSayHelloMessage(session.sessionId, openingLine);
    session.upstream.send(helloFrame);
  } catch (error) {
    console.warn('[NativeVoice] replayGreeting SayHello failed:', error.message);
  }
}
/**
 * Forward knowledge-base items upstream as a single ChatRAGText message.
 *
 * Items without a truthy `content` are dropped; the remainder is sent as a
 * JSON-encoded array. Does nothing when the upstream socket is not open or no
 * usable item remains.
 *
 * @param {object} session - Voice session.
 * @param {Array<object>} items - Candidate RAG items.
 * @returns {Promise<void>}
 */
async function sendExternalRag(session, items) {
  const upstreamOpen = Boolean(session.upstream) && session.upstream.readyState === WebSocket.OPEN;
  if (!upstreamOpen) {
    return;
  }
  const candidates = Array.isArray(items) ? items : [];
  const usableItems = candidates.filter((entry) => entry && entry.content);
  if (usableItems.length === 0) {
    return;
  }
  const ragFrame = createChatRAGTextMessage(session.sessionId, JSON.stringify(usableItems));
  session.upstream.send(ragFrame);
}
/**
 * End any upstream-reply suppression on the session: cancel the release timer,
 * reset the suppression window, the pending-assistant bookkeeping and the
 * audio block, then tell the client that no assistant reply is pending.
 *
 * @param {object} session - Voice session (mutated).
 */
function clearUpstreamSuppression(session) {
  clearTimeout(session.suppressReplyTimer);
  Object.assign(session, {
    suppressReplyTimer: null,
    suppressUpstreamUntil: 0,
    awaitingUpstreamReply: false,
    pendingAssistantSource: null,
    pendingAssistantToolName: null,
    pendingAssistantMeta: null,
    _pendingEvidencePack: null,
    pendingAssistantTurnSeq: 0,
    blockUpstreamAudio: false,
  });
  sendJson(session.client, { type: 'assistant_pending', active: false });
}
/**
 * Suppress upstream replies for at least one second (or `durationMs`, if
 * longer) and arrange for the suppression to be lifted afterwards via
 * clearUpstreamSuppression.
 *
 * The release timer re-arms itself when it fires before the suppression window
 * has actually elapsed (timer callbacks can run marginally early relative to
 * Date.now(), and the window may have been moved). Without re-arming, an early
 * fire would leave `awaitingUpstreamReply` / `blockUpstreamAudio` set with
 * nothing left to clear them.
 *
 * @param {object} session - Voice session (mutated).
 * @param {number} durationMs - Requested suppression length in milliseconds;
 *   non-finite values are treated as 0 (i.e. the 1-second minimum applies).
 */
function suppressUpstreamReply(session, durationMs) {
  clearTimeout(session.suppressReplyTimer);
  const requestedMs = Number(durationMs);
  const effectiveMs = Math.max(1000, Number.isFinite(requestedMs) ? requestedMs : 0);
  session.awaitingUpstreamReply = true;
  session.suppressUpstreamUntil = Date.now() + effectiveMs;

  const armReleaseTimer = (delayMs) => {
    session.suppressReplyTimer = setTimeout(() => {
      session.suppressReplyTimer = null;
      const remainingMs = (session.suppressUpstreamUntil || 0) - Date.now();
      if (remainingMs > 0) {
        // Fired early: wait out the remainder instead of giving up.
        armReleaseTimer(remainingMs);
        return;
      }
      clearUpstreamSuppression(session);
    }, delayMs);
  };
  armReleaseTimer(Math.max(300, session.suppressUpstreamUntil - Date.now()));
}
/**
 * Resolve and deliver the assistant's reply for one finalized user utterance.
 *
 * Flow, as implemented below:
 *  1. If another reply is being processed, or a direct-speak window is still open,
 *     the utterance is stored as the (single) queued utterance and the call returns.
 *  2. Non-chitchat text is treated as a knowledge-base candidate: upstream audio is
 *     blocked and upstream replies are suppressed for up to 30 s.
 *  3. A KB prequery started earlier is reused when its text is similar enough and it
 *     produced a hit; otherwise resolveReply is called with the full text.
 *  4. The result is delivered according to `delivery`:
 *       - 'upstream_chat': unblock and let the upstream model answer;
 *       - 'external_rag' : send the RAG items upstream and wait for its spoken answer;
 *       - otherwise      : speak `speechText` locally via sendSpeechText.
 *  5. In `finally`, the busy flag is dropped and any queued utterance is scheduled.
 *
 * @param {object} session - Voice session (heavily mutated).
 * @param {string} text - Finalized user text.
 * @param {number} [turnSeq] - Turn sequence of this utterance; defaults to the session's latest.
 * @returns {Promise<void>} Errors are caught, logged and reported to the client as `error`.
 */
async function processReply(session, text, turnSeq = session.latestUserTurnSeq || 0) {
const cleanText = (text || '').trim();
if (!cleanText) return;
// Only one reply is processed at a time; a newer utterance overwrites the queued one.
if (session.processingReply) {
session.queuedUserText = cleanText;
session.queuedUserTurnSeq = turnSeq;
console.log(`[NativeVoice] processReply queued(busy) session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 80))}`);
return;
}
const now = Date.now();
// While a locally-spoken reply is still playing, queue instead of answering.
if (session.directSpeakUntil && now < session.directSpeakUntil) {
session.queuedUserText = cleanText;
session.queuedUserTurnSeq = turnSeq;
console.log(`[NativeVoice] processReply queued(speaking) session=${session.sessionId} waitMs=${session.directSpeakUntil - now} text=${JSON.stringify(cleanText.slice(0, 80))}`);
return;
}
const activeTurnSeq = turnSeq || session.latestUserTurnSeq || 0;
session.processingReply = true;
sendJson(session.client, { type: 'assistant_pending', active: true });
// KB-first: every non-chitchat query is treated as a KB candidate; block S2S audio while waiting for the KB result.
let isKnowledgeCandidate = !isPureChitchat(cleanText);
if (isKnowledgeCandidate) {
session.blockUpstreamAudio = true;
suppressUpstreamReply(session, 30000);
sendJson(session.client, { type: 'tts_reset', reason: 'processing' });
// Filler phrase removed: after KB query optimisation latency is down to ~2.6s, so no filler is needed.
session._fillerActive = false;
}
console.log(`[NativeVoice] processReply start session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 120))} blocked=${session.blockUpstreamAudio} kbCandidate=${isKnowledgeCandidate}`);
try {
// KB prequery cache consumption: if a prequery was started during the partial-ASR phase and its text matches, reuse the cached result.
let resolveResult = null;
if (isKnowledgeCandidate && session.pendingKbPrequery && session._kbPrequeryText) {
// Compare with filler particles, punctuation and whitespace stripped from both texts.
const preText = (session._kbPrequeryText || '').replace(/[啊哦嗯呢呀哎诶额,。!?、,.\s]/g, '');
const finalText = cleanText.replace(/[啊哦嗯呢呀哎诶额,。!?、,.\s]/g, '');
// Relaxed similarity: substring containment, OR at least 45% of the shorter text's characters
// occur in the longer text (0.45 is the threshold used below; the original comment said 60%).
let isSimilar = preText && finalText && (finalText.includes(preText) || preText.includes(finalText));
if (!isSimilar && preText && finalText) {
const shorter = preText.length <= finalText.length ? preText : finalText;
const longer = preText.length <= finalText.length ? finalText : preText;
let overlap = 0;
for (let i = 0; i < shorter.length; i++) {
if (longer.includes(shorter[i])) overlap++;
}
isSimilar = overlap / shorter.length >= 0.45;
}
if (isSimilar) {
const prequeryResult = await session.pendingKbPrequery;
// Only reuse hit results; a no-hit may come from incomplete routing on the partial text, so re-search with the full text.
if (prequeryResult && prequeryResult.delivery !== 'upstream_chat') {
console.log(`[NativeVoice] using KB prequery cache (hit) session=${session.sessionId} preText=${JSON.stringify(session._kbPrequeryText.slice(0, 60))}`);
resolveResult = prequeryResult;
} else {
console.log(`[NativeVoice] prequery no-hit, re-searching with full text session=${session.sessionId} preText=${JSON.stringify((session._kbPrequeryText || '').slice(0, 40))} finalText=${JSON.stringify(cleanText.slice(0, 40))}`);
}
} else {
console.log(`[NativeVoice] KB prequery text mismatch, re-querying session=${session.sessionId} pre=${JSON.stringify(preText.slice(0, 40))} final=${JSON.stringify(finalText.slice(0, 40))}`);
}
}
// The prequery is single-use: reset it whether or not it was consumed.
session.pendingKbPrequery = null;
session._kbPrequeryText = '';
session._kbPrequeryStartedAt = 0;
if (!resolveResult) {
resolveResult = await resolveReply(session.sessionId, session, cleanText);
}
const { delivery, speechText, ragItems, source, toolName, routeDecision, responseMeta, evidencePack } = resolveResult;
// Product-link trigger detection: when the user asks to see product details, push the matching link.
const productLinkResult = checkProductLinkTrigger(cleanText);
if (productLinkResult.triggered && productLinkResult.product) {
console.log(`[NativeVoice] product link triggered session=${session.sessionId} product=${productLinkResult.product.name}`);
sendJson(session.client, {
type: 'product_link',
product: productLinkResult.product.name,
link: productLinkResult.product.link,
description: productLinkResult.product.description,
});
}
// A newer user turn arrived while resolving: drop this reply.
if (activeTurnSeq !== (session.latestUserTurnSeq || 0)) {
console.log(`[NativeVoice] stale reply ignored session=${session.sessionId} activeTurn=${activeTurnSeq} latestTurn=${session.latestUserTurnSeq || 0}`);
clearUpstreamSuppression(session);
return;
}
if (delivery === 'upstream_chat') {
// KB candidate but S2S did not call a tool -> release S2S so it can answer naturally.
// Relies on: 1) the brand-protection instructions in the system prompt steering S2S to call tools; 2) the streaming isBrandHarmful interception as a safety net.
if (isKnowledgeCandidate) {
console.log(`[NativeVoice] processReply kbCandidate+upstream_chat, unblock S2S session=${session.sessionId}`);
}
session.blockUpstreamAudio = false;
session._lastPartialAt = 0;
session._pendingEvidencePack = null;
session.awaitingUpstreamReply = true;
session.pendingAssistantSource = 'voice_bot';
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = responseMeta;
session.pendingAssistantTurnSeq = activeTurnSeq;
console.log(`[NativeVoice] processReply handoff session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=upstream_chat`);
return;
}
if (delivery === 'external_rag') {
if (!session.blockUpstreamAudio) {
session.blockUpstreamAudio = true;
}
session.discardNextAssistantResponse = true;
sendJson(session.client, { type: 'tts_reset', reason: 'knowledge_hit' });
// Items of kind 'context' are not sent upstream as RAG content.
const ragContent = (ragItems || []).filter((item) => item && item.content && item.kind !== 'context');
if (ragContent.length > 0) {
console.log(`[NativeVoice] processReply sending external_rag to S2S session=${session.sessionId} route=${routeDecision?.route || 'unknown'} items=${ragContent.length}`);
// KB topic memory: record this turn's original user question and timestamp, for the protection window and follow-up enrichment.
if (responseMeta?.hit !== false && responseMeta?.reason !== 'honest_fallback') {
session._lastKbTopic = cleanText;
session._lastKbHitAt = Date.now();
}
session._pendingEvidencePack = evidencePack || null;
// Do not emit the raw KB text as a subtitle up front; wait for S2S event 351 to return the text actually spoken, then update the subtitle.
// This keeps subtitle and speech consistent, since S2S generates a natural spoken answer based on the RAG content.
session._pendingExternalRagReply = true;
await sendExternalRag(session, ragContent);
session.awaitingUpstreamReply = true;
session.pendingAssistantSource = source;
session.pendingAssistantToolName = toolName;
session.pendingAssistantMeta = responseMeta;
session.pendingAssistantTurnSeq = activeTurnSeq;
} else {
// NOTE(review): discardNextAssistantResponse was set to true above and is not reset on this
// fallback path, so the next upstream assistant response may be discarded — confirm intent.
console.log(`[NativeVoice] processReply external_rag empty content, fallback to upstream session=${session.sessionId}`);
session.blockUpstreamAudio = false;
clearUpstreamSuppression(session);
}
return;
}
if (!speechText) {
console.log(`[NativeVoice] processReply empty session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=${delivery || 'unknown'}`);
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
return;
}
// Local delivery: speak the resolved text ourselves and keep upstream quiet meanwhile.
console.log(`[NativeVoice] processReply resolved session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=local_tts source=${source} tool=${toolName || 'chat'} speechLen=${speechText.length}`);
session.directSpeakUntil = Date.now() + estimateSpeechDurationMs(speechText) + 800;
suppressUpstreamReply(session, estimateSpeechDurationMs(speechText) + 1800);
session.lastDeliveredAssistantTurnSeq = activeTurnSeq;
persistAssistantSpeech(session, speechText, { source, toolName, meta: responseMeta });
await sendSpeechText(session, speechText);
} catch (error) {
console.error('[NativeVoice] processReply failed:', error.message);
sendJson(session.client, { type: 'error', error: error.message });
} finally {
// Runs for every exit path above, including the early returns inside `try`.
session.processingReply = false;
if (!session.awaitingUpstreamReply) {
session.blockUpstreamAudio = false;
}
if (!session.awaitingUpstreamReply) {
sendJson(session.client, { type: 'assistant_pending', active: false });
}
// Take the queued utterance (if any) and schedule it, unless it is the turn just handled.
const pending = session.queuedUserText;
const pendingTurnSeq = session.queuedUserTurnSeq || 0;
session.queuedUserText = '';
session.queuedUserTurnSeq = 0;
if (pending && pendingTurnSeq && pendingTurnSeq !== activeTurnSeq && (!session.directSpeakUntil || Date.now() >= session.directSpeakUntil)) {
// Nothing is being spoken locally: process the queued utterance after a short delay.
setTimeout(() => {
session.blockUpstreamAudio = true;
processReply(session, pending, pendingTurnSeq).catch((err) => {
console.error('[NativeVoice] queued processReply failed:', err.message);
});
}, 200);
} else if (pending && pendingTurnSeq && pendingTurnSeq !== activeTurnSeq) {
// A local reply is still playing: wait until the direct-speak window ends (+200 ms).
const waitMs = Math.max(200, session.directSpeakUntil - Date.now() + 200);
clearTimeout(session.queuedReplyTimer);
session.queuedReplyTimer = setTimeout(() => {
session.queuedReplyTimer = null;
// Prefer an utterance queued in the meantime over the one captured above.
const queuedText = session.queuedUserText || pending;
const queuedTurnSeq = session.queuedUserTurnSeq || pendingTurnSeq;
session.queuedUserText = '';
session.queuedUserTurnSeq = 0;
session.blockUpstreamAudio = true;
processReply(session, queuedText, queuedTurnSeq).catch((err) => {
console.error('[NativeVoice] delayed queued processReply failed:', err.message);
});
}, waitMs);
}
}
}
function handleUpstreamMessage(session, data) {
let message;
try {
message = unmarshal(data);
} catch (error) {
console.warn('[NativeVoice] unmarshal failed:', error.message);
return;
}
if (message.type === MsgType.AUDIO_ONLY_SERVER) {
const isDefaultTts = !session.currentTtsType || session.currentTtsType === 'default';
const isSuppressingUpstreamAudio = (session.suppressUpstreamUntil || 0) > Date.now() && isDefaultTts;
// 用户刚停止说话后短暂阻止默认TTS音频给event 459的blockUpstreamAudio留时间生效
const isUserJustSpeaking = isDefaultTts && session._lastPartialAt && (Date.now() - session._lastPartialAt < 800);
// blockUpstreamAudio 生效时:仅放行 external_rag 和限时过渡语音频,其余全部阻断
// 修复:旧逻辑只阻断 isDefaultTts导致 chat_tts_text 窗口期 S2S 自主回复音频泄漏
const isBlockPassthrough = session.currentTtsType === 'external_rag' ||
(session._fillerActive && (session.chatTTSUntil || 0) > Date.now());
if ((session.blockUpstreamAudio && !isBlockPassthrough) || isSuppressingUpstreamAudio || isUserJustSpeaking) {
if (!session._audioBlockLogOnce) {
session._audioBlockLogOnce = true;
console.log(`[NativeVoice] audio blocked session=${session.sessionId} ttsType=${session.currentTtsType} block=${session.blockUpstreamAudio} suppress=${isSuppressingUpstreamAudio}`);
}
return;
}
session._audioBlockLogOnce = false;
if (session.client && session.client.readyState === WebSocket.OPEN) {
session.client.send(message.payload, { binary: true });
}
return;
}
const payload = parseJsonPayload(message);
if (message.type === MsgType.ERROR) {
console.error(`[NativeVoice] upstream error session=${session.sessionId} code=${message.event} payload=${message.payload.toString('utf8').slice(0, 200)}`);
sendJson(session.client, { type: 'error', error: message.payload.toString('utf8') });
return;
}
if (message.type !== MsgType.FULL_SERVER) {
return;
}
if (message.event === 150) {
session.upstreamReady = true;
console.log(`[NativeVoice] upstream ready session=${session.sessionId}`);
resetIdleTimer(session);
startAudioKeepalive(session);
sendGreeting(session);
return;
}
if (message.event === 300) {
console.log(`[NativeVoice] SayHello response session=${session.sessionId}`);
return;
}
if (message.event === 350) {
session.currentTtsType = payload?.tts_type || '';
if (payload?.tts_type === 'chat_tts_text' && session.pendingGreetingAck) {
session.pendingGreetingAck = false;
clearTimeout(session.greetingAckTimer);
session.greetingAckTimer = null;
}
if (session.blockUpstreamAudio && payload?.tts_type === 'external_rag') {
session.blockUpstreamAudio = false;
session.suppressUpstreamUntil = 0;
clearTimeout(session.suppressReplyTimer);
session.suppressReplyTimer = null;
// 注意不清除discardNextAssistantResponse让它拦截S2S默认回复的残留event 351
// 该标记会在KB回复的event 550 chunks到达时自动清除
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
// 清除过渡语的chat TTS状态确保external_rag回复不被isLocalChatTTSTextActive拦截
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
session.currentSpeechText = '';
session._fillerActive = false;
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'rag_response_start' });
console.log(`[NativeVoice] unblock for external_rag tts session=${session.sessionId}`);
} else if (session.blockUpstreamAudio && payload?.tts_type === 'chat_tts_text') {
console.log(`[NativeVoice] chat_tts_text started, keeping block for S2S default response session=${session.sessionId}`);
}
console.log(`[NativeVoice] upstream tts_event session=${session.sessionId} ttsType=${payload?.tts_type || ''}`);
sendJson(session.client, { type: 'tts_event', payload });
return;
}
const isLocalChatTTSTextActive = !!session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now();
const isSuppressingUpstreamReply = (session.suppressUpstreamUntil || 0) > Date.now();
if (message.event === 351) {
if (isLocalChatTTSTextActive || session.blockUpstreamAudio || isSuppressingUpstreamReply) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
return;
}
if (session.discardNextAssistantResponse) {
session.discardNextAssistantResponse = false;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] discarded stale assistant response (kb-nohit retrigger) session=${session.sessionId}`);
return;
}
const pendingAssistantSource = session.pendingAssistantSource || 'voice_bot';
const pendingAssistantToolName = session.pendingAssistantToolName || null;
const pendingAssistantMeta = session.pendingAssistantMeta || null;
const pendingAssistantTurnSeq = session.pendingAssistantTurnSeq || session.latestUserTurnSeq || 0;
session.awaitingUpstreamReply = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
if (pendingAssistantTurnSeq && session.lastDeliveredAssistantTurnSeq === pendingAssistantTurnSeq) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
session._pendingEvidencePack = null;
console.log(`[NativeVoice] duplicate assistant final ignored (351) session=${session.sessionId} turn=${pendingAssistantTurnSeq}`);
return;
}
const assistantText = extractRawText(payload);
if (assistantText) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
// 过渡语的event 351不持久化直接丢弃
if (session._fillerActive) {
console.log(`[NativeVoice] discarded filler assistant text session=${session.sessionId}`);
session._fillerActive = false;
return;
}
// 清除external_rag等待标记KB回复已到达
if (session._pendingExternalRagReply) {
session._pendingExternalRagReply = false;
}
console.log(`[NativeVoice] upstream assistant session=${session.sessionId} text=${JSON.stringify(assistantText.slice(0, 120))}`);
session.lastDeliveredAssistantTurnSeq = pendingAssistantTurnSeq;
persistAssistantSpeech(session, assistantText, {
source: pendingAssistantSource,
toolName: pendingAssistantToolName,
meta: pendingAssistantMeta,
});
// KB回复完成后重新阻断音频防止下一个问题的S2S默认回复在early block前泄露
if (session.currentTtsType === 'external_rag') {
session.blockUpstreamAudio = true;
console.log(`[NativeVoice] re-blocked after KB response session=${session.sessionId}`);
}
} else {
const didFlush = flushAssistantStream(session, {
source: pendingAssistantSource,
toolName: pendingAssistantToolName,
meta: pendingAssistantMeta,
});
if (didFlush) {
session.lastDeliveredAssistantTurnSeq = pendingAssistantTurnSeq;
}
}
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
session._pendingEvidencePack = null;
return;
}
if (message.event === 550) {
// external_rag chunks到达时清除discardNextAssistantResponse默认回复的351已过或不会来
if (session.discardNextAssistantResponse && session.currentTtsType === 'external_rag') {
session.discardNextAssistantResponse = false;
console.log(`[NativeVoice] cleared discardNextAssistantResponse for external_rag stream session=${session.sessionId}`);
}
if (isLocalChatTTSTextActive || session.blockUpstreamAudio || isSuppressingUpstreamReply || session.discardNextAssistantResponse) {
return;
}
if (session.awaitingUpstreamReply) {
session.awaitingUpstreamReply = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
}
const fullText = appendAssistantStream(session, payload);
if (fullText) {
// 品牌安全检测S2S模型输出传销等负面内容时立即阻断音频并注入安全回复
if (fullText.length >= 4 && isBrandHarmful(fullText)) {
console.warn(`[NativeVoice][SafeGuard] harmful content detected in stream, blocking audio session=${session.sessionId} text=${JSON.stringify(fullText.slice(0, 120))}`);
session.blockUpstreamAudio = true;
session.discardNextAssistantResponse = true;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
sendJson(session.client, { type: 'tts_reset', reason: 'harmful_blocked' });
// 注入安全回复语音,替代有害内容
const safeReply = getVoiceSafeReply();
persistAssistantSpeech(session, safeReply, { source: 'voice_bot' });
sendSpeechText(session, safeReply).catch((err) => {
console.warn('[NativeVoice][SafeGuard] sendSpeechText failed:', err.message);
});
return;
}
// 检测思考模式S2S模型输出分析/计划而非直接回答,立即阻断
if (fullText.length >= 10 && (THINKING_PATTERN.test(fullText.trim()) || THINKING_MID_PATTERN.test(fullText))) {
console.warn(`[NativeVoice][SafeGuard] thinking detected in stream, blocking audio session=${session.sessionId} text=${JSON.stringify(fullText.slice(0, 120))}`);
session.blockUpstreamAudio = true;
session.discardNextAssistantResponse = true;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
sendJson(session.client, { type: 'tts_reset', reason: 'thinking_blocked' });
return;
}
console.log(`[NativeVoice] upstream assistant chunk session=${session.sessionId} len=${fullText.length} text=${JSON.stringify(fullText.slice(0, 120))}`);
}
return;
}
if (message.event === 559) {
if (isLocalChatTTSTextActive || isSuppressingUpstreamReply) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
return;
}
if (session.blockUpstreamAudio) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] blocked response ended (559), keeping block session=${session.sessionId}`);
return;
}
if (session.discardNextAssistantResponse) {
session.discardNextAssistantResponse = false;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] discarded stale stream end (559, kb-nohit retrigger) session=${session.sessionId}`);
return;
}
// external_rag流期间阻止默认回复的559过早flush部分KB文本
if (session._pendingExternalRagReply) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] suppressed 559 flush during external_rag flow session=${session.sessionId}`);
return;
}
const pendingAssistantTurnSeq = session.pendingAssistantTurnSeq || session.latestUserTurnSeq || 0;
if (pendingAssistantTurnSeq && session.lastDeliveredAssistantTurnSeq === pendingAssistantTurnSeq) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
session._pendingEvidencePack = null;
console.log(`[NativeVoice] duplicate assistant final ignored (559) session=${session.sessionId} turn=${pendingAssistantTurnSeq}`);
return;
}
session.awaitingUpstreamReply = false;
session.blockUpstreamAudio = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
const didFlush = flushAssistantStream(session, {
source: session.pendingAssistantSource || 'voice_bot',
toolName: session.pendingAssistantToolName || null,
meta: session.pendingAssistantMeta || null,
});
if (didFlush) {
session.lastDeliveredAssistantTurnSeq = pendingAssistantTurnSeq;
}
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
session._pendingEvidencePack = null;
return;
}
if (message.event === 450 || (message.event === 451 && !isFinalUserPayload(payload))) {
const text = extractUserText(payload, session.sessionId);
if (text) {
const now = Date.now();
const isDirectSpeaking = session.directSpeakUntil && now < session.directSpeakUntil;
const isChatTTSSpeaking = session.isSendingChatTTSText && (session.chatTTSUntil || 0) > now;
// TTS回声检测播放期间如果ASR识别文本是当前播放文本的子串判定为回声忽略
if ((isDirectSpeaking || isChatTTSSpeaking) && session.currentSpeechText) {
const normalizedPartial = text.replace(/[,。!?、,.\s]/g, '');
const normalizedSpeech = session.currentSpeechText.replace(/[,。!?、,.\s]/g, '');
if (normalizedPartial.length <= 3 || normalizedSpeech.includes(normalizedPartial)) {
if (!session._echoLogOnce) {
session._echoLogOnce = true;
console.log(`[NativeVoice] TTS echo detected, ignoring partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 80))}`);
}
return;
}
session._echoLogOnce = false;
} else {
session._echoLogOnce = false;
}
// Greeting保护窗口发送问候语后短暂保护期内忽略barge-in
if (session.greetingProtectionUntil && now < session.greetingProtectionUntil) {
console.log(`[NativeVoice] greeting protection active, ignoring partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 80))}`);
return;
}
console.log(`[NativeVoice] upstream partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 120))}`);
const normalizedPartial = normalizeKnowledgeAlias(text);
session.latestUserText = normalizedPartial;
session._lastPartialAt = now;
// KB-First: 非闲聊文本一律提前阻断S2S音频防止有害内容播出
if (normalizedPartial.length >= 6 && !session.blockUpstreamAudio && !isPureChitchat(normalizedPartial)) {
session.blockUpstreamAudio = true;
session.currentTtsType = 'default';
sendJson(session.client, { type: 'tts_reset', reason: 'early_block' });
console.log(`[NativeVoice] early block: non-chitchat partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 80))}`);
// KB预查询提前启动知识库查询减少final ASR后的等待时间
const kbPrequeryDebounce = 600;
if (normalizedPartial.length >= 8 && (!session._kbPrequeryStartedAt || now - session._kbPrequeryStartedAt > kbPrequeryDebounce)) {
session._kbPrequeryStartedAt = now;
session._kbPrequeryText = normalizedPartial;
console.log(`[NativeVoice] KB prequery started session=${session.sessionId} text=${JSON.stringify(normalizedPartial.slice(0, 80))}`);
session.pendingKbPrequery = resolveReply(session.sessionId, session, normalizedPartial).catch((err) => {
console.warn(`[NativeVoice] KB prequery failed session=${session.sessionId}:`, err.message);
return null;
});
}
}
// 用户开口说话时立即打断所有 AI 播放(包括 S2S external_rag 音频)
const isS2SAudioPlaying = !session.blockUpstreamAudio && session.currentTtsType === 'external_rag';
if (isDirectSpeaking || isChatTTSSpeaking || isS2SAudioPlaying) {
console.log(`[NativeVoice] user barge-in (partial) session=${session.sessionId} direct=${isDirectSpeaking} chatTTS=${isChatTTSSpeaking} s2sRag=${isS2SAudioPlaying}`);
session.directSpeakUntil = 0;
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
session.currentSpeechText = '';
clearTimeout(session.chatTTSTimer);
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
clearUpstreamSuppression(session);
}
// 阻断 S2S 音频转发,防止用户打断后仍听到残留音频
session.blockUpstreamAudio = true;
}
// 无论当前是否在播放,都发送 tts_reset 确保客户端停止所有音频播放
if (!session._lastBargeInResetAt || now - session._lastBargeInResetAt > 500) {
session._lastBargeInResetAt = now;
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
}
sendJson(session.client, {
type: 'subtitle',
role: 'user',
text: text,
isFinal: false,
sequence: `native_partial_${Date.now()}`,
});
}
return;
}
if (message.event === 459 || (message.event === 451 && isFinalUserPayload(payload))) {
const rawFinalText = extractUserText(payload, session.sessionId) || '';
const finalText = normalizeKnowledgeAlias(rawFinalText) || session.latestUserText || '';
const now459 = Date.now();
// 双事件去重S2S可能同时发送event 459和event 451(is_final),用去标点归一化文本+时间窗口去重
const normalizedForDedup = finalText.replace(/[,。!?、,.?!\s]/g, '');
if (normalizedForDedup && session._lastFinalNormalized === normalizedForDedup && now459 - (session._lastFinalAt || 0) < 1500) {
console.log(`[NativeVoice] duplicate final ignored (event=${message.event}) session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 80))}`);
return;
}
session._lastFinalNormalized = normalizedForDedup;
session._lastFinalAt = now459;
// TTS回声检测final级别播放期间ASR最终识别文本如果是当前播放文本的子串判定为回声
const isDirectSpeaking459 = session.directSpeakUntil && now459 < session.directSpeakUntil;
const isChatTTSSpeaking459 = session.isSendingChatTTSText && (session.chatTTSUntil || 0) > now459;
if ((isDirectSpeaking459 || isChatTTSSpeaking459) && session.currentSpeechText && finalText) {
const normalizedFinal = finalText.replace(/[,。!?、,.\s]/g, '');
const normalizedSpeech = session.currentSpeechText.replace(/[,。!?、,.\s]/g, '');
if (normalizedFinal.length <= 4 || normalizedSpeech.includes(normalizedFinal)) {
console.log(`[NativeVoice] TTS echo detected in final, ignoring session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 80))}`);
return;
}
}
// Greeting保护窗口
if (session.greetingProtectionUntil && now459 < session.greetingProtectionUntil && finalText) {
console.log(`[NativeVoice] greeting protection active, ignoring final session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 80))}`);
return;
}
console.log(`[NativeVoice] upstream final session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 120))}`);
if (session.directSpeakUntil && Date.now() < session.directSpeakUntil) {
console.log(`[NativeVoice] user interrupt during speaking session=${session.sessionId}`);
session.directSpeakUntil = 0;
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
session.currentSpeechText = '';
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
clearUpstreamSuppression(session);
}
} else if (session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now()) {
console.log(`[NativeVoice] user interrupt chatTTS during speaking session=${session.sessionId}`);
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
session.currentSpeechText = '';
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
clearUpstreamSuppression(session);
}
}
if (persistUserSpeech(session, rawFinalText || finalText)) {
session.blockUpstreamAudio = true;
session.currentTtsType = 'default';
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
sendJson(session.client, { type: 'tts_reset', reason: 'new_turn' });
processReply(session, finalText).catch((error) => {
console.error('[NativeVoice] processReply error:', error.message);
});
}
return;
}
sendJson(session.client, {
type: 'event',
event: message.event,
payload,
});
}
/**
 * Registers the `message` and `close` listeners on a session's client socket.
 *
 * Binary frames are forwarded to the upstream socket as audio once the
 * upstream is open and marked ready. Non-binary frames are parsed as JSON
 * control messages and dispatched on `type`:
 *   - `start`           resolve the assistant profile, send ready, open upstream
 *   - `stop`            close the client socket
 *   - `replay_greeting` replay the greeting
 *   - `text`            persist the typed text and generate a reply
 *
 * On `close` the session timers are cleared, the upstream socket is closed,
 * the final summary is persisted and the session is removed from `sessions`.
 *
 * @param {object} session - Mutable session state as built by createSession().
 * @returns {void}
 */
function attachClientHandlers(session) {
  // Applies a `start` control message: merges the assistant profile sources,
  // refreshes the persona fields on the session and opens the upstream socket.
  const startSession = async (parsed) => {
    session.userId = parsed.userId || session.userId || null;

    // A failed or empty profile lookup must not abort the call; the session
    // and client supplied profiles are still merged below.
    let remoteProfile = {};
    try {
      const remoteProfileResult = await getAssistantProfile({ userId: session.userId });
      remoteProfile = remoteProfileResult?.profile || {};
    } catch (error) {
      console.warn('[NativeVoice] getAssistantProfile failed, falling back to local profile:', error.message);
    }

    // The client may have disconnected while the lookup was pending. Its
    // `close` listener has already run by then, so an upstream socket opened
    // now would never be closed.
    if (session.client.readyState !== WebSocket.OPEN) {
      return;
    }

    // Later spreads win: remote profile < existing session profile < client payload.
    const assistantProfile = resolveAssistantProfile({
      ...remoteProfile,
      ...(session.assistantProfile || {}),
      ...((parsed.assistantProfile && typeof parsed.assistantProfile === 'object') ? parsed.assistantProfile : {}),
    });
    session.assistantProfile = assistantProfile;
    session.botName = parsed.botName || getAssistantDisplayName(assistantProfile) || DEFAULT_VOICE_BOT_NAME;
    session.systemRole = buildVoiceSystemRole(assistantProfile);
    session.speakingStyle = parsed.speakingStyle || session.speakingStyle || DEFAULT_VOICE_SPEAKING_STYLE;
    session.speaker = parsed.speaker || process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts';
    session.modelVersion = parsed.modelVersion || 'O';
    session.greetingText = buildVoiceGreeting(assistantProfile);
    // Send ready right away instead of waiting for the upstream handshake, so
    // the front end is not kept waiting.
    sendReady(session);
    session.upstream = createUpstreamConnection(session);
    loadHandoffSummaryForVoice(session).catch((error) => {
      console.warn('[NativeVoice] async loadHandoffSummaryForVoice failed:', error.message);
    });
  };

  session.client.on('message', async (raw, isBinary) => {
    if (isBinary) {
      if (session.upstream && session.upstream.readyState === WebSocket.OPEN && session.upstreamReady) {
        session.upstream.send(createAudioMessage(session.sessionId, raw));
        resetAudioKeepalive(session);
      }
      return;
    }

    let parsed;
    try {
      parsed = JSON.parse(raw.toString('utf8'));
    } catch (error) {
      sendJson(session.client, { type: 'error', error: 'invalid client json' });
      return;
    }
    // JSON.parse also accepts `null` and bare primitives; reading `.type` on
    // `null` would throw inside this async listener and surface as an
    // unhandled rejection.
    if (parsed === null || typeof parsed !== 'object') {
      sendJson(session.client, { type: 'error', error: 'invalid client json' });
      return;
    }

    if (parsed.type === 'start') {
      try {
        await startSession(parsed);
      } catch (error) {
        // Report start-up failures to the client rather than letting the
        // listener's promise reject with nobody observing it.
        console.error('[NativeVoice] start failed:', error.message);
        sendJson(session.client, { type: 'error', error: `语音服务启动失败: ${error.message}` });
      }
      return;
    }
    if (parsed.type === 'stop') {
      session.client.close();
      return;
    }
    if (parsed.type === 'replay_greeting') {
      replayGreeting(session).catch((error) => {
        console.warn('[NativeVoice] replayGreeting failed:', error.message);
      });
      return;
    }
    if (parsed.type === 'text' && parsed.text) {
      if (persistUserSpeech(session, parsed.text)) {
        processReply(session, parsed.text, session.latestUserTurnSeq).catch((error) => {
          console.error('[NativeVoice] text processReply failed:', error.message);
        });
      }
    }
  });

  session.client.on('close', () => {
    clearTimeout(session.chatTTSTimer);
    clearTimeout(session.greetingTimer);
    clearTimeout(session.greetingAckTimer);
    clearTimeout(session.readyTimer);
    clearTimeout(session.suppressReplyTimer);
    clearTimeout(session.idleTimer);
    // NOTE(review): queuedReplyTimer is initialised in createSession(); it is
    // assumed to hold a timer handle like its siblings - confirm.
    clearTimeout(session.queuedReplyTimer);
    clearInterval(session._audioKeepaliveTimer);

    // Also close an upstream socket that is still connecting; otherwise it
    // would finish its handshake after the client has gone.
    const upstream = session.upstream;
    if (upstream && (upstream.readyState === WebSocket.OPEN || upstream.readyState === WebSocket.CONNECTING)) {
      upstream.close();
    }

    // Persist the conversation summary when the session ends.
    persistFinalSummary(session).catch((err) => {
      console.warn('[NativeVoice] persistFinalSummary failed:', err.message);
    });

    // Only unregister this exact session: a newer connection may already have
    // been stored under the same sessionId.
    if (sessions.get(session.sessionId) === session) {
      sessions.delete(session.sessionId);
    }
  });
}
/**
 * Opens the upstream realtime-dialogue WebSocket for a session and wires its
 * lifecycle events back to the session's client socket.
 *
 * - `open`:    sends the start-connection and start-session messages.
 * - `message`: text frames are relayed to the client as `server_text`;
 *              binary frames are passed to handleUpstreamMessage().
 * - `error`:   logged and reported to the client.
 * - `close`:   marks the upstream as not ready, stops the audio keepalive,
 *              notifies the client and closes the client socket 3 s later
 *              if it is still open.
 *
 * @param {object} session - Session state; `sessionId` is used as the connect id.
 * @returns {WebSocket} The newly created upstream socket.
 */
function createUpstreamConnection(session) {
  const upstream = new WebSocket('wss://openspeech.bytedance.com/api/v3/realtime/dialogue', {
    headers: {
      'X-Api-Resource-Id': 'volc.speech.dialog',
      'X-Api-Access-Key': process.env.VOLC_S2S_TOKEN,
      // Falls back to the literal app key when VOLC_DIALOG_APP_KEY is unset.
      'X-Api-App-Key': process.env.VOLC_DIALOG_APP_KEY || 'PlgvMymc7f3tQnJ6',
      'X-Api-App-ID': process.env.VOLC_S2S_APP_ID,
      'X-Api-Connect-Id': session.sessionId,
    },
  });

  upstream.on('open', () => {
    upstream.send(createStartConnectionMessage());
    upstream.send(createStartSessionMessage(session.sessionId, buildStartSessionPayload(session)));
  });

  upstream.on('message', (data, isBinary) => {
    if (!isBinary) {
      // ws >= 8 delivers text frames as Buffers with isBinary === false, so a
      // `typeof data === 'string'` check alone never matches. Decode here so
      // text frames are relayed instead of reaching the binary decoder.
      const text = typeof data === 'string' ? data : Buffer.from(data).toString('utf8');
      sendJson(session.client, { type: 'server_text', text });
      return;
    }
    handleUpstreamMessage(session, Buffer.isBuffer(data) ? data : Buffer.from(data));
  });

  upstream.on('error', (error) => {
    console.error('[NativeVoice] upstream ws error:', error.message);
    sendJson(session.client, { type: 'error', error: `语音服务连接异常: ${error.message}` });
  });

  upstream.on('close', (code) => {
    console.log(`[NativeVoice] upstream closed session=${session.sessionId} code=${code}`);
    session.upstreamReady = false;
    clearInterval(session._audioKeepaliveTimer);
    session._audioKeepaliveTimer = null;
    sendJson(session.client, { type: 'upstream_closed', code });
    // Leave the client a short window to receive `upstream_closed` before
    // its own socket is closed.
    setTimeout(() => {
      if (session.client && session.client.readyState === WebSocket.OPEN) {
        session.client.close();
      }
    }, 3000);
  });

  return upstream;
}
/**
 * Builds the initial state object for a voice session, stores it in the
 * `sessions` registry under `sessionId`, attaches the client socket handlers
 * and returns it.
 *
 * The state is assembled from several groups that are merged in a fixed
 * order, so the resulting object has the same keys, in the same order, as a
 * single flat literal would.
 *
 * @param {object} client - Client-side WebSocket for this session.
 * @param {string} sessionId - Identifier used as the registry key.
 * @returns {object} The freshly created session state.
 */
function createSession(client, sessionId) {
  const profile = resolveAssistantProfile();

  // Socket handles and upstream readiness.
  const connectionState = {
    sessionId,
    client,
    upstream: null,
    upstreamReady: false,
  };

  // Per-turn text, queueing and playback bookkeeping.
  const turnState = {
    isSendingChatTTSText: false,
    latestUserText: '',
    latestUserTurnSeq: 0,
    queuedUserText: '',
    queuedUserTurnSeq: 0,
    processingReply: false,
    blockUpstreamAudio: false,
    directSpeakUntil: 0,
    queuedReplyTimer: null,
    lastPersistedAssistantText: '',
    lastPersistedAssistantAt: 0,
    assistantStreamBuffer: '',
    assistantStreamReplyId: '',
    currentTtsType: '',
    currentSpeechText: '',
    greetingProtectionUntil: 0,
  };

  // Echo / filler flags and knowledge-base pre-query tracking.
  const knowledgeState = {
    _echoLogOnce: false,
    _fillerActive: false,
    _pendingExternalRagReply: false,
    _pendingEvidencePack: null,
    _lastPartialAt: 0,
    pendingKbPrequery: null,
    _kbPrequeryText: '',
    _kbPrequeryStartedAt: 0,
    _lastKbTopic: '',
    _lastKbHitAt: 0,
  };

  // Persona fields derived from the default assistant profile.
  const personaState = {
    assistantProfile: profile,
    botName: getAssistantDisplayName(profile),
    systemRole: buildVoiceSystemRole(profile),
    speakingStyle: DEFAULT_VOICE_SPEAKING_STYLE,
    speaker: process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts',
    modelVersion: 'O',
    greetingText: buildVoiceGreeting(profile),
  };

  // Greeting, ready-signal and hand-off summary bookkeeping.
  const greetingState = {
    hasSentGreeting: false,
    greetingTimer: null,
    greetingAckTimer: null,
    pendingGreetingAck: false,
    greetingRetryCount: 0,
    readyTimer: null,
    readySent: false,
    handoffSummary: '',
    handoffSummaryUsed: false,
  };

  // Metadata for the assistant reply that is currently pending.
  const replyState = {
    awaitingUpstreamReply: false,
    pendingAssistantSource: null,
    pendingAssistantToolName: null,
    pendingAssistantMeta: null,
    pendingAssistantTurnSeq: 0,
    lastDeliveredAssistantTurnSeq: 0,
    suppressReplyTimer: null,
    suppressUpstreamUntil: 0,
  };

  // Idle tracking, de-duplication markers and counters.
  const housekeepingState = {
    idleTimer: null,
    lastActivityAt: Date.now(),
    _lastBargeInResetAt: 0,
    _audioBlockLogOnce: false,
    _lastFinalNormalized: '',
    _lastFinalAt: 0,
    _audioKeepaliveTimer: null,
    _turnCount: 0,
  };

  const session = Object.assign(
    {},
    connectionState,
    turnState,
    knowledgeState,
    personaState,
    greetingState,
    replyState,
    housekeepingState,
  );

  sessions.set(sessionId, session);
  attachClientHandlers(session);
  return session;
}
/**
 * Mounts the realtime voice WebSocket endpoint (`/ws/realtime-dialog`) on an
 * existing HTTP server.
 *
 * Each connection must carry a `sessionId` query parameter; connections
 * without one are closed immediately. For accepted connections a session is
 * created, a best-effort DB session row is written, and a `connected` message
 * is sent to the client.
 *
 * @param {object} server - HTTP server instance to attach the WebSocket server to.
 * @returns {WebSocketServer} The created WebSocket server.
 */
function setupNativeVoiceGateway(server) {
  const wss = new WebSocketServer({ server, path: '/ws/realtime-dialog' });

  wss.on('connection', async (client, req) => {
    // Parse with the WHATWG URL API instead of the deprecated url.parse().
    // req.url is only a path plus query, so a placeholder base is supplied.
    // searchParams.get() always returns a single string (or null), whereas
    // url.parse() returns an array when a parameter is repeated.
    let query;
    try {
      query = new URL(req.url, 'http://localhost').searchParams;
    } catch (error) {
      client.close();
      return;
    }

    const sessionId = query.get('sessionId');
    if (!sessionId) {
      client.close();
      return;
    }
    const userId = query.get('userId') || null;

    const session = createSession(client, sessionId);
    session.userId = userId;

    // A DB failure is logged but does not prevent the voice session from starting.
    try {
      await db.createSession(sessionId, userId, 'voice');
    } catch (error) {
      console.warn('[NativeVoice][DB] createSession failed:', error.message);
    }
    sendJson(client, { type: 'connected', sessionId });
  });

  return wss;
}
// Public API of this module: only the gateway bootstrap function is exported.
module.exports = {
setupNativeVoiceGateway,
};