Files
bigwo/batch.py
2026-03-02 17:38:28 +08:00

142 lines
5.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""批量处理模块,递归扫描文件夹并逐个处理,含容错和汇总报告
Coze 知识库适配:
- 文本类文件docx/doc/pdf/txt/html→ AI 分块后输出到 output/
- 表格类文件xlsx/xls/csv→ 直接复制到 tables/,上传 Coze 表格知识库
- 图片类文件 → 正常走 AI 流程(后续可能调整)
"""
import os
import shutil
from dataclasses import dataclass, field
from typing import List, Optional, Set, Tuple

from splitter import Splitter, TABLE_EXTENSIONS
# Every file extension the batch scanner accepts. Entries are lower-case and
# carry their leading dot.
SUPPORTED_EXTENSIONS: Set[str] = {
    # plain text and markup
    ".txt",
    ".md",
    ".html",
    ".htm",
    # documents
    ".pdf",
    ".docx",
    ".doc",
    # tabular data
    ".csv",
    ".xlsx",
    ".xls",
    # images
    ".png",
    ".jpg",
    ".jpeg",
    ".bmp",
    ".gif",
    ".webp",
}
@dataclass
class BatchResult:
    """Outcome of one batch run, with every input file filed under one bucket.

    All four lists start out empty and are independent per instance.
    """

    # Paths of files that were processed without raising.
    success: List[str] = field(default_factory=list)
    # (file_path, error_msg) pairs for files whose processing raised.
    failed: List[Tuple[str, str]] = field(default_factory=list)
    # Paths of files skipped because their output already existed.
    skipped: List[str] = field(default_factory=list)
    # Paths of table files that were copied directly instead of processed.
    tables: List[str] = field(default_factory=list)
def scan_files(input_dir: str, extensions: Optional[Set[str]] = None) -> List[str]:
    """Recursively collect the supported files found under ``input_dir``.

    The tree is walked top-down. Sub-directories are visited in sorted name
    order and the file names inside each directory are sorted too, so the
    returned list has the same order on every run and every filesystem.

    Args:
        input_dir: Root folder to scan.
        extensions: Lower-case extensions (with leading dot) to accept.
            When omitted, ``SUPPORTED_EXTENSIONS`` is used.

    Returns:
        Paths of the matching files, each built by joining the file name onto
        the directory path that ``os.walk`` reports. File names starting with
        a dot are skipped. A missing ``input_dir`` yields an empty list.
    """
    allowed = SUPPORTED_EXTENSIONS if extensions is None else extensions
    files: List[str] = []
    for root, dirnames, filenames in os.walk(input_dir):
        # os.walk descends into sub-directories in whatever order the
        # filesystem lists them; sorting the list in place (allowed in
        # top-down mode) makes the traversal order reproducible.
        dirnames.sort()
        for filename in sorted(filenames):
            if filename.startswith("."):
                continue
            ext = os.path.splitext(filename)[1].lower()
            if ext in allowed:
                files.append(os.path.join(root, filename))
    return files
def _claim_output_path(primary: str, fallback: str, claimed: Set[str]) -> str:
    """Reserve and return a destination no other input of this run was given.

    ``primary`` is used unless it is already reserved; then ``fallback`` is
    tried, followed by ``fallback`` with a numeric suffix before its extension.
    """
    candidate = primary
    if os.path.normcase(candidate) in claimed:
        candidate = fallback
        stem, ext = os.path.splitext(fallback)
        counter = 2
        while os.path.normcase(candidate) in claimed:
            candidate = f"{stem}_{counter}{ext}"
            counter += 1
    claimed.add(os.path.normcase(candidate))
    return candidate


def batch_process(
    splitter: Splitter,
    input_dir: str,
    output_dir: str,
    skip_existing: bool = False,
    output_format: str = "markdown",
) -> BatchResult:
    """Process every supported file found under ``input_dir``.

    Files whose extension is in ``TABLE_EXTENSIONS`` are copied unchanged into
    ``output_dir/tables/`` (flat, by base name) and never reach the splitter.
    Every other file is handed to ``splitter.process(file_path, output_path)``,
    where ``output_path`` mirrors the file's relative path under ``output_dir``
    with the extension replaced.

    Two inputs never share a destination within one run. If a second input
    would map onto a path already assigned (same table base name in two
    sub-folders, or e.g. ``a.pdf`` and ``a.docx`` both mapping to ``a.md``),
    it gets a disambiguated name instead of overwriting the first one.

    A failure on one file (table copy or splitter error) is recorded in
    ``failed`` and the batch continues with the next file.

    Args:
        splitter: Object whose ``process(input_path, output_path)`` does the
            actual work for non-table files.
        input_dir: Folder to scan recursively.
        output_dir: Folder that receives the outputs; created if missing.
        skip_existing: When true, a file whose destination already exists is
            recorded as skipped and left untouched.
        output_format: ``"json"`` selects a ``.json`` output extension; any
            other value (default ``"markdown"``) selects ``.md``. In this
            function it only affects the output file name.

    Returns:
        A ``BatchResult`` listing the successful, failed, skipped and
        directly-copied table files.
    """
    result = BatchResult()
    files = scan_files(input_dir)
    total = len(files)
    if total == 0:
        print(f"未在 {input_dir} 中找到支持的文件")
        return result
    print(f"共扫描到 {total} 个文件待处理\n")

    os.makedirs(output_dir, exist_ok=True)
    tables_dir = os.path.join(output_dir, "tables")
    out_ext = ".json" if output_format == "json" else ".md"
    # Destinations already assigned during this run (case-normalised).
    claimed: Set[str] = set()

    for i, file_path in enumerate(files, start=1):
        rel_path = os.path.relpath(file_path, input_dir)
        file_ext = os.path.splitext(file_path)[1].lower()
        is_table = file_ext in TABLE_EXTENSIONS

        if is_table:
            # Flat layout by base name; on a clash fall back to the relative
            # path with separators flattened, e.g. "sub_data.csv".
            dest = _claim_output_path(
                os.path.join(tables_dir, os.path.basename(file_path)),
                os.path.join(tables_dir, rel_path.replace(os.sep, "_")),
                claimed,
            )
        else:
            # Mirror the relative path; on a clash keep the source extension
            # in the name, e.g. "report_docx.md".
            rel_stem = os.path.splitext(rel_path)[0]
            dest = _claim_output_path(
                os.path.join(output_dir, rel_stem + out_ext),
                os.path.join(
                    output_dir, f"{rel_stem}_{file_ext.lstrip('.')}{out_ext}"
                ),
                claimed,
            )

        if skip_existing and os.path.exists(dest):
            result.skipped.append(file_path)
            print(f"[{i}/{total}] 跳过(已存在): {rel_path}")
            continue

        if is_table:
            # Table files: copied directly, no splitter involved.
            try:
                os.makedirs(tables_dir, exist_ok=True)
                shutil.copy2(file_path, dest)
            except OSError as e:
                # A single unreadable/unwritable table must not abort the batch.
                result.failed.append((file_path, str(e)))
                print(f"[{i}/{total}] 表格文件复制失败: {rel_path}: {e}")
            else:
                result.tables.append(file_path)
                print(f"[{i}/{total}] 表格文件,直接复制: {rel_path} → tables/")
            continue

        # Text/image files: go through the splitter.
        print(f"[{i}/{total}] 正在处理: {rel_path}")
        try:
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            splitter.process(file_path, dest)
            result.success.append(file_path)
            print(" ✓ 完成")
        except Exception as e:
            # Per-file boundary: record the error and keep going.
            result.failed.append((file_path, str(e)))
            print(f" ✗ 失败: {e}")
    return result
def print_summary(result: BatchResult) -> None:
    """Write the end-of-run report for ``result`` to stdout.

    The report shows the overall file count, the number of successful, failed
    and skipped files, a table-copy line plus upload hint when any table was
    copied, and finally one line per failed file (base name and error text).
    """
    n_success = len(result.success)
    n_failed = len(result.failed)
    n_skipped = len(result.skipped)
    n_tables = len(result.tables)
    grand_total = n_success + n_failed + n_skipped + n_tables

    print("\n" + "=" * 50)
    print(f"批量处理完成! 共 {grand_total} 个文件")
    print(f" ✓ 成功: {n_success}")
    print(f" ✗ 失败: {n_failed}")
    print(f" ⊘ 跳过: {n_skipped}")

    if n_tables:
        print(f" 📊 表格(直接复制): {n_tables}")
        print(" → 请上传 tables/ 文件夹内的文件到 Coze「表格知识库」")

    # Nothing failed: the report ends here.
    if not n_failed:
        return

    print("\n失败文件列表:")
    for failed_path, reason in result.failed:
        print(f" - {os.path.basename(failed_path)}: {reason}")