Initial commit: AI 知识库文档智能分块工具

This commit is contained in:
AI Knowledge Splitter
2026-03-02 17:38:28 +08:00
commit 92e7fc5bda
160 changed files with 9577 additions and 0 deletions

1
parsers/__init__.py Normal file
View File

@@ -0,0 +1 @@

62
parsers/base.py Normal file
View File

@@ -0,0 +1,62 @@
"""BaseParser 抽象基类和 ParserRegistry 解析器注册表"""
import os
from abc import ABC, abstractmethod
from typing import List
from exceptions import ParseError, UnsupportedFormatError
class BaseParser(ABC):
    """Abstract interface implemented by every file parser."""

    @abstractmethod
    def supported_extensions(self) -> List[str]:
        """Return the file extensions this parser handles, e.g. ``['.pdf']``."""
        ...

    @abstractmethod
    def parse(self, file_path: str) -> str:
        """Parse a file and return its content as plain text / Markdown.

        Args:
            file_path: Path of the file to parse.

        Returns:
            The extracted text (Markdown is preferred).

        Raises:
            ParseError: If the file is corrupt, its format is unsupported,
                or its encoding cannot be recognised.
        """
        ...
class ParserRegistry:
    """Registry that selects a parser for a file from its extension."""

    def __init__(self):
        self._parsers: List[BaseParser] = []

    def register(self, parser: BaseParser) -> None:
        """Append ``parser`` to the lookup order."""
        self._parsers.append(parser)

    def get_parser(self, file_path: str) -> BaseParser:
        """Return the first registered parser that supports the file.

        The extension is lower-cased before it is compared with each
        parser's ``supported_extensions()``; parsers are tried in
        registration order.

        Args:
            file_path: Path of the file to be parsed.

        Returns:
            The matching parser instance.

        Raises:
            UnsupportedFormatError: If no registered parser lists the
                file's extension.
        """
        _, suffix = os.path.splitext(file_path)
        suffix = suffix.lower()
        match = next(
            (
                candidate
                for candidate in self._parsers
                if suffix in candidate.supported_extensions()
            ),
            None,
        )
        if match is None:
            raise UnsupportedFormatError(os.path.basename(file_path), suffix)
        return match

100
parsers/csv_parser.py Normal file
View File

@@ -0,0 +1,100 @@
"""CSV 文件解析器,使用 charset-normalizer 检测编码 + csv.reader 读取,转换为 Markdown 表格"""
import csv
import io
import os
from typing import List
from charset_normalizer import detect
from exceptions import ParseError
from parsers.base import BaseParser
class CsvParser(BaseParser):
    """Parser that renders a CSV file as a Markdown table."""

    def supported_extensions(self) -> List[str]:
        """Return the extensions handled by this parser."""
        return [".csv"]

    def parse(self, file_path: str) -> str:
        """Parse a CSV file into a Markdown table.

        Pipeline: read raw bytes -> detect the encoding with
        charset_normalizer -> decode -> read with ``csv.reader`` -> build
        the table. The first row is the header; shorter data rows are
        padded with empty cells and longer ones are truncated to the
        header width.

        Output shape::

            | header1 | header2 |
            | --- | --- |
            | data1 | data2 |

        Inside cells, line breaks become ``<br>`` and ``|`` becomes
        ``&#124;``.

        Args:
            file_path: Path of the CSV file.

        Returns:
            The Markdown table followed by a newline, or ``""`` when the
            file is empty or its first row has no cells.

        Raises:
            ParseError: If the file cannot be read, its encoding cannot be
                detected or decoded, or the CSV data is malformed.
        """
        file_name = os.path.basename(file_path)
        try:
            with open(file_path, "rb") as f:
                buffer = f.read()
        except Exception as e:
            raise ParseError(file_name, f"文件读取失败: {e}") from e

        if not buffer:
            return ""

        encoding = detect(buffer).get("encoding")
        if encoding is None:
            raise ParseError(file_name, "无法检测文件编码")
        try:
            text = buffer.decode(encoding)
        except Exception as e:
            raise ParseError(file_name, f"编码解码失败 ({encoding}): {e}") from e

        try:
            # newline="" hands line endings to the csv module untouched, as
            # its documentation requires; this also lets CR-only files parse.
            rows = list(csv.reader(io.StringIO(text, newline="")))
        except csv.Error as e:
            # Malformed CSV must surface as ParseError like every other failure.
            raise ParseError(file_name, f"CSV 解析失败: {e}") from e

        if not rows or not rows[0]:
            return ""

        header = rows[0]
        width = len(header)
        lines = [
            self._format_row(header),
            "| " + " | ".join(["---"] * width) + " |",
        ]
        for row in rows[1:]:
            # Pad short rows and cut long ones so every row matches the header.
            padded = row + [""] * (width - len(row))
            lines.append(self._format_row(padded[:width]))
        return "\n".join(lines) + "\n"

    @classmethod
    def _format_row(cls, cells: List[str]) -> str:
        """Render one list of raw cell strings as a Markdown table row."""
        return "| " + " | ".join(cls._escape_cell(cell) for cell in cells) + " |"

    @staticmethod
    def _escape_cell(cell: str) -> str:
        """Escape a cell: ``|`` becomes ``&#124;`` and line breaks become ``<br>``."""
        cell = cell.replace("|", "&#124;")
        # Replace CRLF first so it yields a single <br>.
        cell = cell.replace("\r\n", "<br>")
        cell = cell.replace("\n", "<br>")
        cell = cell.replace("\r", "<br>")
        return cell

