"""XLSX 解析器,使用 openpyxl 提取 Excel 数据并转换为 Markdown 表格格式""" import os from typing import List from openpyxl import load_workbook from exceptions import ParseError from parsers.base import BaseParser class XlsxParser(BaseParser): """XLSX 解析器,遍历所有工作表,处理合并单元格,以 sheet 名为标题,数据转 Markdown 表格""" def supported_extensions(self) -> List[str]: return [".xlsx"] def parse(self, file_path: str) -> str: file_name = os.path.basename(file_path) try: workbook = load_workbook(file_path) except Exception as e: raise ParseError(file_name, f"XLSX 文件打开失败: {e}") try: return self._workbook_to_md(workbook) except ParseError: raise except Exception as e: raise ParseError(file_name, f"XLSX 文件解析失败: {e}") def _workbook_to_md(self, workbook) -> str: """将整个工作簿转换为 Markdown""" sheet_parts = [] for sheetname in workbook.sheetnames: sheet = workbook[sheetname] md = self._sheet_to_md(sheet, sheetname) if md: sheet_parts.append(md) return "\n\n".join(sheet_parts) def _sheet_to_md(self, sheet, sheetname: str) -> str: """将单个工作表转换为 Markdown 表格""" # 处理合并单元格:填充合并区域的值 merged_values = self._get_merged_cell_values(sheet) # 收集所有行数据 rows = list(sheet.iter_rows()) if not rows: return "" # 第一行作为表头 headers = [] for cell in rows[0]: value = self._resolve_cell_value(cell, merged_values) headers.append(self._escape_cell(value)) if not any(h.strip() for h in headers): return "" lines = [f"## {sheetname}", ""] # 表头行 lines.append("| " + " | ".join(headers) + " |") # 分隔行 lines.append("| " + " | ".join("---" for _ in headers) + " |") # 数据行 for row in rows[1:]: cells = [] for cell in row: value = self._resolve_cell_value(cell, merged_values) cells.append(self._escape_cell(value)) lines.append("| " + " | ".join(cells) + " |") return "\n".join(lines) @staticmethod def _get_merged_cell_values(sheet) -> dict: """ 获取合并单元格的值映射。 返回 dict: {cell_coordinate: value} 对于合并区域内的所有单元格, 值来自合并区域左上角的单元格。 """ merged_values = {} for merged_range in sheet.merged_cells.ranges: # 左上角单元格的值 top_left_value = sheet.cell( row=merged_range.min_row, column=merged_range.min_col ).value # 填充合并区域内所有单元格 for row in range(merged_range.min_row, merged_range.max_row + 1): for col in range(merged_range.min_col, merged_range.max_col + 1): coord = sheet.cell(row=row, column=col).coordinate merged_values[coord] = top_left_value return merged_values @staticmethod def _resolve_cell_value(cell, merged_values: dict): """获取单元格的值,优先使用合并单元格映射""" value = cell.value if value is None: value = merged_values.get(cell.coordinate) return value @staticmethod def _escape_cell(value) -> str: """转义单元格内容,避免破坏 Markdown 表格结构""" if value is None: return "" cell_str = str(value) cell_str = cell_str.replace("\r\n", "
") cell_str = cell_str.replace("\n", "
") cell_str = cell_str.replace("|", "|") cell_str = cell_str.replace("`", "`") return cell_str