Files
yiliao/backend/extract_and_fill_report.py

6490 lines
274 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
从医疗报告PDF中提取数据匹配模板结构填入Word模板
"""
import sys
import io
import os
# Work around garbled Chinese output in the Windows terminal by forcing
# UTF-8 at every layer of the console I/O stack.
if sys.platform == 'win32':
    # Force UTF-8 for the interpreter's own I/O (and any child processes)
    os.environ['PYTHONIOENCODING'] = 'utf-8'
    # Switch the console code page to UTF-8 (65001); chcp output is discarded
    os.system('chcp 65001 >nul 2>&1')
    # Reconfigure stdout/stderr in place; only real console streams expose
    # a .buffer attribute (redirected/captured streams may not)
    if hasattr(sys.stdout, 'buffer'):
        sys.stdout.reconfigure(encoding='utf-8', errors='replace')
        sys.stderr.reconfigure(encoding='utf-8', errors='replace')
import fitz
import json
import re
import time
import requests
import base64
from pathlib import Path
from docx import Document
from docx.shared import Pt, Cm, Inches
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.enum.table import WD_TABLE_ALIGNMENT
from docx.oxml.ns import qn
from docx.oxml import OxmlElement
from copy import deepcopy
from dotenv import load_dotenv
# 加载.env环境变量
load_dotenv(Path(__file__).parent / ".env")
# 导入优化版解析函数
from parse_medical_v2 import parse_medical_data_v2, clean_extracted_data_v2
def find_health_program_boundary(doc):
    """Locate the "Client Health Program" heading in the document body.

    Returns the index (into body children) just past that heading so that
    everything before it — the first four pages — can be treated as a
    protected region that must not be modified. Falls back to a default
    boundary of 80 elements (roughly four pages) when the heading is absent.
    """
    markers = ('客户健康方案', 'Client Health Program')
    for idx, node in enumerate(list(doc.element.body)):
        content = ''.join(node.itertext()).strip()
        if any(marker in content for marker in markers):
            print(f" [保护] 找到保护边界: 位置 {idx}, 内容: {content[:50]}...")
            # +1 so the heading element itself stays inside the protected span
            return idx + 1
    print(f" [保护] 未找到'客户健康方案',使用默认边界: 80")
    return 80
def find_examination_file_region(doc):
    """Find the "Client Functional Medical Examination File" section.

    Returns a (start_index, end_index) pair of positions in the body's
    children. The section sits just before the urine-test module and holds
    client/exam info that must be protected from deletion. (-1, -1) means
    the section was not found at all.
    """
    begin, finish = -1, -1
    for pos, node in enumerate(list(doc.element.body)):
        content = ''.join(node.itertext()).strip()
        # Section start: the examination-file heading
        if '功能医学检测档案' in content or 'Functional Medical Examination File' in content:
            begin = pos
            print(f" [保护] 找到'客户功能医学检测档案'区域起始: 位置 {pos}")
        # Section end: the urine-test heading (only meaningful after a start)
        if begin >= 0 and ('尿液检测' in content or 'Urine Detection' in content):
            finish = pos
            print(f" [保护] 找到'客户功能医学检测档案'区域结束: 位置 {pos}")
            break
    if begin >= 0 and finish < 0:
        # Start found but no end marker: assume the section spans ~20 elements
        finish = begin + 20
        print(f" [保护] 未找到结束边界,使用默认: {finish}")
    return (begin, finish)
def copy_protected_region_from_template(template_path, output_path, boundary):
    """Copy the protected region (first four pages) from the template into output.

    Strategy (simplified version):
      1. Copy the template's first ``boundary`` body elements (pages 1-4).
      2. Append the data portion of the processed file (everything after the
         "Client Health Program" heading).
      3. The "Client Functional Medical Examination File" region is no longer
         copied here — it is handled in steps 3-7 of the pipeline.

    The docx files are manipulated as zip archives, replacing only
    word/document.xml; the result overwrites ``output_path``.
    """
    import zipfile
    import shutil
    from lxml import etree
    import os
    if boundary <= 0:
        print(" [保护] 边界无效,跳过复制")
        return
    # Scratch files next to the output; removed in the finally block
    temp_output = str(output_path) + ".temp_output"
    temp_result = str(output_path) + ".temp_result"
    try:
        shutil.copy(output_path, temp_output)
        ns = {'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}
        w_ns = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
        # Parse the template's main document part
        with zipfile.ZipFile(template_path, 'r') as z:
            template_xml = z.read('word/document.xml')
        template_tree = etree.fromstring(template_xml)
        template_body = template_tree.find('.//w:body', ns)
        # Parse the processed (output) document
        with zipfile.ZipFile(temp_output, 'r') as z:
            output_xml = z.read('word/document.xml')
        output_tree = etree.fromstring(output_xml)
        output_body = output_tree.find('.//w:body', ns)
        if template_body is None or output_body is None:
            print(" [保护] 无法找到 body 元素")
            return
        template_children = list(template_body)
        output_children = list(output_body)
        print(f" [保护] 模板元素: {len(template_children)}, 处理后元素: {len(output_children)}")
        # Locate where the real data starts in the processed file: the
        # element right after the "Client Health Program" heading
        output_start = -1
        for i, elem in enumerate(output_children):
            text = ''.join(elem.itertext()).strip()
            if 'Client Health Program' in text or '客户健康方案' in text:
                output_start = i + 1
                print(f" [保护] 找到 Client Health Program 位置: {i}")
                break
        if output_start < 0:
            output_start = boundary
            print(f" [保护] 使用默认起始位置: {output_start}")
        else:
            print(f" [保护] 数据起始位置: {output_start}")
        # Empty the template body so it can be rebuilt from scratch
        for elem in list(template_body):
            template_body.remove(elem)
        # Re-read a pristine copy of the template (template_body was emptied)
        with zipfile.ZipFile(template_path, 'r') as z:
            orig_template_xml = z.read('word/document.xml')
        orig_template_tree = etree.fromstring(orig_template_xml)
        orig_template_body = orig_template_tree.find('.//w:body', ns)
        orig_template_children = list(orig_template_body)
        # 1. Add the template's first ``boundary`` elements (pages 1-4),
        #    skipping any sectPr (re-attached at the end)
        added_count = 0
        for i in range(min(boundary, len(orig_template_children))):
            elem = orig_template_children[i]
            if elem.tag.endswith('}sectPr'):
                continue
            # Round-trip through serialization to get a detached deep copy
            elem_copy = etree.fromstring(etree.tostring(elem))
            template_body.append(elem_copy)
            added_count += 1
        print(f" [保护] 已添加模板前 {added_count} 个元素")
        # Grab the template's sectPr (it carries the footer reference)
        sectPr = None
        for elem in orig_template_children:
            if elem.tag.endswith('}sectPr'):
                sectPr = etree.fromstring(etree.tostring(elem))
                print(f" [保护] 使用模板的 sectPr包含页脚引用")
                break
        # 2. Append the data content from the processed file
        data_count = 0
        for i in range(output_start, len(output_children)):
            elem = output_children[i]
            if elem.tag.endswith('}sectPr'):
                continue
            elem_copy = etree.fromstring(etree.tostring(elem))
            template_body.append(elem_copy)
            data_count += 1
        print(f" [保护] 已添加 {data_count} 个数据元素")
        # 3. Re-attach the sectPr last (OOXML expects it at the end of body)
        if sectPr is not None:
            template_body.append(sectPr)
        print(f" [保护] 合并后总元素: {len(list(template_body))}")
        # Serialize the merged XML
        new_xml = etree.tostring(template_tree, xml_declaration=True, encoding='UTF-8', standalone='yes')
        # Rewrite the docx zip, swapping in the new document.xml
        with zipfile.ZipFile(template_path, 'r') as zin:
            with zipfile.ZipFile(temp_result, 'w', zipfile.ZIP_DEFLATED) as zout:
                for item in zin.infolist():
                    if item.filename == 'word/document.xml':
                        zout.writestr(item, new_xml)
                    else:
                        zout.writestr(item, zin.read(item.filename))
        shutil.move(temp_result, output_path)
        print(f" [保护] ✓ 前四页保护完成")
    except Exception as e:
        print(f" [保护] 复制失败: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Best-effort cleanup of scratch files
        for f in [temp_output, temp_result]:
            if os.path.exists(f):
                try:
                    os.remove(f)
                except:
                    pass
def fix_footer_reference(template_path, output_path):
    """Repair the footer reference so every page keeps the Be.U Med logo.

    Problem: during processing, the paragraph carrying the sectPr may be
    deleted or rewritten, which drops the footer reference.
    Fix: copy the first footerReference (plus its sectPr's headerReferences)
    from the template into the output document's sectPr, rewriting the
    output docx in place.
    """
    import zipfile
    import shutil
    from lxml import etree
    import os
    ns = {'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
          'r': 'http://schemas.openxmlformats.org/officeDocument/2006/relationships'}
    w_ns = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
    r_ns = '{http://schemas.openxmlformats.org/officeDocument/2006/relationships}'
    try:
        # Read the template's document.xml
        with zipfile.ZipFile(template_path, 'r') as z:
            template_xml = z.read('word/document.xml')
        template_tree = etree.fromstring(template_xml)
        template_body = template_tree.find('.//w:body', ns)
        # Find the first sectPr in the template that has a footerReference;
        # header references from the same sectPr are collected as well
        template_sectPrs = template_body.findall('.//w:sectPr', ns)
        footer_ref = None
        header_refs = []
        for sectPr in template_sectPrs:
            for child in sectPr:
                if 'footerReference' in child.tag:
                    # Detached deep copy via serialization round-trip
                    footer_ref = etree.fromstring(etree.tostring(child))
                    print(f" [页脚] 找到模板页脚引用: {child.get(r_ns + 'id')}")
                if 'headerReference' in child.tag:
                    header_refs.append(etree.fromstring(etree.tostring(child)))
            if footer_ref is not None:
                break
        if footer_ref is None:
            print(" [页脚] 模板中没有找到页脚引用,跳过")
            return
        # Read the output file's document.xml
        with zipfile.ZipFile(output_path, 'r') as z:
            output_xml = z.read('word/document.xml')
        output_tree = etree.fromstring(output_xml)
        output_body = output_tree.find('.//w:body', ns)
        # The output's sectPr is normally the last child of body, so scan
        # backwards
        output_sectPr = None
        for elem in reversed(list(output_body)):
            if elem.tag.endswith('}sectPr'):
                output_sectPr = elem
                break
        if output_sectPr is None:
            print(" [页脚] 输出文件中没有找到 sectPr跳过")
            return
        # Nothing to do if a footerReference is already present
        has_footer = False
        for child in output_sectPr:
            if 'footerReference' in child.tag:
                has_footer = True
                break
        if has_footer:
            print(" [页脚] 输出文件已有页脚引用,跳过")
            return
        # Insert headerReference(s) then footerReference at the start of the
        # sectPr — element order matters in OOXML (header before footer)
        insert_pos = 0
        for header_ref in header_refs:
            output_sectPr.insert(insert_pos, header_ref)
            insert_pos += 1
        output_sectPr.insert(insert_pos, footer_ref)
        print(f" [页脚] 已添加页脚引用到输出文件")
        # Serialize the patched XML
        new_xml = etree.tostring(output_tree, xml_declaration=True, encoding='UTF-8', standalone='yes')
        # Rewrite the docx zip, replacing only word/document.xml
        temp_result = str(output_path) + '.temp_footer.docx'
        with zipfile.ZipFile(output_path, 'r') as zin:
            with zipfile.ZipFile(temp_result, 'w', zipfile.ZIP_DEFLATED) as zout:
                for item in zin.infolist():
                    if item.filename == 'word/document.xml':
                        zout.writestr(item, new_xml)
                    else:
                        zout.writestr(item, zin.read(item.filename))
        # Atomically replace the output file with the patched version
        shutil.move(temp_result, output_path)
        print(f" [页脚] ✓ 页脚修复完成")
    except Exception as e:
        print(f" [页脚] 修复失败: {e}")
        import traceback
        traceback.print_exc()
def backup_protected_region(doc):
    """Deep-copy every XML element of the protected region.

    Returns a (boundary, backup_elements) pair; (-1, []) when no boundary
    could be located. The deep copies allow the region to be restored
    verbatim after the document has been mutated.
    """
    boundary = find_health_program_boundary(doc)
    if boundary <= 0:
        print(f" [保护] 未找到保护边界,跳过备份")
        return -1, []
    children = list(doc.element.body)
    # Index (not slice) so an out-of-range boundary fails loudly, exactly
    # like the original element-by-element copy would
    saved = [deepcopy(children[i]) for i in range(boundary)]
    print(f" [保护] 已备份保护区域boundary={boundary}, backup_len={len(saved)}")
    return boundary, saved
def restore_protected_region(doc, boundary, backup):
    """Put the backed-up protected region back at the head of the document.

    Replaces the first ``boundary`` body elements with deep copies of the
    elements in ``backup``, guaranteeing the protected pages end up exactly
    as they were when backed up. A non-positive boundary or empty backup is
    a no-op.
    """
    if boundary <= 0 or not backup:
        print(f" [保护] 跳过恢复boundary={boundary}, backup_len={len(backup) if backup else 0}")
        return
    body = doc.element.body
    current = list(body)
    print(f" [保护] 开始恢复保护区域boundary={boundary}, backup_len={len(backup)}, current_children={len(current)}")
    # Remove the (possibly modified) protected prefix back-to-front so that
    # earlier removals cannot shift elements still pending removal
    doomed = current[:min(boundary, len(current))]
    for node in reversed(doomed):
        try:
            body.remove(node)
        except Exception as e:
            print(f" [保护] 删除元素失败: {e}")
    # Re-insert the saved elements at index 0 in reverse order so the final
    # order matches the backup; deep-copy so nodes can live in this document
    for node in reversed(backup):
        try:
            body.insert(0, deepcopy(node))
        except Exception as e:
            print(f" [保护] 插入元素失败: {e}")
    print(f" [保护] 恢复完成当前children数量: {len(list(body))}")
def set_cell_border(cell, **kwargs):
    """Apply per-edge borders to a table cell.

    Recognized keyword arguments are 'top', 'left', 'bottom' and 'right',
    each a dict with optional keys: 'val' (line style, default 'single'),
    'sz' (width, default 4) and 'color' (hex RGB, default '000000').
    """
    props = cell._tc.get_or_add_tcPr()
    borders = OxmlElement('w:tcBorders')
    for side in ['top', 'left', 'bottom', 'right']:
        if side not in kwargs:
            continue
        spec = kwargs[side]
        edge_el = OxmlElement(f'w:{side}')
        edge_el.set(qn('w:val'), spec.get('val', 'single'))
        edge_el.set(qn('w:sz'), str(spec.get('sz', 4)))
        edge_el.set(qn('w:color'), spec.get('color', '000000'))
        borders.append(edge_el)
    props.append(borders)
# Paired test items: shown together in a single table (two data rows that
# share one clinical-significance block).
# Mapping: base item -> (percent item, base Chinese name, percent Chinese name)
PAIRED_ITEMS = {
    'NEUT': ('NEUT%', '中性粒细胞数量', '中性粒细胞百分含量'),
    'EOS': ('EOS%', '嗜酸细胞数量', '嗜酸细胞百分含量'),
    'BAS': ('BAS%', '嗜碱细胞数量', '嗜碱细胞百分含量'),
    'LYMPH': ('LYMPH%', '淋巴细胞数量', '淋巴细胞百分含量'),
    'MONO': ('MONO%', '单核细胞数量', '单核细胞百分含量'),
    'TOTAL RBC': ('RBC COUNT', '红细胞总数', '红细胞计数'),
}
# Reverse lookup: percent item -> base item
PAIRED_ITEMS_REVERSE = {pair[0]: base for base, pair in PAIRED_ITEMS.items()}
# Every ABB that belongs to a pair, used to skip standalone processing
ALL_PAIRED_ABBS = set(PAIRED_ITEMS) | set(PAIRED_ITEMS_REVERSE)


def get_paired_item(abb):
    """Resolve pairing info for *abb* (case/whitespace insensitive).

    Returns (partner_abb, is_base, base_cn, percent_cn), or a tuple of four
    Nones when *abb* is not part of any pair.
    """
    key = abb.upper().strip()
    if key in PAIRED_ITEMS:
        partner, base_cn, percent_cn = PAIRED_ITEMS[key]
        return (partner, True, base_cn, percent_cn)
    if key in PAIRED_ITEMS_REVERSE:
        base = PAIRED_ITEMS_REVERSE[key]
        _, base_cn, percent_cn = PAIRED_ITEMS[base]
        return (base, False, base_cn, percent_cn)
    return (None, None, None, None)


def is_paired_item(abb):
    """True when *abb* is either half of a paired item."""
    return abb.upper().strip() in ALL_PAIRED_ABBS


def is_paired_base_item(abb):
    """True when *abb* is the base half of a pair (e.g. NEUT, EOS)."""
    return abb.upper().strip() in PAIRED_ITEMS


def is_paired_percent_item(abb):
    """True when *abb* is the percent half of a pair (e.g. NEUT%, EOS%)."""
    return abb.upper().strip() in PAIRED_ITEMS_REVERSE
def clean_reference_range(reference: str) -> str:
    """Normalize a reference-range string.

    1. Strip surrounding brackets (half-width ``()``, full-width ``（）``,
       square ``[]``), matched pairs first, then dangling ones.
    2. Rewrite upper-bound-only ranges "<X" / "≤X" / "<=X" as "0-X".

    Examples:
        "(3.5-5.5)" -> "3.5-5.5"
        "<0.2"      -> "0-0.2"
        "≤10"       -> "0-10"
        "（阴性）"   -> "阴性"

    Bug fixed: the full-width parentheses had been stripped from the source,
    leaving ``startswith('')``/``endswith('')`` — which are always True, so
    every non-parenthesized range lost its first and last character.
    """
    if not reference:
        return reference
    ref = reference.strip()
    # Strip matching bracket pairs
    if ref.startswith('(') and ref.endswith(')'):
        ref = ref[1:-1]
    elif ref.startswith('（') and ref.endswith('）'):
        ref = ref[1:-1]
    elif ref.startswith('[') and ref.endswith(']'):
        ref = ref[1:-1]
    # Handle dangling/unmatched brackets on either end
    if ref.startswith('('):
        ref = ref[1:]
    if ref.endswith(')'):
        ref = ref[:-1]
    if ref.startswith('（'):
        ref = ref[1:]
    if ref.endswith('）'):
        ref = ref[:-1]
    ref = ref.strip()
    # "<X" or "≤X" -> "0-X"
    match = re.match(r'^[<≤]\s*([\d\.]+)\s*$', ref)
    if match:
        ref = f"0-{match.group(1)}"
    # "<=X" -> "0-X"
    match = re.match(r'^<=\s*([\d\.]+)\s*$', ref)
    if match:
        ref = f"0-{match.group(1)}"
    return ref.strip()
def create_medical_item_table(doc, abb, project_name, result, clinical_en, clinical_cn, include_header=False):
    """Build one medical-test-item table (faithful clone of the case file).

    Layout when include_header=True:
        Row 0: empty spacer row (tiny fixed height) — solid top border
        Row 1: header (Abb简称 | Project项目 | Result结果 | Point指示 | Refer参考 | Unit单位)
        Row 2: ABB | name | result | point | refer | unit — dashed borders
        Row 3: clinical significance (all cells merged) — dashed borders
    Without the header, the data and significance rows shift up by one.
    """
    # Row count depends on whether a header row is requested
    num_rows = 4 if include_header else 3
    table = doc.add_table(rows=num_rows, cols=6)
    table.alignment = WD_TABLE_ALIGNMENT.CENTER
    table.autofit = False
    # Fixed column widths (cm)
    widths = [Cm(2.5), Cm(3.5), Cm(2.5), Cm(2.5), Cm(2.5), Cm(2.5)]
    for row in table.rows:
        for idx, width in enumerate(widths):
            row.cells[idx].width = width

    # Default cell font: Times New Roman, SongTi for East Asian glyphs
    def set_font(run, bold=False, font_size=10.5):
        run.bold = bold
        run.font.name = 'Times New Roman'
        run.font.size = Pt(font_size)
        run._element.rPr.rFonts.set(qn('w:eastAsia'), '宋体')

    # Clinical-significance font: STKaiti (华文楷体), 11pt
    def set_clinical_font(run, bold=False):
        run.bold = bold
        run.font.name = '华文楷体'
        run.font.size = Pt(11)
        run._element.rPr.rFonts.set(qn('w:eastAsia'), '华文楷体')

    # === Row 0: spacer row ===
    row0 = table.rows[0]
    row0.height = Cm(0.05)  # minimal height
    row0.height_rule = 1  # WD_ROW_HEIGHT_RULE.EXACT (fixed height)
    for cell in row0.cells:
        cell.text = ''
        p = cell.paragraphs[0]
        p.paragraph_format.space_before = 0
        p.paragraph_format.space_after = 0
        p.paragraph_format.line_spacing = 0
        run = p.add_run()
        run.font.size = Pt(1)
    # Indices of the data/significance rows shift when a header is present
    data_row_idx = 2 if include_header else 1
    sig_row_idx = 3 if include_header else 2
    # === Optional header row ===
    if include_header:
        header_row = table.rows[1]
        headers = [
            ('Abb', '简称'), ('Project', '项目'), ('Result', '结果'),
            ('Point', '提示'), ('Refer', '参考'), ('Unit', '单位')
        ]
        for idx, (en, cn) in enumerate(headers):
            p = header_row.cells[idx].paragraphs[0]
            p.alignment = WD_ALIGN_PARAGRAPH.CENTER
            run = p.add_run(f'{en}\n{cn}')
            set_font(run, bold=True, font_size=9)
    # === Data row ===
    data_row = table.rows[data_row_idx]
    # 1. ABB
    p = data_row.cells[0].paragraphs[0]
    p.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = p.add_run(abb)
    set_font(run, bold=True)
    # 2. Project name
    p = data_row.cells[1].paragraphs[0]
    p.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = p.add_run(project_name)
    set_font(run, bold=True)
    # 3. Result
    p = data_row.cells[2].paragraphs[0]
    p.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = p.add_run(str(result))
    set_font(run)
    # 4-6. Point, Refer, Unit are left blank (filled by later passes)
    for idx in [3, 4, 5]:
        p = data_row.cells[idx].paragraphs[0]
        p.alignment = WD_ALIGN_PARAGRAPH.CENTER
    # === Clinical significance row (all six cells merged) ===
    sig_row = table.rows[sig_row_idx]
    top_cell = sig_row.cells[0]
    for i in range(1, 6):
        top_cell.merge(sig_row.cells[i])
    # First paragraph: English clinical significance
    p = top_cell.paragraphs[0]
    p.alignment = WD_ALIGN_PARAGRAPH.LEFT
    run = p.add_run('Clinical Significance: ')
    set_clinical_font(run, bold=True)
    run = p.add_run(clinical_en)
    set_clinical_font(run)
    # Second paragraph: Chinese clinical significance (its own paragraph,
    # matching the reference case file's formatting)
    p_cn = top_cell.add_paragraph()
    p_cn.alignment = WD_ALIGN_PARAGRAPH.LEFT
    run = p_cn.add_run('临床意义:')
    set_clinical_font(run, bold=True)
    run = p_cn.add_run(clinical_cn)
    set_clinical_font(run)
    # === Borders ===
    # Solid black for the table's top edge
    border_solid = {'val': 'single', 'sz': 4, 'color': '000000', 'space': 0}
    # Dashed grey everywhere else
    border_dashed = {'val': 'dashed', 'sz': 4, 'color': 'AAAAAA', 'space': 0}
    for i, row in enumerate(table.rows):
        for cell in row.cells:
            # Dashed on all four sides by default
            top = border_dashed
            bottom = border_dashed
            left = border_dashed
            right = border_dashed
            # The first row gets the solid top border
            if i == 0:
                top = border_solid
            # Apply the borders
            set_cell_border(cell, top=top, bottom=bottom, left=left, right=right)
            # Vertically center cell content
            cell.vertical_alignment = 1
    # Trailing separator paragraph after the table
    doc.add_paragraph()
    return table
# Baidu OCR configuration — high-accuracy endpoint.
# NOTE(review): credentials are hardcoded in source even though a .env file
# is loaded above — consider moving them into the .env file.
APP_ID = '121295102'
API_KEY = '8cT0hIWTLPubtwT3Qils9q00'
SECRET_KEY = 'PPPUH7RwkuyijLqwzzoaWlXohUvm3pZs'
# Fetch the Baidu OCR access_token, with retry on network failure
def get_access_token(max_retries: int = 3):
    """Fetch a Baidu OCR access_token, retrying on network failures.

    Args:
        max_retries: total number of attempts before giving up.

    Returns:
        The token string, or None when every attempt fails.
    """
    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {
        "grant_type": "client_credentials",
        "client_id": API_KEY,
        "client_secret": SECRET_KEY
    }
    for retry in range(max_retries):
        try:
            response = requests.post(url, params=params, timeout=30)
            result = response.json()
            token = result.get('access_token')
            if token:
                return token
            else:
                # API answered but without a token — report and back off
                print(f" ⚠️ 获取token失败: {result.get('error', 'unknown error')}")
                if retry < max_retries - 1:
                    time.sleep(2 * (retry + 1))
        except requests.exceptions.Timeout:
            print(f" ⚠️ 获取token超时{retry+1}/{max_retries} 次重试...")
            if retry < max_retries - 1:
                time.sleep(2 * (retry + 1))
        except requests.exceptions.ConnectionError:
            print(f" ⚠️ 获取token连接失败{retry+1}/{max_retries} 次重试...")
            if retry < max_retries - 1:
                # Connection errors back off longer than timeouts
                time.sleep(3 * (retry + 1))
        except Exception as e:
            print(f" ⚠️ 获取token异常: {e}{retry+1}/{max_retries} 次重试...")
            if retry < max_retries - 1:
                time.sleep(2 * (retry + 1))
    return None


# Re-acquired on every run; populated lazily by extract_pdf_with_position()
ACCESS_TOKEN = None
def extract_pdf_with_position(pdf_path: str, max_retries: int = 3) -> list:
    """OCR a PDF with Baidu's high-accuracy + location endpoint.

    Args:
        pdf_path: path to the PDF file.
        max_retries: per-page retry budget for network failures.

    Returns:
        A list of dicts with 'text', 'location' (bounding box) and
        1-based 'page' keys; empty list when no access token is available.
    """
    global ACCESS_TOKEN
    if not ACCESS_TOKEN:
        ACCESS_TOKEN = get_access_token()
    if not ACCESS_TOKEN:
        print(" ❌ 获取access_token失败")
        return []
    doc = fitz.open(pdf_path)
    all_items = []  # positioned text blocks accumulated across all pages
    failed_pages = []  # 0-based indices of pages whose OCR failed
    url = f"https://aip.baidubce.com/rest/2.0/ocr/v1/accurate?access_token={ACCESS_TOKEN}"
    print(f" PDF共 {len(doc)}")

    def ocr_single_page(page_idx, retry_count=0):
        """OCR one page; recurses with retry_count+1 on transient failures."""
        page = doc[page_idx]
        # Render the page to a PNG at 150 dpi for the OCR request
        pix = page.get_pixmap(dpi=150)
        img_data = pix.tobytes('png')
        try:
            img_base64 = base64.b64encode(img_data).decode()
            data = {"image": img_base64}
            response = requests.post(url, data=data, timeout=30)
            result = response.json()
            if 'words_result' in result:
                page_items = []
                for item in result['words_result']:
                    page_items.append({
                        'text': item['words'],
                        'location': item.get('location', {}),
                        'page': page_idx + 1
                    })
                print(f"{page_idx+1} 页: {len(result['words_result'])}")
                return page_items, True
            elif 'error_code' in result:
                error_code = result['error_code']
                error_msg = result.get('error_msg', '')
                # Error codes indicating transient/network problems: retry
                network_errors = [18, 19, 100, 110, 111, 282000, 282003, 282004]
                if error_code in network_errors and retry_count < max_retries:
                    print(f"{page_idx+1} 页网络错误 ({error_code}){retry_count+1}/{max_retries} 次重试...")
                    time.sleep(2 * (retry_count + 1))  # incremental backoff
                    return ocr_single_page(page_idx, retry_count + 1)
                else:
                    print(f"{page_idx+1} 页错误: {error_code} - {error_msg}")
                    return [], False
            else:
                print(f"{page_idx+1} 页: 未知响应格式")
                return [], False
        except requests.exceptions.Timeout:
            if retry_count < max_retries:
                print(f"{page_idx+1} 页超时,{retry_count+1}/{max_retries} 次重试...")
                time.sleep(2 * (retry_count + 1))
                return ocr_single_page(page_idx, retry_count + 1)
            else:
                print(f"{page_idx+1} 页超时,已达最大重试次数")
                return [], False
        except requests.exceptions.ConnectionError:
            if retry_count < max_retries:
                print(f"{page_idx+1} 页连接失败,{retry_count+1}/{max_retries} 次重试...")
                time.sleep(3 * (retry_count + 1))
                return ocr_single_page(page_idx, retry_count + 1)
            else:
                print(f"{page_idx+1} 页连接失败,已达最大重试次数")
                return [], False
        except Exception as e:
            if retry_count < max_retries:
                print(f"{page_idx+1} 页异常 ({e}){retry_count+1}/{max_retries} 次重试...")
                time.sleep(2 * (retry_count + 1))
                return ocr_single_page(page_idx, retry_count + 1)
            else:
                print(f"{page_idx+1} 页异常: {e}")
                return [], False

    # Pass 1: OCR every page in order
    for page_idx in range(len(doc)):
        page_items, success = ocr_single_page(page_idx)
        if success:
            all_items.extend(page_items)
        else:
            failed_pages.append(page_idx)
        time.sleep(0.3)  # throttle requests between pages
    # Pass 2: retry the pages that failed the first pass
    if failed_pages:
        print(f"\n ⚠️ {len(failed_pages)} 页提取失败,进行第二轮重试...")
        time.sleep(5)  # cool-down before retrying
        still_failed = []
        for page_idx in failed_pages:
            print(f" 重试第 {page_idx+1} 页...")
            page_items, success = ocr_single_page(page_idx)
            if success:
                all_items.extend(page_items)
            else:
                still_failed.append(page_idx + 1)  # report as 1-based page number
            time.sleep(1)
        if still_failed:
            print(f"\n ❌ 以下页面提取失败(可能需要手动检查): {still_failed}")
        else:
            print(f" ✓ 所有失败页面重试成功")
    doc.close()
    return all_items
def group_by_rows(items: list, y_threshold: int = 15) -> list:
    """Cluster positioned OCR fragments into visual rows by Y coordinate.

    Fragments on the same page whose 'top' differs from the row's first
    fragment by at most ``y_threshold`` are grouped together. Each finished
    row is sorted left-to-right. Returns a list of rows (lists of items).
    """
    if not items:
        return []
    # Order by page, then vertical position
    ordered = sorted(items, key=lambda it: (it['page'], it['location'].get('top', 0)))
    rows = []
    bucket = []
    anchor_page, anchor_top = -1, -100
    for it in ordered:
        pg = it['page']
        tp = it['location'].get('top', 0)
        if pg != anchor_page or abs(tp - anchor_top) > y_threshold:
            # New page or vertical jump beyond the threshold: flush the row
            if bucket:
                bucket.sort(key=lambda x: x['location'].get('left', 0))
                rows.append(bucket)
            bucket = [it]
            # Anchor on the row's first fragment, matching later comparisons
            anchor_page, anchor_top = pg, tp
        else:
            bucket.append(it)
    if bucket:
        bucket.sort(key=lambda x: x['location'].get('left', 0))
        rows.append(bucket)
    return rows
def extract_pdf_text(pdf_path: str) -> str:
    """Legacy interface — OCR the PDF and return plain text, one row per line."""
    positioned = extract_pdf_with_position(pdf_path)
    grouped = group_by_rows(positioned)
    return "\n".join(
        " ".join(fragment['text'] for fragment in row) for row in grouped
    )
def extract_patient_info(ocr_text: str) -> dict:
    """Extract basic patient information from OCR text.

    Extracted fields:
        name: patient name
        gender: Male -> '男性', Female -> '女性'
        age: numeric part only
        nation: nationality (defaults to '中国'; usually absent from OCR)
        exam_time: collection/exam date
        project: exam package name (fixed value)
        report_time: report date (today)

    Returns:
        dict with the fields above.

    Bugs fixed: restored characters stripped by the source viewer — the
    '男' in the gender comparison (it compared against '' and always chose
    '女性') and the full-width colon '：' inside regex character classes
    ('[:\\]' was an unterminated class raising re.error at runtime). Also
    parenthesized the age condition ('A and B or C' precedence bug) and
    narrowed the date-parsing excepts to ValueError.
    """
    from datetime import datetime
    info = {
        'name': '',
        'gender': '',
        'age': '',
        'nation': '中国',  # default
        'exam_time': '',
        'project': '功能医学检测套餐',  # fixed value
        'report_time': datetime.now().strftime('%Y-%m-%d')  # today
    }
    lines = ocr_text.split('\n')
    # ---------- Chinese-format physical-exam reports ----------
    # e.g. "姓名 姚友胜 性别男 体检单号1125041700091 年龄59"
    for line in lines[:20]:
        if '姓名' in line and ('性别' in line or '年龄' in line):
            # Name
            name_m = re.search(r'姓名\s*(\S+)', line)
            if name_m:
                raw = name_m.group(1)
                # Trim trailing labels that got glued onto the name
                raw = re.split(r'性别|年龄|体检', raw)[0]
                if raw:
                    info['name'] = raw
            # Gender
            gender_m = re.search(r'性别\s*(男|女)', line)
            if gender_m:
                info['gender'] = '男性' if gender_m.group(1) == '男' else '女性'
            # Age
            age_m = re.search(r'年龄\s*(\d+)', line)
            if age_m:
                info['age'] = age_m.group(1)
            # Exam date embedded in the exam ID
            # (format: 1125041700091 -> prefix(11)+yy(25)+mm(04)+dd(17)+serial)
            id_m = re.search(r'体检单号\s*(\d+)', line)
            if id_m:
                id_str = id_m.group(1)
                if len(id_str) >= 8:
                    yy = id_str[2:4]
                    mm = id_str[4:6]
                    dd = id_str[6:8]
                    try:
                        y, m, d = int(yy), int(mm), int(dd)
                        if 1 <= m <= 12 and 1 <= d <= 31:
                            info['exam_time'] = f'20{yy}-{mm}-{dd}'
                    except (ValueError, TypeError):
                        pass
            break  # stop after the first Chinese patient line
    # ---------- Chinese reports: explicit exam-date fallback ----------
    for line in lines[:50]:
        if '检查日期' in line and not info['exam_time']:
            date_m = re.search(r'(\d{4}[-/]\d{1,2}[-/]\d{1,2})', line)
            if date_m:
                info['exam_time'] = date_m.group(1)
    # ---------- English-format reports ----------
    for line in lines:
        line_lower = line.lower().strip()
        # Name - "Patient Name: MR. SHUNHU YU" / "Patient Name: MS. XXX"
        if 'patient name' in line_lower:
            match = re.search(r'patient\s*name\s*[:：]\s*(.+)', line, re.IGNORECASE)
            if match:
                name = match.group(1).strip()
                # Drop MR. / MS. / MRS. honorifics
                name = re.sub(r'^(MR\.|MS\.|MRS\.|MISS\.?)\s*', '', name, flags=re.IGNORECASE)
                info['name'] = name.strip()
        # Gender - "Sex : Male" / "Sex Female" ('female' checked first
        # because 'male' is a substring of 'female')
        if 'sex' in line_lower and ('male' in line_lower or 'female' in line_lower):
            if 'female' in line_lower:
                info['gender'] = '女性'
            elif 'male' in line_lower:
                info['gender'] = '男性'
        # Age - "Age : 57Y6M17D" / "Age: 35"
        if 'age' in line_lower and (':' in line or '：' in line):
            match = re.search(r'age\s*[:：]\s*(\d+)', line, re.IGNORECASE)
            if match:
                info['age'] = match.group(1)
        # Exam date - "Collected Date/Time: 20 Dec 2025" / "Collected Date : 2025-07-20"
        if 'collected' in line_lower and ('date' in line_lower or 'time' in line_lower):
            # Capture the date part, dropping a trailing hh:mm if present
            match = re.search(r'collected\s*(?:date)?(?:/time)?\s*[:：]\s*(.+?)(?:\s+\d{1,2}[:：]\d{2})?$', line, re.IGNORECASE)
            if match:
                date_str = match.group(1).strip()
                # Try the known date formats in turn; keep the raw string
                # when none of them parse
                for fmt in ('%d %b %Y', '%Y-%m-%d', '%Y/%m/%d'):
                    try:
                        info['exam_time'] = datetime.strptime(date_str, fmt).strftime('%Y-%m-%d')
                        break
                    except ValueError:
                        pass
                else:
                    info['exam_time'] = date_str
    return info
def fill_patient_info_in_template(doc, patient_info: dict):
    """Fill patient basics into the Word template.

    The template carries two copies of the info block:
      1. around paragraphs 83-94 (may hold sample data that must be replaced)
      2. around paragraphs 263-274 (blank placeholders to fill)
    A fixed-width prefix format keeps every '/' aligned in the same column.

    Args:
        doc: python-docx Document object.
        patient_info: dict produced by extract_patient_info().

    Returns:
        Number of fields actually filled.
    """
    # Field prefixes padded so the '/' separators line up; the longest
    # English label ("Project", 7 chars) sets the common width.
    field_formats = {
        'Name': ('Name / 姓名 ', patient_info.get('name', '')),
        'Gender': ('Gender / 性别 ', patient_info.get('gender', '')),
        'Age': ('Age / 年龄 ', patient_info.get('age', '')),
        'Nation': ('Nation / 国籍 ', patient_info.get('nation', '')),
        'Time / 体检': ('Time / 体检时间 ', patient_info.get('exam_time', '')),
        'Project': ('Project / 体检项目 ', patient_info.get('project', '')),
        'Time / 报告': ('Time / 报告时间 ', patient_info.get('report_time', '')),
    }
    filled_count = 0
    for para in doc.paragraphs:
        text = para.text.strip()
        # Match this paragraph against each field's keyword
        for field_key, (field_format, value) in field_formats.items():
            if field_key in text:
                # Only overwrite when a value was actually extracted
                if value:
                    # Wipe the paragraph's existing runs
                    for run in para.runs:
                        run.text = ''
                    # Write prefix + value (fixed-width format) into the
                    # first run, or a new run if the paragraph had none
                    new_text = field_format + value
                    if para.runs:
                        para.runs[0].text = new_text
                    else:
                        para.add_run(new_text)
                    filled_count += 1
                    print(f" ✓ 填充: {field_format}{value}")
                break  # a paragraph matches at most one field
    print(f" 共填充 {filled_count} 个患者信息字段")
    return filled_count
def parse_medical_data(text: str, source_file: str) -> list:
"""从OCR文本中解析医疗检测数据 - OCR每个字段分行"""
items = []
lines = [l.strip() for l in text.split('\n') if l.strip()]
# 项目名称到ABB的映射 - 注意优先级:更具体的放前面
name_to_abb = {
# 血常规 - 按优先级排序,更具体的放前面
'mean cell hb concentration': 'MCHC', 'mchc': 'MCHC', # 必须在 hemoglobin 前
'follicle stimulating': 'FSH', 'fsh': 'FSH', 'folicle stimulating': 'FSH', # 必须在 hemoglobin 前
'mean corpuscular hemoglobin concentration': 'MCHC',
'mean corpuscular hemoglobin': 'MCH',
'rbc distribution width': 'RDW', 'rdw': 'RDW', # 必须在 rbc 前
'red cell distribution width': 'RDW',
'total wbc': 'WBC', 'white blood cell': 'WBC', 'wbc': 'WBC',
'red blood cell': 'RBC', 'rbc count': 'RBC', 'total rbc': 'RBC',
'hemoglobin(hb)': 'Hb', 'hemoglobin': 'Hb', # 注意:不要用 'hb' 作为key会匹配到其他项
'hematocrit': 'HCT', 'hct': 'HCT',
'mean cell volume': 'MCV', 'mcv': 'MCV', 'mean corpuscular volume': 'MCV',
'platelet count': 'PLT', 'platelet': 'PLT', 'plt': 'PLT',
'mean platelet volume': 'MPV', 'mpv': 'MPV',
'neutrophil': 'NEUT', 'neut': 'NEUT',
'lymphocyte': 'LYMPH', 'lymph': 'LYMPH',
'monocyte': 'MONO', 'mono': 'MONO',
'eosinophil': 'EOS', 'eos': 'EOS',
'basophil': 'BAS', 'bas': 'BAS',
'esr': 'ESR', 'erythrocyte sedimentation': 'ESR',
'glucose(fasting)': 'FPG', 'fasting glucose': 'FPG', 'glucose': 'GLU', 'glu': 'GLU',
'hba1c': 'HbA1c', 'glycated hemoglobin': 'HbA1c', 'haemoglobin a1c': 'HbA1c', 'haemoglobin alc': 'HbA1c', 'hemoglobin a1c': 'HbA1c',
# 血脂 - HDL必须在cholesterol前面否则会被匹配为TC
'hdl-cholesterol': 'HDL', 'hdl cholesterol': 'HDL', 'hdl': 'HDL',
'ldl-cholesterol': 'LDL', 'ldl cholesterol': 'LDL', 'ldl direct': 'LDL', 'ldl': 'LDL',
'vldl-cholesterol': 'VLDL', 'vldl': 'VLDL',
'total cholesterol': 'TC', 'cholesterol': 'TC', # 放在HDL/LDL后面
'triglyceride': 'TG', 'tg': 'TG',
'alt': 'ALT', 'sgpt': 'ALT', 'alanine aminotransferase': 'ALT',
'ast': 'AST', 'sgot': 'AST', 'aspartate aminotransferase': 'AST',
'gamma glutamyl transferase': 'GGT', 'gamma gt': 'GGT', 'gamma-gt': 'GGT', 'ggt': 'GGT', 'ggt(': 'GGT',
'alp': 'ALP', 'alkaline phosphatase': 'ALP',
'total bilirubin': 'TBIL', 'bilirubin total': 'TBIL', 'bilirubin(total)': 'TBIL',
'direct bilirubin': 'DBIL', 'bilirubin(direct)': 'DBIL', 'bilirubin direct': 'DBIL',
'ldh': 'LDH', 'lactate dehydrogenase': 'LDH',
'inr': 'INR',
'beta crosslap': 'CTX', 'beta-crosslap': 'CTX',
'anion gap': 'AG',
'estimated average glucose': 'EAG',
'total protein': 'TP',
'albumin': 'ALB', 'alb': 'ALB',
'globulin': 'GLB',
'bun': 'BUN', 'urea nitrogen': 'BUN', 'blood urea nitrogen': 'BUN',
'carcinoembryonic': 'CEA', 'cea': 'CEA', 'carcinoembryonic antigen': 'CEA',
'uric acid': 'UA', 'uricacid': 'UA', 'ua': 'UA', 'uric acid.': 'UA',
'egfr': 'eGFR',
'tsh': 'TSH', 'thyroid stimulating': 'TSH',
'ft3': 'FT3', 'free t3': 'FT3',
'ft4': 'FT4', 'free t4': 'FT4',
't3': 'T3', 't4': 'T4',
'estrogen': 'E2', 'estradiol': 'E2', 'estradiol(e2)': 'E2',
'progesterone': 'PROG',
'testosterone': 'TESTO',
'fsh': 'FSH', 'lh': 'LH',
'cortisol': 'Cortisol',
'igf-1': 'IGF-1', 'igf1': 'IGF-1',
'dhea': 'DHEA', 'dhea-s': 'DHEA-S',
'prolactin': 'PRL',
'afp': 'AFP', 'alpha fetoprotein': 'AFP',
'cea': 'CEA',
'ca125': 'CA125', 'ca 125': 'CA125',
'ca153': 'CA153', 'ca 15-3': 'CA153', 'carbohydrate antigen 15-3': 'CA153', 'carbohydrate antigen 15': 'CA153',
'ca199': 'CA199', 'ca 19-9': 'CA199', 'carbohydrate antigen 19-9': 'CA199', 'carbohydrate antigen 19': 'CA199',
'psa': 'PSA',
'hepatitis b surface antigen': 'HBsAg', 'hbsag': 'HBsAg', 'hbs ag': 'HBsAg',
'hepatitis b surface antibody': 'HBsAb', 'hbsab': 'HBsAb', 'anti-hbs': 'HBsAb', 'hbs ab': 'HBsAb',
'hepatitis be antigen': 'HBeAg', 'hbeag': 'HBeAg', 'hbe ag': 'HBeAg',
'hepatitis be antibody': 'HBeAb', 'hbeab': 'HBeAb', 'hbe ab': 'HBeAb',
# 尿检项目
'ph': 'PH', 'acidity': 'PH',
'specific gravity': 'SG', 'sp gravity': 'SG',
'transparency': 'Clarity', 'clear': 'Clarity',
'glucose': 'GLU', 'glu': 'GLU',
'ketone': 'KET', 'ket': 'KET', 'ketones': 'KET',
'bilirubin': 'BIL', 'bil': 'BIL',
'urobilinogen': 'URO', 'uro': 'URO',
'nitrite': 'NIT', 'nit': 'NIT',
'leukocyte': 'LEU', 'leu': 'LEU', 'leucocyte': 'LEU',
'erythrocyte': 'ERY', 'ery': 'ERY',
'color': 'Color', 'colour': 'Color',
'clarity': 'Clarity', 'turbidity': 'Clarity', 'appearance': 'Clarity',
'bacteria': 'BAC', 'bact': 'BAC',
'mucus': 'MUC',
'yeast': 'Yeast',
'crystal': 'CRY',
'hepatitis b core antibody': 'HBcAb', 'hbcab': 'HBcAb', 'anti-hbc': 'HBcAb', 'hbc ab': 'HBcAb',
'hepatitis c antibody': 'Anti-HCV', 'anti-hcv': 'Anti-HCV', 'hcv ab': 'Anti-HCV',
'hiv': 'HIV',
'h.pylori': 'H.pylori IgG', 'h. pylori': 'H.pylori IgG', 'helicobacter': 'H.pylori IgG',
'calcium': 'Ca', # 移除 'ca' 避免误匹配 clinical, context等
'phosphorus': 'P', 'phosphate': 'P',
'iron': 'Fe', 'serum iron': 'Fe',
'ferritin': 'Ferritin',
'zinc': 'Zn', 'zn': 'Zn',
'copper': 'Cu', 'cu': 'Cu',
'magnesium': 'Mg', 'mg': 'Mg',
'vitamin b12': 'VitB12', 'vit b12': 'VitB12', 'b12': 'VitB12',
'folate': 'Folate', 'folic acid': 'Folate',
'vitamin d': '25-OH-VitD', '25-oh vitamin d': '25-OH-VitD', '25-hydroxy': '25-OH-VitD', 'vitamin d total': '25-OH-VitD',
'crp': 'CRP', 'c-reactive protein': 'CRP',
'hs-crp': 'hs-CRP',
'rf': 'RF', 'rheumatoid factor': 'RF',
'ana': 'ANA', 'antinuclear antibody': 'ANA',
'immunoglobulin g': 'IgG', 'immunoglobulin a': 'IgA', 'immunoglobulin m': 'IgM', 'immunoglobulin e': 'IgE',
'igg': 'IgG', 'iga': 'IgA', 'igm': 'IgM', 'ige': 'IgE',
'c3': 'C3', 'c4': 'C4',
'nk cell': 'NK', 'cd16': 'NK', 'cd56': 'NK',
'osteocalcin': 'OSTE',
'p1np': 'P1NP',
'ctx': 'CTX',
'pth': 'PTH',
'color': 'Color', 'colour': 'Color',
'abo group': 'ABO', 'abo blood group': 'ABO',
'rh group': 'Rh', 'rh blood group': 'Rh',
'ph': 'pH',
'specific gravity': 'SG', 'sp gravity': 'SG', 'sg': 'SG',
'lipoprotein(a)': 'LP(A)', 'lipoprotein a': 'LP(A)',
'apolipoprotein a1': 'APOA1', 'apolipoprotein a': 'APOA1',
'apolipoprotein b': 'APOB',
'protein': 'PRO',
'ketone': 'KET', 'ket': 'KET',
'nitrite': 'NIT', 'nit': 'NIT',
'bilirubin': 'BIL',
'urobilinogen': 'URO',
'leukocyte': 'LEU',
# 凝血功能
'prothrombin time': 'PT', 'pt': 'PT', 'prothrombin time(pt)': 'PT',
'thrombin time': 'TT', 'tt': 'TT', 'thrombin time(tt)': 'TT',
'fibrinogen': 'FIB', 'fibrinogen level': 'FIB',
'd-dimer': 'D-Dimer', 'fdp d-dimer': 'D-Dimer',
'aptt': 'APTT', 'activated partial thromboplastin': 'APTT',
# 电解质
'sodium': 'Na', 'na': 'Na',
'potassium': 'K', 'k': 'K',
'chloride': 'Cl', 'cl': 'Cl',
'tco2': 'TCO2', 'co2': 'TCO2',
# 同型半胱氨酸
'homocysteine': 'HCY', 'hcy': 'HCY',
# 重金属
'lead': 'Pb', 'lead in blood': 'Pb',
'chromium': 'Cr', 'chromium in blood': 'Cr',
'manganese': 'Mn', 'manganese in blood': 'Mn',
'nickel': 'Ni', 'nickel in blood': 'Ni',
# 肿瘤标志物
'nse': 'NSE', 'neuron specific enolase': 'NSE',
'cyfra': 'CYFRA21-1', 'cyfra 21-1': 'CYFRA21-1',
# 血脂比值
'cholesterol/hdl-c ratio': 'TC/HDL', 'cholesterol/hdl ratio': 'TC/HDL', 'tc/hdl': 'TC/HDL',
'ldl/hdl ratio': 'LDL/HDL', 'ldl/hdl': 'LDL/HDL',
# 心肌酶
'ck-mb': 'CK-MB', 'ckmb': 'CK-MB', 'creatine kinase-mb': 'CK-MB',
'creatine kinase': 'CK', 'ck': 'CK',
# 甲状腺
'total t4': 'T4', 'totalt4': 'T4', 'thyroxine(t4)': 'T4',
# 炎症
'aso': 'ASO', 'anti-streptolysin': 'ASO', 'anti streptolysin': 'ASO', 'aso(anti-streptolysin': 'ASO',
# 自身抗体
'anti smith': 'Anti-Sm', 'anti-sm': 'Anti-Sm',
'anti-n rnp': 'Anti-RNP', 'anti rnp': 'Anti-RNP',
}
# OCR数据格式多样
# 格式1: 项目名...: \n 数值 \n 单位 \n (参考范围)
# 格式2: 项目名...: \n 数值 H/L 单位 \n (参考范围)
# 格式3: 项目名...: \n 数值H% \n (参考范围)
# 跳过关键词 - 注意避免误匹配(如 'tel' 会匹配 'platelet'
skip_words = ['page ', 'patient name', 'doctor:', 'laboratory', 'specimen.', 'specimen type',
'collected date', 'printed', 'method:', 'bangkok', 'thailand',
'tel.', 'tel(', 'fax.', 'fax-', 'email:', 'iso 15189', 'iso15189',
'accreditation', 'lab no.', 'lab no:', 'labno', 'mrn.', 'mrn:', 'requested date',
'received date', 'address/', 'sex :', 'sex:', 'age :', 'age:',
'dob :', 'dob:', 'ref.no', 'copyright', 'reported by', 'authorised by',
'print date', 'remark:', 'remark(', 'confidential', 'this report',
'reference range', 'test name', 'result unit', 'edta blood',
'morphology:', 'morphology.', 'adequate', 'differential count',
'complete blood count', 'issue date', 'revision', 'normal range',
'for 10-year', 'this equation', 'calculated by', 'outlab',
'approved by', 'trimester', 'women(', 'female 21', 'post-menopause',
'cytoplasmic', 'oct1114', 'comment:', 'comment.', 'secs',
'report by', 'method:', 'method.', 'age:', 'age .', 'dr:', 'dr.',
'age...', # 移除了尿检项目过滤词: transparency, erythrocyte.., leucocyte.., urobilinogen..
# 过滤噪音数据 - 参考范围和标注被误识别
'borderline high', 'borderline low',
'female 12-', 'male 12-', 'female 14-', 'male 14-', 'female 15-', 'male 15-',
'female 16-', 'male 16-', 'female 17-', 'male 17-', 'female 18-', 'male 18-',
'female years', 'male years', 'thai male', 'thai female',
'serum am', 'serum pm', 'years 501', 'years 508', 'years 1717',
'years 546', 'years 468', 'years 231', 'years 225',
'scc 0', 'high =', 'low =', 'age = ', 'rbc = 0', 'high = 160',
'bilirubin = negative', 'bilirubin negative']
# 按key长度排序最长的优先匹配
sorted_keys = sorted(name_to_abb.keys(), key=len, reverse=True)
# 需要精确匹配的短key避免误匹配
# alt会误匹配cobalt/totalt4, ast会误匹配contrast等
exact_match_keys = {'ph', 'sg', 'ca', 'mg', 'na', 'k', 'cl', 'p', 'fe', 'zn', 'cu', 'ni', 'cr', 'mn', 'pb',
'alt', 'ast', 'ggt', 'alp', 'ldh', 'bun', 'ua', 'tg', 'tc', 't3', 't4', 'fsh', 'lh',
'hb', 'rbc', 'wbc', 'plt', 'mcv', 'mch', 'hct', 'rdw', 'mpv',
'crp', 'rf', 'ana', 'pth', 'nse', 'cea', 'afp', 'psa', 'hiv'}
def find_abb(project_name):
    """Resolve a test-item name to its ABB code.

    Looks the name up in the known-name table first (longest keys tried
    first; short keys require an exact or word-boundary hit so e.g. 'alt'
    does not match 'cobalt'). Falls back to building an acronym from the
    name's initial letters, capped at six characters.
    """
    needle = project_name.lower().strip()
    for key in sorted_keys:
        if key in exact_match_keys:
            # Short keys only count as a hit when the whole name is the key,
            # the key is a prefix followed by a separator/digit, or the key
            # appears as an independent word.
            exact = needle == key
            prefixed = re.match(rf'^{key}[\s\.\:\d]', needle) is not None
            bounded = re.search(rf'\b{key}\b', needle) is not None
            if exact or prefixed or bounded:
                return name_to_abb[key]
        elif key in needle:
            return name_to_abb[key]
    # Unknown name: derive an acronym from the leading letters of each word.
    initials = [w[0].upper() for w in project_name.split() if w and w[0].isalpha()]
    if initials:
        return ''.join(initials)[:6]
    return project_name[:6].upper()
def parse_value_line(text):
    """Parse a single value line; return a (result, point, unit) tuple.

    `result` is the measured value as a string (or None when nothing
    matched), `point` is the high/low flag derived from an H/L marker,
    and `unit` is any trailing unit text (may be empty).
    """
    text = text.strip()
    result, point, unit = None, '', ''
    # Format 1: "5.7H%" or "140H" or "230 H mg/dL" or "95" (starts with a number)
    m = re.match(r'^([\d\.]+)\s*([HL])?\s*(.*)$', text, re.IGNORECASE)
    if m:
        result = m.group(1)
        if m.group(2):
            # NOTE(review): both branches render as empty strings here; the
            # file reportedly contains invisible Unicode, so these are
            # presumably up/down arrow markers for H/L — confirm in the raw file.
            point = '' if m.group(2).upper() == 'H' else ''
        unit = m.group(3).strip() if m.group(3) else ''
        return result, point, unit
    # Format 2: value fused with its unit, e.g. "158.00mg/dL" or "247.00mg/dL"
    m = re.match(r'^([\d\.]+)([a-zA-Z/%]+[/\w]*)$', text)
    if m:
        result = m.group(1)
        unit = m.group(2)
        return result, '', unit
    # Format 3: qualitative results - single-letter blood types (A/B/O/AB) or
    # words (Positive/Negative/Reactive, ...). Trailing extras are allowed,
    # e.g. "Yellow [Normal: Yellow]".
    qualitative_patterns = [
        r'^([ABO]|AB)\b',  # blood type
        r'^(Positive|Negative|Reactive|Non[- ]?[Rr]eactive|Normal|Abnormal|Adequate|Yellow|Clear|Straw|Amber)\b',  # qualitative result
    ]
    for pat in qualitative_patterns:
        m = re.match(pat, text, re.IGNORECASE)
        if m:
            result = m.group(1)
            return result, '', ''
    # Format 4: leading dots/colons then a value, e.g. "...... 6.0 (4.5-8.0)" -> 6.0
    m = re.match(r'^[\.:\s]+([<>]?\d+\.?\d*)\s*(.*)$', text)
    if m:
        result = m.group(1)
        unit = m.group(2).strip()
        return result, '', unit
    return result, point, unit
i = 0
while i < len(lines):
line = lines[i].strip()
line_lower = line.lower()
# 跳过无关行
if any(w in line_lower for w in skip_words):
i += 1
continue
# 跳过空行
if len(line) == 0:
i += 1
continue
# 检查是否是项目名行 (包含 ... 或以 : 结尾)
# 支持中文冒号 和英文冒号 :
# 增强:支持特定的已知项目名,即使没有冒号
known_short_projects = ['ph', 'sg', 'pro', 'glu', 'nit', 'ket', 'bld', 'ery', 'leu', 'wbc', 'rbc', 'color', 'turbidity']
# 1. 标准格式:以冒号或点结尾
is_standard_project = re.match(r'^[A-Za-z][A-Za-z0-9\s\-\(\)\.]+[\.:\uff1a]+\s*$', line)
# 1.5 以(*)开头的项目名(如 (*)Thrombin Time- 不需要冒号结尾
is_star_project = re.match(r'^\(\*\)([A-Za-z][A-Za-z0-9\s\-]+)$', line)
# 2. 已知短项目名格式:可能是 "pH" 或 "pH 6.0" 或 "pH ..."
is_known_project = False
first_word = line.split()[0].lower().strip('.:') if line else ''
if first_word in known_short_projects:
is_known_project = True
if is_standard_project or is_known_project or is_star_project:
# 提取项目名
if is_standard_project:
project = re.sub(r'[\.:\]+\s*$', '', line).strip()
project = re.sub(r'\.+', '', project).strip()
# 移除开头的(*)
project = re.sub(r'^\(\*\)', '', project).strip()
elif is_star_project:
# 从(*)开头的行提取项目名
project = is_star_project.group(1).strip()
else:
# 对于已知项目,可能后面直接跟结果
parts = line.split(maxsplit=1)
project = parts[0].strip('.:')
# 如果后面有内容,可能是结果
remaining = parts[1] if len(parts) > 1 else ""
abb = find_abb(project)
# 读取后续行获取数值
result = None
unit = ""
reference = ""
point = ""
# 如果是已知项目且同一行有内容,尝试直接解析结果
if is_known_project and 'remaining' in locals() and remaining:
# 尝试解析 remaining
r, p, u = parse_value_line(remaining)
if r:
result = r
point = p
unit = u
j = i + 1
# 如果还没有结果,继续往下找
while j < len(lines) and j < i + 6 and result is None:
next_line = lines[j].strip()
next_lower = next_line.lower()
# 跳过无关行
if any(w in next_lower for w in skip_words):
j += 1
continue
# 检查是否是新的项目名
if re.match(r'^[A-Za-z][A-Za-z0-9\s\-\(\)\.]+[\.:\]+\s*$', next_line):
break
# 参考范围 (括号包围) - 先检查这个
if (next_line.startswith('(') or next_line.startswith('<') or
next_line.startswith('>')) and result is not None:
reference = next_line if next_line.startswith('(') else f'({next_line})'
j += 1
break
# 尝试解析数值行
if result is None:
r, p, u = parse_value_line(next_line)
if r:
result = r
point = p if p else point
unit = u if u else unit
j += 1
continue
# 单独的单位行
if re.match(r'^[\*a-zA-Z0-9\^\/\%\-\.]+$', next_line) and not next_line[0].isdigit():
if not unit:
unit = next_line
j += 1
continue
j += 1
# 保存结果 - 过滤噪音
if result and abb:
project_lower = project.lower()
# 过滤噪音项目名和无效结果
noise_projects = ['age', 'high', 'low', 'a', 'h', 'l', 'clinical info',
'context', 'guidelines', 'standards', 'personal data',
'copyright', 'report', 'specimen', 'method']
noise_patterns = ['female ', 'male ', 'years ', 'handled following',
'evolving clinical', 'privacy laws']
is_noise = (
project_lower in noise_projects or
(project_lower == 'rbc' and result == '0') or
result in ['.', ':', '-', '/'] or # 无效结果
len(project) > 50 or # 项目名过长肯定是噪音
any(p in project_lower for p in noise_patterns)
)
if not is_noise:
# 白细胞分类项目特殊处理:根据参考范围判断是数量还是百分比
# 百分比的参考范围通常是 0-100 之间的数值,如 (46.5-75.0)
# 数量的参考范围通常包含 10^3 或 *10 等单位
wbc_diff_abbs = {'NEUT', 'LYMPH', 'MONO', 'EOS', 'BAS'}
if abb.upper() in wbc_diff_abbs:
is_percentage = False
# 检查单位是否是百分比
if unit and '%' in unit:
is_percentage = True
# 检查参考范围是否是百分比形式0-100之间的数值
elif reference:
ref_match = re.search(r'\(?([\d\.]+)\s*[-]\s*([\d\.]+)\)?', reference)
if ref_match:
try:
low = float(ref_match.group(1))
high = float(ref_match.group(2))
# 如果参考范围在0-100之间且没有10^3等单位标识认为是百分比
if 0 <= low <= 100 and 0 <= high <= 100 and '10^' not in reference and '*10' not in reference:
is_percentage = True
except:
pass
if is_percentage:
abb = abb.upper() + '%'
# 如果单位为空,添加%
if not unit:
unit = '%'
items.append({
'abb': abb,
'project': project,
'result': result,
'point': point,
'unit': unit,
'reference': reference,
'source': source_file
})
i = j
continue
# 检查定性结果格式: "项目名...: 结果" 或 "项目名..... . 结果"
# 更宽松:项目名后有点(可含空格),匹配定性结果
match = re.match(r'^(.+?)[\.\s]{2,}[:\]?\s*(Negative|Positive|Non[- ]?Reactive|Reactive|Normal|B|A|AB|O|Yellow|Clear)\b', line, re.IGNORECASE)
if match:
project = match.group(1).strip()
project = re.sub(r'\.+', '', project).strip()
result = match.group(2).strip()
# 过滤噪音 - 只过滤明确的噪音
project_lower = project.lower()
is_noise = (
project_lower in ['age', 'high', 'low', 'a', 'h', 'l'] or
any(p in project_lower for p in ['female ', 'male ', 'years '])
)
if not is_noise:
abb = find_abb(project)
items.append({
'abb': abb,
'project': project,
'result': result,
'point': '',
'unit': '',
'reference': '',
'source': source_file
})
i += 1
continue
# 检查带冒号的行中是否直接包含定性结果(备用匹配)
# 如 "HIV-1/HIV-2 Antibody.....: Non Reactive"
match = re.match(r'^([A-Za-z][A-Za-z0-9\s\-\(\)/\.]+)[:\]+\s*(Non[- ]?[Rr]eactive|Reactive|Negative|Positive|Yellow|Clear)$', line, re.IGNORECASE)
if match:
project = match.group(1).strip()
project = re.sub(r'\.+', '', project).strip()
result = match.group(2)
abb = find_abb(project)
items.append({
'abb': abb,
'project': project,
'result': result,
'point': '',
'unit': '',
'reference': '',
'source': source_file
})
i += 1
continue
# 检查带点号或冒号的行中是否直接包含数值
# 如 "ESR 1 Hour ................... 20 H mm/hr" 或 "pH......... 6.0 (4.5-8.0)"
# 更宽松:项目名后有点(可含空格),结果以数字或<开头
match = re.match(r'^(.+?)[\.\s]{2,}[:\]?\s*([<>]?\d+\.?\d*)\s*([HL])?\s*(.*)$', line, re.IGNORECASE)
if match:
project = match.group(1).strip()
project = re.sub(r'\.+', '', project).strip()
result = match.group(2)
point = '' if match.group(3) and match.group(3).upper() == 'H' else ('' if match.group(3) and match.group(3).upper() == 'L' else '')
rest = match.group(4).strip() if match.group(4) else ''
# 解析剩余部分获取单位和参考范围
unit = ''
reference = ''
if rest:
ref_match = re.search(r'\(([^\)]+)\)', rest)
if ref_match:
reference = f'({ref_match.group(1)})'
rest = rest[:ref_match.start()].strip()
unit = rest
abb = find_abb(project)
items.append({
'abb': abb,
'project': project,
'result': result,
'point': point,
'unit': unit,
'reference': reference,
'source': source_file
})
i += 1
continue
# 备用匹配1: 项目名(括号内容).: 数值 格式
# 如 "CEA(Carcinoembryonic Antigen).: 1.41" 或 "Vitamin D(25-OH...): 35.00"
match = re.match(r'^([A-Za-z][A-Za-z0-9\s\-]+)\([^\)]+\)[\.:\s]+\s*([<>]?\d+\.?\d*)\s*(.*)$', line)
if match:
project = match.group(1).strip()
result = match.group(2)
rest = match.group(3).strip()
abb = find_abb(project)
unit = ''
reference = ''
if rest:
ref_match = re.search(r'\(([^\)]+)\)', rest)
if ref_match:
reference = f'({ref_match.group(1)})'
rest = rest[:ref_match.start()].strip()
unit = rest
items.append({
'abb': abb, 'project': project, 'result': result,
'point': '', 'unit': unit, 'reference': reference, 'source': source_file
})
i += 1
continue
# 备用匹配2: 连续点号后跟冒号或空格和结果
# 如 "Color........................ Yellow" 或 "pH......... 6.0" 或 "Specific Gravity..............: 1.030"
match = re.match(r'^([A-Za-z][A-Za-z0-9\s\-/\(\)]*?)\.{3,}[:\s]+(.+)$', line)
if match:
project = match.group(1).strip()
rest = match.group(2).strip()
abb = find_abb(project)
# 解析rest可能是 "Yellow [Normal: Yellow]" 或 "6.0 (4.5-8.0)" 或 "1.030 (1.003-1.030)"
result = None
unit = ''
reference = ''
# 先尝试提取数值
num_match = re.match(r'^([<>]?\d+\.?\d*)\s*([HL])?\s*(.*)$', rest, re.IGNORECASE)
if num_match:
result = num_match.group(1)
rest2 = num_match.group(3).strip()
ref_match = re.search(r'\(([^\)]+)\)', rest2)
if ref_match:
reference = f'({ref_match.group(1)})'
rest2 = rest2[:ref_match.start()].strip()
unit = rest2
else:
# 尝试提取定性结果
qual_match = re.match(r'^(Negative|Positive|Yellow|Clear|Normal|Non[- ]?Reactive|Reactive)\b', rest, re.IGNORECASE)
if qual_match:
result = qual_match.group(1)
if result and abb:
items.append({
'abb': abb, 'project': project, 'result': result,
'point': '', 'unit': unit, 'reference': reference, 'source': source_file
})
i += 1
continue
i += 1
return items
def clean_extracted_data(items: list) -> list:
    """Clean extracted lab items, repairing common OCR parsing errors.

    Drops obvious noise rows, moves qualitative/colour results that were
    mis-parsed into the unit field back into the result field, strips
    reference-range text out of units, and range-checks pH / specific
    gravity values. Items are mutated in place; the filtered list is
    returned.

    Args:
        items: list of dicts with 'abb', 'result', 'unit', 'project',
            'reference' keys (missing keys are treated as empty).

    Returns:
        list: the surviving (possibly repaired) item dicts, in order.
    """
    import re
    cleaned = []
    for item in items:
        abb = item.get('abb', '').upper()
        result = item.get('result', '')
        unit = item.get('unit', '')
        project = item.get('project', '')
        reference = item.get('reference', '')
        project_lower = project.lower()
        # 1. Filter obvious noise rows.
        if abb in ['A', 'H', 'L', 'R', 'AGE']:
            continue
        if project_lower in ['age', 'high', 'low', 'received', 'collected']:
            continue
        if 'phase' in project_lower or 'trimester' in project_lower:
            continue
        # 2. Repair results captured in the unit field (e.g. Color's "Yellow").
        if result in ['', '.', '-', '/'] and unit:
            # Colour values.
            colors = ['yellow', 'amber', 'straw', 'colorless', 'red', 'brown', 'dark', 'clear']
            for color in colors:
                if color in unit.lower():
                    result = color.capitalize()
                    # Pull a reference range out of the unit when present.
                    if '[' in unit and 'normal' in unit.lower():
                        ref_match = re.search(r'\[.*?(\d.*?)\]', unit, re.IGNORECASE)
                        if ref_match:
                            reference = ref_match.group(1)
                    unit = ''
                    break
            # Qualitative results.
            qualitative = ['negative', 'positive', 'reactive', 'non-reactive', 'normal']
            for q in qualitative:
                if q in unit.lower():
                    result = q.capitalize()
                    unit = ''
                    break
        # 3. Drop rows that still have no usable result.
        if result in ['', '.', '-', '/', '00', '99', '999']:
            continue
        # 4. Repair units polluted with reference-range text.
        if unit and ('[' in unit or 'normal' in unit.lower()):
            # Extract the genuine unit token from the front.
            unit_match = re.match(r'^([a-zA-Z0-9\^/%\*]+)', unit)
            if unit_match and len(unit_match.group(1)) <= 15:
                unit = unit_match.group(1)
            else:
                unit = ''
        # 5. Range-check specific ABBs. Narrowed from bare `except:` so
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        # pH must be within 4.0-9.0.
        if abb == 'PH':
            try:
                if not (4.0 <= float(result.replace(',', '.')) <= 9.0):
                    continue
            except (ValueError, TypeError, AttributeError):
                continue
        # Specific gravity must be within 1.000-1.050.
        if abb == 'SG':
            try:
                if not (1.000 <= float(result.replace(',', '.')) <= 1.050):
                    continue
            except (ValueError, TypeError, AttributeError):
                continue
        # 6. Write the repaired fields back and keep the item.
        item['result'] = result
        item['unit'] = unit
        if reference and not item.get('reference'):
            item['reference'] = reference
        cleaned.append(item)
    return cleaned
def extract_all_pdfs(pdf_dir: str) -> tuple:
    """Extract lab data from every PDF in a directory.

    Each PDF is OCR'd, parsed with the v2 parser, and the combined item
    list is run through the v2 cleaner.

    Returns:
        tuple: (all_items, ocr_texts) where all_items is the cleaned list
        of detected test items and ocr_texts maps each PDF file name to
        its raw OCR text for later reuse.
    """
    ocr_texts = {}
    collected = []
    for pdf_file in Path(pdf_dir).glob("*.pdf"):
        print(f"\n📄 处理: {pdf_file.name}")
        raw_text = extract_pdf_text(str(pdf_file))
        # Keep the raw OCR text so later stages can reuse it.
        ocr_texts[pdf_file.name] = raw_text
        parsed = parse_medical_data_v2(raw_text, pdf_file.name)
        print(f" ✓ 提取 {len(parsed)} 个检测项")
        collected.extend(parsed)
    # Clean the combined data with the optimized v2 cleaner.
    cleaned = clean_extracted_data_v2(collected)
    print(f"\n ✓ 清洗后保留 {len(cleaned)} 个有效检测项")
    return cleaned, ocr_texts
def match_with_template(extracted_items: list, template_config: dict) -> dict:
    """Match extracted lab items against the template structure.

    Builds an ABB-indexed view of the template, validates each extracted
    result, picks the best candidate per ABB (abnormal-flagged entries
    score highest), and returns a dict keyed by the original-case ABB.
    Unmatched items are appended as well so later stages can treat them
    as missing entries.
    """
    import re
    # Support both the old and the new config formats.
    if 'items' in template_config:
        # Old format: flat item list.
        template_items = template_config['items']
    elif 'modules' in template_config:
        # New format: flatten the items out of every module.
        template_items = []
        for module_name, module_data in template_config['modules'].items():
            for item in module_data.get('items', []):
                template_items.append({
                    'abb': item.get('abb', ''),
                    'project': item.get('project', ''),
                    'project_cn': item.get('project_cn', ''),
                    'module': module_name
                })
    else:
        template_items = []
    # Result-validity rules.
    def is_valid_result(abb, result):
        """Return True when the result value is plausible for this ABB."""
        if not result:
            return False
        result_lower = result.lower().strip()
        abb_upper = abb.upper()
        # Qualitative-result items.
        qualitative = ['PRO', 'GLU', 'KET', 'BIL', 'NIT', 'URO', 'LEU', 'BLD',
                      'HBSAG', 'HBSAB', 'HBEAG', 'HBEAB', 'HBCAB', 'ANTI-HCV', 'HIV', 'RPR',
                      'ANA', 'ANTI-SM', 'ANTI-RNP', 'RF']
        valid_qualitative = ['negative', 'positive', 'trace', 'normal', 'abnormal',
                            'reactive', 'non-reactive', 'nonreactive', 'weak positive',
                            '1+', '2+', '3+', '4+', '+-']
        if abb_upper in qualitative:
            # A recognized qualitative value is valid.
            if result_lower in valid_qualitative or result_lower.replace('+', '').replace('-', '') in ['1', '2', '3', '4']:
                return True
            # Numeric values are also valid (some qualitative items carry
            # quantitative results, e.g. an HBsAb antibody titre).
            if re.search(r'\d', result):
                return True
            return False
        # Blood type.
        if abb_upper in ['ABO', 'RH']:
            return result_lower in ['a', 'b', 'ab', 'o', 'positive', 'negative', 'rh+', 'rh-', '+', '-']
        # Urine colour.
        if abb_upper == 'COLOR':
            return result_lower in ['yellow', 'amber', 'straw', 'colorless', 'red', 'brown', 'dark']
        # pH value must be in a physiological range.
        if abb_upper == 'PH':
            try:
                val = float(result.replace(',', '.'))
                return 4.0 <= val <= 9.0
            except:
                return False
        # Specific gravity (SG).
        if abb_upper == 'SG':
            try:
                val = float(result.replace(',', '.'))
                return 1.000 <= val <= 1.050
            except:
                return False
        # Numeric results - must contain a digit.
        if re.search(r'\d', result):
            # Reject obviously broken values.
            if len(result) > 30:  # too long
                return False
            if result_lower in ['00', '99', '999']:  # placeholder values
                return False
            return True
        return False
    # Build the ABB index of the template.
    template_by_abb = {}
    for item in template_items:
        abb = item['abb'].upper()
        template_by_abb[abb] = item
        # Handle slash-separated aliases (e.g. "A/B" indexes both parts).
        if '/' in abb:
            for part in abb.split('/'):
                template_by_abb[part] = item
    # Group extracted data by ABB; match on the upper-cased key but keep
    # the original casing for the output.
    items_by_abb = {}
    original_abb_map = {}  # preserves original ABB casing
    for item in extracted_items:
        abb_upper = item['abb'].upper()
        original_abb = item['abb']  # keep original casing
        if abb_upper not in items_by_abb:
            items_by_abb[abb_upper] = []
            original_abb_map[abb_upper] = original_abb  # record original ABB
        items_by_abb[abb_upper].append(item)
    matched = {}
    unmatched = []
    for abb_upper, items in items_by_abb.items():
        original_abb = original_abb_map.get(abb_upper, abb_upper)  # original-case ABB
        # Keep only valid results.
        valid_items = [i for i in items if is_valid_result(abb_upper, i.get('result', ''))]
        if not valid_items:
            # Nothing valid: fall back to the first entry (may be qualitative).
            valid_items = items[:1]
        # Choose the best candidate: abnormal flags first, then entries
        # carrying a unit/reference/project name.
        best = valid_items[0]
        for item in valid_items:
            score = 0
            # An abnormality flag dominates (+10).
            # NOTE(review): several entries in these point lists render as
            # empty strings — the file contains invisible Unicode, so they
            # are presumably arrow/fullwidth markers; verify in the raw file.
            point = item.get('point', '').strip()
            if point in ['', '', 'H', 'L', '', '']:
                score += 10
            if item.get('unit'): score += 1
            if item.get('reference'): score += 1
            if item.get('project'): score += 1
            best_point = best.get('point', '').strip()
            best_score = (10 if best_point in ['', '', 'H', 'L', '', ''] else 0) + \
                        (1 if best.get('unit') else 0) + \
                        (1 if best.get('reference') else 0) + \
                        (1 if best.get('project') else 0)
            if score > best_score:
                best = item
        # Match against the template, keyed by the original-case ABB.
        if abb_upper in template_by_abb:
            # Direct match wins.
            if original_abb not in matched:  # avoid overwriting duplicates
                # Attach module info and the Chinese project name.
                best['module'] = template_by_abb[abb_upper].get('module', '')
                # Prefer the Chinese project name from the config file.
                if template_by_abb[abb_upper].get('project_cn'):
                    best['project_cn'] = template_by_abb[abb_upper]['project_cn']
                matched[original_abb] = best  # key by original-case ABB
        else:
            # Fuzzy match - only meaningful similarity; avoids false hits
            # like 'R' in 'COLOR'.
            found = False
            for t_abb in template_by_abb:
                # Require at least 3 characters on both sides.
                # NOTE(review): after the length guard this only accepts
                # exact equality, so the "fuzzy" branch never fires for
                # genuinely different strings.
                if len(abb_upper) >= 3 and len(t_abb) >= 3:
                    if abb_upper == t_abb:
                        if original_abb not in matched:
                            # Attach module info and the Chinese project name.
                            best['module'] = template_by_abb[t_abb].get('module', '')
                            if template_by_abb[t_abb].get('project_cn'):
                                best['project_cn'] = template_by_abb[t_abb]['project_cn']
                            matched[original_abb] = best  # key by original-case ABB
                        found = True
                        break
            if not found:
                unmatched.append(best)
    print(f"\n匹配结果: {len(matched)} 个匹配, {len(unmatched)} 个未匹配")
    # Also merge unmatched items into the result so later stages can treat
    # them as missing entries.
    for item in unmatched:
        original_abb = item.get('abb', '')  # use the original ABB
        if original_abb and original_abb not in matched:
            matched[original_abb] = item
    return matched
def remove_placeholder_tables(doc):
    """
    Delete the template's data rows (placeholder rows and rows already
    filled with data).

    Kept: module-title rows (any {{...}} placeholders in them are cleared).
    Deleted: header rows, data rows, Clinical Significance rows.
    Note: a module-title table should end up with exactly one row left
    (the title row itself).

    Args:
        doc: python-docx Document, pruned in place.

    Returns:
        int: number of rows removed.
    """
    import re
    removed_count = 0
    # Compile the {{...}} placeholder pattern once instead of per title row.
    placeholder_pattern = re.compile(r'\{\{[^}]*\}\}')
    # Module-title keywords (full module names).
    module_title_patterns = [
        'blood sugar', 'blood count', 'complete blood count', 'urine detection', 'urine test',
        'liver function', 'kidney function', 'lipid profile', 'lipid panel',
        'thyroid function', 'thyroid', 'tumor marker', 'electrolyte', 'serum electrolyte',
        'coagulation', 'blood coagulation', 'immune', 'humoral immunity',
        'bone metabolism', 'infectious disease', 'four infectious',
        'heavy metal', 'microelement', 'trace element',
        'cardiovascular', 'thromboembolism', 'autoantibody', 'autoimmune',
        'blood type', 'inflammatory', 'lymphocyte',
        'female hormone', 'male hormone', 'female-specific', 'imaging',
        'myocardial enzyme', 'cardiac enzyme',
        '血常规', '尿液检测', '肝功能', '肾功能', '血脂', '甲状腺功能', '甲状腺',
        '肿瘤标志物', '电解质', '血糖', '凝血功能', '凝血', '体液免疫', '免疫功能',
        '骨代谢', '传染病', '重金属', '微量元素', '心脑血管', '自身抗体',
        '血型', '炎症', '淋巴细胞', '女性激素', '男性激素', '女性专项', '影像',
        '心肌酶', '女性荷尔蒙', '男性荷尔蒙'
    ]
    def is_module_title_row(row_text):
        """
        Decide whether a row is a genuine module-title row.

        Title rows:
        1. repeat the full module name several times
           (e.g. "Blood Sugar\n血糖 Blood Sugar\n血糖...")
        2. consist mostly of the module name, carrying no other data
        """
        row_text_lower = row_text.lower()
        for pattern in module_title_patterns:
            count = row_text_lower.count(pattern)
            if count >= 3:  # title rows usually repeat the name 3+ times
                # The row text should be roughly as long as the repeats alone.
                pattern_total_len = len(pattern) * count
                if len(row_text_lower) < pattern_total_len * 3:
                    return True
        return False
    for table in doc.tables:
        rows_to_remove = []
        for row in table.rows:
            row_text = ' '.join([c.text for c in row.cells]).strip()
            row_text_lower = row_text.lower()
            # Blank row: delete.
            if not row_text or row_text.replace(' ', '') == '':
                rows_to_remove.append(row)
                continue
            # Module-title row: keep (but strip any {{...}} placeholder text).
            if is_module_title_row(row_text):
                if '{{' in row_text:
                    for cell in row.cells:
                        if '{{' in cell.text:
                            cell.text = placeholder_pattern.sub('', cell.text).strip()
                continue
            # Clinical Significance row: delete (regenerated in a later step).
            if 'clinical significance' in row_text_lower or '临床意义' in row_text:
                rows_to_remove.append(row)
                continue
            # Everything else (header rows and data rows): delete.
            rows_to_remove.append(row)
        # Physically detach the marked rows.
        for row in rows_to_remove:
            try:
                tbl = table._tbl
                tbl.remove(row._tr)
                removed_count += 1
            except Exception:
                # Best-effort removal; narrowed from a bare `except:` so
                # KeyboardInterrupt/SystemExit still propagate.
                pass
    return removed_count
def find_module_title_position(doc, module_name):
    """
    Locate a module title's position within the document body.

    Returns the body index of the module-title table; new tables should be
    inserted just after this position. Note: in the template the module
    title is the first row of a table, not a paragraph. Returns -1 when
    no matching title table is found.

    Key distinction:
    - Module-title table: the title row repeats the module name
      (e.g. "Blood Sugar\n血糖 Blood Sugar\n血糖...")
    - Data table: its Clinical Significance row is a long description that
      may contain the keyword but is not a title
    """
    # Map of canonical module names to their search keywords.
    module_titles = {
        # The 24 standard modules.
        'Urine Test': ['urine test', 'urine detection', '尿液检测', '尿常规'],
        'Complete Blood Count': ['complete blood count', 'cbc', '血常规'],
        'Blood Sugar': ['blood sugar', '糖代谢', '血糖'],
        'Lipid Profile': ['lipid profile', 'lipid panel', '血脂'],
        'Blood Type': ['blood type', '血型'],
        'Blood Coagulation': ['blood coagulation', 'coagulation', '凝血功能', '凝血'],
        'Four Infectious Diseases': ['infectious disease', '传染病', 'four infectious'],
        'Serum Electrolytes': ['serum electrolyte', 'electrolyte', '电解质', '血清电解质'],
        'Liver Function': ['liver function', '肝功能'],
        'Kidney Function': ['kidney function', '肾功能'],
        'Myocardial Enzyme': ['myocardial enzyme', 'cardiac enzyme', '心肌酶', '心肌酶谱'],
        'Thyroid Function': ['thyroid function', '甲状腺功能', '甲功'],
        'Thromboembolism': ['thromboembolism', 'cardiovascular risk', '心脑血管', '血栓'],
        'Bone Metabolism': ['bone metabolism', '骨代谢'],
        'Microelement': ['microelement', 'trace element', 'heavy metal', '微量元素', '重金属'],
        'Lymphocyte Subpopulation': ['lymphocyte subpopulation', 'lymphocyte', '淋巴细胞亚群'],
        'Humoral Immunity': ['humoral immunity', 'immune function', '体液免疫', '免疫功能'],
        'Inflammatory Reaction': ['inflammatory reaction', 'inflammation', '炎症', '血沉'],
        'Autoantibody': ['autoantibody', 'autoimmune', '自身抗体', '自身免疫'],
        'Female Hormone': ['female hormone', '女性激素', '女性荷尔蒙'],
        'Male Hormone': ['male hormone\n男性荷尔蒙', '男性激素', '男性荷尔蒙male hormone'],
        'Tumor Markers': ['tumor marker', '肿瘤标志物'],
        'Imaging': ['imaging', '影像'],
        'Female-specific': ['female-specific', 'gynecological', '妇科', '女性专项'],
    }
    titles = module_titles.get(module_name, [module_name.lower()])
    body = doc.element.body
    def is_module_title_row(row_text):
        """
        Decide whether a row is a module-title row (as opposed to a
        Clinical Significance row).

        Title-row traits:
        1. contains the module name repeated
           (e.g. "Blood Sugar\n血糖 Blood Sugar\n血糖...")
        2. does not start with "Clinical Significance"
        3. carries no long descriptive content
        """
        row_text_lower = row_text.lower().strip()
        # Exclude Clinical Significance rows.
        if row_text_lower.startswith('clinical significance'):
            return False
        if '临床意义' in row_text and len(row_text) > 100:
            return False
        # Check for the repeated-title pattern: title rows typically look
        # like "Module Name\nCN Module Name\nCN ..." repeated.
        for title in titles:
            title_lower = title.lower()
            # Keyword occurring 2+ times strongly suggests a title row.
            if row_text_lower.count(title_lower) >= 2:
                # Extra guard: exclude long Clinical Significance rows that
                # contain these descriptive phrases.
                cs_indicators = ['used to', 'helps to', 'reflects', 'indicates', 'evaluating',
                               'diagnosis of', 'marker of', 'assessment', 'screening']
                if any(ind in row_text_lower for ind in cs_indicators) and len(row_text) > 500:
                    return False
                return True
            # Very short text containing the keyword may also be a title row.
            if len(row_text) < 150 and title_lower in row_text_lower:
                # Extra guard: reject rows containing descriptive wording.
                description_words = ['content', 'level', 'reflects', 'indicates', 'assisting',
                                   'diagnosis', 'evaluating', 'normal', 'reference']
                if not any(dw in row_text_lower for dw in description_words):
                    return True
        return False
    # Scan every table for the module title.
    for i, table in enumerate(doc.tables):
        if len(table.rows) == 0:
            continue
        # Only inspect the first 3 rows of each table.
        for row_idx in range(min(3, len(table.rows))):
            row_text = ' '.join([c.text.strip() for c in table.rows[row_idx].cells])
            row_text_lower = row_text.lower()
            # Quick keyword screen first.
            if any(title in row_text_lower for title in titles):
                # Then verify it really is a title row.
                if is_module_title_row(row_text):
                    # Found it: return this table's index within the body.
                    tbl_element = table._tbl
                    for idx, child in enumerate(body):
                        if child is tbl_element:
                            return idx
    return -1
def detect_gender(matched_data: dict, abb_config: dict) -> str:
    """[DEPRECATED] Infer the patient's gender from matched hormone/marker items.

    No longer used: gender now comes from the OCR text via
    patient_info['gender']. Kept only for reference.

    Decision order:
    1. AMH present (female-only module item) -> female
    2. TPSA/FPSA (prostate-specific antigen) -> male
    3. CA125/CA15-3/SCC (female tumour markers) -> female
    4. Otherwise use the E2 (estradiol) level: > 50 pmol/L -> female
    Default is female. COR/Cortisol is deliberately excluded because it is
    itself assigned by gender.
    """
    aliases = abb_config.get('abb_aliases', {})

    def canonical(code):
        # Resolve an ABB through the alias table, preferring the exact
        # spelling over the upper-cased one, then upper-case the result.
        upper = code.upper().strip()
        return aliases.get(code, aliases.get(upper, code)).upper()

    psa_codes = ['TPSA', 'FPSA', 'PSA', 'F/TPSA']
    female_marker_codes = ['CA125', 'CA15-3', 'CA153', 'SCC']
    found_amh = False            # female-only item seen
    found_psa = False            # male-only item seen
    found_female_markers = False # female tumour marker seen
    estradiol = None             # parsed E2 value, if any

    for abb, data in matched_data.items():
        result = data.get('result', '')
        if not result or result in ['', '.', '-', '/']:
            continue
        abb_upper = abb.upper().strip()
        norm = canonical(abb)
        # AMH: female-only.
        if norm == 'AMH' or abb_upper == 'AMH':
            found_amh = True
            print(f" 发现 AMH抗缪勒氏管激素→ 女性特有项目")
        # PSA family: male-only.
        if norm in psa_codes or abb_upper in psa_codes:
            found_psa = True
            print(f" 发现 {abb}(前列腺特异性抗原)→ 男性特有项目")
        # Female tumour markers.
        if norm in female_marker_codes or abb_upper in female_marker_codes:
            found_female_markers = True
            print(f" 发现 {abb}(女性肿瘤标志物)→ 女性特有项目")
        # Record the E2 value for the fallback rule.
        if norm == 'E2' or abb_upper == 'E2':
            try:
                estradiol = float(result.replace(',', '').strip())
                print(f" 发现 E2雌二醇= {estradiol}")
            except:
                pass

    # Decide, strongest evidence first.
    if found_psa:
        print(f" ✓ 检测结果: 男性 (发现前列腺特异性抗原)")
        return 'male'
    if found_amh or found_female_markers:
        print(f" ✓ 检测结果: 女性 (发现女性特有项目)")
        return 'female'
    if estradiol is not None:
        if estradiol > 50:
            print(f" ✓ 检测结果: 女性 (E2 = {estradiol} > 50)")
            return 'female'
        print(f" ✓ 检测结果: 男性 (E2 = {estradiol} <= 50)")
        return 'male'
    # Default: female (COR originally lived in the female module).
    print(f" ✓ 检测结果: 女性 (默认)")
    return 'female'
def fill_word_template_new(template_path: str, matched_data: dict, output_path: str, api_key: str = None, patient_info: dict = None):
    """
    New-style fill logic:
    1. Order modules and items following the 2.pdf standard layout
    2. First delete the template's existing placeholder rows
    3. Create a dedicated table block for every ABB
    4. Items that match no standard entry are classified via DeepSeek and
       appended to the tail of their module

    Args:
        template_path: path to the Word template
        matched_data: dict of matched test-item data
        output_path: output file path
        api_key: DeepSeek API key (optional)
        patient_info: patient info dict with a 'gender' field extracted from the OCR text
    """
    doc = Document(template_path)
    # Step 1: remove placeholder rows.
    print("\n 🧹 正在删除占位符行...")
    removed = remove_placeholder_tables(doc)
    print(f" ✓ 已删除 {removed} 个占位符行")
    # Load the configuration for module mappings and the standard ordering.
    from config import load_abb_config, get_standard_module_order, sort_items_by_standard_order, normalize_abb, normalize_module_name
    abb_config = load_abb_config()
    abb_to_module = abb_config.get('abb_to_module', {})
    abb_to_info = abb_config.get('abb_to_info', {})
    standard_module_order = get_standard_module_order()
    # Gender: taken from patient_info, which was extracted from the OCR text.
    # Convert the Chinese "男性"/"女性" into 'male'/'female'.
    gender_from_ocr = patient_info.get('gender', '') if patient_info else ''
    if gender_from_ocr == '男性':
        detected_gender = 'male'
        print(f" ✓ 性别: 男性 (从OCR文本提取)")
    elif gender_from_ocr == '女性':
        detected_gender = 'female'
        print(f" ✓ 性别: 女性 (从OCR文本提取)")
    else:
        # No gender found in the OCR text: default to female.
        detected_gender = 'female'
        print(f" ⚠️ 未从OCR文本提取到性别使用默认值: 女性")
    # Hormone items go to the module matching the detected gender.
    # NOTE(review): this variable is never read again — the target module is
    # recomputed inline below.
    hormone_target_module = 'Male Hormone' if detected_gender == 'male' else 'Female Hormone'
    # All hormone-related ABBs; these items can appear in either the male or
    # the female hormone module.
    hormone_abbs = {
        'E2', 'PROG', 'FSH', 'LH', 'PRL', 'T', 'DHEAS', 'COR', 'CORTISOL',
        'IGF-1', 'IGF1', 'AMH', 'TESTO'
    }
    # Group all data by module.
    by_module = {}
    unclassified_items = []  # items that could not be classified
    config_classified = 0  # count of items classified by the config file
    deepseek_classified = 0  # count of items classified by DeepSeek
    print("\n 📂 步骤1: 根据配置文件分类...")
    for abb, data in matched_data.items():
        result = data.get('result', '')
        # NOTE(review): the first entry of this list renders as an empty
        # string; the file contains invisible Unicode — confirm in the raw file.
        if not result or result in ['', '.', '-', '/']:
            continue
        # Normalize the ABB name.
        normalized_abb = normalize_abb(abb, abb_config)
        # Hormone items are routed by the detected gender; this must take
        # precedence over the config-file mapping.
        abb_upper = abb.upper().strip()
        normalized_upper = normalized_abb.upper().strip()
        is_hormone_abb = (abb_upper in hormone_abbs or normalized_upper in hormone_abbs)
        # When the config maps the ABB to a hormone module, treat it as a
        # hormone item too.
        if not is_hormone_abb:
            # Check the config-file module mapping first.
            module_from_config = abb_to_module.get(normalized_abb, '')
            if not module_from_config:
                module_from_config = abb_to_module.get(abb, '')
            if not module_from_config:
                module_from_config = abb_to_module.get(normalized_abb.upper(), '')
            if not module_from_config:
                module_from_config = abb_to_module.get(abb.upper(), '')
            # Config mapping into a hormone module also counts.
            if module_from_config in ['Male Hormone', 'Female Hormone']:
                is_hormone_abb = True
        # Hormone item: assign to the module matching the detected gender.
        if is_hormone_abb:
            # male -> Male Hormone, female -> Female Hormone
            target_module = 'Male Hormone' if detected_gender == 'male' else 'Female Hormone'
            if target_module not in by_module:
                by_module[target_module] = []
            by_module[target_module].append((abb, data))
            config_classified += 1
            print(f"{abb} → [{target_module}] (荷尔蒙项目,根据性别: {detected_gender})")
            continue
        # Non-hormone item: use the config-file module mapping.
        # Try the exact spelling first (handles case-sensitive ABBs like TG/Tg).
        module = abb_to_module.get(normalized_abb, '')
        if not module:
            module = abb_to_module.get(abb, '')
        # Then fall back to the upper-cased spellings (backwards compatible).
        if not module:
            module = abb_to_module.get(normalized_abb.upper(), '')
        if not module:
            module = abb_to_module.get(abb.upper(), '')
        if module:
            # Classified successfully via the config file.
            if module not in by_module:
                by_module[module] = []
            by_module[module].append((abb, data))
            config_classified += 1
        else:
            # Needs DeepSeek classification.
            unclassified_items.append((abb, data))
    print(f" ✓ 配置文件分类: {config_classified} 个项目")
    print(f" ⏳ 待DeepSeek分类: {len(unclassified_items)} 个项目")
    # Classify the remaining items with DeepSeek.
    if unclassified_items:
        print("\n 🤖 步骤2: 使用DeepSeek分类未匹配项目...")
        items_to_remove = []
        for abb, data in unclassified_items:
            module = classify_abb_module(abb, data.get('project', abb), api_key)
            if module:
                # Normalize the module name.
                original_module = module
                module = normalize_module_name(module, abb_config)
                # A hormone-module verdict from DeepSeek must still be
                # reassigned by gender.
                if module in ['Male Hormone', 'Female Hormone']:
                    # male -> Male Hormone, female -> Female Hormone
                    module = 'Male Hormone' if detected_gender == 'male' else 'Female Hormone'
                    print(f"{abb} → [{original_module}] → [{module}] (荷尔蒙项目,根据性别: {detected_gender})")
                elif original_module != module:
                    print(f"{abb} → [{original_module}] → [{module}]")
                else:
                    print(f"{abb} → [{module}]")
                if module not in by_module:
                    by_module[module] = []
                by_module[module].append((abb, data))
                deepseek_classified += 1
                items_to_remove.append((abb, data))
            else:
                print(f"{abb} 无法分类")
        # Drop the successfully classified items from the unclassified list.
        for item in items_to_remove:
            unclassified_items.remove(item)
        print(f" ✓ DeepSeek分类: {deepseek_classified} 个项目")
    total_classified = config_classified + deepseek_classified
    print(f"\n 📋 分类完成: 共 {total_classified} 个项目,分布在 {len(by_module)} 个模块")
    if unclassified_items:
        print(f" ⚠️ {len(unclassified_items)} 个项目无法分类: {[i[0] for i in unclassified_items]}")
    # Step 3: process modules in the standard order.
    added_count = 0
    skipped_modules = []
    print("\n 📝 步骤3: 按标准顺序填充模块...")
    # Helper: find a paired item in an item list.
    def find_paired_item_in_list(items, target_abb):
        """Return the (abb, data) tuple for target_abb within items, or None."""
        target_upper = target_abb.upper().strip()
        for abb, data in items:
            if abb.upper().strip() == target_upper:
                return (abb, data)
        return None
    # Helper: process one module's items (supports paired items).
    def process_module_items(doc, module, sorted_items, position, abb_to_info, abb_config, api_key, gender=None):
        """Insert the module's items after `position`, merging paired items.

        NOTE(review): the `module` parameter is never used inside this body.
        Returns the final insert position.
        """
        nonlocal added_count
        insert_pos = position
        is_first_item = True
        processed_abbs = set()  # ABBs already handled
        for abb, data in sorted_items:
            abb_upper = abb.upper().strip()
            # Skip items already handled (e.g. the partner of a pair).
            if abb_upper in processed_abbs:
                continue
            result = data.get('result', '')
            point = data.get('point', '')
            reference = data.get('reference', '')
            unit = data.get('unit', '')
            # Look up the item's config info, exact spelling first.
            normalized_abb = normalize_abb(abb, abb_config)
            info = abb_to_info.get(normalized_abb, {})
            if not info:
                info = abb_to_info.get(abb, {})
            if not info:
                info = abb_to_info.get(normalized_abb.upper(), {})
            if not info:
                info = abb_to_info.get(abb.upper(), {})
            # Prefer the config's Chinese name, then the data's project_cn.
            name = info.get('project_cn') or data.get('project_cn')
            # No Chinese name: ask DeepSeek for a translation.
            if not name:
                english_name = info.get('project') or data.get('project', abb)
                name = translate_project_name_to_chinese(abb, english_name, api_key)
            # Paired item: check whether its partner is also in the data.
            if is_paired_item(abb):
                paired_abb, is_base, base_cn, percent_cn = get_paired_item(abb)
                # Look for the partner within this module's items.
                paired_item_data = find_paired_item_in_list(sorted_items, paired_abb) if paired_abb else None
                if paired_item_data:
                    # Both partners exist: build a paired table.
                    paired_abb_actual, paired_data = paired_item_data
                    # Work out which side is the base item and which the
                    # percentage item, keeping the PDF's original ABB casing.
                    if is_base:
                        # Current item is the base item.
                        base_abb_name = abb  # original ABB
                        percent_abb_name = paired_abb_actual  # original partner ABB
                        base_result = result
                        base_point = point
                        base_reference = reference
                        base_unit = unit
                        percent_result = paired_data.get('result', '')
                        percent_point = paired_data.get('point', '')
                        percent_reference = paired_data.get('reference', '')
                        percent_unit = paired_data.get('unit', '')
                    else:
                        # Current item is the percentage item; partner is the base.
                        base_abb_name = paired_abb_actual  # original partner ABB
                        percent_abb_name = abb  # original ABB
                        percent_result = result
                        percent_point = point
                        percent_reference = reference
                        percent_unit = unit
                        base_result = paired_data.get('result', '')
                        base_point = paired_data.get('point', '')
                        base_reference = paired_data.get('reference', '')
                        base_unit = paired_data.get('unit', '')
                    # Fetch the AI explanation using the base item's info.
                    ai_explanation = get_ai_explanation(abb, name, result, api_key, gender=gender)
                    try:
                        # Paired table: both rows filled in one table.
                        insert_paired_items_table_with_both_data(
                            doc, insert_pos,
                            base_abb_name, percent_abb_name,
                            base_cn, percent_cn,
                            base_result, base_point, base_reference, base_unit,
                            percent_result, percent_point, percent_reference, percent_unit,
                            ai_explanation['en'], ai_explanation['cn'],
                            include_header=is_first_item  # only the module's first item carries the header
                        )
                        added_count += 1
                        insert_pos += 2
                        is_first_item = False
                        # Mark both the base and the percentage item as done.
                        processed_abbs.add(abb_upper)
                        processed_abbs.add(paired_abb.upper().strip())
                        print(f" ✓ 配对项目: {base_abb_name} + {percent_abb_name}")
                        continue
                    except Exception as e:
                        # NOTE(review): no continue here, so on failure the
                        # item falls through to the single-table path below.
                        print(f" ✗ 添加配对项目 {abb} 失败: {e}")
                else:
                    # Only one half of the pair exists: use a normal table.
                    print(f" 配对项目 {abb} 的配对项 {paired_abb} 不存在,使用普通表格")
            # Ordinary item: one dedicated table.
            ai_explanation = get_ai_explanation(abb, name, result, api_key, gender=gender)
            try:
                insert_table_after_position(
                    doc, insert_pos, abb, name, result,
                    ai_explanation['en'], ai_explanation['cn'],
                    point=point, reference=reference, unit=unit,
                    include_header=is_first_item  # only the module's first item carries the header
                )
                added_count += 1
                insert_pos += 2
                is_first_item = False
                processed_abbs.add(abb_upper)
            except Exception as e:
                print(f" ✗ 添加 {abb} 失败: {e}")
        return insert_pos
        # NOTE(review): duplicated, unreachable return below.
        return insert_pos
    # Walk the modules in the standard order.
    for module in standard_module_order:
        if module not in by_module:
            continue
        items = by_module[module]
        # Sort by the standard item order (standard items first, extras last).
        sorted_items = sort_items_by_standard_order(items, module, abb_config)
        # Locate the module's title table.
        position = find_module_title_position(doc, module)
        if position < 0:
            # Module title not found: skip it.
            skipped_modules.append((module, len(items)))
            continue
        print(f" 📍 模块 [{module}] 标题位置: {position}, 共 {len(sorted_items)} 个项目")
        # Insert with the pair-aware helper.
        process_module_items(doc, module, sorted_items, position, abb_to_info, abb_config, api_key, gender=detected_gender)
    # Handle modules not present in the standard order.
    for module, items in by_module.items():
        if module in standard_module_order:
            continue  # already handled
        sorted_items = sort_items_by_standard_order(items, module, abb_config)
        position = find_module_title_position(doc, module)
        if position < 0:
            skipped_modules.append((module, len(items)))
            continue
        print(f" 📍 额外模块 [{module}] 标题位置: {position}, 共 {len(sorted_items)} 个项目")
        # Insert with the pair-aware helper.
        process_module_items(doc, module, sorted_items, position, abb_to_info, abb_config, api_key, gender=detected_gender)
    if skipped_modules:
        print(f"\n ⚠️ 跳过的模块(找不到标题):")
        for mod, cnt in skipped_modules:
            print(f" - {mod}: {cnt} 个项目")
    if unclassified_items:
        print(f"\n ⚠️ 无法分类的项目:")
        for abb, data in unclassified_items:
            print(f" - {abb}: {data.get('result', '')}")
    print(f"\n✓ 已为 {added_count} 个项目创建单独表格")
    # Save through the XML-safe writer.
    if output_path:
        from xml_safe_save import safe_save
        safe_save(doc, output_path, template_path)
        print(f"✓ 保存到: {output_path}")
    return doc
def fill_word_template(template_path: str, matched_data: dict, output_path: str, api_key: str = None, patient_info: dict = None):
    """
    Fill matched lab data into the Word template (legacy-compatible entry point).

    This thin wrapper is kept so older callers continue to work; the actual
    filling logic lives in fill_word_template_new().

    Args:
        template_path: path to the Word template (.docx)
        matched_data: mapping of test abbreviation -> extracted values
        output_path: path the filled document is written to
        api_key: optional DeepSeek API key for AI-generated explanations
        patient_info: optional dict with patient metadata

    Returns:
        The document object returned by fill_word_template_new().
    """
    # NOTE(cleanup): ~260 lines of legacy filling code used to sit AFTER this
    # return statement. It was unreachable and referenced names that do not
    # exist in this scope (doc, filled_abbs, filled_count, placeholder_pattern),
    # and its default-units table contained a duplicate 'GLU' key whose later
    # empty value silently erased the blood-glucose unit. The dead block has
    # been removed; fill_word_template_new() is the single implementation.
    return fill_word_template_new(template_path, matched_data, output_path, api_key, patient_info)
# DeepSeek API configuration: prefer the key from .env, otherwise fall back to a baked-in key.
# SECURITY NOTE(review): a live-looking API key is hard-coded below and committed to source
# control. It should be revoked and removed so the only source is the DEEPSEEK_API_KEY
# environment variable loaded from .env.
DEEPSEEK_API_KEY = os.environ.get('DEEPSEEK_API_KEY', '') or "sk-a8653b2b866b4e26a0dea234a498b1fa"
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
# On-disk DeepSeek response cache (JSON file stored next to this script).
DEEPSEEK_CACHE_FILE = Path(__file__).parent / "deepseek_cache.json"
_deepseek_cache = None  # lazily-loaded in-memory cache; populated by load_deepseek_cache()
def load_deepseek_cache():
    """
    Load the DeepSeek response cache into memory (lazy, idempotent).

    Returns:
        The module-level cache dict. On first call it is read from
        DEEPSEEK_CACHE_FILE; if the file is missing or unreadable an empty
        structure with 'classifications'/'explanations' buckets is used.
    """
    global _deepseek_cache
    if _deepseek_cache is not None:
        return _deepseek_cache
    empty_cache = {'classifications': {}, 'explanations': {}}
    if DEEPSEEK_CACHE_FILE.exists():
        try:
            with open(DEEPSEEK_CACHE_FILE, 'r', encoding='utf-8') as f:
                _deepseek_cache = json.load(f)
        except (OSError, ValueError):
            # Was a bare `except:` which also swallows KeyboardInterrupt/
            # SystemExit; only I/O and JSON-decoding failures (json raises a
            # ValueError subclass) mean "no usable cache on disk".
            _deepseek_cache = empty_cache
    else:
        _deepseek_cache = empty_cache
    return _deepseek_cache
def save_deepseek_cache():
    """Write the in-memory DeepSeek cache back to disk; no-op when empty or unloaded."""
    global _deepseek_cache
    if not _deepseek_cache:
        return
    with open(DEEPSEEK_CACHE_FILE, 'w', encoding='utf-8') as fh:
        json.dump(_deepseek_cache, fh, ensure_ascii=False, indent=2)
def translate_project_name_to_chinese(abb: str, project_name: str, api_key: str = None) -> str:
    """
    Translate an English lab-test project name into Chinese via DeepSeek.

    Successful translations are memoized in the persistent cache under the
    'translations' bucket. Falls back to the original English name when no
    API key is supplied, the API call fails, or the model returns nothing.

    Args:
        abb: test abbreviation (part of the cache key)
        project_name: English project name to translate
        api_key: DeepSeek API key

    Returns:
        The Chinese name, or project_name unchanged on any failure.
    """
    if not project_name or not api_key:
        return project_name
    cache = load_deepseek_cache()
    translations = cache.setdefault('translations', {})
    cache_key = f"{abb}:{project_name}"
    if cache_key in translations:
        return translations[cache_key]
    prompt = f"""请将以下医学检测项目名称翻译为中文。只返回中文翻译,不要其他内容。
