// bigwo/test2/server/services/kbRetriever.js
const axios = require('axios');
const https = require('https');
const crypto = require('crypto');
const redisClient = require('./redisClient');
// HTTP keep-alive agent: reuses TCP/TLS connections to the VikingDB API host.
const kbHttpAgent = new https.Agent({
keepAlive: true, // keep idle sockets open for reuse
keepAliveMsecs: 30000, // TCP keep-alive probe interval (ms)
maxSockets: 6, // cap on concurrent sockets per host
timeout: 15000, // socket idle timeout (ms)
});
// ============ Volcengine SignerV4 (minimal) ============
/**
 * Keyed SHA-256 MAC of `data` (UTF-8). Returns a Buffer so results can be
 * chained as the key of the next HMAC round during signing-key derivation.
 */
function hmacSHA256(key, data) {
  const mac = crypto.createHmac('sha256', key);
  mac.update(data, 'utf8');
  return mac.digest();
}
/** Hex-encoded SHA-256 digest of `data` (UTF-8). */
function sha256Hex(data) {
  const hash = crypto.createHash('sha256');
  return hash.update(data, 'utf8').digest('hex');
}
/**
 * Produces SignerV4-style signed headers for a VikingDB API request.
 *
 * @param {object} req
 * @param {string} req.method - HTTP verb (e.g. 'POST')
 * @param {string} req.host - API host (also signed as the `host` header)
 * @param {string} req.path - canonical request path
 * @param {string} req.body - raw JSON body ('' when absent)
 * @param {string} req.ak - access key id
 * @param {string|Buffer} req.sk - secret access key
 * @param {string} req.service - signing service name (e.g. 'air')
 * @param {string} req.region - signing region (e.g. 'cn-north-1')
 * @returns {object} headers including `authorization`, `x-date`, `x-content-sha256`
 */
function signRequest({ method, host, path, body, ak, sk, service, region }) {
  // Timestamps: compact forms of the current UTC instant.
  const isoNow = new Date().toISOString();
  const dateStamp = isoNow.replace(/[-:]/g, '').slice(0, 8);
  const amzDate = isoNow.replace(/[-:]/g, '').replace(/\.\d{3}/, '');
  const credentialScope = [dateStamp, region, service, 'request'].join('/');
  const bodyHash = sha256Hex(body || '');
  const headerMap = {
    'content-type': 'application/json',
    'host': host,
    'x-content-sha256': bodyHash,
    'x-date': amzDate,
  };
  // Canonicalization: headers sorted by lowercase name.
  const sortedNames = Object.keys(headerMap).sort();
  const signedHeaders = sortedNames.join(';');
  const canonicalHeaders = sortedNames
    .map((name) => `${name}:${headerMap[name]}\n`)
    .join('');
  const canonicalRequest = [method, path, '', canonicalHeaders, signedHeaders, bodyHash].join('\n');
  const stringToSign = ['HMAC-SHA256', amzDate, credentialScope, sha256Hex(canonicalRequest)].join('\n');
  // Signing key derivation: sk → date → region → service → 'request'.
  let signingKey = hmacSHA256(sk, dateStamp);
  for (const part of [region, service, 'request']) {
    signingKey = hmacSHA256(signingKey, part);
  }
  const signature = hmacSHA256(signingKey, stringToSign).toString('hex');
  return {
    ...headerMap,
    'authorization': `HMAC-SHA256 Credential=${ak}/${credentialScope}, SignedHeaders=${signedHeaders}, Signature=${signature}`,
  };
}
// Default KB ID → VikingDB collection-name mapping.
// Can be overridden at runtime via the VIKINGDB_COLLECTION_MAP env var (JSON).
const DEFAULT_COLLECTION_MAP = {
'kb-ad2e0ea30902421b': 'product_details',
'kb-d45d3056a7b75ac5': 'faq_qa',
'kb-d0ef0b7b8f36a839': 'science_training',
'kb-6a170ab7b1bc024f': 'system_training',
'kb-a69b0928e1714de7': 'test',
};
// Connection warmup: shortly after startup, fire a throwaway request so the
// keep-alive agent already holds a TLS connection to the VikingDB API when
// the first real search arrives. Best effort — failures are ignored.
setTimeout(() => {
  const ak = process.env.VOLC_ACCESS_KEY_ID;
  if (!ak) {
    return; // no credentials configured — nothing to warm up
  }
  axios
    .get('https://api-knowledgebase.mlp.cn-beijing.volces.com/', {
      timeout: 5000,
      httpsAgent: kbHttpAgent,
    })
    .catch(() => {});
  console.log('[KBRetriever] VikingDB connection pool warmup sent');
}, 2500);
// ============ Configuration ============
/**
 * Resolves retriever configuration from environment variables.
 *
 * Note: numeric fallbacks deliberately use `||`, so an explicit "0" in the
 * environment falls back to the default — kept for backward compatibility.
 *
 * @returns {object} keys/credentials, KB ids, tuning knobs, the KB-id →
 *   VikingDB collection mapping, and the deduplicated collection list.
 */
