Files
bigwo/test2/server/services/nativeVoiceGateway.js

1215 lines
54 KiB
JavaScript
Raw Normal View History

const { WebSocket, WebSocketServer } = require('ws');
const url = require('url');
const db = require('../db');
const { correctAsrText } = require('./fastAsrCorrector');
const contextKeywordTracker = require('./contextKeywordTracker');
const { isBrandHarmful, getVoiceSafeReply, BRAND_HARMFUL_PATTERN, BRAND_POSITIVE_LEGALITY_PATTERN } = require('./contentSafeGuard');
const {
MsgType,
unmarshal,
createStartConnectionMessage,
createStartSessionMessage,
createAudioMessage,
createChatTTSTextMessage,
createSayHelloMessage,
createChatRAGTextMessage,
} = require('./realtimeDialogProtocol');
const {
getRuleBasedDirectRouteDecision,
normalizeKnowledgeAlias,
normalizeTextForSpeech,
splitTextForSpeech,
estimateSpeechDurationMs,
shouldForceKnowledgeRoute,
resolveReply,
} = require('./realtimeDialogRouting');
const ToolExecutor = require('./toolExecutor');
const {
DEFAULT_VOICE_ASSISTANT_PROFILE,
resolveAssistantProfile,
buildVoiceSystemRole,
buildVoiceGreeting,
} = require('./assistantProfileConfig');
const { getAssistantProfile } = require('./assistantProfileService');
const redisClient = require('./redisClient');
// Module-level registry of voice sessions.
// NOTE(review): population/lookup happens outside this part of the file — presumably keyed by session id; confirm.
const sessions = new Map();
// Inactivity window (5 minutes) used by resetIdleTimer before the client is told to disconnect.
const IDLE_TIMEOUT_MS = 5 * 60 * 1000;
// Nickname taken from the default assistant profile.
const DEFAULT_VOICE_BOT_NAME = DEFAULT_VOICE_ASSISTANT_PROFILE.nickname;
// Fallback system role used by buildStartSessionPayload when the caller supplies none.
const DEFAULT_VOICE_SYSTEM_ROLE = buildVoiceSystemRole();
// Fallback speaking-style prompt used by buildStartSessionPayload when the caller supplies none.
const DEFAULT_VOICE_SPEAKING_STYLE = '整体语气亲切自然、轻快有温度,像熟悉行业的朋友在语音聊天。优先短句和口语化表达,先给结论,再补一句最有帮助的信息。不要播音腔,不要念稿,不要客服腔,不要过度热情,也不要输出任何思考过程。';
// Fallback greeting used by sendGreeting when the session carries no greetingText.
const DEFAULT_VOICE_GREETING = buildVoiceGreeting();
/**
 * Restart the inactivity countdown for a session.
 *
 * Records the current time as the last activity, cancels any pending idle
 * timer and arms a new one. When the new timer fires, the client is notified
 * with an `idle_timeout` frame and, two seconds later, its socket is closed
 * if it is still open.
 *
 * @param {object} session - Session state; reads `client`/`sessionId`, writes `idleTimer`/`lastActivityAt`.
 */
function resetIdleTimer(session) {
  // Deferred close: gives the client a moment to react to the idle notice.
  const closeClientIfStillOpen = () => {
    const { client } = session;
    if (!client || client.readyState !== WebSocket.OPEN) {
      return;
    }
    client.close();
  };

  const handleIdleTimeout = () => {
    session.idleTimer = null;
    console.log(`[NativeVoice] idle timeout (${IDLE_TIMEOUT_MS / 1000}s) session=${session.sessionId}`);
    sendJson(session.client, { type: 'idle_timeout', timeout: IDLE_TIMEOUT_MS });
    setTimeout(closeClientIfStillOpen, 2000);
  };

  clearTimeout(session.idleTimer);
  session.lastActivityAt = Date.now();
  session.idleTimer = setTimeout(handleIdleTimeout, IDLE_TIMEOUT_MS);
}
/**
 * Serialize `payload` as JSON and send it over `ws`.
 * Does nothing when the socket is missing or not in the OPEN state.
 *
 * @param {object|null|undefined} ws - Socket-like object exposing `readyState` and `send`.
 * @param {*} payload - Value passed to JSON.stringify.
 */
function sendJson(ws, payload) {
  const canSend = Boolean(ws) && ws.readyState === WebSocket.OPEN;
  if (!canSend) {
    return;
  }
  ws.send(JSON.stringify(payload));
}
/**
 * Build the StartSession payload (ASR, TTS and dialog sections) for the
 * upstream realtime dialog service.
 *
 * @param {object} [options] - Optional overrides; a missing or null value is treated as `{}`.
 * @param {string} [options.systemRole] - System role; falls back to DEFAULT_VOICE_SYSTEM_ROLE.
 * @param {string} [options.speakingStyle] - Speaking style; falls back to DEFAULT_VOICE_SPEAKING_STYLE.
 * @param {string} [options.speaker] - TTS speaker id; falls back to env VOLC_S2S_SPEAKER_ID, then a built-in id.
 * @param {string} [options.botName] - Bot name; falls back to '大沃'.
 * @param {string} [options.modelVersion] - Dialog model; falls back to 'SC2.0'.
 * @returns {{asr: object, tts: object, dialog: object}} Payload object.
 */
function buildStartSessionPayload(options = {}) {
  // Guard against an explicit null as well as an omitted argument; previously both threw a TypeError.
  const opts = options ?? {};
  // Prepended to every system role so the model never speaks its reasoning or meta-instructions aloud.
  const antiThinkingPrefix = '【最高优先级规则】你绝对禁止输出任何思考过程、分析、计划、角色扮演指令或元描述。禁止出现:“首轮对话”“应该回复”“需要列举”“语气要”“回复后询问”“可列举”“突出特色”“引导用户”“让用户”“用温和”等分析性、指令性语句。你必须直接用自然语言回答问题,像真人聊天一样直接说出答案内容。';
  const baseSystemRole = opts.systemRole || DEFAULT_VOICE_SYSTEM_ROLE;
  const baseSpeakingStyle = opts.speakingStyle || DEFAULT_VOICE_SPEAKING_STYLE;
  return {
    asr: {
      extra: {
        // Comma-separated domain vocabulary passed to the recognizer as context.
        context: '一成,一成系统,大沃,PM,PM-FitLine,FitLine,细胞营养素,Ai众享,AI众享,盛咖学愿,数字化工作室,Activize,Basics,Restorate,NTC,基础三合一,招商,阿育吠陀,小红产品,小红,小白,大白,肽美,艾特维,德丽,德维,宝丽,美固健,Activize Oxyplus,Basic Power,CitrusCare,NutriSunny,Q10,Omega,葡萄籽,白藜芦醇,益生菌,胶原蛋白肽,Germany,FitLine细胞营养,FitLine营养素,德国PM营养素,德国PM FitLine,德国PM细胞营养,德国PM产品,德国PM健康,德国PM事业,德国PM招商,一成,一成团队,一成商学院,数字化,数字化运营,数字化经营,数字化营销,数字化创业,数字化工作室,数字化事业,招商加盟,合作加盟,事业合作',
        nbest: 1,
      },
    },
    tts: {
      speaker: opts.speaker || process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts',
      audio_config: {
        channel: 1,
        format: 'pcm_s16le',
        sample_rate: 24000,
      },
    },
    dialog: {
      dialog_id: '',
      bot_name: opts.botName || '大沃',
      system_role: normalizeTextForSpeech(`${antiThinkingPrefix} ${baseSystemRole}`),
      speaking_style: normalizeTextForSpeech(baseSpeakingStyle),
      extra: {
        input_mod: 'audio',
        model: opts.modelVersion || 'SC2.0',
        strict_audit: false,
        audit_response: '抱歉,这个问题我暂时无法回答。',
      },
    },
  };
}
/**
 * Decode `message.payload` as UTF-8 and parse it as JSON.
 *
 * @param {object} message - Object whose `payload` exposes `toString('utf8')`.
 * @returns {*} The parsed value, or null when decoding or parsing fails for any reason.
 */
function parseJsonPayload(message) {
  let parsed = null;
  try {
    const rawText = message.payload.toString('utf8');
    parsed = JSON.parse(rawText);
  } catch (parseError) {
    // Malformed or missing payloads are reported as null rather than thrown.
    parsed = null;
  }
  return parsed;
}
/**
 * Pull the text out of a payload, trying in order: `text`, `content`,
 * `results[0].text`, `results[0].alternatives[0].text`.
 *
 * @param {object|null|undefined} jsonPayload - Parsed payload.
 * @returns {string} The first truthy candidate, stringified and trimmed; '' when none is present.
 */
function extractRawText(jsonPayload) {
  const firstResult = jsonPayload?.results?.[0];
  const candidates = [
    jsonPayload?.text,
    jsonPayload?.content,
    firstResult?.text,
    firstResult?.alternatives?.[0]?.text,
  ];
  const chosen = candidates.find(Boolean);
  return String(chosen || '').trim();
}
/**
 * Extract the user's text from a payload and run it through correctAsrText.
 * When a session id is given, the alias-normalized text is also passed to
 * contextKeywordTracker.updateSession for that session.
 *
 * @param {object} jsonPayload - Parsed payload.
 * @param {string|null} [sessionId=null] - Session to update in the keyword tracker; skipped when falsy.
 * @returns {string} The corrected text.
 */