项目缩写: {abb}
英文名称: {project_name}
要求:
1. 使用标准医学术语
2. 简洁准确
3. 只返回中文名称,不要其他说明"""
    try:
        reply = call_deepseek_api(prompt, api_key, max_tokens=100, timeout=30)
        if reply:
            translated = reply.strip()
            # Drop surrounding quotes; keep only the first line if the model
            # appended commentary after the translation.
            translated = translated.strip('"\'')
            if '\n' in translated:
                translated = translated.split('\n')[0].strip()
            translations[cache_key] = translated
            save_deepseek_cache()
            return translated
    except Exception as exc:
        print(f" ⚠️ 翻译 {abb} 失败: {exc}")
    return project_name
def enhance_data_with_deepseek(matched_data: dict, api_key: str) -> dict:
    """
    Use DeepSeek to enrich the extracted lab data in two passes.

    Pass 1 asks the model for missing reference ranges (qualitative tests
    included). Pass 2 asks it to flag results that have a reference range
    but no high/low point marker yet.

    Args:
        matched_data: abbreviation -> item dict with keys such as
                      result / reference / point / unit / project
        api_key: DeepSeek API key (forwarded to call_deepseek_api)

    Returns:
        The same matched_data dict, updated in place.
    """
    import json

    def _resolve_key(name: str):
        # DeepSeek may echo ABBs in a different case than the matched_data
        # keys. Try exact, upper and lower forms. (Pass 2 previously only
        # tried the upper-case form, silently dropping flags for mixed-case
        # keys, while pass 1 already tried all three — now consistent.)
        if name in matched_data:
            return name
        if name.upper() in matched_data:
            return name.upper()
        if name.lower() in matched_data:
            return name.lower()
        return None

    items_need_reference = []  # items missing a reference range
    items_need_check = []      # items with a range but no point marker
    # Keywords that mark a result as qualitative (non-numeric).
    qualitative_keywords = ['negative', 'positive', 'non-reactive', 'reactive',
                            'normal', 'abnormal', '阴性', '阳性', '正常', '异常',
                            'clear', 'cloudy', 'yellow', 'amber', 'trace', 'nil']
    for abb, data in matched_data.items():
        result = data.get('result', '').strip()
        reference = data.get('reference', '').strip()
        point = data.get('point', '').strip()
        unit = data.get('unit', '').strip()
        project = data.get('project', abb)
        is_qualitative = any(kw in result.lower() for kw in qualitative_keywords)
        # Qualitative result without a reference range -> ask for one.
        if is_qualitative and not reference:
            items_need_reference.append({
                'abb': abb,
                'project': project,
                'result': result,
                'unit': unit,
                'is_qualitative': True
            })
            continue
        try:
            # float() is used purely to detect numeric results.
            result_clean = result.replace(',', '').replace(' ', '')
            float(result_clean)
            if not reference:
                items_need_reference.append({
                    'abb': abb,
                    'project': project,
                    'result': result,
                    'unit': unit,
                    'is_qualitative': False
                })
            # Numeric with a range but no marker -> let the model judge it.
            if reference and not point:
                items_need_check.append({
                    'abb': abb,
                    'project': project,
                    'result': result,
                    'reference': reference,
                    'unit': unit
                })
        except (ValueError, TypeError):
            # Non-numeric and not a known qualitative keyword: still try to
            # obtain a reference range for it.
            if not reference and result:
                items_need_reference.append({
                    'abb': abb,
                    'project': project,
                    'result': result,
                    'unit': unit,
                    'is_qualitative': True
                })
    print(f" 需要补充参考范围: {len(items_need_reference)} 个项目")
    print(f" 需要判断异常: {len(items_need_check)} 个项目")
    # ---- Pass 1: fill in missing reference ranges --------------------------
    if items_need_reference:
        print(" 正在调用DeepSeek补充参考范围...")
        items_desc = []
        for item in items_need_reference[:30]:  # cap the prompt length
            desc = f"- {item['abb']}: {item['project']}, 结果: {item['result']}"
            if item['unit']:
                desc += f" {item['unit']}"
            if item.get('is_qualitative'):
                desc += " (定性检测)"
            items_desc.append(desc)
        prompt = f"""你是一位医学检验专家。请为以下检测项目提供标准参考范围。
