Files
bigwo/parsers/csv_parser.py
2026-03-02 17:38:28 +08:00

101 lines
2.9 KiB
Python

"""CSV 文件解析器,使用 charset-normalizer 检测编码 + csv.reader 读取,转换为 Markdown 表格"""
import csv
import io
import os
from typing import List
from charset_normalizer import detect
from exceptions import ParseError
from parsers.base import BaseParser
class CsvParser(BaseParser):
    """CSV file parser that converts CSV data into a Markdown table."""

    def supported_extensions(self) -> List[str]:
        """Return the file extensions handled by this parser."""
        return [".csv"]

    def parse(self, file_path: str) -> str:
        """Parse a CSV file and render it as a Markdown table.

        Pipeline: read the raw bytes, detect the encoding with
        ``charset_normalizer.detect``, decode, read the rows with
        ``csv.reader`` and build the Markdown table.

        Output format::

            | header1 | header2 | header3 |
            | --- | --- | --- |
            | data1 | data2 | data3 |

        The first CSV row is used as the header. Data rows are padded with
        empty cells or truncated so that they match the header width. Inside
        a cell, line breaks become ``<br>`` and pipes become ``&#124;``.

        Args:
            file_path: Path of the CSV file to parse.

        Returns:
            The Markdown table followed by a trailing newline, or an empty
            string when the file is empty, yields no rows, or its first row
            is empty.

        Raises:
            ParseError: If the file cannot be read, its encoding cannot be
                detected or decoded, or the CSV content is malformed.
        """
        file_name = os.path.basename(file_path)

        # Broad on purpose: every read failure is reported as a ParseError.
        try:
            with open(file_path, "rb") as f:
                buffer = f.read()
        except Exception as e:
            raise ParseError(file_name, f"文件读取失败: {e}") from e

        if not buffer:
            return ""

        encoding = detect(buffer).get("encoding")
        if encoding is None:
            raise ParseError(file_name, "无法检测文件编码")

        try:
            text = buffer.decode(encoding)
        except Exception as e:
            raise ParseError(file_name, f"编码解码失败 ({encoding}): {e}") from e

        # Some codecs keep a leading BOM (U+FEFF) in the decoded text; drop it
        # so it does not end up inside the first header cell.
        if text.startswith("\ufeff"):
            text = text[1:]

        # newline="" lets the csv module do its own line-ending handling, so
        # CR-only files and line breaks inside quoted fields are read properly.
        try:
            rows = list(csv.reader(io.StringIO(text, newline="")))
        except csv.Error as e:
            raise ParseError(file_name, f"CSV 解析失败: {e}") from e

        if not rows or not rows[0]:
            return ""

        header = rows[0]
        width = len(header)

        lines = [
            self._format_row(header),
            "| " + " | ".join(["---"] * width) + " |",
        ]
        for row in rows[1:]:
            # Pad short rows with empty cells and cut long rows so that every
            # data row has exactly as many cells as the header.
            padded = row + [""] * (width - len(row))
            lines.append(self._format_row(padded[:width]))

        return "\n".join(lines) + "\n"

    @staticmethod
    def _format_row(cells: List[str]) -> str:
        """Render one list of cells as a single Markdown table row."""
        escaped = [CsvParser._escape_cell(cell) for cell in cells]
        return "| " + " | ".join(escaped) + " |"

    @staticmethod
    def _escape_cell(cell: str) -> str:
        """Escape a cell: pipes become ``&#124;`` and line breaks ``<br>``."""
        cell = cell.replace("|", "&#124;")
        # Replace CRLF first so that it yields a single <br>, not two.
        cell = cell.replace("\r\n", "<br>")
        cell = cell.replace("\n", "<br>")
        cell = cell.replace("\r", "<br>")
        return cell