function extractUserText(jsonPayload, sessionId = null) {
  const correctedText = correctAsrText(extractRawText(jsonPayload));
  if (!sessionId) {
    return correctedText;
  }
  contextKeywordTracker.updateSession(sessionId, normalizeKnowledgeAlias(correctedText));
  return correctedText;
}
// Matches assistant output that *starts* with planning / meta-instruction phrasing.
const THINKING_PATTERN = /^(首轮对话|用户想|用户问|应该回复|需要列举|可列举|突出特色|引导进一步|引导用户|让用户|回复后询问|语气要|用温和|需热情|需简洁|需专业)/;
// Matches planning / meta-instruction phrasing anywhere in the text.
const THINKING_MID_PATTERN = /(?:需客观回复|应说明其|回复后询问|引导.*对话|用.*口吻回复|语气要.*热情|需要.*引导|应该.*回复|先.*再.*最后)/;
/**
 * Vet assistant text before it is persisted or shown.
 *
 * @param {string} text - Candidate assistant text.
 * @returns {string|null} Falsy input is returned unchanged; text flagged by
 *   isBrandHarmful is replaced with getVoiceSafeReply(); text whose trimmed
 *   form matches THINKING_PATTERN yields null; anything else is returned as is.
 */
function sanitizeAssistantText(text) {
  if (!text) {
    return text;
  }
  const logPreview = () => JSON.stringify(text.slice(0, 200));
  if (isBrandHarmful(text)) {
    console.warn(`[NativeVoice][SafeGuard] blocked harmful content: ${logPreview()}`);
    return getVoiceSafeReply();
  }
  const looksLikeThinking = THINKING_PATTERN.test(text.trim());
  if (!looksLikeThinking) {
    return text;
  }
  console.warn(`[NativeVoice][SafeGuard] blocked thinking output: ${logPreview()}`);
  return null;
}
/**
 * Decide whether a user-speech payload is final.
 *
 * @param {object|null|undefined} jsonPayload - Parsed payload.
 * @returns {boolean} True when `is_final` is strictly true, or when `results`
 *   is an array containing at least one entry with `is_interim === false`.
 */
function isFinalUserPayload(jsonPayload) {
  const flaggedFinal = jsonPayload?.is_final === true;
  const results = jsonPayload?.results;
  const hasNonInterimResult = Array.isArray(results)
    && results.some((entry) => Boolean(entry) && entry.is_interim === false);
  return flaggedFinal || hasNonInterimResult;
}
/**
 * Record a finalized user utterance: update session turn state, store it in
 * the DB and Redis (both fire-and-forget), and push a final subtitle frame to
 * the client.
 *
 * An utterance identical to the previously persisted one within 5 seconds is
 * treated as a duplicate and ignored.
 *
 * @param {object} session - Session state (reads `sessionId`/`client`, writes the user-turn bookkeeping fields).
 * @param {string} text - Recognized user text.
 * @returns {boolean} True when the utterance was recorded; false when empty or a duplicate.
 */
function persistUserSpeech(session, text) {
  const cleanText = (text || '').trim();
  if (!cleanText) return false;
  const now = Date.now();
  if (session.lastPersistedUserText === cleanText && now - (session.lastPersistedUserAt || 0) < 5000) {
    return false;
  }
  session.lastPersistedUserText = cleanText;
  session.lastPersistedUserAt = now;
  session.latestUserText = cleanText;
  session.latestUserTurnSeq = (session.latestUserTurnSeq || 0) + 1;
  resetIdleTimer(session);
  // Both writes stay best-effort, but failures are logged instead of silently dropped.
  db.addMessage(session.sessionId, 'user', cleanText, 'voice_asr')
    .catch((e) => console.warn('[NativeVoice][DB] add user failed:', e.message));
  redisClient.pushMessage(session.sessionId, { role: 'user', content: cleanText, source: 'voice_asr' })
    .catch((e) => console.warn('[NativeVoice][Redis] push user failed:', e?.message));
  sendJson(session.client, {
    type: 'subtitle',
    role: 'user',
    text: cleanText,
    isFinal: true,
    sequence: `native_user_${now}`,
  });
  return true;
}
/**
 * Record an assistant utterance: sanitize it, optionally store it in the DB
 * and Redis (both fire-and-forget), and push a final subtitle frame to the
 * client.
 *
 * Text rejected by sanitizeAssistantText (empty/null result) is dropped. An
 * utterance identical to the previously persisted one within 5 seconds is
 * treated as a duplicate and ignored.
 *
 * @param {object} session - Session state (reads `sessionId`/`client`, writes the assistant bookkeeping fields).
 * @param {string} text - Assistant text.
 * @param {object} [options]
 * @param {string} [options.source='voice_bot'] - Origin tag stored with the message and sent in the subtitle.
 * @param {string|null} [options.toolName=null] - Tool name stored with the message and sent in the subtitle.
 * @param {boolean} [options.persistToDb=true] - When false, only the subtitle is sent.
 * @param {*} [options.meta=null] - Extra metadata forwarded to db.addMessage.
 * @returns {boolean} True when the utterance was recorded; false when dropped.
 */
function persistAssistantSpeech(session, text, { source = 'voice_bot', toolName = null, persistToDb = true, meta = null } = {}) {
  const cleanText = sanitizeAssistantText((text || '').trim());
  if (!cleanText) return false;
  const now = Date.now();
  if (session.lastPersistedAssistantText === cleanText && now - (session.lastPersistedAssistantAt || 0) < 5000) {
    return false;
  }
  session.lastPersistedAssistantText = cleanText;
  session.lastPersistedAssistantAt = now;
  resetIdleTimer(session);
  if (persistToDb) {
    // Both writes stay best-effort, but failures are logged instead of silently dropped.
    db.addMessage(session.sessionId, 'assistant', cleanText, source, toolName, meta)
      .catch((e) => console.warn('[NativeVoice][DB] add assistant failed:', e.message));
    redisClient.pushMessage(session.sessionId, { role: 'assistant', content: cleanText, source })
      .catch((e) => console.warn('[NativeVoice][Redis] push assistant failed:', e?.message));
  }
  sendJson(session.client, {
    type: 'subtitle',
    role: 'assistant',
    text: cleanText,
    isFinal: true,
    source,
    toolName,
    sequence: `native_assistant_${now}`,
  });
  return true;
}
/**
 * Append a streamed assistant text chunk to the session's buffer.
 *
 * The buffer is restarted when the chunk carries a `reply_id` that differs
 * from the one currently tracked on the session.
 *
 * @param {object} session - Session state; reads/writes `assistantStreamBuffer` and `assistantStreamReplyId`.
 * @param {object} payload - Parsed chunk payload.
 * @returns {string} The accumulated buffer, or '' when the chunk has no text (buffer left untouched).
 */
function appendAssistantStream(session, payload) {
  const piece = extractRawText(payload);
  if (!piece) {
    return '';
  }
  const incomingReplyId = payload?.reply_id || '';
  const trackedReplyId = session.assistantStreamReplyId;
  const startsNewReply = Boolean(incomingReplyId) && Boolean(trackedReplyId) && trackedReplyId !== incomingReplyId;
  if (startsNewReply) {
    session.assistantStreamBuffer = '';
  }
  session.assistantStreamReplyId = incomingReplyId || trackedReplyId || '';
  const previous = session.assistantStreamBuffer || '';
  session.assistantStreamBuffer = `${previous}${piece}`;
  return session.assistantStreamBuffer;
}
/**
 * Drain the streamed-assistant buffer and persist its contents.
 *
 * The buffer and tracked reply id are always reset, even when nothing is
 * persisted.
 *
 * @param {object} session - Session state holding `assistantStreamBuffer` / `assistantStreamReplyId`.
 * @param {object} [options] - Forwarded to persistAssistantSpeech.
 * @param {string} [options.source='voice_bot']
 * @param {string|null} [options.toolName=null]
 * @param {*} [options.meta=null]
 * @returns {boolean} False when the buffer was empty; otherwise the result of persistAssistantSpeech.
 */
function flushAssistantStream(session, { source = 'voice_bot', toolName = null, meta = null } = {}) {
  const buffered = (session.assistantStreamBuffer || '').trim();
  session.assistantStreamBuffer = '';
  session.assistantStreamReplyId = '';
  return buffered
    ? persistAssistantSpeech(session, buffered, { source, toolName, meta })
    : false;
}
/**
 * Prepare `session.handoffSummary` from the session's last 10 stored messages.
 *
 * The summary is set to '' when there is no history or when loading fails;
 * `session.handoffSummaryUsed` is reset to false on every path. Errors are
 * logged and never propagated.
 *
 * @param {object} session - Session state; reads `sessionId`, writes `handoffSummary`/`handoffSummaryUsed`.
 * @returns {Promise<void>}
 */