function getConfig() {
  // Ark bearer key preferred; fall back to the raw access key id.
  const authKey = process.env.VOLC_ARK_API_KEY || process.env.VOLC_ACCESS_KEY_ID;
  const ak = process.env.VOLC_ACCESS_KEY_ID;
  const sk = process.env.VOLC_SECRET_ACCESS_KEY;
  const kbEndpointId = process.env.VOLC_ARK_KNOWLEDGE_ENDPOINT_ID || process.env.VOLC_ARK_ENDPOINT_ID;
  const kbModel = process.env.VOLC_ARK_KB_MODEL || kbEndpointId;
  const kbIds = (process.env.VOLC_ARK_KNOWLEDGE_BASE_IDS || '').split(',').map(id => id.trim()).filter(Boolean);
  // Always pass the radix: bare parseInt can misinterpret exotic input.
  const retrievalTopK = Number.parseInt(process.env.VOLC_ARK_KB_RETRIEVAL_TOP_K, 10) || 25;
  const threshold = Number.parseFloat(process.env.VOLC_ARK_KNOWLEDGE_THRESHOLD) || 0.1;
  const rerankerModel = process.env.VOLC_ARK_RERANKER_MODEL || process.env.VOLC_ARK_RERANKER_ENDPOINT_ID || 'doubao-seed-rerank';
  const rerankerTopN = Number.parseInt(process.env.VOLC_ARK_RERANKER_TOP_N, 10) || 5;
  // Feature flags default to enabled; only the literal string "false" disables.
  const enableReranker = process.env.ENABLE_RERANKER !== 'false';
  const enableRedisContext = process.env.ENABLE_REDIS_CONTEXT !== 'false';
  const retrievalMode = process.env.VOLC_ARK_KB_RETRIEVAL_MODE || 'raw';
  // VikingDB collection mapping: JSON env override or the built-in default.
  let collectionMap = DEFAULT_COLLECTION_MAP;
  if (process.env.VIKINGDB_COLLECTION_MAP) {
    try {
      collectionMap = JSON.parse(process.env.VIKINGDB_COLLECTION_MAP);
    } catch (e) {
      // Malformed JSON: deliberately keep the default mapping (best effort).
    }
  }
  // Deduplicated list of all collection names (used for whole-KB searches).
  const allCollections = [...new Set(Object.values(collectionMap))];
  return {
    authKey,
    ak,
    sk,
    kbEndpointId,
    kbModel,
    kbIds,
    retrievalTopK,
    threshold,
    rerankerModel,
    rerankerTopN,
    enableReranker,
    enableRedisContext,
    retrievalMode,
    collectionMap,
    allCollections,
  };
}
// ============ VikingDB single-collection search ============
/**
 * Runs one search_knowledge call against a VikingDB collection and returns
 * normalized chunk objects. An API-level error (non-zero code) yields [].
 *
 * @param {string} collectionName - VikingDB collection to query
 * @param {string} query - user query text
 * @param {number} limit - max results requested from the service
 * @param {object} config - config holding `ak`/`sk` for signing
 * @returns {Promise<Array<object>>} chunks with non-empty content
 */
