"""
Fill a Word template using docxtpl.
"""
|
||
from docxtpl import DocxTemplate
|
||
import json
|
||
from pathlib import Path
|
||
|
||
|
||
def clean_extracted_data(items: list) -> list:
    """Clean extracted lab items: split unit / reference range, drop invalid rows.

    Each item is a dict with (optional) keys ``result``, ``unit``,
    ``reference``, ``project`` and ``abb``.  Items are modified in place and
    the surviving ones are returned in their original order.

    Rules applied per item:

    * A placeholder result (``.``, ``:``, ``-``, ``/``, empty, ``None``) is
      only kept when the unit starts with a word followed by ``[`` (for
      example ``"Yellow [Normal : Yellow]"``); that word becomes the result.
    * Rows whose project mentions phase / antibody / treponema and whose ABB
      is one of PH, CU, CL, CA are treated as OCR mismatches and dropped.
    * ``[Normal : x]`` / ``[正常 : x]`` and ``(a-b)`` fragments are removed
      from the unit; the first one found fills ``reference`` when the item
      has none.
    * A stray leading number followed by whitespace is removed from the unit.

    Args:
        items: list of extracted item dicts.

    Returns:
        The list of items that passed the filters (same dict objects).
    """
    import re

    invalid_results = ['.', ':', '-', '/', '', None]
    suspicious_projects = ('phase', 'antibody', 'treponema')
    mismatch_prone_abbs = ('PH', 'CU', 'CL', 'CA')

    # Accept both the ASCII and the full-width colon, and the Chinese label
    # mentioned in the original comment.
    normal_re = re.compile(r'\[(?:Normal|正常)\s*[:\uff1a]\s*([^\]]+)\]', re.IGNORECASE)
    range_re = re.compile(r'\([\d\.\-<>]+\)')
    # Only strip a leading number when it is a separate token; a bare
    # "^-?\d+" would turn the common unit "10^9/L" into "^9/L".
    leading_number_re = re.compile(r'^-?\d+(?:\.\d+)?(?:\s+|$)')

    cleaned = []

    for item in items:
        result = item.get('result', '')
        unit = item.get('unit') or ''
        reference = item.get('reference') or ''
        project = item.get('project') or ''
        if not isinstance(unit, str):
            unit = str(unit)
        if not isinstance(project, str):
            project = str(project)

        # Placeholder result: try to recover the real value from the unit.
        if result in invalid_results:
            if not unit:
                continue
            result_in_unit = re.match(r'^([A-Za-z]+)\s*\[', unit)
            if not result_in_unit:
                continue
            item['result'] = result_in_unit.group(1)
            unit = re.sub(r'^[A-Za-z]+\s*', '', unit)

        # Probable OCR mismatches for ABBs that are easily confused.
        project_lower = project.lower()
        if any(kw in project_lower for kw in suspicious_projects):
            abb = str(item.get('abb') or '').upper()
            if abb in mismatch_prone_abbs:
                continue

        if unit:
            normal_match = normal_re.search(unit)
            if normal_match:
                if not reference:
                    reference = normal_match.group(1).strip()
                    item['reference'] = reference
                unit = normal_re.sub('', unit).strip()

            range_match = range_re.search(unit)
            if range_match:
                # `reference` is kept up to date above, so a range cannot
                # overwrite a reference just taken from "[Normal: ...]".
                if not reference:
                    reference = range_match.group(0)
                    item['reference'] = reference
                # The range never belongs in the unit text.
                unit = range_re.sub('', unit).strip()

            unit = leading_number_re.sub('', unit).strip()

            item['unit'] = unit

        cleaned.append(item)

    return cleaned
|
||
|
||
|
||
def _score_candidate(item: dict) -> int:
    """Score one extracted item; higher means more likely the real reading."""
    import re

    raw_result = item.get('result', '')
    # Results loaded from JSON may be numbers or None; score on their text.
    result = '' if raw_result is None else str(raw_result)
    raw_unit = item.get('unit')
    unit = '' if raw_unit is None else str(raw_unit)

    score = 0
    # A numeric result is the strongest signal.
    if re.search(r'\d', result):
        score += 10
    # Having a reference range.
    if item.get('reference'):
        score += 5
    # Having a short, plausible unit.
    if unit and len(unit) < 20:
        score += 3
    # Qualitative results are valid too.
    if result.lower() in ('negative', 'positive', 'normal', 'reactive', 'non-reactive'):
        score += 8
    return score


def select_best_match(items_by_abb: dict) -> dict:
    """Pick the best item for every ABB that has several candidates.

    Args:
        items_by_abb: mapping ``ABB -> list of item dicts``.

    Returns:
        Mapping ``ABB -> chosen item``.  A single candidate is returned as
        is; otherwise the highest-scoring candidate wins and ties go to the
        earliest one.  ABBs with an empty candidate list are omitted.
    """
    best = {}
    for abb, items in items_by_abb.items():
        if not items:
            continue
        if len(items) == 1:
            best[abb] = items[0]
        else:
            # max() returns the first maximal element, which matches a
            # stable descending sort followed by taking index 0.
            best[abb] = max(items, key=_score_candidate)
    return best