async function loadHandoffSummaryForVoice(session) {
  const storeSummary = (summary) => {
    session.handoffSummary = summary;
    session.handoffSummaryUsed = false;
  };

  try {
    const history = await db.getHistoryForLLM(session.sessionId, 10);
    if (!history.length) {
      storeSummary('');
      return;
    }
    storeSummary(buildDeterministicHandoffSummary(history));
    console.log(`[NativeVoice] Handoff summary prepared for ${session.sessionId}: ${session.handoffSummary ? 'yes' : 'no'}`);
  } catch (error) {
    storeSummary('');
    console.warn('[NativeVoice] loadHandoffSummaryForVoice failed:', error.message);
  }
}
/**
 * Build a short, rule-based summary of recent conversation history.
 *
 * Only the last 8 non-empty user/assistant messages are considered. The
 * summary lists the latest user question, the previous (different) user
 * question, and the first 60 characters of each of the last two assistant
 * replies.
 *
 * Sections are separated with ';' and the assistant snippets with ' / ';
 * previously everything was concatenated with no delimiter, so one section
 * ran straight into the next label and two truncated replies fused together.
 *
 * @param {Array<{role: string, content: string}>} [messages=[]] - History, oldest first; non-arrays are treated as empty.
 * @returns {string} The summary, or '' when there is nothing to summarize.
 */
function buildDeterministicHandoffSummary(messages = []) {
  const textOf = (item) => String(item.content || '').trim();
  const recentMessages = (Array.isArray(messages) ? messages : [])
    .filter((item) => item && (item.role === 'user' || item.role === 'assistant') && textOf(item))
    .slice(-8);
  if (!recentMessages.length) {
    return '';
  }

  const userTexts = recentMessages.filter((item) => item.role === 'user').map(textOf);
  const currentQuestion = userTexts[userTexts.length - 1] || '';
  const previousQuestion = userTexts[userTexts.length - 2] || '';
  const assistantFacts = recentMessages
    .filter((item) => item.role === 'assistant')
    .slice(-2)
    .map((item) => textOf(item).slice(0, 60))
    .join(' / ');

  const parts = [];
  if (currentQuestion) {
    parts.push(`当前问题:${currentQuestion}`);
  }
  if (previousQuestion && previousQuestion !== currentQuestion) {
    parts.push(`上一轮关注:${previousQuestion}`);
  }
  if (assistantFacts) {
    parts.push(`已给信息:${assistantFacts}`);
  }
  return parts.join(';');
}
/**
 * Have the upstream service speak `speechText` via ChatTTSText messages.
 *
 * The text is split with splitTextForSpeech; each chunk is sent with
 * `end: false` (only the first has `start: true`), followed by one empty
 * message with `end: true`. Before sending, the session is marked as
 * "sending chat TTS" until the estimated speech duration plus 800 ms has
 * passed, and the client receives a `tts_reset` frame.
 *
 * Does nothing when there are no chunks or the upstream socket is not open.
 *
 * @param {object} session - Session state; reads `upstream`/`client`/`sessionId`, writes chat-TTS bookkeeping fields.
 * @param {string} speechText - Text to speak.
 * @returns {Promise<void>}
 */
async function sendSpeechText(session, speechText) {
  const chunks = splitTextForSpeech(speechText);
  if (!chunks.length) {
    return;
  }
  if (!session.upstream || session.upstream.readyState !== WebSocket.OPEN) {
    return;
  }
  const { sessionId } = session;
  console.log(`[NativeVoice] sendSpeechText start session=${sessionId} chunks=${chunks.length} textLen=${speechText.length}`);

  // Mark the local-TTS window so upstream events arriving meanwhile can be recognized.
  session.currentSpeechText = speechText;
  session.isSendingChatTTSText = true;
  session.currentTtsType = 'chat_tts_text';
  session.chatTTSUntil = Date.now() + estimateSpeechDurationMs(speechText) + 800;

  const releaseSendingFlag = () => {
    session.chatTTSTimer = null;
    // Only release if the window was not extended or cleared in the meantime.
    const windowElapsed = (session.chatTTSUntil || 0) <= Date.now();
    if (windowElapsed) {
      session.isSendingChatTTSText = false;
    }
  };
  clearTimeout(session.chatTTSTimer);
  const releaseDelayMs = Math.max(200, session.chatTTSUntil - Date.now() + 50);
  session.chatTTSTimer = setTimeout(releaseSendingFlag, releaseDelayMs);

  sendJson(session.client, { type: 'tts_reset', ttsType: 'chat_tts_text' });

  chunks.forEach((chunk, index) => {
    console.log(`[NativeVoice] sendSpeechText chunk session=${session.sessionId} index=${index + 1}/${chunks.length} len=${chunk.length} start=${index === 0} end=false text=${JSON.stringify(chunk.slice(0, 80))}`);
    const chunkMessage = createChatTTSTextMessage(session.sessionId, {
      start: index === 0,
      end: false,
      content: chunk,
    });
    session.upstream.send(chunkMessage);
  });

  console.log(`[NativeVoice] sendSpeechText end session=${session.sessionId}`);
  const terminator = createChatTTSTextMessage(session.sessionId, {
    start: false,
    end: true,
    content: '',
  });
  session.upstream.send(terminator);
}
/**
 * Send the `ready` frame to the client, at most once per session.
 *
 * @param {object} session - Session state; reads `client`, reads/writes `readySent`.
 */
function sendReady(session) {
  if (!session.readySent) {
    session.readySent = true;
    sendJson(session.client, { type: 'ready' });
  }
}
/**
 * Deliver the opening greeting once per session, then signal readiness.
 *
 * On the first call the greeting (session.greetingText or the default) is
 * recorded via persistAssistantSpeech, pending greeting/ready timers are
 * cancelled, a 2-second greeting-protection window is opened, and a SayHello
 * message is sent upstream. If that send throws, the "greeting sent" flag and
 * the protection window are rolled back. `sendReady` is called on every path.
 *
 * @param {object} session - Session state; reads `upstream`/`greetingText`/`sessionId`, writes greeting bookkeeping fields.
 */
function sendGreeting(session) {
  if (!session.hasSentGreeting) {
    session.hasSentGreeting = true;
    const greeting = session.greetingText || DEFAULT_VOICE_GREETING;
    console.log(`[NativeVoice] sendGreeting session=${session.sessionId} text=${JSON.stringify(greeting.slice(0, 80))}`);
    persistAssistantSpeech(session, greeting, { source: 'voice_bot' });

    clearTimeout(session.greetingTimer);
    clearTimeout(session.readyTimer);
    session.greetingSentAt = Date.now();
    session.greetingProtectionUntil = Date.now() + 2000;
    session.currentSpeechText = greeting;

    try {
      const helloMessage = createSayHelloMessage(session.sessionId, greeting);
      session.upstream.send(helloMessage);
      console.log(`[NativeVoice] sendSayHello event=300 session=${session.sessionId}`);
    } catch (error) {
      // Roll back so the greeting is not considered delivered.
      session.hasSentGreeting = false;
      session.greetingProtectionUntil = 0;
      console.warn('[NativeVoice] SayHello failed:', error.message);
    }
  }
  sendReady(session);
}
/**
 * Re-send the session's greeting upstream as a SayHello message.
 *
 * Skipped when the session has no greeting text, when the upstream socket is
 * not open, or when a greeting was sent less than 6 seconds ago. Otherwise
 * the greeting timestamp is refreshed and `directSpeakUntil` is set to the
 * estimated speech duration plus 800 ms. Send failures are logged, not thrown.
 *
 * @param {object} session - Session state; reads `upstream`/`greetingText`/`sessionId`, writes `greetingSentAt`/`directSpeakUntil`.
 * @returns {Promise<void>}
 */
async function replayGreeting(session) {
  const greeting = String(session.greetingText || '').trim();
  if (!greeting) {
    return;
  }
  if (!session.upstream || session.upstream.readyState !== WebSocket.OPEN) {
    return;
  }
  const sentTooRecently = session.greetingSentAt && Date.now() - session.greetingSentAt < 6000;
  if (sentTooRecently) {
    console.log(`[NativeVoice] replayGreeting skipped (too soon) session=${session.sessionId}`);
    return;
  }
  console.log(`[NativeVoice] replayGreeting session=${session.sessionId} text=${JSON.stringify(greeting.slice(0, 80))}`);
  session.greetingSentAt = Date.now();
  session.directSpeakUntil = Date.now() + estimateSpeechDurationMs(greeting) + 800;
  try {
    const helloMessage = createSayHelloMessage(session.sessionId, greeting);
    session.upstream.send(helloMessage);
  } catch (error) {
    console.warn('[NativeVoice] replayGreeting SayHello failed:', error.message);
  }
}
/**
 * Forward knowledge items to the upstream service as a ChatRAGText message.
 *
 * Entries that are falsy or lack `content` are dropped. Nothing is sent when
 * the upstream socket is not open or no usable entries remain.
 *
 * @param {object} session - Session state; reads `upstream` and `sessionId`.
 * @param {Array<object>} items - Candidate RAG items; non-arrays are treated as empty.
 * @returns {Promise<void>}
 */