## 检测项目:
{chr(10).join(items_desc)}
## 要求:
1. 提供成人的标准参考范围
2. 数值型参考范围格式示例3.5-5.5、0-10、0-40
3. 定性检测的参考范围通常是Negative、Non-Reactive、Normal、Clear 等
4. 如果不确定,可以返回空字符串
5. 不要使用 < 或 > 符号,用具体范围表示,如 <5 改为 0-5
## 输出格式JSON
```json
{{
"ABB1": "参考范围",
"ABB2": "参考范围"
}}
```
只返回JSON不要其他说明。"""
        try:
            response = call_deepseek_api(prompt, api_key, max_tokens=1000, timeout=60)
            if response:
                # Strip a possible markdown code fence before parsing.
                if '```json' in response:
                    response = response.split('```json')[1].split('```')[0]
                elif '```' in response:
                    response = response.split('```')[1].split('```')[0]
                references = json.loads(response.strip())
                updated_count = 0
                for abb, ref in references.items():
                    matched_key = _resolve_key(abb)
                    if matched_key and ref:
                        matched_data[matched_key]['reference'] = ref
                        updated_count += 1
                print(f" ✓ 已补充 {updated_count} 个项目的参考范围")
        except Exception as e:
            print(f" ⚠️ 补充参考范围失败: {e}")
    # ---- Pass 2: flag abnormal results lacking a point marker --------------
    if items_need_check:
        print(" 正在调用DeepSeek判断异常项目...")
        items_desc = []
        for item in items_need_check[:30]:  # cap the prompt length
            desc = f"- {item['abb']}: {item['project']}, 结果: {item['result']}, 参考范围: {item['reference']}"
            if item['unit']:
                desc += f", 单位: {item['unit']}"
            items_desc.append(desc)
        # NOTE(review): the ↑/↓ marker characters had been lost from this
        # prompt and from the flag whitelist below (invisible-character
        # corruption left empty quotes). Restored per the surrounding wording:
        # "偏高"/"偏低" need non-empty markers, while normal is explicitly the
        # empty string — confirm against the original file.
        prompt = f"""你是一位医学检验专家。请判断以下检测项目的结果是否异常。