|
||
|
||
|
||
# ABBs that appear in more than one report module.
_DUPLICATE_ABBS = ['PRO', 'WBC', 'COLOR', 'PH', 'GLU', 'SG', 'NIT', 'KET', 'BLD', 'ERY']

# Extracted ABB -> template variable stem, for spellings that differ
# (e.g. CA153 vs CA15_3, CA199 vs CA19_9).
_ABB_ALIASES = {
    'CA153': 'CA15_3',
    'CA199': 'CA19_9',
    'ABO': 'BLOODTYPE',      # ABO blood group
    'RH': 'BLOODTYPERH',     # Rh blood group
    'CKMB': 'CK_MB',         # cardiac enzyme
}

# Project-name fragments that only occur in the urine panel.
_URINE_PROJECT_KEYWORDS = ['color', 'specific gravity', 'protein', 'glucose', 'ketone',
                           'nitrite', 'turbidity', '颜色', '比重', '蛋白', '糖', '酮体', '亚硝酸']


def _infer_module(abb, project, result) -> str:
    """Guess the module prefix ('URINE', 'CBC' or '') from ABB, project and result."""
    abb_upper = abb.upper()
    # Project / result may be None or numeric when loaded from JSON.
    project_text = '' if project is None else str(project)
    project_lower = project_text.lower()
    result_lower = str(result).lower() if result else ''

    if any(kw in project_lower for kw in _URINE_PROJECT_KEYWORDS):
        return 'URINE'

    # Urine WBC is reported with the bare project name "WBC".
    if abb_upper == 'WBC' and project_lower == 'wbc':
        return 'URINE'
    if abb_upper == 'WBC' and 'total' in project_lower:
        return 'CBC'

    # pH belongs to the urine panel.
    if abb_upper == 'PH' and 'ph' in project_lower and len(project_text) < 20:
        return 'URINE'

    # Qualitative results for these ABBs are treated as urine results.
    if abb_upper in ['PRO', 'GLU', 'KET', 'NIT', 'BLD'] and result_lower in ['negative', 'positive', 'trace']:
        return 'URINE'

    return ''


def _store_fields(context: dict, stem: str, data: dict) -> None:
    """Write the four template variables ``<stem>_result/point/refer/unit``."""
    context[f"{stem}_result"] = data.get('result', '')
    context[f"{stem}_point"] = data.get('point', '')
    context[f"{stem}_refer"] = data.get('reference', '')
    context[f"{stem}_unit"] = data.get('unit', '')


def build_context(matched_data: dict) -> dict:
    """Convert matched data into a docxtpl context dict.

    For every ABB the variables ``<VAR>_result``, ``<VAR>_point``,
    ``<VAR>_refer`` and ``<VAR>_unit`` are produced, where ``<VAR>`` is the
    ABB reduced to letters, digits and underscores (``-`` and ``/`` become
    ``_``, ``%`` becomes ``pct``, and a ``V_`` prefix is added when the name
    is empty or starts with a digit).  Additionally:

    * ABBs listed in the alias table also get variables under the alias stem.
    * ABBs that occur in several modules also get ``<MODULE>_<VAR>_*``
      variables when a module can be inferred from project / result.

    Args:
        matched_data: ``{ABB: {result, unit, reference, point, project}}``.
            Entries whose value is not a dict are ignored.

    Returns:
        docxtpl context dict.
    """
    import re

    context = {}

    for abb, data in matched_data.items():
        if not isinstance(data, dict):
            continue

        # Normalise the variable name (letters, digits, underscore only).
        var_name = abb.replace('-', '_').replace('/', '_').replace('%', 'pct')
        var_name = re.sub(r'[^a-zA-Z0-9_]', '', var_name)

        abb_upper = abb.upper()

        # Also emit the alias spelling expected by the template, if any.
        alias_var = _ABB_ALIASES.get(abb_upper)
        if alias_var:
            _store_fields(context, alias_var, data)

        if not var_name or var_name[0].isdigit():
            var_name = 'V_' + var_name

        # Ambiguous ABBs additionally get a module-prefixed variable.
        if abb_upper in _DUPLICATE_ABBS:
            module = _infer_module(abb, data.get('project', ''), data.get('result', ''))
            if module:
                _store_fields(context, f"{module}_{var_name}", data)

        # The un-prefixed name is always kept for compatibility.
        _store_fields(context, var_name, data)

    return context
|
||
|
||
|
||
def fill_template(template_path: str, matched_data: dict, output_path: str):
    """Render a docxtpl template with the matched data and save the result.

    Args:
        template_path: path of the docxtpl-formatted template.
        matched_data: matched data, converted with ``build_context``.
        output_path: path the rendered document is written to.

    Returns:
        The rendered ``DocxTemplate`` object.
    """
    template = DocxTemplate(template_path)

    # Turn the matched data into template variables.
    variables = build_context(matched_data)
    variable_count = len(variables)
    print(f"准备填充 {variable_count} 个变量")

    # Render, then persist.
    template.render(variables)
    template.save(output_path)
    print(f"[OK] 已保存到: {output_path}")

    return template