async function sendExternalRag(session, items) {
  const upstream = session.upstream;
  if (!upstream || upstream.readyState !== WebSocket.OPEN) {
    return;
  }
  const usableItems = [];
  if (Array.isArray(items)) {
    for (const item of items) {
      if (item && item.content) {
        usableItems.push(item);
      }
    }
  }
  if (usableItems.length === 0) {
    return;
  }
  const serialized = JSON.stringify(usableItems);
  upstream.send(createChatRAGTextMessage(session.sessionId, serialized));
}
/**
 * Lift upstream-reply suppression for a session.
 *
 * Cancels the suppression timer, resets the suppression window, the
 * awaiting/pending-assistant fields and the audio block, then tells the
 * client that no assistant reply is pending.
 *
 * @param {object} session - Session state whose suppression fields are reset.
 */
function clearUpstreamSuppression(session) {
  clearTimeout(session.suppressReplyTimer);
  Object.assign(session, {
    suppressReplyTimer: null,
    suppressUpstreamUntil: 0,
    awaitingUpstreamReply: false,
    pendingAssistantSource: null,
    pendingAssistantToolName: null,
    pendingAssistantMeta: null,
    pendingAssistantTurnSeq: 0,
    blockUpstreamAudio: false,
  });
  sendJson(session.client, { type: 'assistant_pending', active: false });
}
/**
 * Start (or restart) suppression of upstream replies for at least one second.
 *
 * Marks the session as awaiting an upstream reply, sets the suppression
 * deadline, and arms a timer that calls clearUpstreamSuppression once the
 * deadline has passed. If the deadline was pushed out in the meantime the
 * timer callback leaves the suppression in place.
 *
 * @param {object} session - Session state; writes `awaitingUpstreamReply`, `suppressUpstreamUntil`, `suppressReplyTimer`.
 * @param {number} durationMs - Requested suppression length in ms (values below 1000 are raised to 1000).
 */
function suppressUpstreamReply(session, durationMs) {
  clearTimeout(session.suppressReplyTimer);
  session.awaitingUpstreamReply = true;
  session.suppressUpstreamUntil = Date.now() + Math.max(1000, durationMs);

  const onTimerFired = () => {
    session.suppressReplyTimer = null;
    const deadlinePassed = !((session.suppressUpstreamUntil || 0) > Date.now());
    if (deadlinePassed) {
      clearUpstreamSuppression(session);
    }
  };
  const delayMs = Math.max(300, session.suppressUpstreamUntil - Date.now());
  session.suppressReplyTimer = setTimeout(onTimerFired, delayMs);
}
/**
 * Resolve and deliver the assistant's reply for one finalized user utterance.
 *
 * Flow, as implemented below:
 *  1. If another reply is being processed, or locally driven speech is still
 *     playing (`directSpeakUntil`), the utterance is stored in the single
 *     `queuedUserText` slot (a later utterance overwrites an earlier one) and
 *     the function returns.
 *  2. The utterance is classified as a knowledge-base (KB) candidate; for
 *     candidates upstream audio is blocked and upstream replies are
 *     suppressed for up to 30 s while the answer is resolved.
 *  3. The reply is resolved, reusing a pending KB prequery result when its
 *     text is similar enough, otherwise via resolveReply().
 *  4. Delivery depends on `resolveResult.delivery`:
 *       'upstream_chat' – hand the turn back to the upstream model;
 *       'external_rag'  – send RAG items upstream and wait for its spoken answer;
 *       otherwise       – speak `speechText` locally via sendSpeechText().
 *  5. The `finally` block releases the busy flag and schedules any queued
 *     utterance that belongs to a different turn.
 *
 * Errors are logged and reported to the client as an `error` frame; they are
 * not re-thrown.
 *
 * @param {object} session - Mutable per-connection session state.
 * @param {string} text - Finalized user text.
 * @param {number} [turnSeq] - User turn sequence this text belongs to; defaults to the session's latest.
 * @returns {Promise<void>}
 */
