Files
bigwo/test2/server/services/nativeVoiceGateway.js

1198 lines
52 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const { WebSocket, WebSocketServer } = require('ws');
const url = require('url');
const db = require('../db');
const { correctAsrText } = require('./fastAsrCorrector');
const contextKeywordTracker = require('./contextKeywordTracker');
const { isBrandHarmful, getVoiceSafeReply, BRAND_HARMFUL_PATTERN, BRAND_POSITIVE_LEGALITY_PATTERN } = require('./contentSafeGuard');
const {
MsgType,
unmarshal,
createStartConnectionMessage,
createStartSessionMessage,
createAudioMessage,
createChatTTSTextMessage,
createSayHelloMessage,
createChatRAGTextMessage,
} = require('./realtimeDialogProtocol');
const {
getRuleBasedDirectRouteDecision,
normalizeKnowledgeAlias,
normalizeTextForSpeech,
splitTextForSpeech,
estimateSpeechDurationMs,
shouldForceKnowledgeRoute,
resolveReply,
} = require('./realtimeDialogRouting');
const {
DEFAULT_VOICE_ASSISTANT_PROFILE,
resolveAssistantProfile,
buildVoiceSystemRole,
buildVoiceGreeting,
} = require('./assistantProfileConfig');
// Module-level registry of voice sessions.
// NOTE(review): populated/consumed outside this view; presumably keyed by session id — confirm.
const sessions = new Map();
// Inactivity limit used by resetIdleTimer's watchdog (5 minutes).
const IDLE_TIMEOUT_MS = 5 * 60 * 1000;
// Defaults derived from the shared assistant profile configuration.
const DEFAULT_VOICE_BOT_NAME = DEFAULT_VOICE_ASSISTANT_PROFILE.nickname;
const DEFAULT_VOICE_SYSTEM_ROLE = buildVoiceSystemRole();
// Default speaking-style prompt (Chinese): warm, relaxed and conversational; short sentences,
// conclusion first followed by one helpful detail; no announcer, script-reading or
// customer-service tone, no over-enthusiasm, and never output any reasoning.
const DEFAULT_VOICE_SPEAKING_STYLE = '整体语气亲切自然、轻快有温度,像熟悉行业的朋友在语音聊天。优先短句和口语化表达,先给结论,再补一句最有帮助的信息。不要播音腔,不要念稿,不要客服腔,不要过度热情,也不要输出任何思考过程。';
// Greeting used by sendGreeting when the session does not provide its own greetingText.
const DEFAULT_VOICE_GREETING = buildVoiceGreeting();
/**
 * (Re)arm the idle watchdog for a session and stamp `lastActivityAt`.
 * If IDLE_TIMEOUT_MS passes without another call, the client receives an
 * `idle_timeout` message and, 2 seconds later, its socket is closed if still open.
 * @param {object} session - reads `client`, `sessionId`, `idleTimer`; writes `idleTimer`, `lastActivityAt`
 */
function resetIdleTimer(session) {
  clearTimeout(session.idleTimer);
  session.lastActivityAt = Date.now();
  const handleIdle = () => {
    session.idleTimer = null;
    console.log(`[NativeVoice] idle timeout (${IDLE_TIMEOUT_MS / 1000}s) session=${session.sessionId}`);
    sendJson(session.client, { type: 'idle_timeout', timeout: IDLE_TIMEOUT_MS });
    // Short grace period so the client can show the notice before the socket goes away.
    setTimeout(() => {
      const { client } = session;
      if (!client || client.readyState !== WebSocket.OPEN) {
        return;
      }
      client.close();
    }, 2000);
  };
  session.idleTimer = setTimeout(handleIdle, IDLE_TIMEOUT_MS);
}
/**
 * Send `payload` as a JSON text frame over `ws`.
 * Quietly does nothing when the socket is missing or not in the OPEN state.
 * @param {?object} ws - WebSocket-like object with `readyState` and `send`
 * @param {*} payload - JSON-serializable value
 */
function sendJson(ws, payload) {
  const canSend = Boolean(ws) && ws.readyState === WebSocket.OPEN;
  if (!canSend) {
    return;
  }
  ws.send(JSON.stringify(payload));
}
/**
 * Build the StartSession payload for the upstream realtime dialog service.
 *
 * The system role is prefixed with a highest-priority "never output reasoning" rule. Both the
 * role and the speaking style pass through normalizeTextForSpeech. A missing or null `options`
 * is treated as `{}`; previously it threw a TypeError.
 *
 * @param {object} [options] - per-session overrides
 * @param {string} [options.systemRole] - replaces DEFAULT_VOICE_SYSTEM_ROLE
 * @param {string} [options.speakingStyle] - replaces DEFAULT_VOICE_SPEAKING_STYLE
 * @param {string} [options.speaker] - TTS voice id; falls back to env VOLC_S2S_SPEAKER_ID, then a built-in voice
 * @param {string} [options.botName] - bot display name, defaults to '大沃'
 * @param {string} [options.modelVersion] - dialog model, defaults to 'SC2.0'
 * @returns {{asr: object, tts: object, dialog: object}} the StartSession payload
 */
function buildStartSessionPayload(options) {
  const opts = options || {};
  // Prepended to every system role to stop the model from speaking its planning/analysis aloud.
  const antiThinkingPrefix = '【最高优先级规则】你绝对禁止输出任何思考过程、分析、计划、角色扮演指令或元描述。禁止出现:“首轮对话”“应该回复”“需要列举”“语气要”“回复后询问”“可列举”“突出特色”“引导用户”“让用户”“用温和”等分析性、指令性语句。你必须直接用自然语言回答问题,像真人聊天一样直接说出答案内容。';
  const baseSystemRole = opts.systemRole || DEFAULT_VOICE_SYSTEM_ROLE;
  const baseSpeakingStyle = opts.speakingStyle || DEFAULT_VOICE_SPEAKING_STYLE;
  return {
    asr: {
      extra: {
        // Domain vocabulary hint for recognition (brand/product names).
        // NOTE(review): some entries repeat (e.g. "一成", "数字化工作室"); harmless but could be deduplicated.
        context: '一成,一成系统,大沃,PM,PM-FitLine,FitLine,细胞营养素,Ai众享,AI众享,盛咖学愿,数字化工作室,Activize,Basics,Restorate,NTC,基础三合一,招商,阿育吠陀,小红产品,小红,小白,大白,肽美,艾特维,德丽,德维,宝丽,美固健,Activize Oxyplus,Basic Power,CitrusCare,NutriSunny,Q10,Omega,葡萄籽,白藜芦醇,益生菌,胶原蛋白肽,Germany,FitLine细胞营养,FitLine营养素,德国PM营养素,德国PM FitLine,德国PM细胞营养,德国PM产品,德国PM健康,德国PM事业,德国PM招商,一成,一成团队,一成商学院,数字化,数字化运营,数字化经营,数字化营销,数字化创业,数字化工作室,数字化事业,招商加盟,合作加盟,事业合作',
        nbest: 1,
      },
    },
    tts: {
      speaker: opts.speaker || process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts',
      audio_config: {
        channel: 1,
        format: 'pcm_s16le',
        sample_rate: 24000,
      },
    },
    dialog: {
      dialog_id: '',
      bot_name: opts.botName || '大沃',
      system_role: normalizeTextForSpeech(`${antiThinkingPrefix} ${baseSystemRole}`),
      speaking_style: normalizeTextForSpeech(baseSpeakingStyle),
      extra: {
        input_mod: 'audio',
        model: opts.modelVersion || 'SC2.0',
        strict_audit: false,
        audit_response: '抱歉,这个问题我暂时无法回答。',
      },
    },
  };
}
/**
 * Decode a protocol message's payload buffer as UTF-8 JSON.
 * @param {{payload: Buffer}} message - unmarshalled upstream message
 * @returns {*} the parsed value, or null if the payload is missing or is not valid JSON
 */
function parseJsonPayload(message) {
  let decoded;
  try {
    const rawText = message.payload.toString('utf8');
    decoded = JSON.parse(rawText);
  } catch {
    // Deliberately best-effort: callers treat null as "no JSON body".
    return null;
  }
  return decoded;
}
/**
 * Pull text out of an upstream JSON payload.
 * Candidates, in priority order: `text`, `content`, `results[0].text`,
 * `results[0].alternatives[0].text`. The first truthy one wins.
 * @param {?object} jsonPayload
 * @returns {string} the trimmed text ('' when none of the candidates is present)
 */