|
||
|
||
|
||
def clean_empty_rows(doc_path: str, output_path: str):
    """Remove empty data rows / tables / modules from a filled report and tidy spacing.

    The document at ``doc_path`` is processed in these passes and written to
    ``output_path``:

    1. Delete every top-level table located before the first detection-module
       heading paragraph (the "abnormal index summary" area).
    2. Delete 3-row "special" tables (third row is the clinical-significance
       text) whose result cell is empty or still a ``{{`` placeholder.
    3. Merge: a header table without data rows absorbs the data and
       description rows of the following data tables, up to the next such
       header table.
    4. Delete tables whose data rows are all empty; in mixed tables delete
       only the empty data rows.
    5. Make sure the row after the header is a title row.
    6. Delete tables that have neither a header nor data.
    7. Delete whole modules (heading plus tables) that contain no data.
    8. Ensure a visible blank paragraph between consecutive data tables.

    Args:
        doc_path: path of the filled .docx file.
        output_path: path the cleaned document is saved to.

    Returns:
        The processed python-docx ``Document``.
    """
    from docx import Document
    from docx.oxml import OxmlElement
    from docx.oxml.ns import qn as oxml_qn
    from docx.text.paragraph import Paragraph
    import re
    import copy

    doc = Document(doc_path)

    # Module heading keywords (24 module categories).  These lists are
    # defined up front because both the merge pass and the module passes
    # below read them; defining them later made the merge pass fail with a
    # swallowed UnboundLocalError and silently do nothing.
    module_keywords = [
        # 1. Urine detection
        'urine detection', 'urine analysis', 'urinalysis', '尿液检测', '尿常规',
        # 2. Complete blood count
        'complete blood count', 'blood routine', 'cbc', '血常规',
        # 3. Blood sugar
        'blood sugar', 'glucose', 'blood glucose', '血糖', '糖代谢',
        # 4. Blood lipids
        'lipid panel', 'lipid profile', 'blood lipid', '血脂',
        # 5. Blood type
        'blood type', 'blood group', 'abo', '血型',
        # 6. Coagulation
        'coagulation', 'clotting', '凝血功能', '凝血',
        # 7. Infectious diseases
        'infectious disease', 'hepatitis', '传染病四项', '传染病',
        # 8. Serum electrolytes
        'electrolyte', 'serum electrolyte', '血电解质', '电解质',
        # 9. Liver function
        'liver function', 'hepatic function', '肝功能',
        # 10. Kidney function
        'kidney function', 'renal function', '肾功能',
        # 11. Cardiac enzymes
        'cardiac enzyme', 'myocardial enzyme', '心肌酶谱', '心肌酶',
        # 12. Thyroid function
        'thyroid function', 'thyroid', '甲状腺功能', '甲状腺',
        # 13. Cardio- / cerebrovascular risk factors
        'cardiovascular risk', 'cerebrovascular', '心脑血管风险因子', '心脑血管', '心血管',
        # 14. Bone metabolism
        'bone metabolism', 'bone marker', '骨代谢',
        # 15. Trace elements
        'trace element', 'microelement', 'heavy metal', '微量元素', '重金属',
        # 16. Lymphocyte subsets
        'lymphocyte subsets', 'lymphocyte subpopulation', '淋巴细胞亚群',
        # 17. Humoral immunity
        'humoral immunity', 'immunoglobulin', '体液免疫',
        # 18. Inflammation
        'inflammation', 'inflammatory', '炎症反应', '炎症',
        # 19. Autoantibodies
        'autoantibody', 'autoimmune', '自身抗体', '自身免疫',
        # 20. Female hormones
        'female hormone', 'estrogen', 'progesterone', '女性荷尔蒙', '女性激素',
        # 21. Male hormones
        'male hormone', 'testosterone', 'androgen', '男性荷尔蒙', '男性激素',
        # 22. Tumor markers
        'tumor marker', 'cancer marker', '肿瘤标记物', '肿瘤标志物',
        # 23. Imaging
        'imaging', 'radiology', 'ultrasound', 'x-ray', 'ct', 'mri', '影像学检查', '影像',
        # 24. Female-specific examinations
        'female specific', 'gynecological', 'gynecology', '女性专项检查', '妇科',
    ]
    # NOTE(review): matching is by substring, so short entries such as 'ct'
    # or 'abo' also hit words like "function" or "laboratory" — confirm this
    # is acceptable for the templates in use.

    # Texts that must never be treated as a module heading.
    exclude_keywords = [
        'client health program', '客户健康方案',
        'health report', '健康报告',
        'overall health', '整体健康',
        'health assessment', '健康评估',
        'abnormal index', '异常指标',
        'be.u', 'wellness center',
        'name', 'gender', 'age', 'nation',  # personal-information fields
        '姓名', '性别', '年龄', '国籍',
    ]

    # === Pass 1: delete all tables in the "abnormal index summary" area ===
    # These tables sit before the first detection module and should not exist.
    body_early = doc._body._body
    children_early = list(body_early)

    # Keywords for the first-module search of this pass.
    detection_kw = ['urine detection', '尿液检测', 'complete blood count', '血常规',
                    'blood sugar', '血糖', 'blood lipid', '血脂', 'liver function', '肝功能',
                    'kidney function', '肾功能', 'thyroid', '甲状腺', 'coagulation', '凝血',
                    'infectious', '传染病', 'electrolyte', '电解质']
    exclude_kw = ['health program', '健康方案', 'health report', '健康报告',
                  'abnormal', '异常', 'overall', '整体', 'assessment', '评估',
                  'blood glucose', 'hematology', 'hormonal', 'immunology', 'nutrition']

    # Locate the first detection-module heading paragraph.
    # NOTE(review): when no heading is found the index stays at the end of
    # the body, so every top-level table is deleted below — confirm intended.
    first_module_idx = len(children_early)
    for idx, elem in enumerate(children_early):
        if elem.tag.endswith('}p'):
            try:
                p = Paragraph(elem, doc)
                txt = p.text.strip().lower()
                # Headings are short texts containing a module keyword.
                if txt and len(txt) < 80:
                    is_mod = any(k in txt for k in detection_kw)
                    is_exc = any(k in txt for k in exclude_kw)
                    if is_mod and not is_exc:
                        first_module_idx = idx
                        print(f" 找到第一个检测模块: 位置{idx}")
                        break
            except Exception:
                pass

    # Delete every table before the first module, with or without data.
    removed_early = 0
    for elem in children_early[:first_module_idx]:
        if elem.tag.endswith('}tbl'):
            try:
                elem.getparent().remove(elem)
                removed_early += 1
            except Exception:
                pass

    if removed_early > 0:
        print(f"[OK] 删除异常指标汇总区域表格: {removed_early} 个")

    removed_rows = 0
    merged_count = 0

    def has_data_in_row(cells):
        """Return True when the row's Result column holds a real value."""
        # Accepted qualitative results.
        valid_qualitative = [
            'negative', 'positive', 'normal', 'reactive', 'non-reactive',
            'trace', 'clear', 'cloudy', 'turbid',
            'yellow', 'pale yellow', 'dark yellow', 'amber', 'straw',  # urine colours
            'red', 'brown', 'green', 'orange',
            'a', 'b', 'ab', 'o', 'rh+', 'rh-',  # blood types
            'detected', 'not detected', 'present', 'absent'
        ]

        # Only the Result column decides, so Project / Refer text is never
        # mistaken for a result.  Typical layouts:
        # - 11 cells: 0 ABB, 1-2 Project, 3-4 Result, 5-6 Point, 7-8 Refer, 9-10 Unit
        # - 6 cells:  0 ABB, 1 Project, 2 Result, 3 Point, 4 Refer, 5 Unit
        if len(cells) >= 11:
            result_col_candidates = [3, 4]
        elif len(cells) >= 6:
            result_col_candidates = [2, 3]
        else:
            result_col_candidates = [2]

        result_candidates = []
        for col_idx in result_col_candidates:
            if col_idx < len(cells):
                txt = (cells[col_idx].text or '').strip()
                if txt:
                    result_candidates.append(txt)
        result_text = result_candidates[0] if result_candidates else ''

        if not result_text:
            return False
        if result_text in ['', '-', '/', ' ', '.', ':', '{{', '}}']:
            return False
        if result_text.startswith('{{'):
            return False

        # A bare "a - b" range is reference text that slipped into the
        # Result / Point column of a misaligned template, not a result.
        if re.match(r'^[\(\[]?\s*[-+]?\d+(?:\.\d+)?\s*[-–~]\s*[-+]?\d+(?:\.\d+)?\s*[\)\]]?$', result_text):
            return False

        if re.search(r'\d', result_text):
            return True
        if result_text.lower() in valid_qualitative:
            return True
        if len(result_text) > 2 and result_text.isalpha():
            return True
        return False

    def is_header_row(row_text, cells=None):
        """Return True for a column-header row (Abb + Project + Result)."""
        # Description rows are excluded first.
        if 'clinical significance' in row_text or '临床意义' in row_text:
            return False

        has_abb = ('abb' in row_text) or ('简称' in row_text)
        has_project = ('project' in row_text) or ('项目' in row_text)
        has_result = ('result' in row_text) or ('结果' in row_text)
        if not (has_abb and has_project and has_result):
            return False

        if cells:
            non_empty_cells = [c for c in cells if c.text.strip()]
            if len(non_empty_cells) < 2:
                return False
            if any(len(c.text.strip()) > 30 for c in cells):
                return False

        return True

    def is_description_row(row_text):
        """Return True for a clinical-significance description row."""
        return 'clinical significance' in row_text or '临床意义' in row_text

    def is_data_row(first_cell):
        """Return True when the first cell looks like an ABB (1-20 chars, has alphanumerics)."""
        if first_cell and 1 <= len(first_cell) <= 20:
            clean = re.sub(r'[^a-zA-Z0-9]', '', first_cell)
            return bool(clean) and clean.isalnum()
        return False

    def analyze_table(table):
        """Classify the rows of a table into header / description / data rows."""
        info = {'header_idx': -1, 'desc_indices': [], 'data_with_result': [], 'data_without_result': []}
        for row_idx, row in enumerate(table.rows):
            cells = row.cells
            if len(cells) < 2:
                continue
            row_text = ' '.join([c.text.strip().lower() for c in cells])
            first_cell = cells[0].text.strip()

            if is_header_row(row_text, cells):
                info['header_idx'] = row_idx
            elif is_description_row(row_text):
                info['desc_indices'].append(row_idx)
            elif is_data_row(first_cell):
                if has_data_in_row(cells):
                    info['data_with_result'].append(row_idx)
                else:
                    info['data_without_result'].append(row_idx)
        return info

    def is_special_table(table):
        """Return True for a 3-row table whose third row is the description."""
        try:
            if len(table.rows) != 3:
                return False
            row2_text = ' '.join([c.text for c in table.rows[2].cells]).lower()
            return ('clinical significance' in row2_text) or ('临床意义' in row2_text)
        except Exception:
            return False

    def special_table_has_data(table):
        """Return True when the second row's third cell holds a filled-in value."""
        try:
            if len(table.rows) < 2:
                return False
            cells = table.rows[1].cells
            if len(cells) < 3:
                return False
            result_text = (cells[2].text or '').strip()
            if not result_text:
                return False
            if result_text in ['', '-', '/', '.', ':']:
                return False
            if result_text.startswith('{{'):
                return False
            return True
        except Exception:
            return False

    # === Pass 2: delete empty special tables ===
    removed_special_tables = 0
    for table in list(doc.tables):
        if is_special_table(table) and not special_table_has_data(table):
            try:
                table._tbl.getparent().remove(table._tbl)
                removed_special_tables += 1
            except Exception:
                pass

    # Collect the tables in body order together with their body index.
    body = doc._body._body
    table_order = []
    table_elem_indices = {}
    body_children = list(body)
    current_tables = doc.tables
    for idx, elem in enumerate(body_children):
        if elem.tag.endswith('}tbl'):
            for t in current_tables:
                if t._tbl is elem:
                    table_order.append(t)
                    table_elem_indices[t] = idx
                    break

    # Position of the first module heading; tables before it are never used
    # as merge targets.
    first_module_elem_idx = len(body_children)  # default: end of body
    for idx, elem in enumerate(body_children):
        if elem.tag.endswith('}p'):
            try:
                p = Paragraph(elem, doc)
                txt = p.text.strip().lower()
                if txt and len(txt) < 50:
                    is_module = any(kw in txt for kw in module_keywords)
                    is_exclude = any(kw in txt for kw in exclude_keywords)
                    if is_module and not is_exclude:
                        first_module_elem_idx = idx
                        break
            except Exception:
                pass

    # === Pass 3: merge data tables into the preceding header-only table ===
    # The search stops at the next header-only table so data is never pulled
    # across modules.
    tables_to_remove = set()
    for i in range(len(table_order)):
        if table_order[i] in tables_to_remove:
            continue

        t1 = table_order[i]
        t1_elem_idx = table_elem_indices.get(t1, 0)

        # Skip tables before the first detection module.
        if t1_elem_idx < first_module_elem_idx:
            continue

        info1 = analyze_table(t1)

        if info1['header_idx'] >= 0 and len(info1['data_with_result']) == 0:
            next_header_pos = None
            for k in range(i + 1, len(table_order)):
                if table_order[k] in tables_to_remove:
                    continue
                k_info = analyze_table(table_order[k])
                if k_info['header_idx'] >= 0 and len(k_info['data_with_result']) == 0:
                    next_header_pos = k
                    break
            search_end = next_header_pos if next_header_pos is not None else len(table_order)

            candidates = []
            for j in range(i + 1, search_end):
                if table_order[j] in tables_to_remove:
                    continue
                candidate = table_order[j]
                candidate_info = analyze_table(candidate)
                if len(candidate_info['data_with_result']) > 0:
                    candidates.append((candidate, candidate_info))

            if not candidates:
                continue

            # The project name of the first data row becomes the title.
            title_text = ''
            try:
                first_candidate, first_candidate_info = candidates[0]
                if first_candidate_info.get('data_with_result'):
                    data_row_idx = first_candidate_info['data_with_result'][0]
                    if len(first_candidate.rows[data_row_idx].cells) > 1:
                        title_text = first_candidate.rows[data_row_idx].cells[1].text.strip()
                    if not title_text:
                        title_text = first_candidate.rows[data_row_idx].cells[0].text.strip()
            except Exception:
                title_text = ''

            # Clear old rows after the header, keeping the row right below
            # the header (if any) as the title-row structure.
            header_idx = info1['header_idx']
            title_row_idx = header_idx + 1
            keep_title_row = title_row_idx < len(t1.rows)
            delete_from = (title_row_idx + 1) if keep_title_row else (header_idx + 1)
            for ridx in range(len(t1.rows) - 1, delete_from - 1, -1):
                try:
                    t1._tbl.remove(t1.rows[ridx]._tr)
                    removed_rows += 1
                except Exception:
                    pass

            if not keep_title_row:
                try:
                    new_tr = copy.deepcopy(t1.rows[header_idx]._tr)
                    # Insert relative to the header row: a positional insert
                    # on the table element would also count its property and
                    # grid children and put the row in the wrong place.
                    t1.rows[header_idx]._tr.addnext(new_tr)
                except Exception:
                    pass

            try:
                if title_row_idx < len(t1.rows):
                    title_row = t1.rows[title_row_idx]
                    for c in title_row.cells:
                        c.text = ''
                    if title_text:
                        title_row.cells[0].text = title_text
            except Exception:
                pass

            for candidate, candidate_info in candidates:
                for row_idx in candidate_info['data_with_result'] + candidate_info['desc_indices']:
                    new_tr = copy.deepcopy(candidate.rows[row_idx]._tr)
                    t1._tbl.append(new_tr)

                tables_to_remove.add(candidate)
                merged_count += 1

    for t in tables_to_remove:
        try:
            t._tbl.getparent().remove(t._tbl)
        except Exception:
            pass

    # === Pass 4: delete empty data rows / tables ===
    # 1. No data row has a result  -> delete the whole table.
    # 2. Some rows have results    -> delete only the empty rows, keep descriptions.
    tables_to_delete = []

    for table in doc.tables:
        info = analyze_table(table)
        data_with = info['data_with_result']
        data_without = info['data_without_result']

        if len(data_with) == 0 and len(data_without) > 0:
            tables_to_delete.append(table)
            continue

        if len(data_with) > 0 and len(data_without) > 0:
            for row_idx in sorted(data_without, reverse=True):
                try:
                    table._tbl.remove(table.rows[row_idx]._tr)
                    removed_rows += 1
                except Exception:
                    pass

    for table in tables_to_delete:
        try:
            table._tbl.getparent().remove(table._tbl)
            removed_rows += 1
        except Exception:
            pass

    # === Pass 5: make sure the row after the header is a title row ===
    for table in doc.tables:
        info = analyze_table(table)
        if info['header_idx'] < 0:
            continue
        if len(info['data_with_result']) == 0:
            continue

        title_row_idx = info['header_idx'] + 1
        if title_row_idx >= len(table.rows):
            continue

        try:
            title_row = table.rows[title_row_idx]
            # If the row below the header is itself a data row, insert a
            # separate title row instead of overwriting it.
            try:
                first_cell = title_row.cells[0].text.strip() if title_row.cells else ''
                if is_data_row(first_cell) and has_data_in_row(title_row.cells):
                    extracted_title = ''
                    try:
                        if len(title_row.cells) > 1:
                            extracted_title = title_row.cells[1].text.strip()
                        if not extracted_title:
                            extracted_title = title_row.cells[0].text.strip()
                    except Exception:
                        extracted_title = ''

                    header_row = table.rows[info['header_idx']]
                    header_tr = copy.deepcopy(header_row._tr)
                    # Same reasoning as in the merge pass: insert right
                    # after the header row element.
                    header_row._tr.addnext(header_tr)
                    title_row = table.rows[title_row_idx]
                    try:
                        for c in title_row.cells:
                            c.text = ''
                        if extracted_title:
                            title_row.cells[0].text = extracted_title
                    except Exception:
                        pass
                    continue
            except Exception:
                pass

            if any((c.text or '').strip() for c in title_row.cells):
                continue

            first_data_idx = info['data_with_result'][0]
            if first_data_idx >= len(table.rows):
                continue
            data_row = table.rows[first_data_idx]

            title_text = ''
            if len(data_row.cells) > 1:
                title_text = data_row.cells[1].text.strip()
            if not title_text:
                title_text = data_row.cells[0].text.strip()
            if not title_text:
                continue

            for c in title_row.cells:
                c.text = ''
            title_row.cells[0].text = title_text
        except Exception:
            pass

    # === Pass 6: delete tables with neither data nor a header ===
    removed_tables = 0
    for table in list(doc.tables):
        info = analyze_table(table)
        if len(info['data_with_result']) == 0 and info['header_idx'] < 0:
            try:
                table._tbl.getparent().remove(table._tbl)
                removed_tables += 1
            except Exception:
                pass

    # === Document structure clean-up ===
    def contains_exclude_keyword(text: str) -> bool:
        """Return True when the text contains an exclusion keyword."""
        text_lower = text.lower()
        return any(kw in text_lower for kw in exclude_keywords)

    def is_module_title_table(table):
        """Return True when one of the first two rows names a module."""
        if len(table.rows) < 1:
            return False
        try:
            for row_idx in range(min(2, len(table.rows))):
                row_text = ' '.join([c.text.lower().strip() for c in table.rows[row_idx].cells])
                # Exclusions take precedence.
                if contains_exclude_keyword(row_text):
                    return False
                for kw in module_keywords:
                    if kw in row_text:
                        return True
        except Exception:
            pass
        return False

    def table_has_data(table):
        """Return True when the table has at least one data row with a result."""
        info = analyze_table(table)
        return len(info['data_with_result']) > 0

    def is_module_title_paragraph(p_text: str) -> bool:
        """Return True when the paragraph text is a module heading."""
        if not p_text:
            return False
        text = p_text.strip().lower()
        if not text:
            return False
        # Headings are short; longer text is body copy.
        if len(text) > 40:
            return False
        if contains_exclude_keyword(text):
            return False
        return any(kw in text for kw in module_keywords)

    # Identify modules from the order of body elements (paragraph headings
    # and title tables both count).
    body = doc._body._body
    body_children = list(body)

    tbl_map = {t._tbl: t for t in doc.tables}

    def get_table_from_elem(elem):
        """Return the Table wrapper of a body table element, or None."""
        return tbl_map.get(elem)

    def is_blank_paragraph_elem(elem):
        """Return True when the paragraph element has no visible text."""
        try:
            p = Paragraph(elem, doc)
            return p.text.strip() == ''
        except Exception:
            return False

    def create_visible_blank_paragraph():
        """Create a blank paragraph holding a single-space run, with zero spacing."""
        p = OxmlElement('w:p')
        pPr = OxmlElement('w:pPr')
        spacing = OxmlElement('w:spacing')
        spacing.set(oxml_qn('w:after'), '0')
        spacing.set(oxml_qn('w:before'), '0')
        pPr.append(spacing)
        p.append(pPr)

        r = OxmlElement('w:r')
        t = OxmlElement('w:t')
        t.text = ' '
        r.append(t)
        p.append(r)
        return p

    def is_module_start_elem(elem):
        """Return True when the body element starts a module."""
        if elem.tag.endswith('}tbl'):
            t = get_table_from_elem(elem)
            return bool(t) and is_module_title_table(t)
        if elem.tag.endswith('}p'):
            try:
                p = Paragraph(elem, doc)
                return is_module_title_paragraph(p.text)
            except Exception:
                return False
        return False

    # All module start positions.
    module_start_indices = [i for i, e in enumerate(body_children) if is_module_start_elem(e)]

    # === Pass 7: delete modules without data ===
    # A module whose tables hold no data is removed with its heading.
    removed_modules = 0
    elements_removed_in_modules = 0

    if module_start_indices:
        # Back to front, so earlier indices stay valid.
        for idx in range(len(module_start_indices) - 1, -1, -1):
            start_i = module_start_indices[idx]
            end_i = module_start_indices[idx + 1] if idx + 1 < len(module_start_indices) else len(body_children)

            module_elements = body_children[start_i:end_i]

            module_has_data = False
            module_tables = []
            for e in module_elements:
                if e.tag.endswith('}tbl'):
                    t = get_table_from_elem(e)
                    if t:
                        module_tables.append(e)
                        if table_has_data(t):
                            module_has_data = True

            if not module_has_data and module_tables:
                for e in reversed(module_elements):
                    # The last module's slice reaches the end of the body and
                    # includes the section properties, which must be kept.
                    if e.tag.endswith('}sectPr'):
                        continue
                    try:
                        e.getparent().remove(e)
                        elements_removed_in_modules += 1
                    except Exception:
                        pass
                removed_modules += 1

    # Re-read the body: indices changed after the deletions.
    body = doc._body._body
    body_children = list(body)

    # === Pass 8: blank line between consecutive data tables of a module ===
    space_count = 0
    current_module_started = False

    i = 0
    while i < len(body_children):
        elem = body_children[i]

        if is_module_start_elem(elem):
            current_module_started = True
            i += 1
            continue

        if current_module_started and elem.tag.endswith('}tbl'):
            t = get_table_from_elem(elem)
            is_title = bool(t) and is_module_title_table(t)
            is_data = bool(t) and (not is_title) and table_has_data(t)

            if is_data:
                # Walk back over blank paragraphs to the previous real element.
                j = i - 1
                while j >= 0 and body_children[j].tag.endswith('}p') and is_blank_paragraph_elem(body_children[j]):
                    j -= 1

                prev_is_data_table = False
                if j >= 0 and body_children[j].tag.endswith('}tbl'):
                    prev_t = get_table_from_elem(body_children[j])
                    if prev_t and (not is_module_title_table(prev_t)) and table_has_data(prev_t):
                        prev_is_data_table = True

                if prev_is_data_table:
                    prev_elem = body_children[i - 1] if i - 1 >= 0 else None

                    # Case 1: directly adjacent to the previous table
                    # -> insert a visible blank paragraph.
                    if not (prev_elem is not None and prev_elem.tag.endswith('}p') and is_blank_paragraph_elem(prev_elem)):
                        empty_p = create_visible_blank_paragraph()
                        body.insert(i, empty_p)
                        space_count += 1
                        body_children = list(body)
                        i += 1
                    else:
                        # Case 2: a blank paragraph exists but has no run
                        # -> add a single-space run.
                        try:
                            p_elem = prev_elem
                            has_run = any(c.tag.endswith('}r') for c in list(p_elem))
                            if not has_run:
                                r = OxmlElement('w:r')
                                tt = OxmlElement('w:t')
                                tt.text = ' '
                                r.append(tt)
                                p_elem.append(r)
                                space_count += 1
                        except Exception:
                            pass

        i += 1

    doc.save(output_path)
    print(f"[OK] 清理完成: 删除 {removed_rows} 行, 合并 {merged_count} 对表格, 删除 {removed_tables} 个空表格")
    print(f"[OK] 清理特殊表格: 删除 {removed_special_tables} 个空特殊表格")
    print(f"[OK] 结构整理: 删除 {removed_modules} 个无数据模块, 删除 {elements_removed_in_modules} 个模块元素, 插入 {space_count} 个表格间空行")
    return doc