async function processReply(session, text, turnSeq = session.latestUserTurnSeq || 0) {
const cleanText = (text || '').trim();
if (!cleanText) return;
// Busy: remember only the most recent utterance; the finally block of the running call picks it up.
if (session.processingReply) {
session.queuedUserText = cleanText;
session.queuedUserTurnSeq = turnSeq;
console.log(`[NativeVoice] processReply queued(busy) session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 80))}`);
return;
}
const now = Date.now();
// Locally driven speech is still within its estimated playback window: queue instead of replying now.
// NOTE(review): nothing in this function schedules the queued text on this path — presumably a caller or a later turn drains it; confirm.
if (session.directSpeakUntil && now < session.directSpeakUntil) {
session.queuedUserText = cleanText;
session.queuedUserTurnSeq = turnSeq;
console.log(`[NativeVoice] processReply queued(speaking) session=${session.sessionId} waitMs=${session.directSpeakUntil - now} text=${JSON.stringify(cleanText.slice(0, 80))}`);
return;
}
const activeTurnSeq = turnSeq || session.latestUserTurnSeq || 0;
session.processingReply = true;
sendJson(session.client, { type: 'assistant_pending', active: true });
let isKnowledgeCandidate = shouldForceKnowledgeRoute(cleanText);
// KB topic protection window: if there was a KB hit within the last 60 seconds and the current
// turn is not pure chit-chat / a farewell, treat it as a KB candidate as well.
// This keeps the S2S model from free-form fabrication when the user questions or corrects
// product information (e.g. "粉末来的呀你搞错了吧").
const KB_PROTECTION_WINDOW_MS = 60000;
if (!isKnowledgeCandidate && session._lastKbHitAt && (Date.now() - session._lastKbHitAt < KB_PROTECTION_WINDOW_MS)) {
const isPureChitchat = /^(喂|你好|嗨|hi|hello|谢谢|谢谢你|谢谢啦|多谢|感谢|再见|拜拜|拜|好的|嗯|哦|行|没事了|不用了|可以了|好的谢谢|没问题|知道了|明白了|了解了|好嘞|好吧|行吧|ok|okay)[,。!?~\s]*$/i.test(cleanText);
if (!isPureChitchat) {
isKnowledgeCandidate = true;
console.log(`[NativeVoice] KB protection window active, promoting to kbCandidate session=${session.sessionId} lastKbHit=${Math.round((Date.now() - session._lastKbHitAt) / 1000)}s ago`);
}
}
if (isKnowledgeCandidate) {
// Block upstream audio and suppress upstream replies while the answer is resolved.
session.blockUpstreamAudio = true;
suppressUpstreamReply(session, 30000);
sendJson(session.client, { type: 'tts_reset', reason: 'processing' });
// Filler speech has been removed: after KB query optimization the latency dropped to ~2.6s, so no filler is needed.
session._fillerActive = false;
}
console.log(`[NativeVoice] processReply start session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 120))} blocked=${session.blockUpstreamAudio} kbCandidate=${isKnowledgeCandidate}`);
try {
// Consume the KB prequery cache: if a prequery was started during the partial-ASR phase and
// its text matches the final text, reuse the cached result directly.
let resolveResult = null;
if (isKnowledgeCandidate && session.pendingKbPrequery && session._kbPrequeryText) {
// Strip filler particles, punctuation and whitespace before comparing the two texts.
const preText = (session._kbPrequeryText || '').replace(/[啊哦嗯呢呀哎诶额,。!?、,.\s]/g, '');
const finalText = cleanText.replace(/[啊哦嗯呢呀哎诶额,。!?、,.\s]/g, '');
// Relaxed similarity: substring containment, or a sufficient share of the shorter text's
// characters appearing in the longer text, counts as a match.
// NOTE(review): the original comment stated a >=60% overlap threshold, but the code below uses 0.45 — confirm which is intended.
let isSimilar = preText && finalText && (finalText.includes(preText) || preText.includes(finalText));
if (!isSimilar && preText && finalText) {
const shorter = preText.length <= finalText.length ? preText : finalText;
const longer = preText.length <= finalText.length ? finalText : preText;
let overlap = 0;
for (let i = 0; i < shorter.length; i++) {
if (longer.includes(shorter[i])) overlap++;
}
isSimilar = overlap / shorter.length >= 0.45;
}
if (isSimilar) {
const prequeryResult = await session.pendingKbPrequery;
// Only reuse hit results; a no-hit may be caused by incomplete routing on the partial text,
// so re-search with the full text.
if (prequeryResult && prequeryResult.delivery !== 'upstream_chat') {
console.log(`[NativeVoice] using KB prequery cache (hit) session=${session.sessionId} preText=${JSON.stringify(session._kbPrequeryText.slice(0, 60))}`);
resolveResult = prequeryResult;
} else {
console.log(`[NativeVoice] prequery no-hit, re-searching with full text session=${session.sessionId} preText=${JSON.stringify((session._kbPrequeryText || '').slice(0, 40))} finalText=${JSON.stringify(cleanText.slice(0, 40))}`);
}
} else {
console.log(`[NativeVoice] KB prequery text mismatch, re-querying session=${session.sessionId} pre=${JSON.stringify(preText.slice(0, 40))} final=${JSON.stringify(finalText.slice(0, 40))}`);
}
}
// The prequery state is single-use: clear it whether or not it was consumed.
session.pendingKbPrequery = null;
session._kbPrequeryText = '';
session._kbPrequeryStartedAt = 0;
if (!resolveResult) {
resolveResult = await resolveReply(session.sessionId, session, cleanText);
}
const { delivery, speechText, ragItems, source, toolName, routeDecision, responseMeta } = resolveResult;
// A newer user turn arrived while resolving: drop this result and lift the suppression.
if (activeTurnSeq !== (session.latestUserTurnSeq || 0)) {
console.log(`[NativeVoice] stale reply ignored session=${session.sessionId} activeTurn=${activeTurnSeq} latestTurn=${session.latestUserTurnSeq || 0}`);
clearUpstreamSuppression(session);
return;
}
if (delivery === 'upstream_chat') {
// kbCandidate but S2S did not call a tool → let S2S reply naturally.
// Relies on: 1) the brand-protection instructions in the system prompt steering S2S toward tools,
// 2) the streaming isBrandHarmful interception as a safety net.
if (isKnowledgeCandidate) {
console.log(`[NativeVoice] processReply kbCandidate+upstream_chat, unblock S2S session=${session.sessionId}`);
}
session.blockUpstreamAudio = false;
session._lastPartialAt = 0;
// Record what the upstream reply should be attributed to when it arrives.
session.awaitingUpstreamReply = true;
session.pendingAssistantSource = 'voice_bot';
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = responseMeta;
session.pendingAssistantTurnSeq = activeTurnSeq;
console.log(`[NativeVoice] processReply handoff session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=upstream_chat`);
return;
}
if (delivery === 'external_rag') {
if (!session.blockUpstreamAudio) {
session.blockUpstreamAudio = true;
}
session.discardNextAssistantResponse = true;
sendJson(session.client, { type: 'tts_reset', reason: 'knowledge_hit' });
const ragContent = (ragItems || []).filter((item) => item && item.content);
if (ragContent.length > 0) {
console.log(`[NativeVoice] processReply sending external_rag to S2S session=${session.sessionId} route=${routeDecision?.route || 'unknown'} items=${ragContent.length}`);
// KB topic memory: record this turn's original user question and a timestamp, used by the
// protection window and for follow-up enrichment.
if (responseMeta?.hit !== false && responseMeta?.reason !== 'honest_fallback') {
session._lastKbTopic = cleanText;
session._lastKbHitAt = Date.now();
}
// Do not send the raw KB text as a subtitle up front; wait until S2S event 351 returns the
// text that was actually spoken and update the subtitle then.
// This keeps subtitle and speech consistent: S2S produces a natural spoken answer based on the RAG content.
session._pendingExternalRagReply = true;
await sendExternalRag(session, ragContent);
session.awaitingUpstreamReply = true;
session.pendingAssistantSource = source;
session.pendingAssistantToolName = toolName;
session.pendingAssistantMeta = responseMeta;
session.pendingAssistantTurnSeq = activeTurnSeq;
} else {
console.log(`[NativeVoice] processReply external_rag empty content, fallback to upstream session=${session.sessionId}`);
session.blockUpstreamAudio = false;
clearUpstreamSuppression(session);
}
return;
}
// Remaining case: local TTS delivery. Nothing to say → reset the chat-TTS flags and stop.
if (!speechText) {
console.log(`[NativeVoice] processReply empty session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=${delivery || 'unknown'}`);
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
return;
}
console.log(`[NativeVoice] processReply resolved session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=local_tts source=${source} tool=${toolName || 'chat'} speechLen=${speechText.length}`);
// Reserve the estimated playback window and suppress upstream replies slightly beyond it.
session.directSpeakUntil = Date.now() + estimateSpeechDurationMs(speechText) + 800;
suppressUpstreamReply(session, estimateSpeechDurationMs(speechText) + 1800);
session.lastDeliveredAssistantTurnSeq = activeTurnSeq;
persistAssistantSpeech(session, speechText, { source, toolName, meta: responseMeta });
await sendSpeechText(session, speechText);
} catch (error) {
console.error('[NativeVoice] processReply failed:', error.message);
sendJson(session.client, { type: 'error', error: error.message });
} finally {
session.processingReply = false;
// Keep the audio block and the pending indicator only while an upstream reply is still expected.
if (!session.awaitingUpstreamReply) {
session.blockUpstreamAudio = false;
}
if (!session.awaitingUpstreamReply) {
sendJson(session.client, { type: 'assistant_pending', active: false });
}
// Drain the single-slot queue; only an utterance from a different turn is processed.
const pending = session.queuedUserText;
const pendingTurnSeq = session.queuedUserTurnSeq || 0;
session.queuedUserText = '';
session.queuedUserTurnSeq = 0;
if (pending && pendingTurnSeq && pendingTurnSeq !== activeTurnSeq && (!session.directSpeakUntil || Date.now() >= session.directSpeakUntil)) {
// No local speech in progress: process the queued utterance after a short delay.
setTimeout(() => {
session.blockUpstreamAudio = true;
processReply(session, pending, pendingTurnSeq).catch((err) => {
console.error('[NativeVoice] queued processReply failed:', err.message);
});
}, 200);
} else if (pending && pendingTurnSeq && pendingTurnSeq !== activeTurnSeq) {
// Local speech still playing: wait until its window ends (plus 200 ms), then process
// whichever utterance is newest at that time.
const waitMs = Math.max(200, session.directSpeakUntil - Date.now() + 200);
clearTimeout(session.queuedReplyTimer);
session.queuedReplyTimer = setTimeout(() => {
session.queuedReplyTimer = null;
const queuedText = session.queuedUserText || pending;
const queuedTurnSeq = session.queuedUserTurnSeq || pendingTurnSeq;
session.queuedUserText = '';
session.queuedUserTurnSeq = 0;
session.blockUpstreamAudio = true;
processReply(session, queuedText, queuedTurnSeq).catch((err) => {
console.error('[NativeVoice] delayed queued processReply failed:', err.message);
});
}, waitMs);
}
}
}
function handleUpstreamMessage(session, data) {
let message;
try {
message = unmarshal(data);
} catch (error) {
console.warn('[NativeVoice] unmarshal failed:', error.message);
return;
}
if (message.type === MsgType.AUDIO_ONLY_SERVER) {
const isDefaultTts = !session.currentTtsType || session.currentTtsType === 'default';
const isSuppressingUpstreamAudio = (session.suppressUpstreamUntil || 0) > Date.now() && isDefaultTts;
// 用户刚停止说话后短暂阻止默认TTS音频给event 459的blockUpstreamAudio留时间生效
const isUserJustSpeaking = isDefaultTts && session._lastPartialAt && (Date.now() - session._lastPartialAt < 800);
// blockUpstreamAudio 生效时:仅放行 external_rag 和限时过渡语音频,其余全部阻断
// 修复:旧逻辑只阻断 isDefaultTts导致 chat_tts_text 窗口期 S2S 自主回复音频泄漏
const isBlockPassthrough = session.currentTtsType === 'external_rag' ||
(session._fillerActive && (session.chatTTSUntil || 0) > Date.now());
if ((session.blockUpstreamAudio && !isBlockPassthrough) || isSuppressingUpstreamAudio || isUserJustSpeaking) {
if (!session._audioBlockLogOnce) {
session._audioBlockLogOnce = true;
console.log(`[NativeVoice] audio blocked session=${session.sessionId} ttsType=${session.currentTtsType} block=${session.blockUpstreamAudio} suppress=${isSuppressingUpstreamAudio}`);
}
return;
}
session._audioBlockLogOnce = false;
if (session.client && session.client.readyState === WebSocket.OPEN) {
session.client.send(message.payload, { binary: true });
}
return;
}
const payload = parseJsonPayload(message);
if (message.type === MsgType.ERROR) {
console.error(`[NativeVoice] upstream error session=${session.sessionId} code=${message.event} payload=${message.payload.toString('utf8').slice(0, 200)}`);
sendJson(session.client, { type: 'error', error: message.payload.toString('utf8') });
return;
}
if (message.type !== MsgType.FULL_SERVER) {
return;
}
if (message.event === 150) {
session.upstreamReady = true;
console.log(`[NativeVoice] upstream ready session=${session.sessionId}`);
resetIdleTimer(session);
sendGreeting(session);
return;
}
if (message.event === 300) {
console.log(`[NativeVoice] SayHello response session=${session.sessionId}`);
return;
}
if (message.event === 350) {
session.currentTtsType = payload?.tts_type || '';
if (payload?.tts_type === 'chat_tts_text' && session.pendingGreetingAck) {
session.pendingGreetingAck = false;
clearTimeout(session.greetingAckTimer);
session.greetingAckTimer = null;
}
if (session.blockUpstreamAudio && payload?.tts_type === 'external_rag') {
session.blockUpstreamAudio = false;
session.suppressUpstreamUntil = 0;
clearTimeout(session.suppressReplyTimer);
session.suppressReplyTimer = null;
// 注意不清除discardNextAssistantResponse让它拦截S2S默认回复的残留event 351
// 该标记会在KB回复的event 550 chunks到达时自动清除
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
// 清除过渡语的chat TTS状态确保external_rag回复不被isLocalChatTTSTextActive拦截
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
session.currentSpeechText = '';
session._fillerActive = false;
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'rag_response_start' });
console.log(`[NativeVoice] unblock for external_rag tts session=${session.sessionId}`);
} else if (session.blockUpstreamAudio && payload?.tts_type === 'chat_tts_text') {
console.log(`[NativeVoice] chat_tts_text started, keeping block for S2S default response session=${session.sessionId}`);
}
console.log(`[NativeVoice] upstream tts_event session=${session.sessionId} ttsType=${payload?.tts_type || ''}`);
sendJson(session.client, { type: 'tts_event', payload });
return;
}
const isLocalChatTTSTextActive = !!session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now();
const isSuppressingUpstreamReply = (session.suppressUpstreamUntil || 0) > Date.now();
if (message.event === 351) {
if (isLocalChatTTSTextActive || session.blockUpstreamAudio || isSuppressingUpstreamReply) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
return;
}
if (session.discardNextAssistantResponse) {
session.discardNextAssistantResponse = false;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] discarded stale assistant response (kb-nohit retrigger) session=${session.sessionId}`);
return;
}
const pendingAssistantSource = session.pendingAssistantSource || 'voice_bot';
const pendingAssistantToolName = session.pendingAssistantToolName || null;
const pendingAssistantMeta = session.pendingAssistantMeta || null;
const pendingAssistantTurnSeq = session.pendingAssistantTurnSeq || session.latestUserTurnSeq || 0;
session.awaitingUpstreamReply = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
if (pendingAssistantTurnSeq && session.lastDeliveredAssistantTurnSeq === pendingAssistantTurnSeq) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
console.log(`[NativeVoice] duplicate assistant final ignored (351) session=${session.sessionId} turn=${pendingAssistantTurnSeq}`);
return;
}
const assistantText = extractRawText(payload);
if (assistantText) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
// 过渡语的event 351不持久化直接丢弃
if (session._fillerActive) {
console.log(`[NativeVoice] discarded filler assistant text session=${session.sessionId}`);
session._fillerActive = false;
return;
}
// 清除external_rag等待标记KB回复已到达
if (session._pendingExternalRagReply) {
session._pendingExternalRagReply = false;
}
// 品牌安全检测:最终助手文本包含有害内容时,阻断音频并注入安全回复
if (isBrandHarmful(assistantText)) {
console.warn(`[NativeVoice][SafeGuard] harmful content in final assistant text, blocking session=${session.sessionId} text=${JSON.stringify(assistantText.slice(0, 120))}`);
session.blockUpstreamAudio = true;
sendJson(session.client, { type: 'tts_reset', reason: 'harmful_blocked' });
const safeReply = getVoiceSafeReply();
session.lastDeliveredAssistantTurnSeq = pendingAssistantTurnSeq;
persistAssistantSpeech(session, safeReply, { source: 'voice_bot' });
sendSpeechText(session, safeReply).catch((err) => {
console.warn('[NativeVoice][SafeGuard] sendSpeechText failed:', err.message);
});
} else {
console.log(`[NativeVoice] upstream assistant session=${session.sessionId} text=${JSON.stringify(assistantText.slice(0, 120))}`);
session.lastDeliveredAssistantTurnSeq = pendingAssistantTurnSeq;
persistAssistantSpeech(session, assistantText, {
source: pendingAssistantSource,
toolName: pendingAssistantToolName,
meta: pendingAssistantMeta,
});
// KB回复完成后重新阻断音频防止下一个问题的S2S默认回复在early block前泄露
if (session.currentTtsType === 'external_rag') {
session.blockUpstreamAudio = true;
console.log(`[NativeVoice] re-blocked after KB response session=${session.sessionId}`);
}
}
} else {
const didFlush = flushAssistantStream(session, {
source: pendingAssistantSource,
toolName: pendingAssistantToolName,
meta: pendingAssistantMeta,
});
if (didFlush) {
session.lastDeliveredAssistantTurnSeq = pendingAssistantTurnSeq;
}
}
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
return;
}
if (message.event === 550) {
// external_rag chunks到达时清除discardNextAssistantResponse默认回复的351已过或不会来
if (session.discardNextAssistantResponse && session.currentTtsType === 'external_rag') {
session.discardNextAssistantResponse = false;
console.log(`[NativeVoice] cleared discardNextAssistantResponse for external_rag stream session=${session.sessionId}`);
}
if (isLocalChatTTSTextActive || session.blockUpstreamAudio || isSuppressingUpstreamReply || session.discardNextAssistantResponse) {
return;
}
if (session.awaitingUpstreamReply) {
session.awaitingUpstreamReply = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
}
const fullText = appendAssistantStream(session, payload);
if (fullText) {
// 品牌安全检测S2S模型输出传销等负面内容时立即阻断音频并注入安全回复
if (fullText.length >= 4 && isBrandHarmful(fullText)) {
console.warn(`[NativeVoice][SafeGuard] harmful content detected in stream, blocking audio session=${session.sessionId} text=${JSON.stringify(fullText.slice(0, 120))}`);
session.blockUpstreamAudio = true;
session.discardNextAssistantResponse = true;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
sendJson(session.client, { type: 'tts_reset', reason: 'harmful_blocked' });
// 注入安全回复语音,替代有害内容
const safeReply = getVoiceSafeReply();
persistAssistantSpeech(session, safeReply, { source: 'voice_bot' });
sendSpeechText(session, safeReply).catch((err) => {
console.warn('[NativeVoice][SafeGuard] sendSpeechText failed:', err.message);
});
return;
}
// 检测思考模式S2S模型输出分析/计划而非直接回答,立即阻断
if (fullText.length >= 10 && (THINKING_PATTERN.test(fullText.trim()) || THINKING_MID_PATTERN.test(fullText))) {
console.warn(`[NativeVoice][SafeGuard] thinking detected in stream, blocking audio session=${session.sessionId} text=${JSON.stringify(fullText.slice(0, 120))}`);
session.blockUpstreamAudio = true;
session.discardNextAssistantResponse = true;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
sendJson(session.client, { type: 'tts_reset', reason: 'thinking_blocked' });
return;
}
console.log(`[NativeVoice] upstream assistant chunk session=${session.sessionId} len=${fullText.length} text=${JSON.stringify(fullText.slice(0, 120))}`);
}
return;
}
if (message.event === 559) {
if (isLocalChatTTSTextActive || isSuppressingUpstreamReply) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
return;
}
if (session.blockUpstreamAudio) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] blocked response ended (559), keeping block session=${session.sessionId}`);
return;
}
if (session.discardNextAssistantResponse) {
session.discardNextAssistantResponse = false;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] discarded stale stream end (559, kb-nohit retrigger) session=${session.sessionId}`);
return;
}
// external_rag流期间阻止默认回复的559过早flush部分KB文本
if (session._pendingExternalRagReply) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] suppressed 559 flush during external_rag flow session=${session.sessionId}`);
return;
}
const pendingAssistantTurnSeq = session.pendingAssistantTurnSeq || session.latestUserTurnSeq || 0;
if (pendingAssistantTurnSeq && session.lastDeliveredAssistantTurnSeq === pendingAssistantTurnSeq) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
console.log(`[NativeVoice] duplicate assistant final ignored (559) session=${session.sessionId} turn=${pendingAssistantTurnSeq}`);
return;
}
session.awaitingUpstreamReply = false;
session.blockUpstreamAudio = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
const didFlush = flushAssistantStream(session, {
source: session.pendingAssistantSource || 'voice_bot',
toolName: session.pendingAssistantToolName || null,
meta: session.pendingAssistantMeta || null,
});
if (didFlush) {
session.lastDeliveredAssistantTurnSeq = pendingAssistantTurnSeq;
}
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
return;
}
if (message.event === 450 || (message.event === 451 && !isFinalUserPayload(payload))) {
const text = extractUserText(payload, session.sessionId);
if (text) {
const now = Date.now();
const isDirectSpeaking = session.directSpeakUntil && now < session.directSpeakUntil;
const isChatTTSSpeaking = session.isSendingChatTTSText && (session.chatTTSUntil || 0) > now;
// TTS回声检测播放期间如果ASR识别文本是当前播放文本的子串判定为回声忽略
if ((isDirectSpeaking || isChatTTSSpeaking) && session.currentSpeechText) {
const normalizedPartial = text.replace(/[,。!?、,.\s]/g, '');
const normalizedSpeech = session.currentSpeechText.replace(/[,。!?、,.\s]/g, '');
if (normalizedPartial.length <= 3 || normalizedSpeech.includes(normalizedPartial)) {
if (!session._echoLogOnce) {
session._echoLogOnce = true;
console.log(`[NativeVoice] TTS echo detected, ignoring partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 80))}`);
}
return;
}
session._echoLogOnce = false;
} else {
session._echoLogOnce = false;
}
// Greeting保护窗口发送问候语后短暂保护期内忽略barge-in
if (session.greetingProtectionUntil && now < session.greetingProtectionUntil) {
console.log(`[NativeVoice] greeting protection active, ignoring partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 80))}`);
return;
}
console.log(`[NativeVoice] upstream partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 120))}`);
const normalizedPartial = normalizeKnowledgeAlias(text);
session.latestUserText = normalizedPartial;
session._lastPartialAt = now;
// 提前阻断部分识别文字含知识库关键词时立即阻断S2S音频防止有害内容播出
if (normalizedPartial.length >= 6 && !session.blockUpstreamAudio && shouldForceKnowledgeRoute(normalizedPartial)) {
session.blockUpstreamAudio = true;
session.currentTtsType = 'default';
// 立即清除客户端已收到的S2S音频防止用户听到抢答片段
sendJson(session.client, { type: 'tts_reset', reason: 'early_block' });
console.log(`[NativeVoice] early block: partial text matched KB keywords session=${session.sessionId} text=${JSON.stringify(text.slice(0, 80))}`);
// KB预查询提前启动知识库查询减少final ASR后的等待时间
const kbPrequeryDebounce = 600;
if (normalizedPartial.length >= 8 && (!session._kbPrequeryStartedAt || now - session._kbPrequeryStartedAt > kbPrequeryDebounce)) {
session._kbPrequeryStartedAt = now;
session._kbPrequeryText = normalizedPartial;
console.log(`[NativeVoice] KB prequery started session=${session.sessionId} text=${JSON.stringify(normalizedPartial.slice(0, 80))}`);
session.pendingKbPrequery = resolveReply(session.sessionId, session, normalizedPartial).catch((err) => {
console.warn(`[NativeVoice] KB prequery failed session=${session.sessionId}:`, err.message);
return null;
});
}
}
// 用户开口说话时立即打断所有 AI 播放(包括 S2S external_rag 音频)
const isS2SAudioPlaying = !session.blockUpstreamAudio && session.currentTtsType === 'external_rag';
if (isDirectSpeaking || isChatTTSSpeaking || isS2SAudioPlaying) {
console.log(`[NativeVoice] user barge-in (partial) session=${session.sessionId} direct=${isDirectSpeaking} chatTTS=${isChatTTSSpeaking} s2sRag=${isS2SAudioPlaying}`);
session.directSpeakUntil = 0;
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
session.currentSpeechText = '';
clearTimeout(session.chatTTSTimer);
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
clearUpstreamSuppression(session);
}
// 阻断 S2S 音频转发,防止用户打断后仍听到残留音频
session.blockUpstreamAudio = true;
}
// 无论当前是否在播放,都发送 tts_reset 确保客户端停止所有音频播放
if (!session._lastBargeInResetAt || now - session._lastBargeInResetAt > 500) {
session._lastBargeInResetAt = now;
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
}
sendJson(session.client, {
type: 'subtitle',
role: 'user',
text: text,
isFinal: false,
sequence: `native_partial_${Date.now()}`,
});
}
return;
}
if (message.event === 459 || (message.event === 451 && isFinalUserPayload(payload))) {
const rawFinalText = extractUserText(payload, session.sessionId) || '';
const finalText = normalizeKnowledgeAlias(rawFinalText) || session.latestUserText || '';
const now459 = Date.now();
// 双事件去重S2S可能同时发送event 459和event 451(is_final),用去标点归一化文本+时间窗口去重
const normalizedForDedup = finalText.replace(/[,。!?、,.?!\s]/g, '');
if (normalizedForDedup && session._lastFinalNormalized === normalizedForDedup && now459 - (session._lastFinalAt || 0) < 1500) {
console.log(`[NativeVoice] duplicate final ignored (event=${message.event}) session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 80))}`);
return;
}
session._lastFinalNormalized = normalizedForDedup;
session._lastFinalAt = now459;
// TTS回声检测final级别播放期间ASR最终识别文本如果是当前播放文本的子串判定为回声
const isDirectSpeaking459 = session.directSpeakUntil && now459 < session.directSpeakUntil;
const isChatTTSSpeaking459 = session.isSendingChatTTSText && (session.chatTTSUntil || 0) > now459;
if ((isDirectSpeaking459 || isChatTTSSpeaking459) && session.currentSpeechText && finalText) {
const normalizedFinal = finalText.replace(/[,。!?、,.\s]/g, '');
const normalizedSpeech = session.currentSpeechText.replace(/[,。!?、,.\s]/g, '');
if (normalizedFinal.length <= 4 || normalizedSpeech.includes(normalizedFinal)) {
console.log(`[NativeVoice] TTS echo detected in final, ignoring session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 80))}`);
return;
}
}
// Greeting保护窗口
if (session.greetingProtectionUntil && now459 < session.greetingProtectionUntil && finalText) {
console.log(`[NativeVoice] greeting protection active, ignoring final session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 80))}`);
return;
}
console.log(`[NativeVoice] upstream final session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 120))}`);
if (session.directSpeakUntil && Date.now() < session.directSpeakUntil) {
console.log(`[NativeVoice] user interrupt during speaking session=${session.sessionId}`);
session.directSpeakUntil = 0;
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
session.currentSpeechText = '';
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
clearUpstreamSuppression(session);
}
} else if (session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now()) {
console.log(`[NativeVoice] user interrupt chatTTS during speaking session=${session.sessionId}`);
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
session.currentSpeechText = '';
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
clearUpstreamSuppression(session);
}
}
if (persistUserSpeech(session, rawFinalText || finalText)) {
session.blockUpstreamAudio = true;
session.currentTtsType = 'default';
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
sendJson(session.client, { type: 'tts_reset', reason: 'new_turn' });
processReply(session, finalText).catch((error) => {
console.error('[NativeVoice] processReply error:', error.message);
});
}
return;
}
sendJson(session.client, {
type: 'event',
event: message.event,
payload,
});
}
/**
 * Wires the client-facing WebSocket of a voice session to the gateway.
 *
 * Registers three listeners on `session.client`:
 *  - `message`: binary frames are relayed upstream as audio; text frames are
 *    JSON control messages (`start`, `stop`, `replay_greeting`, `text`).
 *  - `error`: logs socket errors so they never surface as an unhandled
 *    'error' event on the emitter.
 *  - `close`: cancels the session timers, closes the upstream socket and
 *    removes the session from the `sessions` registry.
 *
 * @param {object} session - Session record as built by `createSession`.
 * @returns {void}
 */