async function searchVikingDB(collectionName, query, limit, config) {
  const host = 'api-knowledgebase.mlp.cn-beijing.volces.com';
  const apiPath = '/api/knowledge/collection/search_knowledge';
  const bodyStr = JSON.stringify({
    project: 'default',
    name: collectionName,
    query: query,
    limit: limit,
    pre_processing: { need_instruction: true, rewrite: false },
    dense_weight: 0.5,
    post_processing: { rerank_switch: false, chunk_group: false },
  });
  const headers = signRequest({
    method: 'POST', host, path: apiPath, body: bodyStr,
    ak: config.ak, sk: config.sk,
    service: 'air', region: 'cn-north-1',
  });
  const response = await axios.post(`https://${host}${apiPath}`, bodyStr, {
    headers,
    timeout: 10000,
    httpsAgent: kbHttpAgent,
  });
  if (response.data?.code !== 0) {
    console.warn(`[KBRetriever] VikingDB search "${collectionName}" error: ${response.data?.message}`);
    return [];
  }
  const rows = response.data?.data?.result_list || [];
  const chunks = [];
  rows.forEach((row, idx) => {
    // Strip <KBTable> markers; drop rows whose content is empty after trimming.
    const text = (row.content || '').replace(/<KBTable>/g, '').trim();
    if (!text) {
      return;
    }
    chunks.push({
      id: row.chunk_id || row.id || `vdb_${collectionName}_${idx}`,
      content: text,
      score: row.score || 0,
      doc_name: row.doc_info?.doc_name || row.doc_info?.title || '',
      chunk_title: row.chunk_title || '',
      metadata: row.doc_info || {},
      collection: collectionName,
    });
  });
  return chunks;
}
// ============ 1. Pure retrieval (VikingDB search_knowledge, no LLM, ~300ms) ============
/**
 * Searches the mapped VikingDB collections in parallel, merges and
 * score-sorts the results, then applies the score threshold.
 *
 * @param {string} query - user query ('' falls back to a generic intro query)
 * @param {Array<string>|null} datasetIds - KB ids to search; falls back to configured ids
 * @param {number} [topK=10] - max chunks kept after merging
 * @param {number} [threshold=0.1] - minimum score; 0 disables filtering
 * @returns {Promise<object>} { chunks, latencyMs, kbHasContent, usage, hasReferences }
 *   or { chunks: [], error } when retrieval cannot run
 */
async function retrieveChunks(query, datasetIds, topK = 10, threshold = 0.1) {
  const config = getConfig();
  // VikingDB requests are AK/SK-signed — bail out early without credentials.
  if (!config.ak || !config.sk) {
    console.warn('[KBRetriever] retrieveChunks skipped: AK/SK not configured');
    return { chunks: [], error: 'aksk_not_configured' };
  }
  const effectiveQuery = (query && query.trim()) ? query : '请介绍你们的产品和服务';
  const startTime = Date.now();
  // Resolve target collections from the dataset ids (or the configured KB ids).
  const ids = (Array.isArray(datasetIds) && datasetIds.length > 0) ? datasetIds : config.kbIds;
  let collectionNames = ids.map(id => config.collectionMap[id]).filter(Boolean);
  // No mapping matched → search every known collection.
  if (collectionNames.length === 0) {
    collectionNames = config.allCollections;
  }
  if (collectionNames.length === 0) {
    console.warn('[KBRetriever] retrieveChunks skipped: no collections to search');
    return { chunks: [], error: 'no_collections' };
  }
  // Fan out in parallel; each collection gets a proportional share of topK.
  const perCollectionLimit = Math.max(3, Math.ceil(topK / collectionNames.length));
  const results = await Promise.all(collectionNames.map(async (name) => {
    try {
      return await searchVikingDB(name, effectiveQuery, perCollectionLimit, config);
    } catch (err) {
      console.warn(`[KBRetriever] VikingDB search "${name}" failed: ${err.message}`);
      return [];
    }
  }));
  // Merge, sort by descending score, truncate to topK.
  let allChunks = results
    .flat()
    .sort((a, b) => (b.score || 0) - (a.score || 0))
    .slice(0, topK);
  const beforeFilter = allChunks.length;
  if (threshold > 0) {
    allChunks = allChunks.filter(c => (c.score || 0) >= threshold);
  }
  const latencyMs = Date.now() - startTime;
  const topScore = allChunks.length > 0 ? allChunks[0].score?.toFixed(3) : 'N/A';
  console.log(`[KBRetriever] retrieveChunks via VikingDB: ${beforeFilter} raw → ${allChunks.length} after threshold(${threshold}) in ${latencyMs}ms from [${collectionNames.join(',')}] topScore=${topScore}`);
  return {
    chunks: allChunks,
    latencyMs,
    kbHasContent: allChunks.length > 0,
    usage: {},
    hasReferences: true,
  };
}
// ============ 2. Rerank model (built into the VikingDB knowledge base) ============
// Model options: doubao-seed-rerank (recommended) / base-multilingual-rerank (fast) / m3-v2-rerank
// API docs: https://www.volcengine.com/docs/84313/1254474
/**
 * Reranks retrieved chunks against the query via the VikingDB rerank service.
 * Falls back to the incoming retrieval order (truncated to topN) whenever the
 * reranker is disabled, fails, or returns an unexpected score count.
 *
 * @param {string} query - query text the chunks are scored against
 * @param {Array<object>} chunks - retrieval results (uses `content`, `doc_name`)
 * @param {number} [topN=3] - number of chunks to keep
 * @returns {Promise<Array<object>>} top-N chunks, rerank-scored when possible
 */
