Files
yiliao/backend/extra_items_handler.py

695 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
处理PDF中有但模板中没有的检测项目
- 识别额外项目
- 调用DeepSeek进行分类
- 在对应模块末尾插入表格
"""
import json
import requests
from typing import Dict, List, Tuple
from pathlib import Path
from docx import Document
from docx.shared import Pt, Cm
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.enum.table import WD_TABLE_ALIGNMENT
from docx.oxml.ns import qn
from docx.oxml import OxmlElement
from copy import deepcopy
def clean_reference_range(reference: str) -> str:
    """Normalize a reference-range string for display.

    One layer of surrounding brackets is removed -- ASCII ``()``, full-width
    parentheses (U+FF08 / U+FF09) or ``[]`` -- followed by any stray,
    unmatched parenthesis left at either end.  A pure upper bound written as
    ``<X``, ``≤X`` or ``<=X`` is then rewritten as ``0-X``.

    Args:
        reference: Raw reference text; may be empty or ``None``.

    Returns:
        The cleaned text.  Falsy input is returned unchanged.
    """
    import re

    if not reference:
        return reference

    # Written as escapes on purpose: the literal full-width characters are
    # easily lost or confused with their ASCII look-alikes, and an empty
    # string here would make startswith()/endswith() match everything.
    fullwidth_open, fullwidth_close = '\uff08', '\uff09'
    bracket_pairs = (
        ('(', ')'),
        (fullwidth_open, fullwidth_close),
        ('[', ']'),
    )

    ref = reference.strip()

    # Remove at most one matched pair of enclosing brackets.
    for opener, closer in bracket_pairs:
        if ref.startswith(opener) and ref.endswith(closer):
            ref = ref[1:-1]
            break

    # Remove a leftover unmatched parenthesis on either side.
    for opener, closer in bracket_pairs[:2]:
        if ref.startswith(opener):
            ref = ref[1:]
        if ref.endswith(closer):
            ref = ref[:-1]
    ref = ref.strip()

    # "<X", "≤X" and "<=X" all mean "from zero up to X".
    match = re.match(r'^(?:<=|[<≤])\s*([\d\.]+)\s*$', ref)
    if match:
        ref = f"0-{match.group(1)}"
    return ref.strip()
class ExtraItemsHandler:
    """Handle test items present in the PDF but absent from the template.

    Workflow: identify items whose abbreviation (ABB) is not known to the
    template, classify them into report modules (DeepSeek chat API with a
    keyword-based fallback), optionally generate clinical-significance text,
    and insert one table per item after the last table of its module.
    """

    def __init__(self, api_key: str = None):
        """Load the ABB configuration and build the lookup structures.

        Args:
            api_key: DeepSeek API key.  When falsy, classification falls
                back to keyword matching and no explanations are generated.
        """
        self.api_key = api_key
        self.api_url = "https://api.deepseek.com/v1/chat/completions"

        # Project-local configuration loader, imported at construction time.
        from config import load_abb_config
        self.abb_config = load_abb_config()

        # Every ABB the template already knows, aliases included (upper-case).
        self.known_abbs = set()
        for abb in self.abb_config.get('abb_list', []):
            self.known_abbs.add(abb.upper())
        for alias in self.abb_config.get('abb_aliases', {}).keys():
            self.known_abbs.add(alias.upper())

        # Module name -> lower-case keywords used to locate the module in
        # the document text.
        self.module_keywords = {
            'Urine Test': ['urine detection', 'urine test', '尿液检测'],
            'Complete Blood Count': ['complete blood count', 'blood count', '血常规'],
            'Blood Sugar': ['blood sugar', 'glucose', '血糖'],
            'Lipid Profile': ['lipid profile', 'lipid panel', '血脂'],
            'Blood Type': ['blood type', '血型'],
            'Blood Coagulation': ['blood coagulation', 'coagulation', '凝血功能'],
            'Four Infectious Diseases': ['infectious disease', '传染病'],
            'Serum Electrolytes': ['electrolyte', '电解质'],
            'Liver Function': ['liver function', '肝功能'],
            'Kidney Function': ['kidney function', '肾功能'],
            'Myocardial Enzyme': ['myocardial enzyme', 'cardiac enzyme', '心肌酶'],
            'Thyroid Function': ['thyroid function', '甲状腺'],
            'Thromboembolism': ['thromboembolism', 'cardiovascular risk', '心脑血管'],
            'Bone Metabolism': ['bone metabolism', '骨代谢'],
            'Microelement': ['microelement', 'trace element', '微量元素'],
            'Lymphocyte Subpopulation': ['lymphocyte subpopulation', '淋巴细胞亚群'],
            'Humoral Immunity': ['humoral immunity', 'immunoglobulin', '体液免疫'],
            'Inflammatory Reaction': ['inflammatory', 'inflammation', '炎症'],
            'Autoantibody': ['autoantibody', '自身抗体'],
            'Female Hormone': ['female hormone', '女性激素'],
            'Male Hormone': ['male hormone', '男性激素'],
            'Tumor Markers': ['tumor marker', '肿瘤标志物'],
            'Imaging': ['imaging', '影像'],
            'Female-specific': ['female-specific', 'gynecological', '女性专项'],
            'Other Tests': ['other test', '其他检测']
        }

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _as_text(value) -> str:
        """Return ``value`` as text, mapping ``None`` to an empty string."""
        return '' if value is None else str(value)

    @staticmethod
    def _abb_key(item: Dict) -> str:
        """Return the item's ABB upper-cased ('' when missing or ``None``)."""
        return str(item.get('abb') or '').upper()

    @staticmethod
    def _extract_json(content: str):
        """Parse the JSON payload of a model reply.

        The reply may wrap the JSON in a Markdown code fence (with or
        without a ``json`` tag); the fence is removed before parsing.

        Raises:
            ValueError: if the remaining text is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError``).
        """
        if '```json' in content:
            content = content.split('```json')[1].split('```')[0]
        elif '```' in content:
            content = content.split('```')[1].split('```')[0]
        return json.loads(content.strip())

    @staticmethod
    def _group_by_classification(classification: Dict, extra_items: List[Dict]) -> Dict[str, List[Dict]]:
        """Turn ``{module: [ABB, ...]}`` into ``{module: [item, ...]}``.

        ABB matching is case-insensitive.  Items that share an ABB are all
        kept, an ABB listed under several modules is assigned to the first
        one only, and every item the classification does not mention is
        placed in ``'Other Tests'`` so that nothing is silently dropped.
        Modules that end up empty are omitted.
        """
        pending: Dict[str, List[Dict]] = {}
        for item in extra_items:
            pending.setdefault(str(item.get('abb') or '').upper(), []).append(item)

        result: Dict[str, List[Dict]] = {}
        for module, abbs in classification.items():
            if isinstance(abbs, str):
                # A bare string would otherwise be iterated char by char.
                abbs = [abbs]
            elif not isinstance(abbs, (list, tuple)):
                continue
            for abb in abbs:
                matched = pending.pop(str(abb).upper(), None)
                if matched:
                    result.setdefault(module, []).extend(matched)

        leftovers = [item for group in pending.values() for item in group]
        if leftovers:
            result.setdefault('Other Tests', []).extend(leftovers)
        return result

    def _request_completion(self, prompt: str, max_tokens: int):
        """Send a single-turn prompt to the chat-completions endpoint.

        Returns:
            The reply text, or ``None`` when the server answers with a
            status other than 200 (a warning is printed in that case).

        Raises:
            Whatever ``requests`` raises on network failure, plus
            ``KeyError``/``IndexError``/``ValueError`` when the response
            body does not have the expected structure.
        """
        response = requests.post(
            self.api_url,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            },
            json={
                "model": "deepseek-chat",
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.1,
                "max_tokens": max_tokens
            },
            timeout=60
        )
        if response.status_code != 200:
            print(f" ⚠️ DeepSeek API错误: {response.status_code}")
            return None
        return response.json()['choices'][0]['message']['content']

    # ------------------------------------------------------------------
    # Identification and classification
    # ------------------------------------------------------------------
    def identify_extra_items(self, extracted_items: List[Dict]) -> List[Dict]:
        """Return the extracted items whose ABB is unknown to the template.

        Args:
            extracted_items: All items extracted from the source report.

        Returns:
            The items with a non-empty ABB that is not in ``known_abbs``
            (compared upper-case).  Items without an ABB are ignored.
        """
        extra_items = []
        for item in extracted_items:
            abb = self._abb_key(item)
            if not abb:
                continue
            if abb not in self.known_abbs:
                extra_items.append(item)
        print(f" 识别到 {len(extra_items)} 个额外项目(模板中没有)")
        return extra_items

    def classify_items_with_deepseek(self, extra_items: List[Dict]) -> Dict[str, List[Dict]]:
        """Assign each extra item to a report module using DeepSeek.

        Falls back to :meth:`_default_classify` when no API key is set, the
        API answers with an error, or the reply cannot be parsed.

        Args:
            extra_items: Items returned by :meth:`identify_extra_items`.

        Returns:
            ``{module name: [items]}``; empty when ``extra_items`` is empty.
        """
        if not extra_items:
            return {}
        if not self.api_key:
            print(" ⚠️ 未配置DeepSeek API Key使用默认分类")
            return self._default_classify(extra_items)

        # One bullet line per item for the prompt.
        items_desc = []
        for item in extra_items:
            desc = f"- ABB: {item.get('abb', '')}, 项目名: {item.get('project', '')}"
            if item.get('result'):
                desc += f", 结果: {item.get('result', '')}"
            if item.get('unit'):
                desc += f" {item.get('unit', '')}"
            items_desc.append(desc)

        # Module names offered to the model come from the ABB configuration.
        modules = list(self.abb_config.get('modules', {}).keys())
        prompt = f"""你是医学检验专家,请将以下检测项目分类到对应的检测模块中。
## 待分类的检测项目:
{chr(10).join(items_desc)}
## 可用的检测模块:
{', '.join(modules)}
## 要求:
1. 根据项目的医学属性,将每个项目分配到最合适的模块
2. 如果项目不属于任何已有模块,分配到 "Other Tests"
3. 返回JSON格式
## 输出格式:
```json
{{
"模块名1": ["ABB1", "ABB2"],
"模块名2": ["ABB3"],
...
}}
```
只返回JSON不要其他说明。"""
        try:
            content = self._request_completion(prompt, max_tokens=2000)
            if content is None:
                return self._default_classify(extra_items)
            classification = self._extract_json(content)
            if not isinstance(classification, dict):
                raise ValueError("classification reply is not a JSON object")
            result = self._group_by_classification(classification, extra_items)
            print(f" ✓ DeepSeek分类完成: {len(result)} 个模块")
            return result
        except Exception as e:
            # Best-effort boundary: any failure degrades to keyword matching.
            print(f" ⚠️ DeepSeek分类失败: {e}")
            return self._default_classify(extra_items)

    def _default_classify(self, extra_items: List[Dict]) -> Dict[str, List[Dict]]:
        """Keyword-based fallback classification used without DeepSeek.

        An item goes to the module of the first keyword found as a
        substring of its lower-cased ABB or project name; everything else
        goes to ``'Other Tests'``.  Empty modules are omitted.
        """
        result = {'Other Tests': []}
        keyword_to_module = {
            'crp': 'Inflammatory Reaction',
            'esr': 'Inflammatory Reaction',
            'hs-crp': 'Inflammatory Reaction',
            'tgab': 'Thyroid Function',
            'tpoab': 'Thyroid Function',
            'ery': 'Urine Test',
            'cib': 'Microelement',
            'mib': 'Microelement',
        }
        for item in extra_items:
            abb_lower = str(item.get('abb') or '').lower()
            project_lower = str(item.get('project') or '').lower()
            target = 'Other Tests'
            for keyword, module in keyword_to_module.items():
                if keyword in abb_lower or keyword in project_lower:
                    target = module
                    break
            result.setdefault(target, []).append(item)
        # Drop modules that received nothing.
        return {k: v for k, v in result.items() if v}

    def generate_clinical_significance(self, items: List[Dict]) -> Dict[str, Dict[str, str]]:
        """Ask DeepSeek for short clinical-significance texts.

        Args:
            items: Items to explain.

        Returns:
            ``{ABB: {"clinical_en": ..., "clinical_cn": ...}}`` as returned
            by the model, or ``{}`` when there are no items, no API key, or
            the request or parsing fails.
        """
        if not items or not self.api_key:
            return {}
        items_desc = []
        for item in items:
            desc = f"- {item.get('abb', '')}: {item.get('project', '')}"
            if item.get('result'):
                desc += f", 结果: {item.get('result', '')}"
            items_desc.append(desc)
        prompt = f"""你是医学检验专家,请为以下检测项目生成简短的临床意义解释。
## 检测项目:
{chr(10).join(items_desc)}
## 要求:
1. 每个项目提供英文和中文解释
2. 解释简洁约30-50字
3. 说明该指标的临床意义
## 输出格式JSON
```json
{{
"ABB1": {{
"clinical_en": "English explanation...",
"clinical_cn": "中文解释..."
}}
}}
```
只返回JSON。"""
        try:
            content = self._request_completion(prompt, max_tokens=4000)
            if content is None:
                return {}
            explanations = self._extract_json(content)
            if not isinstance(explanations, dict):
                raise ValueError("explanation reply is not a JSON object")
            return explanations
        except Exception as e:
            print(f" ⚠️ 生成临床意义失败: {e}")
        return {}

    # ------------------------------------------------------------------
    # Document manipulation
    # ------------------------------------------------------------------
    def find_module_position(self, doc: "Document", module_name: str) -> int:
        """Locate the last table belonging to a module.

        The module starts at the first body element whose text contains one
        of the module's keywords and ends just before the next element that
        contains a keyword of any other module (or at the end of the body).

        Args:
            doc: Open Word document.
            module_name: Module to look for; names without configured
                keywords are matched by their lower-cased name.

        Returns:
            Index, among the body's direct children, of the last table in
            the module, or ``-1`` when the module or a table is not found.
        """
        keywords = self.module_keywords.get(module_name, [module_name.lower()])
        children = list(doc._body._body)

        def element_text(elem) -> str:
            return ''.join(elem.itertext()).strip().lower()

        # Start of the module: first element mentioning one of its keywords.
        module_start_idx = -1
        for i, elem in enumerate(children):
            text = element_text(elem)
            if any(kw in text for kw in keywords):
                module_start_idx = i
                break
        if module_start_idx < 0:
            return -1

        # End of the module: next element mentioning a keyword that does
        # not belong to this module.
        foreign_keywords = [
            kw
            for kws in self.module_keywords.values()
            for kw in kws
            if kw not in keywords
        ]
        module_end_idx = len(children)
        for i in range(module_start_idx + 1, len(children)):
            text = element_text(children[i])
            if any(kw in text for kw in foreign_keywords):
                module_end_idx = i
                break

        # Last table inside [start, end).
        last_table_idx = -1
        for i in range(module_start_idx, module_end_idx):
            if children[i].tag.endswith('}tbl'):
                last_table_idx = i
        return last_table_idx

    def create_item_table(self, doc: "Document", item: Dict, clinical_en: str = "", clinical_cn: str = "") -> object:
        """Build the 4-row, 6-column table for one test item.

        The table is created with ``doc.add_table``; the caller is expected
        to move the returned element to its final position.

        Args:
            doc: Open Word document.
            item: Item data (``abb``, ``project``, ``result``, ``point``,
                ``reference``, ``unit``); missing or ``None`` values render
                as empty cells.
            clinical_en: English clinical-significance text.
            clinical_cn: Chinese clinical-significance text.

        Returns:
            The underlying table XML element (``table._tbl``).
        """
        table = doc.add_table(rows=4, cols=6)
        table.alignment = WD_TABLE_ALIGNMENT.CENTER
        table.autofit = False

        # Fixed column widths, applied cell by cell.
        widths = [Cm(2.5), Cm(3.5), Cm(2.5), Cm(2.5), Cm(2.5), Cm(2.5)]
        for row in table.rows:
            for idx, width in enumerate(widths):
                row.cells[idx].width = width

        def set_font(run, bold=False, font_size=10.5):
            run.bold = bold
            run.font.name = 'Times New Roman'
            run.font.size = Pt(font_size)
            # East-Asian text needs its own font attribute.
            run._element.rPr.rFonts.set(qn('w:eastAsia'), '宋体')

        def fill_cell(cell, text, bold=False, font_size=10.5):
            paragraph = cell.paragraphs[0]
            paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
            set_font(paragraph.add_run(text), bold=bold, font_size=font_size)

        # Row 0: thin empty row that carries the solid top border.
        row0 = table.rows[0]
        row0.height = Cm(0.05)
        for cell in row0.cells:
            cell.text = ''

        # Row 1: bilingual column headers.
        header_row = table.rows[1]
        headers = [
            ('Abb', '简称'), ('Project', '项目'), ('Result', '结果'),
            ('Point', '提示'), ('Refer', '参考'), ('Unit', '单位')
        ]
        for idx, (en, cn) in enumerate(headers):
            fill_cell(header_row.cells[idx], f'{en}\n{cn}', bold=True, font_size=9)

        # Row 2: the item's values, in header order.
        data_row = table.rows[2]
        text_of = self._as_text
        data_cells = [
            (text_of(item.get('abb')), True, 10.5),
            (text_of(item.get('project')), True, 10.5),
            (text_of(item.get('result')), False, 10.5),
            (text_of(item.get('point')), False, 10.5),
            (clean_reference_range(text_of(item.get('reference'))), False, 9),
            (text_of(item.get('unit')), False, 9),
        ]
        for idx, (text, bold, size) in enumerate(data_cells):
            fill_cell(data_row.cells[idx], text, bold=bold, font_size=size)

        # Row 3: clinical significance across the full width.
        sig_row = table.rows[3]
        top_cell = sig_row.cells[0]
        top_cell.merge(sig_row.cells[5])
        p = top_cell.paragraphs[0]
        p.alignment = WD_ALIGN_PARAGRAPH.LEFT
        if clinical_en:
            run = p.add_run('Clinical Significance: ')
            set_font(run, bold=True, font_size=9)
            run = p.add_run(clinical_en)
            set_font(run, font_size=9)
            run = p.add_run('\n')
        if clinical_cn:
            run = p.add_run('临床意义:')
            set_font(run, bold=True, font_size=9)
            run = p.add_run(clinical_cn)
            set_font(run, font_size=9)

        self._set_table_borders(table)
        return table._tbl

    def _set_table_borders(self, table):
        """Apply borders and vertical centering to every cell of ``table``.

        The first row gets a solid black top border; every other edge is
        dashed grey.  A merged cell is returned once per grid column by
        ``row.cells``, so cells already handled are skipped to avoid adding
        several ``w:tcBorders`` elements to the same cell.
        """
        def set_cell_border(cell, **kwargs):
            tc = cell._tc
            tcPr = tc.get_or_add_tcPr()
            tcBorders = OxmlElement('w:tcBorders')
            for edge in ['top', 'left', 'bottom', 'right']:
                if edge in kwargs:
                    element = OxmlElement(f'w:{edge}')
                    element.set(qn('w:val'), kwargs[edge].get('val', 'single'))
                    element.set(qn('w:sz'), str(kwargs[edge].get('sz', 4)))
                    element.set(qn('w:color'), kwargs[edge].get('color', '000000'))
                    tcBorders.append(element)
            tcPr.append(tcBorders)

        border_solid = {'val': 'single', 'sz': 4, 'color': '000000'}
        border_dashed = {'val': 'dashed', 'sz': 4, 'color': 'AAAAAA'}
        seen = set()
        for i, row in enumerate(table.rows):
            for cell in row.cells:
                if cell._tc in seen:
                    continue
                seen.add(cell._tc)
                top = border_solid if i == 0 else border_dashed
                set_cell_border(cell, top=top, bottom=border_dashed,
                                left=border_dashed, right=border_dashed)
                # 1 is the value of python-docx's WD_ALIGN_VERTICAL.CENTER.
                cell.vertical_alignment = 1

    def insert_extra_items_to_doc(self, doc_path: str, classified_items: Dict[str, List[Dict]],
                                  explanations: Dict[str, Dict[str, str]] = None) -> str:
        """Insert one table per extra item after its module's last table.

        The document is modified and saved in place.  Modules that cannot
        be located in the document are skipped with a warning.

        Args:
            doc_path: Path of the Word document.
            classified_items: ``{module name: [items]}``.
            explanations: ``{ABB: {"clinical_en": ..., "clinical_cn": ...}}``;
                ABB keys are matched case-insensitively.

        Returns:
            ``doc_path``.
        """
        if not classified_items:
            print(" 没有额外项目需要插入")
            return doc_path

        # Items are looked up by upper-cased ABB, so normalise the keys too.
        explanations = {str(k).upper(): v for k, v in (explanations or {}).items()}
        doc = Document(doc_path)
        body = doc._body._body
        inserted_count = 0
        for module_name, items in classified_items.items():
            if not items:
                continue
            print(f" 处理模块 [{module_name}]: {len(items)} 个项目")
            insert_pos = self.find_module_position(doc, module_name)
            if insert_pos < 0:
                print(f" ⚠️ 未找到模块 [{module_name}],跳过")
                continue

            # Each new table goes right after the previous one, so the
            # anchor moves forward and item order is preserved.
            anchor = list(body)[insert_pos]
            for item in items:
                abb = self._abb_key(item)
                exp = explanations.get(abb)
                if not isinstance(exp, dict):
                    exp = {}
                clinical_en = exp.get('clinical_en', '')
                clinical_cn = exp.get('clinical_cn', '')
                table_elem = self.create_item_table(doc, item, clinical_en, clinical_cn)
                # addnext() relocates the freshly created table element.
                anchor.addnext(table_elem)
                anchor = table_elem
                inserted_count += 1
                print(f" ✓ 插入 {abb}")

        doc.save(doc_path)
        print(f" ✓ 共插入 {inserted_count} 个额外项目表格")
        return doc_path
