507 lines
22 KiB
Python
507 lines
22 KiB
Python
import os
|
||
import json
|
||
from typing import Dict, Any, List
|
||
|
||
class LLMService:
    """Large-language-model service; supports a local Ollama server or the
    OpenAI API (DeepSeek and Coze workflow backends are also handled)."""

    def __init__(self):
        # Pick a backend from environment configuration first, then set up
        # whatever client/state that backend needs. `_initialize_llm` may
        # downgrade `llm_type` to "mock" if setup fails.
        self.llm_type = self._detect_llm_type()
        self._initialize_llm()
def _detect_llm_type(self) -> str:
|
||
"""检测可用的LLM类型"""
|
||
# 优先使用 DeepSeek(如果已配置)
|
||
if os.getenv("DEEPSEEK_API_KEY") and os.getenv("USE_DEEPSEEK_LLM", "true").lower() == "true":
|
||
return "deepseek"
|
||
# 检查 OpenAI
|
||
elif os.getenv("OPENAI_API_KEY"):
|
||
return "openai"
|
||
# Coze(如果已配置)
|
||
elif os.getenv("COZE_API_KEY") and os.getenv("COZE_WORKFLOW_ID"):
|
||
return "coze"
|
||
elif os.getenv("OLLAMA_HOST") or self._check_ollama_available():
|
||
return "ollama"
|
||
else:
|
||
return "mock"
|
||
|
||
def _check_ollama_available(self) -> bool:
|
||
"""检查Ollama是否可用"""
|
||
try:
|
||
import requests
|
||
response = requests.get("http://localhost:11434/api/tags", timeout=2)
|
||
return response.status_code == 200
|
||
except:
|
||
return False
|
||
|
||
    def _initialize_llm(self):
        """Initialize the client/configuration for the backend chosen by
        `_detect_llm_type`.

        Each branch is wrapped in try/except: on any setup failure the service
        degrades to the "mock" backend instead of raising.
        """
        if self.llm_type == "deepseek":
            try:
                # DeepSeek is called over plain HTTP; only URL/key/model are stored.
                self.deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
                self.deepseek_api_url = os.getenv("DEEPSEEK_API_BASE", "https://api.deepseek.com") + "/v1/chat/completions"
                self.model = os.getenv("DEEPSEEK_MODEL", "deepseek-chat")
                print(f"✓ 使用 DeepSeek API (模型: {self.model})")
            except Exception as e:
                print(f"⚠ DeepSeek 初始化失败: {e}")
                self.llm_type = "mock"

        elif self.llm_type == "openai":
            try:
                from openai import OpenAI

                # A custom API endpoint is supported via OPENAI_API_BASE.
                api_key = os.getenv("OPENAI_API_KEY")
                api_base = os.getenv("OPENAI_API_BASE")

                if api_base:
                    self.client = OpenAI(api_key=api_key, base_url=api_base)
                else:
                    self.client = OpenAI(api_key=api_key)

                self.model = os.getenv("OPENAI_MODEL", "gpt-3.5-turbo")
                print(f"✓ 使用 OpenAI API (模型: {self.model})")
            except Exception as e:
                print(f"⚠ OpenAI 初始化失败: {e}")
                self.llm_type = "mock"

        elif self.llm_type == "ollama":
            try:
                import requests
                self.ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434")
                # Defaults to the locally installed qwen2.5:7b model; override
                # via the OLLAMA_MODEL environment variable.
                self.model = os.getenv("OLLAMA_MODEL", "qwen2.5:7b")
                print(f"✓ 使用 Ollama (模型: {self.model})")
            except Exception as e:
                print(f"⚠ Ollama 初始化失败: {e}")
                self.llm_type = "mock"

        elif self.llm_type == "coze":
            try:
                # Coze workflow configuration is provided via environment variables.
                self.coze_api_url = os.getenv("COZE_API_URL", "https://api.coze.cn/v1/workflow/run")
                self.coze_api_key = os.getenv("COZE_API_KEY")
                self.coze_workflow_id = os.getenv("COZE_WORKFLOW_ID")

                # Re-validated here even though _detect_llm_type checked, since
                # this method can in principle run with llm_type set directly.
                if not self.coze_api_key or not self.coze_workflow_id:
                    raise ValueError("COZE_API_KEY 或 COZE_WORKFLOW_ID 未配置")

                print("✓ 使用 Coze 工作流作为LLM")
            except Exception as e:
                print(f"⚠ Coze 初始化失败: {e}")
                self.llm_type = "mock"

        # Any branch above may have fallen back to mock; announce it once here.
        if self.llm_type == "mock":
            print("✓ 使用模拟LLM模式(用于演示)")