217
parsers/doc_parser.py Normal file
View File

@@ -0,0 +1,217 @@
"""Word 文档解析器,使用 python-docx 提取文本并转换为 Markdown 格式"""
import os
from typing import List, Optional
from docx import Document
from docx.table import Table as DocxTable
from docx.text.paragraph import Paragraph as DocxParagraph
from exceptions import ParseError
from parsers.base import BaseParser
# Font size -> heading level map; DocParser consults it only when the first
# run of a paragraph is bold.
# Each entry is (min_pt, max_pt, heading_level); a size matches an entry when
# min_pt <= size < max_pt.
_FONT_SIZE_HEADING_MAP = [
(36, 100, 1),
(26, 36, 2),
(24, 26, 3),
(22, 24, 4),
(18, 22, 5),
(16, 18, 6),
]
class DocParser(BaseParser):
    """Parser that converts a ``.docx`` document to Markdown.

    Body-level paragraphs become plain text or ``#`` headings (decided from
    the style name, falling back to bold font size) and body-level tables
    become Markdown tables, all in document order.
    """

    def supported_extensions(self) -> List[str]:
        """Return the extensions handled by this parser."""
        return [".docx"]

    def parse(self, file_path: str) -> str:
        """Parse a Word document and return it as Markdown.

        Args:
            file_path: Path of the ``.docx`` file.

        Returns:
            The document content as Markdown text.

        Raises:
            ParseError: If the document cannot be opened or converted.
        """
        file_name = os.path.basename(file_path)
        try:
            doc = Document(file_path)
        except Exception as e:
            raise ParseError(file_name, f"Word 文档打开失败: {e}") from e
        try:
            return self._to_md(doc)
        except ParseError:
            raise
        except Exception as e:
            raise ParseError(file_name, f"Word 文档解析失败: {e}") from e

    @staticmethod
    def _get_title_level(paragraph: DocxParagraph) -> Optional[int]:
        """Determine the heading level of a paragraph.

        The style name is checked first: a name made of one of the known
        prefixes plus a number 1-6 yields that level, and a bare prefix
        yields level 1. Otherwise the first run decides: it must be bold and
        its font size must fall in a ``_FONT_SIZE_HEADING_MAP`` range.

        Args:
            paragraph: python-docx paragraph object.

        Returns:
            The heading level (1-6), or ``None`` for a non-heading paragraph.
        """
        # 1. Style name.
        style_name = paragraph.style.name if paragraph.style else ""
        if style_name:
            for prefix in ("Heading", "TOC 标题", "标题"):
                if style_name.startswith(prefix):
                    suffix = style_name[len(prefix):].strip()
                    if suffix.isdigit():
                        level = int(suffix)
                        if 1 <= level <= 6:
                            return level
                    # Prefix matched with nothing after it: treat as level 1.
                    if not suffix:
                        return 1

        # 2. Fallback: bold first run + font size.
        if not paragraph.runs:
            return None
        first_run = paragraph.runs[0]
        if not first_run.bold:
            return None
        font_size = first_run.font.size
        if font_size is None:
            return None
        pt = font_size.pt
        for min_pt, max_pt, level in _FONT_SIZE_HEADING_MAP:
            if min_pt <= pt < max_pt:
                return level
        return None

    @staticmethod
    def _paragraph_to_md(paragraph: DocxParagraph, level: Optional[int]) -> str:
        """Render a paragraph as Markdown.

        Args:
            paragraph: python-docx paragraph object.
            level: Heading level, or ``None`` for a normal paragraph.

        Returns:
            The stripped paragraph text, prefixed with ``#`` markers when
            ``level`` is given; ``""`` for an empty paragraph.
        """
        text = paragraph.text.strip()
        if not text:
            return ""
        if level is not None:
            return "#" * level + " " + text
        return text

    @staticmethod
    def _table_to_md(table: DocxTable) -> str:
        """Render a table as a Markdown table.

        The first row is the header, followed by a separator row and the
        remaining rows as data. In cell text ``|`` becomes ``&#124;`` and
        line breaks become ``<br>``.

        Args:
            table: python-docx table object.

        Returns:
            The Markdown table text, or ``""`` when the table has no rows.
        """
        rows = table.rows
        if not rows:
            return ""

        def cell_text(cell) -> str:
            """Join the cell's paragraphs with <br> and escape special characters."""
            text = "<br>".join(p.text for p in cell.paragraphs)
            text = text.replace("|", "&#124;")
            text = text.replace("\n", "<br>")
            return text

        header_cells = [cell_text(cell) for cell in rows[0].cells]
        lines = [
            "| " + " | ".join(header_cells) + " |",
            "| " + " | ".join("---" for _ in header_cells) + " |",
        ]
        for row in rows[1:]:
            data_cells = [cell_text(cell) for cell in row.cells]
            lines.append("| " + " | ".join(data_cells) + " |")
        return "\n".join(lines)

    def _to_md(self, doc: Document) -> str:
        """Convert the whole document to Markdown.

        Walks the children of ``doc.element.body`` in order and renders
        each one that corresponds to a body-level paragraph or table.

        Args:
            doc: python-docx Document object.

        Returns:
            The rendered parts joined with newlines.
        """
        # Map raw XML elements to the objects python-docx built for them, so
        # the wrappers carry the parent chain needed for style lookups.
        para_by_element = {p._element: p for p in doc.paragraphs}
        table_by_element = {t._element: t for t in doc.tables}

        parts = []
        for element in doc.element.body:
            # Dispatch by lookup rather than by tag text: XML comments and
            # processing instructions have non-string tags and are simply
            # skipped, as is any other child (e.g. section properties).
            table = table_by_element.get(element)
            if table is not None:
                md = self._table_to_md(table)
            else:
                paragraph = para_by_element.get(element)
                if paragraph is None:
                    continue
                md = self._paragraph_to_md(
                    paragraph, self._get_title_level(paragraph)
                )
            if md:
                parts.append(md)
        return "\n".join(parts)