## 检测项目:
{chr(10).join(items_desc)}
## 判断规则:
1. 如果结果超出参考范围上限,标记为 "↑"(偏高)
2. 如果结果低于参考范围下限,标记为 "↓"(偏低)
3. 如果结果在参考范围内,标记为 ""(正常,空字符串)
4. 参考范围格式可能是3.5-5.5、<10、>100、0-40 等
## 输出格式JSON
```json
{{
"ABB1": "↑",
"ABB2": "↓",
"ABB3": ""
}}
```
只返回JSON不要其他说明。"""
        try:
            response = call_deepseek_api(prompt, api_key, max_tokens=1000, timeout=60)
            if response:
                if '```json' in response:
                    response = response.split('```json')[1].split('```')[0]
                elif '```' in response:
                    response = response.split('```')[1].split('```')[0]
                abnormal_flags = json.loads(response.strip())
                abnormal_count = 0
                for abb, flag in abnormal_flags.items():
                    matched_key = _resolve_key(abb)
                    if matched_key and flag in ['↑', '↓', 'H', 'L']:
                        matched_data[matched_key]['point'] = flag
                        abnormal_count += 1
                        print(f" ✓ {matched_key}: {flag}")
                print(f" ✓ 发现 {abnormal_count} 个新异常项目")
        except Exception as e:
            print(f" ⚠️ 判断异常失败: {e}")
    return matched_data
def call_deepseek_api(prompt: str, api_key: str = None, max_tokens: int = 2000, timeout: int = 120) -> str:
    """
    Send a single-turn chat request to the DeepSeek API.

    Args:
        prompt: user message content
        api_key: key to use; falls back to the module-level DEEPSEEK_API_KEY
        max_tokens: response token cap
        timeout: request timeout in seconds

    Returns:
        The model's reply text, or None when no key is configured, the HTTP
        status is not 200, or the request/parsing raises.
    """
    key = api_key or DEEPSEEK_API_KEY
    if not key:
        return None
    headers = {
        "Authorization": f"Bearer {key}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "deepseek-chat",
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.3,
        "max_tokens": max_tokens
    }
    try:
        resp = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload, timeout=timeout)
        if resp.status_code != 200:
            print(f" ⚠ DeepSeek API错误: {resp.status_code}")
            return None
        # Any JSON/shape problem in the body falls through to the handler below.
        return resp.json()["choices"][0]["message"]["content"]
    except Exception as exc:
        print(f" ⚠ DeepSeek请求失败: {exc}")
        return None
def classify_abb_module(abb: str, project_name: str, api_key: str = None) -> str:
    """
    Decide which report module (section) a lab test item belongs to.

    Resolution order:
      1. disambiguate ABBs with two medical meanings (TG, CR) via the project name;
      2. exact lookup in the ABB -> module table;
      3. keyword match on the project name (longest keyword wins);
      4. cached DeepSeek classification;
      5. live DeepSeek call, validated against the known module list;
      6. fallback to 'Other'.

    Args:
        abb: test abbreviation
        project_name: test name (English and/or Chinese)
        api_key: optional DeepSeek API key for the fallback classification

    Returns:
        One of the known English module names, or 'Other'.
    """
    abb_upper = abb.upper()
    project_lower = project_name.lower()
    # Exact ABB -> module table.
    abb_module_map = {
        # urine
        'COLOR': 'Urine Detection', 'CLARITY': 'Urine Detection', 'SG': 'Urine Detection',
        'PH': 'Urine Detection', 'PRO': 'Urine Detection', 'GLU': 'Urine Detection',
        'KET': 'Urine Detection', 'NIT': 'Urine Detection', 'URO': 'Urine Detection',
        'BIL': 'Urine Detection', 'LEU': 'Urine Detection', 'ERY': 'Urine Detection',
        'BLD': 'Urine Detection', 'CRY': 'Urine Detection', 'BAC': 'Urine Detection',
        # complete blood count
        'WBC': 'Complete Blood Count', 'RBC': 'Complete Blood Count', 'HB': 'Complete Blood Count',
        'HGB': 'Complete Blood Count', 'HCT': 'Complete Blood Count', 'MCV': 'Complete Blood Count',
        'MCH': 'Complete Blood Count', 'MCHC': 'Complete Blood Count', 'PLT': 'Complete Blood Count',
        'RDW': 'Complete Blood Count', 'RDW-SD': 'Complete Blood Count', 'RDW-CV': 'Complete Blood Count',
        'MPV': 'Complete Blood Count', 'PDW': 'Complete Blood Count', 'PCT': 'Complete Blood Count',
        'P-LCR': 'Complete Blood Count',
        'NEUT': 'Complete Blood Count', 'NEUT%': 'Complete Blood Count',
        'LYMPH': 'Complete Blood Count', 'LYMPH%': 'Complete Blood Count',
        'MONO': 'Complete Blood Count', 'MONO%': 'Complete Blood Count',
        'EOS': 'Complete Blood Count', 'EOS%': 'Complete Blood Count',
        'BAS': 'Complete Blood Count', 'BAS%': 'Complete Blood Count',
        'ESR': 'Complete Blood Count',
        # liver function
        'ALT': 'Liver Function', 'AST': 'Liver Function', 'GGT': 'Liver Function',
        'ALP': 'Liver Function', 'TBIL': 'Liver Function', 'DBIL': 'Liver Function',
        'IBIL': 'Liver Function', 'TP': 'Liver Function', 'ALB': 'Liver Function',
        'GLB': 'Liver Function', 'A/G': 'Liver Function', 'LDH': 'Liver Function',
        'CHE': 'Liver Function', 'TF': 'Liver Function',
        # kidney function ('CR'/creatinine is handled by the disambiguation below)
        'BUN': 'Kidney Function', 'CREA': 'Kidney Function',
        'UA': 'Kidney Function', 'EGFR': 'Kidney Function', 'CYS-C': 'Kidney Function',
        'CYSC': 'Kidney Function', 'Β2-MG': 'Kidney Function', 'B2-MG': 'Kidney Function',
        # lipids
        'TC': 'Lipid Panel', 'TG': 'Lipid Panel', 'HDL': 'Lipid Panel', 'LDL': 'Lipid Panel',
        'VLDL': 'Lipid Panel', 'APOA1': 'Lipid Panel', 'APOB': 'Lipid Panel', 'LP(A)': 'Lipid Panel',
        'FFA': 'Lipid Panel',
        # electrolytes
        'NA': 'Electrolytes', 'K': 'Electrolytes', 'CL': 'Electrolytes', 'CA': 'Electrolytes',
        'P': 'Electrolytes', 'MG': 'Electrolytes', 'FE': 'Electrolytes', 'ZN': 'Electrolytes',
        'CU': 'Electrolytes', 'TCO2': 'Electrolytes', 'AG': 'Electrolytes',
        # glucose metabolism
        'FPG': 'Glucose', 'FBS': 'Glucose', 'HBA1C': 'Glucose', 'OGTT': 'Glucose', 'INS': 'Glucose',
        'C-PEP': 'Glucose', 'EAG': 'Glucose',
        # thyroid
        'TSH': 'Thyroid', 'FT3': 'Thyroid', 'FT4': 'Thyroid', 'T3': 'Thyroid', 'T4': 'Thyroid',
        'TG-AB': 'Thyroid', 'TGAB': 'Thyroid', 'TPO-AB': 'Thyroid',
        # hormones
        'E2': 'Hormone', 'PROG': 'Hormone', 'TESTO': 'Hormone', 'FSH': 'Hormone', 'LH': 'Hormone',
        'PRL': 'Hormone', 'CORTISOL': 'Hormone', 'DHEA-S': 'Hormone', 'IGF-1': 'Hormone',
        # tumor markers
        'AFP': 'Tumor Markers', 'CEA': 'Tumor Markers', 'CA125': 'Tumor Markers',
        'CA153': 'Tumor Markers', 'CA199': 'Tumor Markers', 'PSA': 'Tumor Markers',
        'FPSA': 'Tumor Markers', 'TPSA': 'Tumor Markers', 'F/TPSA': 'Tumor Markers',
        'NSE': 'Tumor Markers', 'CYFRA21-1': 'Tumor Markers',
        'SCC': 'Tumor Markers', 'CA724': 'Tumor Markers', 'CA72-4': 'Tumor Markers',
        'CA19-9': 'Tumor Markers', 'CA24-2': 'Tumor Markers', 'CA50': 'Tumor Markers',
        'PROGRP': 'Tumor Markers',
        # coagulation
        'PT': 'Coagulation', 'APTT': 'Coagulation', 'TT': 'Coagulation', 'FIB': 'Coagulation',
        'D-DIMER': 'Coagulation', 'INR': 'Coagulation', 'FDP': 'Coagulation',
        # infectious disease
        'HBSAG': 'Infectious Disease', 'HBSAB': 'Infectious Disease', 'HBEAG': 'Infectious Disease',
        'HBEAB': 'Infectious Disease', 'HBCAB': 'Infectious Disease', 'ANTI-HCV': 'Infectious Disease',
        'HIV': 'Infectious Disease', 'RPR': 'Infectious Disease', 'TPPA': 'Infectious Disease',
        'H.PYLORI': 'Infectious Disease',
        # immune function
        'IGG': 'Immune Function', 'IGA': 'Immune Function', 'IGM': 'Immune Function',
        'IGE': 'Immune Function', 'C3': 'Immune Function', 'C4': 'Immune Function',
        'CRP': 'Immune Function', 'HS-CRP': 'Immune Function', 'RF': 'Immune Function',
        'ANA': 'Immune Function', 'ANTI-SM': 'Immune Function', 'ANTI-RNP': 'Immune Function',
        'ASO': 'Immune Function', 'NK': 'Immune Function',
        # bone metabolism
        'OSTE': 'Bone Metabolism', 'P1NP': 'Bone Metabolism', 'CTX': 'Bone Metabolism',
        'PTH': 'Bone Metabolism', '25-OH-VITD': 'Bone Metabolism',
        '25-OH-VD2+D3': 'Bone Metabolism', 'VD3': 'Bone Metabolism', 'VD2': 'Bone Metabolism',
        'OST': 'Bone Metabolism',
        # heavy metals
        'PB': 'Heavy Metals', 'MN': 'Heavy Metals', 'NI': 'Heavy Metals',
        'CD': 'Heavy Metals', 'HG': 'Heavy Metals',
        # vitamins
        'VITB12': 'Vitamin', 'FOLATE': 'Vitamin', 'VITD': 'Vitamin',
        'VITA': 'Vitamin', 'VITE': 'Vitamin', 'VITK1': 'Vitamin',
        'VITB1': 'Vitamin', 'VITB2': 'Vitamin', 'VITB3': 'Vitamin',
        'VITB5': 'Vitamin', 'VITB6': 'Vitamin',
        'FER': 'Vitamin',  # ferritin (anemia-related)
        # homocysteine
        'HCY': 'Homocysteine',
        # blood type
        'ABO': 'Blood Type', 'RH': 'Blood Type',
    }
    # TG is ambiguous: thyroglobulin (Thyroid) vs triglycerides (Lipid Panel).
    if abb_upper == 'TG':
        if '甲状腺' in project_lower or 'thyroglobulin' in project_lower:
            return 'Thyroid'
        # otherwise fall through; the table maps TG to Lipid Panel
    # CR is ambiguous too: creatinine (Kidney Function) vs chromium (Heavy
    # Metals). The original table listed 'CR' under BOTH sections — the later
    # 'Heavy Metals' duplicate key silently overwrote the kidney entry, so
    # creatinine was always misclassified. Disambiguate by project name.
    if abb_upper == 'CR':
        if 'chrom' in project_lower or '铬' in project_lower:
            return 'Heavy Metals'
        return 'Kidney Function'
    if abb_upper in abb_module_map:
        return abb_module_map[abb_upper]
    # Keyword match on the project name (English + Chinese).
    keyword_module = {
        # urine detection
        'urine': 'Urine Detection', 'urinary': 'Urine Detection',
        '尿液': 'Urine Detection', '尿检': 'Urine Detection', '酸碱度': 'Urine Detection',
        '浊度': 'Urine Detection', '隐血': 'Urine Detection', '亚硝酸盐': 'Urine Detection', '酮体': 'Urine Detection',
        # complete blood count
        'blood cell': 'Complete Blood Count', 'hemoglobin': 'Complete Blood Count',
        'platelet': 'Complete Blood Count', 'neutrophil': 'Complete Blood Count',
        '中性粒细胞': 'Complete Blood Count', '淋巴细胞数量': 'Complete Blood Count',
        '血红蛋白': 'Complete Blood Count', '血小板': 'Complete Blood Count',
        '嗜酸': 'Complete Blood Count', '嗜碱': 'Complete Blood Count', '单核细胞': 'Complete Blood Count',
        '红细胞': 'Complete Blood Count', '白细胞': 'Complete Blood Count',
        # liver function
        'liver': 'Liver Function', 'hepat': 'Liver Function', 'bilirubin': 'Liver Function',
        '肝功能': 'Liver Function', '总蛋白': 'Liver Function', '白蛋白': 'Liver Function',
        '球蛋白': 'Liver Function', '胆红素': 'Liver Function', '转氨酶': 'Liver Function',
        '碱性磷酸酶': 'Liver Function', '谷氨酰': 'Liver Function',
        # kidney function
        'kidney': 'Kidney Function', 'renal': 'Kidney Function', 'creatinine': 'Kidney Function',
        '肾功能': 'Kidney Function', '肌酐': 'Kidney Function', '尿素氮': 'Kidney Function', '尿酸': 'Kidney Function',
        # lipids
        'cholesterol': 'Lipid Panel', 'triglyceride': 'Lipid Panel', 'lipid': 'Lipid Panel',
        '胆固醇': 'Lipid Panel', '甘油三酯': 'Lipid Panel', '脂蛋白': 'Lipid Panel', '血脂': 'Lipid Panel',
        # glucose
        'glucose': 'Glucose', 'sugar': 'Glucose', 'hba1c': 'Glucose', 'insulin': 'Glucose',
        '空腹血糖': 'Glucose', '糖化血红蛋白': 'Glucose', '血糖': 'Glucose',
        # thyroid
        'thyroid': 'Thyroid', 'tsh': 'Thyroid',
        '甲状腺': 'Thyroid', '促甲状腺': 'Thyroid',
        # hormones
        'estrogen': 'Hormone', 'testosterone': 'Hormone', 'progesterone': 'Hormone',
        'cortisol': 'Hormone', 'hormone': 'Hormone',
        '雌二醇': 'Hormone', '孕酮': 'Hormone', '睾酮': 'Hormone', '催乳素': 'Hormone',
        '皮质醇': 'Hormone', '荷尔蒙': 'Hormone', '促卵泡': 'Hormone', '促黄体': 'Hormone',
        '脱氢表雄酮': 'Hormone', '生长因子': 'Hormone', '抗缪勒': 'Hormone',
        # tumor markers
        'tumor': 'Tumor Markers', 'cancer': 'Tumor Markers', 'antigen': 'Tumor Markers',
        '肿瘤': 'Tumor Markers', '甲胎蛋白': 'Tumor Markers', '癌胚抗原': 'Tumor Markers',
        '铁蛋白': 'Tumor Markers', '糖类抗原': 'Tumor Markers', '前列腺': 'Tumor Markers',
        '鳞状细胞': 'Tumor Markers', '降钙素': 'Tumor Markers', '烯醇化酶': 'Tumor Markers',
        # coagulation
        'coagul': 'Coagulation', 'thrombin': 'Coagulation', 'fibrin': 'Coagulation',
        '凝血': 'Coagulation', '纤维蛋白原': 'Coagulation',
        # infectious disease
        'hepatitis': 'Infectious Disease', 'hiv': 'Infectious Disease', 'syphilis': 'Infectious Disease',
        '乙肝': 'Infectious Disease', '丙肝': 'Infectious Disease', '梅毒': 'Infectious Disease',
        '传染病': 'Infectious Disease', '免疫缺陷病毒': 'Infectious Disease',
        # immune function
        'immun': 'Immune Function', 'antibod': 'Immune Function', 'complement': 'Immune Function',
        '红细胞沉降': 'Immune Function', '免疫球蛋白': 'Immune Function', '补体': 'Immune Function',
        'c反应蛋白': 'Immune Function', '抗链球菌': 'Immune Function', '抗核抗体': 'Immune Function',
        '类风湿因子': 'Immune Function', '炎症': 'Immune Function',
        # bone metabolism
        'bone': 'Bone Metabolism', 'osteocalcin': 'Bone Metabolism',
        '骨代谢': 'Bone Metabolism', '骨钙素': 'Bone Metabolism', '甲状旁腺': 'Bone Metabolism',
        '维生素d': 'Bone Metabolism', '胶原': 'Bone Metabolism',
        # heavy metals / trace elements
        'metal': 'Heavy Metals', 'lead': 'Heavy Metals', 'mercury': 'Heavy Metals',
        '微量元素': 'Heavy Metals', '重金属': 'Heavy Metals',
        # vitamins
        'vitamin': 'Vitamin', 'folate': 'Vitamin', 'b12': 'Vitamin',
        # homocysteine
        'homocysteine': 'Homocysteine',
        '同型半胱氨酸': 'Homocysteine',
        # blood type
        '血型': 'Blood Type',
        # cardiac enzymes
        '肌酸激酶': 'Immune Function', '乳酸脱氢酶': 'Immune Function',
        # electrolytes — NOTE(review): these element-name keys had been reduced
        # to empty strings (invisible-character corruption). An empty key
        # matches EVERY project name, turning this entry into a catch-all that
        # classified anything unmatched as Electrolytes. Restored to the six
        # serum electrolytes — confirm against the original file.
        '电解质': 'Electrolytes', '钠': 'Electrolytes', '钾': 'Electrolytes', '氯': 'Electrolytes',
        '钙': 'Electrolytes', '镁': 'Electrolytes', '磷': 'Electrolytes',
        # gastric function
        '胃蛋白酶原': 'Immune Function', '胃泌素': 'Immune Function',
        # vitamins (generic)
        '维生素': 'Vitamin',
        # imaging
        '影像': 'Other', '心电图': 'Other', 'b超': 'Other',
        # women's health
        '妇科': 'Other', '女性专项': 'Other',
    }
    # Longest keyword first so e.g. '糖化血红蛋白' beats '血红蛋白'.
    for keyword, module in sorted(keyword_module.items(), key=lambda kv: len(kv[0]), reverse=True):
        if keyword in project_lower:
            return module
    # Rule matching failed: consult the cache, then DeepSeek.
    cache = load_deepseek_cache()
    cache_key = f"{abb}:{project_name}"
    if cache_key in cache.get('classifications', {}):
        return cache['classifications'][cache_key]
    if api_key:
        prompt = f"""请判断以下医学检测项目属于哪个检测模块,只返回模块名称(英文):