function attachClientHandlers(session) {
  session.client.on('message', async (raw, isBinary) => {
    if (isBinary) {
      // Audio is relayed only while the upstream socket is open and flagged
      // ready; frames that arrive earlier are dropped.
      if (session.upstream && session.upstream.readyState === WebSocket.OPEN && session.upstreamReady) {
        session.upstream.send(createAudioMessage(session.sessionId, raw));
      }
      return;
    }
    let parsed;
    try {
      parsed = JSON.parse(raw.toString('utf8'));
    } catch (error) {
      sendJson(session.client, { type: 'error', error: 'invalid client json' });
      return;
    }
    // JSON.parse also accepts `null` and bare primitives. Reading `.type` on
    // `null` would throw inside this async listener and turn into an unhandled
    // rejection, so anything that is not an object is rejected up front.
    if (parsed === null || typeof parsed !== 'object') {
      sendJson(session.client, { type: 'error', error: 'invalid client json' });
      return;
    }
    if (parsed.type === 'start') {
      // NOTE(review): a repeated `start` replaces session.upstream without
      // closing the previous socket - confirm clients never send it twice.
      session.userId = parsed.userId || session.userId || null;
      let remoteProfile = {};
      try {
        const remoteProfileResult = await getAssistantProfile({ userId: session.userId });
        remoteProfile = remoteProfileResult?.profile || {};
      } catch (error) {
        // A failing profile lookup must not abort the start flow; continue
        // with the session/default profile instead.
        console.warn('[NativeVoice] getAssistantProfile failed, using local profile:', error?.message);
      }
      // The client may have disconnected while the profile was loading; do not
      // open an upstream connection nobody would ever close.
      if (session.client.readyState !== WebSocket.OPEN) {
        return;
      }
      // Later spreads win: remote profile < profile already on the session <
      // profile supplied in this `start` message.
      const assistantProfile = resolveAssistantProfile({
        ...remoteProfile,
        ...(session.assistantProfile || {}),
        ...((parsed.assistantProfile && typeof parsed.assistantProfile === 'object') ? parsed.assistantProfile : {}),
      });
      session.assistantProfile = assistantProfile;
      session.botName = parsed.botName || assistantProfile.nickname || DEFAULT_VOICE_BOT_NAME;
      session.systemRole = parsed.systemRole || buildVoiceSystemRole(assistantProfile);
      session.speakingStyle = parsed.speakingStyle || session.speakingStyle || DEFAULT_VOICE_SPEAKING_STYLE;
      session.speaker = parsed.speaker || process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts';
      session.modelVersion = parsed.modelVersion || 'O';
      session.greetingText = parsed.greetingText || buildVoiceGreeting(assistantProfile);
      // Send `ready` right away instead of waiting for upstream event 150,
      // which greatly shortens the front-end wait.
      sendReady(session);
      session.upstream = createUpstreamConnection(session);
      loadHandoffSummaryForVoice(session).catch((error) => {
        console.warn('[NativeVoice] async loadHandoffSummaryForVoice failed:', error.message);
      });
      return;
    }
    if (parsed.type === 'stop') {
      session.client.close();
      return;
    }
    if (parsed.type === 'replay_greeting') {
      replayGreeting(session).catch((error) => {
        console.warn('[NativeVoice] replayGreeting failed:', error.message);
      });
      return;
    }
    if (parsed.type === 'text' && parsed.text) {
      if (persistUserSpeech(session, parsed.text)) {
        processReply(session, parsed.text, session.latestUserTurnSeq).catch((error) => {
          console.error('[NativeVoice] text processReply failed:', error.message);
        });
      }
    }
  });
  // An EventEmitter throws when 'error' is emitted without a listener, so a
  // socket-level error on one client must be consumed here.
  session.client.on('error', (error) => {
    console.warn(`[NativeVoice] client ws error session=${session.sessionId}:`, error?.message);
  });
  session.client.on('close', () => {
    clearTimeout(session.chatTTSTimer);
    clearTimeout(session.greetingTimer);
    clearTimeout(session.greetingAckTimer);
    clearTimeout(session.readyTimer);
    clearTimeout(session.suppressReplyTimer);
    clearTimeout(session.idleTimer);
    // Presumably a setTimeout handle like the others (initialised to null in
    // createSession); clearing it is a no-op when it is not set.
    clearTimeout(session.queuedReplyTimer);
    const { upstream } = session;
    // Also close a socket that is still connecting, otherwise it would finish
    // the handshake after the client is gone and stay open.
    if (upstream && (upstream.readyState === WebSocket.OPEN || upstream.readyState === WebSocket.CONNECTING)) {
      upstream.close();
    }
    // Only drop the registry entry if it still points at this session, so a
    // newer connection that reused the same id is not evicted.
    if (sessions.get(session.sessionId) === session) {
      sessions.delete(session.sessionId);
    }
  });
}
/**
 * Creates the upstream realtime-dialogue WebSocket for a session and attaches
 * its `open`, `message`, `error` and `close` listeners.
 *
 * @param {object} session - Session record; `sessionId` and `client` are read,
 *   `upstreamReady` is reset to false when the upstream socket closes.
 * @returns {WebSocket} The upstream socket, returned right after the listeners
 *   are attached.
 */