96
parsers/html_parser.py Normal file
View File

@@ -0,0 +1,96 @@
"""HTML 文件解析器,使用 bs4 + html2text 将 HTML 转换为 Markdown"""
import os
from typing import List, Optional
from bs4 import BeautifulSoup
from charset_normalizer import detect
import html2text
from exceptions import ParseError
from parsers.base import BaseParser
class HtmlParser(BaseParser):
    """Parser that converts an HTML file to Markdown with html2text."""

    def supported_extensions(self) -> List[str]:
        """Return the extensions handled by this parser."""
        return [".html", ".htm"]

    def parse(self, file_path: str) -> str:
        """Parse an HTML file and return it as Markdown.

        Pipeline: read raw bytes -> pick an encoding (a charset declared in
        a ``<meta>`` tag first, charset_normalizer detection otherwise) ->
        decode -> convert with html2text. When decoding with the chosen
        encoding fails, the detected encoding is tried before giving up,
        because declared charsets are frequently wrong.

        Args:
            file_path: Path of the HTML file.

        Returns:
            The Markdown text, or ``""`` for an empty file.

        Raises:
            ParseError: If the file cannot be read or no usable encoding
                is found.
        """
        file_name = os.path.basename(file_path)
        try:
            with open(file_path, "rb") as f:
                buffer = f.read()
        except Exception as e:
            raise ParseError(file_name, f"文件读取失败: {e}") from e

        if not buffer:
            return ""

        encoding = self._get_encoding(buffer, file_name)
        try:
            content = buffer.decode(encoding)
        except Exception as declared_error:
            content = self._decode_with_detection(
                buffer, file_name, encoding, declared_error
            )

        converter = html2text.HTML2Text()
        converter.body_width = 0  # Don't wrap lines
        converter.ignore_images = False
        converter.ignore_links = False
        return converter.handle(content)

    @staticmethod
    def _decode_with_detection(
        buffer: bytes, file_name: str, failed_encoding: str, cause: Exception
    ) -> str:
        """Decode ``buffer`` with the detected encoding after a first failure.

        Args:
            buffer: Raw file bytes.
            file_name: File name used in error messages.
            failed_encoding: Encoding that already failed to decode.
            cause: Exception raised by the failed decode.

        Returns:
            The decoded text.

        Raises:
            ParseError: If detection yields nothing new or the second
                decode fails as well.
        """
        result = detect(buffer)
        detected = result.get("encoding") if result else None
        if detected is None or detected == failed_encoding:
            raise ParseError(
                file_name, f"编码解码失败 ({failed_encoding}): {cause}"
            ) from cause
        try:
            return buffer.decode(detected)
        except Exception as e:
            raise ParseError(file_name, f"编码解码失败 ({detected}): {e}") from e

    @staticmethod
    def _declared_charset(attrs: dict) -> Optional[str]:
        """Extract a charset name from one ``<meta>`` tag's attributes.

        Recognises ``<meta charset="...">`` and
        ``<meta http-equiv="Content-Type" content="...; charset=...">``.

        Returns:
            The declared charset name, or ``None`` when the tag declares none.
        """
        charset = attrs.get("charset")
        if isinstance(charset, str) and charset.strip():
            return charset.strip()

        http_equiv = attrs.get("http-equiv")
        content = attrs.get("content")
        if (
            isinstance(http_equiv, str)
            and http_equiv.strip().lower() == "content-type"
            and isinstance(content, str)
        ):
            _, marker, tail = content.lower().partition("charset=")
            if marker:
                name = tail.split(";", 1)[0].strip().strip("\"'")
                if name:
                    return name
        return None

    @staticmethod
    def _get_encoding(buffer: bytes, file_name: str) -> str:
        """Determine the encoding of an HTML document.

        A charset declared in a ``<meta>`` tag wins; otherwise
        charset_normalizer detection is used.

        Args:
            buffer: Raw file bytes.
            file_name: File name used in error messages.

        Returns:
            The encoding name.

        Raises:
            ParseError: If no charset is declared and detection fails.
        """
        # First try: charset declared in the markup. This is best-effort, so
        # any failure while scanning falls through to detection.
        try:
            soup = BeautifulSoup(buffer, "html.parser")
            for meta in soup.find_all("meta"):
                declared = HtmlParser._declared_charset(meta.attrs or {})
                if declared:
                    return declared
        except Exception:
            pass

        # Fallback: charset_normalizer
        result = detect(buffer)
        encoding = result.get("encoding") if result else None
        if encoding is None:
            raise ParseError(file_name, "无法检测文件编码")
        return encoding