function extractRawText(jsonPayload) {
  const firstResult = jsonPayload?.results?.[0];
  const candidates = [
    jsonPayload?.text,
    jsonPayload?.content,
    firstResult?.text,
    firstResult?.alternatives?.[0]?.text,
  ];
  const picked = candidates.find(Boolean);
  return String(picked || '').trim();
}
/**
 * Get the user's ASR text from a payload and apply the fast ASR corrections.
 * When `sessionId` is given, the alias-normalized form is also fed to the context keyword tracker.
 * @param {?object} jsonPayload - upstream ASR payload
 * @param {?string} [sessionId=null] - session to update in the keyword tracker
 * @returns {string} the corrected text (not alias-normalized)
 */
function extractUserText(jsonPayload, sessionId = null) {
  const corrected = correctAsrText(extractRawText(jsonPayload));
  if (sessionId) {
    const aliasNormalized = normalizeKnowledgeAlias(corrected);
    contextKeywordTracker.updateSession(sessionId, aliasNormalized);
  }
  return corrected;
}
// Meta/"thinking" phrases that must never be spoken: matches text that STARTS with one of them
// (e.g. "首轮对话", "用户想", "应该回复"). Used by sanitizeAssistantText and the streaming guard.
// No /g flag, so .test() carries no lastIndex state between calls.
const THINKING_PATTERN = /^(首轮对话|用户想|用户问|应该回复|需要列举|可列举|突出特色|引导进一步|引导用户|让用户|回复后询问|语气要|用温和|需热情|需简洁|需专业)/;
// Same idea, but matched anywhere in the text (e.g. "需客观回复", "应该…回复", "先…再…最后").
// NOTE(review): in the visible code it is only applied to streamed text of length >= 10; the
// "先.*再.*最后" alternative can also match legitimate step-by-step answers.
const THINKING_MID_PATTERN = /(?:需客观回复|应说明其|回复后询问|引导.*对话|用.*口吻回复|语气要.*热情|需要.*引导|应该.*回复|先.*再.*最后)/;
/**
 * Final safety filter for assistant text before it is persisted or shown.
 * - falsy input is returned unchanged;
 * - brand-harmful text is replaced with the canned voice-safe reply;
 * - text whose trimmed form starts with a reasoning/meta phrase is dropped (returns null);
 * - anything else passes through untouched.
 * @param {?string} text
 * @returns {?string}
 */
function sanitizeAssistantText(text) {
  if (!text) {
    return text;
  }
  const preview = () => JSON.stringify(text.slice(0, 200));
  if (isBrandHarmful(text)) {
    console.warn(`[NativeVoice][SafeGuard] blocked harmful content: ${preview()}`);
    return getVoiceSafeReply();
  }
  const looksLikeThinking = THINKING_PATTERN.test(text.trim());
  if (!looksLikeThinking) {
    return text;
  }
  console.warn(`[NativeVoice][SafeGuard] blocked thinking output: ${preview()}`);
  return null;
}
/**
 * Decide whether an ASR payload is a final (non-interim) user utterance.
 * True when `is_final` is strictly `true`, or when any entry in `results` has `is_interim === false`.
 * @param {?object} jsonPayload
 * @returns {boolean}
 */
function isFinalUserPayload(jsonPayload) {
  if (jsonPayload?.is_final === true) return true;
  const results = jsonPayload?.results;
  return Array.isArray(results)
    ? results.some((entry) => Boolean(entry) && entry.is_interim === false)
    : false;
}
/**
 * Record a final user utterance.
 * Ignores empty text and identical text repeated within 5 s. Otherwise it:
 * - bumps the session's user-turn counter and re-arms the idle timer;
 * - stores the message (fire-and-forget; DB failures are only logged);
 * - pushes a final user subtitle to the client.
 * @param {object} session
 * @param {string} text
 * @returns {boolean} true when the utterance was recorded
 */
function persistUserSpeech(session, text) {
  const utterance = (text || '').trim();
  if (!utterance) return false;
  const timestamp = Date.now();
  const isRecentDuplicate = session.lastPersistedUserText === utterance
    && timestamp - (session.lastPersistedUserAt || 0) < 5000;
  if (isRecentDuplicate) return false;
  Object.assign(session, {
    lastPersistedUserText: utterance,
    lastPersistedUserAt: timestamp,
    latestUserText: utterance,
    latestUserTurnSeq: (session.latestUserTurnSeq || 0) + 1,
  });
  resetIdleTimer(session);
  const logDbFailure = (e) => console.warn('[NativeVoice][DB] add user failed:', e.message);
  db.addMessage(session.sessionId, 'user', utterance, 'voice_asr').catch(logDbFailure);
  sendJson(session.client, {
    type: 'subtitle',
    role: 'user',
    text: utterance,
    isFinal: true,
    sequence: `native_user_${timestamp}`,
  });
  return true;
}
/**
 * Record an assistant utterance after running it through sanitizeAssistantText.
 * Skips text that is empty or blocked, and identical text repeated within 5 s. Otherwise it
 * re-arms the idle timer, optionally stores the message (fire-and-forget) and pushes a final
 * assistant subtitle to the client.
 * @param {object} session
 * @param {string} text
 * @param {object} [options]
 * @param {string} [options.source='voice_bot'] - origin label stored with the message and subtitle
 * @param {?string} [options.toolName=null]
 * @param {boolean} [options.persistToDb=true] - when false only the subtitle is sent
 * @param {?object} [options.meta=null] - extra metadata passed to db.addMessage
 * @returns {boolean} true when the text was emitted
 */
function persistAssistantSpeech(session, text, { source = 'voice_bot', toolName = null, persistToDb = true, meta = null } = {}) {
  const reply = sanitizeAssistantText((text || '').trim());
  if (!reply) return false;
  const timestamp = Date.now();
  const sinceLastMs = timestamp - (session.lastPersistedAssistantAt || 0);
  if (session.lastPersistedAssistantText === reply && sinceLastMs < 5000) return false;
  session.lastPersistedAssistantText = reply;
  session.lastPersistedAssistantAt = timestamp;
  resetIdleTimer(session);
  if (persistToDb) {
    db.addMessage(session.sessionId, 'assistant', reply, source, toolName, meta)
      .catch((err) => console.warn('[NativeVoice][DB] add assistant failed:', err.message));
  }
  sendJson(session.client, {
    type: 'subtitle',
    role: 'assistant',
    text: reply,
    isFinal: true,
    source,
    toolName,
    sequence: `native_assistant_${timestamp}`,
  });
  return true;
}
/**
 * Accumulate a streamed assistant chunk into the session's stream buffer.
 * A chunk that carries a non-empty `reply_id` different from the current one starts a new buffer.
 * Chunks without a `reply_id` keep the current id.
 * @param {object} session
 * @param {?object} payload - upstream chunk payload
 * @returns {string} the accumulated text, or '' if the chunk had no text (session left untouched)
 */
function appendAssistantStream(session, payload) {
  const piece = extractRawText(payload);
  if (!piece) return '';
  const incomingId = payload?.reply_id || '';
  const currentId = session.assistantStreamReplyId;
  const startsNewReply = Boolean(incomingId) && Boolean(currentId) && currentId !== incomingId;
  if (startsNewReply) {
    session.assistantStreamBuffer = '';
  }
  session.assistantStreamReplyId = incomingId || currentId || '';
  session.assistantStreamBuffer = (session.assistantStreamBuffer || '') + piece;
  return session.assistantStreamBuffer;
}
/**
 * Emit the buffered streamed assistant text (via persistAssistantSpeech) and reset stream state.
 * The buffer and reply id are cleared even when there is nothing to flush.
 * @param {object} session
 * @param {object} [options] - `source`, `toolName`, `meta` forwarded to persistAssistantSpeech
 * @returns {boolean} what persistAssistantSpeech returned, or false for an empty buffer
 */
function flushAssistantStream(session, { source = 'voice_bot', toolName = null, meta = null } = {}) {
  const buffered = (session.assistantStreamBuffer || '').trim();
  Object.assign(session, { assistantStreamBuffer: '', assistantStreamReplyId: '' });
  return buffered
    ? persistAssistantSpeech(session, buffered, { source, toolName, meta })
    : false;
}
/**
 * Prepare a short handoff summary from the session's recent history (last 10 messages from the DB).
 * Always leaves `handoffSummaryUsed = false`. `handoffSummary` is '' when there is no history or
 * loading fails; failures are logged, never thrown.
 * @param {object} session
 * @returns {Promise<void>}
 */
