924 lines
40 KiB
JavaScript
924 lines
40 KiB
JavaScript
const { WebSocket, WebSocketServer } = require('ws');
|
||
const url = require('url');
|
||
const db = require('../db');
|
||
const { correctAsrText } = require('./fastAsrCorrector');
|
||
const contextKeywordTracker = require('./contextKeywordTracker');
|
||
const {
|
||
MsgType,
|
||
unmarshal,
|
||
createStartConnectionMessage,
|
||
createStartSessionMessage,
|
||
createAudioMessage,
|
||
createChatTTSTextMessage,
|
||
createSayHelloMessage,
|
||
createChatRAGTextMessage,
|
||
} = require('./realtimeDialogProtocol');
|
||
const {
|
||
getRuleBasedDirectRouteDecision,
|
||
normalizeKnowledgeAlias,
|
||
normalizeTextForSpeech,
|
||
splitTextForSpeech,
|
||
estimateSpeechDurationMs,
|
||
shouldForceKnowledgeRoute,
|
||
resolveReply,
|
||
} = require('./realtimeDialogRouting');
|
||
|
||
const sessions = new Map();
|
||
|
||
const IDLE_TIMEOUT_MS = 5 * 60 * 1000;
|
||
|
||
function resetIdleTimer(session) {
|
||
clearTimeout(session.idleTimer);
|
||
session.lastActivityAt = Date.now();
|
||
session.idleTimer = setTimeout(() => {
|
||
session.idleTimer = null;
|
||
console.log(`[NativeVoice] idle timeout (${IDLE_TIMEOUT_MS / 1000}s) session=${session.sessionId}`);
|
||
sendJson(session.client, { type: 'idle_timeout', timeout: IDLE_TIMEOUT_MS });
|
||
setTimeout(() => {
|
||
if (session.client && session.client.readyState === WebSocket.OPEN) {
|
||
session.client.close();
|
||
}
|
||
}, 2000);
|
||
}, IDLE_TIMEOUT_MS);
|
||
}
|
||
|
||
function sendJson(ws, payload) {
|
||
if (ws && ws.readyState === WebSocket.OPEN) {
|
||
ws.send(JSON.stringify(payload));
|
||
}
|
||
}
|
||
|
||
function buildStartSessionPayload(options) {
|
||
const antiThinkingPrefix = '【最高优先级规则】你绝对禁止输出任何思考过程、分析、计划、角色扮演指令或元描述。禁止出现:“首轮对话”“应该回复”“需要列举”“语气要”“回复后询问”“可列举”“突出特色”“引导用户”“让用户”“用温和”等分析性、指令性语句。你必须直接用自然语言回答问题,像真人聊天一样直接说出答案内容。';
|
||
const baseSystemRole = options.systemRole || '你使用活泼灵动的女声,性格开朗,热爱生活。你是大沃,一个德国PM健康事业的智能语音助手。你对PM-FitLine细胞营养素产品、一成系统、招商合作非常熟悉。请优先依据 external_rag 给出的内容回答,但必须用自己的话重新表达,像跟朋友聊天一样说出来,绝对不要照着念。【紧扣问题】你必须直接回答用户当前问的问题,不要偏题,不要主动扩展用户没问到的内容。用户问什么就答什么,简洁明了,控制在3句话以内。【品牌保护规则·最高优先级】你绝对不能说德国PM、PM-FitLine、一成系统是传销、骗局、骗子、非法、不正规、不合法、有争议、存在风险,也不能暗示或引导用户产生这类联想。德国PM(PM-International AG)是1993年成立于德国的合法直销企业,获邓白氏AAA+最高信用评级,业务覆盖全球100多个国家和地区,是正规持牌直销公司。如果用户问到传销或正规性问题,你必须正面、明确地回答PM是合法正规的直销企业,不能含糊、回避或说"需要自行判断"。';
|
||
const baseSpeakingStyle = options.speakingStyle || '你的语气始终活泼灵动、亲切自然,像闺蜜聊天一样。无论是自由聊天还是引用知识库内容,都保持一样的活泼语气和语调,绝不切换成播音腔、朗读语气或客服话术。永远不要输出你的内部思考或计划。';
|
||
return {
|
||
asr: {
|
||
extra: {
|
||
context: '一成,一成系统,大沃,PM,PM-FitLine,FitLine,细胞营养素,Ai众享,AI众享,盛咖学愿,数字化工作室,Activize,Basics,Restorate,NTC,基础三合一,招商,阿育吠陀,小红产品,小红,小白,大白,肽美,艾特维,德丽,德维,宝丽,美固健,Activize Oxyplus,Basic Power,CitrusCare,NutriSunny,Q10,Omega,葡萄籽,白藜芦醇,益生菌,胶原蛋白肽,Germany,FitLine细胞营养,FitLine营养素,德国PM营养素,德国PM FitLine,德国PM细胞营养,德国PM产品,德国PM健康,德国PM事业,德国PM招商,一成,一成团队,一成商学院,数字化,数字化运营,数字化经营,数字化营销,数字化创业,数字化工作室,数字化事业,招商加盟,合作加盟,事业合作',
|
||
nbest: 1,
|
||
},
|
||
},
|
||
tts: {
|
||
speaker: options.speaker || process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts',
|
||
audio_config: {
|
||
channel: 1,
|
||
format: 'pcm_s16le',
|
||
sample_rate: 24000,
|
||
},
|
||
},
|
||
dialog: {
|
||
dialog_id: '',
|
||
bot_name: options.botName || '大沃',
|
||
system_role: normalizeTextForSpeech(`${antiThinkingPrefix} ${baseSystemRole}`),
|
||
speaking_style: normalizeTextForSpeech(baseSpeakingStyle),
|
||
extra: {
|
||
input_mod: 'audio',
|
||
model: options.modelVersion || 'SC2.0',
|
||
strict_audit: false,
|
||
audit_response: '抱歉,这个问题我暂时无法回答。',
|
||
},
|
||
},
|
||
};
|
||
}
|
||
|
||
function parseJsonPayload(message) {
|
||
try {
|
||
return JSON.parse(message.payload.toString('utf8'));
|
||
} catch (error) {
|
||
return null;
|
||
}
|
||
}
|
||
|
||
function extractUserText(jsonPayload, sessionId = null) {
|
||
let text = jsonPayload?.text
|
||
|| jsonPayload?.content
|
||
|| jsonPayload?.results?.[0]?.text
|
||
|| jsonPayload?.results?.[0]?.alternatives?.[0]?.text
|
||
|| '';
|
||
text = String(text || '').trim();
|
||
text = correctAsrText(text);
|
||
text = normalizeKnowledgeAlias(text);
|
||
if (sessionId) {
|
||
contextKeywordTracker.updateSession(sessionId, text);
|
||
}
|
||
return text;
|
||
}
|
||
|
||
const BRAND_HARMFUL_PATTERN = /传销|骗局|骗子公司|骗子|非法集资|非法经营|非法营销|不正规|不合法|庞氏骗局|老鼠会|拉人头|割韭菜|资金盘|涉嫌违法|涉嫌传销|疑似传销|层级分销|PM.*(?:是|属于|涉嫌|疑似).*(?:传销|骗局|非法|不合法|不正规)|(?:传销|骗局|非法|不合法|不正规).*(?:组织|公司|企业|模式)/;
|
||
const BRAND_SAFE_REPLY = '德国PM是一家1993年成立于德国的合法直销公司,获得邓白氏AAA+认证,业务覆盖100多个国家和地区。如果你想了解更多,可以问我关于PM公司的详细介绍哦。';
|
||
|
||
const THINKING_PATTERN = /^(首轮对话|用户想|用户问|应该回复|需要列举|可列举|突出特色|引导进一步|引导用户|让用户|回复后询问|语气要|用温和|需热情|需简洁|需专业)/;
|
||
|
||
function sanitizeAssistantText(text) {
|
||
if (!text) return text;
|
||
if (BRAND_HARMFUL_PATTERN.test(text)) {
|
||
console.warn(`[NativeVoice][SafeGuard] blocked harmful content: ${JSON.stringify(text.slice(0, 200))}`);
|
||
return BRAND_SAFE_REPLY;
|
||
}
|
||
if (THINKING_PATTERN.test(text.trim())) {
|
||
console.warn(`[NativeVoice][SafeGuard] blocked thinking output: ${JSON.stringify(text.slice(0, 200))}`);
|
||
return null;
|
||
}
|
||
return text;
|
||
}
|
||
|
||
function isFinalUserPayload(jsonPayload) {
|
||
if (jsonPayload?.is_final === true) {
|
||
return true;
|
||
}
|
||
if (Array.isArray(jsonPayload?.results)) {
|
||
return jsonPayload.results.some((item) => item && item.is_interim === false);
|
||
}
|
||
return false;
|
||
}
|
||
|
||
function persistUserSpeech(session, text) {
|
||
const cleanText = (text || '').trim();
|
||
if (!cleanText) return false;
|
||
const now = Date.now();
|
||
if (session.lastPersistedUserText === cleanText && now - (session.lastPersistedUserAt || 0) < 5000) {
|
||
return false;
|
||
}
|
||
session.lastPersistedUserText = cleanText;
|
||
session.lastPersistedUserAt = now;
|
||
session.latestUserText = cleanText;
|
||
resetIdleTimer(session);
|
||
db.addMessage(session.sessionId, 'user', cleanText, 'voice_asr').catch((e) => console.warn('[NativeVoice][DB] add user failed:', e.message));
|
||
sendJson(session.client, {
|
||
type: 'subtitle',
|
||
role: 'user',
|
||
text: cleanText,
|
||
isFinal: true,
|
||
sequence: `native_user_${now}`,
|
||
});
|
||
return true;
|
||
}
|
||
|
||
function persistAssistantSpeech(session, text, { source = 'voice_bot', toolName = null, persistToDb = true, meta = null } = {}) {
|
||
const cleanText = sanitizeAssistantText((text || '').trim());
|
||
if (!cleanText) return false;
|
||
const now = Date.now();
|
||
if (session.lastPersistedAssistantText === cleanText && now - (session.lastPersistedAssistantAt || 0) < 5000) {
|
||
return false;
|
||
}
|
||
session.lastPersistedAssistantText = cleanText;
|
||
session.lastPersistedAssistantAt = now;
|
||
resetIdleTimer(session);
|
||
if (persistToDb) {
|
||
db.addMessage(session.sessionId, 'assistant', cleanText, source, toolName, meta).catch((e) => console.warn('[NativeVoice][DB] add assistant failed:', e.message));
|
||
}
|
||
sendJson(session.client, {
|
||
type: 'subtitle',
|
||
role: 'assistant',
|
||
text: cleanText,
|
||
isFinal: true,
|
||
source,
|
||
toolName,
|
||
sequence: `native_assistant_${now}`,
|
||
});
|
||
return true;
|
||
}
|
||
|
||
function appendAssistantStream(session, payload) {
|
||
const chunkText = extractUserText(payload);
|
||
if (!chunkText) {
|
||
return '';
|
||
}
|
||
const replyId = payload?.reply_id || '';
|
||
if (replyId && session.assistantStreamReplyId && session.assistantStreamReplyId !== replyId) {
|
||
session.assistantStreamBuffer = '';
|
||
}
|
||
session.assistantStreamReplyId = replyId || session.assistantStreamReplyId || '';
|
||
session.assistantStreamBuffer = `${session.assistantStreamBuffer || ''}${chunkText}`;
|
||
return session.assistantStreamBuffer;
|
||
}
|
||
|
||
function flushAssistantStream(session, { source = 'voice_bot', toolName = null, meta = null } = {}) {
|
||
const fullText = (session.assistantStreamBuffer || '').trim();
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
if (!fullText) {
|
||
return false;
|
||
}
|
||
return persistAssistantSpeech(session, fullText, { source, toolName, meta });
|
||
}
|
||
|
||
async function loadHandoffSummaryForVoice(session) {
|
||
try {
|
||
const history = await db.getHistoryForLLM(session.sessionId, 10);
|
||
if (!history.length) {
|
||
session.handoffSummary = '';
|
||
session.handoffSummaryUsed = false;
|
||
return;
|
||
}
|
||
session.handoffSummary = buildDeterministicHandoffSummary(history);
|
||
session.handoffSummaryUsed = false;
|
||
console.log(`[NativeVoice] Handoff summary prepared for ${session.sessionId}: ${session.handoffSummary ? 'yes' : 'no'}`);
|
||
} catch (error) {
|
||
session.handoffSummary = '';
|
||
session.handoffSummaryUsed = false;
|
||
console.warn('[NativeVoice] loadHandoffSummaryForVoice failed:', error.message);
|
||
}
|
||
}
|
||
|
||
function buildDeterministicHandoffSummary(messages = []) {
|
||
const normalizedMessages = (Array.isArray(messages) ? messages : [])
|
||
.filter((item) => item && (item.role === 'user' || item.role === 'assistant') && String(item.content || '').trim())
|
||
.slice(-8);
|
||
if (!normalizedMessages.length) {
|
||
return '';
|
||
}
|
||
const userMessages = normalizedMessages.filter((item) => item.role === 'user');
|
||
const currentQuestion = String(userMessages[userMessages.length - 1]?.content || '').trim();
|
||
const previousQuestion = String(userMessages[userMessages.length - 2]?.content || '').trim();
|
||
const assistantFacts = normalizedMessages
|
||
.filter((item) => item.role === 'assistant')
|
||
.slice(-2)
|
||
.map((item) => String(item.content || '').trim())
|
||
.filter(Boolean)
|
||
.map((item) => item.slice(0, 60))
|
||
.join(';');
|
||
const parts = [];
|
||
if (currentQuestion) {
|
||
parts.push(`当前问题:${currentQuestion}`);
|
||
}
|
||
if (previousQuestion && previousQuestion !== currentQuestion) {
|
||
parts.push(`上一轮关注:${previousQuestion}`);
|
||
}
|
||
if (assistantFacts) {
|
||
parts.push(`已给信息:${assistantFacts}`);
|
||
}
|
||
return parts.join(';');
|
||
}
|
||
|
||
async function sendSpeechText(session, speechText) {
|
||
const chunks = splitTextForSpeech(speechText);
|
||
if (!chunks.length || !session.upstream || session.upstream.readyState !== WebSocket.OPEN) {
|
||
return;
|
||
}
|
||
console.log(`[NativeVoice] sendSpeechText start session=${session.sessionId} chunks=${chunks.length} textLen=${speechText.length}`);
|
||
session.isSendingChatTTSText = true;
|
||
session.currentTtsType = 'chat_tts_text';
|
||
session.chatTTSUntil = Date.now() + estimateSpeechDurationMs(speechText) + 800;
|
||
clearTimeout(session.chatTTSTimer);
|
||
session.chatTTSTimer = setTimeout(() => {
|
||
session.chatTTSTimer = null;
|
||
if ((session.chatTTSUntil || 0) <= Date.now()) {
|
||
session.isSendingChatTTSText = false;
|
||
}
|
||
}, Math.max(200, session.chatTTSUntil - Date.now() + 50));
|
||
sendJson(session.client, { type: 'tts_reset', ttsType: 'chat_tts_text' });
|
||
for (let index = 0; index < chunks.length; index += 1) {
|
||
const chunk = chunks[index];
|
||
console.log(`[NativeVoice] sendSpeechText chunk session=${session.sessionId} index=${index + 1}/${chunks.length} len=${chunk.length} start=${index === 0} end=false text=${JSON.stringify(chunk.slice(0, 80))}`);
|
||
session.upstream.send(createChatTTSTextMessage(session.sessionId, {
|
||
start: index === 0,
|
||
end: false,
|
||
content: chunk,
|
||
}));
|
||
}
|
||
console.log(`[NativeVoice] sendSpeechText end session=${session.sessionId}`);
|
||
session.upstream.send(createChatTTSTextMessage(session.sessionId, {
|
||
start: false,
|
||
end: true,
|
||
content: '',
|
||
}));
|
||
}
|
||
|
||
function sendReady(session) {
|
||
if (session.readySent) {
|
||
return;
|
||
}
|
||
session.readySent = true;
|
||
sendJson(session.client, { type: 'ready' });
|
||
}
|
||
|
||
function sendGreeting(session) {
|
||
if (session.hasSentGreeting) {
|
||
sendReady(session);
|
||
return;
|
||
}
|
||
session.hasSentGreeting = true;
|
||
const greetingText = session.greetingText || '嗨,你好呀!我是大沃,你的专属智能助手。关于德国PM产品、一成系统、招商合作,随时问我就好~';
|
||
console.log(`[NativeVoice] sendGreeting session=${session.sessionId} text=${JSON.stringify(greetingText.slice(0, 80))}`);
|
||
sendJson(session.client, {
|
||
type: 'subtitle',
|
||
role: 'assistant',
|
||
text: greetingText,
|
||
isFinal: true,
|
||
source: 'voice_bot',
|
||
sequence: `greeting_${Date.now()}`,
|
||
});
|
||
persistAssistantSpeech(session, greetingText, { source: 'voice_bot' });
|
||
clearTimeout(session.greetingTimer);
|
||
clearTimeout(session.readyTimer);
|
||
session.greetingSentAt = Date.now();
|
||
try {
|
||
session.upstream.send(createSayHelloMessage(session.sessionId, greetingText));
|
||
console.log(`[NativeVoice] sendSayHello event=300 session=${session.sessionId}`);
|
||
} catch (error) {
|
||
session.hasSentGreeting = false;
|
||
console.warn('[NativeVoice] SayHello failed:', error.message);
|
||
}
|
||
sendReady(session);
|
||
}
|
||
|
||
async function replayGreeting(session) {
|
||
const greetingText = String(session.greetingText || '').trim();
|
||
if (!greetingText || !session.upstream || session.upstream.readyState !== WebSocket.OPEN) {
|
||
return;
|
||
}
|
||
if (session.greetingSentAt && Date.now() - session.greetingSentAt < 6000) {
|
||
console.log(`[NativeVoice] replayGreeting skipped (too soon) session=${session.sessionId}`);
|
||
return;
|
||
}
|
||
console.log(`[NativeVoice] replayGreeting session=${session.sessionId} text=${JSON.stringify(greetingText.slice(0, 80))}`);
|
||
session.greetingSentAt = Date.now();
|
||
session.directSpeakUntil = Date.now() + estimateSpeechDurationMs(greetingText) + 800;
|
||
try {
|
||
session.upstream.send(createSayHelloMessage(session.sessionId, greetingText));
|
||
} catch (error) {
|
||
console.warn('[NativeVoice] replayGreeting SayHello failed:', error.message);
|
||
}
|
||
}
|
||
|
||
async function sendExternalRag(session, items) {
|
||
if (!session.upstream || session.upstream.readyState !== WebSocket.OPEN) {
|
||
return;
|
||
}
|
||
const ragItems = Array.isArray(items) ? items.filter((item) => item && item.content) : [];
|
||
if (!ragItems.length) {
|
||
return;
|
||
}
|
||
session.upstream.send(createChatRAGTextMessage(session.sessionId, JSON.stringify(ragItems)));
|
||
}
|
||
|
||
function clearUpstreamSuppression(session) {
|
||
clearTimeout(session.suppressReplyTimer);
|
||
session.suppressReplyTimer = null;
|
||
session.suppressUpstreamUntil = 0;
|
||
session.awaitingUpstreamReply = false;
|
||
session.pendingAssistantSource = null;
|
||
session.pendingAssistantToolName = null;
|
||
session.pendingAssistantMeta = null;
|
||
session.blockUpstreamAudio = false;
|
||
sendJson(session.client, { type: 'assistant_pending', active: false });
|
||
}
|
||
|
||
function suppressUpstreamReply(session, durationMs) {
|
||
clearTimeout(session.suppressReplyTimer);
|
||
session.awaitingUpstreamReply = true;
|
||
session.suppressUpstreamUntil = Date.now() + Math.max(1000, durationMs);
|
||
session.suppressReplyTimer = setTimeout(() => {
|
||
session.suppressReplyTimer = null;
|
||
if ((session.suppressUpstreamUntil || 0) > Date.now()) {
|
||
return;
|
||
}
|
||
clearUpstreamSuppression(session);
|
||
}, Math.max(300, session.suppressUpstreamUntil - Date.now()));
|
||
}
|
||
|
||
async function processReply(session, text) {
|
||
const cleanText = (text || '').trim();
|
||
if (!cleanText) return;
|
||
if (session.processingReply) {
|
||
session.queuedUserText = cleanText;
|
||
console.log(`[NativeVoice] processReply queued(busy) session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 80))}`);
|
||
return;
|
||
}
|
||
const now = Date.now();
|
||
if (session.directSpeakUntil && now < session.directSpeakUntil) {
|
||
session.queuedUserText = cleanText;
|
||
console.log(`[NativeVoice] processReply queued(speaking) session=${session.sessionId} waitMs=${session.directSpeakUntil - now} text=${JSON.stringify(cleanText.slice(0, 80))}`);
|
||
return;
|
||
}
|
||
session.processingReply = true;
|
||
sendJson(session.client, { type: 'assistant_pending', active: true });
|
||
const isKnowledgeCandidate = shouldForceKnowledgeRoute(cleanText);
|
||
if (isKnowledgeCandidate) {
|
||
session.blockUpstreamAudio = true;
|
||
suppressUpstreamReply(session, 30000);
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'processing' });
|
||
}
|
||
console.log(`[NativeVoice] processReply start session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 120))} blocked=${session.blockUpstreamAudio} kbCandidate=${isKnowledgeCandidate}`);
|
||
try {
|
||
const { delivery, speechText, ragItems, source, toolName, routeDecision, responseMeta } = await resolveReply(session.sessionId, session, cleanText);
|
||
if (delivery === 'upstream_chat') {
|
||
if (isKnowledgeCandidate) {
|
||
console.log(`[NativeVoice] processReply kb-nohit retrigger session=${session.sessionId}`);
|
||
session.discardNextAssistantResponse = true;
|
||
await sendExternalRag(session, [{ title: '用户问题', content: cleanText }]);
|
||
} else {
|
||
session.blockUpstreamAudio = false;
|
||
}
|
||
session.awaitingUpstreamReply = true;
|
||
session.pendingAssistantSource = 'voice_bot';
|
||
session.pendingAssistantToolName = null;
|
||
session.pendingAssistantMeta = responseMeta;
|
||
console.log(`[NativeVoice] processReply handoff session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=upstream_chat`);
|
||
return;
|
||
}
|
||
if (delivery === 'external_rag') {
|
||
if (!session.blockUpstreamAudio) {
|
||
session.blockUpstreamAudio = true;
|
||
}
|
||
session.discardNextAssistantResponse = true;
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'knowledge_hit' });
|
||
const kbText = (ragItems || []).map((item) => item?.content || '').filter(Boolean).join('\n').trim();
|
||
console.log(`[NativeVoice] processReply handoff session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=external_rag→local_tts items=${Array.isArray(ragItems) ? ragItems.length : 0} textLen=${kbText.length}`);
|
||
if (kbText) {
|
||
session.directSpeakUntil = Date.now() + estimateSpeechDurationMs(kbText) + 800;
|
||
suppressUpstreamReply(session, estimateSpeechDurationMs(kbText) + 1800);
|
||
persistAssistantSpeech(session, kbText, { source, toolName, meta: responseMeta });
|
||
await sendSpeechText(session, kbText);
|
||
} else {
|
||
console.log(`[NativeVoice] processReply external_rag empty content, fallback to upstream session=${session.sessionId}`);
|
||
session.blockUpstreamAudio = false;
|
||
clearUpstreamSuppression(session);
|
||
}
|
||
return;
|
||
}
|
||
if (!speechText) {
|
||
console.log(`[NativeVoice] processReply empty session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=${delivery || 'unknown'}`);
|
||
session.isSendingChatTTSText = false;
|
||
session.chatTTSUntil = 0;
|
||
return;
|
||
}
|
||
console.log(`[NativeVoice] processReply resolved session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=local_tts source=${source} tool=${toolName || 'chat'} speechLen=${speechText.length}`);
|
||
session.directSpeakUntil = Date.now() + estimateSpeechDurationMs(speechText) + 800;
|
||
suppressUpstreamReply(session, estimateSpeechDurationMs(speechText) + 1800);
|
||
persistAssistantSpeech(session, speechText, { source, toolName, meta: responseMeta });
|
||
await sendSpeechText(session, speechText);
|
||
} catch (error) {
|
||
console.error('[NativeVoice] processReply failed:', error.message);
|
||
sendJson(session.client, { type: 'error', error: error.message });
|
||
} finally {
|
||
session.processingReply = false;
|
||
if (!session.awaitingUpstreamReply) {
|
||
session.blockUpstreamAudio = false;
|
||
}
|
||
if (!session.awaitingUpstreamReply) {
|
||
sendJson(session.client, { type: 'assistant_pending', active: false });
|
||
}
|
||
const pending = session.queuedUserText;
|
||
session.queuedUserText = '';
|
||
if (pending && pending !== cleanText && (!session.directSpeakUntil || Date.now() >= session.directSpeakUntil)) {
|
||
setTimeout(() => {
|
||
session.blockUpstreamAudio = true;
|
||
processReply(session, pending).catch((err) => {
|
||
console.error('[NativeVoice] queued processReply failed:', err.message);
|
||
});
|
||
}, 200);
|
||
} else if (pending && pending !== cleanText) {
|
||
const waitMs = Math.max(200, session.directSpeakUntil - Date.now() + 200);
|
||
clearTimeout(session.queuedReplyTimer);
|
||
session.queuedReplyTimer = setTimeout(() => {
|
||
session.queuedReplyTimer = null;
|
||
const queuedText = session.queuedUserText || pending;
|
||
session.queuedUserText = '';
|
||
session.blockUpstreamAudio = true;
|
||
processReply(session, queuedText).catch((err) => {
|
||
console.error('[NativeVoice] delayed queued processReply failed:', err.message);
|
||
});
|
||
}, waitMs);
|
||
}
|
||
}
|
||
}
|
||
|
||
function handleUpstreamMessage(session, data) {
|
||
let message;
|
||
try {
|
||
message = unmarshal(data);
|
||
} catch (error) {
|
||
console.warn('[NativeVoice] unmarshal failed:', error.message);
|
||
return;
|
||
}
|
||
|
||
if (message.type === MsgType.AUDIO_ONLY_SERVER) {
|
||
// blockUpstreamAudio 只阻断 S2S default 音频,不阻断我们注入的 chat_tts_text 音频
|
||
const isDefaultTts = !session.currentTtsType || session.currentTtsType === 'default';
|
||
const isSuppressingUpstreamAudio = (session.suppressUpstreamUntil || 0) > Date.now() && isDefaultTts;
|
||
if ((session.blockUpstreamAudio && isDefaultTts) || isSuppressingUpstreamAudio) {
|
||
if (!session._audioBlockLogOnce) {
|
||
session._audioBlockLogOnce = true;
|
||
console.log(`[NativeVoice] audio blocked session=${session.sessionId} ttsType=${session.currentTtsType} block=${session.blockUpstreamAudio} suppress=${isSuppressingUpstreamAudio}`);
|
||
}
|
||
return;
|
||
}
|
||
session._audioBlockLogOnce = false;
|
||
if (session.client && session.client.readyState === WebSocket.OPEN) {
|
||
session.client.send(message.payload, { binary: true });
|
||
}
|
||
return;
|
||
}
|
||
|
||
const payload = parseJsonPayload(message);
|
||
if (message.type === MsgType.ERROR) {
|
||
console.error(`[NativeVoice] upstream error session=${session.sessionId} code=${message.event} payload=${message.payload.toString('utf8').slice(0, 200)}`);
|
||
sendJson(session.client, { type: 'error', error: message.payload.toString('utf8') });
|
||
return;
|
||
}
|
||
|
||
if (message.type !== MsgType.FULL_SERVER) {
|
||
return;
|
||
}
|
||
|
||
if (message.event === 150) {
|
||
session.upstreamReady = true;
|
||
console.log(`[NativeVoice] upstream ready session=${session.sessionId}`);
|
||
resetIdleTimer(session);
|
||
sendGreeting(session);
|
||
return;
|
||
}
|
||
|
||
if (message.event === 300) {
|
||
console.log(`[NativeVoice] SayHello response session=${session.sessionId}`);
|
||
return;
|
||
}
|
||
|
||
if (message.event === 350) {
|
||
session.currentTtsType = payload?.tts_type || '';
|
||
if (payload?.tts_type === 'chat_tts_text' && session.pendingGreetingAck) {
|
||
session.pendingGreetingAck = false;
|
||
clearTimeout(session.greetingAckTimer);
|
||
session.greetingAckTimer = null;
|
||
}
|
||
// 不再在此处清除 blockUpstreamAudio — 音频处理器已通过 ttsType 区分,
|
||
// 允许 chat_tts_text 音频通过,同时保持对 S2S default 响应的阻断
|
||
if (session.blockUpstreamAudio && payload?.tts_type && payload.tts_type !== 'default') {
|
||
console.log(`[NativeVoice] non-default tts=${payload.tts_type} started, audio passthrough via ttsType check session=${session.sessionId}`);
|
||
}
|
||
console.log(`[NativeVoice] upstream tts_event session=${session.sessionId} ttsType=${payload?.tts_type || ''}`);
|
||
sendJson(session.client, { type: 'tts_event', payload });
|
||
return;
|
||
}
|
||
|
||
const isLocalChatTTSTextActive = !!session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now();
|
||
const isSuppressingUpstreamReply = (session.suppressUpstreamUntil || 0) > Date.now();
|
||
|
||
if (message.event === 351) {
|
||
if (isLocalChatTTSTextActive || session.blockUpstreamAudio || isSuppressingUpstreamReply) {
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
return;
|
||
}
|
||
if (session.discardNextAssistantResponse) {
|
||
session.discardNextAssistantResponse = false;
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
console.log(`[NativeVoice] discarded stale assistant response (kb-nohit retrigger) session=${session.sessionId}`);
|
||
return;
|
||
}
|
||
const pendingAssistantSource = session.pendingAssistantSource || 'voice_bot';
|
||
const pendingAssistantToolName = session.pendingAssistantToolName || null;
|
||
const pendingAssistantMeta = session.pendingAssistantMeta || null;
|
||
session.awaitingUpstreamReply = false;
|
||
sendJson(session.client, { type: 'assistant_pending', active: false });
|
||
const assistantText = extractUserText(payload);
|
||
if (assistantText) {
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
console.log(`[NativeVoice] upstream assistant session=${session.sessionId} text=${JSON.stringify(assistantText.slice(0, 120))}`);
|
||
persistAssistantSpeech(session, assistantText, {
|
||
source: pendingAssistantSource,
|
||
toolName: pendingAssistantToolName,
|
||
meta: pendingAssistantMeta,
|
||
});
|
||
} else {
|
||
flushAssistantStream(session, {
|
||
source: pendingAssistantSource,
|
||
toolName: pendingAssistantToolName,
|
||
meta: pendingAssistantMeta,
|
||
});
|
||
}
|
||
session.pendingAssistantSource = null;
|
||
session.pendingAssistantToolName = null;
|
||
session.pendingAssistantMeta = null;
|
||
return;
|
||
}
|
||
|
||
if (message.event === 550) {
|
||
if (isLocalChatTTSTextActive || session.blockUpstreamAudio || isSuppressingUpstreamReply || session.discardNextAssistantResponse) {
|
||
return;
|
||
}
|
||
if (session.awaitingUpstreamReply) {
|
||
session.awaitingUpstreamReply = false;
|
||
sendJson(session.client, { type: 'assistant_pending', active: false });
|
||
}
|
||
const fullText = appendAssistantStream(session, payload);
|
||
if (fullText) {
|
||
// 检测思考模式:S2S模型输出分析/计划而非直接回答,立即阻断
|
||
if (fullText.length >= 10 && THINKING_PATTERN.test(fullText.trim())) {
|
||
console.warn(`[NativeVoice][SafeGuard] thinking detected in stream, blocking audio session=${session.sessionId} text=${JSON.stringify(fullText.slice(0, 120))}`);
|
||
session.blockUpstreamAudio = true;
|
||
session.discardNextAssistantResponse = true;
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'thinking_blocked' });
|
||
return;
|
||
}
|
||
console.log(`[NativeVoice] upstream assistant chunk session=${session.sessionId} len=${fullText.length} text=${JSON.stringify(fullText.slice(0, 120))}`);
|
||
}
|
||
return;
|
||
}
|
||
|
||
if (message.event === 559) {
|
||
if (isLocalChatTTSTextActive || isSuppressingUpstreamReply) {
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
return;
|
||
}
|
||
if (session.blockUpstreamAudio) {
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
console.log(`[NativeVoice] blocked response ended (559), keeping block session=${session.sessionId}`);
|
||
return;
|
||
}
|
||
if (session.discardNextAssistantResponse) {
|
||
session.discardNextAssistantResponse = false;
|
||
session.assistantStreamBuffer = '';
|
||
session.assistantStreamReplyId = '';
|
||
console.log(`[NativeVoice] discarded stale stream end (559, kb-nohit retrigger) session=${session.sessionId}`);
|
||
return;
|
||
}
|
||
session.awaitingUpstreamReply = false;
|
||
session.blockUpstreamAudio = false;
|
||
sendJson(session.client, { type: 'assistant_pending', active: false });
|
||
flushAssistantStream(session, {
|
||
source: session.pendingAssistantSource || 'voice_bot',
|
||
toolName: session.pendingAssistantToolName || null,
|
||
meta: session.pendingAssistantMeta || null,
|
||
});
|
||
session.pendingAssistantSource = null;
|
||
session.pendingAssistantToolName = null;
|
||
session.pendingAssistantMeta = null;
|
||
return;
|
||
}
|
||
|
||
if (message.event === 450 || (message.event === 451 && !isFinalUserPayload(payload))) {
|
||
const text = extractUserText(payload, session.sessionId);
|
||
if (text) {
|
||
console.log(`[NativeVoice] upstream partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 120))}`);
|
||
session.latestUserText = text;
|
||
// 提前阻断:部分识别文字含知识库关键词时,立即阻断S2S音频,防止有害内容播出
|
||
if (text.length >= 4 && !session.blockUpstreamAudio && shouldForceKnowledgeRoute(text)) {
|
||
session.blockUpstreamAudio = true;
|
||
console.log(`[NativeVoice] early block: partial text matched KB keywords session=${session.sessionId} text=${JSON.stringify(text.slice(0, 80))}`);
|
||
}
|
||
// 用户开口说话时立即打断所有 AI 播放(包括 S2S 默认 TTS)
|
||
const now = Date.now();
|
||
const isDirectSpeaking = session.directSpeakUntil && now < session.directSpeakUntil;
|
||
const isChatTTSSpeaking = session.isSendingChatTTSText && (session.chatTTSUntil || 0) > now;
|
||
if (isDirectSpeaking || isChatTTSSpeaking) {
|
||
console.log(`[NativeVoice] user barge-in (partial) session=${session.sessionId} direct=${isDirectSpeaking} chatTTS=${isChatTTSSpeaking}`);
|
||
session.directSpeakUntil = 0;
|
||
session.isSendingChatTTSText = false;
|
||
session.chatTTSUntil = 0;
|
||
clearTimeout(session.chatTTSTimer);
|
||
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
|
||
clearUpstreamSuppression(session);
|
||
}
|
||
}
|
||
// 无论当前是否在播放,都发送 tts_reset 确保客户端停止所有音频播放
|
||
if (!session._lastBargeInResetAt || now - session._lastBargeInResetAt > 500) {
|
||
session._lastBargeInResetAt = now;
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
|
||
}
|
||
sendJson(session.client, {
|
||
type: 'subtitle',
|
||
role: 'user',
|
||
text,
|
||
isFinal: false,
|
||
sequence: `native_partial_${Date.now()}`,
|
||
});
|
||
}
|
||
return;
|
||
}
|
||
|
||
if (message.event === 459 || (message.event === 451 && isFinalUserPayload(payload))) {
|
||
const finalText = extractUserText(payload, session.sessionId) || session.latestUserText || '';
|
||
console.log(`[NativeVoice] upstream final session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 120))}`);
|
||
if (session.directSpeakUntil && Date.now() < session.directSpeakUntil) {
|
||
console.log(`[NativeVoice] user interrupt during speaking session=${session.sessionId}`);
|
||
session.directSpeakUntil = 0;
|
||
session.isSendingChatTTSText = false;
|
||
session.chatTTSUntil = 0;
|
||
clearTimeout(session.chatTTSTimer);
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
|
||
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
|
||
clearUpstreamSuppression(session);
|
||
}
|
||
} else if (session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now()) {
|
||
console.log(`[NativeVoice] user interrupt chatTTS during speaking session=${session.sessionId}`);
|
||
session.isSendingChatTTSText = false;
|
||
session.chatTTSUntil = 0;
|
||
clearTimeout(session.chatTTSTimer);
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
|
||
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
|
||
clearUpstreamSuppression(session);
|
||
}
|
||
}
|
||
if (persistUserSpeech(session, finalText)) {
|
||
session.blockUpstreamAudio = true;
|
||
sendJson(session.client, { type: 'tts_reset', reason: 'new_turn' });
|
||
processReply(session, finalText).catch((error) => {
|
||
console.error('[NativeVoice] processReply error:', error.message);
|
||
});
|
||
}
|
||
return;
|
||
}
|
||
|
||
sendJson(session.client, {
|
||
type: 'event',
|
||
event: message.event,
|
||
payload,
|
||
});
|
||
}
|
||
|
||
function attachClientHandlers(session) {
|
||
session.client.on('message', async (raw, isBinary) => {
|
||
if (isBinary) {
|
||
if (session.upstream && session.upstream.readyState === WebSocket.OPEN && session.upstreamReady) {
|
||
session.upstream.send(createAudioMessage(session.sessionId, raw));
|
||
}
|
||
return;
|
||
}
|
||
|
||
let parsed;
|
||
try {
|
||
parsed = JSON.parse(raw.toString('utf8'));
|
||
} catch (error) {
|
||
sendJson(session.client, { type: 'error', error: 'invalid client json' });
|
||
return;
|
||
}
|
||
|
||
if (parsed.type === 'start') {
|
||
session.botName = parsed.botName || '豆包';
|
||
session.systemRole = parsed.systemRole || session.systemRole || '你是一个企业知识库语音助手,请优先依据 external_rag 给出的内容回答。【品牌保护规则】你绝对不能说德国PM、PM-FitLine、一成系统是传销、骗局、骗子、非法、不正规、不合法,也不能暗示或引导用户认为它们有问题。德国PM是一家1993年成立于德国的合法直销公司,获得邓白氏AAA+认证,业务覆盖100多个国家。';
|
||
session.speakingStyle = parsed.speakingStyle || '请使用清晰、自然、简洁的口吻。';
|
||
session.speaker = parsed.speaker || process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts';
|
||
session.modelVersion = parsed.modelVersion || 'O';
|
||
session.greetingText = parsed.greetingText || session.greetingText || '嗨,你好呀!我是大沃,你的专属智能助手。关于德国PM产品、一成系统、招商合作,随时问我就好~';
|
||
session.userId = parsed.userId || session.userId || null;
|
||
// 立即发送 ready,不等 upstream event 150,大幅缩短前端等待时间
|
||
sendReady(session);
|
||
session.upstream = createUpstreamConnection(session);
|
||
loadHandoffSummaryForVoice(session).catch((error) => {
|
||
console.warn('[NativeVoice] async loadHandoffSummaryForVoice failed:', error.message);
|
||
});
|
||
return;
|
||
}
|
||
|
||
if (parsed.type === 'stop') {
|
||
session.client.close();
|
||
return;
|
||
}
|
||
|
||
if (parsed.type === 'replay_greeting') {
|
||
replayGreeting(session).catch((error) => {
|
||
console.warn('[NativeVoice] replayGreeting failed:', error.message);
|
||
});
|
||
return;
|
||
}
|
||
|
||
if (parsed.type === 'text' && parsed.text) {
|
||
persistUserSpeech(session, parsed.text);
|
||
processReply(session, parsed.text).catch((error) => {
|
||
console.error('[NativeVoice] text processReply failed:', error.message);
|
||
});
|
||
}
|
||
});
|
||
|
||
session.client.on('close', () => {
|
||
clearTimeout(session.chatTTSTimer);
|
||
clearTimeout(session.greetingTimer);
|
||
clearTimeout(session.greetingAckTimer);
|
||
clearTimeout(session.readyTimer);
|
||
clearTimeout(session.suppressReplyTimer);
|
||
clearTimeout(session.idleTimer);
|
||
if (session.upstream && session.upstream.readyState === WebSocket.OPEN) {
|
||
session.upstream.close();
|
||
}
|
||
sessions.delete(session.sessionId);
|
||
});
|
||
}
|
||
|
||
function createUpstreamConnection(session) {
|
||
const upstream = new WebSocket('wss://openspeech.bytedance.com/api/v3/realtime/dialogue', {
|
||
headers: {
|
||
'X-Api-Resource-Id': 'volc.speech.dialog',
|
||
'X-Api-Access-Key': process.env.VOLC_S2S_TOKEN,
|
||
'X-Api-App-Key': process.env.VOLC_DIALOG_APP_KEY || 'PlgvMymc7f3tQnJ6',
|
||
'X-Api-App-ID': process.env.VOLC_S2S_APP_ID,
|
||
'X-Api-Connect-Id': session.sessionId,
|
||
},
|
||
});
|
||
|
||
upstream.on('open', () => {
|
||
upstream.send(createStartConnectionMessage());
|
||
upstream.send(createStartSessionMessage(session.sessionId, buildStartSessionPayload(session)));
|
||
});
|
||
|
||
upstream.on('message', (data, isBinary) => {
|
||
if (!isBinary && typeof data === 'string') {
|
||
sendJson(session.client, { type: 'server_text', text: data });
|
||
return;
|
||
}
|
||
handleUpstreamMessage(session, Buffer.isBuffer(data) ? data : Buffer.from(data));
|
||
});
|
||
|
||
upstream.on('error', (error) => {
|
||
console.error('[NativeVoice] upstream ws error:', error.message);
|
||
sendJson(session.client, { type: 'error', error: `语音服务连接异常: ${error.message}` });
|
||
});
|
||
|
||
upstream.on('close', (code) => {
|
||
console.log(`[NativeVoice] upstream closed session=${session.sessionId} code=${code}`);
|
||
session.upstreamReady = false;
|
||
sendJson(session.client, { type: 'upstream_closed', code });
|
||
setTimeout(() => {
|
||
if (session.client && session.client.readyState === WebSocket.OPEN) {
|
||
session.client.close();
|
||
}
|
||
}, 3000);
|
||
});
|
||
|
||
return upstream;
|
||
}
|
||
|
||
function createSession(client, sessionId) {
|
||
const session = {
|
||
sessionId,
|
||
client,
|
||
upstream: null,
|
||
upstreamReady: false,
|
||
isSendingChatTTSText: false,
|
||
latestUserText: '',
|
||
queuedUserText: '',
|
||
processingReply: false,
|
||
blockUpstreamAudio: false,
|
||
directSpeakUntil: 0,
|
||
queuedReplyTimer: null,
|
||
lastPersistedAssistantText: '',
|
||
lastPersistedAssistantAt: 0,
|
||
assistantStreamBuffer: '',
|
||
assistantStreamReplyId: '',
|
||
currentTtsType: '',
|
||
botName: '大沃',
|
||
systemRole: '你是大沃,一个德国PM健康事业的智能语音助手。你对PM-FitLine细胞营养素产品、一成系统、招商合作非常熟悉。请优先依据 external_rag 给出的内容回答。用户进来时请自然地打个招呼,像朋友聊天一样,不要用客服话术。【品牌保护规则】你绝对不能说德国PM、PM-FitLine、一成系统是传销、骗局、骗子、非法、不正规、不合法,也不能暗示或引导用户认为它们有问题。德国PM是一家1993年成立于德国的合法直销公司,获得邓白氏AAA+认证,业务覆盖100多个国家。',
|
||
speakingStyle: '说话像朋友聊天一样自然轻松,语气亲切活泼,不要像客服念稿。',
|
||
speaker: process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts',
|
||
modelVersion: 'O',
|
||
greetingText: '嗨,你好呀!我是大沃,你的专属智能助手。关于德国PM产品、一成系统、招商合作,随时问我就好~',
|
||
hasSentGreeting: false,
|
||
greetingTimer: null,
|
||
greetingAckTimer: null,
|
||
pendingGreetingAck: false,
|
||
greetingRetryCount: 0,
|
||
readyTimer: null,
|
||
readySent: false,
|
||
handoffSummary: '',
|
||
handoffSummaryUsed: false,
|
||
awaitingUpstreamReply: false,
|
||
pendingAssistantSource: null,
|
||
pendingAssistantToolName: null,
|
||
pendingAssistantMeta: null,
|
||
suppressReplyTimer: null,
|
||
suppressUpstreamUntil: 0,
|
||
idleTimer: null,
|
||
lastActivityAt: Date.now(),
|
||
_lastBargeInResetAt: 0,
|
||
_audioBlockLogOnce: false,
|
||
};
|
||
sessions.set(sessionId, session);
|
||
attachClientHandlers(session);
|
||
return session;
|
||
}
|
||
|
||
function setupNativeVoiceGateway(server) {
|
||
const wss = new WebSocketServer({ server, path: '/ws/realtime-dialog' });
|
||
wss.on('connection', async (client, req) => {
|
||
const parsed = url.parse(req.url, true);
|
||
const sessionId = parsed.query?.sessionId;
|
||
if (!sessionId) {
|
||
client.close();
|
||
return;
|
||
}
|
||
const userId = parsed.query?.userId || null;
|
||
const session = createSession(client, sessionId);
|
||
session.userId = userId;
|
||
try {
|
||
await db.createSession(sessionId, userId, 'voice');
|
||
} catch (error) {
|
||
console.warn('[NativeVoice][DB] createSession failed:', error.message);
|
||
}
|
||
sendJson(client, { type: 'connected', sessionId });
|
||
});
|
||
return wss;
|
||
}
|
||
|
||
module.exports = {
|
||
setupNativeVoiceGateway,
|
||
};
|