71
parsers/image_parser.py Normal file
View File

@@ -0,0 +1,71 @@
"""图片文件解析器,使用 Vision API 识别图片内容"""
import base64
import os
from typing import List, Optional
from api_client import ApiClient, EXTENSION_MIME_MAP
from exceptions import ApiError, ParseError
from parsers.base import BaseParser
# Default instruction text for the Vision API; ImageParser uses it when no
# custom vision_prompt is supplied to its constructor.
DEFAULT_VISION_PROMPT = """\
请识别并提取图片中的所有文字和关键信息。请按以下结构输出:
1. **产品/主题名称**:图片展示的主要产品或主题
2. **文字内容**:图片中所有可见的文字,保持原始排版
3. **关键信息**:成分、功效、用法用量、规格、价格等结构化信息
4. **图片描述**:简要描述图片的视觉内容(产品外观、包装等)
如果某项信息不存在,可以省略该项。"""
class ImageParser(BaseParser):
    """Parser that turns an image into text by calling a Vision API."""

    def __init__(self, api_client: ApiClient, vision_prompt: Optional[str] = None):
        self._api_client = api_client
        # A missing or empty prompt falls back to the module default.
        self._vision_prompt = vision_prompt or DEFAULT_VISION_PROMPT

    def supported_extensions(self) -> List[str]:
        """Return the image extensions handled by this parser."""
        return [".png", ".jpg", ".jpeg", ".bmp", ".gif", ".webp"]

    def parse(self, file_path: str) -> str:
        """Describe an image file through the Vision API.

        The file is read as bytes, base64-encoded and sent together with a
        prompt that also mentions the file name (without extension) as a
        hint about the image content.

        Args:
            file_path: Path of the image file.

        Returns:
            The value returned by the API client's ``vision`` call.

        Raises:
            ParseError: If the file cannot be read or the API call raises
                ``ApiError``.
        """
        file_name = os.path.basename(file_path)
        product_name, _ = os.path.splitext(file_name)

        # Read the raw image bytes.
        try:
            with open(file_path, "rb") as handle:
                raw = handle.read()
        except Exception as e:
            raise ParseError(file_name, f"文件读取失败: {e}")

        encoded = base64.b64encode(raw).decode("utf-8")

        # MIME type comes from the extension; unknown ones default to PNG.
        suffix = os.path.splitext(file_path)[1].lower()
        mime = EXTENSION_MIME_MAP.get(suffix, "image/png")

        # Append the file name to the prompt as extra context.
        context_prompt = (
            f"{self._vision_prompt}\n\n"
            f"参考信息:该图片的文件名为「{product_name}」,可能与图片内容相关。"
        )

        try:
            return self._api_client.vision(
                system_prompt=context_prompt,
                image_base64=encoded,
                mime_type=mime,
            )
        except ApiError as e:
            raise ParseError(file_name, f"Vision API 调用失败: {e}")

