"""CSV 文件解析器,使用 charset-normalizer 检测编码 + csv.reader 读取,转换为 Markdown 表格""" import csv import io import os from typing import List from charset_normalizer import detect from exceptions import ParseError from parsers.base import BaseParser class CsvParser(BaseParser): """CSV 文件解析器,将 CSV 数据转换为 Markdown 表格格式""" def supported_extensions(self) -> List[str]: return [".csv"] def parse(self, file_path: str) -> str: """ 解析 CSV 文件,自动检测编码,转换为 Markdown 表格格式。 参考 MaxKB CsvSplitHandle.get_content() 核心逻辑: 读取文件字节 → charset_normalizer 检测编码 → 解码 → csv.reader 读取 → 构建 Markdown 表格 Markdown 表格格式: | header1 | header2 | header3 | | --- | --- | --- | | data1 | data2 | data3 | 单元格处理:换行符替换为
,管道符转义为 | Args: file_path: 文件路径 Returns: Markdown 表格格式的文本内容 Raises: ParseError: 文件无法读取或编码检测失败时抛出 """ file_name = os.path.basename(file_path) try: with open(file_path, "rb") as f: buffer = f.read() except Exception as e: raise ParseError(file_name, f"文件读取失败: {e}") if len(buffer) == 0: return "" result = detect(buffer) encoding = result.get("encoding") if encoding is None: raise ParseError(file_name, "无法检测文件编码") try: text = buffer.decode(encoding) except Exception as e: raise ParseError(file_name, f"编码解码失败 ({encoding}): {e}") reader = csv.reader(io.StringIO(text)) rows = list(reader) if not rows: return "" header = rows[0] if not header: return "" lines = [] # Header row header_cells = [self._escape_cell(cell) for cell in header] lines.append("| " + " | ".join(header_cells) + " |") # Separator row lines.append("| " + " | ".join(["---"] * len(header)) + " |") # Data rows for row in rows[1:]: # Pad or truncate row to match header length padded = row + [""] * (len(header) - len(row)) cells = [self._escape_cell(cell) for cell in padded[: len(header)]] lines.append("| " + " | ".join(cells) + " |") return "\n".join(lines) + "\n" @staticmethod def _escape_cell(cell: str) -> str: """转义单元格内容:换行符替换为
,管道符替换为 |""" cell = cell.replace("|", "|") cell = cell.replace("\r\n", "
") cell = cell.replace("\n", "
") cell = cell.replace("\r", "
") return cell