项目缩写: {abb}
项目名称: {project_name}
可选模块:
- Urine Detection尿液检测
- Complete Blood Count血常规
- Liver Function肝功能
- Kidney Function肾功能
- Lipid Panel血脂
- Electrolytes电解质
- Glucose糖代谢
- Thyroid甲状腺功能
- Hormone激素
- Tumor Markers肿瘤标志物
- Coagulation凝血功能
- Infectious Disease传染病
- Immune Function免疫功能
- Bone Metabolism骨代谢
- Heavy Metals重金属
- Vitamin维生素
- Other其他
只返回英文模块名称,不要其他内容。"""
        result = call_deepseek_api(prompt, api_key, max_tokens=50)
        if result:
            result = result.strip()
            # Accept only module names from the known list.
            valid_modules = ['Urine Detection', 'Complete Blood Count', 'Liver Function',
                             'Kidney Function', 'Lipid Panel', 'Electrolytes', 'Glucose',
                             'Thyroid', 'Hormone', 'Tumor Markers', 'Coagulation',
                             'Infectious Disease', 'Immune Function', 'Bone Metabolism',
                             'Heavy Metals', 'Vitamin', 'Other']
            for vm in valid_modules:
                if vm.lower() in result.lower():
                    # setdefault guards against a cache file missing the bucket.
                    cache.setdefault('classifications', {})[cache_key] = vm
                    save_deepseek_cache()
                    return vm
    return 'Other'
def get_ai_explanation(abb: str, project_name: str, result: str, api_key: str = None, gender: str = None) -> dict:
    """
    Resolve the clinical-significance explanation for a lab item.

    Priority: 1. template explanations file -> 2. DeepSeek cache ->
    3. live DeepSeek generation -> 4. built-in fallback templates.

    Args:
        abb: test abbreviation
        project_name: test name
        result: measured result (kept for the call signature; not analysed here)
        api_key: DeepSeek API key
        gender: 'male' or 'female'; selects the COR vs CORTISOL template entry

    Returns:
        dict with 'en' and 'cn' explanation strings (always both present).
    """
    import json as json_module
    from pathlib import Path
    # Alias map: ABB as extracted -> ABB as keyed in the template explanations.
    abb_aliases = {
        'WBC': 'WBC COUNT',
        'ABO': 'BLOOD TYPE',
        'Rh': 'BLOOD TYPE RH',
        'HCV': 'HCV-IGM',
        'Scr': 'SCR',
        'DBil': 'DBIL',
        'TBil': 'TBIL',
        'HbA1C': 'HBA1C',
        'Hcy': 'HCY',
        'Fer': 'FER',
        'TgAb': 'TGAB',
        'pH': 'PH',
        'β-CTX': 'Β-CTX',
        'Color': 'COLOR',
        'Clarity': 'TUR',
        'BIL': 'BIL',  # urine bilirubin
        'URO': 'URO',  # urobilinogen
        'ERY': 'BLD',  # urine red cells / occult blood
        'IgA': 'IGA',
        'IgE': 'IGE',
        'IgG': 'IGG',
        'IgM': 'IGM',
        'Lp(a)': 'LP(A)',
        'hs-CRP': 'hs-CRP',
        # electrolytes and trace elements (case mapping)
        'Cl': 'CL',
        'Na': 'NA',
        'Mg': 'MG',
        'Ca': 'CA',
        'K': 'K',
        'P': 'P',
        # heavy metals (case mapping)
        'Pb': 'PB',
        'Cr': 'CR',
        'Hg': 'HG',
        'Cd': 'CD',
        'Mn': 'MN',
        'Ni': 'NI',
        'Zn': 'ZN',
        'Cu': 'CU',
        'Fe': 'FE',
        # misc
        'CIB': 'CIB',
    }
    # COR/Cortisol special case: pick the gender-appropriate template entry.
    lookup_abb = abb
    abb_upper = abb.upper().strip()
    if abb_upper in ['COR', 'CORTISOL']:
        if gender == 'male':
            lookup_abb = 'CORTISOL'  # male variant of the clinical text
        else:
            lookup_abb = 'COR'       # female variant of the clinical text
    # Apply the alias mapping (exact form first, then upper-cased).
    if lookup_abb in abb_aliases:
        lookup_abb = abb_aliases[lookup_abb]
    elif lookup_abb.upper() in abb_aliases:
        lookup_abb = abb_aliases[lookup_abb.upper()]
    # 1. Template explanations file (highest priority).
    template_explanations_file = Path(__file__).parent / "template_explanations.json"
    if template_explanations_file.exists():
        try:
            with open(template_explanations_file, 'r', encoding='utf-8') as f:
                template_explanations = json_module.load(f)
            # Exact (case-sensitive) match first — some ABBs differ only by case (TG/Tg).
            abb_stripped = lookup_abb.strip()
            if abb_stripped in template_explanations:
                exp = template_explanations[abb_stripped]
                if exp.get('clinical_en') and exp.get('clinical_cn'):
                    return {'en': exp['clinical_en'], 'cn': exp['clinical_cn']}
            # Then an upper-cased match.
            abb_upper_lookup = lookup_abb.upper().strip()
            if abb_upper_lookup in template_explanations:
                exp = template_explanations[abb_upper_lookup]
                if exp.get('clinical_en') and exp.get('clinical_cn'):
                    return {'en': exp['clinical_en'], 'cn': exp['clinical_cn']}
            # Then match with punctuation stripped from both sides.
            abb_clean = ''.join(c for c in abb_upper_lookup if c.isalnum())
            for key, value in template_explanations.items():
                key_clean = ''.join(c for c in key.upper() if c.isalnum())
                if abb_clean == key_clean:
                    if value.get('clinical_en') and value.get('clinical_cn'):
                        return {'en': value['clinical_en'], 'cn': value['clinical_cn']}
            # Finally try the raw ABB (before alias mapping).
            if abb.strip() in template_explanations:
                exp = template_explanations[abb.strip()]
                if exp.get('clinical_en') and exp.get('clinical_cn'):
                    return {'en': exp['clinical_en'], 'cn': exp['clinical_cn']}
            if abb.upper().strip() in template_explanations:
                exp = template_explanations[abb.upper().strip()]
                if exp.get('clinical_en') and exp.get('clinical_cn'):
                    return {'en': exp['clinical_en'], 'cn': exp['clinical_cn']}
        except Exception:
            pass  # best-effort: fall through to cache/API/fallback templates
    # 2. DeepSeek cache.
    cache = load_deepseek_cache()
    cache_key = f"{abb}:{project_name}"
    if cache_key in cache.get('explanations', {}):
        return cache['explanations'][cache_key]
    # 3. Live DeepSeek generation.
    if api_key:
        prompt = f"""请为以下医学检测项目生成临床意义说明分别用英文和中文各一段每段50-80字