View File

@@ -0,0 +1,119 @@
"""旧版 Word (.doc) 解析器
优先级:
1. Windows + pywin32 → 通过 Word COM 接口转换
2. LibreOffice → 跨平台 fallback
"""
import os
import subprocess
import sys
import tempfile
from typing import List
from exceptions import ParseError
from parsers.base import BaseParser
from parsers.doc_parser import DocParser
class LegacyDocParser(BaseParser):
    """Parser for legacy ``.doc`` files.

    The file is first converted to ``.docx`` (Microsoft Word via COM on
    Windows, LibreOffice otherwise or as a fallback) and the result is
    handed to ``DocParser``.
    """

    def __init__(self):
        self._docx_parser = DocParser()

    def supported_extensions(self) -> List[str]:
        """Return the extensions handled by this parser."""
        return [".doc"]

    def parse(self, file_path: str) -> str:
        """Convert a ``.doc`` file to ``.docx`` and parse it to Markdown.

        On Windows the Word COM route is tried first. Any failure to
        *convert* through COM (pywin32 missing, Word not installed, the
        conversion itself failing) falls back to LibreOffice. A
        ``ParseError`` raised while parsing the converted ``.docx`` is not
        retried.

        Args:
            file_path: Path of the ``.doc`` file.

        Returns:
            The document content as Markdown.

        Raises:
            ParseError: If no converter succeeds or the converted document
                cannot be parsed. When the COM route failed first, that
                failure is attached as the error's cause.
        """
        file_name = os.path.basename(file_path)
        com_failure = None

        if sys.platform == "win32":
            try:
                return self._parse_via_com(file_path, file_name)
            except ParseError:
                # Raised by DocParser on the converted file: a real parse error.
                raise
            except Exception as e:
                # Conversion through COM was not possible; try LibreOffice.
                com_failure = e

        try:
            return self._parse_via_libreoffice(file_path, file_name)
        except ParseError as fallback_error:
            if com_failure is not None:
                # Keep the COM failure visible for diagnosis.
                raise fallback_error from com_failure
            raise

    def _parse_via_com(self, file_path: str, file_name: str) -> str:
        """Convert ``.doc`` to ``.docx`` with Microsoft Word (pywin32 COM) and parse it.

        Raises:
            RuntimeError: If pywin32 is missing or the COM conversion fails;
                the caller treats this as "fall back to LibreOffice".
            ParseError: If the converted ``.docx`` cannot be parsed.
        """
        try:
            import win32com.client
            import pythoncom
        except ImportError as e:
            raise RuntimeError("pywin32 未安装") from e

        abs_path = os.path.abspath(file_path)
        with tempfile.TemporaryDirectory() as tmp_dir:
            docx_path = os.path.join(tmp_dir, os.path.splitext(file_name)[0] + ".docx")
            pythoncom.CoInitialize()
            word = None
            doc = None
            try:
                word = win32com.client.Dispatch("Word.Application")
                word.Visible = False
                word.DisplayAlerts = False
                doc = word.Documents.Open(abs_path, ReadOnly=True)
                # FileFormat 16 = wdFormatDocumentDefault (.docx)
                doc.SaveAs2(os.path.abspath(docx_path), FileFormat=16)
                doc.Close(False)
                doc = None
            except Exception as e:
                # Not a ParseError: parse() must be able to fall back.
                raise RuntimeError(f"Word COM 转换失败: {e}") from e
            finally:
                # Best-effort cleanup; errors while closing must not mask
                # the outcome of the conversion.
                if doc is not None:
                    try:
                        doc.Close(False)
                    except Exception:
                        pass
                if word is not None:
                    try:
                        word.Quit()
                    except Exception:
                        pass
                pythoncom.CoUninitialize()

            if not os.path.exists(docx_path):
                raise RuntimeError("Word COM 转换后未找到 .docx 文件")
            # Parse inside the ``with`` so the temporary file still exists.
            return self._docx_parser.parse(docx_path)

    def _parse_via_libreoffice(self, file_path: str, file_name: str) -> str:
        """Convert ``.doc`` to ``.docx`` with headless LibreOffice and parse it.

        Raises:
            ParseError: If LibreOffice is not installed, times out (120 s),
                exits with an error, produces no output file, or the
                converted ``.docx`` cannot be parsed.
        """
        import shutil

        missing_message = (
            "无法处理 .doc 文件。Windows 需要安装 Microsoft Word"
            "其他系统需要安装 LibreOffice: https://www.libreoffice.org/download/"
        )
        # The launcher is named "soffice" on Windows/macOS installs.
        executable = shutil.which("libreoffice") or shutil.which("soffice")
        if executable is None:
            raise ParseError(file_name, missing_message)

        with tempfile.TemporaryDirectory() as tmp_dir:
            try:
                subprocess.run(
                    [
                        executable, "--headless", "--convert-to", "docx",
                        "--outdir", tmp_dir, os.path.abspath(file_path),
                    ],
                    capture_output=True,
                    timeout=120,
                    check=True,
                )
            except FileNotFoundError:
                raise ParseError(file_name, missing_message)
            except subprocess.TimeoutExpired:
                raise ParseError(file_name, "LibreOffice 转换超时120秒")
            except subprocess.CalledProcessError as e:
                stderr = e.stderr.decode("utf-8", errors="replace") if e.stderr else "未知错误"
                raise ParseError(file_name, f"LibreOffice 转换失败: {stderr}")

            base_name = os.path.splitext(file_name)[0] + ".docx"
            converted_path = os.path.join(tmp_dir, base_name)
            if not os.path.exists(converted_path):
                raise ParseError(file_name, "LibreOffice 转换后未找到 .docx 文件")
            # Parse inside the ``with`` so the temporary file still exists.
            return self._docx_parser.parse(converted_path)