def analyze_single_report(self, report_text: str) -> Dict[str, Any]:
|
||
"""分析单个报告"""
|
||
prompt = f"""请分析以下医疗报告,提取关键信息:
|
||
|
||
{report_text}
|
||
|
||
请提供:
|
||
1. 摘要
|
||
2. 关键发现
|
||
3. 异常指标
|
||
4. 风险评估
|
||
5. 建议
|
||
|
||
以JSON格式返回结果。
|
||
"""
|
||
|
||
if self.llm_type == "deepseek":
|
||
return self._call_deepseek(prompt)
|
||
elif self.llm_type == "openai":
|
||
return self._call_openai(prompt)
|
||
elif self.llm_type == "ollama":
|
||
return self._call_ollama(prompt)
|
||
elif self.llm_type == "coze":
|
||
# 对于 Coze,直接将原始报告文本传给工作流,由工作流内部负责解析与生成结构化结果
|
||
print(f" → 准备调用 Coze 工作流...")
|
||
coze_input = [{
|
||
"filename": "single_report",
|
||
"text": report_text,
|
||
}]
|
||
result = self._call_coze(coze_input) # 单个报告也作为数组传入
|
||
print(f" ← Coze 调用返回")
|
||
return result
|
||
else:
|
||
return self._mock_analysis(report_text)
|
||
|
||
def analyze_multiple_reports(self, report_texts: List[str]) -> Dict[str, Any]:
|
||
"""
|
||
分析多个报告(Coze专用)
|
||
report_texts: 报告文本的数组,每个元素是一个PDF的文本
|
||
"""
|
||
if self.llm_type == "coze":
|
||
print(f" → 准备调用 Coze 工作流(传入 {len(report_texts)} 个报告)...")
|
||
result = self._call_coze(report_texts)
|
||
print(f" ← Coze 调用返回")
|
||
return result
|
||
else:
|
||
# 其他LLM类型合并文本后调用
|
||
combined = "\n\n".join(report_texts)
|
||
return self.analyze_single_report(combined)
|
||
|
||
def _call_openai(self, prompt: str) -> Dict[str, Any]:
|
||
"""调用OpenAI API"""
|
||
try:
|
||
response = self.client.chat.completions.create(
|
||
model=self.model,
|
||
messages=[
|
||
{"role": "system", "content": "你是一位专业的医疗报告分析助手。"},
|
||
{"role": "user", "content": prompt}
|
||
],
|
||
temperature=0.7,
|
||
max_tokens=2000
|
||
)
|
||
|
||
content = response.choices[0].message.content
|
||
return self._parse_llm_response(content)
|
||
|
||
except Exception as e:
|
||
return {
|
||
"error": f"OpenAI API 调用失败: {str(e)}",
|
||
"summary": "分析失败",
|
||
"key_findings": [],
|
||
"abnormal_items": [],
|
||
"risk_assessment": "无法评估",
|
||
"recommendations": []
|
||
}
|
||
|
||
def _call_deepseek(self, prompt: str) -> Dict[str, Any]:
|
||
"""调用 DeepSeek API"""
|
||
try:
|
||
import requests
|
||
|
||
headers = {
|
||
"Authorization": f"Bearer {self.deepseek_api_key}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
data = {
|
||
"model": self.model,
|
||
"messages": [
|
||
{"role": "system", "content": "你是一位专业的医疗报告分析助手。请以JSON格式返回分析结果。"},
|
||
{"role": "user", "content": prompt}
|
||
],
|
||
"temperature": 0.3,
|
||
"max_tokens": 4000
|
||
}
|
||
|
||
response = requests.post(
|
||
self.deepseek_api_url,
|
||
headers=headers,
|
||
json=data,
|
||
timeout=120
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
content = response.json()["choices"][0]["message"]["content"]
|
||
return self._parse_llm_response(content)
|
||
else:
|
||
raise Exception(f"DeepSeek 返回错误: {response.status_code} - {response.text}")
|
||
|
||
except Exception as e:
|
||
return {
|
||
"error": f"DeepSeek API 调用失败: {str(e)}",
|
||
"summary": "分析失败",
|
||
"key_findings": [],
|
||
"abnormal_items": [],
|
||
"risk_assessment": "无法评估",
|
||
"recommendations": []
|
||
}
|
||
|
||
    def _call_coze(self, report_texts: List[str]) -> Dict[str, Any]:
        """
        Call the Coze workflow API (streaming mode) to analyse medical reports.

        report_texts: array of report texts, one element per PDF. Elements may
        also be dicts with a "text" key (used only for the length log here;
        the workflow receives the list unchanged).

        Retries the whole stream up to COZE_MAX_RETRIES times with a simple
        linear backoff; on final failure returns a placeholder error dict
        instead of raising.
        """
        try:
            import time
            from cozepy import Coze, TokenAuth, COZE_CN_BASE_URL, WorkflowEventType

            # Prefer attributes set by _initialize_llm, fall back to env vars
            # so this method also works on a partially initialized instance.
            api_key = getattr(self, "coze_api_key", os.getenv("COZE_API_KEY"))
            workflow_id = getattr(self, "coze_workflow_id", os.getenv("COZE_WORKFLOW_ID"))
            max_retries = int(os.getenv("COZE_MAX_RETRIES", "3"))

            if not api_key or not workflow_id:
                raise ValueError("未配置 Coze API 所需的 COZE_API_KEY 或 COZE_WORKFLOW_ID")

            print(f" → 调用 Coze 工作流(流式模式)...")
            print(f" → Workflow ID: {workflow_id}")
            print(f" → 数组元素个数: {len(report_texts)}")
            # Total character count across all reports, for the log only.
            total_chars = 0
            for item in report_texts:
                if isinstance(item, str):
                    total_chars += len(item)
                elif isinstance(item, dict):
                    text_value = item.get("text")
                    if isinstance(text_value, str):
                        total_chars += len(text_value)
            print(f" → 总文本长度: {total_chars} 字符")
            print(f" → 请求发送时间: {time.strftime('%H:%M:%S')}")

            # Initialize the Coze client (CN region base URL).
            coze = Coze(auth=TokenAuth(token=api_key), base_url=COZE_CN_BASE_URL)

            # Record the request start time for the elapsed-time log.
            # NOTE(review): `time_module` aliases the `time` already imported
            # above — redundant but harmless.
            import time as time_module
            start = time_module.time()

            last_error = None
            for attempt in range(max_retries):
                try:
                    if attempt > 0:
                        print(f" → 重试 {attempt}/{max_retries - 1}...")

                    # Open the streaming run; events arrive incrementally.
                    stream = coze.workflows.runs.stream(
                        workflow_id=workflow_id,
                        parameters={"input": report_texts}
                    )

                    print(f" ✓ 已连接到流式接口,等待执行...")

                    # Consume the event stream until the "End" node finishes.
                    content_result = None
                    event_count = 0
                    for event in stream:
                        event_count += 1
                        print(f" [事件 {event_count}] 类型: {event.event}")

                        if event.event == WorkflowEventType.MESSAGE:
                            # Progress message from a workflow node.
                            if hasattr(event, 'message') and event.message:
                                msg = event.message
                                node_type = getattr(msg, 'node_type', None)
                                node_title = getattr(msg, 'node_title', None)
                                node_is_finish = getattr(msg, 'node_is_finish', None)
                                content = getattr(msg, 'content', None)

                                print(f" 节点标题: {node_title}")
                                print(f" 节点类型: {node_type}")
                                print(f" 是否完成: {node_is_finish}")
                                print(f" 内容长度: {len(content) if content else 0}")

                                if node_title:
                                    print(f" ⏳ 执行节点: {node_title} (类型: {node_type})")

                                # A finished "End" node with content is the
                                # workflow's final result; stop consuming.
                                if node_title == "End" and node_is_finish and content:
                                    print(f" ✓ 工作流执行完成,获取到结果")
                                    content_result = content
                                    break

                        elif event.event == WorkflowEventType.ERROR:
                            error_msg = str(event.error) if hasattr(event, 'error') else "Unknown error"
                            print(f" ✗ 错误事件: {error_msg}")
                            raise Exception(f"工作流执行错误: {error_msg}")

                        elif event.event == WorkflowEventType.INTERRUPT:
                            # Interactive workflows are not supported here.
                            print(f" ⚠️ 工作流需要交互,暂不支持")
                            raise Exception("工作流需要人工交互,当前不支持")

                    if not content_result:
                        raise Exception("未获取到工作流执行结果")

                    elapsed = time_module.time() - start
                    print(f" → 收到完整结果 (耗时: {elapsed:.1f}秒)")
                    print(f" → 结果数据: {content_result[:200]}...")

                    # Parse the content field (usually JSON-formatted output).
                    # Example content shape: {"output":"```json\n{...}\n```"}
                    if isinstance(content_result, str):
                        # First try to parse the whole string as JSON.
                        try:
                            content_json = json.loads(content_result)
                            output = content_json.get("output", content_result)
                        except json.JSONDecodeError:
                            output = content_result

                        # If output is markdown-fenced JSON, extract the body.
                        if isinstance(output, str) and "```json" in output:
                            import re
                            json_match = re.search(r'```json\s*\n(.*?)\n```', output, re.DOTALL)
                            if json_match:
                                output = json_match.group(1)

                        # Normalize to the legacy envelope shape for the code below.
                        data = {"code": 0, "data": {"output": output}}
                    else:
                        data = {"code": 0, "data": {"output": content_result}}

                    # Mirrors the scheduled-script response shape:
                    # { code: 0, data: { output: ... } }
                    if isinstance(data, dict) and data.get("code") == 0:
                        raw_data = data.get("data", {})
                        if isinstance(raw_data, str):
                            try:
                                raw_data = json.loads(raw_data)
                            except json.JSONDecodeError:
                                # data.data is a plain string: parse it as LLM text.
                                return self._parse_llm_response(raw_data)

                        # The workflow is expected to return its result in data.output.
                        output = raw_data.get("output", raw_data)

                        # If output is still a string, try one more JSON decode.
                        if isinstance(output, str):
                            try:
                                output = json.loads(output)
                                print(f" ✓ Coze 返回的 output 需要二次解析")
                            except json.JSONDecodeError:
                                print(f" ⚠️ output 为字符串但无法解析为JSON,尝试文本解析")
                                return self._parse_llm_response(output)

                        if isinstance(output, dict):
                            # Already-structured result: just fill missing fields.
                            print(f" ✓ Coze 返回结构化数据")
                            print(f" → 包含字段: {list(output.keys())}")

                            result = output
                            required_fields = [
                                "summary",
                                "key_findings",
                                "abnormal_items",
                                "risk_assessment",
                                "recommendations",
                            ]
                            for field in required_fields:
                                if field not in result:
                                    result[field] = [] if field in [
                                        "key_findings",
                                        "abnormal_items",
                                        "recommendations",
                                    ] else "未提供"

                            print(f" ✓✓ Coze 工作流调用成功!")
                            return result

                        if isinstance(output, str):
                            # Text output: run it through the shared JSON-extraction parser.
                            return self._parse_llm_response(output)

                        # Other types (lists etc.): serialize, then parse as text.
                        return self._parse_llm_response(json.dumps(output, ensure_ascii=False))

                    # Non-zero code is treated as an error (retryable).
                    last_error = f"Coze API 返回非0 code: {data}"
                    print(f" ✗ Coze 返回错误: {last_error}")

                except Exception as e:  # includes timeouts and all request errors
                    last_error = str(e)
                    print(f" ✗ Coze API 调用失败: {last_error}")
                    if attempt < max_retries - 1:
                        # Simple linear backoff before retrying.
                        wait_time = (attempt + 1) * 3
                        print(f" → 等待 {wait_time} 秒后重试...")
                        time.sleep(wait_time)
                    else:
                        print(f" ✗✗ 已达最大重试次数,放弃调用")
                        break

            print(f" ✗✗ Coze 工作流调用最终失败: {last_error}")
            raise Exception(last_error or "Coze API 调用失败")

        except Exception as e:
            # Final fallback: never raise to the caller, return an error dict.
            return {
                "error": f"Coze API 调用失败: {str(e)}",
                "summary": "分析失败",
                "key_findings": [],
                "abnormal_items": [],
                "risk_assessment": "无法评估",
                "recommendations": []
            }
def _call_ollama(self, prompt: str) -> Dict[str, Any]:
|
||
"""调用Ollama API"""
|
||
try:
|
||
import requests
|
||
|
||
response = requests.post(
|
||
f"{self.ollama_host}/api/generate",
|
||
json={
|
||
"model": self.model,
|
||
"prompt": prompt,
|
||
"stream": False
|
||
},
|
||
timeout=300
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
content = response.json().get("response", "")
|
||
return self._parse_llm_response(content)
|
||
else:
|
||
raise Exception(f"Ollama 返回错误: {response.status_code}")
|
||
|
||
except Exception as e:
|
||
return {
|
||
"error": f"Ollama API 调用失败: {str(e)}",
|
||
"summary": "分析失败",
|
||
"key_findings": [],
|
||
"abnormal_items": [],
|
||
"risk_assessment": "无法评估",
|
||
"recommendations": []
|
||
}
|
||
|
||
def _parse_llm_response(self, response: str) -> Dict[str, Any]:
|
||
"""解析LLM响应"""
|
||
try:
|
||
# 尝试提取JSON内容
|
||
import re
|
||
|
||
# 查找JSON代码块
|
||
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', response, re.DOTALL)
|
||
if json_match:
|
||
json_str = json_match.group(1)
|
||
else:
|
||
# 查找裸JSON
|
||
json_match = re.search(r'\{.*\}', response, re.DOTALL)
|
||
if json_match:
|
||
json_str = json_match.group(0)
|
||
else:
|
||
json_str = response
|
||
|
||
result = json.loads(json_str)
|
||
|
||
# 验证必需字段
|
||
required_fields = ["summary", "key_findings", "abnormal_items", "risk_assessment", "recommendations"]
|
||
for field in required_fields:
|
||
if field not in result:
|
||
result[field] = [] if field in ["key_findings", "abnormal_items", "recommendations"] else "未提供"
|
||
|
||
return result
|
||
|
||
except:
|
||
# 解析失败,返回原始文本
|
||
return {
|
||
"summary": "无法解析LLM响应",
|
||
"raw_response": response,
|
||
"key_findings": [],
|
||
"abnormal_items": [],
|
||
"risk_assessment": "解析失败",
|
||
"recommendations": []
|
||
}
|
||
|
||
def _mock_analysis(self, report_text: str) -> Dict[str, Any]:
|
||
"""模拟分析结果"""
|
||
return {
|
||
"summary": "这是一份血常规检查报告。根据报告内容,各项指标均在正常参考范围内,未发现明显异常。",
|
||
"key_findings": [
|
||
"白细胞计数: 6.5×10^9/L,正常范围",
|
||
"红细胞计数: 4.8×10^12/L,正常范围",
|
||
"血红蛋白: 145 g/L,正常范围",
|
||
"血小板计数: 220×10^9/L,正常范围"
|
||
],
|
||
"abnormal_items": [],
|
||
"risk_assessment": "低风险。所有检测指标均在正常范围内,未发现需要关注的异常项。建议定期体检,保持健康生活方式。",
|
||
"recommendations": [
|
||
"继续保持良好的生活习惯",
|
||
"定期进行健康体检(建议每年一次)",
|
||
"保持均衡饮食和适量运动",
|
||
"如有不适症状,及时就医"
|
||
],
|
||
"note": "这是一个模拟的分析结果。实际使用时请配置 OpenAI API 或本地 Ollama 模型。"
|
||
}
|