function createUpstreamConnection(session) {
  const requestHeaders = {
    'X-Api-Resource-Id': 'volc.speech.dialog',
    'X-Api-Access-Key': process.env.VOLC_S2S_TOKEN,
    'X-Api-App-Key': process.env.VOLC_DIALOG_APP_KEY || 'PlgvMymc7f3tQnJ6',
    'X-Api-App-ID': process.env.VOLC_S2S_APP_ID,
    'X-Api-Connect-Id': session.sessionId,
  };
  const socket = new WebSocket('wss://openspeech.bytedance.com/api/v3/realtime/dialogue', {
    headers: requestHeaders,
  });

  // On connect: send the start-connection message, then the start-session message.
  const announceSession = () => {
    const startConnection = createStartConnectionMessage();
    socket.send(startConnection);
    const startSession = createStartSessionMessage(session.sessionId, buildStartSessionPayload(session));
    socket.send(startSession);
  };

  // String payloads are passed to the client verbatim; everything else is
  // handed to the upstream message handler as a Buffer.
  const routeIncoming = (data, isBinary) => {
    const isPlainText = !isBinary && typeof data === 'string';
    if (isPlainText) {
      sendJson(session.client, { type: 'server_text', text: data });
      return;
    }
    const frame = Buffer.isBuffer(data) ? data : Buffer.from(data);
    handleUpstreamMessage(session, frame);
  };

  const reportError = (error) => {
    console.error('[NativeVoice] upstream ws error:', error.message);
    sendJson(session.client, { type: 'error', error: `语音服务连接异常: ${error.message}` });
  };

  // Runs 3 seconds after the upstream socket closed.
  const closeClientIfOpen = () => {
    const { client } = session;
    if (client && client.readyState === WebSocket.OPEN) {
      client.close();
    }
  };

  const handleClose = (code) => {
    console.log(`[NativeVoice] upstream closed session=${session.sessionId} code=${code}`);
    session.upstreamReady = false;
    sendJson(session.client, { type: 'upstream_closed', code });
    setTimeout(closeClientIfOpen, 3000);
  };

  socket.on('open', announceSession);
  socket.on('message', routeIncoming);
  socket.on('error', reportError);
  socket.on('close', handleClose);
  return socket;
}
/**
 * Builds the in-memory record for a new voice session, stores it in the
 * `sessions` registry under `sessionId` and attaches the client listeners.
 *
 * The record is assembled from consecutive groups of fields; the groups are
 * spread in order so the resulting key order matches a single flat literal.
 *
 * @param {object} client - Client WebSocket for this session.
 * @param {string} sessionId - Key used in the `sessions` registry.
 * @returns {object} The newly created session record.
 */