def process_extra_items(extracted_items: List[Dict], doc_path: str, api_key: str = None) -> str:
    """Run the complete extra-item pipeline on an already-filled report.

    Steps: identify items unknown to the template, classify them into
    modules, generate clinical-significance text, and insert the resulting
    tables into the document.

    Args:
        extracted_items: All items extracted from the source report.
        doc_path: Path of the filled Word document.
        api_key: DeepSeek API key (optional).

    Returns:
        Path of the processed document; ``doc_path`` itself when there is
        nothing to insert.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("处理额外检测项目(模板中没有的项目)")
    print(banner)

    handler = ExtraItemsHandler(api_key)

    # Step 1: find the items the template does not cover.
    extras = handler.identify_extra_items(extracted_items)
    if not extras:
        print(" 没有额外项目需要处理")
        return doc_path

    print(f"\n 额外项目列表:")
    for entry in extras:
        print(f" - {entry.get('abb', '')}: {entry.get('project', '')} = {entry.get('result', '')}")

    # Step 2: classify them into modules.
    print("\n 正在分类...")
    by_module = handler.classify_items_with_deepseek(extras)
    if by_module:
        print(f"\n 分类结果:")
        for module, members in by_module.items():
            abbs = [member.get('abb', '') for member in members]
            print(f" [{module}]: {abbs}")

    # Step 3: generate the clinical-significance texts.
    print("\n 正在生成临床意义...")
    explanations = handler.generate_clinical_significance(extras)

    # Step 4: write the tables into the document.
    print("\n 正在插入表格...")
    result_path = handler.insert_extra_items_to_doc(doc_path, by_module, explanations)

    print("\n" + banner)
    print("额外项目处理完成")
    print(banner)
    return result_path
if __name__ == "__main__":
    # Manual smoke test: read previously extracted data that sits next to
    # this file, list the extra items and, when an API key is available in
    # the environment, show how they are classified.
    import os

    api_key = os.getenv("DEEPSEEK_API_KEY", "")
    extracted_file = Path(__file__).parent / "extracted_medical_data.json"
    if extracted_file.exists():
        data = json.loads(extracted_file.read_text(encoding='utf-8'))
        # The file holds either a bare list or a dict with an 'items' key.
        if isinstance(data, dict):
            items = data.get('items', data)
        else:
            items = data

        handler = ExtraItemsHandler(api_key)
        extra_items = handler.identify_extra_items(items)
        print(f"\n识别到 {len(extra_items)} 个额外项目:")
        for entry in extra_items:
            print(f" - {entry.get('abb', '')}: {entry.get('project', '')}")

        if extra_items and api_key:
            classified = handler.classify_items_with_deepseek(extra_items)
            print(f"\n分类结果:")
            for module, members in classified.items():
                abbs = [member.get('abb', '') for member in members]
                print(f" [{module}]: {abbs}")