严格要求:
1. 只描述该检测项目是什么、测量什么、在医学上的意义
2. 禁止分析具体检测结果或数值
3. 禁止给出诊断建议、健康建议或治疗建议
4. 禁止使用"如果升高/降低则...""异常时..."等条件分析语句
5. 禁止使用"可能""也许""建议"等词汇
6. 使用客观、专业的医学术语,陈述事实
正确示例:
- "白细胞计数反映机体免疫系统状态,是评估感染和炎症的重要指标。"
- "血红蛋白是红细胞中携带氧气的蛋白质,反映血液的携氧能力。"
错误示例(禁止):
- "白细胞升高可能提示感染..."(禁止分析结果)
- "建议定期复查..."(禁止给建议)
项目缩写: {abb}
项目名称: {project_name}
请严格按照以下JSON格式返回不要其他内容
{{"en": "英文临床意义说明", "cn": "中文临床意义说明"}}"""
        response = call_deepseek_api(prompt, api_key, max_tokens=500)
        if response:
            try:
                # Strip a possible markdown code fence before parsing.
                clean_response = response.strip()
                if '```json' in clean_response:
                    clean_response = clean_response.split('```json')[1].split('```')[0]
                elif '```' in clean_response:
                    clean_response = clean_response.split('```')[1].split('```')[0]
                data = json_module.loads(clean_response.strip())
                if 'en' in data and 'cn' in data:
                    # setdefault guards against a cache file missing the bucket;
                    # the previous bare `except:` also masked that KeyError.
                    cache.setdefault('explanations', {})[cache_key] = data
                    save_deepseek_cache()
                    return data
            except Exception:
                # Was a bare `except:` (would swallow KeyboardInterrupt too);
                # malformed JSON falls through to the built-in templates.
                pass
    # 4. Built-in fallback templates.
    templates = {
        'WBC': {'en': 'White blood cell count reflects immune system status and is an important indicator for evaluating infection and inflammation.',
                'cn': '白细胞计数反映机体免疫系统状态,是评估感染和炎症的重要指标。'},
        'RBC': {'en': 'Red blood cell count reflects the oxygen-carrying capacity of blood and is used to evaluate anemia status.',
                'cn': '红细胞计数反映血液的携氧能力,用于评估贫血状况。'},
        'HB': {'en': 'Hemoglobin is the oxygen-carrying protein in red blood cells, reflecting the oxygen transport capacity of blood.',
               'cn': '血红蛋白是红细胞中携带氧气的蛋白质,反映血液的携氧能力。'},
        'PLT': {'en': 'Platelet count reflects the blood clotting function and hemostatic capacity.',
                'cn': '血小板计数反映血液的凝血功能和止血能力。'},
        'ALT': {'en': 'Alanine aminotransferase (ALT) is an enzyme primarily found in liver cells, reflecting liver cell integrity.',
                'cn': '谷丙转氨酶ALT主要存在于肝细胞中反映肝细胞的完整性。'},
        'AST': {'en': 'Aspartate aminotransferase (AST) is an enzyme found in liver and heart muscle cells, reflecting tissue integrity.',
                'cn': '谷草转氨酶AST存在于肝脏和心肌细胞中反映组织的完整性。'},
        'TC': {'en': 'Total cholesterol is a lipid component in blood, important for cardiovascular health assessment.',
               'cn': '总胆固醇是血液中的脂质成分,对心血管健康评估具有重要意义。'},
        'TG': {'en': 'Triglycerides are the main form of fat storage in the body, reflecting lipid metabolism status.',
               'cn': '甘油三酯是体内脂肪储存的主要形式,反映脂质代谢状况。'},
        'GLU': {'en': 'Blood glucose is the primary energy source for cells, essential for diabetes screening and metabolic assessment.',
                'cn': '血糖是细胞的主要能量来源,是糖尿病筛查和代谢评估的重要指标。'},
        'TSH': {'en': 'TSH level reflects thyroid function and helps diagnose thyroid disorders.',
                'cn': 'TSH水平反映甲状腺功能有助于诊断甲状腺疾病。'},
    }
    if abb.upper() in templates:
        return templates[abb.upper()]
    # Generic template when nothing else matched.
    return {
        "en": f"{project_name} ({abb}) is a medical test indicator used for health assessment and disease screening.",
        "cn": f"{project_name}{abb})是一项医学检测指标,用于健康评估和疾病筛查。"
    }
def find_module_end_position(doc, module_name):
    """
    Locate the last table that belongs to the given module.

    The module is identified by its title row keywords; the module extends
    until the first later table that carries a *different* module's title
    (or to the end of the document).

    Returns the index of the module's final table within doc.element.body,
    or -1 when the module title cannot be found at all.
    """
    # Exact keyword sets per module title (these match heading rows only,
    # not ordinary data rows).
    module_titles = {
        'Urine Detection': ['urine detection', '尿液检测'],
        'Complete Blood Count': ['complete blood count', '血常规'],
        'Heavy Metals': ['heavy metal', '重金属', 'trace element', '微量元素', 'microelement'],
        'Infectious Disease': ['infectious disease', '传染病', 'hepatitis', '肝炎'],
        'Kidney Function': ['kidney function', '肾功能'],
        'Liver Function': ['liver function', '肝功能'],
        'Lipid Panel': ['lipid panel', '血脂'],
        'Thyroid': ['thyroid function', '甲状腺功能'],
        'Hormone': ['hormone', '激素', 'female hormone', 'male hormone'],
        'Tumor Markers': ['tumor marker', '肿瘤标志物'],
        'Electrolytes': ['electrolyte', '电解质'],
        'Glucose': ['glucose metabolism', '糖代谢'],
        'Coagulation': ['coagulation', '凝血'],
        'Immune Function': ['immune function', '免疫功能', 'humoral immunity', '体液免疫'],
        'Bone Metabolism': ['bone metabolism', '骨代谢'],
    }
    titles = module_titles.get(module_name, [module_name.lower()])
    body = doc.element.body

    def heading_text(table, row_index):
        # Lower-cased, space-joined text of one table row.
        return ' '.join(cell.text.lower().strip() for cell in table.rows[row_index].cells)

    # Step 1: first table whose top rows mention this module's title.
    start_idx = -1
    for ti, table in enumerate(doc.tables):
        matched = False
        for ri in range(min(2, len(table.rows))):
            text = heading_text(table, ri)
            if any(kw in text for kw in titles):
                matched = True
                break
        if matched:
            start_idx = ti
            break
    if start_idx < 0:
        return -1

    # Step 2: first later table that announces a different module.
    all_keywords = [kw for kws in module_titles.values() for kw in kws]
    end_idx = len(doc.tables)
    for ti in range(start_idx + 1, len(doc.tables)):
        table = doc.tables[ti]
        found_other = False
        for ri in range(min(2, len(table.rows))):
            text = heading_text(table, ri)
            # A keyword that appears in the row but is not one of *this*
            # module's keywords marks the start of the next module.
            if any(kw in text and kw not in titles for kw in all_keywords):
                found_other = True
                break
        if found_other:
            end_idx = ti
            break

    # Step 3: resolve the module's last table to its position in the body.
    last_idx = max(end_idx - 1, start_idx)
    target_tbl = doc.tables[last_idx]._tbl
    for pos, child in enumerate(body):
        if child is target_tbl:
            return pos
    return -1
def insert_table_after_position(doc, position, abb, project_name, result, clinical_en, clinical_cn,
                                point='', reference='', unit='', include_header=False):
    """
    Insert a new single-item result table after the given body position,
    replicating the template style.

    Layout without a header row:
        Row 0: ABB | Name | Result | Point | Refer | Unit   (data row)
        Row 1: Clinical Significance (merged)               (explanation row)
    Layout with a header row:
        Row 0: Abb简称 | Project项目 | Result结果 | Point提示 | Refer参考 | Unit单位
        Row 1: data row
        Row 2: explanation row

    Args:
        doc: target python-docx Document.
        position: index in doc.element.body after which the table is moved;
            a negative value leaves the table appended at the document end.
        abb / project_name / result: data-row contents.
        clinical_en / clinical_cn: English / Chinese clinical-significance text.
        point / reference / unit: optional extra columns; `reference` is
            normalized via clean_reference_range first.
        include_header: whether to emit the bilingual header row.

    Returns:
        the created python-docx Table.
    """
    # Normalize the reference-range string.
    reference = clean_reference_range(reference)
    # Row count depends on whether a header row is requested.
    num_rows = 3 if include_header else 2
    table = doc.add_table(rows=num_rows, cols=6)
    table.alignment = WD_TABLE_ALIGNMENT.CENTER
    table.autofit = False
    # Fixed column widths matching the template.
    widths = [Cm(2.5), Cm(3.5), Cm(2.5), Cm(2.5), Cm(2.5), Cm(2.5)]
    for row in table.rows:
        for idx, width in enumerate(widths):
            row.cells[idx].width = width

    def set_font(run, bold=False, font_size=10.5):
        # Data cells: Times New Roman with SimSun (宋体) for East-Asian glyphs.
        run.bold = bold
        run.font.name = 'Times New Roman'
        run.font.size = Pt(font_size)
        run._element.rPr.rFonts.set(qn('w:eastAsia'), '宋体')

    def set_clinical_font(run, bold=False):
        # Clinical-significance text: 华文楷体 (STKaiti), 11 pt.
        run.bold = bold
        run.font.name = '华文楷体'
        run.font.size = Pt(11)
        run._element.rPr.rFonts.set(qn('w:eastAsia'), '华文楷体')

    def fill_cell(cell, text, bold=False, always=False):
        # Center the cell's paragraph and add a styled run; empty text is
        # skipped unless `always` is set (ABB/name/result always get a run).
        p = cell.paragraphs[0]
        p.alignment = WD_ALIGN_PARAGRAPH.CENTER
        if always or text:
            run = p.add_run(text)
            set_font(run, bold=bold)

    # Resolve row indices and optionally render the header row.
    if include_header:
        data_row_idx, sig_row_idx = 1, 2
        header_row = table.rows[0]
        headers = [
            ('Abb', '简称'), ('Project', '项目'), ('Result', '结果'),
            ('Point', '提示'), ('Refer', '参考'), ('Unit', '单位')
        ]
        for idx, (en, cn) in enumerate(headers):
            p = header_row.cells[idx].paragraphs[0]
            p.alignment = WD_ALIGN_PARAGRAPH.CENTER
            run = p.add_run(f'{en}\n{cn}')
            set_font(run, bold=True, font_size=9)
    else:
        data_row_idx, sig_row_idx = 0, 1

    # === Data row ===
    data_row = table.rows[data_row_idx]
    fill_cell(data_row.cells[0], abb, bold=True, always=True)
    fill_cell(data_row.cells[1], project_name, bold=True, always=True)
    fill_cell(data_row.cells[2], str(result), always=True)
    fill_cell(data_row.cells[3], point)
    fill_cell(data_row.cells[4], reference)
    fill_cell(data_row.cells[5], unit)

    # === Clinical-significance row (six cells merged into one) ===
    sig_row = table.rows[sig_row_idx]
    top_cell = sig_row.cells[0]
    for i in range(1, 6):
        top_cell.merge(sig_row.cells[i])
    # English paragraph.
    p = top_cell.paragraphs[0]
    p.alignment = WD_ALIGN_PARAGRAPH.LEFT
    run = p.add_run('Clinical Significance: ')
    set_clinical_font(run, bold=True)
    run = p.add_run(clinical_en)
    set_clinical_font(run)
    # Chinese paragraph (separate paragraph, matching the sample files).
    p_cn = top_cell.add_paragraph()
    p_cn.alignment = WD_ALIGN_PARAGRAPH.LEFT
    run = p_cn.add_run('临床意义:')
    set_clinical_font(run, bold=True)
    run = p_cn.add_run(clinical_cn)
    set_clinical_font(run)

    # === Borders: solid black on the very top edge, dashed grey elsewhere ===
    border_solid = {'val': 'single', 'sz': 4, 'color': '000000', 'space': 0}
    border_dashed = {'val': 'dashed', 'sz': 4, 'color': 'AAAAAA', 'space': 0}
    for i, row in enumerate(table.rows):
        for cell in row.cells:
            top = border_solid if i == 0 else border_dashed
            set_cell_border(cell, top=top, bottom=border_dashed,
                            left=border_dashed, right=border_dashed)
            cell.vertical_alignment = 1  # 1 == WD_CELL_VERTICAL_ALIGNMENT.CENTER

    # Move the table to the requested body position and add a separator
    # paragraph right after it.
    if position >= 0:
        body = doc.element.body
        tbl_element = table._tbl
        body.remove(tbl_element)
        body.insert(position + 1, tbl_element)
        body.insert(position + 2, OxmlElement('w:p'))
    return table
def insert_paired_items_table(doc, position,
                              abb, name_cn, result, clinical_en, clinical_cn,
                              point='', reference='', unit='',
                              include_header=False):
    """
    Insert a paired-item table (two data rows sharing one clinical-significance
    row), e.g. EOS and EOS% rendered in the same table.

    Layout without a header row:
        Row 0: base ABB    | Chinese name | Result | Point | Reference | Unit
        Row 1: percent ABB | Chinese name | (empty unless the percent item was passed)
        Row 2: Clinical Significance (merged)
    With a header the bilingual header row is prepended.

    The measurement data goes only into the row matching the passed-in item
    (base or percent, per get_paired_item); the sibling row shows just its
    ABB and Chinese name. When `abb` is not a known paired item the call
    falls back to insert_table_after_position using `name_cn` as the name.

    Returns the created python-docx Table.
    """
    abb_upper = abb.upper().strip()
    paired_abb, is_base, base_cn, percent_cn = get_paired_item(abb)
    if not paired_abb:
        # Not a paired item: render a normal single-item table instead.
        return insert_table_after_position(doc, position, abb, name_cn, result,
                                           clinical_en, clinical_cn,
                                           point=point, reference=reference, unit=unit,
                                           include_header=include_header)

    # Row 1 is always the base item, row 2 the percent item; the data
    # columns are populated only on the row that was actually passed in.
    blank = ('', '', '', '')
    data = (result, point, reference, unit)
    if is_base:
        rows_spec = [
            (abb_upper, base_cn) + data,
            (paired_abb, percent_cn) + blank,
        ]
    else:
        rows_spec = [
            (paired_abb, base_cn) + blank,
            (abb_upper, percent_cn) + data,
        ]

    num_rows = 4 if include_header else 3
    table = doc.add_table(rows=num_rows, cols=6)
    table.alignment = WD_TABLE_ALIGNMENT.CENTER
    table.autofit = False
    # Fixed column widths matching the template.
    widths = [Cm(2.5), Cm(3.5), Cm(2.5), Cm(2.5), Cm(2.5), Cm(2.5)]
    for row in table.rows:
        for idx, width in enumerate(widths):
            row.cells[idx].width = width

    def set_font(run, bold=False, font_size=10.5):
        # Data cells: Times New Roman with SimSun (宋体) for East-Asian glyphs.
        run.bold = bold
        run.font.name = 'Times New Roman'
        run.font.size = Pt(font_size)
        run._element.rPr.rFonts.set(qn('w:eastAsia'), '宋体')

    def set_clinical_font(run, bold=False):
        # Clinical-significance text: 华文楷体 (STKaiti), 11 pt.
        run.bold = bold
        run.font.name = '华文楷体'
        run.font.size = Pt(11)
        run._element.rPr.rFonts.set(qn('w:eastAsia'), '华文楷体')

    def fill_cell(cell, text, bold=False, always=False):
        # Center the cell's paragraph and add a styled run; empty text is
        # skipped unless `always` is set (ABB/name columns always get a run).
        p = cell.paragraphs[0]
        p.alignment = WD_ALIGN_PARAGRAPH.CENTER
        if always or text:
            run = p.add_run(text)
            set_font(run, bold=bold)

    # Resolve row indices and optionally render the header row.
    if include_header:
        first_data_idx = 1
        header_row = table.rows[0]
        headers = [
            ('Abb', '简称'), ('Project', '项目'), ('Result', '结果'),
            ('Point', '提示'), ('Refer', '参考'), ('Unit', '单位')
        ]
        for idx, (en, cn) in enumerate(headers):
            p = header_row.cells[idx].paragraphs[0]
            p.alignment = WD_ALIGN_PARAGRAPH.CENTER
            run = p.add_run(f'{en}\n{cn}')
            set_font(run, bold=True, font_size=9)
    else:
        first_data_idx = 0
    sig_row_idx = first_data_idx + 2

    # === Two data rows ===
    for offset, (r_abb, r_name, r_result, r_point, r_ref, r_unit) in enumerate(rows_spec):
        row = table.rows[first_data_idx + offset]
        fill_cell(row.cells[0], r_abb, bold=True, always=True)
        fill_cell(row.cells[1], r_name, bold=True, always=True)
        fill_cell(row.cells[2], str(r_result) if r_result else '')
        fill_cell(row.cells[3], str(r_point) if r_point else '')
        fill_cell(row.cells[4], str(r_ref) if r_ref else '')
        fill_cell(row.cells[5], str(r_unit) if r_unit else '')

    # === Shared clinical-significance row (six cells merged into one) ===
    sig_row = table.rows[sig_row_idx]
    top_cell = sig_row.cells[0]
    for i in range(1, 6):
        top_cell.merge(sig_row.cells[i])
    # English paragraph.
    p = top_cell.paragraphs[0]
    p.alignment = WD_ALIGN_PARAGRAPH.LEFT
    run = p.add_run('Clinical Significance: ')
    set_clinical_font(run, bold=True)
    run = p.add_run(clinical_en)
    set_clinical_font(run)
    # Chinese paragraph (separate paragraph, matching the sample files).
    p_cn = top_cell.add_paragraph()
    p_cn.alignment = WD_ALIGN_PARAGRAPH.LEFT
    run = p_cn.add_run('临床意义:')
    set_clinical_font(run, bold=True)
    run = p_cn.add_run(clinical_cn)
    set_clinical_font(run)

    # === Borders: solid black on the very top edge, dashed grey elsewhere ===
    border_solid = {'val': 'single', 'sz': 4, 'color': '000000', 'space': 0}
    border_dashed = {'val': 'dashed', 'sz': 4, 'color': 'AAAAAA', 'space': 0}
    for i, row in enumerate(table.rows):
        for cell in row.cells:
            top = border_solid if i == 0 else border_dashed
            set_cell_border(cell, top=top, bottom=border_dashed,
                            left=border_dashed, right=border_dashed)
            cell.vertical_alignment = 1  # 1 == WD_CELL_VERTICAL_ALIGNMENT.CENTER

    # Move the table to the requested body position and add a separator
    # paragraph right after it.
    if position >= 0:
        body = doc.element.body
        tbl_element = table._tbl
        body.remove(tbl_element)
        body.insert(position + 1, tbl_element)
        body.insert(position + 2, OxmlElement('w:p'))
    return table
def insert_paired_items_table_with_both_data(doc, position,
                                             base_abb, percent_abb,
                                             base_cn, percent_cn,
                                             base_result, base_point, base_reference, base_unit,
                                             percent_result, percent_point, percent_reference, percent_unit,
                                             clinical_en, clinical_cn,
                                             include_header=False):
    """
    Insert a paired-item table with measurement data in BOTH rows.

    Layout:
        Row 0 (optional): bilingual header row
        Row 1: base item    ABB | Chinese name | Result | Point | Reference | Unit
        Row 2: percent item ABB | Chinese name | Result | Point | Reference | Unit
        Row 3: Clinical Significance (merged across all six columns)

    Both reference ranges are normalized via clean_reference_range.
    Returns the created python-docx Table.
    """
    # Normalize both reference-range strings.
    base_reference = clean_reference_range(base_reference)
    percent_reference = clean_reference_range(percent_reference)

    num_rows = 4 if include_header else 3
    table = doc.add_table(rows=num_rows, cols=6)
    table.alignment = WD_TABLE_ALIGNMENT.CENTER
    table.autofit = False
    # Fixed column widths matching the template.
    widths = [Cm(2.5), Cm(3.5), Cm(2.5), Cm(2.5), Cm(2.5), Cm(2.5)]
    for row in table.rows:
        for idx, width in enumerate(widths):
            row.cells[idx].width = width

    def set_font(run, bold=False, font_size=10.5):
        # Data cells: Times New Roman with SimSun (宋体) for East-Asian glyphs.
        run.bold = bold
        run.font.name = 'Times New Roman'
        run.font.size = Pt(font_size)
        run._element.rPr.rFonts.set(qn('w:eastAsia'), '宋体')

    def set_clinical_font(run, bold=False):
        # Clinical-significance text: 华文楷体 (STKaiti), 11 pt.
        run.bold = bold
        run.font.name = '华文楷体'
        run.font.size = Pt(11)
        run._element.rPr.rFonts.set(qn('w:eastAsia'), '华文楷体')

    def fill_cell(cell, text, bold=False, always=False):
        # Center the cell's paragraph and add a styled run; empty text is
        # skipped unless `always` is set (ABB/name columns always get a run).
        p = cell.paragraphs[0]
        p.alignment = WD_ALIGN_PARAGRAPH.CENTER
        if always or text:
            run = p.add_run(text)
            set_font(run, bold=bold)

    # Resolve row indices and optionally render the header row.
    if include_header:
        first_data_idx = 1
        header_row = table.rows[0]
        headers = [
            ('Abb', '简称'), ('Project', '项目'), ('Result', '结果'),
            ('Point', '提示'), ('Refer', '参考'), ('Unit', '单位')
        ]
        for idx, (en, cn) in enumerate(headers):
            p = header_row.cells[idx].paragraphs[0]
            p.alignment = WD_ALIGN_PARAGRAPH.CENTER
            run = p.add_run(f'{en}\n{cn}')
            set_font(run, bold=True, font_size=9)
    else:
        first_data_idx = 0
    sig_row_idx = first_data_idx + 2

    # === Two data rows: base item first, percent item second ===
    rows_spec = [
        (base_abb, base_cn, base_result, base_point, base_reference, base_unit),
        (percent_abb, percent_cn, percent_result, percent_point, percent_reference, percent_unit),
    ]
    for offset, (r_abb, r_name, r_result, r_point, r_ref, r_unit) in enumerate(rows_spec):
        row = table.rows[first_data_idx + offset]
        fill_cell(row.cells[0], r_abb, bold=True, always=True)
        fill_cell(row.cells[1], r_name, bold=True, always=True)
        fill_cell(row.cells[2], str(r_result) if r_result else '')
        fill_cell(row.cells[3], str(r_point) if r_point else '')
        fill_cell(row.cells[4], str(r_ref) if r_ref else '')
        fill_cell(row.cells[5], str(r_unit) if r_unit else '')

    # === Shared clinical-significance row (six cells merged into one) ===
    sig_row = table.rows[sig_row_idx]
    top_cell = sig_row.cells[0]
    for i in range(1, 6):
        top_cell.merge(sig_row.cells[i])
    # English paragraph.
    p = top_cell.paragraphs[0]
    p.alignment = WD_ALIGN_PARAGRAPH.LEFT
    run = p.add_run('Clinical Significance: ')
    set_clinical_font(run, bold=True)
    run = p.add_run(clinical_en)
    set_clinical_font(run)
    # Chinese paragraph (separate paragraph, matching the sample files).
    p_cn = top_cell.add_paragraph()
    p_cn.alignment = WD_ALIGN_PARAGRAPH.LEFT
    run = p_cn.add_run('临床意义:')
    set_clinical_font(run, bold=True)
    run = p_cn.add_run(clinical_cn)
    set_clinical_font(run)

    # === Borders: solid black on the very top edge, dashed grey elsewhere ===
    border_solid = {'val': 'single', 'sz': 4, 'color': '000000', 'space': 0}
    border_dashed = {'val': 'dashed', 'sz': 4, 'color': 'AAAAAA', 'space': 0}
    for i, row in enumerate(table.rows):
        for cell in row.cells:
            top = border_solid if i == 0 else border_dashed
            set_cell_border(cell, top=top, bottom=border_dashed,
                            left=border_dashed, right=border_dashed)
            cell.vertical_alignment = 1  # 1 == WD_CELL_VERTICAL_ALIGNMENT.CENTER

    # Move the table to the requested body position and add a separator
    # paragraph right after it.
    if position >= 0:
        body = doc.element.body
        tbl_element = table._tbl
        body.remove(tbl_element)
        body.insert(position + 1, tbl_element)
        body.insert(position + 2, OxmlElement('w:p'))
    return table
def add_missing_items_table(doc, unfilled_abbs, matched_data, api_key=None):
    """
    Append missing test items to the tail of their owning modules.

    Workflow:
    1. Classify every missing item into a module (configuration lookup
       first, DeepSeek classification for unknown items).
    2. Walk modules in the standard order and insert one table per item at
       the end of the matching module, using placeholder explanation text.
    3. Call DeepSeek to generate the Clinical Significance explanations and
       replace the placeholders in the inserted tables.

    Args:
        doc: python-docx Document being filled.
        unfilled_abbs: ABB codes that were not matched into the template.
        matched_data: dict mapping ABB -> extracted data
            (result/point/reference/unit/project/project_cn...).
        api_key: optional DeepSeek API key; without it step 3 is skipped.
    """
    if not unfilled_abbs:
        print("\n ✓ 没有缺失项目需要添加")
        return
    # Load configuration: module mapping and the canonical module order.
    from config import load_abb_config, get_standard_module_order, sort_items_by_standard_order, normalize_abb, normalize_module_name
    abb_config = load_abb_config()
    abb_to_module = abb_config.get('abb_to_module', {})
    abb_to_info = abb_config.get('abb_to_info', {})
    standard_module_order = get_standard_module_order()
    print(f"\n 📋 开始处理 {len(unfilled_abbs)} 个缺失项目...")
    # ===== Step 1: decide which module each missing item belongs to =====
    print("\n 🔍 步骤1: 分析缺失项目所属模块...")
    by_module = {}  # {module: [(abb, data), ...]}
    items_to_classify = []  # items that still need DeepSeek classification
    for abb in unfilled_abbs:
        data = matched_data.get(abb, {})
        result = data.get('result', '')
        if not result:
            continue
        project_name = data.get('project', abb)
        # Normalize the ABB name before any lookup.
        normalized_abb = normalize_abb(abb, abb_config)
        # Prefer the configured module (exact match first, then upper-case).
        module = abb_to_module.get(normalized_abb, '')
        if not module:
            module = abb_to_module.get(abb, '')
        if not module:
            module = abb_to_module.get(normalized_abb.upper(), '')
        if not module:
            module = abb_to_module.get(abb.upper(), '')
        if module:
            if module not in by_module:
                by_module[module] = []
            by_module[module].append((abb, data))
            print(f" • {abb} → [{module}] (配置文件)")
        else:
            # Unknown item: defer to DeepSeek classification.
            items_to_classify.append((abb, data, project_name))
    # Batch classification via DeepSeek.
    if items_to_classify:
        print(f"\n 🤖 调用DeepSeek分类 {len(items_to_classify)} 个未知项目...")
        for abb, data, project_name in items_to_classify:
            module = classify_abb_module(abb, project_name, api_key)
            # Normalize the returned module name.
            original_module = module
            module = normalize_module_name(module, abb_config)
            if original_module != module:
                print(f" • {abb} → [{original_module}] → [{module}] (DeepSeek)")
            else:
                print(f" • {abb} → [{module}] (DeepSeek)")
            if module not in by_module:
                by_module[module] = []
            by_module[module].append((abb, data))
    # Report the grouping.
    print(f"\n 📊 分组结果:")
    for module in standard_module_order:
        if module in by_module:
            items = by_module[module]
            print(f" [{module}]: {len(items)} 个项目 - {[i[0] for i in items]}")
    # Report modules outside the standard ordering.
    for module, items in by_module.items():
        if module not in standard_module_order:
            print(f" [{module}] (额外): {len(items)} 个项目 - {[i[0] for i in items]}")
    # ===== Step 2: append tables in standard module order =====
    print(f"\n 📝 步骤2: 按标准顺序在对应模块尾部添加表格...")
    # Locate the title position of every module we need to extend.
    module_positions = {}
    skipped_modules = []
    for module in by_module.keys():
        pos = find_module_title_position(doc, module)
        if pos < 0:
            skipped_modules.append(module)
            print(f" ⚠️ 模块 [{module}] 找不到标题位置,将跳过")
        else:
            module_positions[module] = pos
            print(f" 📍 模块 [{module}] 标题位置: {pos}")
    # Create one table per missing ABB, module by module.
    added_items = []
    added_count = 0
    # Standard-order modules first.
    for module in standard_module_order:
        if module not in by_module or module in skipped_modules:
            continue
        items = by_module[module]
        position = module_positions.get(module, -1)
        if position < 0:
            continue
        # Sort items into the standard in-module order.
        sorted_items = sort_items_by_standard_order(items, module, abb_config)
        print(f"\n 📁 处理模块 [{module}] ({len(sorted_items)} 个项目)...")
        insert_pos = position
        for abb, data in sorted_items:
            result = data.get('result', '')
            point = data.get('point', '')
            reference = data.get('reference', '')
            unit = data.get('unit', '')
            normalized_abb = normalize_abb(abb, abb_config)
            info = abb_to_info.get(normalized_abb, {})
            if not info:
                info = abb_to_info.get(abb, {})
            if not info:
                info = abb_to_info.get(normalized_abb.upper(), {})
            if not info:
                info = abb_to_info.get(abb.upper(), {})
            # Prefer the configured Chinese name, then the extracted one.
            name = info.get('project_cn') or data.get('project_cn')
            # Still unnamed: ask DeepSeek for a translation.
            if not name:
                english_name = info.get('project') or data.get('project', abb)
                name = translate_project_name_to_chinese(abb, english_name, api_key)
            # Create the table with placeholder explanation text first;
            # step 3 patches in the real text afterwards.
            placeholder_en = "[Generating clinical significance...]"
            placeholder_cn = "[正在生成临床意义...]"
            try:
                insert_table_after_position(
                    doc, insert_pos, abb, name, result,
                    placeholder_en, placeholder_cn,
                    point=point, reference=reference, unit=unit,
                    include_header=False
                )
                print(f" ✓ 添加表格: {abb} ({name}) = {result}")
                added_items.append((abb, name, result))
                added_count += 1
                insert_pos += 2
            except Exception as e:
                print(f" ✗ 添加 {abb} 失败: {e}")
    # Then any modules that are not part of the standard ordering.
    for module, items in by_module.items():
        if module in standard_module_order or module in skipped_modules:
            continue
        position = module_positions.get(module, -1)
        if position < 0:
            continue
        sorted_items = sort_items_by_standard_order(items, module, abb_config)
        print(f"\n 📁 处理额外模块 [{module}] ({len(sorted_items)} 个项目)...")
        insert_pos = position
        for abb, data in sorted_items:
            result = data.get('result', '')
            point = data.get('point', '')
            reference = data.get('reference', '')
            unit = data.get('unit', '')
            normalized_abb = normalize_abb(abb, abb_config)
            info = abb_to_info.get(normalized_abb, {})
            if not info:
                info = abb_to_info.get(abb, {})
            if not info:
                info = abb_to_info.get(normalized_abb.upper(), {})
            if not info:
                info = abb_to_info.get(abb.upper(), {})
            # Prefer the configured Chinese name, then the extracted one.
            name = info.get('project_cn') or data.get('project_cn')
            # Still unnamed: ask DeepSeek for a translation.
            if not name:
                english_name = info.get('project') or data.get('project', abb)
                name = translate_project_name_to_chinese(abb, english_name, api_key)
            placeholder_en = "[Generating clinical significance...]"
            placeholder_cn = "[正在生成临床意义...]"
            try:
                insert_table_after_position(
                    doc, insert_pos, abb, name, result,
                    placeholder_en, placeholder_cn,
                    point=point, reference=reference, unit=unit,
                    include_header=False
                )
                print(f" ✓ 添加表格: {abb} ({name}) = {result}")
                added_items.append((abb, name, result))
                added_count += 1
                insert_pos += 2
            except Exception as e:
                print(f" ✗ 添加 {abb} 失败: {e}")
    print(f"\n ✓ 已添加 {added_count} 个表格")
    # ===== Step 3: generate Clinical Significance text via DeepSeek =====
    if added_items and api_key:
        print(f"\n 🤖 步骤3: 调用DeepSeek生成Clinical Significance解释...")
        # Walk the document's tables, find each placeholder and replace it
        # with the AI-generated explanation.
        for abb, name, result in added_items:
            print(f" 🤖 生成 {abb} 的临床意义解释...")
            ai_explanation = get_ai_explanation(abb, name, result, api_key)
            # Find this ABB's table in the document and update its explanation.
            for table in doc.tables:
                for row in table.rows:
                    cells = row.cells
                    if len(cells) > 0:
                        first_cell_text = cells[0].text.strip().upper()
                        if first_cell_text == abb.upper():
                            # Matching ABB: the Clinical Significance row is
                            # expected directly below it.
                            row_idx = list(table.rows).index(row)
                            if row_idx + 1 < len(table.rows):
                                sig_row = table.rows[row_idx + 1]
                                sig_cell = sig_row.cells[0]
                                if 'Generating' in sig_cell.text or '正在生成' in sig_cell.text:
                                    # Replace the placeholder text.
                                    sig_cell.text = ''
                                    p = sig_cell.paragraphs[0]
                                    p.alignment = WD_ALIGN_PARAGRAPH.LEFT
                                    def set_font(run, bold=False, font_size=9):
                                        run.bold = bold
                                        run.font.name = 'Times New Roman'
                                        run.font.size = Pt(font_size)
                                        run._element.rPr.rFonts.set(qn('w:eastAsia'), '宋体')
                                    run = p.add_run('Clinical Significance: ')
                                    set_font(run, bold=True)
                                    run = p.add_run(ai_explanation['en'])
                                    set_font(run)
                                    run = p.add_run('\n')
                                    run = p.add_run('临床意义:')
                                    set_font(run, bold=True)
                                    run = p.add_run(ai_explanation['cn'])
                                    set_font(run)
                                    print(f" ✓ 已更新 {abb} 的解释")
                            # NOTE(review): this break leaves only the row loop;
                            # the outer table loop keeps scanning — confirm intended.
                            break
    print(f"\n ✅ 缺失项目处理完成,共添加 {added_count} 个项目")
def clean_empty_rows(doc_path: str, output_path: str, patient_info: dict = None):
"""清理空白数据行,并将数据表格合并到表头下
规则:
1. 删除空数据行ABB有内容但Result为空
2. 如果表头下只有描述没有数据,删除描述,将下方数据表格内容移上来
重要:跳过保护区域(前四页)和"客户功能医学检测档案"区域的所有表格
Args:
doc_path: 文档路径
output_path: 输出路径
patient_info: 患者信息字典包含gender字段从OCR文本提取用于模块清理
"""
from docx import Document
from lxml import etree
import re
import copy
from xml_safe_save import safe_save
template_path = Path(__file__).parent / "template_complete.docx"
doc = Document(doc_path)
# 获取保护边界位置
protection_boundary = find_health_program_boundary(doc)
print(f" [保护] 清理空行时跳过前 {protection_boundary} 个元素")
# 获取"客户功能医学检测档案"区域位置
exam_file_start, exam_file_end = find_examination_file_region(doc)
if exam_file_start >= 0:
print(f" [保护] 清理空行时跳过'客户功能医学检测档案'区域: {exam_file_start}-{exam_file_end}")
def is_in_protected_region(idx):
"""检查索引是否在保护区域内"""
# 检查是否在前四页保护区域内
if idx < protection_boundary:
return True
# 检查是否在"客户功能医学检测档案"区域内
if exam_file_start >= 0 and exam_file_start <= idx < exam_file_end:
return True
return False
# 构建保护区域内的表格集合(包括前四页和"客户功能医学检测档案"区域)
body = doc.element.body
body_children = list(body)
protected_tables = set()
for i, elem in enumerate(body_children):
if is_in_protected_region(i):
if elem.tag.endswith('}tbl'):
for t in doc.tables:
if t._tbl is elem:
protected_tables.add(id(t))
break
print(f" [保护] 保护区域内有 {len(protected_tables)} 个表格将被跳过")
removed_rows = 0
merged_count = 0
    def has_data_in_row(cells):
        """Return True if the row holds a usable result value.

        Only the Result column is inspected, so numbers that appear in the
        Refer (range) column cannot cause a false positive.
        """
        # Qualitative results that count as data even without digits.
        valid_qualitative = [
            'negative', 'positive', 'normal', 'reactive', 'non-reactive',
            'a', 'b', 'ab', 'o',  # blood types
            'yellow', 'amber', 'straw', 'colorless', 'red', 'brown', 'dark', 'clear'  # urine colors
        ]
        # Typical template layouts:
        # - 11 columns: 0 ABB, 1-2 Project, 3-4 Result, 5-6 Point, 7-8 Refer, 9-10 Unit
        # - 6 columns:  0 ABB, 1 Project, 2 Result, 3 Point, 4 Refer, 5 Unit
        if len(cells) >= 11:
            result_col_candidates = [3, 4]
        elif len(cells) >= 6:
            result_col_candidates = [2, 3]
        else:
            result_col_candidates = [2]
        # Collect non-empty texts from the candidate Result columns.
        result_candidates = []
        for col_idx in result_col_candidates:
            if col_idx < len(cells):
                txt = (cells[col_idx].text or '').strip()
                if txt:
                    result_candidates.append(txt)
        result_text = result_candidates[0] if result_candidates else ''
        if not result_text:
            return False
        # Punctuation-only or template-placeholder remnants are not data.
        if result_text in ['', '-', '/', ' ', '.', ':', '{{', '}}']:
            return False
        if result_text.startswith('{{'):
            return False
        # Reject "range" shapes (normally in the Refer column, but they can
        # land in Result/Point when the template is misaligned).
        if re.match(r'^[\(\[]?\s*[-+]?\d+(?:\.\d+)?\s*[-~]\s*[-+]?\d+(?:\.\d+)?\s*[\)\]]?$', result_text):
            return False
        # Any remaining digit, or a recognized qualitative word, counts as data.
        if re.search(r'\d', result_text):
            return True
        if result_text.lower() in valid_qualitative:
            return True
        return False
    def is_header_row(row_text, cells=None):
        """Strictly identify a table header row.

        `row_text` is the lower-cased, space-joined row text; `cells`
        (optional) enables stricter structural checks.
        """
        # Exclude description rows first so they are never mistaken for headers.
        if 'clinical significance' in row_text or '临床意义' in row_text:
            return False
        # A header must combine the "Abb/简称 + Project/项目 + Result/结果" trio.
        has_abb = ('abb' in row_text) or ('简称' in row_text)
        has_project = ('project' in row_text) or ('项目' in row_text)
        has_result = ('result' in row_text) or ('结果' in row_text)
        if not (has_abb and has_project and has_result):
            return False
        # With cells available, apply stricter structural checks.
        if cells:
            # Header rows have several populated columns...
            non_empty_cells = [c for c in cells if c.text.strip()]
            if len(non_empty_cells) < 2:
                return False
            # ...and every header cell is short (< 30 characters).
            if any(len(c.text.strip()) > 30 for c in cells):
                return False
        return True
    def is_title_row(row_text, cells=None):
        """Identify a section-title row (e.g. "Blood Type 血型",
        "Four Infectious Diseases 传染病四项")."""
        # Exclude description rows first so explanation rows are never
        # mistaken for titles.
        if 'clinical significance' in row_text or '临床意义' in row_text:
            return False
        # Common title keywords - intended to cover all 24 standard modules.
        title_keywords = [
            # English keywords
            'blood count', 'blood type', 'blood sugar', 'blood coagulation',
            'function', 'profile', 'panel', 'test', 'detection',
            'examination', 'analysis', 'screening', 'marker', 'hormone',
            'infectious', 'disease', 'immunoglobulin', 'complement', 'lipid',
            'electrolyte', 'coagulation', 'metabolism', 'microelement', 'trace element',
            'lymphocyte', 'humoral', 'immunity', 'inflammatory', 'autoantibody',
            'thromboembolism', 'imaging', 'gynecological', 'female-specific',
            'myocardial', 'enzyme', 'cardiac',  # myocardial-enzyme-panel keywords
            # Chinese keywords
            '血常规', '血型', '血糖', '凝血', '肝功能', '肾功能', '血脂', '甲状腺',
            '检查', '检测', '传染病', '电解质', '骨代谢', '微量元素', '重金属',
            '淋巴细胞', '体液免疫', '免疫功能', '炎症', '自身抗体', '心脑血管',
            '影像', '妇科', '女性专项', '肿瘤标记物', '肿瘤标志物', '荷尔蒙',
            '心肌酶', '心肌酶谱'  # myocardial-enzyme-panel Chinese keywords
        ]
        if any(kw in row_text for kw in title_keywords):
            if cells:
                # Collect the contents of all non-empty cells.
                non_empty_texts = [c.text.strip() for c in cells if c.text.strip()]
                # Number of distinct texts (merged cells repeat the same text).
                unique_texts = set(non_empty_texts)
                # Title rows carry at most 1-2 distinct texts, or only a few
                # non-empty cells overall.
                if len(unique_texts) <= 2 or len(non_empty_texts) <= 2:
                    return True
            else:
                return True
        return False
    def is_description_row(row_text):
        # A row is an explanation (description) row when it carries the
        # clinical-significance label in either language.
        return 'clinical significance' in row_text or '临床意义' in row_text
    def is_data_row(first_cell):
        # Data rows start with a short ABB-like token (2-15 chars) that is
        # alphanumeric once common separators are stripped out.
        # NOTE(review): returns a truthy/falsy value ('' or bool), not a
        # strict bool — callers only use it in boolean context.
        if first_cell and 2 <= len(first_cell) <= 15:
            clean = first_cell.replace('-', '').replace('/', '').replace('%', '').replace('(', '').replace(')', '').replace(' ', '')
            return clean and clean.replace('.', '').isalnum()
        return False
def is_special_table(table):
"""检查是否是自动生成的特殊格式表格(防止被合并)
特殊表格特征:
1. 2-4行
2. 最后一行包含 "Clinical Significance""临床意义"
3. 第一行不是模块标题(不包含重复的模块名称)
"""
rows = len(table.rows)
if rows < 2 or rows > 4:
return False
try:
# 检查最后一行是否包含临床意义
last_row_text = ' '.join([c.text for c in table.rows[-1].cells]).lower()
if 'clinical significance' not in last_row_text and '临床意义' not in last_row_text:
return False
# 检查第一行是否是模块标题(模块标题表格不是特殊表格)
first_row_text = ' '.join([c.text for c in table.rows[0].cells]).lower()
# 模块标题特征:同一个文本重复多次
first_cell = table.rows[0].cells[0].text.strip()
if first_cell and len(first_cell) > 3:
# 检查是否所有单元格都包含相同的文本
all_same = all(first_cell in c.text for c in table.rows[0].cells)
if all_same:
return False # 这是模块标题表格,不是特殊表格
return True
except:
pass
return False
def analyze_table(table):
    """Classify every row of *table* and summarize its structure.

    Returns a dict:
      header_idx: index of the (last seen) column-header row, -1 if none
      title_idx: index of the (last seen) module-title row, -1 if none
      desc_indices: indices of "clinical significance" description rows
      data_with_result: indices of data rows carrying a result value
      data_without_result: indices of data rows with an empty result
      is_special: whether the whole table is an auto-generated special table
    Rows with fewer than 2 cells are ignored.
    """
    info = {'header_idx': -1, 'title_idx': -1, 'desc_indices': [],
            'data_with_result': [], 'data_without_result': [],
            'is_special': is_special_table(table)}
    for row_idx, row in enumerate(table.rows):
        cells = row.cells
        if len(cells) < 2:
            continue
        row_text = ' '.join([c.text.strip().lower() for c in cells])
        first_cell = cells[0].text.strip()
        # Classification priority: header > title > description > data.
        if is_header_row(row_text, cells):
            info['header_idx'] = row_idx
        elif is_title_row(row_text, cells):
            info['title_idx'] = row_idx
        elif is_description_row(row_text):
            info['desc_indices'].append(row_idx)
        elif is_data_row(first_cell):
            if has_data_in_row(cells):
                info['data_with_result'].append(row_idx)
            else:
                info['data_without_result'].append(row_idx)
    return info
def special_table_has_data(table):
    """Does a special table contain any valid results?

    Supported layouts:
      1. Plain item table (2-3 rows): cells[0]=ABB, cells[1]=item name,
         cells[2]=Result.
      2. Paired item table (3-4 rows): two data rows (item name + Result)
         sharing one clinical-significance row. Note: the ABB column
         (cells[0]) may be empty; the item name lives in cells[1].
      3. 11-column template: cells[0]=ABB, cells[1]=item name, cells[2] may
         repeat the item name.
    If no data row carries valid content, the table should be deleted.
    Returns False on any probing error.
    """
    try:
        rows = len(table.rows)
        if rows < 2:
            return False
        # Scan for any row that carries real content.
        has_valid_data = False
        for ri in range(rows):
            cells = table.rows[ri].cells
            if len(cells) < 2:
                continue
            first_cell = (cells[0].text or '').strip()
            second_cell = (cells[1].text or '').strip() if len(cells) > 1 else ''
            third_cell = (cells[2].text or '').strip() if len(cells) > 2 else ''
            row_text = ' '.join([c.text for c in cells]).lower()
            # Skip clinical-significance rows.
            if 'clinical significance' in row_text or '临床意义' in row_text:
                continue
            # Skip header rows.
            if first_cell.lower().startswith('abb') or ('project' in row_text and '项目' in row_text):
                continue
            # Valid content may live in the ABB, item-name, or Result column;
            # paired tables leave ABB empty but fill name/result.
            has_content = False
            # ABB column (first column); '{{' marks an unfilled placeholder.
            if first_cell and first_cell not in [' ', '\n'] and not first_cell.startswith('{{'):
                has_content = True
            # Item-name column (second) — Chinese name in paired tables.
            if not has_content and second_cell and second_cell not in [' ', '\n']:
                # Exclude placeholders.
                if not second_cell.startswith('{{'):
                    has_content = True
            # Result column (third).
            if not has_content and third_cell and third_cell not in [' ', '\n', '-', '/']:
                if not third_cell.startswith('{{'):
                    has_content = True
            if has_content:
                has_valid_data = True
                break
        return has_valid_data
    except:
        return False
def table_has_any_data(table):
    """Does the table hold any valid data? (Used for module-deletion checks.)

    Special tables get the dedicated content check; ordinary tables count
    as having data when at least one data row carries a result.
    """
    # Special tables first.
    if is_special_table(table):
        return special_table_has_data(table)
    # Ordinary tables.
    info = analyze_table(table)
    return len(info['data_with_result']) > 0
# 0. First delete whole "special tables" that have no results (otherwise the
# later passes would skip them).
removed_special_tables = 0
for table in list(doc.tables):
    # Skip tables inside the protected region.
    if id(table) in protected_tables:
        continue
    info = analyze_table(table)
    if info['is_special'] and not special_table_has_data(table):
        try:
            table._tbl.getparent().remove(table._tbl)
            removed_special_tables += 1
        except:
            pass
# Collect tables in body order (only those outside the protected region).
body = doc._body._body
table_order = []
for elem in body:
    if elem.tag.endswith('}tbl'):
        for t in doc.tables:
            if t._tbl is elem:
                # Skip tables inside the protected region.
                if id(t) not in protected_tables:
                    table_order.append(t)
                break
# First pass: merge tables (a header table with no data searches forward for
# the first tables that do have data).
tables_to_remove = set()
for i in range(len(table_order)):
    if table_order[i] in tables_to_remove:
        continue
    t1 = table_order[i]
    info1 = analyze_table(t1)
    # Never merge content INTO a special table.
    if info1['is_special']:
        continue
    # Condition: t1 has a header but no data rows.
    if info1['header_idx'] >= 0 and len(info1['data_with_result']) == 0:
        # Search only up to the next header table, so we never pull data
        # across module boundaries.
        next_header_pos = None
        for k in range(i + 1, len(table_order)):
            if table_order[k] in tables_to_remove:
                continue
            k_info = analyze_table(table_order[k])
            # A special table acts as a boundary — stop the search there.
            if k_info['is_special']:
                next_header_pos = k
                break
            # A "header but no data" table marks the next module boundary
            # (data tables may also carry headers, so they don't qualify).
            if k_info['header_idx'] >= 0 and len(k_info['data_with_result']) == 0:
                next_header_pos = k
                break
        search_end = next_header_pos if next_header_pos is not None else len(table_order)
        # Within range, collect every "has data, no header" table.
        candidates = []
        for j in range(i + 1, search_end):
            if table_order[j] in tables_to_remove:
                continue
            candidate = table_order[j]
            candidate_info = analyze_table(candidate)
            # Special tables are never merge sources.
            if candidate_info['is_special']:
                continue
            if len(candidate_info['data_with_result']) > 0:
                candidates.append((candidate, candidate_info))
        if not candidates:
            continue
        # Use the first candidate's item name as t1's title so the merged
        # table never shows an empty title.
        title_text = ''
        try:
            first_candidate, first_candidate_info = candidates[0]
            if first_candidate_info.get('data_with_result'):
                data_row_idx = first_candidate_info['data_with_result'][0]
                if len(first_candidate.rows[data_row_idx].cells) > 1:
                    title_text = first_candidate.rows[data_row_idx].cells[1].text.strip()
                if not title_text:
                    title_text = first_candidate.rows[data_row_idx].cells[0].text.strip()
        except:
            title_text = ''
        # Empty t1, keeping the header row.
        header_idx = info1['header_idx']
        title_row_idx = header_idx + 1
        # Delete every old row after the header, but try to keep the row
        # right under the header as the "title row" skeleton.
        keep_title_row = title_row_idx < len(t1.rows)
        delete_from = (title_row_idx + 1) if keep_title_row else (header_idx + 1)
        for ridx in range(len(t1.rows) - 1, delete_from - 1, -1):
            try:
                t1._tbl.remove(t1.rows[ridx]._tr)
                removed_rows += 1
            except:
                pass
        # Ensure a title row exists (insert one if missing; re-fetch via
        # t1.rows after insertion).
        # NOTE(review): uses copy.deepcopy — confirm 'import copy' is in
        # scope; the visible file header only has 'from copy import deepcopy'.
        if not keep_title_row:
            try:
                new_tr = copy.deepcopy(t1.rows[header_idx]._tr)
                t1._tbl.insert(title_row_idx, new_tr)
            except:
                pass
        # Write the title: first column only; clear the other cells.
        try:
            if title_row_idx < len(t1.rows):
                title_row = t1.rows[title_row_idx]
                for c in title_row.cells:
                    c.text = ''
                if title_text:
                    title_row.cells[0].text = title_text
        except:
            pass
        # Copy each candidate's data/description rows into t1, then mark the
        # candidate for deletion.
        for candidate, candidate_info in candidates:
            rows_to_copy = []
            rows_to_copy.extend(candidate_info['data_with_result'])
            rows_to_copy.extend(candidate_info['desc_indices'])
            for row_idx in sorted(rows_to_copy):
                src_row = candidate.rows[row_idx]
                new_tr = copy.deepcopy(src_row._tr)
                t1._tbl.append(new_tr)
            tables_to_remove.add(candidate)
            merged_count += 1
# Delete the merged-away tables.
for t in tables_to_remove:
    try:
        t._tbl.getparent().remove(t._tbl)
    except:
        pass
# Second pass: delete remaining empty data rows (skipping special tables and
# the protected region). Also remove the "Clinical Significance/临床意义"
# description rows that immediately follow, so no orphan explanation blocks
# are left behind.
for table in doc.tables:
    # Skip tables inside the protected region.
    if id(table) in protected_tables:
        continue
    info = analyze_table(table)
    # Skip special tables.
    if info['is_special']:
        continue
    rows_to_remove = set()
    for row_idx in info['data_without_result']:
        rows_to_remove.add(row_idx)
        # Following rows may be description rows (possibly several).
        next_idx = row_idx + 1
        while next_idx < len(table.rows):
            try:
                next_cells = table.rows[next_idx].cells
                next_text = ' '.join([(c.text or '').strip().lower() for c in next_cells])
                # Description row?
                if is_description_row(next_text):
                    rows_to_remove.add(next_idx)
                    next_idx += 1
                    continue
                # Also drop blank / near-blank rows (formatting noise).
                if not next_text.strip() or len(next_text.strip()) < 5:
                    rows_to_remove.add(next_idx)
                    next_idx += 1
                    continue
            except:
                pass
            break
    # Extra check: delete orphan description rows (no surviving data row
    # directly before them).
    kept_data_rows = set(info['data_with_result']) - rows_to_remove
    for desc_idx in info['desc_indices']:
        # Is there a kept data row before this description row?
        has_data_before = False
        for data_idx in kept_data_rows:
            if data_idx < desc_idx:
                # Require no other kept data row between them.
                intervening_data = [d for d in kept_data_rows if data_idx < d < desc_idx]
                if not intervening_data:
                    has_data_before = True
                    break
        if not has_data_before:
            rows_to_remove.add(desc_idx)
    for row_idx in sorted(rows_to_remove, reverse=True):
        try:
            table._tbl.remove(table.rows[row_idx]._tr)
            removed_rows += 1
        except:
            pass
# Pass 2.5: fill in the merged tables' title rows (when the row right under
# the header is empty; skips special tables and the protected region).
for table in doc.tables:
    # Skip tables inside the protected region.
    if id(table) in protected_tables:
        continue
    info = analyze_table(table)
    # Skip special tables.
    if info['is_special']:
        continue
    if info['header_idx'] < 0:
        continue
    if len(info['data_with_result']) == 0:
        continue
    title_row_idx = info['header_idx'] + 1
    if title_row_idx >= len(table.rows):
        continue
    try:
        title_row = table.rows[title_row_idx]
        # If the row right under the header is itself a data row, insert an
        # "empty title row" above it (cloned from the header row structure).
        try:
            first_cell = title_row.cells[0].text.strip() if title_row.cells else ''
            if is_data_row(first_cell) and has_data_in_row(title_row.cells):
                extracted_title = ''
                try:
                    if len(title_row.cells) > 1:
                        extracted_title = title_row.cells[1].text.strip()
                    if not extracted_title:
                        extracted_title = title_row.cells[0].text.strip()
                except:
                    extracted_title = ''
                header_tr = copy.deepcopy(table.rows[info['header_idx']]._tr)
                table._tbl.insert(title_row_idx, header_tr)
                title_row = table.rows[title_row_idx]
                try:
                    for c in title_row.cells:
                        c.text = ''
                    if extracted_title:
                        title_row.cells[0].text = extracted_title
                except:
                    pass
                continue
        except:
            pass
        # Never overwrite a title row that already has content.
        if any((c.text or '').strip() for c in title_row.cells):
            continue
        first_data_idx = info['data_with_result'][0]
        if first_data_idx >= len(table.rows):
            continue
        data_row = table.rows[first_data_idx]
        title_text = ''
        if len(data_row.cells) > 1:
            title_text = data_row.cells[1].text.strip()
        if not title_text:
            title_text = data_row.cells[0].text.strip()
        if not title_text:
            continue
        for c in title_row.cells:
            c.text = ''
        title_row.cells[0].text = title_text
    except:
        pass
# Third pass: delete every table that has no data at all.
# Important: skip tables inside the protected region.
# Important: keep module-title tables (title_idx >= 0).
# Important: keep header tables (containing Abb/Project/Result).
removed_tables = 0
for table in list(doc.tables):
    # Skip tables inside the protected region.
    if id(table) in protected_tables:
        continue
    info = analyze_table(table)
    # Special tables are freshly generated standalone tables — always keep.
    if info['is_special']:
        continue
    # Module-title tables are the modules' heading rows — always keep.
    if info['title_idx'] >= 0:
        continue
    # Header tables are the data tables' column headers — always keep.
    if info['header_idx'] >= 0:
        continue
    # Anything left with no data rows is deleted wholesale.
    if len(info['data_with_result']) == 0:
        try:
            table._tbl.getparent().remove(table._tbl)
            removed_tables += 1
        except:
            pass
# Pass 3.5: delete duplicated module-title tables.
# Signature: a single-row table whose module name repeats across cells.
seen_module_titles = set()
removed_duplicate_titles = 0
for table in list(doc.tables):
    if id(table) in protected_tables:
        continue
    # Module-title table: exactly 1 row with repeated content.
    if len(table.rows) == 1:
        row_text = ' '.join([c.text.strip() for c in table.rows[0].cells]).lower()
        # Does any module keyword appear at least twice?
        for kw in ['imaging', 'urine', 'blood count', 'blood type', 'coagulation',
                   'infectious', 'electrolyte', 'liver', 'kidney', 'myocardial',
                   'thyroid', 'lipid', 'blood sugar', 'thromboembolism', 'bone',
                   'microelement', 'lymphocyte', 'humoral', 'inflammatory',
                   'autoantibody', 'tumor', 'female hormone', 'male hormone',
                   'female-specific', '影像', '尿液', '血常规', '血型', '凝血',
                   '传染病', '电解质', '肝功能', '肾功能', '心肌酶', '甲状腺',
                   '血脂', '血糖', '心脑血管', '骨代谢', '微量元素', '淋巴细胞',
                   '体液免疫', '炎症', '自身抗体', '肿瘤', '女性激素', '男性激素', '女性专项']:
            if kw in row_text and row_text.count(kw) >= 2:
                # This is a module-title table.
                if kw in seen_module_titles:
                    # A duplicate heading — remove it.
                    try:
                        table._tbl.getparent().remove(table._tbl)
                        removed_duplicate_titles += 1
                    except:
                        pass
                else:
                    seen_module_titles.add(kw)
                break
if removed_duplicate_titles > 0:
    print(f" [清理] 删除 {removed_duplicate_titles} 个重复的模块标题表格")
# Important: save and reload before module cleanup so element indexes stay
# correct after the structural edits above.
safe_save(doc, output_path, template_path)
doc = Document(output_path)
# Fourth pass: delete modules with no data (heading, text, images, etc.).
# NOTE(review): Paragraph appears unused in the visible code below — confirm.
from docx.text.paragraph import Paragraph
# Keywords that identify a laboratory-module title.
module_keywords_cleanup = [
    'urine detection', 'urine test', '尿液检测', 'complete blood count', '血常规',
    'blood sugar', 'glucose', '血糖', 'lipid panel', 'lipid profile', '血脂',
    'blood type', '血型', 'coagulation', 'blood coagulation', '凝血',
    'infectious disease', 'four infectious', '传染病', '传染病四项',
    'electrolyte', 'serum electrolyte', '电解质', '血清电解质',
    'liver function', '肝功能', 'kidney function', '肾功能',
    'cardiac enzyme', 'myocardial enzyme', 'enzyme spectrum', '心肌酶', '心肌酶谱',
    'thyroid', 'thyroid function', '甲状腺', '甲状腺功能',
    'cardiovascular', 'thromboembolism', '心血管', '心脑血管',
    'bone metabolism', '骨代谢',
    'trace element', 'heavy metal', 'microelement', '微量元素', '重金属',
    'lymphocyte', 'lymphocyte subpopulation', '淋巴细胞', '淋巴细胞亚群',
    'humoral immunity', '体液免疫', 'immune function', '免疫功能',
    'inflammation', 'inflammatory', '炎症', '炎症反应',
    'autoantibody', 'autoimmune', '自身抗体', '自身免疫',
    'female hormone', '女性激素', '女性荷尔蒙', 'male hormone', '男性激素', '男性荷尔蒙',
    'gynecological', 'female-specific', '妇科', '女性专项',
    'tumor marker', '肿瘤标记物', '肿瘤标志物',
    'imaging', '影像',
]
# Texts that must never be treated as module titles (advice/summary sections).
exclude_keywords_cleanup = ['health program', 'health report', 'abnormal', '异常', 'overall', 'assessment', 'clinical significance', '临床意义', 'functional medical health advice', '功能医学健康建议', 'medical intervention', '医学干预', 'nutrition', '营养', 'exercise', '运动', 'sleep', '睡眠', 'lifestyle', '生活方式', 'follow-up', '随访', 'functional medical team', '功能医学团队',
                            '(一)', '(二)', '(三)', '(四)', '(五)', '(六)',
                            '复查', '监测', '标志物', '血液学', '状态',
                            'bhrt', 'ivnt', 'msc', '干细胞', '静脉营养', '激素替代',
                            '建议', '方案', '治疗', '调理', '改善', '优化']
# Section headings whose content must never be deleted by module cleanup.
protected_section_keywords = ['functional medical health advice', '功能医学健康建议',
                              'overall health assessment', '整体健康状况',
                              'abnormal index', '异常指标',
                              'health report analysis', '健康报告分析',
                              'medical intervention', '医学干预',
                              'nutrition intervention', '营养干预',
                              'exercise intervention', '运动干预',
                              'sleep', '睡眠', 'lifestyle', '生活方式',
                              'follow-up', '随访', 'functional medical team', '功能医学团队']
def is_protected_section_cleanup(text):
    """Return True when *text* mentions any protected-section keyword."""
    if not text:
        return False
    lowered = text.lower().strip()
    for keyword in protected_section_keywords:
        if keyword in lowered:
            return True
    return False
def is_module_title_para_cleanup(text):
    """Return True when the paragraph text is a module title eligible for cleanup."""
    if not text or len(text) > 100:
        return False
    lowered = text.lower().strip()
    # Roman-numeral section headings are never module titles.
    if lowered.startswith(('(i)', '(ii)', '(iii)', 'i.', 'ii.', 'iii.')):
        return False
    for banned in exclude_keywords_cleanup:
        if banned in lowered:
            return False
    for keyword in module_keywords_cleanup:
        if keyword in lowered:
            return True
    return False
def is_module_title_table_cleanup(table):
    """Return True when *table* is a 1-2 row module-title table.

    Rejects clinical-significance tables and Abb/Project/Result header
    tables; accepts a table whose first row mentions a known module title.
    Returns False on any probing error.
    """
    if len(table.rows) < 1 or len(table.rows) > 2:
        return False
    try:
        full_text = ' '.join([c.text.strip() for row in table.rows for c in row.cells]).lower()
        if 'clinical significance' in full_text or '临床意义' in full_text:
            return False
        if 'abb' in full_text and 'project' in full_text and 'result' in full_text:
            return False
        # Module-title keywords (including spelling variants).
        module_title_names = [
            'urine detection', 'urine test', '尿液检测',
            'complete blood count', '血常规',
            'blood sugar', '血糖', 'lipid profile', '血脂', 'blood type', '血型',
            'blood coagulation', '凝血功能', 'four infectious diseases', '传染病四项',
            'serum electrolytes', '血电解质', 'liver function', '肝功能',
            'kidney function', '肾功能', 'myocardial enzyme', '心肌酶',
            'thyroid function', '甲状腺功能', 'thromboembolism', '心脑血管',
            'bone metabolism', '骨代谢', 'microelement', '微量元素',
            'humoral immunity', '体液免疫', 'inflammatory reaction', '炎症反应',
            'autoantibody', '自身抗体', 'female hormone', '女性激素',
            'male hormone', '男性激素', 'tumor markers', '肿瘤标记物',
            'lymphocyte', 'lymphocyto', '淋巴细胞', '淋巴细胞亚群',
            'imaging', '影像学', 'female-specific', '女性专项'
        ]
        row_text = ' '.join([c.text.strip() for c in table.rows[0].cells]).lower()
        # Relaxed: a single occurrence of the title suffices (requiring 2
        # occurrences proved too strict).
        for title in module_title_names:
            if title in row_text:
                return True
        return False
    except:
        return False
body = doc._body._body
body_children = list(body)
# NOTE(review): tbl_map is built but appears unused in the visible code.
tbl_map = {}
for t in doc.tables:
    tbl_map[id(t._tbl)] = t
# Precise module-id rules, ordered by priority ("female hormone" must match
# before "male hormone" to avoid substring clashes).
_MODULE_IDENTIFY_RULES = [
    ('female hormone', 'female hormone'), ('女性荷尔蒙', 'female hormone'), ('女性激素', 'female hormone'),
    ('male hormone', 'male hormone'), ('男性荷尔蒙', 'male hormone'), ('男性激素', 'male hormone'),
    ('female-specific', 'female-specific'), ('女性专项', 'female-specific'),
    ('urine detection', 'urine'), ('urine test', 'urine'), ('尿液检测', 'urine'),
    ('complete blood count', 'blood count'), ('血常规', 'blood count'),
    ('blood sugar', 'blood sugar'), ('血糖', 'blood sugar'),
    ('lipid profile', 'lipid'), ('血脂', 'lipid'),
    ('blood type', 'blood type'), ('血型', 'blood type'),
    ('blood coagulation', 'coagulation'), ('凝血功能', 'coagulation'), ('凝血', 'coagulation'),
    ('four infectious', 'infectious'), ('传染病', 'infectious'),
    ('serum electrolyte', 'electrolyte'), ('血电解质', 'electrolyte'), ('电解质', 'electrolyte'),
    ('liver function', 'liver'), ('肝功能', 'liver'),
    ('kidney function', 'kidney'), ('肾功能', 'kidney'),
    ('myocardial enzyme', 'myocardial'), ('心肌酶', 'myocardial'),
    ('thyroid function', 'thyroid'), ('甲状腺功能', 'thyroid'), ('甲状腺', 'thyroid'),
    ('thromboembolism', 'thrombo'), ('心脑血管', 'thrombo'),
    ('bone metabolism', 'bone'), ('骨代谢', 'bone'),
    ('microelement', 'microelement'), ('微量元素', 'microelement'),
    ('humoral immunity', 'humoral'), ('体液免疫', 'humoral'),
    ('inflammatory', 'inflammatory'), ('炎症', 'inflammatory'),
    ('autoantibody', 'autoantibody'), ('自身抗体', 'autoantibody'),
    ('tumor marker', 'tumor'), ('肿瘤标记', 'tumor'),
    ('lymphocyte', 'lymphocyte'), ('lymphocyto', 'lymphocyte'), ('淋巴细胞', 'lymphocyte'),
    ('imaging', 'imaging'), ('影像', 'imaging'),
]
def identify_module_id(title_text):
    """Map a module title string to its canonical module id (None if unknown)."""
    lowered = title_text.lower()
    # Rules are priority-ordered; the first matching pattern wins.
    return next((mid for pattern, mid in _MODULE_IDENTIFY_RULES if pattern in lowered), None)
# Locate every module-title table and its body position (uniformly via
# is_module_title_table_cleanup + identify_module_id).
module_title_positions = []  # [(position, table, module_id)]
for i, elem in enumerate(body_children):
    if elem.tag.endswith('}tbl'):
        for t in doc.tables:
            if t._tbl is elem:
                if is_module_title_table_cleanup(t):
                    try:
                        title_text = ' '.join([c.text.strip() for c in t.rows[0].cells])
                    except:
                        title_text = ''
                    mid = identify_module_id(title_text)
                    if mid:
                        module_title_positions.append((i, t, mid))
                break
# Check whether each module owns at least one data-bearing table.
modules_with_data = set()
for idx, (pos, title_table, module_id) in enumerate(module_title_positions):
    next_pos = module_title_positions[idx + 1][0] if idx + 1 < len(module_title_positions) else len(body_children)
    has_data = False
    for j in range(pos + 1, next_pos):
        elem = body_children[j]
        if elem.tag.endswith('}tbl'):
            for t in doc.tables:
                if t._tbl is elem:
                    if not is_module_title_table_cleanup(t) and table_has_any_data(t):
                        has_data = True
                    break
        if has_data:
            break
    if has_data:
        modules_with_data.add(module_id)
print(f" [模块清理] 有数据的模块: {sorted(modules_with_data)}")
# Decide which hormone module to drop based on the gender OCR result.
# Convert Chinese "男性"/"女性" into English "male"/"female".
gender_from_ocr = patient_info.get('gender', '') if patient_info else ''
if gender_from_ocr == '男性':
    detected_gender = 'male'
elif gender_from_ocr == '女性':
    detected_gender = 'female'
else:
    # Gender missing from OCR: fall back to female.
    detected_gender = 'female'
# Module id -> (EN keyword, CN keyword) used to locate leftover description
# paragraphs for cleanup.
module_desc_mapping = {
    'urine': ('urine detection', '尿液检测'),
    'blood count': ('complete blood count', '血常规'),
    'blood sugar': ('blood sugar', '血糖'),
    'lipid': ('lipid profile', '血脂'),
    'blood type': ('blood type', '血型'),
    'coagulation': ('blood coagulation', '凝血'),
    'infectious': ('four infectious', '传染病'),
    'electrolyte': ('serum electrolyte', '电解质'),
    'liver': ('liver function', '肝功能'),
    'kidney': ('kidney function', '肾功能'),
    'myocardial': ('myocardial enzyme', '心肌酶'),
    'thyroid': ('thyroid function', '甲状腺'),
    'thrombo': ('thromboembolism', '心脑血管'),
    'bone': ('bone metabolism', '骨代谢'),
    'microelement': ('microelement', '微量元素'),
    'humoral': ('humoral immunity', '体液免疫'),
    'inflammatory': ('inflammatory', '炎症'),
    'autoantibody': ('autoantibody', '自身抗体'),
    'female hormone': ('female hormone', '女性荷尔蒙'),
    'male hormone': ('male hormone', '男性荷尔蒙'),
    'tumor': ('tumor marker', '肿瘤标记'),
    'lymphocyte': ('lymphocyto', '淋巴细胞'),
    'imaging': ('imaging', '影像'),
    'female-specific': ('female-specific', '女性专项'),
}
# Hormone cleanup: keep only the hormone module that matches the gender.
if detected_gender == 'male':
    if 'female hormone' in modules_with_data:
        print(f" [模块清理] 性别为男性,强制删除女性荷尔蒙模块")
        modules_with_data.discard('female hormone')
else:  # female
    if 'male hormone' in modules_with_data:
        print(f" [模块清理] 性别为女性,强制删除男性荷尔蒙模块")
        modules_with_data.discard('male hormone')
# Build the list of empty modules whose description text must be removed.
empty_modules_to_clean = []
for module_id, (en_title, cn_title) in module_desc_mapping.items():
    if module_id not in modules_with_data:
        empty_modules_to_clean.append((module_id, en_title, cn_title))
print(f" [模块清理] 需要删除描述的空模块: {[m[0] for m in empty_modules_to_clean]}")
removed_modules = 0
print(f" [模块清理] 找到 {len(module_title_positions)} 个模块起点")
# Walk the modules back to front so element positions stay valid while deleting.
for idx in range(len(module_title_positions) - 1, -1, -1):
    start_i, _tbl, module_id = module_title_positions[idx]
    end_i = module_title_positions[idx + 1][0] if idx + 1 < len(module_title_positions) else len(body_children)
    try:
        module_title = ' '.join([c.text.strip() for c in _tbl.rows[0].cells])[:40]
    except:
        module_title = 'Unknown'
    module_elements = body_children[start_i:end_i]
    if is_protected_section_cleanup(module_title):
        continue
    # Gender-based forced removal of hormone modules (exact module_id match).
    should_force_remove = False
    if module_id == 'female hormone' and detected_gender == 'male':
        should_force_remove = True
        print(f" [模块清理] 性别为男性,强制删除女性荷尔蒙模块: {module_title}")
    elif module_id == 'male hormone' and detected_gender == 'female':
        should_force_remove = True
        print(f" [模块清理] 性别为女性,强制删除男性荷尔蒙模块: {module_title}")
    # Modules with data that are not force-removed are left untouched.
    if not should_force_remove and module_id and module_id in modules_with_data:
        continue
    # Safety net: re-scan this module's tables for actual data.
    module_has_data = False
    for e in module_elements:
        if e.tag.endswith('}tbl'):
            for t in doc.tables:
                if t._tbl is e:
                    if not is_module_title_table_cleanup(t) and table_has_any_data(t):
                        module_has_data = True
                    break
    if should_force_remove or not module_has_data:
        # Forward safety boundary: scan from start_i+1 for the NEXT module's
        # title paragraph so we never delete the next module's heading and
        # description.
        safe_end = end_i
        for ei in range(start_i + 1, end_i):
            elem = body_children[ei]
            if elem.tag.endswith('}p'):
                p_text = ''.join(elem.itertext()).strip()
                if is_module_title_para_cleanup(p_text):
                    # Confirm the title belongs to a different module.
                    p_mid = identify_module_id(p_text)
                    if p_mid and p_mid != module_id:
                        safe_end = ei
                        break
        # Backward safety boundary: scan from start_i-1 for this module's own
        # title/description paragraphs (they precede the title table and must
        # be deleted together).
        safe_start = start_i
        for ei in range(start_i - 1, -1, -1):
            elem = body_children[ei]
            if elem.tag.endswith('}tbl'):
                # Hit a table (the previous module's data table) — stop.
                break
            if elem.tag.endswith('}p'):
                p_text = ''.join(elem.itertext()).strip()
                if is_module_title_para_cleanup(p_text):
                    p_mid = identify_module_id(p_text)
                    if p_mid and p_mid != module_id:
                        # Title paragraph of a different module — stop.
                        break
            safe_start = ei
        removed_in_module = 0
        for ei in range(safe_end - 1, safe_start - 1, -1):
            try:
                body_children[ei].getparent().remove(body_children[ei])
                removed_in_module += 1
            except:
                pass
        removed_modules += 1
        if should_force_remove:
            print(f" [模块清理] 删除荷尔蒙模块(根据性别): {module_title} ({removed_in_module} 个元素)")
        else:
            print(f" [模块清理] 删除空模块: {module_title} ({removed_in_module} 个元素)")
# Remove the description paragraphs that belong to empty modules.
if empty_modules_to_clean:
    # Re-fetch body_children: elements may have been deleted above.
    body_children = list(body)
    from docx.oxml.ns import qn
    # Keyword set for modules that DO have data — a safety check so we never
    # delete a data-bearing module's content.
    data_module_keywords = set()
    for mid in modules_with_data:
        if mid in module_desc_mapping:
            en, cn = module_desc_mapping[mid]
            data_module_keywords.add(en.lower())
            data_module_keywords.add(cn)
    # Locate the description-paragraph headings of the empty modules.
    desc_title_positions = []  # [(position, module_id, title_text)]
    for i, elem in enumerate(body_children):
        if elem.tag.endswith('}p'):
            text_parts = []
            for t in elem.iter(qn('w:t')):
                if t.text:
                    text_parts.append(t.text)
            text = ''.join(text_parts).strip()
            text_lower = text.lower()
            # A description heading mentions the module name. Headings can be
            # long (e.g. "Thyroid Function Test Result Analysis 甲状腺功能检测
            # 结果分析"), so allow up to 200 characters.
            if len(text) < 200:
                for module_id, en_title, cn_title in empty_modules_to_clean:
                    if en_title in text_lower and cn_title in text:
                        desc_title_positions.append((i, module_id, text[:40]))
                        break
    # Locate ALL possible description headings (crucial: including the
    # data-bearing modules') — these act as deletion boundaries.
    all_desc_titles = [
        'urine detection', 'complete blood count', 'blood sugar', 'lipid profile',
        'blood type', 'blood coagulation', 'four infectious', 'serum electrolyte',
        'liver function', 'kidney function', 'myocardial enzyme', 'thyroid function',
        'thromboembolism', 'bone metabolism', 'microelement', 'humoral immunity',
        'inflammatory', 'autoantibody', 'female hormone', 'male hormone',
        'tumor marker', 'lymphocyte', 'lymphocyto', 'imaging', 'female-specific'
    ]
    all_title_positions = []
    for i, elem in enumerate(body_children):
        if elem.tag.endswith('}p'):
            text_parts = []
            for t in elem.iter(qn('w:t')):
                if t.text:
                    text_parts.append(t.text)
            text = ''.join(text_parts).strip()
            text_lower = text.lower()
            # Same relaxed 200-character limit so long headings still register
            # as boundaries.
            if len(text) < 200:
                for title in all_desc_titles:
                    if title in text_lower:
                        all_title_positions.append(i)
                        break
    all_title_positions.sort()
    print(f" [描述清理] 检测到 {len(desc_title_positions)} 个空模块描述标题, {len(all_title_positions)} 个边界标题")
    # Delete each empty module's description block.
    removed_desc = 0
    for pos, module_id, title_text in sorted(desc_title_positions, reverse=True):
        # Find the next description heading (deletion boundary).
        next_pos = len(body_children)
        for p in all_title_positions:
            if p > pos:
                next_pos = p
                break
        # Safety check: if the range mentions a data-bearing module's keyword,
        # truncate the deletion there.
        safe_end = next_pos
        for i in range(pos + 1, next_pos):
            if i < len(body_children):
                elem_text = ''.join(body_children[i].itertext()).strip().lower()
                for dkw in data_module_keywords:
                    if dkw.lower() in elem_text:
                        # Found data-module content — truncate the range.
                        safe_end = i
                        print(f" [描述清理] 安全截断: {title_text} 在位置 {i} 发现数据模块关键词 '{dkw}',从 {next_pos} 截断到 {safe_end}")
                        break
                if safe_end != next_pos:
                    break
        # Delete everything from this heading up to the safe boundary.
        elements_to_remove = []
        for i in range(pos, safe_end):
            if i < len(body_children):
                elements_to_remove.append(body_children[i])
        for elem in reversed(elements_to_remove):
            try:
                elem.getparent().remove(elem)
                removed_desc += 1
            except:
                pass
        print(f" [描述清理] 删除空模块描述: {title_text} ({len(elements_to_remove)} 个元素, 范围 {pos}-{safe_end})")
# Persist through the safe-save wrapper.
safe_save(doc, output_path, template_path)
print(f"\n✓ 清理完成: 删除 {removed_rows} 行, 合并 {merged_count} 对表格, 删除 {removed_tables} 个空表格, 删除 {removed_special_tables} 个空特殊表格")
print(f"✓ 模块清理: 删除 {removed_modules} 个无数据模块")
return doc
def format_document_structure(doc_path: str, output_path: str):
    """
    Tidy up the Word document structure:
    1. Collapse runs of consecutive blank paragraphs to a single one.
    2. Insert a page break before every module title so each module starts on
       a new page.
    Important: skips the protected region (first four pages) and the
    "客户功能医学检测档案" (client examination-file) region entirely.
    """
    from docx import Document
    from docx.oxml.ns import qn
    from docx.oxml import OxmlElement
    from xml_safe_save import safe_save
    template_path_local = Path(__file__).parent / "template_complete.docx"
    doc = Document(doc_path)
    body = doc.element.body
    # Protection boundary (front pages up to "Client Health Program").
    protection_boundary = find_health_program_boundary(doc)
    print(f" [保护] 格式整理时跳过前 {protection_boundary} 个元素")
    # Bounds of the "客户功能医学检测档案" region.
    exam_file_start, exam_file_end = find_examination_file_region(doc)
    if exam_file_start >= 0:
        print(f" [保护] 格式整理时跳过'客户功能医学检测档案'区域: {exam_file_start}-{exam_file_end}")
    # Module-title keywords (kept in sync with the cleanup function).
    module_keywords = [
        'urine detection', 'urine test', '尿液检测', 'complete blood count', '血常规',
        'blood sugar', 'glucose', '血糖', 'lipid panel', 'lipid profile', '血脂',
        'blood type', '血型', 'coagulation', 'blood coagulation', '凝血',
        'infectious disease', 'four infectious', '传染病', '传染病四项',
        'electrolyte', 'serum electrolyte', '电解质', '血清电解质',
        'liver function', '肝功能', 'kidney function', '肾功能',
        'cardiac enzyme', 'myocardial enzyme', 'enzyme spectrum', '心肌酶', '心肌酶谱',
        'thyroid', 'thyroid function', '甲状腺', '甲状腺功能',
        'cardiovascular', 'thromboembolism', '心血管', '心脑血管',
        'bone metabolism', '骨代谢',
        'trace element', 'heavy metal', 'microelement', '微量元素', '重金属',
        'lymphocyte', 'lymphocyte subpopulation', '淋巴细胞', '淋巴细胞亚群',
        'humoral immunity', '体液免疫', 'immune function', '免疫功能',
        'inflammation', 'inflammatory', '炎症', '炎症反应',
        'autoantibody', 'autoimmune', '自身抗体', '自身免疫',
        'female hormone', '女性激素', '女性荷尔蒙', 'male hormone', '男性激素', '男性荷尔蒙',
        'gynecological', 'female-specific', '妇科', '女性专项',
        'tumor marker', '肿瘤标记物', '肿瘤标志物',
        'imaging', '影像',
    ]
    # Texts that must never be treated as module titles.
    exclude_keywords = ['health program', 'health report', 'abnormal', '异常', 'overall', 'assessment',
                        'medical intervention', '医学干预', 'functional medical health advice', '功能医学健康建议']
def is_module_title_paragraph(text):
    """Return True when the paragraph text is a module title (not a chapter heading)."""
    if not text or len(text) > 100:
        return False
    lowered = text.lower().strip()
    # Chapter headings start with roman numerals — never module titles.
    if lowered.startswith(('(i)', '(ii)', '(iii)', 'i.', 'ii.', 'iii.')):
        return False
    for banned in exclude_keywords:
        if banned in lowered:
            return False
    for keyword in module_keywords:
        if keyword in lowered:
            return True
    return False
def is_module_title_table(elem):
    """Return True when the raw <w:tbl> element is a module-title table."""
    text = ''.join(elem.itertext()).strip()
    if not text or len(text) > 200:
        return False
    text_lower = text.lower()
    # Exclude chapter-level headings.
    if any(ex in text_lower for ex in exclude_keywords):
        return False
    # Module-title tables usually repeat the module name, so require at
    # least two occurrences of a keyword.
    for kw in module_keywords:
        if kw in text_lower:
            if text_lower.count(kw) >= 2:
                return True
    return False
def is_in_protected_region(idx):
    """True when body index *idx* falls inside a protected region."""
    # Front-pages region (before the "Client Health Program" boundary).
    in_front_pages = idx < protection_boundary
    # "客户功能医学检测档案" (examination-file) region.
    in_exam_file = exam_file_start >= 0 and exam_file_start <= idx < exam_file_end
    return in_front_pages or in_exam_file
def create_page_break_paragraph():
    """Build a fresh <w:p> element whose only content is a page-break run."""
    page_break = OxmlElement('w:br')
    page_break.set(qn('w:type'), 'page')
    run = OxmlElement('w:r')
    run.append(page_break)
    paragraph = OxmlElement('w:p')
    paragraph.append(run)
    return paragraph
# Step 1: strip redundant blank paragraphs and placeholder paragraphs
# (skipping protected regions).
removed_count = 0
children = list(body)
prev_was_empty_p = False
# Placeholder texts that must be deleted outright.
placeholder_texts = ['testing result检测结果', 'testing result 检测结果']
for i, elem in enumerate(children):
    # Skip both protected regions (front pages + exam-file region).
    if is_in_protected_region(i):
        prev_was_empty_p = False  # reset so deletion never spans regions
        continue
    if elem.tag.endswith('}p'):
        text = ''.join(elem.itertext()).strip()
        text_lower = text.lower().replace(' ', '')
        has_break = elem.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}br') is not None
        # Drop "Testing Result检测结果" placeholder paragraphs.
        if any(ph.replace(' ', '') in text_lower for ph in placeholder_texts):
            try:
                body.remove(elem)
                removed_count += 1
                continue
            except:
                pass
        if not text and not has_break:
            # Second consecutive empty paragraph — remove it.
            if prev_was_empty_p:
                try:
                    body.remove(elem)
                    removed_count += 1
                except:
                    pass
            else:
                prev_was_empty_p = True
        else:
            prev_was_empty_p = False
    else:
        prev_was_empty_p = False
# Step 2: insert a page break before every module title (skipping protected
# regions). Titles may be paragraphs (<p>) or tables (<tbl>).
# Recompute protected boundaries: step-1 deletions shifted the positions.
protection_boundary = find_health_program_boundary(doc)
exam_file_start, exam_file_end = find_examination_file_region(doc)
pagebreak_count = 0
children = list(body)  # refresh
for i, elem in enumerate(children):
    # Skip the protected regions.
    if is_in_protected_region(i):
        continue
    is_title = False
    # Paragraph-shaped module title?
    if elem.tag.endswith('}p'):
        text = ''.join(elem.itertext()).strip()
        if is_module_title_paragraph(text):
            is_title = True
    # Table-shaped module title?
    elif elem.tag.endswith('}tbl'):
        if is_module_title_table(elem):
            is_title = True
    if is_title:
        # Only insert when there is no page break directly before the title.
        has_pagebreak_before = False
        if i > 0:
            prev_elem = children[i-1]
            if prev_elem.tag.endswith('}p'):
                prev_break = prev_elem.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}br')
                if prev_break is not None and prev_break.get(qn('w:type')) == 'page':
                    has_pagebreak_before = True
        if not has_pagebreak_before:
            # Insert a page break before the module title.
            pb = create_page_break_paragraph()
            elem.addprevious(pb)
            pagebreak_count += 1
def clean_module_trailing_blanks(body, module_keywords, next_module_keywords):
    """Remove surplus blank paragraphs in front of a module's title table.

    Walks backwards from each matching module-title table, deleting empty
    paragraphs and keeping at most one page-break paragraph. Returns the
    number of removed elements.
    NOTE(review): next_module_keywords is accepted but never used — confirm
    whether it was meant to bound the scan.
    """
    children = list(body)
    removed_count = 0
    # Find the module-title table (start of the data region).
    for i, elem in enumerate(children):
        if elem.tag.endswith('}tbl'):
            text = ''.join(elem.itertext()).strip().lower()
            if any(kw in text for kw in module_keywords):
                # Found the title table; walk backwards deleting blank
                # paragraphs before the page break (keep exactly one break).
                j = i - 1
                page_break_found = False
                while j >= 0:
                    prev_elem = children[j]
                    if prev_elem.tag.endswith('}p'):
                        prev_text = ''.join(prev_elem.itertext()).strip()
                        has_break = prev_elem.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}br') is not None
                        if not prev_text and not has_break:
                            # Empty paragraph — delete.
                            try:
                                body.remove(prev_elem)
                                removed_count += 1
                            except:
                                pass
                        elif has_break and not prev_text:
                            # Page-break paragraph.
                            if page_break_found:
                                # Already kept one — delete the extras.
                                try:
                                    body.remove(prev_elem)
                                    removed_count += 1
                                except:
                                    pass
                            else:
                                page_break_found = True
                        else:
                            # Paragraph with content — stop.
                            break
                    else:
                        # Not a paragraph — stop.
                        break
                    j -= 1
                # Refresh children after the deletions.
                children = list(body)
    return removed_count
# Clean up blanks before the coagulation module's data table.
removed = clean_module_trailing_blanks(body, ['coagulation', '凝血'], ['infectious', '传染病'])
if removed > 0:
    print(f" 🧹 清理凝血功能模块前 {removed} 个空白元素")
# Clean up blanks before the bone-metabolism module's data table.
removed = clean_module_trailing_blanks(body, ['bone metabolism', '骨代谢'], ['microelement', '微量元素'])
if removed > 0:
    print(f" 🧹 清理骨代谢模块前 {removed} 个空白元素")
# Clean up blanks after the bone-metabolism data table, before the
# microelement module's page break.
def clean_between_modules(body, current_module_keywords, next_module_keywords):
    """Remove empty paragraphs between a module's data and the next module's page break.

    Finds the first element whose text matches any of *next_module_keywords*
    (case-insensitive substring match), then scans backwards from it and
    deletes plain empty paragraphs. Scanning stops at the first page-break
    paragraph (which is preserved), at any non-empty paragraph, or at a table.

    Args:
        body: the ``<w:body>`` XML element.
        current_module_keywords: unused; kept for call-site readability
            (callers pass it positionally — do not remove).
        next_module_keywords: lower-case substrings identifying the next module.

    Returns:
        int: number of paragraphs removed from *body*.
    """
    children = list(body)
    removed_count = 0
    w_ns = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
    # Locate the next module title element.
    next_module_pos = -1
    for i, elem in enumerate(children):
        text = ''.join(elem.itertext()).strip().lower()
        if any(kw in text for kw in next_module_keywords):
            next_module_pos = i
            break
    if next_module_pos < 0:
        return 0
    # Walk backwards from the next module title, deleting empty paragraphs
    # and preserving one page-break paragraph.
    j = next_module_pos - 1
    page_break_found = False
    while j >= 0:
        elem = children[j]
        if elem.tag.endswith('}p'):
            text = ''.join(elem.itertext()).strip()
            br_elem = elem.find(f'.//{w_ns}br')
            has_break = br_elem is not None
            break_type = br_elem.get(f'{w_ns}type', '') if br_elem is not None else ''
            if not text and not has_break:
                # Plain empty paragraph: delete it.
                try:
                    body.remove(elem)
                    removed_count += 1
                except ValueError:
                    pass  # already detached from body
            elif has_break and break_type == 'page' and not text:
                # Page-break paragraph.
                if page_break_found:
                    # NOTE(review): unreachable in practice — the first page
                    # break breaks out of the loop below; kept for safety.
                    try:
                        body.remove(elem)
                        removed_count += 1
                    except ValueError:
                        pass
                else:
                    page_break_found = True
                    # Found the page break — stop and keep it.
                    break
            else:
                # Non-empty paragraph, or a non-page break: stop.
                break
        elif elem.tag.endswith('}tbl'):
            # Reached a table: stop.
            break
        j -= 1
    return removed_count
removed = clean_between_modules(body, ['bone metabolism', '骨代谢'], ['microelement', '微量元素'])
if removed > 0:
print(f" 🧹 清理骨代谢模块后 {removed} 个空白元素")
# 第2.5步:在保护区域之后的所有图片前添加分页符
# 重要:只处理保护区域之后的图片,前四页的图片不能添加分页符
safe_save(doc, output_path, template_path_local)
doc = Document(output_path)
body = doc.element.body
children = list(body)
health_program_pos = find_health_program_boundary(doc)
print(f" [图片分页] 保护边界位置: {health_program_pos}")
# 模块标题关键词用于判断图片是否是页面底部的logo图片
module_keywords = [
'urine', 'blood', 'sugar', 'lipid', 'coagulation', 'infectious', 'electrolyte',
'liver', 'kidney', 'myocardial', 'thyroid', 'thromboembolism', 'bone', 'microelement',
'immunity', 'inflammatory', 'autoantibody', 'hormone', 'tumor', 'lymphocyte', 'imaging',
'尿液', '血常规', '血糖', '血脂', '凝血', '传染病', '电解质', '肝功能', '肾功能',
'心肌酶', '甲状腺', '血栓', '骨代谢', '微量元素', '免疫', '炎症', '自身抗体',
'激素', '肿瘤', '淋巴', '影像'
]
def is_logo_image(children, img_idx):
    """Return True when the image at *img_idx* looks like a page-bottom logo.

    Heuristic: a logo image is immediately followed (within the next four
    body elements) by a module title, i.e. an element whose text contains
    any keyword from the enclosing module_keywords list.
    """
    window_end = min(img_idx + 5, len(children))
    for sibling in children[img_idx + 1:window_end]:
        content = ''.join(sibling.itertext()).strip().lower()
        # A module title right after the image marks it as a logo.
        if any(kw in content for kw in module_keywords):
            return True
    return False
# 先收集所有需要添加分页符的图片元素
# 注意:不再在图片前添加分页符,因为这会导致空白页
# 分页符应该在模块标题前添加而不是在logo图片前
images_need_pagebreak = []
# 暂时禁用图片分页符功能,因为它会导致空白页
# for i, elem in enumerate(children):
# ...
# 然后统一添加分页符(避免循环中修改列表导致的问题)
image_pagebreak_count = 0
for elem in images_need_pagebreak:
pb = create_page_break_paragraph()
elem.addprevious(pb)
image_pagebreak_count += 1
if image_pagebreak_count > 0:
print(f" 📷 在 {image_pagebreak_count} 个图片前插入分页符")
# 第三步:清理文档末尾的空白内容(空段落、分页符、空表格)
# 从后往前删除,直到遇到有内容的元素
children = list(body)
removed_tail = 0
for i in range(len(children) - 1, -1, -1):
elem = children[i]
tag = elem.tag.split('}')[-1]
# 跳过sectPr文档设置
if tag == 'sectPr':
continue
# 检查是否是空段落或只有分页符的段落
if tag == 'p':
text = ''.join(elem.itertext()).strip()
if not text:
try:
body.remove(elem)
removed_tail += 1
continue
except:
pass
else:
break # 遇到有内容的段落,停止
# 检查是否是空表格(只有标题行没有数据)
elif tag == 'tbl':
# 找到对应的Table对象
is_empty_table = True
for t in doc.tables:
if t._tbl is elem:
# 检查表格是否有实际数据
for row in t.rows:
row_text = ' '.join([c.text.strip() for c in row.cells]).lower()
if row_text and 'clinical significance' not in row_text:
# 检查是否是数据行(包含数字或结果)
import re
if re.search(r'\d', row_text) or any(kw in row_text for kw in ['positive', 'negative', 'normal']):
is_empty_table = False
break
break
if is_empty_table:
try:
body.remove(elem)
removed_tail += 1
continue
except:
pass
else:
break # 遇到有数据的表格,停止
else:
break # 遇到其他类型元素,停止
if removed_tail > 0:
print(f" 🧹 清理文档末尾 {removed_tail} 个空白元素")
# 第三步:清理连续的分页符(避免空白页)
# 重新加载文档
safe_save(doc, output_path, template_path_local)
doc = Document(output_path)
body = doc.element.body
children = list(body)
w_ns = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
removed_pagebreaks = 0
# 清理分页符前面的空段落(这会导致空白页)
i = 0
while i < len(children):
elem = children[i]
if elem.tag.endswith('}p'):
br = elem.find(f'.//{w_ns}br')
if br is not None and br.get(f'{w_ns}type') == 'page':
text = ''.join(elem.itertext()).strip()
if not text: # 这是一个分页符段落
# 检查前面是否有空段落,如果有就删除
if i > 0:
prev_elem = children[i - 1]
if prev_elem.tag.endswith('}p'):
prev_text = ''.join(prev_elem.itertext()).strip()
prev_br = prev_elem.find(f'.//{w_ns}br')
if not prev_text and prev_br is None:
# 前面是空段落,删除它
try:
body.remove(prev_elem)
children = list(body)
removed_pagebreaks += 1
continue # 不增加i继续检查
except:
pass
i += 1
# 清理连续的分页符
children = list(body)
i = 0
while i < len(children) - 1:
elem = children[i]
next_elem = children[i + 1]
if elem.tag.endswith('}p'):
br = elem.find(f'.//{w_ns}br')
if br is not None and br.get(f'{w_ns}type') == 'page':
text = ''.join(elem.itertext()).strip()
if not text:
if next_elem.tag.endswith('}p'):
next_br = next_elem.find(f'.//{w_ns}br')
next_text = ''.join(next_elem.itertext()).strip()
if next_br is not None and next_br.get(f'{w_ns}type') == 'page' and not next_text:
try:
body.remove(elem)
children = list(body)
removed_pagebreaks += 1
continue
except:
pass
elif not next_text and next_br is None:
try:
body.remove(next_elem)
children = list(body)
removed_pagebreaks += 1
continue
except:
pass
i += 1
# 第四步:删除表头前面的多余分页符
# 表头前面不应该有分页符(分页符应该在模块标题前面)
children = list(body)
removed_header_pagebreaks = 0
i = 1
while i < len(children):
elem = children[i]
if elem.tag.endswith('}tbl'):
# 检查是否是表头表格
text = ''.join(elem.itertext()).strip().lower()
if 'abb' in text and 'project' in text and 'result' in text:
# 这是表头表格,检查前面是否有分页符
if i > 0:
prev_elem = children[i - 1]
if prev_elem.tag.endswith('}p'):
br = prev_elem.find(f'.//{w_ns}br')
if br is not None and br.get(f'{w_ns}type') == 'page':
prev_text = ''.join(prev_elem.itertext()).strip()
if not prev_text:
try:
body.remove(prev_elem)
children = list(body)
removed_header_pagebreaks += 1
continue # 不增加i
except:
pass
i += 1
if removed_pagebreaks > 0:
print(f" 🧹 清理 {removed_pagebreaks} 个连续分页符")
if removed_header_pagebreaks > 0:
print(f" 🧹 清理表头前 {removed_header_pagebreaks} 个多余分页符")
# 使用安全保存
safe_save(doc, output_path, template_path_local)
print(f"\n✓ 格式整理完成: 清理了 {removed_count} 个多余空白段落, 插入 {pagebreak_count} 个模块间分页符")
return doc
def main(force_extract=False, use_deepseek=False, deepseek_api_key=None):
    """Top-level pipeline: OCR-extract PDF data, match the template, fill the Word report.

    Args:
        force_extract: re-run OCR extraction even when a cache file exists.
        use_deepseek: enrich data (reference ranges, abnormality flags,
            health content) through the DeepSeek API.
        deepseek_api_key: DeepSeek API key; when provided it is also stored
            into the module-global DEEPSEEK_API_KEY.
    """
    # --- Path configuration ---
    pdf_dir = r"c:\Users\UI\Desktop\医疗报告\医疗报告智能体"
    template_config_path = Path(__file__).parent / "abb_mapping_config.json"
    word_template_path = Path(__file__).parent / "template_complete.docx"
    reports_dir = Path(__file__).parent / "reports"
    reports_dir.mkdir(exist_ok=True)
    from datetime import datetime
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_path = reports_dir / f"filled_report_{timestamp}.docx"
    extracted_file = Path(__file__).parent / "extracted_medical_data.json"
    # ========== Locate the protected-region boundary (no backup; later steps skip the region) ==========
    print('\n' + '=' * 60)
    print('[PROTECT] 检测保护区域边界(前四页)')
    print('=' * 60)
    template_doc = Document(word_template_path)
    protection_boundary = find_health_program_boundary(template_doc)
    print(f' 保护边界位置: {protection_boundary}')
    print(f' 说明: 保护区域内的元素将在各处理步骤中被跳过')
    del template_doc  # release the template document
    print("=" * 60)
    print("步骤1: 获取检测数据 (百度OCR)")
    print("=" * 60)
    # Inventory the PDFs in the source directory (name -> mtime) for cache comparison.
    pdf_files = list(Path(pdf_dir).glob("*.pdf"))
    pdf_files_info = {str(f.name): f.stat().st_mtime for f in pdf_files}
    # Decide whether OCR extraction must be re-run.
    need_extract = force_extract
    if not need_extract and extracted_file.exists():
        with open(extracted_file, 'r', encoding='utf-8') as f:
            cached_data = json.load(f)
        # PDF inventory recorded in the cache at the previous run.
        cached_pdf_info = cached_data.get('pdf_files', {})
        # Compare the current PDF set with the cached one.
        if set(pdf_files_info.keys()) != set(cached_pdf_info.keys()):
            # File lists differ (files added or removed).
            new_files = set(pdf_files_info.keys()) - set(cached_pdf_info.keys())
            removed_files = set(cached_pdf_info.keys()) - set(pdf_files_info.keys())
            if new_files:
                print(f" 📄 检测到新增PDF文件: {', '.join(new_files)}")
            if removed_files:
                print(f" 📄 检测到删除PDF文件: {', '.join(removed_files)}")
            need_extract = True
        else:
            # Same file set: compare modification times.
            for fname, mtime in pdf_files_info.items():
                if fname in cached_pdf_info and mtime > cached_pdf_info[fname]:
                    print(f" 📄 检测到PDF文件已更新: {fname}")
                    need_extract = True
                    break
    else:
        need_extract = True
    if not need_extract:
        print(f" ✓ 发现缓存数据: {extracted_file}")
        extracted_items = cached_data.get('items', [])
        patient_info = cached_data.get('patient_info', {})
        print(f" ✓ 从缓存读取 {len(extracted_items)} 个检测项")
        if patient_info:
            print(f" ✓ 从缓存读取患者信息: {patient_info.get('name', '未知')}")
        print(f" 💡 如需重新提取,请删除缓存文件或使用 --force 参数")
    else:
        # Re-run extraction.
        if force_extract:
            print(" 📄 强制重新提取...")
        else:
            print(" 📄 检测到文件变化开始OCR提取...")
        # Extract test items; also returns the raw OCR text so we avoid a second OCR pass.
        extracted_items, ocr_texts = extract_all_pdfs(pdf_dir)
        print(f"\n共提取 {len(extracted_items)} 个检测项")
        # Extract patient demographics, reusing the OCR text already produced.
        patient_info = {}
        if ocr_texts:
            print("\n 📋 提取患者基本信息...")
            first_ocr_text = next(iter(ocr_texts.values()))
            patient_info = extract_patient_info(first_ocr_text)
            print(f" 姓名: {patient_info.get('name', '未提取')}")
            print(f" 性别: {patient_info.get('gender', '未提取')}")
            print(f" 年龄: {patient_info.get('age', '未提取')}")
            print(f" 体检时间: {patient_info.get('exam_time', '未提取')}")
            print(f" 报告时间: {patient_info.get('report_time', '未提取')}")
        # Persist the extracted data together with the PDF inventory and
        # patient info so the next run can detect changes.
        with open(extracted_file, 'w', encoding='utf-8') as f:
            json.dump({
                'total_items': len(extracted_items),
                'items': extracted_items,
                'pdf_files': pdf_files_info,  # PDF inventory, used for cache invalidation
                'patient_info': patient_info  # patient demographics
            }, f, ensure_ascii=False, indent=2)
        print(f"✓ 数据已保存到: {extracted_file}")
    # Publish the DeepSeek API key as a module global for helpers that read it.
    global DEEPSEEK_API_KEY
    if deepseek_api_key:
        DEEPSEEK_API_KEY = deepseek_api_key
    print("\n" + "=" * 60)
    print("步骤2: 与模板结构匹配")
    print("=" * 60)
    with open(template_config_path, 'r', encoding='utf-8') as f:
        template_config = json.load(f)
    matched_data = match_with_template(extracted_items, template_config)
    # Step 2.5: let DeepSeek fill in reference ranges and abnormality flags.
    if use_deepseek and deepseek_api_key:
        print("\n" + "=" * 60)
        print("步骤2.5: 智能补充参考范围和异常判断")
        print("=" * 60)
        matched_data = enhance_data_with_deepseek(matched_data, deepseek_api_key)
    print("\n" + "=" * 60)
    print("步骤3: 填入Word模板")
    print("=" * 60)
    fill_word_template(word_template_path, matched_data, output_path, deepseek_api_key, patient_info)
    # Step 4: extra detection items.
    # NOTE: step 3 already routed most items through DeepSeek classification;
    # this step would only cover genuinely unhandled items.
    print("\n" + "=" * 60)
    print("步骤4: 处理额外检测项目")
    print("=" * 60)
    # Disabled for now because step 3's DeepSeek classification already covers
    # all items; re-enabling requires extra_items_handler.py to exclude items
    # already handled in step 3.
    print(" 额外项目已在步骤3中通过DeepSeek分类处理")
    # try:
    # from extra_items_handler import process_extra_items
    # process_extra_items(extracted_items, str(output_path), deepseek_api_key)
    # except Exception as e:
    # print(f" ⚠️ 额外项目处理失败: {e}")
    # import traceback
    # traceback.print_exc()
    # Step 5: fill the abnormal-indicator summary section.
    print("\n" + "=" * 60)
    print("步骤5: 填充异常指标汇总")
    print("=" * 60)
    # Collect abnormal items (flagged by the 'point' field).
    abnormal_items = []
    for abb, data in matched_data.items():
        point = data.get('point', '')
        # NOTE(review): some list entries below render as empty strings — they
        # are probably invisible/lost glyphs (e.g. ↑/↓ arrows); confirm against
        # the original file before editing this condition.
        if point in ['', '', 'H', 'L', '', '']:
            abnormal_items.append({
                'abb': abb,
                'name': data.get('project', abb),
                'result': data.get('result', ''),
                'point': point,
                'reference': data.get('reference', ''),
                'unit': data.get('unit', '')
            })
    if abnormal_items:
        print(f" 发现 {len(abnormal_items)} 个异常项目")
        doc = Document(output_path)
        from health_content_generator import fill_abnormal_index_summary, generate_item_explanations
        # Clinical-significance explanations for abnormal items (template text preferred).
        item_explanations = generate_item_explanations(abnormal_items, deepseek_api_key, call_deepseek_api if use_deepseek else None)
        fill_abnormal_index_summary(doc, abnormal_items, item_explanations)
        # Save via the XML-safe writer.
        from xml_safe_save import safe_save
        safe_save(doc, output_path, word_template_path)
    else:
        print(" 没有异常项目")
    print("\n" + "=" * 60)
    print("步骤6: 清理空白数据行")
    print("=" * 60)
    clean_empty_rows(output_path, output_path, patient_info)
    print("\n" + "=" * 60)
    print("步骤7: 格式整理(表格间空行 + 模块间分页符)")
    print("=" * 60)
    format_document_structure(output_path, output_path)
    # Step 8: restore the protected region (first four pages) from the template.
    print("\n" + "=" * 60)
    print("步骤8: 修复保护区域(前四页)")
    print("=" * 60)
    print(" 策略: 从原始模板复制前四页,保留所有图片和布局")
    copy_protected_region_from_template(word_template_path, output_path, protection_boundary)
    # Step 8.5: fill in patient demographics.
    print("\n" + "=" * 60)
    print("步骤8.5: 填充患者基本信息")
    print("=" * 60)
    if patient_info and any(patient_info.values()):
        doc = Document(output_path)
        fill_patient_info_in_template(doc, patient_info)
        doc.save(output_path)
        print(f" ✓ 患者信息已填充")
    else:
        print(" ⚠️ 未提取到患者信息,跳过填充")
    # Step 9 (optional): generate health assessment & advice from abnormal items.
    # Must run AFTER step 8, because step 8 re-copies the first four pages
    # from the template.
    if use_deepseek and deepseek_api_key:
        print("\n" + "=" * 60)
        print("步骤9: 生成健康评估与建议内容")
        print("=" * 60)
        doc = Document(output_path)
        from health_content_generator import generate_and_fill_health_content as gen_health
        gen_health(doc, matched_data, deepseek_api_key, call_deepseek_api)
        # Plain save (not safe_save) so the inserted page breaks are not overwritten.
        doc.save(output_path)
        print(f" ✓ 健康内容已保存")
    # Step 10: fix footers (ensure every page carries the Be.U Med logo).
    print("\n" + "=" * 60)
    print("步骤10: 修复页脚")
    print("=" * 60)
    fix_footer_reference(word_template_path, output_path)
    print("\n" + "=" * 60)
    print("✅ 全部完成!")
    print(f"✅ 输出文件: {output_path}")
    print("=" * 60)
if __name__ == '__main__':
    # CLI entry point: parse flags and run the pipeline.
    #   --force / -f        re-run OCR extraction, ignoring the cache
    #   --no-deepseek       disable DeepSeek analysis (enabled by default)
    #   --api-key / -k KEY  DeepSeek API key
    # NOTE: `os` and `sys` are already imported at module top, so no local
    # import is needed here (the previous redundant `import os` was removed).
    force = '--force' in sys.argv or '-f' in sys.argv
    # DeepSeek analysis is on by default.
    use_deepseek = '--no-deepseek' not in sys.argv
    # Key resolution: in-code default, else environment variable; a CLI
    # --api-key argument, when present, overrides both.
    deepseek_key = DEEPSEEK_API_KEY or os.environ.get('DEEPSEEK_API_KEY', '')
    for i, arg in enumerate(sys.argv):
        if arg in ['--api-key', '-k'] and i + 1 < len(sys.argv):
            deepseek_key = sys.argv[i + 1]
            break
    if use_deepseek and not deepseek_key:
        print("⚠️ 使用DeepSeek需要提供API Key")
        print(" 方法1: 在代码中设置 DEEPSEEK_API_KEY")
        print(" 方法2: 设置环境变量 DEEPSEEK_API_KEY")
        print(" 方法3: 使用参数 --api-key YOUR_KEY")
        sys.exit(1)
    print("=" * 60)
    print(" 医疗报告智能提取与填充系统")
    print("=" * 60)
    print(f" OCR提取: 百度高精度OCR")
    print(f" 智能分析: {'DeepSeek ✓' if use_deepseek else '关闭'}")
    # NOTE(review): the on/off labels below render as empty strings — likely
    # invisible/lost glyphs in the source; confirm against the original file.
    print(f" 强制刷新: {'' if force else ''}")
    print("=" * 60)
    main(force_extract=force, use_deepseek=use_deepseek, deepseek_api_key=deepseek_key)