// bigwo/test2/server/services/nativeVoiceGateway.js
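// Native voice gateway: bridges browser WebSocket clients on /ws/realtime-dialog to the
// ByteDance/Volcengine realtime speech dialogue API. Per-connection state lives in the
// `sessions` map below; replies are routed between the upstream model and the local
// knowledge/RAG helpers imported from realtimeDialogRouting.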

const { WebSocket, WebSocketServer } = require('ws');
const url = require('url');
const db = require('../db');
const arkChatService = require('./arkChatService');
const {
MsgType,
unmarshal,
createStartConnectionMessage,
createStartSessionMessage,
createAudioMessage,
createChatTTSTextMessage,
createChatRAGTextMessage,
} = require('./realtimeDialogProtocol');
const {
getRuleBasedDirectRouteDecision,
normalizeTextForSpeech,
splitTextForSpeech,
estimateSpeechDurationMs,
shouldForceKnowledgeRoute,
resolveReply,
} = require('./realtimeDialogRouting');
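// Active voice sessions, keyed by sessionId.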
const sessions = new Map();
const IDLE_TIMEOUT_MS = 5 * 60 * 1000;
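// Restart the per-session idle timer. After IDLE_TIMEOUT_MS of inactivity the client is
// notified with `idle_timeout` and its socket is closed two seconds later.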
function resetIdleTimer(session) {
clearTimeout(session.idleTimer);
session.lastActivityAt = Date.now();
session.idleTimer = setTimeout(() => {
session.idleTimer = null;
console.log(`[NativeVoice] idle timeout (${IDLE_TIMEOUT_MS / 1000}s) session=${session.sessionId}`);
sendJson(session.client, { type: 'idle_timeout', timeout: IDLE_TIMEOUT_MS });
setTimeout(() => {
if (session.client && session.client.readyState === WebSocket.OPEN) {
session.client.close();
}
}, 2000);
}, IDLE_TIMEOUT_MS);
}
function sendJson(ws, payload) {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(payload));
}
}
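// Push a JSON control message to a client socket, skipping sockets that are not open.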
function buildStartSessionPayload(options) {
return {
asr: {
extra: {},
},
tts: {
speaker: options.speaker || process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts',
audio_config: {
channel: 1,
format: 'pcm_s16le',
sample_rate: 24000,
},
},
dialog: {
dialog_id: '',
bot_name: options.botName || '大沃',
system_role: normalizeTextForSpeech(options.systemRole || '你是大沃一个德国PM健康事业的智能语音助手。你对PM-FitLine细胞营养素产品、一成系统、招商合作非常熟悉。请优先依据 external_rag 给出的内容回答。无论是闲聊还是引用知识库内容,都要保持一样的说话风格,不要切换成朗读语气。用户进来时请自然地打个招呼,像朋友聊天一样,不要用客服话术。'),
speaking_style: normalizeTextForSpeech(options.speakingStyle || '说话像朋友聊天一样自然轻松,语气亲切活泼,不要像客服念稿。即使引用知识库内容也要用聊天的语气说出来,不要切换成播音腔或朗读语气。'),
extra: {
input_mod: 'audio',
model: options.modelVersion || 'O',
strict_audit: false,
audit_response: '抱歉,这个问题我暂时无法回答。',
},
},
};
}
function parseJsonPayload(message) {
try {
return JSON.parse(message.payload.toString('utf8'));
} catch (error) {
return null;
}
}
function extractUserText(jsonPayload) {
const text = jsonPayload?.text
|| jsonPayload?.content
|| jsonPayload?.results?.[0]?.text
|| jsonPayload?.results?.[0]?.alternatives?.[0]?.text
|| '';
return String(text || '').trim();
}
function isFinalUserPayload(jsonPayload) {
if (jsonPayload?.is_final === true) {
return true;
}
if (Array.isArray(jsonPayload?.results)) {
return jsonPayload.results.some((item) => item && item.is_interim === false);
}
return false;
}
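// Persist a finalized user utterance: identical text within 5s is deduplicated, the text is
// stored via db.addMessage (source 'voice_asr'), the idle timer is reset and a final
// subtitle is pushed to the client. Returns true only when the text was actually persisted.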
function persistUserSpeech(session, text) {
const cleanText = (text || '').trim();
if (!cleanText) return false;
const now = Date.now();
if (session.lastPersistedUserText === cleanText && now - (session.lastPersistedUserAt || 0) < 5000) {
return false;
}
session.lastPersistedUserText = cleanText;
session.lastPersistedUserAt = now;
session.latestUserText = cleanText;
resetIdleTimer(session);
db.addMessage(session.sessionId, 'user', cleanText, 'voice_asr').catch((e) => console.warn('[NativeVoice][DB] add user failed:', e.message));
sendJson(session.client, {
type: 'subtitle',
role: 'user',
text: cleanText,
isFinal: true,
sequence: `native_user_${now}`,
});
return true;
}
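// Persist an assistant reply with the same 5s dedupe window. DB persistence can be skipped
// via persistToDb; source/toolName/meta describe how the reply was produced and are echoed
// back to the client in the subtitle message.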
function persistAssistantSpeech(session, text, { source = 'voice_bot', toolName = null, persistToDb = true, meta = null } = {}) {
const cleanText = (text || '').trim();
if (!cleanText) return false;
const now = Date.now();
if (session.lastPersistedAssistantText === cleanText && now - (session.lastPersistedAssistantAt || 0) < 5000) {
return false;
}
session.lastPersistedAssistantText = cleanText;
session.lastPersistedAssistantAt = now;
resetIdleTimer(session);
if (persistToDb) {
db.addMessage(session.sessionId, 'assistant', cleanText, source, toolName, meta).catch((e) => console.warn('[NativeVoice][DB] add assistant failed:', e.message));
}
sendJson(session.client, {
type: 'subtitle',
role: 'assistant',
text: cleanText,
isFinal: true,
source,
toolName,
sequence: `native_assistant_${now}`,
});
return true;
}
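// Accumulate streamed assistant text into a per-reply buffer; a different reply_id clears
// the previous buffer. Returns the full text buffered so far.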
function appendAssistantStream(session, payload) {
const chunkText = extractUserText(payload);
if (!chunkText) {
return '';
}
const replyId = payload?.reply_id || '';
if (replyId && session.assistantStreamReplyId && session.assistantStreamReplyId !== replyId) {
session.assistantStreamBuffer = '';
}
session.assistantStreamReplyId = replyId || session.assistantStreamReplyId || '';
session.assistantStreamBuffer = `${session.assistantStreamBuffer || ''}${chunkText}`;
return session.assistantStreamBuffer;
}
function flushAssistantStream(session, { source = 'voice_bot', toolName = null, meta = null } = {}) {
const fullText = (session.assistantStreamBuffer || '').trim();
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
if (!fullText) {
return false;
}
return persistAssistantSpeech(session, fullText, { source, toolName, meta });
}
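// Prepare a short handoff summary of the last 20 stored messages via arkChatService,
// intended to carry context from an earlier conversation into this voice session.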
async function loadHandoffSummaryForVoice(session) {
try {
const history = await db.getHistoryForLLM(session.sessionId, 20);
if (!history.length) {
session.handoffSummary = '';
session.handoffSummaryUsed = false;
return;
}
session.handoffSummary = await arkChatService.summarizeContextForHandoff(history, 3);
session.handoffSummaryUsed = false;
console.log(`[NativeVoice] Handoff summary prepared for ${session.sessionId}: ${session.handoffSummary ? 'yes' : 'no'}`);
} catch (error) {
session.handoffSummary = '';
session.handoffSummaryUsed = false;
console.warn('[NativeVoice] loadHandoffSummaryForVoice failed:', error.message);
}
}
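// Speak arbitrary text through the upstream ChatTTSText channel: the text is split into
// chunks and sent with start/end framing, and a local "speaking" window (chatTTSUntil) is
// estimated so overlapping upstream audio and user barge-ins can be handled.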
async function sendSpeechText(session, speechText) {
const chunks = splitTextForSpeech(speechText);
if (!chunks.length || !session.upstream || session.upstream.readyState !== WebSocket.OPEN) {
return;
}
console.log(`[NativeVoice] sendSpeechText start session=${session.sessionId} chunks=${chunks.length} textLen=${speechText.length}`);
session.isSendingChatTTSText = true;
session.currentTtsType = 'chat_tts_text';
session.chatTTSUntil = Date.now() + estimateSpeechDurationMs(speechText) + 800;
clearTimeout(session.chatTTSTimer);
session.chatTTSTimer = setTimeout(() => {
session.chatTTSTimer = null;
if ((session.chatTTSUntil || 0) <= Date.now()) {
session.isSendingChatTTSText = false;
}
}, Math.max(200, session.chatTTSUntil - Date.now() + 50));
sendJson(session.client, { type: 'tts_reset', ttsType: 'chat_tts_text' });
for (let index = 0; index < chunks.length; index += 1) {
const chunk = chunks[index];
console.log(`[NativeVoice] sendSpeechText chunk session=${session.sessionId} index=${index + 1}/${chunks.length} len=${chunk.length} start=${index === 0} end=false text=${JSON.stringify(chunk.slice(0, 80))}`);
session.upstream.send(createChatTTSTextMessage(session.sessionId, {
start: index === 0,
end: false,
content: chunk,
}));
}
console.log(`[NativeVoice] sendSpeechText end session=${session.sessionId}`);
session.upstream.send(createChatTTSTextMessage(session.sessionId, {
start: false,
end: true,
content: '',
}));
}
function sendReady(session) {
if (session.readySent) {
return;
}
session.readySent = true;
sendJson(session.client, { type: 'ready' });
}
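// One-shot greeting: push and persist the greeting subtitle, speak it via sendSpeechText
// after a short delay, then emit `ready` once playback should have finished.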
function sendGreeting(session) {
if (session.hasSentGreeting) {
sendReady(session);
return;
}
session.hasSentGreeting = true;
const greetingText = session.greetingText || '嗨你好呀我是大沃你的专属智能助手。关于德国PM产品、一成系统、招商合作随时问我就好';
console.log(`[NativeVoice] sendGreeting session=${session.sessionId} text=${JSON.stringify(greetingText.slice(0, 80))}`);
sendJson(session.client, {
type: 'subtitle',
role: 'assistant',
text: greetingText,
isFinal: true,
source: 'voice_bot',
sequence: `greeting_${Date.now()}`,
});
persistAssistantSpeech(session, greetingText, { source: 'voice_bot' });
clearTimeout(session.greetingTimer);
clearTimeout(session.readyTimer);
session.greetingTimer = setTimeout(() => {
session.greetingTimer = null;
sendSpeechText(session, greetingText)
.then(() => {
session.readyTimer = setTimeout(() => {
session.readyTimer = null;
sendReady(session);
}, Math.max(1200, Math.min(estimateSpeechDurationMs(greetingText) + 300, 8000)));
})
.catch((error) => {
session.hasSentGreeting = false;
sendReady(session);
console.warn('[NativeVoice] greeting speech failed:', error.message);
});
}, 800);
}
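// Forward knowledge-base items to the upstream model as a ChatRAGText message
// (a JSON array of { title, content } entries).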
async function sendExternalRag(session, items) {
if (!session.upstream || session.upstream.readyState !== WebSocket.OPEN) {
return;
}
const ragItems = Array.isArray(items) ? items.filter((item) => item && item.content) : [];
if (!ragItems.length) {
return;
}
session.upstream.send(createChatRAGTextMessage(session.sessionId, JSON.stringify(ragItems)));
}
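// Core reply routing. The text is queued if a reply is already in flight or the assistant is
// still speaking; otherwise resolveReply picks a delivery mode:
//   - 'upstream_chat': let the upstream model answer (for knowledge-style questions with no
//     KB hit, the question itself is re-sent as an external_rag item);
//   - 'external_rag': suppress the in-flight default audio and feed the KB items so the
//     model answers from them;
//   - otherwise: deliver the resolved reply text itself as an external_rag item.
// The finally block releases the busy flags and drains any user text queued meanwhile.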
async function processReply(session, text) {
const cleanText = (text || '').trim();
if (!cleanText) return;
if (session.processingReply) {
session.queuedUserText = cleanText;
console.log(`[NativeVoice] processReply queued(busy) session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 80))}`);
return;
}
const now = Date.now();
if (session.directSpeakUntil && now < session.directSpeakUntil) {
session.queuedUserText = cleanText;
console.log(`[NativeVoice] processReply queued(speaking) session=${session.sessionId} waitMs=${session.directSpeakUntil - now} text=${JSON.stringify(cleanText.slice(0, 80))}`);
return;
}
session.processingReply = true;
sendJson(session.client, { type: 'assistant_pending', active: true });
const isKnowledgeCandidate = shouldForceKnowledgeRoute(cleanText);
if (isKnowledgeCandidate) {
sendJson(session.client, { type: 'tts_reset', reason: 'processing' });
}
console.log(`[NativeVoice] processReply start session=${session.sessionId} text=${JSON.stringify(cleanText.slice(0, 120))} blocked=${session.blockUpstreamAudio} kbCandidate=${isKnowledgeCandidate}`);
try {
const { delivery, speechText, ragItems, source, toolName, routeDecision, responseMeta } = await resolveReply(session.sessionId, session, cleanText);
if (delivery === 'upstream_chat') {
if (isKnowledgeCandidate) {
console.log(`[NativeVoice] processReply kb-nohit retrigger session=${session.sessionId}`);
await sendExternalRag(session, [{ title: '用户问题', content: cleanText }]);
} else {
session.blockUpstreamAudio = false;
}
session.awaitingUpstreamReply = true;
session.pendingAssistantSource = 'voice_bot';
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = responseMeta;
console.log(`[NativeVoice] processReply handoff session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=upstream_chat`);
return;
}
if (delivery === 'external_rag') {
if (!session.blockUpstreamAudio) {
session.blockUpstreamAudio = true;
sendJson(session.client, { type: 'tts_reset', reason: 'knowledge_hit' });
}
session.awaitingUpstreamReply = true;
session.pendingAssistantSource = source;
session.pendingAssistantToolName = toolName;
session.pendingAssistantMeta = responseMeta;
console.log(`[NativeVoice] processReply handoff session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=external_rag items=${Array.isArray(ragItems) ? ragItems.length : 0}`);
await sendExternalRag(session, ragItems);
return;
}
if (!speechText) {
console.log(`[NativeVoice] processReply empty session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=${delivery || 'unknown'}`);
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
return;
}
console.log(`[NativeVoice] processReply resolved session=${session.sessionId} route=${routeDecision?.route || 'unknown'} delivery=local_rag source=${source} tool=${toolName || 'chat'} speechLen=${speechText.length}`);
session.awaitingUpstreamReply = true;
session.pendingAssistantSource = source;
session.pendingAssistantToolName = toolName;
session.pendingAssistantMeta = responseMeta;
await sendExternalRag(session, [{ title: '回复内容', content: speechText }]);
} catch (error) {
console.error('[NativeVoice] processReply failed:', error.message);
sendJson(session.client, { type: 'error', error: error.message });
} finally {
session.processingReply = false;
if (!session.awaitingUpstreamReply) {
session.blockUpstreamAudio = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
}
const pending = session.queuedUserText;
session.queuedUserText = '';
if (pending && pending !== cleanText && (!session.directSpeakUntil || Date.now() >= session.directSpeakUntil)) {
setTimeout(() => {
session.blockUpstreamAudio = true;
processReply(session, pending).catch((err) => {
console.error('[NativeVoice] queued processReply failed:', err.message);
});
}, 200);
} else if (pending && pending !== cleanText) {
const waitMs = Math.max(200, session.directSpeakUntil - Date.now() + 200);
clearTimeout(session.queuedReplyTimer);
session.queuedReplyTimer = setTimeout(() => {
session.queuedReplyTimer = null;
const queuedText = session.queuedUserText || pending;
session.queuedUserText = '';
session.blockUpstreamAudio = true;
processReply(session, queuedText).catch((err) => {
console.error('[NativeVoice] delayed queued processReply failed:', err.message);
});
}, waitMs);
}
}
}
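// Dispatch upstream protocol frames. Event codes as interpreted by this handler (not an
// exhaustive protocol reference): 150 session ready, 350 TTS segment start (carries
// tts_type), 351 full assistant text, 550 streamed assistant text chunk, 559 end of an
// assistant response, 450/451 partial ASR results, 459 (or a final 451) final ASR result.
// Binary AUDIO_ONLY_SERVER frames are relayed to the client unless blockUpstreamAudio is set.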
function handleUpstreamMessage(session, data) {
let message;
try {
message = unmarshal(data);
} catch (error) {
console.warn('[NativeVoice] unmarshal failed:', error.message);
return;
}
if (message.type === MsgType.AUDIO_ONLY_SERVER) {
if (session.blockUpstreamAudio) {
if (!session._audioBlockLogOnce) {
session._audioBlockLogOnce = true;
console.log(`[NativeVoice] audio blocked (blockUpstream) session=${session.sessionId} ttsType=${session.currentTtsType}`);
}
return;
}
session._audioBlockLogOnce = false;
if (session.client && session.client.readyState === WebSocket.OPEN) {
session.client.send(message.payload, { binary: true });
}
return;
}
const payload = parseJsonPayload(message);
if (message.type === MsgType.ERROR) {
console.error(`[NativeVoice] upstream error session=${session.sessionId} code=${message.event} payload=${message.payload.toString('utf8').slice(0, 200)}`);
sendJson(session.client, { type: 'error', error: message.payload.toString('utf8') });
return;
}
if (message.type !== MsgType.FULL_SERVER) {
return;
}
if (message.event === 150) {
session.upstreamReady = true;
console.log(`[NativeVoice] upstream ready session=${session.sessionId}`);
resetIdleTimer(session);
sendGreeting(session);
return;
}
if (message.event === 350) {
session.currentTtsType = payload?.tts_type || '';
if (payload?.tts_type === 'chat_tts_text' && session.pendingGreetingAck) {
session.pendingGreetingAck = false;
clearTimeout(session.greetingAckTimer);
session.greetingAckTimer = null;
}
if (session.blockUpstreamAudio && payload?.tts_type && payload.tts_type !== 'default') {
session.blockUpstreamAudio = false;
console.log(`[NativeVoice] unblock audio on ttsType=${payload.tts_type} session=${session.sessionId}`);
}
console.log(`[NativeVoice] upstream tts_event session=${session.sessionId} ttsType=${payload?.tts_type || ''}`);
sendJson(session.client, { type: 'tts_event', payload });
return;
}
const isLocalChatTTSTextActive = !!session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now();
if (message.event === 351) {
if (isLocalChatTTSTextActive || session.blockUpstreamAudio) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
return;
}
const pendingAssistantSource = session.pendingAssistantSource || 'voice_bot';
const pendingAssistantToolName = session.pendingAssistantToolName || null;
const pendingAssistantMeta = session.pendingAssistantMeta || null;
session.awaitingUpstreamReply = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
const assistantText = extractUserText(payload);
if (assistantText) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] upstream assistant session=${session.sessionId} text=${JSON.stringify(assistantText.slice(0, 120))}`);
persistAssistantSpeech(session, assistantText, {
source: pendingAssistantSource,
toolName: pendingAssistantToolName,
meta: pendingAssistantMeta,
});
} else {
flushAssistantStream(session, {
source: pendingAssistantSource,
toolName: pendingAssistantToolName,
meta: pendingAssistantMeta,
});
}
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
return;
}
if (message.event === 550) {
if (isLocalChatTTSTextActive || session.blockUpstreamAudio) {
return;
}
if (session.awaitingUpstreamReply) {
session.awaitingUpstreamReply = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
}
const fullText = appendAssistantStream(session, payload);
if (fullText) {
console.log(`[NativeVoice] upstream assistant chunk session=${session.sessionId} len=${fullText.length} text=${JSON.stringify(fullText.slice(0, 120))}`);
}
return;
}
if (message.event === 559) {
if (isLocalChatTTSTextActive) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
return;
}
if (session.blockUpstreamAudio) {
session.assistantStreamBuffer = '';
session.assistantStreamReplyId = '';
console.log(`[NativeVoice] blocked response ended (559), keeping block session=${session.sessionId}`);
return;
}
session.awaitingUpstreamReply = false;
session.blockUpstreamAudio = false;
sendJson(session.client, { type: 'assistant_pending', active: false });
flushAssistantStream(session, {
source: session.pendingAssistantSource || 'voice_bot',
toolName: session.pendingAssistantToolName || null,
meta: session.pendingAssistantMeta || null,
});
session.pendingAssistantSource = null;
session.pendingAssistantToolName = null;
session.pendingAssistantMeta = null;
return;
}
if (message.event === 450 || (message.event === 451 && !isFinalUserPayload(payload))) {
const text = extractUserText(payload);
if (text) {
console.log(`[NativeVoice] upstream partial session=${session.sessionId} text=${JSON.stringify(text.slice(0, 120))}`);
session.latestUserText = text;
// Barge-in: the user has started speaking, so cut off any AI playback immediately
if (session.directSpeakUntil && Date.now() < session.directSpeakUntil) {
console.log(`[NativeVoice] user barge-in (partial) session=${session.sessionId}`);
session.directSpeakUntil = 0;
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
} else if (session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now()) {
console.log(`[NativeVoice] user barge-in chatTTS (partial) session=${session.sessionId}`);
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
}
sendJson(session.client, {
type: 'subtitle',
role: 'user',
text,
isFinal: false,
sequence: `native_partial_${Date.now()}`,
});
}
return;
}
if (message.event === 459 || (message.event === 451 && isFinalUserPayload(payload))) {
const finalText = extractUserText(payload) || session.latestUserText || '';
console.log(`[NativeVoice] upstream final session=${session.sessionId} text=${JSON.stringify(finalText.slice(0, 120))}`);
if (session.directSpeakUntil && Date.now() < session.directSpeakUntil) {
console.log(`[NativeVoice] user interrupt during speaking session=${session.sessionId}`);
session.directSpeakUntil = 0;
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
} else if (session.isSendingChatTTSText && (session.chatTTSUntil || 0) > Date.now()) {
console.log(`[NativeVoice] user interrupt chatTTS during speaking session=${session.sessionId}`);
session.isSendingChatTTSText = false;
session.chatTTSUntil = 0;
clearTimeout(session.chatTTSTimer);
sendJson(session.client, { type: 'tts_reset', reason: 'user_bargein' });
}
if (persistUserSpeech(session, finalText)) {
session.blockUpstreamAudio = true;
processReply(session, finalText).catch((error) => {
console.error('[NativeVoice] processReply error:', error.message);
});
}
return;
}
sendJson(session.client, {
type: 'event',
event: message.event,
payload,
});
}
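// Wire up the browser-side socket. Binary frames are microphone audio and are forwarded
// upstream once the upstream session is ready; JSON frames drive the session. Illustrative
// shapes (field values are chosen by the client, not fixed here):
//   { "type": "start", "botName": "...", "systemRole": "...", "speaker": "...", "greetingText": "..." }
//   { "type": "text", "text": "typed user message" }
//   { "type": "stop" }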
function attachClientHandlers(session) {
session.client.on('message', async (raw, isBinary) => {
if (isBinary) {
if (session.upstream && session.upstream.readyState === WebSocket.OPEN && session.upstreamReady) {
session.upstream.send(createAudioMessage(session.sessionId, raw));
}
return;
}
let parsed;
try {
parsed = JSON.parse(raw.toString('utf8'));
} catch (error) {
sendJson(session.client, { type: 'error', error: 'invalid client json' });
return;
}
if (parsed.type === 'start') {
session.botName = parsed.botName || '豆包';
session.systemRole = parsed.systemRole || '你是一个企业知识库语音助手,请优先依据 external_rag 给出的内容回答。';
session.speakingStyle = parsed.speakingStyle || '请使用清晰、自然、简洁的口吻。';
session.speaker = parsed.speaker || process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts';
session.modelVersion = parsed.modelVersion || 'O';
session.greetingText = parsed.greetingText || session.greetingText || '嗨你好呀我是大沃你的专属智能助手。关于德国PM产品、一成系统、招商合作随时问我就好';
session.userId = parsed.userId || session.userId || null;
session.upstream = createUpstreamConnection(session);
loadHandoffSummaryForVoice(session).catch((error) => {
console.warn('[NativeVoice] async loadHandoffSummaryForVoice failed:', error.message);
});
return;
}
if (parsed.type === 'stop') {
session.client.close();
return;
}
if (parsed.type === 'text' && parsed.text) {
persistUserSpeech(session, parsed.text);
processReply(session, parsed.text).catch((error) => {
console.error('[NativeVoice] text processReply failed:', error.message);
});
}
});
session.client.on('close', () => {
clearTimeout(session.chatTTSTimer);
clearTimeout(session.greetingTimer);
clearTimeout(session.greetingAckTimer);
clearTimeout(session.readyTimer);
clearTimeout(session.idleTimer);
if (session.upstream && session.upstream.readyState === WebSocket.OPEN) {
session.upstream.close();
}
sessions.delete(session.sessionId);
});
}
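// Open the upstream WebSocket to the ByteDance realtime dialogue endpoint. Credentials come
// from VOLC_S2S_TOKEN / VOLC_S2S_APP_ID (and optionally VOLC_DIALOG_APP_KEY); StartConnection
// and StartSession frames are sent as soon as the socket opens.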
function createUpstreamConnection(session) {
const upstream = new WebSocket('wss://openspeech.bytedance.com/api/v3/realtime/dialogue', {
headers: {
'X-Api-Resource-Id': 'volc.speech.dialog',
'X-Api-Access-Key': process.env.VOLC_S2S_TOKEN,
'X-Api-App-Key': process.env.VOLC_DIALOG_APP_KEY || 'PlgvMymc7f3tQnJ6',
'X-Api-App-ID': process.env.VOLC_S2S_APP_ID,
'X-Api-Connect-Id': session.sessionId,
},
});
upstream.on('open', () => {
upstream.send(createStartConnectionMessage());
upstream.send(createStartSessionMessage(session.sessionId, buildStartSessionPayload(session)));
});
upstream.on('message', (data, isBinary) => {
if (!isBinary && typeof data === 'string') {
sendJson(session.client, { type: 'server_text', text: data });
return;
}
handleUpstreamMessage(session, Buffer.isBuffer(data) ? data : Buffer.from(data));
});
upstream.on('error', (error) => {
console.error('[NativeVoice] upstream ws error:', error.message);
sendJson(session.client, { type: 'error', error: `语音服务连接异常: ${error.message}` });
});
upstream.on('close', (code) => {
console.log(`[NativeVoice] upstream closed session=${session.sessionId} code=${code}`);
session.upstreamReady = false;
sendJson(session.client, { type: 'upstream_closed', code });
setTimeout(() => {
if (session.client && session.client.readyState === WebSocket.OPEN) {
session.client.close();
}
}, 3000);
});
return upstream;
}
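// Create and register the per-connection session state (persona defaults, greeting/speaking
// flags, stream buffers, timers) and attach the client-side handlers.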
function createSession(client, sessionId) {
const session = {
sessionId,
client,
upstream: null,
upstreamReady: false,
isSendingChatTTSText: false,
latestUserText: '',
queuedUserText: '',
processingReply: false,
blockUpstreamAudio: false,
directSpeakUntil: 0,
queuedReplyTimer: null,
lastPersistedAssistantText: '',
lastPersistedAssistantAt: 0,
assistantStreamBuffer: '',
assistantStreamReplyId: '',
currentTtsType: '',
botName: '大沃',
systemRole: '你是大沃一个德国PM健康事业的智能语音助手。你对PM-FitLine细胞营养素产品、一成系统、招商合作非常熟悉。请优先依据 external_rag 给出的内容回答。用户进来时请自然地打个招呼,像朋友聊天一样,不要用客服话术。',
speakingStyle: '说话像朋友聊天一样自然轻松,语气亲切活泼,不要像客服念稿。',
speaker: process.env.VOLC_S2S_SPEAKER_ID || 'zh_female_vv_jupiter_bigtts',
modelVersion: 'O',
greetingText: '嗨你好呀我是大沃你的专属智能助手。关于德国PM产品、一成系统、招商合作随时问我就好',
hasSentGreeting: false,
greetingTimer: null,
greetingAckTimer: null,
pendingGreetingAck: false,
greetingRetryCount: 0,
readyTimer: null,
readySent: false,
handoffSummary: '',
handoffSummaryUsed: false,
awaitingUpstreamReply: false,
pendingAssistantSource: null,
pendingAssistantToolName: null,
pendingAssistantMeta: null,
idleTimer: null,
lastActivityAt: Date.now(),
};
sessions.set(sessionId, session);
attachClientHandlers(session);
return session;
}
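// Mount the gateway on an existing HTTP server. A minimal usage sketch (variable names and
// the require path are illustrative, not part of this module):
//
//   const http = require('http');
//   const { setupNativeVoiceGateway } = require('./services/nativeVoiceGateway');
//   const server = http.createServer(app);
//   setupNativeVoiceGateway(server); // serves ws://<host>/ws/realtime-dialog
//   server.listen(3000);
//
// Clients must connect with ?sessionId=<id> (and optionally &userId=<id>) in the query string.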
function setupNativeVoiceGateway(server) {
const wss = new WebSocketServer({ server, path: '/ws/realtime-dialog' });
wss.on('connection', async (client, req) => {
const parsed = url.parse(req.url, true);
const sessionId = parsed.query?.sessionId;
if (!sessionId) {
client.close();
return;
}
const userId = parsed.query?.userId || null;
const session = createSession(client, sessionId);
session.userId = userId;
try {
await db.createSession(sessionId, userId, 'voice');
} catch (error) {
console.warn('[NativeVoice][DB] createSession failed:', error.message);
}
sendJson(client, { type: 'connected', sessionId });
});
return wss;
}
module.exports = {
setupNativeVoiceGateway,
};