114
parsers/pdf_parser.py Normal file
View File

@@ -0,0 +1,114 @@
"""PDF 文件解析器,使用 fitz (PyMuPDF) 提取文本并根据字体大小判断标题层级"""
import os
from collections import Counter
from typing import List
import fitz
from exceptions import ParseError
from parsers.base import BaseParser
class PdfParser(BaseParser):
    """Parser that extracts PDF text with PyMuPDF and infers headings from font size."""

    def supported_extensions(self) -> List[str]:
        """Return the extensions handled by this parser."""
        return [".pdf"]

    def parse(self, file_path: str) -> str:
        """Parse a PDF file and return its text as Markdown.

        Text lines are extracted page by page. The most common span font
        size is taken as the body size, and a line whose first span is more
        than 2 larger becomes ``##``, more than 0.5 larger becomes ``###``.

        Args:
            file_path: Path of the PDF file.

        Returns:
            The Markdown text.

        Raises:
            ParseError: If the file cannot be opened or extraction fails.
        """
        file_name = os.path.basename(file_path)
        try:
            doc = fitz.open(file_path)
        except Exception as e:
            raise ParseError(file_name, f"PDF 文件打开失败: {e}") from e
        try:
            return self._extract_content(doc)
        except ParseError:
            raise
        except Exception as e:
            raise ParseError(file_name, f"PDF 解析失败: {e}") from e
        finally:
            doc.close()

    @staticmethod
    def _extract_content(doc: fitz.Document) -> str:
        """Extract the document text as Markdown.

        One pass over the pages collects every positive span font size and,
        for each text line that has spans, its joined text and the size of
        its first span. The most common size (12 when there is none) is the
        body size used to classify each line.

        Args:
            doc: PyMuPDF document object.

        Returns:
            The Markdown text with NUL characters removed.
        """
        span_sizes = []
        text_lines = []  # (line text, font size of the line's first span)
        for page_num in range(len(doc)):
            page = doc.load_page(page_num)
            for block in page.get_text("dict")["blocks"]:
                if block["type"] != 0:  # skip non-text blocks
                    continue
                for line in block["lines"]:
                    spans = line["spans"]
                    if not spans:
                        continue
                    span_sizes.extend(s["size"] for s in spans if s["size"] > 0)
                    text_lines.append(
                        ("".join(s["text"] for s in spans), spans[0]["size"])
                    )

        # Body font size = most frequent span size.
        if span_sizes:
            body_font_size = Counter(span_sizes).most_common(1)[0][0]
        else:
            body_font_size = 12

        parts = []
        for text, font_size in text_lines:
            text = text.replace("\0", "")
            size_diff = font_size - body_font_size
            # A whitespace-only line must not become an empty heading.
            has_text = bool(text.strip())
            if has_text and size_diff > 2:  # clearly larger than body text
                parts.append(f"## {text}\n\n")
            elif has_text and size_diff > 0.5:  # slightly larger than body text
                parts.append(f"### {text}\n\n")
            else:
                parts.append(f"{text}\n")
        return "".join(parts)

