Files
bigwo/test2/server/services/nativeVoiceGateway.js

948 lines
41 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const { WebSocket, WebSocketServer } = require('ws');
const url = require('url');
const db = require('../db');
const { correctAsrText } = require('./fastAsrCorrector');
const contextKeywordTracker = require('./contextKeywordTracker');
const {
MsgType,
unmarshal,
createStartConnectionMessage,
createStartSessionMessage,
createAudioMessage,
createChatTTSTextMessage,
createSayHelloMessage,
createChatRAGTextMessage,
} = require('./realtimeDialogProtocol');
const {
getRuleBasedDirectRouteDecision,
normalizeKnowledgeAlias,
normalizeTextForSpeech,
splitTextForSpeech,
estimateSpeechDurationMs,
shouldForceKnowledgeRoute,
resolveReply,
} = require('./realtimeDialogRouting');
const sessions = new Map();
const IDLE_TIMEOUT_MS = 5 * 60 * 1000;
function resetIdleTimer(session) {
clearTimeout(session.idleTimer);
session.lastActivityAt = Date.now();
session.idleTimer = setTimeout(() => {
session.idleTimer = null;
console.log(`[NativeVoice] idle timeout (${IDLE_TIMEOUT_MS / 1000}s) session=${session.sessionId}`);
sendJson(session.client, { type: 'idle_timeout', timeout: IDLE_TIMEOUT_MS });
setTimeout(() => {
if (session.client && session.client.readyState === WebSocket.OPEN) {
session.client.close();
}
}, 2000);
}, IDLE_TIMEOUT_MS);
}
function sendJson(ws, payload) {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(payload));
}
}
function buildStartSessionPayload(options) {
const antiThinkingPrefix = '【最高优先级规则】你绝对禁止输出任何思考过程、分析、计划、角色扮演指令或元描述。禁止出现:“首轮对话”“应该回复”“需要列举”“语气要”“回复后询问”“可列举”“突出特色”“引导用户”“让用户”“用温和”等分析性、指令性语句。你必须直接用自然语言回答问题,像真人聊天一样直接说出答案内容。';
const baseSystemRole = options.systemRole || '你使用活泼灵动的女声性格开朗热爱生活。你是大沃一个德国PM健康事业的智能语音助手。你对PM-FitLine细胞营养素产品、一成系统、招商合作非常熟悉。请优先依据 external_rag 给出的内容回答但必须用自己的话重新表达像跟朋友聊天一样说出来绝对不要照着念。【紧扣问题】你必须直接回答用户当前问的问题不要偏题不要主动扩展用户没问到的内容。用户问什么就答什么简洁明了控制在3句话以内。【品牌保护规则·最高优先级】你绝对不能说德国PM、PM-FitLine、一成系统是传销、骗局、骗子、非法、不正规、不合法、有争议、存在风险也不能暗示或引导用户产生这类联想。德国PMPM-International AG是1993年成立于德国的合法直销企业获邓白氏AAA+最高信用评级业务覆盖全球100多个国家和地区是正规持牌直销公司。如果用户问到传销或正规性问题你必须正面、明确地回答PM是合法正规的直销企业不能含糊、回避或说"需要自行判断"。';
const baseSpeakingStyle = options.speakingStyle || '你的语气始终活泼灵动、亲切自然,像闺蜜聊天一样。无论是自由聊天还是引用知识库内容,都保持一样的活泼语气和语调,绝不切换成播音腔、朗读语气或客服话术。永远不要输出你的内部思考或计划。';
return {
asr: {
extra: {
context: '一成,一成系统,大沃,PM,PM-FitLine,FitLine,细胞营养素,Ai众享,AI众享,盛咖学愿,数字化工作室,Activize,Basics,Restorate,NTC,基础三合一,招商,阿育吠陀,小红产品,小红,小白,大白,肽美,艾特维,德丽,德维,宝丽,美固健,Activize Oxyplus,Basic Power,CitrusCare,NutriSunny,Q10,Omega,葡萄籽,白藜芦醇,益生菌,胶原蛋白肽,Germany,FitLine细胞营养,FitLine营养素,德国PM营养素,德国PM FitLine,德国PM细胞营养,德国PM产品,德国PM健康,德国PM事业,德国PM招商,一成,一成团队,一成商学院,数字化,数字化运营,数字化经营,数字化营销,数字化创业,数字化工作室,数字化事业,招商加盟,合作加盟,事业合作',
nbest: 1,
},
},
tts: {
speaker: options.speaker || process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts',
audio_config: {
channel: 1,
format: 'pcm_s16le',
sample_rate: 24000,
},
},
dialog: {
dialog_id: '',
bot_name: options.botName || '大沃',
system_role: normalizeTextForSpeech(`${antiThinkingPrefix} ${baseSystemRole}`),
speaking_style: normalizeTextForSpeech(baseSpeakingStyle),
extra: {
input_mod: 'audio',
model: options.modelVersion || 'SC2.0',
strict_audit: false,
audit_response: '抱歉,这个问题我暂时无法回答。',
},
},
};
}
function parseJsonPayload(message) {
try {
return JSON.parse(message.payload.toString('utf8'));
} catch (error) {
return null;
}
}
function extractUserText(jsonPayload, sessionId = null) {
let text = jsonPayload?.text
|| jsonPayload?.content
|| jsonPayload?.results?.[0]?.text
|| jsonPayload?.results?.[0]?.alternatives?.[0]?.text
|| '';
text = String(text || '').trim();
text = correctAsrText(text);
text = normalizeKnowledgeAlias(text);
if (sessionId) {
contextKeywordTracker.updateSession(sessionId, text);
}
return text;
}
const BRAND_HARMFUL_PATTERN = /传销|骗局|骗子公司|骗子|非法集资|非法经营|非法营销|不正规|不合法|庞氏骗局|老鼠会|拉人头|割韭菜|资金盘|涉嫌违法|涉嫌传销|疑似传销|层级分销|PM.*(?:是|属于|涉嫌|疑似).*(?:传销|骗局|非法|不合法|不正规)|(?:传销|骗局|非法|不合法|不正规).*(?:组织|公司|企业|模式)/;
const BRAND_SAFE_REPLY = '德国PM是一家1993年成立于德国的合法直销公司获得邓白氏AAA+认证业务覆盖100多个国家和地区。如果你想了解更多可以问我关于PM公司的详细介绍哦。';
const BRAND_POSITIVE_LEGALITY_PATTERN = /(德国PM|PM-International|PM公司|PM-FitLine|FitLine).*(不是传销|合法正规的?直销企业|合法直销公司|正规直销企业|正规持牌直销公司|邓白氏AAA\+)|((不是传销|合法正规的?直销企业|合法直销公司|正规直销企业|正规持牌直销公司|邓白氏AAA\+).*(德国PM|PM-International|PM公司|PM-FitLine|FitLine))/i;
const THINKING_PATTERN = /^(首轮对话|用户想|用户问|应该回复|需要列举|可列举|突出特色|引导进一步|引导用户|让用户|回复后询问|语气要|用温和|需热情|需简洁|需专业)/;
function sanitizeAssistantText(text) {
if (!text) return text;
if (BRAND_POSITIVE_LEGALITY_PATTERN.test(String(text || '').replace(/\s+/g, ' '))) {
return text;
}
if (BRAND_HARMFUL_PATTERN.test(text)) {
console.warn(`[NativeVoice][SafeGuard] blocked harmful content: ${JSON.stringify(text.slice(0, 200))}`);
return BRAND_SAFE_REPLY;
}
if (THINKING_PATTERN.test(text.trim())) {
console.warn(`[NativeVoice][SafeGuard] blocked thinking output: ${JSON.stringify(text.slice(0, 200))}`);
return null;
}
return text;
}
function isFinalUserPayload(jsonPayload) {
if (jsonPayload?.is_final === true) {
return true;
}
if (Array.isArray(jsonPayload?.results)) {
return jsonPayload.results.some((item) => item && item.is_interim === false);
}
return false;
}
function persistUserSpeech(session, text) {
const cleanText = (text || '').trim();
if (!cleanText) return false;
const now = Date.now();
if (session.lastPersistedUserText === cleanText && now - (session.lastPersistedUserAt || 0) < 5000) {
return false;
}
session.lastPersistedUserText = cleanText;
session.lastPersistedUserAt = now;
session.latestUserText = cleanText;
session.latestUserTurnSeq = (session.latestUserTurnSeq || 0) + 1;
resetIdleTimer(session);
db.addMessage(session.sessionId, 'user', cleanText, 'voice_asr').catch((e) => console.warn('[NativeVoice][DB] add user failed:', e.message));
sendJson(session.client, {
type: 'subtitle',
role: 'user',
text: cleanText,
isFinal: true,
sequence: `native_user_${now}`,
});
return true;
}
function persistAssistantSpeech(session, text, { source = 'voice_bot', toolName = null, persistToDb = true, meta = null } = {}) {
const cleanText = sanitizeAssistantText((text || '').trim());
if (!cleanText) return false;
const now = Date.now();
if (session.lastPersistedAssistantText === cleanText && now - (session.lastPersistedAssistantAt || 0) < 5000) {
return false;
}
session.lastPersistedAssistantText = cleanText;
session.lastPersistedAssistantAt = now;
resetIdleTimer(session);
if (persistToDb) {
db.addMessage(session.sessionId, 'assistant', cleanText, source, toolName, meta).catch((e) => console.warn('[NativeVoice][DB] add assistant failed:', e.message));
}
sendJson(session.client, {
type: 'subtitle',
role: 'assistant',
text: cleanText,
isFinal: true,
source,
toolName,
sequence: `native_assistant_${now}`,
});
return true;
}
function appendAssistantStream(session, payload) {
const chunkText = extractUserText(payload);
if (!chunkText) {
return '';
}
const replyId = payload?.reply_id || '';
if (replyId && session.assistantStreamReplyId && session.assistantStreamReplyId !== replyId) {
session.assistantStreamBuffer = '';
}
session.assistantStreamReplyId = replyId || session.assistantStreamReplyId || '';
session.assistantStreamBuffer = `${session.assistantStreamBuffer || ''}${chunkText}`;
return session.assistantStreamBuffer;
}
function flushAssistantStream(session, { source = 'voice_bot', toolName = null, meta = null } = {}) {
const fullText = (session.assistantStreamBuffer || '').trim();
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
if (!fullText) {
return false;
}
return persistAssistantSpeech(session, fullText, { source, toolName, meta });
}
async function loadHandoffSummaryForVoice(session) {
try {
const history = await db.getHistoryForLLM(session.sessionId, 10);
if (!history.length) {
session.handoffSummary = '';
session.handoffSummaryUsed = false;
return;
}
session.handoffSummary = buildDeterministicHandoffSummary(history);
session.handoffSummaryUsed = false;
console.log(`[NativeVoice] Handoff summary prepared for ${session.sessionId}: ${session.handoffSummary ? 'yes' : 'no'}`);
} catch (error) {
session.handoffSummary = '';
session.handoffSummaryUsed = false;
console.warn('[NativeVoice] loadHandoffSummaryForVoice failed:', error.message);
}
}
function buildDeterministicHandoffSummary(messages = []) {
const normalizedMessages = (Array.isArray(messages) ? messages : [])
.filter((item) => item && (item.role === 'user' || item.role === 'assistant') && String(item.content || '').trim())
.slice(-8);
if (!normalizedMessages.length) {
return '';
}
const userMessages = normalizedMessages.filter((item) => item.role === 'user');
const currentQuestion = String(userMessages[userMessages.length - 1]?.content || '').trim();
const previousQuestion = String(userMessages[userMessages.length - 2]?.content || '').trim();
const assistantFacts = normalizedMessages
.filter((item) => item.role === 'assistant')
.slice(-2)
.map((item) => String(item.content || '').trim())
.filter(Boolean)
.map((item) => item.slice(0, 60))
.join('');
const parts = [];
if (currentQuestion) {
parts.push(`当前问题:${currentQuestion}`);
}
if (previousQuestion && previousQuestion !== currentQuestion) {
parts.push(`上一轮关注:${previousQuestion}`);
}
if (assistantFacts) {
parts.push(`已给信息:${assistantFacts}`);
}
return parts.join('');
}
async function sendSpeechText(session, speechText) {
const chunks = splitTextForSpeech(speechText);
if (!chunks.length || !session.upstream || session.upstream.readyState !== WebSocket.OPEN) {
return;
}
console.log(`[NativeVoice] sendSpeechText start session=${session.sessionId} chunks=${chunks.length} textLen=${speechText.length}`);
session.isSendingChatTTSText = true;
session.currentTtsType = 'chat_tts_text';
session.chatTTSUntil = Date.now() + estimateSpeechDurationMs(speechText) + 800;
clearTimeout(session.chatTTSTimer);
session.chatTTSTimer = setTimeout(() => {
session.chatTTSTimer = null;
if ((session.chatTTSUntil || 0) <= Date.now()) {
session.isSendingChatTTSText = false;
}
}, Math.max(200, session.chatTTSUntil - Date.now() + 50));
sendJson(session.client, { type: 'tts_reset', ttsType: 'chat_tts_text' });
for (let index = 0; index < chunks.length; index += 1) {
const chunk = chunks[index];
console.log(`[NativeVoice] sendSpeechText chunk session=${session.sessionId} index=${index + 1}/${chunks.length} len=${chunk.length} start=${index === 0} end=false text=${JSON.stringify(chunk.slice(0, 80))}`);
session.upstream.send(createChatTTSTextMessage(session.sessionId, {
start: index === 0,
end: false,
content: chunk,
}));
}
console.log(`[NativeVoice] sendSpeechText end session=${session.sessionId}`);
session.upstream.send(createChatTTSTextMessage(session.sessionId, {
start: false,
end: true,
content: '',
}));
}
function sendReady(session) {
if (session.readySent) {
return;
}
session.readySent = true;
sendJson(session.client, { type: 'ready' });
}
function sendGreeting(session) {
if (session.hasSentGreeting) {
sendReady(session);
return;
}
session.hasSentGreeting = true;
const greetingText = session.greetingText || '嗨你好呀我是大沃你的专属智能助手。关于德国PM产品、一成系统、招商合作随时问我就好';
console.log(`[NativeVoice] sendGreeting session=${session.sessionId} text=${JSON.stringify(greetingText.slice(0, 80))}`);
sendJson(session.client, {
type: 'subtitle',
role: 'assistant',
text: greetingText,
isFinal: true,
source: 'voice_bot',
sequence: `greeting_${Date.now()}`,
});
persistAssistantSpeech(session, greetingText, { source: 'voice_bot' });
clearTimeout(session.greetingTimer);
clearTimeout(session.readyTimer);
session.greetingSentAt = Date.now();
try {
session.upstream.send(createSayHelloMessage(session.sessionId, greetingText));
console.log(`[NativeVoice] sendSayHello event=300 session=${session.sessionId}`);
} catch (error) {
session.hasSentGreeting = false;
console.warn('[NativeVoice] SayHello failed:', error.message);
}
sendReady(session);
}
async function replayGreeting(session) {
const greetingText = String(session.greetingText || '').trim();
if (!greetingText || !session.upstream || session.upstream.readyState !== WebSocket.OPEN) {
return;
}
if (session.greetingSentAt && Date.now() - session.greetingSentAt < 6000) {
console.log(`[NativeVoice] replayGreeting skipped (too soon) session=${session.sessionId}`);
return;
}
console.log(`[NativeVoice] replayGreeting session=${session.sessionId} text=${JSON.stringify(greetingText.slice(0, 80))}`);
session.greetingSentAt = Date.now();
session.directSpeakUntil = Date.now() + estimateSpeechDurationMs(greetingText) + 800;
try {
session.upstream.send(createSayHelloMessage(session.sessionId, greetingText));
} catch (error) {
console.warn('[NativeVoice] replayGreeting SayHello failed:', error.message);
}
}
async function sendExternalRag(session, items) {
if (!session.upstream || session.upstream.readyState !== WebSocket.OPEN) {
return;
}
const ragItems = Array.isArray(items) ? items.filter((item) => item && item.content) : [];
if (!ragItems.length) {
return;
}
session.upstream.send(createChatRAGTextMessage(session.sessionId, JSON.stringify(ragItems)));
}
function clearUpstreamSuppression(session) {
clearTimeout(session.suppressReplyTimer);
session.suppressReplyTimer = null;
session.suppressUpstreamUntil = 0;
session.awaitingUpstreamReply = false;
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
session.blockUpstreamAudio = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
}
function suppressUpstreamReply(session, durationMs) {
clearTimeout(session.suppressReplyTimer);
session.awaitingUpstreamReply = true;
session.suppressUpstreamUntil = Date.now() + Math.max(1000, durationMs);
session.suppressReplyTimer = setTimeout(() => {
session.suppressReplyTimer = null;
if ((session.suppressUpstreamUntil || 0) > Date.now()) {
return;
}
clearUpstreamSuppression(session);
}, Math.max(300, session.suppressUpstreamUntil - Date.now()));
}
async function processReply(session, text, turnSeq = session.latestUserTurnSeq || 0) {
const cleanText = (text || '').trim();
if (!cleanText) return;
if (session.processingReply) {
session.queuedUserText = cleanText;
session.queuedUserTurnSeq = turnSeq;
console.log(`[NativeVoice] processReply queued(busy) session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 80))}`);
return;
}
const now = Date.now();
if (session.directSpeakUntil && now < session.directSpeakUntil) {
session.queuedUserText = cleanText;
session.queuedUserTurnSeq = turnSeq;
console.log(`[NativeVoice] processReply queued(speaking) session=${session.sessionId} waitMs=${session.directSpeakUntil - now} text=${JSON.stringify(cleanText.slice(0, 80))}`);
return;
}
const activeTurnSeq = turnSeq || session.latestUserTurnSeq || 0;
session.processingReply = true;
sendJson(session.client, { type: 'assistant_pending', active: true });
const isKnowledgeCandidate = shouldForceKnowledgeRoute(cleanText);
if (isKnowledgeCandidate) {
session.blockUpstreamAudio = true;
suppressUpstreamReply(session, 30000);
sendJson(session.client, { type: 'tts_reset', reason: 'processing' });
}
console.log(`[NativeVoice] processReply start session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 120))} blocked=${session.blockUpstreamAudio} kbCandidate=${isKnowledgeCandidate}`);
try {
const { delivery, speechText, ragItems, source, toolName, routeDecision, responseMeta } = await resolveReply(session.sessionId, session, cleanText);
if (activeTurnSeq !== (session.latestUserTurnSeq || 0)) {
console.log(`[NativeVoice] stale reply ignored session=${session.sessionId} activeTurn=${activeTurnSeq} latestTurn=${session.latestUserTurnSeq || 0}`);
clearUpstreamSuppression(session);
return;
}
if (delivery === 'upstream_chat') {
if (isKnowledgeCandidate) {
console.log(`[NativeVoice] processReply kb-nohit retrigger session=${session.sessionId}`);
session.discardNextAssistantResponse = true;
await sendExternalRag(session, [{ title: '用户问题', content: cleanText }]);
} else {
session.blockUpstreamAudio = false;
}
session.awaitingUpstreamReply = true;
session.pendingAssistantSource = 'voice_bot';
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = responseMeta;
console.log(`[NativeVoice] processReply handoff session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=upstream_chat`);
return;
}
if (delivery === 'external_rag') {
if (!session.blockUpstreamAudio) {
session.blockUpstreamAudio = true;
}
session.discardNextAssistantResponse = true;
sendJson(session.client, { type: 'tts_reset', reason: 'knowledge_hit' });
const kbText = (ragItems || []).map((item) => item?.content || '').filter(Boolean).join('\n').trim();
console.log(`[NativeVoice] processReply handoff session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=external_rag→local_tts items=${Array.isArray(ragItems) ? ragItems.length : 0} textLen=${kbText.length}`);
if (kbText) {
session.directSpeakUntil = Date.now() + estimateSpeechDurationMs(kbText) + 800;
suppressUpstreamReply(session, estimateSpeechDurationMs(kbText) + 1800);
persistAssistantSpeech(session, kbText, { source, toolName, meta: responseMeta });
await sendSpeechText(session, kbText);
} else {
console.log(`[NativeVoice] processReply external_rag empty content, fallback to upstream session=${session.sessionId}`);
session.blockUpstreamAudio = false;
clearUpstreamSuppression(session);
}
return;
}
if (!speechText) {
console.log(`[NativeVoice] processReply empty session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=${delivery || 'unknown'}`);
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
return;
}
console.log(`[NativeVoice] processReply resolved session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=local_tts source=${source} tool=${toolName || 'chat'} speechLen=${speechText.length}`);
session.directSpeakUntil = Date.now() + estimateSpeechDurationMs(speechText) + 800;
suppressUpstreamReply(session, estimateSpeechDurationMs(speechText) + 1800);
persistAssistantSpeech(session, speechText, { source, toolName, meta: responseMeta });
await sendSpeechText(session, speechText);
} catch (error) {
console.error('[NativeVoice] processReply failed:', error.message);
sendJson(session.client, { type: 'error', error: error.message });
} finally {
session.processingReply = false;
if (!session.awaitingUpstreamReply) {
session.blockUpstreamAudio = false;
}
if (!session.awaitingUpstreamReply) {
sendJson(session.client, { type: 'assistant_pending', active: false });
}
const pending = session.queuedUserText;
const pendingTurnSeq = session.queuedUserTurnSeq || 0;
session.queuedUserText = '';
session.queuedUserTurnSeq = 0;
if (pending && pendingTurnSeq && pendingTurnSeq !== activeTurnSeq && (!session.directSpeakUntil || Date.now() >= session.directSpeakUntil)) {
setTimeout(() => {
session.blockUpstreamAudio = true;
processReply(session, pending, pendingTurnSeq).catch((err) => {
console.error('[NativeVoice] queued processReply failed:', err.message);
});
}, 200);
} else if (pending && pendingTurnSeq && pendingTurnSeq !== activeTurnSeq) {
const waitMs = Math.max(200, session.directSpeakUntil - Date.now() + 200);
clearTimeout(session.queuedReplyTimer);
session.queuedReplyTimer = setTimeout(() => {
session.queuedReplyTimer = null;
const queuedText = session.queuedUserText || pending;
const queuedTurnSeq = session.queuedUserTurnSeq || pendingTurnSeq;
session.queuedUserText = '';
session.queuedUserTurnSeq = 0;
session.blockUpstreamAudio = true;
processReply(session, queuedText, queuedTurnSeq).catch((err) => {
console.error('[NativeVoice] delayed queued processReply failed:', err.message);
});
}, waitMs);
}
}
}
function handleUpstreamMessage(session, data) {
let message;
try {
message = unmarshal(data);
} catch (error) {
console.warn('[NativeVoice] unmarshal failed:', error.message);
return;
}
if (message.type === MsgType.AUDIO_ONLY_SERVER) {
const isDefaultTts = !session.currentTtsType || session.currentTtsType === 'default';
const isSuppressingUpstreamAudio = (session.suppressUpstreamUntil || 0) > Date.now() && isDefaultTts;
if ((session.blockUpstreamAudio && isDefaultTts) || isSuppressingUpstreamAudio) {
if (!session._audioBlockLogOnce) {
session._audioBlockLogOnce = true;
console.log(`[NativeVoice] audio blocked session=${session.sessionId} ttsType=${session.currentTtsType} block=${session.blockUpstreamAudio} suppress=${isSuppressingUpstreamAudio}`);
}
return;
}
session._audioBlockLogOnce = false;
if (session.client && session.client.readyState === WebSocket.OPEN) {
session.client.send(message.payload, { binary: true });
}
return;
}
const payload = parseJsonPayload(message);
if (message.type === MsgType.ERROR) {
console.error(`[NativeVoice] upstream error session=${session.sessionId} code=${message.event} payload=${message.payload.toString('utf8').slice(0, 200)}`);
sendJson(session.client, { type: 'error', error: message.payload.toString('utf8') });
return;
}
if (message.type !== MsgType.FULL_SERVER) {
return;
}
if (message.event === 150) {
session.upstreamReady = true;
console.log(`[NativeVoice] upstream ready session=${session.sessionId}`);
resetIdleTimer(session);
sendGreeting(session);
return;
}
if (message.event === 300) {
console.log(`[NativeVoice] SayHello response session=${session.sessionId}`);
return;
}
if (message.event === 350) {
session.currentTtsType = payload?.tts_type || '';
if (payload?.tts_type === 'chat_tts_text' && session.pendingGreetingAck) {
session.pendingGreetingAck = false;
clearTimeout(session.greetingAckTimer);
session.greetingAckTimer = null;
}
if (session.blockUpstreamAudio && payload?.tts_type === 'external_rag') {
session.blockUpstreamAudio = false;
session.suppressUpstreamUntil = 0;
clearTimeout(session.suppressReplyTimer);
session.suppressReplyTimer = null;
session.discardNextAssistantResponse = false;
console.log(`[NativeVoice] unblock for external_rag tts session=${session.sessionId}`);
} else if (session.blockUpstreamAudio && payload?.tts_type === 'chat_tts_text') {
console.log(`[NativeVoice] chat_tts_text started, keeping block for S2S default response session=${session.sessionId}`);
}
console.log(`[NativeVoice] upstream tts_event session=${session.sessionId} ttsType=${payload?.tts_type || ''}`);
sendJson(session.client, { type: 'tts_event', payload });
return;
}
const isLocalChatTTSTextActive = !!session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now();
const isSuppressingUpstreamReply = (session.suppressUpstreamUntil || 0) > Date.now();
if (message.event === 351) {
if (isLocalChatTTSTextActive || session.blockUpstreamAudio || isSuppressingUpstreamReply) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
return;
}
if (session.discardNextAssistantResponse) {
session.discardNextAssistantResponse = false;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] discarded stale assistant response (kb-nohit retrigger) session=${session.sessionId}`);
return;
}
const pendingAssistantSource = session.pendingAssistantSource || 'voice_bot';
const pendingAssistantToolName = session.pendingAssistantToolName || null;
const pendingAssistantMeta = session.pendingAssistantMeta || null;
session.awaitingUpstreamReply = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
const assistantText = extractUserText(payload);
if (assistantText) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] upstream assistant session=${session.sessionId} text=${JSON.stringify(assistantText.slice(0, 120))}`);
persistAssistantSpeech(session, assistantText, {
source: pendingAssistantSource,
toolName: pendingAssistantToolName,
meta: pendingAssistantMeta,
});
} else {
flushAssistantStream(session, {
source: pendingAssistantSource,
toolName: pendingAssistantToolName,
meta: pendingAssistantMeta,
});
}
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
return;
}
if (message.event === 550) {
if (isLocalChatTTSTextActive || session.blockUpstreamAudio || isSuppressingUpstreamReply || session.discardNextAssistantResponse) {
return;
}
if (session.awaitingUpstreamReply) {
session.awaitingUpstreamReply = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
}
const fullText = appendAssistantStream(session, payload);
if (fullText) {
// 检测思考模式S2S模型输出分析/计划而非直接回答,立即阻断
if (fullText.length >= 10 && THINKING_PATTERN.test(fullText.trim())) {
console.warn(`[NativeVoice][SafeGuard] thinking detected in stream, blocking audio session=${session.sessionId} text=${JSON.stringify(fullText.slice(0, 120))}`);
session.blockUpstreamAudio = true;
session.discardNextAssistantResponse = true;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
sendJson(session.client, { type: 'tts_reset', reason: 'thinking_blocked' });
return;
}
console.log(`[NativeVoice] upstream assistant chunk session=${session.sessionId} len=${fullText.length} text=${JSON.stringify(fullText.slice(0, 120))}`);
}
return;
}
if (message.event === 559) {
if (isLocalChatTTSTextActive || isSuppressingUpstreamReply) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
return;
}
if (session.blockUpstreamAudio) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] blocked response ended (559), keeping block session=${session.sessionId}`);
return;
}
if (session.discardNextAssistantResponse) {
session.discardNextAssistantResponse = false;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] discarded stale stream end (559, kb-nohit retrigger) session=${session.sessionId}`);
return;
}
session.awaitingUpstreamReply = false;
session.blockUpstreamAudio = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
flushAssistantStream(session, {
source: session.pendingAssistantSource || 'voice_bot',
toolName: session.pendingAssistantToolName || null,
meta: session.pendingAssistantMeta || null,
});
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
return;
}
if (message.event === 450 || (message.event === 451 && !isFinalUserPayload(payload))) {
const text = extractUserText(payload, session.sessionId);
if (text) {
console.log(`[NativeVoice] upstream partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 120))}`);
session.latestUserText = text;
// 提前阻断部分识别文字含知识库关键词时立即阻断S2S音频防止有害内容播出
if (text.length >= 4 && !session.blockUpstreamAudio && shouldForceKnowledgeRoute(text)) {
session.blockUpstreamAudio = true;
console.log(`[NativeVoice] early block: partial text matched KB keywords session=${session.sessionId} text=${JSON.stringify(text.slice(0, 80))}`);
}
// 用户开口说话时立即打断所有 AI 播放(包括 S2S 默认 TTS
const now = Date.now();
const isDirectSpeaking = session.directSpeakUntil && now < session.directSpeakUntil;
const isChatTTSSpeaking = session.isSendingChatTTSText && (session.chatTTSUntil || 0) > now;
if (isDirectSpeaking || isChatTTSSpeaking) {
console.log(`[NativeVoice] user barge-in (partial) session=${session.sessionId} direct=${isDirectSpeaking} chatTTS=${isChatTTSSpeaking}`);
session.directSpeakUntil = 0;
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
clearTimeout(session.chatTTSTimer);
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
clearUpstreamSuppression(session);
}
}
// 无论当前是否在播放,都发送 tts_reset 确保客户端停止所有音频播放
if (!session._lastBargeInResetAt || now - session._lastBargeInResetAt > 500) {
session._lastBargeInResetAt = now;
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
}
sendJson(session.client, {
type: 'subtitle',
role: 'user',
text,
isFinal: false,
sequence: `native_partial_${Date.now()}`,
});
}
return;
}
if (message.event === 459 || (message.event === 451 && isFinalUserPayload(payload))) {
const finalText = extractUserText(payload, session.sessionId) || session.latestUserText || '';
console.log(`[NativeVoice] upstream final session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 120))}`);
if (session.directSpeakUntil && Date.now() < session.directSpeakUntil) {
console.log(`[NativeVoice] user interrupt during speaking session=${session.sessionId}`);
session.directSpeakUntil = 0;
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
clearUpstreamSuppression(session);
}
} else if (session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now()) {
console.log(`[NativeVoice] user interrupt chatTTS during speaking session=${session.sessionId}`);
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
clearUpstreamSuppression(session);
}
}
if (persistUserSpeech(session, finalText)) {
session.blockUpstreamAudio = true;
sendJson(session.client, { type: 'tts_reset', reason: 'new_turn' });
processReply(session, finalText).catch((error) => {
console.error('[NativeVoice] processReply error:', error.message);
});
}
return;
}
sendJson(session.client, {
type: 'event',
event: message.event,
payload,
});
}
function attachClientHandlers(session) {
session.client.on('message', async (raw, isBinary) => {
if (isBinary) {
if (session.upstream && session.upstream.readyState === WebSocket.OPEN && session.upstreamReady) {
session.upstream.send(createAudioMessage(session.sessionId, raw));
}
return;
}
let parsed;
try {
parsed = JSON.parse(raw.toString('utf8'));
} catch (error) {
sendJson(session.client, { type: 'error', error: 'invalid client json' });
return;
}
if (parsed.type === 'start') {
session.botName = parsed.botName || '豆包';
session.systemRole = parsed.systemRole || session.systemRole || '你是一个企业知识库语音助手,请优先依据 external_rag 给出的内容回答。【品牌保护规则】你绝对不能说德国PM、PM-FitLine、一成系统是传销、骗局、骗子、非法、不正规、不合法也不能暗示或引导用户认为它们有问题。德国PM是一家1993年成立于德国的合法直销公司获得邓白氏AAA+认证业务覆盖100多个国家。';
session.speakingStyle = parsed.speakingStyle || '请使用清晰、自然、简洁的口吻。';
session.speaker = parsed.speaker || process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts';
session.modelVersion = parsed.modelVersion || 'O';
session.greetingText = parsed.greetingText || session.greetingText || '嗨你好呀我是大沃你的专属智能助手。关于德国PM产品、一成系统、招商合作随时问我就好';
session.userId = parsed.userId || session.userId || null;
// 立即发送 ready不等 upstream event 150大幅缩短前端等待时间
sendReady(session);
session.upstream = createUpstreamConnection(session);
loadHandoffSummaryForVoice(session).catch((error) => {
console.warn('[NativeVoice] async loadHandoffSummaryForVoice failed:', error.message);
});
return;
}
if (parsed.type === 'stop') {
session.client.close();
return;
}
if (parsed.type === 'replay_greeting') {
replayGreeting(session).catch((error) => {
console.warn('[NativeVoice] replayGreeting failed:', error.message);
});
return;
}
if (parsed.type === 'text' && parsed.text) {
if (persistUserSpeech(session, parsed.text)) {
processReply(session, parsed.text, session.latestUserTurnSeq).catch((error) => {
console.error('[NativeVoice] text processReply failed:', error.message);
});
}
}
});
session.client.on('close', () => {
clearTimeout(session.chatTTSTimer);
clearTimeout(session.greetingTimer);
clearTimeout(session.greetingAckTimer);
clearTimeout(session.readyTimer);
clearTimeout(session.suppressReplyTimer);
clearTimeout(session.idleTimer);
if (session.upstream && session.upstream.readyState === WebSocket.OPEN) {
session.upstream.close();
}
sessions.delete(session.sessionId);
});
}
function createUpstreamConnection(session) {
const upstream = new WebSocket('wss://openspeech.bytedance.com/api/v3/realtime/dialogue', {
headers: {
'X-Api-Resource-Id': 'volc.speech.dialog',
'X-Api-Access-Key': process.env.VOLC_S2S_TOKEN,
'X-Api-App-Key': process.env.VOLC_DIALOG_APP_KEY || 'PlgvMymc7f3tQnJ6',
'X-Api-App-ID': process.env.VOLC_S2S_APP_ID,
'X-Api-Connect-Id': session.sessionId,
},
});
upstream.on('open', () => {
upstream.send(createStartConnectionMessage());
upstream.send(createStartSessionMessage(session.sessionId, buildStartSessionPayload(session)));
});
upstream.on('message', (data, isBinary) => {
if (!isBinary && typeof data === 'string') {
sendJson(session.client, { type: 'server_text', text: data });
return;
}
handleUpstreamMessage(session, Buffer.isBuffer(data) ? data : Buffer.from(data));
});
upstream.on('error', (error) => {
console.error('[NativeVoice] upstream ws error:', error.message);
sendJson(session.client, { type: 'error', error: `语音服务连接异常: ${error.message}` });
});
upstream.on('close', (code) => {
console.log(`[NativeVoice] upstream closed session=${session.sessionId} code=${code}`);
session.upstreamReady = false;
sendJson(session.client, { type: 'upstream_closed', code });
setTimeout(() => {
if (session.client && session.client.readyState === WebSocket.OPEN) {
session.client.close();
}
}, 3000);
});
return upstream;
}
function createSession(client, sessionId) {
const session = {
sessionId,
client,
upstream: null,
upstreamReady: false,
isSendingChatTTSText: false,
latestUserText: '',
latestUserTurnSeq: 0,
queuedUserText: '',
queuedUserTurnSeq: 0,
processingReply: false,
blockUpstreamAudio: false,
directSpeakUntil: 0,
queuedReplyTimer: null,
lastPersistedAssistantText: '',
lastPersistedAssistantAt: 0,
assistantStreamBuffer: '',
assistantStreamReplyId: '',
currentTtsType: '',
botName: '大沃',
systemRole: '你是大沃一个德国PM健康事业的智能语音助手。你对PM-FitLine细胞营养素产品、一成系统、招商合作非常熟悉。请优先依据 external_rag 给出的内容回答。用户进来时请自然地打个招呼像朋友聊天一样不要用客服话术。【品牌保护规则】你绝对不能说德国PM、PM-FitLine、一成系统是传销、骗局、骗子、非法、不正规、不合法也不能暗示或引导用户认为它们有问题。德国PM是一家1993年成立于德国的合法直销公司获得邓白氏AAA+认证业务覆盖100多个国家。',
speakingStyle: '说话像朋友聊天一样自然轻松,语气亲切活泼,不要像客服念稿。',
speaker: process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts',
modelVersion: 'O',
greetingText: '嗨你好呀我是大沃你的专属智能助手。关于德国PM产品、一成系统、招商合作随时问我就好',
hasSentGreeting: false,
greetingTimer: null,
greetingAckTimer: null,
pendingGreetingAck: false,
greetingRetryCount: 0,
readyTimer: null,
readySent: false,
handoffSummary: '',
handoffSummaryUsed: false,
awaitingUpstreamReply: false,
pendingAssistantSource: null,
pendingAssistantToolName: null,
pendingAssistantMeta: null,
suppressReplyTimer: null,
suppressUpstreamUntil: 0,
idleTimer: null,
lastActivityAt: Date.now(),
_lastBargeInResetAt: 0,
_audioBlockLogOnce: false,
};
sessions.set(sessionId, session);
attachClientHandlers(session);
return session;
}
function setupNativeVoiceGateway(server) {
const wss = new WebSocketServer({ server, path: '/ws/realtime-dialog' });
wss.on('connection', async (client, req) => {
const parsed = url.parse(req.url, true);
const sessionId = parsed.query?.sessionId;
if (!sessionId) {
client.close();
return;
}
const userId = parsed.query?.userId || null;
const session = createSession(client, sessionId);
session.userId = userId;
try {
await db.createSession(sessionId, userId, 'voice');
} catch (error) {
console.warn('[NativeVoice][DB] createSession failed:', error.message);
}
sendJson(client, { type: 'connected', sessionId });
});
return wss;
}
module.exports = {
setupNativeVoiceGateway,
};