async function loadHandoffSummaryForVoice(session) {
  const applySummary = (summary) => {
    session.handoffSummary = summary;
    session.handoffSummaryUsed = false;
  };
  try {
    const history = await db.getHistoryForLLM(session.sessionId, 10);
    if (!history.length) {
      applySummary('');
      return;
    }
    applySummary(buildDeterministicHandoffSummary(history));
    console.log(`[NativeVoice] Handoff summary prepared for ${session.sessionId}: ${session.handoffSummary ? 'yes' : 'no'}`);
  } catch (error) {
    applySummary('');
    console.warn('[NativeVoice] loadHandoffSummaryForVoice failed:', error.message);
  }
}
/**
 * Build a compact, deterministic summary of recent conversation history.
 * Considers the last 8 user/assistant messages with non-blank content and concatenates, without
 * separators:
 * - the latest user question;
 * - the previous user question, when it differs from the latest;
 * - the first 60 characters of each of the last two assistant replies.
 * @param {Array<{role: string, content: *}>} [messages=[]] - non-arrays are treated as empty
 * @returns {string} the summary, or '' when there is nothing to summarize
 */
function buildDeterministicHandoffSummary(messages = []) {
  const contentOf = (item) => String(item?.content || '').trim();
  const pool = Array.isArray(messages) ? messages : [];
  const recent = pool
    .filter((item) => item && (item.role === 'user' || item.role === 'assistant') && contentOf(item))
    .slice(-8);
  if (recent.length === 0) {
    return '';
  }
  const userTurns = recent.filter((item) => item.role === 'user');
  const lastIndex = userTurns.length - 1;
  const currentQuestion = contentOf(userTurns[lastIndex]);
  const previousQuestion = contentOf(userTurns[lastIndex - 1]);
  const assistantFacts = recent
    .filter((item) => item.role === 'assistant')
    .slice(-2)
    .map((item) => contentOf(item).slice(0, 60))
    .join('');
  const sections = [
    currentQuestion && `当前问题:${currentQuestion}`,
    previousQuestion && previousQuestion !== currentQuestion && `上一轮关注:${previousQuestion}`,
    assistantFacts && `已给信息:${assistantFacts}`,
  ];
  return sections.filter(Boolean).join('');
}
/**
 * Speak `speechText` through the upstream's chat-TTS channel.
 *
 * The text is split with splitTextForSpeech. Each chunk goes out as a ChatTTSText message (the
 * first one flagged `start`), followed by an empty `end` message.
 *
 * Before sending, the session is marked as locally speaking: `currentSpeechText`,
 * `isSendingChatTTSText`, `currentTtsType`, and `chatTTSUntil` (estimated duration + 800 ms).
 * A timer is armed to drop the speaking flag once that window has passed, and the client is
 * told to reset TTS playback.
 *
 * Returns early, doing nothing, when there are no chunks or the upstream socket is not open.
 * @param {object} session
 * @param {string} speechText
 * @returns {Promise<void>} rejects if an upstream send throws
 */
async function sendSpeechText(session, speechText) {
  const chunks = splitTextForSpeech(speechText);
  if (!chunks.length) return;
  if (!session.upstream || session.upstream.readyState !== WebSocket.OPEN) return;
  const { sessionId } = session;
  console.log(`[NativeVoice] sendSpeechText start session=${sessionId} chunks=${chunks.length} textLen=${speechText.length}`);
  Object.assign(session, {
    currentSpeechText: speechText,
    isSendingChatTTSText: true,
    currentTtsType: 'chat_tts_text',
    chatTTSUntil: Date.now() + estimateSpeechDurationMs(speechText) + 800,
  });
  clearTimeout(session.chatTTSTimer);
  const releaseDelayMs = Math.max(200, session.chatTTSUntil - Date.now() + 50);
  session.chatTTSTimer = setTimeout(() => {
    session.chatTTSTimer = null;
    // A later call may have pushed chatTTSUntil further out; only clear the flag once it has passed.
    if ((session.chatTTSUntil || 0) <= Date.now()) {
      session.isSendingChatTTSText = false;
    }
  }, releaseDelayMs);
  sendJson(session.client, { type: 'tts_reset', ttsType: 'chat_tts_text' });
  for (const [position, chunk] of chunks.entries()) {
    const isFirst = position === 0;
    console.log(`[NativeVoice] sendSpeechText chunk session=${sessionId} index=${position + 1}/${chunks.length} len=${chunk.length} start=${isFirst} end=false text=${JSON.stringify(chunk.slice(0, 80))}`);
    session.upstream.send(createChatTTSTextMessage(sessionId, { start: isFirst, end: false, content: chunk }));
  }
  console.log(`[NativeVoice] sendSpeechText end session=${sessionId}`);
  session.upstream.send(createChatTTSTextMessage(sessionId, { start: false, end: true, content: '' }));
}
/**
 * Tell the client the session is ready. This happens at most once per session.
 * @param {object} session
 */
function sendReady(session) {
  if (!session.readySent) {
    session.readySent = true;
    sendJson(session.client, { type: 'ready' });
  }
}
/**
 * Send the opening greeting (once per session), then signal readiness to the client.
 *
 * On the first call it:
 * - records the greeting as an assistant message/subtitle;
 * - clears any pending greeting/ready timers;
 * - opens a 2 s greeting-protection window;
 * - sends a SayHello message upstream.
 * If the SayHello send throws, the "greeting sent" state is rolled back so a later call can retry.
 * Every call ends with sendReady, which itself only fires once.
 * @param {object} session
 */
function sendGreeting(session) {
  if (!session.hasSentGreeting) {
    session.hasSentGreeting = true;
    const greeting = session.greetingText || DEFAULT_VOICE_GREETING;
    console.log(`[NativeVoice] sendGreeting session=${session.sessionId} text=${JSON.stringify(greeting.slice(0, 80))}`);
    // NOTE(review): the greeting is recorded before SayHello is attempted, so a failed send still leaves it persisted.
    persistAssistantSpeech(session, greeting, { source: 'voice_bot' });
    clearTimeout(session.greetingTimer);
    clearTimeout(session.readyTimer);
    session.greetingSentAt = Date.now();
    session.greetingProtectionUntil = Date.now() + 2000;
    session.currentSpeechText = greeting;
    try {
      session.upstream.send(createSayHelloMessage(session.sessionId, greeting));
      console.log(`[NativeVoice] sendSayHello event=300 session=${session.sessionId}`);
    } catch (error) {
      session.hasSentGreeting = false;
      session.greetingProtectionUntil = 0;
      console.warn('[NativeVoice] SayHello failed:', error.message);
    }
  }
  sendReady(session);
}
/**
 * Re-send the session's greeting via SayHello.
 *
 * Skipped when:
 * - the session has no greeting text;
 * - the upstream socket is not open;
 * - a greeting was sent less than 6 s ago.
 *
 * Otherwise the session is marked as directly speaking for the greeting's estimated duration
 * (+800 ms). The greeting also becomes `currentSpeechText`, matching what sendGreeting and
 * sendSpeechText do. TTS echo detection compares incoming partial ASR with `currentSpeechText`
 * while `directSpeakUntil` is active. Without this assignment, the greeting's echo would be
 * compared against whatever was spoken earlier.
 * @param {object} session
 * @returns {Promise<void>} never rejects; send failures are logged
 */
async function replayGreeting(session) {
  const greetingText = String(session.greetingText || '').trim();
  const { upstream } = session;
  if (!greetingText || !upstream || upstream.readyState !== WebSocket.OPEN) {
    return;
  }
  if (session.greetingSentAt && Date.now() - session.greetingSentAt < 6000) {
    console.log(`[NativeVoice] replayGreeting skipped (too soon) session=${session.sessionId}`);
    return;
  }
  console.log(`[NativeVoice] replayGreeting session=${session.sessionId} text=${JSON.stringify(greetingText.slice(0, 80))}`);
  session.greetingSentAt = Date.now();
  session.directSpeakUntil = Date.now() + estimateSpeechDurationMs(greetingText) + 800;
  // Keep echo detection pointed at what is actually being spoken now.
  session.currentSpeechText = greetingText;
  try {
    upstream.send(createSayHelloMessage(session.sessionId, greetingText));
  } catch (error) {
    console.warn('[NativeVoice] replayGreeting SayHello failed:', error.message);
  }
}
/**
 * Send knowledge items to the upstream as external RAG text (a JSON array).
 * Items without truthy `content` are dropped. Nothing is sent when the upstream socket is not
 * open or when no usable items remain.
 * @param {object} session
 * @param {Array<{title?: string, content: string}>} items
 * @returns {Promise<void>}
 */
async function sendExternalRag(session, items) {
  const { upstream } = session;
  if (!upstream || upstream.readyState !== WebSocket.OPEN) return;
  const usable = (Array.isArray(items) ? items : []).filter((item) => item && item.content);
  if (usable.length === 0) return;
  upstream.send(createChatRAGTextMessage(session.sessionId, JSON.stringify(usable)));
}
/**
 * End any upstream-reply suppression.
 * Cancels the release timer, resets the suppression window and the pending-assistant
 * attribution, unblocks upstream audio, and tells the client nothing is pending any more.
 * @param {object} session
 */