53
parsers/text_parser.py Normal file
View File

@@ -0,0 +1,53 @@
"""TXT/MD 文件解析器,使用 charset-normalizer 自动检测编码"""
import os
from typing import List
from charset_normalizer import detect
from exceptions import ParseError
from parsers.base import BaseParser
class TextParser(BaseParser):
    """Parser for plain-text and Markdown files."""

    def supported_extensions(self) -> List[str]:
        """Return the extensions handled by this parser."""
        return [".txt", ".md"]

    def parse(self, file_path: str) -> str:
        """Read a text file and return its decoded content.

        The file is read as bytes, its encoding is detected with
        charset_normalizer, and the bytes are decoded with that encoding.

        Args:
            file_path: Path of the text file.

        Returns:
            The decoded text, or ``""`` for an empty file.

        Raises:
            ParseError: If the file cannot be read, or its encoding cannot
                be detected or decoded.
        """
        file_name = os.path.basename(file_path)

        try:
            with open(file_path, "rb") as stream:
                raw = stream.read()
        except Exception as e:
            raise ParseError(file_name, f"文件读取失败: {e}")

        if not raw:
            return ""

        encoding = detect(raw).get("encoding")
        if encoding is None:
            raise ParseError(file_name, "无法检测文件编码")

        try:
            decoded = raw.decode(encoding)
        except Exception as e:
            raise ParseError(file_name, f"编码解码失败 ({encoding}): {e}")
        return decoded

84
parsers/xls_parser.py Normal file
View File

@@ -0,0 +1,84 @@
"""XLS 解析器,使用 xlrd 提取 Excel 数据并转换为 Markdown 表格格式"""
import os
from typing import List
import xlrd
from exceptions import ParseError
from parsers.base import BaseParser
class XlsParser(BaseParser):
    """Parser that renders every non-empty sheet of an ``.xls`` workbook as a Markdown table."""

    def supported_extensions(self) -> List[str]:
        """Return the extensions handled by this parser."""
        return [".xls"]

    def parse(self, file_path: str) -> str:
        """Parse an ``.xls`` workbook into Markdown.

        Args:
            file_path: Path of the workbook.

        Returns:
            One ``## <sheet name>`` section with a table per non-empty
            sheet, separated by blank lines.

        Raises:
            ParseError: If the workbook cannot be opened or converted.
        """
        file_name = os.path.basename(file_path)
        try:
            workbook = xlrd.open_workbook(filename=file_path)
        except Exception as e:
            raise ParseError(file_name, f"XLS 文件打开失败: {e}") from e
        try:
            return self._workbook_to_md(workbook)
        except ParseError:
            raise
        except Exception as e:
            raise ParseError(file_name, f"XLS 文件解析失败: {e}") from e

    def _workbook_to_md(self, workbook) -> str:
        """Render all sheets, skipping those that produce no output."""
        rendered = (self._sheet_to_md(sheet) for sheet in workbook.sheets())
        return "\n\n".join(md for md in rendered if md)

    def _sheet_to_md(self, sheet) -> str:
        """Render one sheet as a titled Markdown table; the first row is the header."""
        # Skip empty sheets.
        if sheet.nrows == 0 or sheet.ncols == 0:
            return ""

        headers = [self._escape_cell(cell) for cell in sheet.row_values(0)]
        lines = [
            f"## {sheet.name}",
            "",
            "| " + " | ".join(headers) + " |",
            "| " + " | ".join("---" for _ in headers) + " |",
        ]
        for row_idx in range(1, sheet.nrows):
            cells = [self._escape_cell(cell) for cell in sheet.row_values(row_idx)]
            lines.append("| " + " | ".join(cells) + " |")
        return "\n".join(lines)

    @staticmethod
    def _escape_cell(value) -> str:
        """Convert a cell value to text that is safe inside a Markdown table.

        Whole-number floats are written without the trailing ``.0`` (xlrd
        reports every numeric cell as a float). Line breaks become
        ``<br>``, ``|`` becomes ``&#124;`` and a backtick becomes
        ``&#96;``. ``None`` and empty values give ``""``.
        """
        if value is None:
            return ""
        # Render 100.0 as "100"; the magnitude guard keeps very large values
        # in their usual float notation.
        if isinstance(value, float) and value.is_integer() and abs(value) < 1e15:
            value = int(value)
        cell_str = str(value)
        if not cell_str:
            return ""
        # Replace CRLF first so it yields a single <br>.
        cell_str = cell_str.replace("\r\n", "<br>")
        cell_str = cell_str.replace("\n", "<br>")
        cell_str = cell_str.replace("\r", "<br>")
        cell_str = cell_str.replace("|", "&#124;")
        cell_str = cell_str.replace("`", "&#96;")
        return cell_str

