Files
yiliao/backend/services/pdf_service.py

268 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from pathlib import Path
from datetime import datetime
from typing import Dict, Any
from jinja2 import Environment, FileSystemLoader
class PDFService:
    """PDF report generation service.

    Renders analysis results into HTML through Jinja2 templates and converts
    the HTML into a PDF file, using WeasyPrint when it can be imported and
    xhtml2pdf otherwise.
    """

    # Path separators plus characters that common file systems reject in
    # file names; replaced before a caller-supplied name is used on disk.
    _UNSAFE_FILENAME_CHARS = '<>:"/\\|?*'

    # (analysis key, label) pairs used to flatten a dict-style risk assessment.
    _RISK_LEVELS = (
        ("high_risk", "【高风险】"),
        ("medium_risk", "【中风险】"),
        ("low_risk", "【低风险】"),
    )

    def __init__(self):
        """Create the template/output directories and the Jinja2 environment."""
        base_dir = Path(__file__).parent.parent

        # Template directory
        self.template_dir = base_dir / "templates"
        self.template_dir.mkdir(exist_ok=True)

        # Output directory for generated PDFs
        self.output_dir = base_dir / "generated_reports"
        self.output_dir.mkdir(exist_ok=True)

        # NOTE(review): autoescape is left at the Jinja2 default (off), so any
        # markup inside the analysis text reaches the HTML verbatim. Consider
        # enabling autoescape once the templates have been checked for it.
        self.jinja_env = Environment(loader=FileSystemLoader(str(self.template_dir)))

    def generate_report(
        self,
        filename: str,
        analysis: Dict[str, Any],
        llm_type: str = "Coze Workflow"
    ) -> str:
        """Generate a PDF report for a single analysed file.

        Args:
            filename: Original file name; its stem becomes part of the PDF name.
            analysis: Analysis result dictionary.
            llm_type: Name of the LLM that produced the analysis.

        Returns:
            Path of the generated PDF file, as a string.

        Raises:
            Exception: If any step fails; the original error is chained as
                ``__cause__``.
        """
        try:
            template_data = self._prepare_template_data(filename, analysis, llm_type)
            html_content = self._render_html(template_data)
            return self._generate_pdf(html_content, filename)
        except Exception as e:
            raise Exception(f"PDF生成失败: {str(e)}") from e

    @staticmethod
    def _as_list(value: Any) -> list:
        """Coerce a loosely-typed field into a list.

        ``None`` and blank strings become ``[]``, lists/tuples are copied, and
        any other value (including a non-empty string) is wrapped in a
        one-element list so that it is never iterated character by character.
        """
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        if isinstance(value, str) and not value.strip():
            return []
        return [value]

    @staticmethod
    def _extract_texts(items: list, primary_key: str) -> list:
        """Turn a list of dicts/scalars into a list of display values.

        For dict items the value under ``primary_key`` is used, then ``"text"``,
        then the dict's string form; every other item is converted with ``str``.
        """
        return [
            item.get(primary_key, item.get("text", str(item)))
            if isinstance(item, dict) else str(item)
            for item in items
        ]

    @classmethod
    def _normalize_analysis(cls, analysis: Any) -> Dict[str, Any]:
        """Build the analysis-derived template fields shared by both report kinds.

        Returns a dict with ``summary``, ``key_findings``, ``abnormal_items``,
        ``risk_assessment`` and ``recommendations``. Missing or ``None`` fields
        fall back to the same defaults used for absent keys.
        """
        analysis = analysis or {}

        key_findings = cls._extract_texts(
            cls._as_list(analysis.get("key_findings")), "finding"
        )
        recommendations = cls._extract_texts(
            cls._as_list(analysis.get("recommendations")), "recommendation"
        )

        # Dict items are passed through untouched; anything else becomes a
        # minimal {"name": ...} record.
        abnormal_items = [
            item if isinstance(item, dict) else {"name": str(item)}
            for item in cls._as_list(analysis.get("abnormal_items"))
        ]

        risk_assessment = analysis.get("risk_assessment")
        if risk_assessment is None:
            risk_assessment = "未提供"
        elif isinstance(risk_assessment, dict):
            # Flatten {"high_risk": [...], ...} into one line per non-empty level.
            parts = []
            for key, label in cls._RISK_LEVELS:
                entries = risk_assessment.get(key)
                if not entries:
                    continue
                # str() each entry: the values are not guaranteed to be strings,
                # and a bare string must not be joined character by character.
                parts.append(label + "; ".join(str(e) for e in cls._as_list(entries)))
            risk_assessment = "\n".join(parts) if parts else "未检测到明确风险"

        summary = analysis.get("summary")
        if summary is None:
            summary = "暂无摘要"

        return {
            "summary": summary,
            "key_findings": key_findings,
            "abnormal_items": abnormal_items,
            "risk_assessment": risk_assessment,
            "recommendations": recommendations,
        }

    @classmethod
    def _safe_filename_part(cls, name: Any, fallback: str = "report") -> str:
        """Return ``name`` made safe for use as one component of a file name.

        Path separators, reserved characters and control characters are
        replaced with ``_`` and leading/trailing dots and spaces are removed,
        so the result cannot point outside the output directory. ``fallback``
        is returned when nothing is left.
        """
        cleaned = "".join(
            "_" if ch in cls._UNSAFE_FILENAME_CHARS or ord(ch) < 32 else ch
            for ch in str(name)
        ).strip(" .")
        return cleaned or fallback

    def _prepare_template_data(
        self,
        filename: str,
        analysis: Dict[str, Any],
        llm_type: str
    ) -> Dict[str, Any]:
        """Build the context dict for the single-report template."""
        now = datetime.now()
        return {
            "filename": filename,
            # NOTE(review): this format has no trailing "日" — confirm whether
            # the template appends it or the date is meant to read like this.
            "analysis_date": now.strftime("%Y年%m月%d"),
            "llm_type": llm_type,
            **self._normalize_analysis(analysis),
            "generation_time": now.strftime("%Y-%m-%d %H:%M:%S"),
        }

    def _render_html(self, template_data: Dict[str, Any]) -> str:
        """Render ``report_template.html`` with the given context."""
        template = self.jinja_env.get_template("report_template.html")
        return template.render(**template_data)

    def _write_pdf(self, html_content: str, pdf_path: Path) -> None:
        """Write ``html_content`` as a PDF at ``pdf_path``.

        WeasyPrint is used when it can be imported; otherwise xhtml2pdf is
        used. Only the import is guarded, so a rendering error from WeasyPrint
        itself is not masked by a fallback attempt.

        Raises:
            Exception: If xhtml2pdf reports an error.
        """
        try:
            # OSError is caught as well: an installed WeasyPrint whose native
            # libraries are missing fails at import time with OSError.
            from weasyprint import CSS, HTML
        except (ImportError, OSError):
            print(" WeasyPrint 未安装,使用 xhtml2pdf 生成PDF...")
            from xhtml2pdf import pisa
            with open(pdf_path, "wb") as pdf_file:
                pisa_status = pisa.CreatePDF(html_content, dest=pdf_file)
            if pisa_status.err:
                raise Exception("xhtml2pdf 生成失败")
        else:
            HTML(string=html_content).write_pdf(
                str(pdf_path),
                stylesheets=[CSS(string='@page { size: A4; margin: 1cm; }')]
            )

    def _generate_pdf(self, html_content: str, original_filename: str) -> str:
        """Convert rendered HTML into a timestamped PDF in the output directory.

        Returns:
            Path of the written PDF file, as a string.

        Raises:
            Exception: If conversion fails; the original error is chained.
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            base_name = self._safe_filename_part(Path(original_filename).stem)
            pdf_path = self.output_dir / f"{base_name}_分析报告_{timestamp}.pdf"
            self._write_pdf(html_content, pdf_path)
            return str(pdf_path)
        except Exception as e:
            raise Exception(f"PDF转换失败: {str(e)}") from e

    def get_pdf_file(self, pdf_path: str) -> bytes:
        """Return the raw bytes of the PDF at ``pdf_path``.

        Raises:
            FileNotFoundError: If the path does not exist.
        """
        if not os.path.exists(pdf_path):
            raise FileNotFoundError("PDF文件不存在")
        with open(pdf_path, "rb") as f:
            return f.read()

    def generate_comprehensive_report(
        self,
        patient_name: str,
        template_data: Dict[str, Any]
    ) -> str:
        """Generate a comprehensive health report combining several reports.

        Args:
            patient_name: Patient name; a sanitised form is used in the PDF
                file name.
            template_data: Dictionary holding the report data and the analysis
                result.

        Returns:
            Path of the generated PDF file, as a string.

        Raises:
            Exception: If any step fails; the original error is chained.
        """
        try:
            comprehensive_data = self._prepare_comprehensive_data(patient_name, template_data)
            html_content = self._render_comprehensive_html(comprehensive_data)

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            safe_name = self._safe_filename_part(patient_name, fallback="patient")
            pdf_path = self.output_dir / f"{safe_name}_综合健康报告_{timestamp}.pdf"
            self._write_pdf(html_content, pdf_path)
            return str(pdf_path)
        except Exception as e:
            raise Exception(f"综合报告生成失败: {str(e)}") from e

    def _prepare_comprehensive_data(
        self,
        patient_name: str,
        template_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build the context dict for the comprehensive-report template."""
        now = datetime.now()
        return {
            "patient_name": patient_name,
            "report_count": template_data.get("report_count", 0),
            "report_list": template_data.get("report_list", []),
            "generation_date": template_data.get("generation_date", now.strftime("%Y年%m月%d")),
            # Same normalisation as the single-report path.
            **self._normalize_analysis(template_data.get("analysis")),
            "generation_time": now.strftime("%Y-%m-%d %H:%M:%S"),
        }

    def _render_comprehensive_html(self, template_data: Dict[str, Any]) -> str:
        """Render ``comprehensive_report_template.html`` with the given context."""
        template = self.jinja_env.get_template("comprehensive_report_template.html")
        return template.render(**template_data)