function clearUpstreamSuppression(session) {
  const resetState = {
    suppressReplyTimer: null,
    suppressUpstreamUntil: 0,
    awaitingUpstreamReply: false,
    pendingAssistantSource: null,
    pendingAssistantToolName: null,
    pendingAssistantMeta: null,
    pendingAssistantTurnSeq: 0,
    blockUpstreamAudio: false,
  };
  clearTimeout(session.suppressReplyTimer);
  Object.assign(session, resetState);
  sendJson(session.client, { type: 'assistant_pending', active: false });
}
/**
 * Suppress upstream (S2S) replies for at least `durationMs` (minimum 1 s).
 * Marks the session as awaiting an upstream reply and sets `suppressUpstreamUntil`. A release
 * timer (at least 300 ms) calls clearUpstreamSuppression once the window has lapsed.
 *
 * The release timer re-arms itself if it fires while the window is still open. This covers two
 * cases:
 * - the window was pushed further out without rescheduling;
 * - Node's timer clock lets the callback run a millisecond or so before `Date.now()` reaches
 *   the deadline.
 * Simply returning in that case would leave `awaitingUpstreamReply` set and never call
 * clearUpstreamSuppression.
 * @param {object} session
 * @param {number} durationMs - requested suppression length in milliseconds
 */
function suppressUpstreamReply(session, durationMs) {
  clearTimeout(session.suppressReplyTimer);
  session.awaitingUpstreamReply = true;
  session.suppressUpstreamUntil = Date.now() + Math.max(1000, durationMs);
  const armRelease = (delayMs) => {
    session.suppressReplyTimer = setTimeout(() => {
      session.suppressReplyTimer = null;
      const remainingMs = (session.suppressUpstreamUntil || 0) - Date.now();
      if (remainingMs > 0) {
        // Still inside the window: retry just after it ends instead of giving up.
        armRelease(remainingMs + 50);
        return;
      }
      clearUpstreamSuppression(session);
    }, delayMs);
  };
  armRelease(Math.max(300, session.suppressUpstreamUntil - Date.now()));
}
/**
 * Produce the assistant's reply to one final user utterance.
 *
 * If a reply is already in progress, or locally spoken audio (directSpeakUntil) is still playing,
 * the utterance is queued (latest wins) and replayed from the finally block. Otherwise the turn
 * may be treated as a knowledge-base (KB) candidate. The reply is resolved via resolveReply,
 * reusing a matching KB pre-query started during partial ASR when possible, and delivered as:
 *   - 'upstream_chat': the upstream S2S model answers. For a KB candidate, the question is first
 *     re-sent as a RAG item.
 *   - 'external_rag': KB snippets are sent to the upstream as RAG text.
 *   - anything else: the resolved `speechText` is spoken locally via sendSpeechText.
 * Replies that belong to a superseded user turn are dropped. Errors are logged and reported to
 * the client as an 'error' message; they are not rethrown.
 *
 * @param {object} session - voice session state; read and mutated throughout
 * @param {string} text - the final user utterance
 * @param {number} [turnSeq] - user turn this reply belongs to (defaults to the latest turn)
 * @returns {Promise<void>}
 */
