Files
yiliao/backend/fill_with_docxtpl.py

1045 lines
39 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
使用docxtpl填充Word模板
"""
from docxtpl import DocxTemplate
import json
from pathlib import Path
def clean_extracted_data(items: list) -> list:
    """Clean extracted lab items: drop invalid rows and split unit/reference.

    Each item is a dict that may carry ``abb``, ``project``, ``result``,
    ``unit`` and ``reference``. Items are modified in place (as before) and
    the surviving ones are returned in their original order.

    Processing per item:
      * A placeholder result (``.``, ``:``, ``-``, ``/``, empty, ``None``) is
        only kept when the unit starts with a word followed by ``[``
        (e.g. ``"Yellow [Normal : ...]"``); that word becomes the result.
      * Items whose project mentions phase/antibody/treponema are dropped
        when their ABB is one of the easily mis-matched PH/CU/CL/CA.
      * ``[Normal : x]`` and ``(x-y)`` fragments are moved out of the unit
        into ``reference`` when the item arrived without a reference.
      * Leading stray digits are stripped from the unit.

    Fixes compared with the previous version:
      * a reference extracted from ``[Normal : x]`` is no longer overwritten
        by a following ``(x-y)`` fragment;
      * ``None`` / non-string ``unit``, ``project`` and ``abb`` no longer raise;
      * units that legitimately start with a number and an exponent marker
        (``10^9/L``) are no longer mangled into ``^9/L``.

    Args:
        items: list of extracted item dicts.

    Returns:
        The list of cleaned item dicts (same objects as in ``items``).
    """
    import re

    invalid_results = ('.', ':', '-', '/', '', None)
    suspicious_project_keywords = ('phase', 'antibody', 'treponema')
    easily_mismatched_abbs = ('PH', 'CU', 'CL', 'CA')

    def as_text(value) -> str:
        """Coerce a possibly-missing JSON value to a string."""
        return '' if value is None else str(value)

    cleaned = []
    for item in items:
        result = item.get('result', '')
        unit = as_text(item.get('unit'))
        reference = as_text(item.get('reference'))
        project = as_text(item.get('project'))
        # Remember whether the item arrived with its own reference; only
        # items without one get a reference carved out of the unit text.
        had_reference = bool(reference)

        # Skip placeholder results unless the real result is hiding in the unit.
        probe = result.strip() if isinstance(result, str) else result
        if probe in invalid_results:
            result_in_unit = re.match(r'^([A-Za-z]+)\s*\[', unit) if unit else None
            if not result_in_unit:
                continue
            item['result'] = result_in_unit.group(1)
            unit = re.sub(r'^[A-Za-z]+\s*', '', unit)

        # Rows with these project words are probably OCR mis-reads when they
        # were matched to one of the short, easily confused ABBs.
        project_lower = project.lower()
        if any(kw in project_lower for kw in suspicious_project_keywords):
            abb = as_text(item.get('abb')).upper()
            if abb in easily_mismatched_abbs:
                continue

        if unit:
            # "[Normal : xxx]" -> reference (only if none yet); always removed
            # from the unit.
            normal_match = re.search(r'\[Normal\s*[:]\s*([^\]]+)\]', unit, re.IGNORECASE)
            if normal_match:
                if not reference:
                    reference = normal_match.group(1).strip()
                    item['reference'] = reference
                unit = re.sub(r'\[Normal\s*[:][^\]]+\]', '', unit, flags=re.IGNORECASE).strip()

            # "(xxx-xxx)" range -> reference when nothing better was found.
            range_match = re.search(r'\([\d\.\-<>]+\)', unit)
            if range_match and not had_reference:
                if not reference:
                    reference = range_match.group(0)
                    item['reference'] = reference
                unit = re.sub(r'\([\d\.\-<>]+\)', '', unit).strip()

            # Strip leading digits (likely a parsing leftover), but keep
            # numbers that are part of the unit itself such as "10^9/L".
            unit = re.sub(r'^-?\d+(?![\d^*×])\s*', '', unit).strip()
            item['unit'] = unit

        cleaned.append(item)
    return cleaned
def select_best_match(items_by_abb: dict) -> dict:
    """Pick the most plausible item for every ABB that has several candidates.

    Candidates are scored as follows and the highest score wins (the first
    candidate wins ties):
      * +10 if the result contains a digit
      * +8  if the result is a known qualitative value (negative, positive,
            normal, reactive, non-reactive)
      * +5  if a reference range is present
      * +3  if a unit is present and shorter than 20 characters

    ``None`` and non-string results are tolerated, and ABBs with an empty
    candidate list are skipped instead of raising ``IndexError``.

    Args:
        items_by_abb: mapping ``ABB -> list of item dicts``.

    Returns:
        Mapping ``ABB -> chosen item dict``.
    """
    import re

    qualitative_results = {'negative', 'positive', 'normal', 'reactive', 'non-reactive'}

    def score(item) -> int:
        """Return the plausibility score of a single candidate."""
        raw_result = item.get('result')
        result_text = '' if raw_result is None else str(raw_result).strip()
        points = 0
        if re.search(r'\d', result_text):
            points += 10
        if item.get('reference'):
            points += 5
        unit = item.get('unit')
        if unit and len(str(unit)) < 20:
            points += 3
        if result_text.lower() in qualitative_results:
            points += 8
        return points

    best = {}
    for abb, candidates in items_by_abb.items():
        if not candidates:
            continue
        if len(candidates) == 1:
            best[abb] = candidates[0]
        else:
            # max() returns the first maximal element, matching the previous
            # stable "sort descending, take first" behaviour.
            best[abb] = max(candidates, key=score)
    return best
def build_context(matched_data: dict) -> dict:
    """Convert matched lab data into a flat docxtpl rendering context.

    For every ABB four variables are produced: ``<VAR>_result``,
    ``<VAR>_point``, ``<VAR>_refer`` and ``<VAR>_unit``, where ``<VAR>`` is the
    ABB reduced to letters, digits and underscores (``-`` and ``/`` become
    ``_``, ``%`` becomes ``pct``; a ``V_`` prefix is added when the name is
    empty or starts with a digit).

    Additionally:
      * ABBs listed in the alias table also get variables under the template's
        spelling (e.g. ``CA153`` -> ``CA15_3``).
      * ABBs that occur in more than one module (PRO, WBC, ...) also get
        module-prefixed variables (``URINE_...`` / ``CBC_...``) when the module
        can be inferred from the project name and the result.

    Fixes compared with the previous version: the urine keyword list no
    longer contains an empty string (which matched every project and tagged
    all duplicate ABBs as ``URINE``), and ``None`` / non-string project and
    result values no longer raise.

    Args:
        matched_data: ``{ABB: {result, unit, reference, point, project}}``.

    Returns:
        The docxtpl context dict.
    """
    import re

    context = {}

    # Project-name keywords that identify a urine test.
    # NOTE(review): the original list had an empty string in the slot that
    # lines up with 'glucose'; '葡萄糖' is presumably what was lost — confirm.
    urine_projects = ('color', 'specific gravity', 'protein', 'glucose', 'ketone',
                      'nitrite', 'turbidity', '颜色', '比重', '蛋白', '葡萄糖', '酮体', '亚硝酸')
    qualitative_urine_abbs = ('PRO', 'GLU', 'KET', 'NIT', 'BLD')
    qualitative_results = ('negative', 'positive', 'trace')

    def get_module(abb, project, result):
        """Infer the module prefix ('URINE', 'CBC' or '') for a duplicate ABB."""
        abb_upper = str(abb).upper()
        project_text = '' if project is None else str(project)
        project_lower = project_text.lower()
        result_lower = '' if result is None else str(result).strip().lower()

        if any(kw in project_lower for kw in urine_projects):
            return 'URINE'
        # A project literally named "WBC" is the urine WBC; "total ..." is CBC.
        if abb_upper == 'WBC' and project_lower == 'wbc':
            return 'URINE'
        if abb_upper == 'WBC' and 'total' in project_lower:
            return 'CBC'
        # pH with a short project name belongs to the urine panel.
        if abb_upper == 'PH' and 'ph' in project_lower and len(project_text) < 20:
            return 'URINE'
        # Qualitative results for these ABBs are usually urine tests.
        if abb_upper in qualitative_urine_abbs and result_lower in qualitative_results:
            return 'URINE'
        return ''

    # ABBs that appear in more than one module of the template.
    duplicate_abbs = ('PRO', 'WBC', 'COLOR', 'PH', 'GLU', 'SG', 'NIT', 'KET', 'BLD', 'ERY')

    # Extracted ABB -> template variable spelling
    # (handles differences such as CA153 vs CA15_3, CA199 vs CA19_9).
    abb_aliases = {
        'CA153': 'CA15_3',
        'CA199': 'CA19_9',
        'ABO': 'BLOODTYPE',      # ABO blood type
        'RH': 'BLOODTYPERH',     # Rh blood type
        'CKMB': 'CK_MB',         # cardiac enzyme
    }

    def emit(prefix, data):
        """Write the four template variables for one variable-name prefix."""
        context[f"{prefix}_result"] = data.get('result', '')
        context[f"{prefix}_point"] = data.get('point', '')
        context[f"{prefix}_refer"] = data.get('reference', '')
        context[f"{prefix}_unit"] = data.get('unit', '')

    for abb, data in matched_data.items():
        # Normalise to a valid template identifier.
        var_name = abb.replace('-', '_').replace('/', '_').replace('%', 'pct')
        var_name = re.sub(r'[^a-zA-Z0-9_]', '', var_name)

        abb_upper = abb.upper()
        if abb_upper in abb_aliases:
            emit(abb_aliases[abb_upper], data)

        if not var_name or var_name[0].isdigit():
            var_name = 'V_' + var_name

        if abb_upper in duplicate_abbs:
            module = get_module(abb, data.get('project', ''), data.get('result', ''))
            if module:
                emit(f"{module}_{var_name}", data)

        # Always keep the un-prefixed variables for backward compatibility.
        emit(var_name, data)

    return context
def fill_template(template_path: str, matched_data: dict, output_path: str):
    """Render a docxtpl template with the matched data and save the result.

    Args:
        template_path: path of the docxtpl-formatted Word template.
        matched_data: matched lab data, passed to ``build_context``.
        output_path: path the rendered document is written to.

    Returns:
        The rendered ``DocxTemplate`` object.
    """
    template = DocxTemplate(template_path)

    # Flatten the matched data into template variables.
    render_vars = build_context(matched_data)
    print(f"准备填充 {len(render_vars)} 个变量")

    # Render, then persist.
    template.render(render_vars)
    template.save(output_path)
    print(f"[OK] 已保存到: {output_path}")
    return template
def clean_empty_rows(doc_path: str, output_path: str):
    """Remove empty data rows/tables from a filled report and tidy its layout.

    Steps, in order:
      1. Delete every table located before the first detection-module heading
         (the "abnormal index summary" area).
      2. Delete 3-row "special" tables whose result cell is empty.
      3. Merge data tables into the preceding header-only table (never across
         the next header-only table).
      4. Delete data rows without a result; delete tables whose data rows are
         all empty.
      5. Make sure the row directly below each header is a title row.
      6. Delete tables that have neither a header nor data.
      7. Delete whole modules that contain tables but no data.
      8. Make sure a visible blank paragraph separates consecutive data tables.

    Fixes compared with the previous version:
      * ``module_keywords`` / ``exclude_keywords`` are now defined before the
        merge step reads them. Previously the read raised ``UnboundLocalError``
        which a bare ``except`` swallowed, so step 3 never merged anything.
      * New rows are positioned relative to an existing ``w:tr`` instead of
        ``_tbl.insert(row_index, ...)``, which mis-counted because the table
        element also has ``w:tblPr`` / ``w:tblGrid`` children before the rows.
      * Module deletion no longer removes the body's trailing ``w:sectPr``.
      * Bare ``except:`` clauses were narrowed to ``except Exception:``.

    Args:
        doc_path: path of the filled .docx to clean.
        output_path: path the cleaned document is saved to.

    Returns:
        The cleaned python-docx ``Document``.
    """
    import copy
    import re

    from docx import Document
    from docx.oxml import OxmlElement
    from docx.oxml.ns import qn as oxml_qn
    from docx.text.paragraph import Paragraph

    doc = Document(doc_path)

    # Module-title keywords (24 text module categories).
    module_keywords = [
        # 1. Urine detection
        'urine detection', 'urine analysis', 'urinalysis', '尿液检测', '尿常规',
        # 2. Complete blood count
        'complete blood count', 'blood routine', 'cbc', '血常规',
        # 3. Blood sugar
        'blood sugar', 'glucose', 'blood glucose', '血糖', '糖代谢',
        # 4. Blood lipid
        'lipid panel', 'lipid profile', 'blood lipid', '血脂',
        # 5. Blood type
        'blood type', 'blood group', 'abo', '血型',
        # 6. Coagulation
        'coagulation', 'clotting', '凝血功能', '凝血',
        # 7. Infectious disease panel
        'infectious disease', 'hepatitis', '传染病四项', '传染病',
        # 8. Serum electrolytes
        'electrolyte', 'serum electrolyte', '血电解质', '电解质',
        # 9. Liver function
        'liver function', 'hepatic function', '肝功能',
        # 10. Kidney function
        'kidney function', 'renal function', '肾功能',
        # 11. Cardiac enzymes
        'cardiac enzyme', 'myocardial enzyme', '心肌酶谱', '心肌酶',
        # 12. Thyroid function
        'thyroid function', 'thyroid', '甲状腺功能', '甲状腺',
        # 13. Cardio-/cerebrovascular risk factors
        'cardiovascular risk', 'cerebrovascular', '心脑血管风险因子', '心脑血管', '心血管',
        # 14. Bone metabolism
        'bone metabolism', 'bone marker', '骨代谢',
        # 15. Trace elements
        'trace element', 'microelement', 'heavy metal', '微量元素', '重金属',
        # 16. Lymphocyte subsets
        'lymphocyte subsets', 'lymphocyte subpopulation', '淋巴细胞亚群',
        # 17. Humoral immunity
        'humoral immunity', 'immunoglobulin', '体液免疫',
        # 18. Inflammation
        'inflammation', 'inflammatory', '炎症反应', '炎症',
        # 19. Autoantibodies
        'autoantibody', 'autoimmune', '自身抗体', '自身免疫',
        # 20. Female hormones
        'female hormone', 'estrogen', 'progesterone', '女性荷尔蒙', '女性激素',
        # 21. Male hormones
        'male hormone', 'testosterone', 'androgen', '男性荷尔蒙', '男性激素',
        # 22. Tumor markers
        'tumor marker', 'cancer marker', '肿瘤标记物', '肿瘤标志物',
        # 23. Imaging
        'imaging', 'radiology', 'ultrasound', 'x-ray', 'ct', 'mri', '影像学检查', '影像',
        # 24. Female-specific examinations
        'female specific', 'gynecological', 'gynecology', '女性专项检查', '妇科',
    ]
    # Texts containing these are not detection modules and must not be
    # recognised as module titles.
    exclude_keywords = [
        'client health program', '客户健康方案',
        'health report', '健康报告',
        'overall health', '整体健康',
        'health assessment', '健康评估',
        'abnormal index', '异常指标',
        'be.u', 'wellness center',
        'name', 'gender', 'age', 'nation',  # user-info fields
        '姓名', '性别', '年龄', '国籍',
    ]

    # === Step 1: delete all tables in the "abnormal index summary" area ===
    # These tables sit before the first detection module and should not exist.
    body_early = doc._body._body
    children_early = list(body_early)
    # Detection-module keywords for locating the first module heading.
    detection_kw = ['urine detection', '尿液检测', 'complete blood count', '血常规',
                    'blood sugar', '血糖', 'blood lipid', '血脂', 'liver function', '肝功能',
                    'kidney function', '肾功能', 'thyroid', '甲状腺', 'coagulation', '凝血',
                    'infectious', '传染病', 'electrolyte', '电解质']
    exclude_kw = ['health program', '健康方案', 'health report', '健康报告',
                  'abnormal', '异常', 'overall', '整体', 'assessment', '评估',
                  'blood glucose', 'hematology', 'hormonal', 'immunology', 'nutrition']
    # Locate the first detection-module heading.
    first_module_idx = len(children_early)
    for idx, elem in enumerate(children_early):
        if elem.tag.endswith('}p'):
            try:
                p = Paragraph(elem, doc)
                txt = p.text.strip().lower()
                # Module headings are short texts containing a keyword.
                if txt and len(txt) < 80:
                    is_mod = any(k in txt for k in detection_kw)
                    is_exc = any(k in txt for k in exclude_kw)
                    if is_mod and not is_exc:
                        first_module_idx = idx
                        print(f" 找到第一个检测模块: 位置{idx}")
                        break
            except Exception:
                pass
    # Delete every table before the first module (with or without data).
    removed_early = 0
    for idx, elem in enumerate(children_early):
        if idx >= first_module_idx:
            break
        if elem.tag.endswith('}tbl'):
            try:
                elem.getparent().remove(elem)
                removed_early += 1
            except Exception:
                pass
    if removed_early > 0:
        print(f"[OK] 删除异常指标汇总区域表格: {removed_early}")

    removed_rows = 0
    merged_count = 0

    def has_data_in_row(cells):
        """Return True when the row's Result column holds a real value."""
        # Accepted qualitative results.
        valid_qualitative = [
            'negative', 'positive', 'normal', 'reactive', 'non-reactive',
            'trace', 'clear', 'cloudy', 'turbid',
            'yellow', 'pale yellow', 'dark yellow', 'amber', 'straw',  # urine colour
            'red', 'brown', 'green', 'orange',
            'a', 'b', 'ab', 'o', 'rh+', 'rh-',  # blood type
            'detected', 'not detected', 'present', 'absent'
        ]
        # Only the Result column decides, so Project/Refer text is never
        # mistaken for a result. Expected template layouts (per the original
        # author's note):
        #   11 columns: 0 ABB, 1-2 Project, 3-4 Result, 5-6 Point, 7-8 Refer, 9-10 Unit
        #    6 columns: 0 ABB, 1 Project, 2 Result, 3 Point, 4 Refer, 5 Unit
        if len(cells) >= 11:
            result_col_candidates = [3, 4]
        elif len(cells) >= 6:
            result_col_candidates = [2, 3]
        else:
            result_col_candidates = [2]
        result_candidates = []
        for col_idx in result_col_candidates:
            if col_idx < len(cells):
                txt = (cells[col_idx].text or '').strip()
                if txt:
                    result_candidates.append(txt)
        result_text = result_candidates[0] if result_candidates else ''
        if not result_text:
            return False
        if result_text in ['', '-', '/', ' ', '.', ':', '{{', '}}']:
            return False
        # Unrendered template placeholder.
        if result_text.startswith('{{'):
            return False
        # A bare "low-high" range is a reference value that slipped into the
        # Result column, not a result.
        if re.match(r'^[\(\[]?\s*[-+]?\d+(?:\.\d+)?\s*[-~]\s*[-+]?\d+(?:\.\d+)?\s*[\)\]]?$', result_text):
            return False
        if re.search(r'\d', result_text):
            return True
        if result_text.lower() in valid_qualitative:
            return True
        if len(result_text) > 2 and result_text.isalpha():
            return True
        return False

    def is_header_row(row_text, cells=None):
        """Return True for a column-header row (Abb + Project + Result)."""
        # Description rows are excluded first so they are never taken for headers.
        if 'clinical significance' in row_text or '临床意义' in row_text:
            return False
        has_abb = ('abb' in row_text) or ('简称' in row_text)
        has_project = ('project' in row_text) or ('项目' in row_text)
        has_result = ('result' in row_text) or ('结果' in row_text)
        if not (has_abb and has_project and has_result):
            return False
        if cells:
            non_empty_cells = [c for c in cells if c.text.strip()]
            if len(non_empty_cells) < 2:
                return False
            # Header cells are short labels.
            if any(len(c.text.strip()) > 30 for c in cells):
                return False
        return True

    def is_description_row(row_text):
        """Return True for a "clinical significance" explanation row."""
        return 'clinical significance' in row_text or '临床意义' in row_text

    def is_data_row(first_cell):
        """Return True when the first cell looks like an ABB (1-20 chars, alnum)."""
        if first_cell and 1 <= len(first_cell) <= 20:
            clean = re.sub(r'[^a-zA-Z0-9]', '', first_cell)
            return bool(clean) and clean.isalnum()
        return False

    def analyze_table(table):
        """Classify the rows of a table into header / description / data rows."""
        info = {'header_idx': -1, 'desc_indices': [], 'data_with_result': [], 'data_without_result': []}
        for row_idx, row in enumerate(table.rows):
            cells = row.cells
            if len(cells) < 2:
                continue
            row_text = ' '.join([c.text.strip().lower() for c in cells])
            first_cell = cells[0].text.strip()
            if is_header_row(row_text, cells):
                info['header_idx'] = row_idx
            elif is_description_row(row_text):
                info['desc_indices'].append(row_idx)
            elif is_data_row(first_cell):
                if has_data_in_row(cells):
                    info['data_with_result'].append(row_idx)
                else:
                    info['data_without_result'].append(row_idx)
        return info

    def is_special_table(table):
        """Return True for a 3-row table whose third row is a description row."""
        try:
            if len(table.rows) != 3:
                return False
            row2_text = ' '.join([c.text for c in table.rows[2].cells]).lower()
            return ('clinical significance' in row2_text) or ('临床意义' in row2_text)
        except Exception:
            return False

    def special_table_has_data(table):
        """Return True when the third cell of the second row holds a value."""
        try:
            if len(table.rows) < 2:
                return False
            cells = table.rows[1].cells
            if len(cells) < 3:
                return False
            result_text = (cells[2].text or '').strip()
            if not result_text:
                return False
            if result_text in ['', '-', '/', '.', ':']:
                return False
            if result_text.startswith('{{'):
                return False
            return True
        except Exception:
            return False

    # === Step 2: delete empty special tables ===
    removed_special_tables = 0
    for table in list(doc.tables):
        if is_special_table(table) and not special_table_has_data(table):
            try:
                table._tbl.getparent().remove(table._tbl)
                removed_special_tables += 1
            except Exception:
                pass

    # === Step 3: merge tables ===
    # Record the tables in body order together with their body element index.
    body = doc._body._body
    body_children = list(body)
    tables_by_elem = {t._tbl: t for t in doc.tables}
    table_order = []
    table_elem_indices = {}
    for idx, elem in enumerate(body_children):
        if elem.tag.endswith('}tbl'):
            t = tables_by_elem.get(elem)
            if t is not None:
                table_order.append(t)
                table_elem_indices[t] = idx

    # Locate the first detection-module heading so that tables in front of it
    # are never used as merge targets.
    first_module_elem_idx = len(body_children)  # default: end of document
    for idx, elem in enumerate(body_children):
        if elem.tag.endswith('}p'):
            try:
                p = Paragraph(elem, doc)
                txt = p.text.strip().lower()
                if txt and len(txt) < 50:
                    is_module = any(kw in txt for kw in module_keywords)
                    is_exclude = any(kw in txt for kw in exclude_keywords)
                    if is_module and not is_exclude:
                        first_module_elem_idx = idx
                        break
            except Exception:
                pass

    # Only search up to the next header-only table so data is never pulled
    # across modules.
    tables_to_remove = set()
    for i in range(len(table_order)):
        if table_order[i] in tables_to_remove:
            continue
        t1 = table_order[i]
        t1_elem_idx = table_elem_indices.get(t1, 0)
        # Skip tables before the first detection module.
        if t1_elem_idx < first_module_elem_idx:
            continue
        info1 = analyze_table(t1)
        if info1['header_idx'] >= 0 and len(info1['data_with_result']) == 0:
            next_header_pos = None
            for k in range(i + 1, len(table_order)):
                if table_order[k] in tables_to_remove:
                    continue
                k_info = analyze_table(table_order[k])
                if k_info['header_idx'] >= 0 and len(k_info['data_with_result']) == 0:
                    next_header_pos = k
                    break
            search_end = next_header_pos if next_header_pos is not None else len(table_order)
            candidates = []
            for j in range(i + 1, search_end):
                if table_order[j] in tables_to_remove:
                    continue
                candidate = table_order[j]
                candidate_info = analyze_table(candidate)
                if len(candidate_info['data_with_result']) > 0:
                    candidates.append((candidate, candidate_info))
            if not candidates:
                continue

            # Use the project name of the first data row as the title.
            title_text = ''
            try:
                first_candidate, first_candidate_info = candidates[0]
                if first_candidate_info.get('data_with_result'):
                    data_row_idx = first_candidate_info['data_with_result'][0]
                    if len(first_candidate.rows[data_row_idx].cells) > 1:
                        title_text = first_candidate.rows[data_row_idx].cells[1].text.strip()
                    if not title_text:
                        title_text = first_candidate.rows[data_row_idx].cells[0].text.strip()
            except Exception:
                title_text = ''

            # Clear the target: delete all old rows after the header, but keep
            # the row right below the header as the title-row structure.
            header_idx = info1['header_idx']
            title_row_idx = header_idx + 1
            keep_title_row = title_row_idx < len(t1.rows)
            delete_from = (title_row_idx + 1) if keep_title_row else (header_idx + 1)
            for ridx in range(len(t1.rows) - 1, delete_from - 1, -1):
                try:
                    t1._tbl.remove(t1.rows[ridx]._tr)
                    removed_rows += 1
                except Exception:
                    pass
            if not keep_title_row:
                # No row below the header: clone the header as the title row
                # and place it directly after the header's <w:tr>.
                try:
                    header_tr = t1.rows[header_idx]._tr
                    header_tr.addnext(copy.deepcopy(header_tr))
                except Exception:
                    pass
            try:
                if title_row_idx < len(t1.rows):
                    title_row = t1.rows[title_row_idx]
                    for c in title_row.cells:
                        c.text = ''
                    if title_text:
                        title_row.cells[0].text = title_text
            except Exception:
                pass

            # Append the data and description rows of every candidate.
            for candidate, candidate_info in candidates:
                for row_idx in candidate_info['data_with_result'] + candidate_info['desc_indices']:
                    new_tr = copy.deepcopy(candidate.rows[row_idx]._tr)
                    t1._tbl.append(new_tr)
                tables_to_remove.add(candidate)
                merged_count += 1

    for t in tables_to_remove:
        try:
            t._tbl.getparent().remove(t._tbl)
        except Exception:
            pass

    # === Step 4: delete empty rows / tables ===
    #   1. no data row has a result       -> delete the whole table
    #   2. some rows have a result        -> delete only the rows without one
    tables_to_delete = []
    for table in doc.tables:
        info = analyze_table(table)
        data_with = info['data_with_result']
        data_without = info['data_without_result']
        if len(data_with) == 0 and len(data_without) > 0:
            tables_to_delete.append(table)
            continue
        if len(data_with) > 0 and len(data_without) > 0:
            # Bottom-up so the remaining indices stay valid.
            for row_idx in sorted(data_without, reverse=True):
                try:
                    table._tbl.remove(table.rows[row_idx]._tr)
                    removed_rows += 1
                except Exception:
                    pass
    for table in tables_to_delete:
        try:
            table._tbl.getparent().remove(table._tbl)
            removed_rows += 1
        except Exception:
            pass

    # === Step 5: complete the title row below each header ===
    for table in doc.tables:
        info = analyze_table(table)
        if info['header_idx'] < 0:
            continue
        if len(info['data_with_result']) == 0:
            continue
        title_row_idx = info['header_idx'] + 1
        if title_row_idx >= len(table.rows):
            continue
        try:
            title_row = table.rows[title_row_idx]
            # If the row below the header is itself a data row, insert a
            # separate title row in front of it.
            try:
                first_cell = title_row.cells[0].text.strip() if title_row.cells else ''
                if is_data_row(first_cell) and has_data_in_row(title_row.cells):
                    extracted_title = ''
                    try:
                        if len(title_row.cells) > 1:
                            extracted_title = title_row.cells[1].text.strip()
                        if not extracted_title:
                            extracted_title = title_row.cells[0].text.strip()
                    except Exception:
                        extracted_title = ''
                    header_tr = copy.deepcopy(table.rows[info['header_idx']]._tr)
                    # Position relative to the data row's <w:tr>; a child index
                    # on the table element would be off because of
                    # tblPr/tblGrid.
                    title_row._tr.addprevious(header_tr)
                    title_row = table.rows[title_row_idx]
                    try:
                        for c in title_row.cells:
                            c.text = ''
                        if extracted_title:
                            title_row.cells[0].text = extracted_title
                    except Exception:
                        pass
                    continue
            except Exception:
                pass
            # A non-empty row below the header is left alone.
            if any((c.text or '').strip() for c in title_row.cells):
                continue
            first_data_idx = info['data_with_result'][0]
            if first_data_idx >= len(table.rows):
                continue
            data_row = table.rows[first_data_idx]
            title_text = ''
            if len(data_row.cells) > 1:
                title_text = data_row.cells[1].text.strip()
            if not title_text:
                title_text = data_row.cells[0].text.strip()
            if not title_text:
                continue
            for c in title_row.cells:
                c.text = ''
            title_row.cells[0].text = title_text
        except Exception:
            pass

    # === Step 6: delete tables with neither data nor header ===
    removed_tables = 0
    for table in list(doc.tables):
        info = analyze_table(table)
        if len(info['data_with_result']) == 0 and info['header_idx'] < 0:
            try:
                table._tbl.getparent().remove(table._tbl)
                removed_tables += 1
            except Exception:
                pass

    # === Document structure helpers ===
    def contains_exclude_keyword(text: str) -> bool:
        """Return True when the text contains one of the exclude keywords."""
        text_lower = text.lower()
        return any(kw in text_lower for kw in exclude_keywords)

    def is_module_title_table(table):
        """Return True when one of the first two rows contains a module keyword."""
        if len(table.rows) < 1:
            return False
        try:
            for row_idx in range(min(2, len(table.rows))):
                row_text = ' '.join([c.text.lower().strip() for c in table.rows[row_idx].cells])
                # Exclude keywords take precedence.
                if contains_exclude_keyword(row_text):
                    return False
                for kw in module_keywords:
                    if kw in row_text:
                        return True
        except Exception:
            pass
        return False

    def table_has_data(table):
        """Return True when the table has at least one data row with a result."""
        info = analyze_table(table)
        return len(info['data_with_result']) > 0

    def is_module_title_paragraph(p_text: str) -> bool:
        """Return True when the paragraph text is a module title."""
        if not p_text:
            return False
        text = p_text.strip().lower()
        if not text:
            return False
        # Titles are short; this avoids matching body text.
        if len(text) > 40:
            return False
        if contains_exclude_keyword(text):
            return False
        return any(kw in text for kw in module_keywords)

    # Identify modules by body element order (paragraph and table titles).
    body = doc._body._body
    body_children = list(body)
    tbl_map = {t._tbl: t for t in doc.tables}

    def get_table_from_elem(elem):
        """Return the Table wrapper for a <w:tbl> element, or None."""
        return tbl_map.get(elem)

    def is_blank_paragraph_elem(elem):
        """Return True when the paragraph element has no visible text."""
        try:
            p = Paragraph(elem, doc)
            return p.text.strip() == ''
        except Exception:
            return False

    def create_visible_blank_paragraph():
        """Create a blank paragraph holding a single-space run (zero spacing)."""
        p = OxmlElement('w:p')
        pPr = OxmlElement('w:pPr')
        spacing = OxmlElement('w:spacing')
        spacing.set(oxml_qn('w:after'), '0')
        spacing.set(oxml_qn('w:before'), '0')
        pPr.append(spacing)
        p.append(pPr)
        r = OxmlElement('w:r')
        t = OxmlElement('w:t')
        t.text = ' '
        r.append(t)
        p.append(r)
        return p

    def is_module_start_elem(elem):
        """Return True when the body element starts a module (title table/paragraph)."""
        if elem.tag.endswith('}tbl'):
            t = get_table_from_elem(elem)
            return bool(t) and is_module_title_table(t)
        if elem.tag.endswith('}p'):
            try:
                p = Paragraph(elem, doc)
                return is_module_title_paragraph(p.text)
            except Exception:
                return False
        return False

    # Collect every module start.
    module_start_indices = [i for i, e in enumerate(body_children) if is_module_start_elem(e)]

    # === Step 7: delete modules without data ===
    # When no table of a module has data, the module title and everything up
    # to the next module start is removed.
    removed_modules = 0
    elements_removed_in_modules = 0
    if module_start_indices:
        # Back to front, working on the snapshot taken above.
        for idx in range(len(module_start_indices) - 1, -1, -1):
            start_i = module_start_indices[idx]
            end_i = module_start_indices[idx + 1] if idx + 1 < len(module_start_indices) else len(body_children)
            # The body's trailing section properties are not module content
            # and must survive.
            module_elements = [e for e in body_children[start_i:end_i]
                               if not e.tag.endswith('}sectPr')]
            module_has_data = False
            module_tables = []
            for e in module_elements:
                if e.tag.endswith('}tbl'):
                    t = get_table_from_elem(e)
                    if t:
                        module_tables.append(e)
                        if table_has_data(t):
                            module_has_data = True
            if not module_has_data and module_tables:
                for e in reversed(module_elements):
                    try:
                        e.getparent().remove(e)
                        elements_removed_in_modules += 1
                    except Exception:
                        pass
                removed_modules += 1
        # Indices changed after the deletions: take a fresh snapshot.
        body = doc._body._body
        body_children = list(body)

    # === Step 8: blank line between data tables inside a module ===
    space_count = 0
    current_module_started = False
    i = 0
    while i < len(body_children):
        elem = body_children[i]
        if is_module_start_elem(elem):
            current_module_started = True
            i += 1
            continue
        if current_module_started and elem.tag.endswith('}tbl'):
            t = get_table_from_elem(elem)
            is_title = bool(t) and is_module_title_table(t)
            is_data = bool(t) and (not is_title) and table_has_data(t)
            if is_data:
                # Walk back over blank paragraphs to find the previous real
                # element and check whether it is a data table.
                j = i - 1
                while j >= 0 and body_children[j].tag.endswith('}p') and is_blank_paragraph_elem(body_children[j]):
                    j -= 1
                prev_is_data_table = False
                if j >= 0 and body_children[j].tag.endswith('}tbl'):
                    prev_t = get_table_from_elem(body_children[j])
                    if prev_t and (not is_module_title_table(prev_t)) and table_has_data(prev_t):
                        prev_is_data_table = True
                if prev_is_data_table:
                    # Guarantee one visible blank paragraph between the tables.
                    prev_elem = body_children[i - 1] if i - 1 >= 0 else None
                    if not (prev_elem is not None and prev_elem.tag.endswith('}p') and is_blank_paragraph_elem(prev_elem)):
                        # Case 1: directly adjacent -> insert a visible blank line.
                        empty_p = create_visible_blank_paragraph()
                        body.insert(i, empty_p)
                        space_count += 1
                        body_children = list(body)
                        # The current table moved one position down.
                        i += 1
                    else:
                        # Case 2: a blank paragraph exists but may be
                        # invisible -> give it a single-space run.
                        try:
                            p_elem = prev_elem
                            has_run = any(c.tag.endswith('}r') for c in list(p_elem))
                            if not has_run:
                                r = OxmlElement('w:r')
                                tt = OxmlElement('w:t')
                                tt.text = ' '
                                r.append(tt)
                                p_elem.append(r)
                                space_count += 1
                        except Exception:
                            pass
        i += 1

    doc.save(output_path)
    print(f"[OK] 清理完成: 删除 {removed_rows} 行, 合并 {merged_count} 对表格, 删除 {removed_tables} 个空表格")
    print(f"[OK] 清理特殊表格: 删除 {removed_special_tables} 个空特殊表格")
    print(f"[OK] 结构整理: 删除 {removed_modules} 个无数据模块, 删除 {elements_removed_in_modules} 个模块元素, 插入 {space_count} 个表格间空行")
    return doc
def main():
    """Fill the report template and clean the result.

    Data source priority:
      1. ``deepseek_processed_data.json`` next to this file, used as the
         matched data directly (no matching step);
      2. ``extracted_medical_data.json`` next to this file, which is cleaned,
         de-duplicated per ABB and matched against the ABB configuration.

    The output is written to ``reports/filled_report_v<N>.docx`` where ``N`` is
    one more than the highest existing version number.

    Changes compared with the previous version: the temporary filled file is
    removed even when filling or cleaning raises, items without an ABB are
    skipped (an empty ABB used to fuzzy-match every template ABB, a missing
    key raised ``KeyError``), and the duplicated fill/clean sequence lives in
    one helper.
    """
    # Path configuration.
    # NOTE(review): these absolute paths are machine-specific — confirm
    # whether they should be derived from this file's location instead.
    template_path = r"c:\Users\UI\Desktop\医疗报告\template_docxtpl.docx"
    filled_path = r"c:\Users\UI\Desktop\医疗报告\backend\reports\filled_docxtpl_temp.docx"
    base_dir = Path(__file__).parent
    reports_dir = base_dir / "reports"
    reports_dir.mkdir(parents=True, exist_ok=True)

    def get_next_output_path() -> str:
        """Return the path of the next unused ``filled_report_v<N>.docx``."""
        max_v = 0
        for p in reports_dir.glob("filled_report_v*.docx"):
            try:
                v = int(p.stem.split("filled_report_v", 1)[1])
            except (IndexError, ValueError):
                # Suffix is not a plain version number: ignore the file.
                continue
            max_v = max(max_v, v)
        return str(reports_dir / f"filled_report_v{max_v + 1}.docx")

    output_path = get_next_output_path()

    def run_pipeline(matched_data: dict) -> None:
        """Fill the template, clean the result, always drop the temp file."""
        try:
            print("\n步骤1: 填充数据...")
            fill_template(template_path, matched_data, filled_path)
            print("\n步骤2: 清理空白行...")
            clean_empty_rows(filled_path, output_path)
        finally:
            temp_file = Path(filled_path)
            if temp_file.exists():
                temp_file.unlink()
        print(f"\n[SUCCESS] 完成! 输出: {output_path}")

    # Prefer the data already processed by DeepSeek.
    deepseek_file = base_dir / "deepseek_processed_data.json"
    extracted_file = base_dir / "extracted_medical_data.json"

    # Load the ABB configuration.
    from config import load_abb_config
    abb_config = load_abb_config()

    if deepseek_file.exists():
        print("使用DeepSeek处理后的数据")
        with open(deepseek_file, 'r', encoding='utf-8') as f:
            matched_data = json.load(f)
        print(f"加载 {len(matched_data)} 个匹配项")
        # Fill directly; no matching step needed.
        run_pipeline(matched_data)
        return

    # Fallback: local processing of the raw extraction.
    if not extracted_file.exists():
        print("[ERROR] 未找到提取数据,请先运行 deepseek_process.py 或 extract_and_fill_report.py")
        return
    with open(extracted_file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    extracted_items = data.get('items', []) if isinstance(data, dict) else data

    # Clean the data (separate unit and reference range).
    extracted_items = clean_extracted_data(extracted_items)
    print(f"加载 {len(extracted_items)} 个提取项")

    # Template ABBs from the configuration; "A/B" entries are also
    # registered under each part.
    template_abbs = {}
    for abb_upper, info in abb_config.get('abb_to_info', {}).items():
        template_abbs[abb_upper] = info
        if '/' in abb_upper:
            for part in abb_upper.split('/'):
                template_abbs[part.strip()] = info

    # Group by ABB, skipping items that carry none.
    items_by_abb = {}
    for item in extracted_items:
        abb = str(item.get('abb') or '').strip().upper()
        if not abb:
            continue
        items_by_abb.setdefault(abb, []).append(item)

    # Best candidate per ABB.
    best_items = select_best_match(items_by_abb)

    # Match against the template: exact ABB first, then substring either way.
    matched_data = {}
    for abb, item in best_items.items():
        if abb in template_abbs:
            matched_data[abb] = item
        else:
            for t_abb in template_abbs:
                if abb in t_abb or t_abb in abb:
                    matched_data[t_abb] = item
                    break
    print(f"清理后 {len(best_items)} 个有效项, 匹配 {len(matched_data)}")

    run_pipeline(matched_data)
# Run the fill-and-clean pipeline only when executed as a script.
if __name__ == "__main__":
    main()