101 lines
2.9 KiB
Python
101 lines
2.9 KiB
Python
|
|
"""CSV 文件解析器,使用 charset-normalizer 检测编码 + csv.reader 读取,转换为 Markdown 表格"""
|
|||
|
|
|
|||
|
|
import csv
|
|||
|
|
import io
|
|||
|
|
import os
|
|||
|
|
from typing import List
|
|||
|
|
|
|||
|
|
from charset_normalizer import detect
|
|||
|
|
|
|||
|
|
from exceptions import ParseError
|
|||
|
|
from parsers.base import BaseParser
|
|||
|
|
|
|||
|
|
|
|||
|
|
class CsvParser(BaseParser):
    """Parser that converts the contents of a CSV file into a Markdown table."""

    def supported_extensions(self) -> List[str]:
        """Return the file extensions handled by this parser."""
        return [".csv"]

    def parse(self, file_path: str) -> str:
        """Parse a CSV file into a Markdown table, auto-detecting its encoding.

        Modelled on the core logic of MaxKB's ``CsvSplitHandle.get_content()``:
        read the raw bytes -> detect the encoding with charset_normalizer ->
        decode -> read the rows with ``csv.reader`` -> build the Markdown table.

        Output format::

            | header1 | header2 | header3 |
            | --- | --- | --- |
            | data1 | data2 | data3 |

        The first non-blank row is used as the header. Completely blank lines
        are skipped. Data rows shorter than the header are padded with empty
        cells and longer rows are truncated to the header width. Inside a
        cell, line breaks become ``<br>`` and pipe characters are escaped so
        that they cannot split the cell.

        Args:
            file_path: Path of the CSV file to parse.

        Returns:
            The Markdown table, terminated by a newline, or an empty string
            when the file is empty or contains no non-blank rows.

        Raises:
            ParseError: If the file cannot be read, its encoding cannot be
                detected or decoded, or the content is not well-formed CSV.
        """
        file_name = os.path.basename(file_path)

        # Any failure while reading is reported as a ParseError; the original
        # exception is chained so the root cause stays visible in tracebacks.
        try:
            with open(file_path, "rb") as f:
                buffer = f.read()
        except Exception as e:
            raise ParseError(file_name, f"文件读取失败: {e}") from e

        if not buffer:
            return ""

        encoding = detect(buffer).get("encoding")
        if encoding is None:
            raise ParseError(file_name, "无法检测文件编码")

        try:
            text = buffer.decode(encoding)
        except Exception as e:
            raise ParseError(file_name, f"编码解码失败 ({encoding}): {e}") from e

        try:
            # newline="" leaves line endings untouched so csv.reader can deal
            # with "\r", "\n" and "\r\n" itself, as the csv module recommends.
            # Blank lines come back as empty lists; they carry no data and
            # would otherwise hide the real header or add empty table rows.
            rows = [
                row
                for row in csv.reader(io.StringIO(text, newline=""))
                if row
            ]
        except csv.Error as e:
            raise ParseError(file_name, f"CSV 解析失败: {e}") from e

        if not rows:
            return ""

        header, *data_rows = rows
        width = len(header)

        lines = [
            self._format_row(header),
            "| " + " | ".join(["---"] * width) + " |",
        ]
        for row in data_rows:
            # Pad short rows and truncate long ones so that every table row
            # has exactly as many cells as the header.
            padded = row + [""] * (width - len(row))
            lines.append(self._format_row(padded[:width]))

        return "\n".join(lines) + "\n"

    @staticmethod
    def _format_row(cells: List[str]) -> str:
        """Render one list of raw cell values as a Markdown table row."""
        escaped = [CsvParser._escape_cell(cell) for cell in cells]
        return "| " + " | ".join(escaped) + " |"

    @staticmethod
    def _escape_cell(cell: str) -> str:
        """Escape a cell value so it is safe inside a Markdown table row.

        Pipe characters are replaced with the HTML entity ``&#124;`` so they
        are not read as column separators, and every kind of line break
        (``\\r\\n``, ``\\n``, ``\\r``) is replaced with ``<br>`` because a
        table row must stay on a single line.
        """
        cell = cell.replace("|", "&#124;")
        # "\r\n" is handled first so that it yields one <br>, not two.
        cell = cell.replace("\r\n", "<br>")
        cell = cell.replace("\n", "<br>")
        cell = cell.replace("\r", "<br>")
        return cell
|