async function processReply(session, text, turnSeq = session.latestUserTurnSeq || 0) {
const cleanText = (text || '').trim();
if (!cleanText) return;
// One reply at a time: remember only the latest utterance; the finally block replays it.
if (session.processingReply) {
session.queuedUserText = cleanText;
session.queuedUserTurnSeq = turnSeq;
console.log(`[NativeVoice] processReply queued(busy) session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 80))}`);
return;
}
const now = Date.now();
// Local speech (chat TTS or a replayed greeting) is still expected to be playing: queue instead of talking over it.
if (session.directSpeakUntil && now < session.directSpeakUntil) {
session.queuedUserText = cleanText;
session.queuedUserTurnSeq = turnSeq;
console.log(`[NativeVoice] processReply queued(speaking) session=${session.sessionId} waitMs=${session.directSpeakUntil - now} text=${JSON.stringify(cleanText.slice(0, 80))}`);
return;
}
const activeTurnSeq = turnSeq || session.latestUserTurnSeq || 0;
session.processingReply = true;
sendJson(session.client, { type: 'assistant_pending', active: true });
let isKnowledgeCandidate = shouldForceKnowledgeRoute(cleanText);
// KB topic protection window: if there was a KB hit in the last 60 s and this turn is not pure
// small talk or a farewell, treat it as a KB candidate as well. This stops the S2S model from
// freely making things up when the user questions or corrects product info
// (e.g. "it's a powder, you got that wrong").
const KB_PROTECTION_WINDOW_MS = 60000;
if (!isKnowledgeCandidate && session._lastKbHitAt && (Date.now() - session._lastKbHitAt < KB_PROTECTION_WINDOW_MS)) {
const isPureChitchat = /^(喂|你好|嗨|谢谢|再见|拜拜|好的|嗯|哦|行|没事了|不用了|可以了)[,。!?\s]*$/.test(cleanText);
if (!isPureChitchat) {
isKnowledgeCandidate = true;
console.log(`[NativeVoice] KB protection window active, promoting to kbCandidate session=${session.sessionId} lastKbHit=${Math.round((Date.now() - session._lastKbHitAt) / 1000)}s ago`);
}
}
if (isKnowledgeCandidate) {
// Block upstream audio and suppress upstream replies (up to 30 s) while the KB route resolves.
session.blockUpstreamAudio = true;
suppressUpstreamReply(session, 30000);
sendJson(session.client, { type: 'tts_reset', reason: 'processing' });
// Filler phrases were removed: after the KB query optimization latency is ~2.6 s, so no filler is needed.
session._fillerActive = false;
}
console.log(`[NativeVoice] processReply start session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 120))} blocked=${session.blockUpstreamAudio} kbCandidate=${isKnowledgeCandidate}`);
try {
// Consume the KB pre-query cache: if a pre-query was started during the partial-ASR phase and
// its text matches the final utterance, use its result directly instead of querying again.
let resolveResult = null;
if (isKnowledgeCandidate && session.pendingKbPrequery && session._kbPrequeryText) {
// Compare with filler particles, punctuation and whitespace stripped.
const preText = (session._kbPrequeryText || '').replace(/[啊哦嗯呢呀哎诶额,。!?、,.\s]/g, '');
const finalText = cleanText.replace(/[啊哦嗯呢呀哎诶额,。!?、,.\s]/g, '');
// Loose similarity: substring containment in either direction, or a character-overlap ratio
// >= 0.45 (fraction of the shorter string's characters found anywhere in the longer one).
// NOTE(review): the original comment said ">= 60%", but the threshold in code is 0.45.
let isSimilar = preText && finalText && (finalText.includes(preText) || preText.includes(finalText));
if (!isSimilar && preText && finalText) {
const shorter = preText.length <= finalText.length ? preText : finalText;
const longer = preText.length <= finalText.length ? finalText : preText;
let overlap = 0;
for (let i = 0; i < shorter.length; i++) {
if (longer.includes(shorter[i])) overlap++;
}
isSimilar = overlap / shorter.length >= 0.45;
}
if (isSimilar) {
console.log(`[NativeVoice] using KB prequery cache session=${session.sessionId} preText=${JSON.stringify(session._kbPrequeryText.slice(0, 60))}`);
resolveResult = await session.pendingKbPrequery;
} else {
console.log(`[NativeVoice] KB prequery text mismatch, re-querying session=${session.sessionId} pre=${JSON.stringify(preText.slice(0, 40))} final=${JSON.stringify(finalText.slice(0, 40))}`);
}
}
// The pre-query is single-use: clear it whether or not it was consumed.
session.pendingKbPrequery = null;
session._kbPrequeryText = '';
session._kbPrequeryStartedAt = 0;
if (!resolveResult) {
resolveResult = await resolveReply(session.sessionId, session, cleanText);
}
const { delivery, speechText, ragItems, source, toolName, routeDecision, responseMeta } = resolveResult;
// A newer user turn arrived while this one was resolving: drop the stale reply.
if (activeTurnSeq !== (session.latestUserTurnSeq || 0)) {
console.log(`[NativeVoice] stale reply ignored session=${session.sessionId} activeTurn=${activeTurnSeq} latestTurn=${session.latestUserTurnSeq || 0}`);
clearUpstreamSuppression(session);
return;
}
if (delivery === 'upstream_chat') {
// Let the upstream S2S model answer. For a KB candidate (KB found nothing), re-send the question
// as a RAG item and flag the next upstream assistant response for discarding (the flag is
// consumed in the upstream message handler).
if (isKnowledgeCandidate) {
console.log(`[NativeVoice] processReply kb-nohit retrigger session=${session.sessionId}`);
session.discardNextAssistantResponse = true;
await sendExternalRag(session, [{ title: '用户问题', content: cleanText }]);
} else {
session.blockUpstreamAudio = false;
}
// Reset so the "user just spoke" audio gate in the upstream handler does not hold back this reply.
session._lastPartialAt = 0;
// Attribute the eventual upstream reply to this turn.
session.awaitingUpstreamReply = true;
session.pendingAssistantSource = 'voice_bot';
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = responseMeta;
session.pendingAssistantTurnSeq = activeTurnSeq;
console.log(`[NativeVoice] processReply handoff session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=upstream_chat`);
return;
}
if (delivery === 'external_rag') {
if (!session.blockUpstreamAudio) {
session.blockUpstreamAudio = true;
}
session.discardNextAssistantResponse = true;
sendJson(session.client, { type: 'tts_reset', reason: 'knowledge_hit' });
const ragContent = (ragItems || []).filter((item) => item && item.content);
if (ragContent.length > 0) {
console.log(`[NativeVoice] processReply sending external_rag to S2S session=${session.sessionId} route=${routeDecision?.route || 'unknown'} items=${ragContent.length}`);
// KB topic memory: remember this turn's raw question and timestamp, used by the protection
// window above and for follow-up enrichment. Misses and honest fallbacks do not count as hits.
if (responseMeta?.hit !== false && responseMeta?.reason !== 'honest_fallback') {
session._lastKbTopic = cleanText;
session._lastKbHitAt = Date.now();
}
// Tells the upstream handler a RAG-driven reply is pending; it clears the flag when the final assistant text arrives.
session._pendingExternalRagReply = true;
await sendExternalRag(session, ragContent);
session.awaitingUpstreamReply = true;
session.pendingAssistantSource = source;
session.pendingAssistantToolName = toolName;
session.pendingAssistantMeta = responseMeta;
session.pendingAssistantTurnSeq = activeTurnSeq;
} else {
// No usable RAG content: unblock and let the upstream's own reply through.
console.log(`[NativeVoice] processReply external_rag empty content, fallback to upstream session=${session.sessionId}`);
session.blockUpstreamAudio = false;
clearUpstreamSuppression(session);
}
return;
}
// NOTE(review): for a KB candidate, suppressUpstreamReply(…, 30000) above left awaitingUpstreamReply
// true, so on this empty path the finally block keeps upstream audio blocked and the pending
// indicator on until that suppression timer lapses — confirm this is intended.
if (!speechText) {
console.log(`[NativeVoice] processReply empty session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=${delivery || 'unknown'}`);
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
return;
}
// Local TTS delivery: speak the resolved text ourselves and suppress upstream replies meanwhile.
console.log(`[NativeVoice] processReply resolved session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=local_tts source=${source} tool=${toolName || 'chat'} speechLen=${speechText.length}`);
session.directSpeakUntil = Date.now() + estimateSpeechDurationMs(speechText) + 800;
suppressUpstreamReply(session, estimateSpeechDurationMs(speechText) + 1800);
session.lastDeliveredAssistantTurnSeq = activeTurnSeq;
persistAssistantSpeech(session, speechText, { source, toolName, meta: responseMeta });
await sendSpeechText(session, speechText);
} catch (error) {
console.error('[NativeVoice] processReply failed:', error.message);
sendJson(session.client, { type: 'error', error: error.message });
} finally {
session.processingReply = false;
// Keep audio blocked and the pending indicator on only while an upstream reply is still expected.
if (!session.awaitingUpstreamReply) {
session.blockUpstreamAudio = false;
}
if (!session.awaitingUpstreamReply) {
sendJson(session.client, { type: 'assistant_pending', active: false });
}
// Replay a queued utterance from a different turn: after 200 ms if nothing is being spoken,
// otherwise once the current local speech is expected to end. The delayed path picks up any
// newer text queued in the meantime.
const pending = session.queuedUserText;
const pendingTurnSeq = session.queuedUserTurnSeq || 0;
session.queuedUserText = '';
session.queuedUserTurnSeq = 0;
if (pending && pendingTurnSeq && pendingTurnSeq !== activeTurnSeq && (!session.directSpeakUntil || Date.now() >= session.directSpeakUntil)) {
setTimeout(() => {
session.blockUpstreamAudio = true;
processReply(session, pending, pendingTurnSeq).catch((err) => {
console.error('[NativeVoice] queued processReply failed:', err.message);
});
}, 200);
} else if (pending && pendingTurnSeq && pendingTurnSeq !== activeTurnSeq) {
const waitMs = Math.max(200, session.directSpeakUntil - Date.now() + 200);
clearTimeout(session.queuedReplyTimer);
session.queuedReplyTimer = setTimeout(() => {
session.queuedReplyTimer = null;
const queuedText = session.queuedUserText || pending;
const queuedTurnSeq = session.queuedUserTurnSeq || pendingTurnSeq;
session.queuedUserText = '';
session.queuedUserTurnSeq = 0;
session.blockUpstreamAudio = true;
processReply(session, queuedText, queuedTurnSeq).catch((err) => {
console.error('[NativeVoice] delayed queued processReply failed:', err.message);
});
}, waitMs);
}
}
}
function handleUpstreamMessage(session, data) {
let message;
try {
message = unmarshal(data);
} catch (error) {
console.warn('[NativeVoice] unmarshal failed:', error.message);
return;
}
if (message.type === MsgType.AUDIO_ONLY_SERVER) {
const isDefaultTts = !session.currentTtsType || session.currentTtsType === 'default';
const isSuppressingUpstreamAudio = (session.suppressUpstreamUntil || 0) > Date.now() && isDefaultTts;
// 用户刚停止说话后短暂阻止默认TTS音频给event 459的blockUpstreamAudio留时间生效
const isUserJustSpeaking = isDefaultTts && session._lastPartialAt && (Date.now() - session._lastPartialAt < 800);
// blockUpstreamAudio 生效时:仅放行 external_rag 和限时过渡语音频,其余全部阻断
// 修复:旧逻辑只阻断 isDefaultTts导致 chat_tts_text 窗口期 S2S 自主回复音频泄漏
const isBlockPassthrough = session.currentTtsType === 'external_rag' ||
(session._fillerActive && (session.chatTTSUntil || 0) > Date.now());
if ((session.blockUpstreamAudio && !isBlockPassthrough) || isSuppressingUpstreamAudio || isUserJustSpeaking) {
if (!session._audioBlockLogOnce) {
session._audioBlockLogOnce = true;
console.log(`[NativeVoice] audio blocked session=${session.sessionId} ttsType=${session.currentTtsType} block=${session.blockUpstreamAudio} suppress=${isSuppressingUpstreamAudio}`);
}
return;
}
session._audioBlockLogOnce = false;
if (session.client && session.client.readyState === WebSocket.OPEN) {
session.client.send(message.payload, { binary: true });
}
return;
}
const payload = parseJsonPayload(message);
if (message.type === MsgType.ERROR) {
console.error(`[NativeVoice] upstream error session=${session.sessionId} code=${message.event} payload=${message.payload.toString('utf8').slice(0, 200)}`);
sendJson(session.client, { type: 'error', error: message.payload.toString('utf8') });
return;
}
if (message.type !== MsgType.FULL_SERVER) {
return;
}
if (message.event === 150) {
session.upstreamReady = true;
console.log(`[NativeVoice] upstream ready session=${session.sessionId}`);
resetIdleTimer(session);
sendGreeting(session);
return;
}
if (message.event === 300) {
console.log(`[NativeVoice] SayHello response session=${session.sessionId}`);
return;
}
if (message.event === 350) {
session.currentTtsType = payload?.tts_type || '';
if (payload?.tts_type === 'chat_tts_text' && session.pendingGreetingAck) {
session.pendingGreetingAck = false;
clearTimeout(session.greetingAckTimer);
session.greetingAckTimer = null;
}
if (session.blockUpstreamAudio && payload?.tts_type === 'external_rag') {
session.blockUpstreamAudio = false;
session.suppressUpstreamUntil = 0;
clearTimeout(session.suppressReplyTimer);
session.suppressReplyTimer = null;
// 注意不清除discardNextAssistantResponse让它拦截S2S默认回复的残留event 351
// 该标记会在KB回复的event 550 chunks到达时自动清除
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
// 清除过渡语的chat TTS状态确保external_rag回复不被isLocalChatTTSTextActive拦截
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
session.currentSpeechText = '';
session._fillerActive = false;
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'rag_response_start' });
console.log(`[NativeVoice] unblock for external_rag tts session=${session.sessionId}`);
} else if (session.blockUpstreamAudio && payload?.tts_type === 'chat_tts_text') {
console.log(`[NativeVoice] chat_tts_text started, keeping block for S2S default response session=${session.sessionId}`);
}
console.log(`[NativeVoice] upstream tts_event session=${session.sessionId} ttsType=${payload?.tts_type || ''}`);
sendJson(session.client, { type: 'tts_event', payload });
return;
}
const isLocalChatTTSTextActive = !!session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now();
const isSuppressingUpstreamReply = (session.suppressUpstreamUntil || 0) > Date.now();
if (message.event === 351) {
if (isLocalChatTTSTextActive || session.blockUpstreamAudio || isSuppressingUpstreamReply) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
return;
}
if (session.discardNextAssistantResponse) {
session.discardNextAssistantResponse = false;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] discarded stale assistant response (kb-nohit retrigger) session=${session.sessionId}`);
return;
}
const pendingAssistantSource = session.pendingAssistantSource || 'voice_bot';
const pendingAssistantToolName = session.pendingAssistantToolName || null;
const pendingAssistantMeta = session.pendingAssistantMeta || null;
const pendingAssistantTurnSeq = session.pendingAssistantTurnSeq || session.latestUserTurnSeq || 0;
session.awaitingUpstreamReply = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
if (pendingAssistantTurnSeq && session.lastDeliveredAssistantTurnSeq === pendingAssistantTurnSeq) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
console.log(`[NativeVoice] duplicate assistant final ignored (351) session=${session.sessionId} turn=${pendingAssistantTurnSeq}`);
return;
}
const assistantText = extractRawText(payload);
if (assistantText) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
// 过渡语的event 351不持久化直接丢弃
if (session._fillerActive) {
console.log(`[NativeVoice] discarded filler assistant text session=${session.sessionId}`);
session._fillerActive = false;
return;
}
// 清除external_rag等待标记KB回复已到达
if (session._pendingExternalRagReply) {
session._pendingExternalRagReply = false;
}
// 品牌安全检测:最终助手文本包含有害内容时,阻断音频并注入安全回复
if (isBrandHarmful(assistantText)) {
console.warn(`[NativeVoice][SafeGuard] harmful content in final assistant text, blocking session=${session.sessionId} text=${JSON.stringify(assistantText.slice(0, 120))}`);
session.blockUpstreamAudio = true;
sendJson(session.client, { type: 'tts_reset', reason: 'harmful_blocked' });
const safeReply = getVoiceSafeReply();
session.lastDeliveredAssistantTurnSeq = pendingAssistantTurnSeq;
persistAssistantSpeech(session, safeReply, { source: 'voice_bot' });
sendSpeechText(session, safeReply).catch((err) => {
console.warn('[NativeVoice][SafeGuard] sendSpeechText failed:', err.message);
});
} else {
console.log(`[NativeVoice] upstream assistant session=${session.sessionId} text=${JSON.stringify(assistantText.slice(0, 120))}`);
session.lastDeliveredAssistantTurnSeq = pendingAssistantTurnSeq;
persistAssistantSpeech(session, assistantText, {
source: pendingAssistantSource,
toolName: pendingAssistantToolName,
meta: pendingAssistantMeta,
});
// KB回复完成后重新阻断音频防止下一个问题的S2S默认回复在early block前泄露
if (session.currentTtsType === 'external_rag') {
session.blockUpstreamAudio = true;
console.log(`[NativeVoice] re-blocked after KB response session=${session.sessionId}`);
}
}
} else {
const didFlush = flushAssistantStream(session, {
source: pendingAssistantSource,
toolName: pendingAssistantToolName,
meta: pendingAssistantMeta,
});
if (didFlush) {
session.lastDeliveredAssistantTurnSeq = pendingAssistantTurnSeq;
}
}
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
return;
}
if (message.event === 550) {
// external_rag chunks到达时清除discardNextAssistantResponse默认回复的351已过或不会来
if (session.discardNextAssistantResponse && session.currentTtsType === 'external_rag') {
session.discardNextAssistantResponse = false;
console.log(`[NativeVoice] cleared discardNextAssistantResponse for external_rag stream session=${session.sessionId}`);
}
if (isLocalChatTTSTextActive || session.blockUpstreamAudio || isSuppressingUpstreamReply || session.discardNextAssistantResponse) {
return;
}
if (session.awaitingUpstreamReply) {
session.awaitingUpstreamReply = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
}
const fullText = appendAssistantStream(session, payload);
if (fullText) {
// 品牌安全检测S2S模型输出传销等负面内容时立即阻断音频并注入安全回复
if (fullText.length >= 4 && isBrandHarmful(fullText)) {
console.warn(`[NativeVoice][SafeGuard] harmful content detected in stream, blocking audio session=${session.sessionId} text=${JSON.stringify(fullText.slice(0, 120))}`);
session.blockUpstreamAudio = true;
session.discardNextAssistantResponse = true;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
sendJson(session.client, { type: 'tts_reset', reason: 'harmful_blocked' });
// 注入安全回复语音,替代有害内容
const safeReply = getVoiceSafeReply();
persistAssistantSpeech(session, safeReply, { source: 'voice_bot' });
sendSpeechText(session, safeReply).catch((err) => {
console.warn('[NativeVoice][SafeGuard] sendSpeechText failed:', err.message);
});
return;
}
// 检测思考模式S2S模型输出分析/计划而非直接回答,立即阻断
if (fullText.length >= 10 && (THINKING_PATTERN.test(fullText.trim()) || THINKING_MID_PATTERN.test(fullText))) {
console.warn(`[NativeVoice][SafeGuard] thinking detected in stream, blocking audio session=${session.sessionId} text=${JSON.stringify(fullText.slice(0, 120))}`);
session.blockUpstreamAudio = true;
session.discardNextAssistantResponse = true;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
sendJson(session.client, { type: 'tts_reset', reason: 'thinking_blocked' });
return;
}
console.log(`[NativeVoice] upstream assistant chunk session=${session.sessionId} len=${fullText.length} text=${JSON.stringify(fullText.slice(0, 120))}`);
}
return;
}
if (message.event === 559) {
if (isLocalChatTTSTextActive || isSuppressingUpstreamReply) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
return;
}
if (session.blockUpstreamAudio) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] blocked response ended (559), keeping block session=${session.sessionId}`);
return;
}
if (session.discardNextAssistantResponse) {
session.discardNextAssistantResponse = false;
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] discarded stale stream end (559, kb-nohit retrigger) session=${session.sessionId}`);
return;
}
// external_rag流期间阻止默认回复的559过早flush部分KB文本
if (session._pendingExternalRagReply) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] suppressed 559 flush during external_rag flow session=${session.sessionId}`);
return;
}
const pendingAssistantTurnSeq = session.pendingAssistantTurnSeq || session.latestUserTurnSeq || 0;
if (pendingAssistantTurnSeq && session.lastDeliveredAssistantTurnSeq === pendingAssistantTurnSeq) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
console.log(`[NativeVoice] duplicate assistant final ignored (559) session=${session.sessionId} turn=${pendingAssistantTurnSeq}`);
return;
}
session.awaitingUpstreamReply = false;
session.blockUpstreamAudio = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
const didFlush = flushAssistantStream(session, {
source: session.pendingAssistantSource || 'voice_bot',
toolName: session.pendingAssistantToolName || null,
meta: session.pendingAssistantMeta || null,
});
if (didFlush) {
session.lastDeliveredAssistantTurnSeq = pendingAssistantTurnSeq;
}
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
return;
}
if (message.event === 450 || (message.event === 451 && !isFinalUserPayload(payload))) {
const text = extractUserText(payload, session.sessionId);
if (text) {
const now = Date.now();
const isDirectSpeaking = session.directSpeakUntil && now < session.directSpeakUntil;
const isChatTTSSpeaking = session.isSendingChatTTSText && (session.chatTTSUntil || 0) > now;
// TTS回声检测播放期间如果ASR识别文本是当前播放文本的子串判定为回声忽略
if ((isDirectSpeaking || isChatTTSSpeaking) && session.currentSpeechText) {
const normalizedPartial = text.replace(/[,。!?、,.\s]/g, '');
const normalizedSpeech = session.currentSpeechText.replace(/[,。!?、,.\s]/g, '');
if (normalizedPartial.length <= 3 || normalizedSpeech.includes(normalizedPartial)) {
if (!session._echoLogOnce) {
session._echoLogOnce = true;
console.log(`[NativeVoice] TTS echo detected, ignoring partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 80))}`);
}
return;
}
session._echoLogOnce = false;
} else {
session._echoLogOnce = false;
}
// Greeting保护窗口发送问候语后短暂保护期内忽略barge-in
if (session.greetingProtectionUntil && now < session.greetingProtectionUntil) {
console.log(`[NativeVoice] greeting protection active, ignoring partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 80))}`);
return;
}
console.log(`[NativeVoice] upstream partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 120))}`);
const normalizedPartial = normalizeKnowledgeAlias(text);
session.latestUserText = normalizedPartial;
session._lastPartialAt = now;
// 提前阻断部分识别文字含知识库关键词时立即阻断S2S音频防止有害内容播出
if (normalizedPartial.length >= 6 && !session.blockUpstreamAudio && shouldForceKnowledgeRoute(normalizedPartial)) {
session.blockUpstreamAudio = true;
session.currentTtsType = 'default';
// 立即清除客户端已收到的S2S音频防止用户听到抢答片段
sendJson(session.client, { type: 'tts_reset', reason: 'early_block' });
console.log(`[NativeVoice] early block: partial text matched KB keywords session=${session.sessionId} text=${JSON.stringify(text.slice(0, 80))}`);
// KB预查询提前启动知识库查询减少final ASR后的等待时间
const kbPrequeryDebounce = 600;
if (normalizedPartial.length >= 8 && (!session._kbPrequeryStartedAt || now - session._kbPrequeryStartedAt > kbPrequeryDebounce)) {
session._kbPrequeryStartedAt = now;
session._kbPrequeryText = normalizedPartial;
console.log(`[NativeVoice] KB prequery started session=${session.sessionId} text=${JSON.stringify(normalizedPartial.slice(0, 80))}`);
session.pendingKbPrequery = resolveReply(session.sessionId, session, normalizedPartial).catch((err) => {
console.warn(`[NativeVoice] KB prequery failed session=${session.sessionId}:`, err.message);
return null;
});
}
}
// 用户开口说话时立即打断所有 AI 播放(包括 S2S 默认 TTS
if (isDirectSpeaking || isChatTTSSpeaking) {
console.log(`[NativeVoice] user barge-in (partial) session=${session.sessionId} direct=${isDirectSpeaking} chatTTS=${isChatTTSSpeaking}`);
session.directSpeakUntil = 0;
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
session.currentSpeechText = '';
clearTimeout(session.chatTTSTimer);
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
clearUpstreamSuppression(session);
}
}
// 无论当前是否在播放,都发送 tts_reset 确保客户端停止所有音频播放
if (!session._lastBargeInResetAt || now - session._lastBargeInResetAt > 500) {
session._lastBargeInResetAt = now;
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
}
sendJson(session.client, {
type: 'subtitle',
role: 'user',
text: text,
isFinal: false,
sequence: `native_partial_${Date.now()}`,
});
}
return;
}
if (message.event === 459 || (message.event === 451 && isFinalUserPayload(payload))) {
const rawFinalText = extractUserText(payload, session.sessionId) || '';
const finalText = normalizeKnowledgeAlias(rawFinalText) || session.latestUserText || '';
const now459 = Date.now();
// 双事件去重S2S可能同时发送event 459和event 451(is_final),用去标点归一化文本+时间窗口去重
const normalizedForDedup = finalText.replace(/[,。!?、,.?!\s]/g, '');
if (normalizedForDedup && session._lastFinalNormalized === normalizedForDedup && now459 - (session._lastFinalAt || 0) < 1500) {
console.log(`[NativeVoice] duplicate final ignored (event=${message.event}) session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 80))}`);
return;
}
session._lastFinalNormalized = normalizedForDedup;
session._lastFinalAt = now459;
// TTS回声检测final级别播放期间ASR最终识别文本如果是当前播放文本的子串判定为回声
const isDirectSpeaking459 = session.directSpeakUntil && now459 < session.directSpeakUntil;
const isChatTTSSpeaking459 = session.isSendingChatTTSText && (session.chatTTSUntil || 0) > now459;
if ((isDirectSpeaking459 || isChatTTSSpeaking459) && session.currentSpeechText && finalText) {
const normalizedFinal = finalText.replace(/[,。!?、,.\s]/g, '');
const normalizedSpeech = session.currentSpeechText.replace(/[,。!?、,.\s]/g, '');
if (normalizedFinal.length <= 4 || normalizedSpeech.includes(normalizedFinal)) {
console.log(`[NativeVoice] TTS echo detected in final, ignoring session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 80))}`);
return;
}
}
// Greeting保护窗口
if (session.greetingProtectionUntil && now459 < session.greetingProtectionUntil && finalText) {
console.log(`[NativeVoice] greeting protection active, ignoring final session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 80))}`);
return;
}
console.log(`[NativeVoice] upstream final session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 120))}`);
if (session.directSpeakUntil && Date.now() < session.directSpeakUntil) {
console.log(`[NativeVoice] user interrupt during speaking session=${session.sessionId}`);
session.directSpeakUntil = 0;
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
session.currentSpeechText = '';
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
clearUpstreamSuppression(session);
}
} else if (session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now()) {
console.log(`[NativeVoice] user interrupt chatTTS during speaking session=${session.sessionId}`);
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
session.currentSpeechText = '';
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
if (session.suppressReplyTimer || session.suppressUpstreamUntil) {
clearUpstreamSuppression(session);
}
}
if (persistUserSpeech(session, rawFinalText || finalText)) {
session.blockUpstreamAudio = true;
session.currentTtsType = 'default';
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
sendJson(session.client, { type: 'tts_reset', reason: 'new_turn' });
processReply(session, finalText).catch((error) => {
console.error('[NativeVoice] processReply error:', error.message);
});
}
return;
}
sendJson(session.client, {
type: 'event',
event: message.event,
payload,
});
}
/**
 * Wire the client-facing WebSocket (`session.client`) to the session.
 *
 * - Binary frames are wrapped with `createAudioMessage` and forwarded upstream, but only once
 *   the upstream socket is OPEN and `session.upstreamReady` is set; earlier frames are dropped.
 * - Text frames are parsed as JSON control messages (`start`, `stop`, `replay_greeting`, `text`).
 *   Unparseable JSON, or JSON whose top-level value is not an object, gets an
 *   `invalid client json` error reply.
 * - On close, all per-session timers are cleared, the upstream socket is closed (also when it is
 *   still connecting), and the session is removed from `sessions` if it is still the entry
 *   registered under its id.
 *
 * @param {object} session - Session state object as built by `createSession`.
 */