|
||
|
||
def main():
    """Entry point: fill the template and write the next versioned report.

    Data source priority:

    1. ``deepseek_processed_data.json`` next to this file — already matched
       data, used as is.
    2. ``extracted_medical_data.json`` next to this file — raw extracted
       items, which are cleaned, de-duplicated per ABB and matched against
       the ABB configuration.

    The output is written to ``reports/filled_report_v<N>.docx`` next to this
    file, where N is one more than the highest existing version.
    """
    import os

    # Path configuration.
    template_path = r"c:\Users\UI\Desktop\医疗报告\template_docxtpl.docx"
    filled_path = r"c:\Users\UI\Desktop\医疗报告\backend\reports\filled_docxtpl_temp.docx"
    base_dir = Path(__file__).parent
    reports_dir = base_dir / "reports"
    reports_dir.mkdir(parents=True, exist_ok=True)

    def get_next_output_path() -> str:
        """Return the path for the next ``filled_report_v<N>.docx``."""
        max_v = 0
        for p in reports_dir.glob("filled_report_v*.docx"):
            try:
                v = int(p.stem.split("filled_report_v", 1)[1])
            except (IndexError, ValueError):
                # Not a plain numeric version suffix; ignore the file.
                continue
            max_v = max(max_v, v)
        return str(reports_dir / f"filled_report_v{max_v + 1}.docx")

    output_path = get_next_output_path()

    def run_pipeline(matched_data: dict) -> None:
        """Fill the template, clean the result, and always drop the temp file."""
        try:
            print("\n步骤1: 填充数据...")
            fill_template(template_path, matched_data, filled_path)

            print("\n步骤2: 清理空白行...")
            clean_empty_rows(filled_path, output_path)
        finally:
            # Remove the intermediate file even when a step failed.
            if os.path.exists(filled_path):
                os.remove(filled_path)

        print(f"\n[SUCCESS] 完成! 输出: {output_path}")

    # DeepSeek-processed data is preferred when available.
    deepseek_file = base_dir / "deepseek_processed_data.json"
    extracted_file = base_dir / "extracted_medical_data.json"

    # Load the ABB configuration (done up front, as before, for both paths).
    from config import load_abb_config
    abb_config = load_abb_config()

    if deepseek_file.exists():
        print("使用DeepSeek处理后的数据")
        with open(deepseek_file, 'r', encoding='utf-8') as f:
            matched_data = json.load(f)
        print(f"加载 {len(matched_data)} 个匹配项")

        # Already matched: fill directly.
        run_pipeline(matched_data)
        return

    # Local processing of the raw extracted items.
    if not extracted_file.exists():
        print("[ERROR] 未找到提取数据,请先运行 deepseek_process.py 或 extract_and_fill_report.py")
        return

    with open(extracted_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    extracted_items = data.get('items', []) if isinstance(data, dict) else data

    # Split units / reference ranges and drop invalid rows.
    extracted_items = clean_extracted_data(extracted_items)

    print(f"加载 {len(extracted_items)} 个提取项")

    # Template ABBs; an ABB such as "A/B" is also registered under each part.
    template_abbs = {}
    for abb_upper, info in abb_config.get('abb_to_info', {}).items():
        template_abbs[abb_upper] = info
        if '/' in abb_upper:
            for part in abb_upper.split('/'):
                part = part.strip()
                if part:  # an empty part would substring-match everything
                    template_abbs[part] = info

    # Group by upper-cased ABB; items without an ABB cannot be matched.
    items_by_abb = {}
    for item in extracted_items:
        abb = str(item.get('abb') or '').upper()
        if not abb:
            continue
        items_by_abb.setdefault(abb, []).append(item)

    # Best candidate per ABB.
    best_items = select_best_match(items_by_abb)

    # Match against the template: exact matches first, so that a loose
    # substring match can never overwrite them.
    matched_data = {abb: item for abb, item in best_items.items() if abb in template_abbs}
    exact_abbs = set(matched_data)
    for abb, item in best_items.items():
        if abb in template_abbs:
            continue
        for t_abb in template_abbs:
            if abb in t_abb or t_abb in abb:
                if t_abb not in exact_abbs:
                    matched_data[t_abb] = item
                break

    print(f"清理后 {len(best_items)} 个有效项, 匹配 {len(matched_data)} 个")

    run_pipeline(matched_data)
|
||
|
||
|
||
# Run the report pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|