async function rerankChunks(query, chunks, topN = 3) {
  const config = getConfig();
  if (!chunks || chunks.length === 0) {
    return [];
  }
  // Nothing to cut: the reranker cannot change which chunks survive.
  if (chunks.length <= topN) {
    return chunks;
  }
  if (!config.enableReranker) {
    console.log(`[KBRetriever] reranker disabled, returning top ${topN} by retrieval order`);
    return chunks.slice(0, topN);
  }
  try {
    const startTime = Date.now();
    // VikingDB rerank request format: each item pairs the query with one chunk.
    const datas = chunks.map(c => ({
      query: query,
      content: c.content || '',
      title: c.doc_name || '',
    }));
    const body = {
      rerank_model: config.rerankerModel,
      datas: datas,
    };
    const rerankHost = 'api-knowledgebase.mlp.cn-beijing.volces.com';
    const rerankPath = '/api/knowledge/service/rerank';
    const bodyStr = JSON.stringify(body);
    // SignerV4 signature (same scheme as search_knowledge).
    const signedHeaders = signRequest({
      method: 'POST', host: rerankHost, path: rerankPath, body: bodyStr,
      ak: config.ak, sk: config.sk,
      service: 'air', region: 'cn-north-1',
    });
    const response = await axios.post(
      `https://${rerankHost}${rerankPath}`,
      bodyStr,
      {
        headers: signedHeaders,
        timeout: 5000,
        // Reuse the shared keep-alive agent (same as searchVikingDB) so the
        // rerank call skips a fresh TLS handshake.
        httpsAgent: kbHttpAgent,
      }
    );
    const latencyMs = Date.now() - startTime;
    const responseData = response.data;
    // Observed response shapes: {code: 0, data: [s1, s2, ...]} or {data: {scores: [...]}}
    let scores = [];
    if (responseData?.data?.scores && Array.isArray(responseData.data.scores)) {
      scores = responseData.data.scores;
    } else if (Array.isArray(responseData?.data)) {
      scores = responseData.data;
    }
    if (scores.length > 0 && scores.length === chunks.length) {
      // Pair each score with its chunk, then keep the topN by descending score.
      const scored = chunks.map((chunk, idx) => ({
        ...chunk,
        score: scores[idx] ?? chunk.score,
        reranked: true,
      }));
      const reranked = scored
        .sort((a, b) => (b.score || 0) - (a.score || 0))
        .slice(0, topN);
      console.log(`[KBRetriever] reranked ${chunks.length} → ${reranked.length} chunks in ${latencyMs}ms (${config.rerankerModel}), scores=[${reranked.map(c => (c.score || 0).toFixed(3)).join(',')}]`);
      return reranked;
    }
    console.warn(`[KBRetriever] reranker returned ${scores.length} scores (expected ${chunks.length}) in ${latencyMs}ms, fallback to retrieval order`);
    return chunks.slice(0, topN);
  } catch (err) {
    const errDetail = err.response?.data?.message || err.message;
    console.warn(`[KBRetriever] reranker failed: ${errDetail}, fallback to retrieval order`);
    return chunks.slice(0, topN);
  }
}
// ============ 3. Build the RAG payload ============
/**
 * Assembles the RAG items injected into the S2S call: a tone directive,
 * optional conversation context, then the reranked KB chunks in order.
 *
 * @param {Array<object>} rerankedChunks - chunks with `content` / `doc_name`
 * @param {Array<object>} [conversationHistory] - {role, content} messages
 * @returns {Array<{title: string, content: string}>}
 */