function attachClientHandlers(session) {
  session.client.on('message', async (raw, isBinary) => {
    if (isBinary) {
      if (session.upstream && session.upstream.readyState === WebSocket.OPEN && session.upstreamReady) {
        session.upstream.send(createAudioMessage(session.sessionId, raw));
      }
      return;
    }
    let parsed;
    try {
      parsed = JSON.parse(raw.toString('utf8'));
    } catch (error) {
      sendJson(session.client, { type: 'error', error: 'invalid client json' });
      return;
    }
    // Valid JSON such as `null` would make `parsed.type` throw inside this async listener,
    // producing an unhandled promise rejection (fatal by default on Node >= 15).
    if (parsed === null || typeof parsed !== 'object') {
      sendJson(session.client, { type: 'error', error: 'invalid client json' });
      return;
    }
    if (parsed.type === 'start') {
      const assistantProfile = resolveAssistantProfile({
        ...(session.assistantProfile || {}),
        ...((parsed.assistantProfile && typeof parsed.assistantProfile === 'object') ? parsed.assistantProfile : {}),
      });
      session.assistantProfile = assistantProfile;
      session.botName = parsed.botName || assistantProfile.nickname || DEFAULT_VOICE_BOT_NAME;
      session.systemRole = parsed.systemRole || buildVoiceSystemRole(assistantProfile);
      session.speakingStyle = parsed.speakingStyle || session.speakingStyle || DEFAULT_VOICE_SPEAKING_STYLE;
      session.speaker = parsed.speaker || process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts';
      session.modelVersion = parsed.modelVersion || 'O';
      session.greetingText = parsed.greetingText || buildVoiceGreeting(assistantProfile);
      session.userId = parsed.userId || session.userId || null;
      // Send `ready` right away instead of waiting for upstream event 150; this greatly
      // shortens the front-end wait.
      sendReady(session);
      // NOTE(review): a repeated `start` replaces `session.upstream` without closing the
      // previous upstream connection; confirm whether that is intended.
      session.upstream = createUpstreamConnection(session);
      loadHandoffSummaryForVoice(session).catch((error) => {
        console.warn('[NativeVoice] async loadHandoffSummaryForVoice failed:', error.message);
      });
      return;
    }
    if (parsed.type === 'stop') {
      session.client.close();
      return;
    }
    if (parsed.type === 'replay_greeting') {
      replayGreeting(session).catch((error) => {
        console.warn('[NativeVoice] replayGreeting failed:', error.message);
      });
      return;
    }
    if (parsed.type === 'text' && parsed.text) {
      if (persistUserSpeech(session, parsed.text)) {
        processReply(session, parsed.text, session.latestUserTurnSeq).catch((error) => {
          console.error('[NativeVoice] text processReply failed:', error.message);
        });
      }
    }
  });
  session.client.on('close', () => {
    clearTimeout(session.chatTTSTimer);
    clearTimeout(session.greetingTimer);
    clearTimeout(session.greetingAckTimer);
    clearTimeout(session.readyTimer);
    clearTimeout(session.suppressReplyTimer);
    clearTimeout(session.queuedReplyTimer);
    clearTimeout(session.idleTimer);
    const { upstream } = session;
    // A still-connecting upstream must be aborted too; otherwise it would complete its
    // handshake and stay open with no client attached.
    if (upstream && (upstream.readyState === WebSocket.OPEN || upstream.readyState === WebSocket.CONNECTING)) {
      upstream.close();
    }
    // A reconnect with the same sessionId replaces the registry entry; this late close
    // must not evict that newer session.
    if (sessions.get(session.sessionId) === session) {
      sessions.delete(session.sessionId);
    }
  });
}
/**
 * Open the upstream realtime-dialogue WebSocket for `session` and relay its traffic.
 *
 * On open, the StartConnection and StartSession frames are sent. Binary frames are passed to
 * `handleUpstreamMessage` as Buffers. Text frames are forwarded to the client as a
 * `server_text` message. When the upstream closes, `session.upstreamReady` is cleared, the
 * client is notified with `upstream_closed`, and the client is closed 3 seconds later if it is
 * still open.
 *
 * @param {object} session - Session state; `session.sessionId` is also sent as the upstream connect id.
 * @returns {WebSocket} The newly created (not yet open) upstream socket.
 */
