Files
yiliao/backend/services/batch_report_service.py

802 lines
32 KiB
Python
Raw Permalink Normal View History

import os
import tempfile
from pathlib import Path
from typing import List, Dict, Any
from datetime import datetime
# 导入 DeepSeek 健康内容生成服务
from services.deepseek_health_service import DeepSeekHealthService
class BatchReportService:
"""批量报告处理服务"""
def __init__(self, ocr_service, llm_service, pdf_service, template_service):
self.ocr_service = ocr_service
self.llm_service = llm_service
self.pdf_service = pdf_service
self.template_service = template_service
# 初始化 DeepSeek 健康内容生成服务
self.deepseek_health_service = DeepSeekHealthService()
# 临时文件目录
self.temp_dir = Path(tempfile.gettempdir()) / "medical_reports_temp"
self.temp_dir.mkdir(exist_ok=True)
def process_multiple_reports(
self,
file_paths: List[str],
patient_name: str = "患者"
) -> Dict[str, Any]:
"""
处理多个报告文件并生成综合健康报告
新流程直接将文件传给 Coze 工作流处理
Args:
file_paths: 临时上传的多个PDF文件路径列表
patient_name: 患者姓名
Returns:
包含分析结果和生成的PDF路径的字典
"""
try:
print(f"正在处理 {len(file_paths)} 份报告...")
# 准备文件信息列表
file_infos = []
for idx, file_path in enumerate(file_paths, 1):
filename = Path(file_path).name
print(f" [{idx}/{len(file_paths)}] 准备文件: {filename}")
file_infos.append({
"filename": filename,
"filepath": file_path
})
# 调用分析(会根据 LLM 类型选择不同的处理方式)
print("正在进行综合分析...")
combined_analysis = self._analyze_with_files(file_infos)
# 使用 DeepSeek 生成健康评估和建议内容
if self.deepseek_health_service.is_available():
health_content = self.deepseek_health_service.generate_health_content(combined_analysis)
if health_content:
combined_analysis["health_assessment"] = health_content.get("health_assessment", {})
combined_analysis["health_advice"] = health_content.get("health_advice", {})
# 更新异常项DeepSeek 可能提供更详细的信息)
if health_content.get("abnormal_items"):
combined_analysis["abnormal_items_detailed"] = health_content["abnormal_items"]
else:
print("\n ⚠️ DeepSeek API Key 未配置,跳过健康评估和建议生成")
# 生成综合报告 PDF
print("\n正在生成健康报告...")
pdf_path = self._generate_comprehensive_report(
patient_name=patient_name,
reports=file_infos,
analysis=combined_analysis
)
# 清理临时文件
print("正在清理临时文件...")
self._cleanup_temp_files(file_paths)
return {
"success": True,
"patient_name": patient_name,
"report_count": len(file_paths),
"analysis": combined_analysis,
"pdf_path": pdf_path,
"generated_at": datetime.now().isoformat()
}
except Exception as e:
# 即使出错也要清理临时文件
self._cleanup_temp_files(file_paths)
raise Exception(f"批量处理失败: {str(e)}")
def _analyze_with_files(self, file_infos: List[Dict[str, str]]) -> Dict[str, Any]:
"""
综合分析流程两阶段处理
1. OCR 提取所有文件的文本
2. Coze 分析文本 返回 JSON
3. Ollama 处理 Coze JSON 生成 Be.U 风格报告
"""
# 第1步OCR 提取文本
print(" [步骤1] OCR 提取文本...")
extracted_texts = []
for idx, file_info in enumerate(file_infos, 1):
print(f" [{idx}/{len(file_infos)}] 识别: {file_info['filename']}")
text = self.ocr_service.extract_text(file_info["filepath"])
extracted_texts.append({
"filename": file_info["filename"],
"text": text
})
# 第2-3步LLM 分析Coze → Ollama 或 纯 Ollama
print(" [步骤2-3] 综合分析与报告生成...")
return self._analyze_combined_reports(extracted_texts)
def _analyze_with_coze_files(self, file_infos: List[Dict[str, str]]) -> Dict[str, Any]:
"""
使用 Coze 文件上传 + 工作流处理
1. 上传文件到 Coze 获取 file_id
2. 分批调用工作流每批最多 3 个文件
3. 合并结果
"""
import requests
import json
import time
api_key = os.getenv("COZE_API_KEY")
workflow_id = os.getenv("COZE_WORKFLOW_ID")
if not api_key or not workflow_id:
raise ValueError("未配置 Coze API 所需的 COZE_API_KEY 或 COZE_WORKFLOW_ID")
# 第1步上传所有文件获取 file_id
file_ids = []
for idx, file_info in enumerate(file_infos, 1):
print(f" [{idx}/{len(file_infos)}] 上传: {file_info['filename']}")
try:
file_id = self._upload_file_to_coze(
file_path=file_info['filepath'],
api_key=api_key
)
file_ids.append({
"filename": file_info['filename'],
"file_id": file_id
})
print(f" ✓ File ID: {file_id}")
except Exception as e:
print(f" ✗ 上传失败: {e}")
raise Exception(f"文件上传失败: {file_info['filename']}, {e}")
# 第2步一次性调用工作流处理所有文件
print(f"\n [步骤2] 调用 Coze 工作流分析 {len(file_ids)} 个文件...")
# 构造请求参数input 是字符串数组,每个元素是 JSON 字符串
input_params = []
for file_data in file_ids:
# 每个元素是 JSON 字符串格式:"{\"file_id\":\"xxx\"}"
json_str = json.dumps({"file_id": file_data["file_id"]}, ensure_ascii=False)
input_params.append(json_str)
print(f" - {file_data['filename']}: {file_data['file_id']}")
# 调用工作流
try:
final_result = self._call_coze_workflow(
workflow_id=workflow_id,
api_key=api_key,
input_params=input_params
)
print(f" ✓ 工作流处理完成")
except Exception as e:
print(f" ✗ 工作流调用失败: {e}")
raise
# 保存结果缓存
try:
cache_file = Path("coze_result_cache.json")
cache_data = {
"timestamp": time.strftime('%Y-%m-%d %H:%M:%S'),
"report_count": len(file_ids),
"coze_result": final_result,
"file_ids": file_ids
}
cache_file.write_text(json.dumps(cache_data, ensure_ascii=False, indent=2), encoding='utf-8')
print(f" → Coze 结果已缓存到: {cache_file.absolute()}")
except Exception as e:
print(f" ⚠️ 缓存保存失败: {e}")
return final_result
def _upload_file_to_coze(self, file_path: str, api_key: str) -> str:
"""
上传文件到 Coze 获取 file_id
"""
import requests
upload_url = "https://api.coze.cn/v1/files/upload"
headers = {
"Authorization": f"Bearer {api_key}"
}
with open(file_path, 'rb') as f:
files = {
'file': (Path(file_path).name, f, 'application/octet-stream')
}
response = requests.post(
upload_url,
headers=headers,
files=files,
timeout=60
)
if response.status_code != 200:
raise Exception(f"上传失败 (HTTP {response.status_code}): {response.text}")
data = response.json()
# 解析返回的 file_id
if data.get("code") == 0 and data.get("data"):
file_id = data["data"].get("id") or data["data"].get("file_id")
if file_id:
return file_id
raise Exception(f"未能获取 file_id: {data}")
def _call_coze_workflow(self, workflow_id: str, api_key: str, input_params: List[str]) -> Dict[str, Any]:
"""
调用 Coze 工作流使用流式接口
input_params: 字符串数组每个元素是 JSON 字符串格式的 file_id
"""
from cozepy import Coze, TokenAuth, COZE_CN_BASE_URL, WorkflowEventType
# 初始化 Coze 客户端
coze = Coze(auth=TokenAuth(token=api_key), base_url=COZE_CN_BASE_URL)
print(f" → 调用工作流 (file_id 数量: {len(input_params)})...")
# 调用流式工作流
import time as time_module
start = time_module.time()
stream = coze.workflows.stream_run(
workflow_id=workflow_id,
parameters={"input": input_params}
)
content_result = None
for event in stream:
if event.event == WorkflowEventType.MESSAGE:
if hasattr(event, 'message'):
msg = event.message
node_title = getattr(msg, 'node_title', None)
node_is_finish = getattr(msg, 'node_is_finish', None)
content = getattr(msg, 'content', None)
if node_title == "End" and node_is_finish and content:
content_result = content
break
elif event.event == WorkflowEventType.ERROR:
error_msg = str(event.error) if hasattr(event, 'error') else "Unknown error"
raise Exception(f"工作流执行错误: {error_msg}")
elapsed = time_module.time() - start
if not content_result:
raise Exception("未获取到工作流执行结果")
print(f" ✓ 工作流完成 (耗时: {elapsed:.1f}秒)")
# 解析结果
import json
try:
if isinstance(content_result, str):
result_data = json.loads(content_result)
else:
result_data = content_result
# 提取 output
if isinstance(result_data, dict) and "output" in result_data:
output = result_data["output"]
if isinstance(output, str):
output = json.loads(output)
return output
return result_data
except json.JSONDecodeError as e:
raise Exception(f"解析工作流结果失败: {e}")
def _call_coze_with_files(self, file_infos: List[Dict[str, str]]) -> Dict[str, Any]:
"""
直接将文件传给 Coze 工作流处理
"""
try:
import requests
import json
# 读取 Coze 配置
api_url = os.getenv("COZE_API_URL", "https://api.coze.cn/v1/workflow/run")
api_key = os.getenv("COZE_API_KEY")
workflow_id = os.getenv("COZE_WORKFLOW_ID")
max_retries = int(os.getenv("COZE_MAX_RETRIES", "3"))
if not api_key or not workflow_id:
raise ValueError("未配置 Coze API 所需的 COZE_API_KEY 或 COZE_WORKFLOW_ID")
# 准备文件数据(根据 Coze API 要求构造)
# 如果 Coze 需要文件内容,读取并转换为 base64
files_data = []
for file_info in file_infos:
filepath = file_info["filepath"]
filename = file_info["filename"]
# 读取文件内容并转换为 base64
with open(filepath, 'rb') as f:
import base64
file_content = base64.b64encode(f.read()).decode('utf-8')
files_data.append({
"filename": filename,
"content": file_content,
"type": "application/pdf" if filename.endswith('.pdf') else "image/jpeg"
})
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# 构造请求体
payload = {
"workflow_id": workflow_id,
"parameters": {
"input": files_data # 传递文件数组
}
}
print(f" 正在调用 Coze 工作流处理 {len(files_data)} 个文件...")
last_error = None
for attempt in range(max_retries):
try:
response = requests.post(api_url, headers=headers, json=payload, timeout=300)
response.raise_for_status()
data = response.json()
# 解析 Coze 返回结果
if isinstance(data, dict) and data.get("code") == 0:
raw_data = data.get("data", {})
if isinstance(raw_data, str):
try:
raw_data = json.loads(raw_data)
except json.JSONDecodeError:
pass
output = raw_data.get("output", raw_data)
# 使用 Ollama 增强 Coze 结果
print(" [阶段2] 使用 Ollama 优化报告内容...")
final_result = self._enhance_with_ollama(output, f"处理了 {len(files_data)} 个文件")
return final_result
last_error = f"Coze API 返回非0 code: {data}"
except Exception as e:
last_error = str(e)
if attempt < max_retries - 1:
wait_time = (attempt + 1) * 3
print(f" 重试 {attempt + 1}/{max_retries}...")
import time
time.sleep(wait_time)
else:
break
raise Exception(last_error or "Coze API 调用失败")
except Exception as e:
print(f" ⚠ Coze 处理失败: {e}")
# 降级到 OCR + Ollama 方式
print(" 降级到 OCR + Ollama 处理...")
extracted_texts = []
for file_info in file_infos:
text = self.ocr_service.extract_text(file_info["filepath"])
extracted_texts.append({
"filename": file_info["filename"],
"text": text
})
return self._analyze_combined_reports(extracted_texts)
def _analyze_combined_reports(self, reports: List[Dict[str, str]]) -> Dict[str, Any]:
"""
综合分析流程两阶段处理
- 如果使用 Coze
阶段1: Coze 工作流处理数据 返回结构化 JSON
阶段2: Ollama 分析 JSON 生成适配 Be.U 模板的专业报告
- 如果使用其他LLM
直接使用该 LLM 生成报告
"""
if self.llm_service.llm_type == "coze":
# === 两阶段处理Coze + Ollama ===
# 【阶段1】Coze 工作流分析
print(" [阶段1/2] Coze 工作流分析中...")
print(f" - 处理 {len(reports)} 份报告")
# Coze 工作流有执行超时限制,超过阈值时分批处理
BATCH_SIZE = 3 # 每批最多 3 个报告
import json
# 原始文本列表(给 Ollama 使用)
original_texts: List[str] = []
# 所有批次的 Coze 结果
all_coze_results: List[Dict[str, Any]] = []
# 分批处理
total_batches = (len(reports) + BATCH_SIZE - 1) // BATCH_SIZE
if total_batches > 1:
print(f" - 报告数量较多,将分 {total_batches} 批处理(每批 {BATCH_SIZE} 个)")
for batch_idx in range(total_batches):
start_idx = batch_idx * BATCH_SIZE
end_idx = min(start_idx + BATCH_SIZE, len(reports))
batch_reports = reports[start_idx:end_idx]
if total_batches > 1:
print(f"\n [批次 {batch_idx + 1}/{total_batches}] 处理报告 {start_idx + 1}-{end_idx}")
# 准备当前批次的数据
coze_inputs: List[Dict[str, str]] = []
for report in batch_reports:
filename = report["filename"]
text = report["text"]
# 保留一份可读的原始文本
original_text = f"【文件名】{filename}\n【内容】\n{text}"
original_texts.append(original_text)
# 构造传给 Coze 的 JSON 对象
coze_obj = {
"filename": filename,
"text": text,
}
coze_inputs.append(coze_obj)
print(f" - {filename}: {len(text)} 字符")
print(f" - 本批次元素个数: {len(coze_inputs)}")
# 保存当前批次的调试信息
if total_batches > 1:
debug_file = Path(f"debug_batch_{batch_idx + 1}.json")
else:
debug_file = Path("debug_ocr_texts.json")
try:
final_payload = {
"workflow_id": os.getenv("COZE_WORKFLOW_ID", ""),
"parameters": {
"input": coze_inputs
}
}
debug_file.write_text(json.dumps(final_payload, ensure_ascii=False, indent=2), encoding='utf-8')
print(f" → Payload 已保存: {debug_file.name}")
except Exception as e:
print(f" ⚠️ 保存调试文件失败: {e}")
# 调用 Coze 处理当前批次
print(f" → 调用 Coze 工作流...")
batch_result = self.llm_service.analyze_multiple_reports(coze_inputs)
# 检查当前批次是否成功
if batch_result.get("error"):
error_msg = batch_result.get('error')
print(f" ✗ 批次 {batch_idx + 1} 失败: {error_msg}")
raise Exception(f"Coze 工作流调用失败: {error_msg}")
print(f" ✓ 批次 {batch_idx + 1} 完成")
all_coze_results.append(batch_result)
# 合并所有批次的结果
print(f"\n ✓ 所有批次处理完成,合并结果...")
coze_result = self._merge_batch_results(all_coze_results)
# 保存 Coze 返回结果用于后续测试
try:
import time
cache_file = Path("coze_result_cache.json")
cache_data = {
"timestamp": time.strftime('%Y-%m-%d %H:%M:%S'),
"report_count": len(reports),
"coze_result": coze_result,
"original_texts": original_texts
}
cache_file.write_text(json.dumps(cache_data, ensure_ascii=False, indent=2), encoding='utf-8')
print(f" → Coze 结果已缓存到: {cache_file.absolute()}")
except Exception as e:
print(f" ⚠️ 缓存保存失败: {e}")
# 【阶段2】Ollama 优化生成
print(" [阶段2/2] Ollama 生成 Be.U 风格报告...")
print(" - 将 Coze JSON 转换为专业报告内容")
print(" - 适配 Be.U Wellness Center 模板")
# 合并原始文本供 Ollama 参考(仍然使用人类可读的文本,而不是 JSON 字符串)
combined_text = "\n\n".join(original_texts)
final_analysis = self._enhance_with_ollama(coze_result, combined_text)
print(" ✓ 综合报告生成完成")
return final_analysis
else:
# === 单阶段:直接使用当前 LLM ===
print(f" 使用 {self.llm_service.llm_type} 直接生成报告...")
print(f" - 处理 {len(reports)} 份报告")
# 合并所有报告文本
combined_text = "\n\n=== 报告分隔 ===\n\n".join([
f"【文件名】{report['filename']}\n【内容】\n{report['text']}"
for report in reports
])
analysis = self.llm_service.analyze_single_report(combined_text)
print(" ✓ 报告生成完成")
return analysis
def _merge_batch_results(self, batch_results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
合并多个批次的 Coze 结果
"""
if len(batch_results) == 1:
return batch_results[0]
print(f" - 合并 {len(batch_results)} 个批次的结果...")
# 合并结果
merged = {
"summary": "",
"key_findings": [],
"abnormal_items": [],
"risk_assessment": "",
"recommendations": []
}
# 收集所有字段
summaries = []
risk_assessments = []
for idx, result in enumerate(batch_results, 1):
# 摘要
if result.get("summary"):
summaries.append(f"批次{idx}: {result['summary']}")
# 关键发现
if result.get("key_findings"):
merged["key_findings"].extend(result["key_findings"])
# 异常指标
if result.get("abnormal_items"):
merged["abnormal_items"].extend(result["abnormal_items"])
# 风险评估
if result.get("risk_assessment"):
risk_assessments.append(f"批次{idx}: {result['risk_assessment']}")
# 建议
if result.get("recommendations"):
merged["recommendations"].extend(result["recommendations"])
# 合并摘要和风险评估
merged["summary"] = "\n\n".join(summaries) if summaries else "未提供摘要"
merged["risk_assessment"] = "\n\n".join(risk_assessments) if risk_assessments else "未提供风险评估"
print(f" - 合并后: 关键发现 {len(merged['key_findings'])} 项, "
f"异常指标 {len(merged['abnormal_items'])} 项, "
f"建议 {len(merged['recommendations'])}")
return merged
def _enhance_with_ollama(self, coze_result: Dict[str, Any], original_text: str) -> Dict[str, Any]:
"""
使用 Ollama 分析 Coze 返回的 JSON生成适配 Be.U 模板的最终报告内容
"""
try:
import requests
import json
# 构建给 Ollama 的提示词
prompt = f"""你是一位专业的医疗报告撰写专家。现在需要基于 Coze 工作流返回的结构化数据,生成一份适合 Be.U Wellness Center 风格的功能医学健康报告。
Coze 工作流返回的数据
{json.dumps(coze_result, ensure_ascii=False, indent=2)}
原始检测报告文本
{original_text}
请基于以上信息生成一份专业的综合健康报告包含以下部分JSON格式
1. summary: 综合健康摘要整体评估语言专业且易懂
2. key_findings: 关键发现列表提取最重要的检测结果
3. abnormal_items: 异常指标详情包含 name, value, reference, level
4. risk_assessment: 健康风险评估基于所有指标的综合分析
5. recommendations: 个性化健康建议具体可执行的建议
要求
- 语言专业但易于理解
- 突出重点和异常项
- 提供可操作的健康建议
- 使用 Be.U Wellness Center 的专业风格
- 必须返回完整的 JSON 格式
请直接返回 JSON不要有其他文字"""
# 调用 Ollama
ollama_host = os.getenv("OLLAMA_HOST", "http://localhost:11434")
ollama_model = os.getenv("OLLAMA_MODEL", "qwen2.5:7b")
response = requests.post(
f"{ollama_host}/api/generate",
json={
"model": ollama_model,
"prompt": prompt,
"stream": False
},
timeout=300
)
if response.status_code == 200:
content = response.json().get("response", "")
# 解析 Ollama 返回的 JSON
return self._parse_ollama_response(content)
else:
print(f" ⚠ Ollama 调用失败,使用 Coze 原始结果")
return coze_result
except Exception as e:
print(f" ⚠ Ollama 增强失败: {e},使用 Coze 原始结果")
return coze_result
def _parse_ollama_response(self, response: str) -> Dict[str, Any]:
"""解析 Ollama 返回的 JSON"""
try:
import re
import json
# 尝试提取 JSON
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', response, re.DOTALL)
if json_match:
json_str = json_match.group(1)
else:
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
json_str = json_match.group(0)
else:
json_str = response
result = json.loads(json_str)
# 确保必需字段存在
required_fields = ["summary", "key_findings", "abnormal_items", "risk_assessment", "recommendations"]
for field in required_fields:
if field not in result:
result[field] = [] if field in ["key_findings", "abnormal_items", "recommendations"] else "未提供"
return result
except Exception as e:
print(f" ⚠ JSON 解析失败: {e}")
return {
"summary": "解析失败",
"key_findings": [],
"abnormal_items": [],
"risk_assessment": "无法生成",
"recommendations": []
}
def _direct_ollama_analysis(self, combined_text: str) -> Dict[str, Any]:
"""
Coze 失败后的降级方案直接使用 Ollama 生成完整报告
"""
try:
import requests
import json
print(" → 使用 Ollama 模型生成完整报告...")
# 构建 Ollama 提示词
prompt = f"""你是一位专业的医疗报告分析助手。请分析以下医疗报告,提供专业的综合健康评估。
医疗报告内容
{combined_text}
请按以下 JSON 格式返回分析结果
{{
"summary": "综合健康摘要2-3句话",
"key_findings": ["关键发现1", "关键发现2", "..."],
"abnormal_items": [
{{
"name": "指标名称",
"result": "测量值",
"reference": "参考范围",
"level": "high/low/normal"
}}
],
"risk_assessment": "健康风险评估(综合说明)",
"recommendations": ["建议1", "建议2", "..."]
}}
请严格按照 JSON 格式返回不要添加其他说明文字"""
# 调用 Ollama
response = requests.post(
"http://localhost:11434/api/generate",
json={
"model": "qwen2.5:7b",
"prompt": prompt,
"stream": False
},
timeout=180
)
if response.status_code == 200:
ollama_response = response.json().get("response", "")
print(f" ✓ Ollama 响应完成")
# 解析 JSON
import re
json_match = re.search(r'\{.*\}', ollama_response, re.DOTALL)
if json_match:
result = json.loads(json_match.group(0))
# 确保必需字段存在
required_fields = ["summary", "key_findings", "abnormal_items", "risk_assessment", "recommendations"]
for field in required_fields:
if field not in result:
result[field] = [] if field in ["key_findings", "abnormal_items", "recommendations"] else "未提供"
return result
else:
raise ValueError("无法解析 Ollama 返回的 JSON")
else:
raise Exception(f"Ollama API 返回错误: {response.status_code}")
except Exception as e:
print(f" ⚠️ Ollama 降级方案也失败: {e}")
return {
"summary": "由于系统问题,暂时无法生成完整分析",
"key_findings": ["OCR 文本提取完成", "分析服务暂时不可用"],
"abnormal_items": [],
"risk_assessment": "建议稍后重试或使用其他方式分析",
"recommendations": ["联系技术支持", "检查系统配置"]
}
def _generate_comprehensive_report(
self,
patient_name: str,
reports: List[Dict[str, str]],
analysis: Dict[str, Any]
) -> str:
"""生成综合健康报告 PDF"""
# 准备扩展的模板数据
template_data = {
"patient_name": patient_name,
"report_count": len(reports),
"report_list": [r["filename"] for r in reports],
"analysis": analysis,
"generation_date": datetime.now().strftime("%Y年%m月%d")
}
# 生成 PDF使用增强的模板
pdf_path = self.pdf_service.generate_comprehensive_report(
patient_name=patient_name,
template_data=template_data
)
return pdf_path
def _cleanup_temp_files(self, file_paths: List[str]):
"""清理临时文件"""
for file_path in file_paths:
try:
if os.path.exists(file_path):
os.remove(file_path)
print(f" ✓ 已删除临时文件: {Path(file_path).name}")
except Exception as e:
print(f" ⚠ 删除临时文件失败 {Path(file_path).name}: {e}")