125
parsers/xlsx_parser.py Normal file
View File

@@ -0,0 +1,125 @@
"""XLSX 解析器,使用 openpyxl 提取 Excel 数据并转换为 Markdown 表格格式"""
import os
from typing import List
from openpyxl import load_workbook
from exceptions import ParseError
from parsers.base import BaseParser
class XlsxParser(BaseParser):
    """Parser that renders each worksheet of an ``.xlsx`` workbook as a titled Markdown table.

    Cells inside a merged range take the value of the range's top-left cell.
    """

    def supported_extensions(self) -> List[str]:
        """Return the extensions handled by this parser."""
        return [".xlsx"]

    def parse(self, file_path: str) -> str:
        """Parse an ``.xlsx`` workbook into Markdown.

        Args:
            file_path: Path of the workbook.

        Returns:
            One ``## <sheet name>`` section with a table per sheet that has
            a non-blank header row, separated by blank lines.

        Raises:
            ParseError: If the workbook cannot be opened or converted.
        """
        file_name = os.path.basename(file_path)
        try:
            workbook = load_workbook(file_path)
        except Exception as e:
            raise ParseError(file_name, f"XLSX 文件打开失败: {e}") from e
        try:
            return self._workbook_to_md(workbook)
        except ParseError:
            raise
        except Exception as e:
            raise ParseError(file_name, f"XLSX 文件解析失败: {e}") from e

    def _workbook_to_md(self, workbook) -> str:
        """Render all sheets, skipping those that produce no output."""
        sheet_parts = []
        for sheetname in workbook.sheetnames:
            md = self._sheet_to_md(workbook[sheetname], sheetname)
            if md:
                sheet_parts.append(md)
        return "\n\n".join(sheet_parts)

    def _sheet_to_md(self, sheet, sheetname: str) -> str:
        """Render one worksheet as a titled Markdown table.

        The first row is the header. A sheet whose header cells are all
        blank yields ``""``.
        """
        merged_values = self._get_merged_cell_values(sheet)

        rows = list(sheet.iter_rows())
        if not rows:
            return ""

        headers = self._render_cells(rows[0], merged_values)
        if not any(h.strip() for h in headers):
            return ""

        lines = [
            f"## {sheetname}",
            "",
            "| " + " | ".join(headers) + " |",
            "| " + " | ".join("---" for _ in headers) + " |",
        ]
        for row in rows[1:]:
            cells = self._render_cells(row, merged_values)
            lines.append("| " + " | ".join(cells) + " |")
        return "\n".join(lines)

    @classmethod
    def _render_cells(cls, row, merged_values: dict) -> List[str]:
        """Resolve and escape every cell of one worksheet row."""
        return [
            cls._escape_cell(cls._resolve_cell_value(cell, merged_values))
            for cell in row
        ]

    @staticmethod
    def _get_merged_cell_values(sheet) -> dict:
        """Map every coordinate in a merged range to that range's top-left value.

        Returns:
            ``{cell_coordinate: value}`` covering all cells of all merged
            ranges of the sheet.
        """
        merged_values = {}
        for merged_range in sheet.merged_cells.ranges:
            top_left_value = sheet.cell(
                row=merged_range.min_row,
                column=merged_range.min_col,
            ).value
            for row in range(merged_range.min_row, merged_range.max_row + 1):
                for col in range(merged_range.min_col, merged_range.max_col + 1):
                    coord = sheet.cell(row=row, column=col).coordinate
                    merged_values[coord] = top_left_value
        return merged_values

    @staticmethod
    def _resolve_cell_value(cell, merged_values: dict):
        """Return the cell's own value, or the merged-range value when it has none."""
        value = cell.value
        if value is None:
            value = merged_values.get(cell.coordinate)
        return value

    @staticmethod
    def _escape_cell(value) -> str:
        """Convert a cell value to text that is safe inside a Markdown table.

        Line breaks (``\\r\\n``, ``\\n`` or a lone ``\\r``) become ``<br>``,
        ``|`` becomes ``&#124;`` and a backtick becomes ``&#96;``. ``None``
        gives ``""``.
        """
        if value is None:
            return ""
        cell_str = str(value)
        # Replace CRLF first so it yields a single <br>.
        cell_str = cell_str.replace("\r\n", "<br>")
        cell_str = cell_str.replace("\n", "<br>")
        cell_str = cell_str.replace("\r", "<br>")
        cell_str = cell_str.replace("|", "&#124;")
        cell_str = cell_str.replace("`", "&#96;")
        return cell_str