function createUpstreamConnection(session) {
  const upstream = new WebSocket('wss://openspeech.bytedance.com/api/v3/realtime/dialogue', {
    headers: {
      'X-Api-Resource-Id': 'volc.speech.dialog',
      'X-Api-Access-Key': process.env.VOLC_S2S_TOKEN,
      // NOTE(review): hard-coded fallback App-Key; confirm it is the provider's fixed public value.
      'X-Api-App-Key': process.env.VOLC_DIALOG_APP_KEY || 'PlgvMymc7f3tQnJ6',
      'X-Api-App-ID': process.env.VOLC_S2S_APP_ID,
      'X-Api-Connect-Id': session.sessionId,
    },
  });
  upstream.on('open', () => {
    upstream.send(createStartConnectionMessage());
    upstream.send(createStartSessionMessage(session.sessionId, buildStartSessionPayload(session)));
  });
  upstream.on('message', (data, isBinary) => {
    // ws >= 8 delivers text frames as Buffers with `isBinary === false`; older ws versions pass
    // a string and no flag. Accept both, so text frames never reach the binary decoder.
    if (isBinary === false || typeof data === 'string') {
      const text = typeof data === 'string'
        ? data
        : (Buffer.isBuffer(data) ? data : Buffer.from(data)).toString('utf8');
      sendJson(session.client, { type: 'server_text', text });
      return;
    }
    handleUpstreamMessage(session, Buffer.isBuffer(data) ? data : Buffer.from(data));
  });
  upstream.on('error', (error) => {
    console.error('[NativeVoice] upstream ws error:', error.message);
    sendJson(session.client, { type: 'error', error: `语音服务连接异常: ${error.message}` });
  });
  upstream.on('close', (code) => {
    console.log(`[NativeVoice] upstream closed session=${session.sessionId} code=${code}`);
    session.upstreamReady = false;
    sendJson(session.client, { type: 'upstream_closed', code });
    // Short grace period so the client can receive `upstream_closed` before being closed.
    setTimeout(() => {
      if (session.client && session.client.readyState === WebSocket.OPEN) {
        session.client.close();
      }
    }, 3000);
  });
  return upstream;
}
/**
 * Create the mutable per-connection state for one voice session.
 *
 * The state is seeded from the default assistant profile. It is then stored in `sessions`
 * under `sessionId`, and `attachClientHandlers` wires the client socket to it.
 *
 * @param {object} client - Client WebSocket for this connection.
 * @param {string} sessionId - Registry key for the session; also used elsewhere as its id.
 * @returns {object} The freshly registered session state.
 */