function buildRagPayload(rerankedChunks, conversationHistory = []) {
  // Tone directive: have the S2S model paraphrase KB content in the same
  // casual spoken style it uses for free conversation.
  const ragItems = [{
    title: '回答要求',
    content: '用口语化、简洁的方式回答,像朋友聊天一样自然地说出来,不要念稿、不要播音腔。先给结论,再补充关键信息。',
  }];
  // Inject conversation context, if any.
  if (conversationHistory && conversationHistory.length > 0) {
    const contextLines = conversationHistory.map(msg =>
      `${msg.role === 'user' ? '用户' : '助手'}: ${msg.content}`
    );
    ragItems.push({
      title: '对话上下文',
      content: contextLines.join('\n'),
    });
  }
  // Inject the reranked KB chunks, titled by document name when available.
  rerankedChunks.forEach((chunk, i) => {
    ragItems.push({
      title: chunk.doc_name || `知识库片段${i + 1}`,
      content: chunk.content,
    });
  });
  return ragItems;
}
// ============ 4. Main entry: retrieve → rerank → assemble ============
/**
 * Full KB pipeline: pure retrieval, rerank, Redis context lookup, and RAG
 * payload assembly, plus the hit/no-hit decision.
 *
 * @param {string} query - (possibly rewritten) query used for retrieval
 * @param {object} [opts]
 * @param {Array<string>|null} [opts.datasetIds] - KB ids to restrict the search to
 * @param {string|null} [opts.sessionId] - session key for Redis history lookup
 * @param {object|null} [opts.session] - reserved; currently unused here
 * @param {string|null} [opts.originalQuery] - user's original wording (preferred for reranking)
 * @returns {Promise<object>} result with hit flag, chunks, ragPayload, timings
 */
async function searchAndRerank(query, opts = {}) {
  const {
    datasetIds = null,
    sessionId = null,
    session = null,
    originalQuery = null,
  } = opts;
  const config = getConfig();
  const startTime = Date.now();
  // Shared shape for the early "no hit" returns.
  const miss = (reason) => ({
    hit: false,
    reason,
    chunks: [],
    rerankedChunks: [],
    ragPayload: [],
    latencyMs: Date.now() - startTime,
    source: 'ark_knowledge',
  });
  // Step 1: pure retrieval with a very low threshold — quality filtering is
  // deferred to the reranker.
  const RETRIEVAL_THRESHOLD = 0.01;
  const retrievalResult = await retrieveChunks(
    query,
    datasetIds,
    config.retrievalTopK,
    RETRIEVAL_THRESHOLD
  );
  if (retrievalResult.error) {
    return miss(retrievalResult.error);
  }
  if (retrievalResult.chunks.length === 0) {
    return miss(retrievalResult.kbHasContent ? 'chunks_parse_failed' : 'no_relevant_content');
  }
  // Step 2: rerank — prefer the caller's original wording when provided.
  const rerankedChunks = await rerankChunks(
    originalQuery || query,
    retrievalResult.chunks,
    config.rerankerTopN
  );
  // Step 3: conversation context from Redis (best effort).
  let conversationHistory = [];
  if (config.enableRedisContext && sessionId) {
    const redisHistory = await redisClient.getRecentHistory(sessionId, 5);
    if (redisHistory && redisHistory.length > 0) {
      conversationHistory = redisHistory;
      console.log(`[KBRetriever] loaded ${redisHistory.length} history items from Redis`);
    }
  }
  // Step 4: assemble the RAG payload.
  const ragPayload = buildRagPayload(rerankedChunks, conversationHistory);
  // Step 5: hit decision — reranker scores use a lower cut-off than raw
  // retrieval scores.
  const topScore = rerankedChunks.length > 0 ? (rerankedChunks[0].score || 0) : 0;
  const hitThreshold = config.enableReranker && config.rerankerModel ? 0.1 : 0.3;
  const hit = rerankedChunks.length > 0 && topScore >= hitThreshold;
  const totalLatencyMs = Date.now() - startTime;
  console.log(`[KBRetriever] searchAndRerank completed in ${totalLatencyMs}ms: ${retrievalResult.chunks.length} retrieved → ${rerankedChunks.length} reranked, hit=${hit}, topScore=${topScore.toFixed(3)}`);
  return {
    query,
    originalQuery: originalQuery || query,
    hit,
    reason: hit ? 'reranked_hit' : 'below_threshold',
    chunks: retrievalResult.chunks,
    rerankedChunks,
    ragPayload,
    topScore,
    latencyMs: totalLatencyMs,
    retrievalLatencyMs: retrievalResult.latencyMs,
    source: 'ark_knowledge',
    hasReferences: retrievalResult.hasReferences,
    usage: retrievalResult.usage,
  };
}
// Public module surface: retrieval, reranking, payload assembly, the
// orchestrating entry point, and configuration resolution.
module.exports = {
retrieveChunks,
rerankChunks,
buildRagPayload,
searchAndRerank,
getConfig,
};