function createSession(client, sessionId) {
  const profile = resolveAssistantProfile();

  // Sockets.
  const connectionFields = {
    sessionId,
    client,
    upstream: null,
    upstreamReady: false,
  };

  // Per-turn text, speech and stream buffers.
  const dialogFields = {
    isSendingChatTTSText: false,
    latestUserText: '',
    latestUserTurnSeq: 0,
    queuedUserText: '',
    queuedUserTurnSeq: 0,
    processingReply: false,
    blockUpstreamAudio: false,
    directSpeakUntil: 0,
    queuedReplyTimer: null,
    lastPersistedAssistantText: '',
    lastPersistedAssistantAt: 0,
    assistantStreamBuffer: '',
    assistantStreamReplyId: '',
    currentTtsType: '',
    currentSpeechText: '',
    greetingProtectionUntil: 0,
  };

  // Underscore-prefixed flags plus the knowledge-base prequery slots.
  const scratchFields = {
    _echoLogOnce: false,
    _fillerActive: false,
    _pendingExternalRagReply: false,
    _lastPartialAt: 0,
    pendingKbPrequery: null,
    _kbPrequeryText: '',
    _kbPrequeryStartedAt: 0,
    _lastKbTopic: '',
    _lastKbHitAt: 0,
  };

  // Defaults derived from the resolved assistant profile.
  const personaFields = {
    assistantProfile: profile,
    botName: profile.nickname,
    systemRole: buildVoiceSystemRole(profile),
    speakingStyle: DEFAULT_VOICE_SPEAKING_STYLE,
    speaker: process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts',
    modelVersion: 'O',
    greetingText: buildVoiceGreeting(profile),
  };

  // Greeting, ready and handoff bookkeeping.
  const startupFields = {
    hasSentGreeting: false,
    greetingTimer: null,
    greetingAckTimer: null,
    pendingGreetingAck: false,
    greetingRetryCount: 0,
    readyTimer: null,
    readySent: false,
    handoffSummary: '',
    handoffSummaryUsed: false,
  };

  // Pending assistant reply and upstream suppression.
  const replyFields = {
    awaitingUpstreamReply: false,
    pendingAssistantSource: null,
    pendingAssistantToolName: null,
    pendingAssistantMeta: null,
    pendingAssistantTurnSeq: 0,
    lastDeliveredAssistantTurnSeq: 0,
    suppressReplyTimer: null,
    suppressUpstreamUntil: 0,
  };

  // Idle tracking and the remaining timestamps/flags.
  const activityFields = {
    idleTimer: null,
    lastActivityAt: Date.now(),
    _lastBargeInResetAt: 0,
    _audioBlockLogOnce: false,
    _lastFinalNormalized: '',
    _lastFinalAt: 0,
  };

  const session = {
    ...connectionFields,
    ...dialogFields,
    ...scratchFields,
    ...personaFields,
    ...startupFields,
    ...replyFields,
    ...activityFields,
  };

  sessions.set(sessionId, session);
  attachClientHandlers(session);
  return session;
}
/**
 * Mounts the realtime-dialog WebSocket endpoint (`/ws/realtime-dialog`) on an
 * HTTP server.
 *
 * For every connection the `sessionId` (required) and `userId` (optional)
 * query parameters are read from the request target. A connection without a
 * `sessionId` is closed immediately. Otherwise a session is created, a DB
 * session row is requested (failure is logged, not fatal) and a `connected`
 * message is sent to the client.
 *
 * @param {object} server - HTTP server handed to `WebSocketServer`.
 * @returns {WebSocketServer} The created WebSocket server.
 */
function setupNativeVoiceGateway(server) {
  const wss = new WebSocketServer({ server, path: '/ws/realtime-dialog' });
  wss.on('connection', async (client, req) => {
    let query;
    try {
      // `req.url` is only a path + query, so a placeholder base is supplied.
      // URLSearchParams.get() always yields a single string (or null), whereas
      // the legacy url.parse(..., true) returns an array for a repeated key,
      // which would then leak into the registry key and the DB call.
      query = new URL(req.url, 'http://localhost').searchParams;
    } catch (error) {
      client.close();
      return;
    }
    const sessionId = query.get('sessionId');
    if (!sessionId) {
      client.close();
      return;
    }
    const userId = query.get('userId') || null;
    const session = createSession(client, sessionId);
    session.userId = userId;
    try {
      await db.createSession(sessionId, userId, 'voice');
    } catch (error) {
      // Persistence is best-effort: the voice session proceeds without it.
      console.warn('[NativeVoice][DB] createSession failed:', error.message);
    }
    sendJson(client, { type: 'connected', sessionId });
  });
  return wss;
}
// Public surface of this module: only the gateway bootstrap function is exported.
module.exports = {
setupNativeVoiceGateway,
};