function createSession(client, sessionId) {
  const profile = resolveAssistantProfile();
  // Derived from the profile / environment up front, in the same order the literal used to.
  const systemRole = buildVoiceSystemRole(profile);
  const speaker = process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts';
  const greetingText = buildVoiceGreeting(profile);

  const state = {
    sessionId,
    client,
    upstream: null,
    upstreamReady: false,
    isSendingChatTTSText: false,
    latestUserText: '',
    latestUserTurnSeq: 0,
    queuedUserText: '',
    queuedUserTurnSeq: 0,
    processingReply: false,
    blockUpstreamAudio: false,
    directSpeakUntil: 0,
    queuedReplyTimer: null,
    lastPersistedAssistantText: '',
    lastPersistedAssistantAt: 0,
    assistantStreamBuffer: '',
    assistantStreamReplyId: '',
    currentTtsType: '',
    currentSpeechText: '',
    greetingProtectionUntil: 0,
    _echoLogOnce: false,
    _fillerActive: false,
    _pendingExternalRagReply: false,
    _lastPartialAt: 0,
    pendingKbPrequery: null,
    _kbPrequeryText: '',
    _kbPrequeryStartedAt: 0,
    _lastKbTopic: '',
    _lastKbHitAt: 0,
    assistantProfile: profile,
    botName: profile.nickname,
    systemRole,
    speakingStyle: DEFAULT_VOICE_SPEAKING_STYLE,
    speaker,
    modelVersion: 'O',
    greetingText,
    hasSentGreeting: false,
    greetingTimer: null,
    greetingAckTimer: null,
    pendingGreetingAck: false,
    greetingRetryCount: 0,
    readyTimer: null,
    readySent: false,
    handoffSummary: '',
    handoffSummaryUsed: false,
    awaitingUpstreamReply: false,
    pendingAssistantSource: null,
    pendingAssistantToolName: null,
    pendingAssistantMeta: null,
    pendingAssistantTurnSeq: 0,
    lastDeliveredAssistantTurnSeq: 0,
    suppressReplyTimer: null,
    suppressUpstreamUntil: 0,
    idleTimer: null,
    lastActivityAt: Date.now(),
    _lastBargeInResetAt: 0,
    _audioBlockLogOnce: false,
    _lastFinalNormalized: '',
    _lastFinalAt: 0,
  };

  sessions.set(sessionId, state);
  attachClientHandlers(state);
  return state;
}
/**
 * Attach the native realtime-dialog WebSocket endpoint (`/ws/realtime-dialog`) to `server`.
 *
 * Every connection must carry a `sessionId` query parameter; connections without one are
 * closed immediately. An optional `userId` query parameter is stored on the session. The
 * session is persisted through `db.createSession(sessionId, userId, 'voice')`; a failure there
 * is logged and does not abort the connection. Finally the client receives a `connected`
 * message.
 *
 * @param {object} server - HTTP server handed to `WebSocketServer` as its `server` option.
 * @returns {WebSocketServer} The WebSocket server bound to `/ws/realtime-dialog`.
 */
function setupNativeVoiceGateway(server) {
  const wss = new WebSocketServer({ server, path: '/ws/realtime-dialog' });
  wss.on('connection', async (client, req) => {
    // WHATWG parsing: `searchParams.get` always yields a single string (the first value) or
    // null. Legacy `url.parse(..., true)` returned an array for repeated keys such as
    // `?sessionId=a&sessionId=b`, which then leaked into the session registry and the DB.
    const { searchParams } = new URL(req.url, 'http://localhost');
    const sessionId = searchParams.get('sessionId');
    if (!sessionId) {
      client.close();
      return;
    }
    const userId = searchParams.get('userId') || null;
    const session = createSession(client, sessionId);
    session.userId = userId;
    try {
      await db.createSession(sessionId, userId, 'voice');
    } catch (error) {
      console.warn('[NativeVoice][DB] createSession failed:', error.message);
    }
    sendJson(client, { type: 'connected', sessionId });
  });
  return wss;
}
// Public entry point of this module: the installer that mounts the realtime-dialog gateway.
module.exports = { setupNativeVoiceGateway };