Files
bigwo/OPTIMIZATION.md

605 lines
19 KiB
Markdown
Raw Permalink Normal View History

# AI 知识库文档智能分块工具 — 优化方案
> 基于 `2026火山知识库用` 实际资料分析,共 12 项优化建议。
---
## 🔴 1. `.doc` 格式完全不支持
**现状:** `DocParser.supported_extensions()` 只返回 `[".docx"]``python-docx` 库无法读取旧版 Word 二进制格式(`.doc`)。
**影响范围:** 9 个文件无法处理:
```
培训新人起步三关.doc
招商与代理.doc
PM产品117个问与答.doc
PM产品后短暂皮肤发痒问与答.doc
PM产品暖炉原理介绍.doc
PM产品整应反应好转反应解析.doc
PM公司各国地址是电话.doc
PM公司介绍时间更新.doc
PM公司实力介绍.doc
```
**优化方案:** 新建 `parsers/legacy_doc_parser.py`,通过 `subprocess` 调用 LibreOffice 将 `.doc` 转为 `.docx`,再复用 `DocParser` 解析。
```python
# parsers/legacy_doc_parser.py
"""旧版 Word (.doc) 解析器,通过 LibreOffice 转换后复用 DocParser"""
import os
import subprocess
import tempfile
from typing import List
from exceptions import ParseError
from parsers.base import BaseParser
from parsers.doc_parser import DocParser
class LegacyDocParser(BaseParser):
"""旧版 .doc 文件解析器,先转换为 .docx 再解析"""
def __init__(self):
self._docx_parser = DocParser()
def supported_extensions(self) -> List[str]:
return [".doc"]
def parse(self, file_path: str) -> str:
file_name = os.path.basename(file_path)
with tempfile.TemporaryDirectory() as tmp_dir:
try:
subprocess.run(
[
"libreoffice", "--headless", "--convert-to", "docx",
"--outdir", tmp_dir, file_path,
],
capture_output=True, timeout=60, check=True,
)
except FileNotFoundError:
raise ParseError(file_name, "未安装 LibreOffice无法处理 .doc 文件。"
"请安装: https://www.libreoffice.org/download/")
except subprocess.TimeoutExpired:
raise ParseError(file_name, "LibreOffice 转换超时")
except subprocess.CalledProcessError as e:
raise ParseError(file_name, f"LibreOffice 转换失败: {e.stderr.decode()}")
# 找到转换后的 .docx 文件
base_name = os.path.splitext(file_name)[0] + ".docx"
converted_path = os.path.join(tmp_dir, base_name)
if not os.path.exists(converted_path):
raise ParseError(file_name, "LibreOffice 转换后未找到 .docx 文件")
return self._docx_parser.parse(converted_path)
```
**注册到 Splitter**
```python
# splitter.py 中新增
from parsers.legacy_doc_parser import LegacyDocParser
self._registry.register(LegacyDocParser())
```
**依赖:** 系统需安装 LibreOffice`brew install --cask libreoffice` / `apt install libreoffice`)。
---
## 🔴 2. 不支持批量处理文件夹
**现状:** `main.py` 只接受单个文件路径,处理 25+ 文档和 80+ 图片需要逐个手动执行。
**优化方案:** 在 `main.py` 中增加 `--batch` / `-b` 参数,支持传入文件夹路径,递归扫描所有支持的文件格式并批量处理。
```python
# main.py 新增参数
parser.add_argument(
"-b", "--batch",
default=None,
help="批量处理模式:指定输入文件夹路径,递归扫描所有支持的文件",
)
parser.add_argument(
"--output-dir",
default=None,
help="批量模式的输出目录(默认:输入文件夹下的 output/ 子目录)",
)
```
**新增 `batch.py` 批量处理模块:**
```python
# batch.py
"""批量处理模块,递归扫描文件夹并逐个处理"""
import os
from dataclasses import dataclass, field
from typing import List, Set
from splitter import Splitter
# 所有支持的扩展名
SUPPORTED_EXTENSIONS: Set[str] = {
".txt", ".md", ".csv", ".html", ".htm",
".pdf", ".docx", ".doc",
".xlsx", ".xls",
".png", ".jpg", ".jpeg", ".bmp", ".gif", ".webp",
}
@dataclass
class BatchResult:
"""批量处理结果"""
success: List[str] = field(default_factory=list)
failed: List[tuple] = field(default_factory=list) # (file_path, error_msg)
skipped: List[str] = field(default_factory=list)
def scan_files(input_dir: str) -> List[str]:
"""递归扫描文件夹,返回所有支持格式的文件路径列表"""
files = []
for root, _, filenames in os.walk(input_dir):
for filename in sorted(filenames):
ext = os.path.splitext(filename)[1].lower()
if ext in SUPPORTED_EXTENSIONS:
files.append(os.path.join(root, filename))
return files
def batch_process(
splitter: Splitter,
input_dir: str,
output_dir: str,
skip_existing: bool = False,
) -> BatchResult:
"""批量处理文件夹中的所有文件"""
result = BatchResult()
files = scan_files(input_dir)
total = len(files)
os.makedirs(output_dir, exist_ok=True)
for i, file_path in enumerate(files, start=1):
rel_path = os.path.relpath(file_path, input_dir)
output_path = os.path.join(
output_dir,
os.path.splitext(rel_path)[0] + ".md",
)
# 跳过已处理的文件
if skip_existing and os.path.exists(output_path):
result.skipped.append(file_path)
print(f"[{i}/{total}] 跳过(已存在): {rel_path}")
continue
print(f"[{i}/{total}] 正在处理: {rel_path}...")
try:
os.makedirs(os.path.dirname(output_path), exist_ok=True)
splitter.process(file_path, output_path)
result.success.append(file_path)
except Exception as e:
result.failed.append((file_path, str(e)))
print(f" ✗ 失败: {e}")
return result
```
**使用方式:**
```bash
# 批量处理整个文件夹
python main.py -b "2026火山知识库用/" -k sk-xxx --output-dir output/
# 跳过已处理的文件(断点续传)
python main.py -b "2026火山知识库用/" -k sk-xxx --skip-existing
```
---
## 🔴 3. 批量处理无容错机制
**现状:** `main.py` 中任何异常都会 `sys.exit(1)` 直接退出,一个文件失败就全部中断。
**优化方案:** 在上面的 `batch.py` 中已包含容错逻辑try/except 包裹单个文件处理)。额外增加处理完成后的汇总报告:
```python
# batch.py 中新增
def print_summary(result: BatchResult) -> None:
"""打印批量处理汇总报告"""
total = len(result.success) + len(result.failed) + len(result.skipped)
print(f"\n{'='*50}")
print(f"处理完成! 共 {total} 个文件")
print(f" ✓ 成功: {len(result.success)}")
print(f" ✗ 失败: {len(result.failed)}")
print(f" ⊘ 跳过: {len(result.skipped)}")
if result.failed:
print(f"\n失败文件列表:")
for path, err in result.failed:
print(f" - {os.path.basename(path)}: {err}")
```
---
## 🟡 4. 图片解析器没有利用文件名上下文
**现状:** `ImageParser.parse()` 只传了图片二进制数据给 Vision API没有利用文件名中的产品名信息。
**影响:** 文件名如 `辅酶Q10.jpg``氨基酸.jpg` 本身就是强上下文提示,不用白不用。
**优化方案:** 修改 `ImageParser`,将文件名作为上下文传入 prompt。
```python
# parsers/image_parser.py
VISION_SYSTEM_PROMPT = (
"请识别并提取图片中的所有文字内容,包括产品名称、成分、功效、用法用量等信息。"
"请以结构化的方式输出。如果图片中没有文字,请详细描述图片的主要内容。"
)
def parse(self, file_path: str) -> str:
file_name = os.path.basename(file_path)
product_name = os.path.splitext(file_name)[0]
# ... 读取和编码图片 ...
# 将文件名作为上下文提示
context_prompt = (
f"{VISION_SYSTEM_PROMPT}\n\n"
f"参考信息:该图片的文件名为「{product_name}」,可能与图片内容相关。"
)
try:
result = self._api_client.vision(
system_prompt=context_prompt,
image_base64=image_base64,
)
except ApiError as e:
raise ParseError(file_name, f"Vision API 调用失败: {e}")
return result
```
---
## 🟡 5. Vision API 的 prompt 太通用
**现状:** 当前 prompt 是 `"请识别并提取图片中的所有文字内容。如果图片中没有文字,请描述图片的主要内容。"`
**影响:** 用户的图片是产品标注图(保健品、护肤品等),通用 prompt 无法引导 API 提取结构化的产品信息。
**优化方案:** 支持自定义 Vision prompt并提供更好的默认 prompt。
```python
# parsers/image_parser.py
# 默认 prompt 优化为结构化提取
VISION_SYSTEM_PROMPT = """\
请识别并提取图片中的所有文字和关键信息。请按以下结构输出:
1. **产品/主题名称**:图片展示的主要产品或主题
2. **文字内容**:图片中所有可见的文字,保持原始排版
3. **关键信息**:成分、功效、用法用量、规格、价格等结构化信息
4. **图片描述**:简要描述图片的视觉内容(产品外观、包装等)
如果某项信息不存在,可以省略该项。"""
class ImageParser(BaseParser):
def __init__(self, api_client: ApiClient, vision_prompt: str = None):
self._api_client = api_client
self._vision_prompt = vision_prompt or VISION_SYSTEM_PROMPT
```
**在 CLI 中增加参数:**
```python
# main.py
parser.add_argument(
"--vision-prompt",
default=None,
help="自定义图片识别的 system prompt",
)
```
---
## 🟡 6. 图片 MIME 类型硬编码为 `image/png`
**现状:** `api_client.py``vision()` 方法中写死了 `data:image/png;base64,...`
**影响:** 用户的图片全是 `.jpg`/`.jpeg`,虽然大多数 API 能容错处理,但不规范,可能导致某些模型解析异常。
**优化方案:** 让 `vision()` 接受 MIME 类型参数,由 `ImageParser` 根据扩展名传入。
```python
# api_client.py
EXTENSION_MIME_MAP = {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".gif": "image/gif",
".bmp": "image/bmp",
".webp": "image/webp",
}
def vision(self, system_prompt: str, image_base64: str,
mime_type: str = "image/png", # 新增参数
model: str = "deepseek-chat") -> str:
def _call():
response = self._client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system_prompt},
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {
"url": f"data:{mime_type};base64,{image_base64}",
},
},
],
},
],
)
return response.choices[0].message.content
return self._retry(_call)
```
```python
# parsers/image_parser.py 调用时传入正确的 MIME 类型
from api_client import EXTENSION_MIME_MAP
ext = os.path.splitext(file_path)[1].lower()
mime_type = EXTENSION_MIME_MAP.get(ext, "image/png")
result = self._api_client.vision(
system_prompt=context_prompt,
image_base64=image_base64,
mime_type=mime_type,
)
```
---
## 🟡 7. `writer.py` 没有自动创建输出目录
**现状:** 如果输出路径的父目录不存在,`open()` 会抛出 `FileNotFoundError`
**优化方案:** 在 `write()` 方法开头加一行:
```python
# writer.py
def write(self, chunks, output_path, source_file, delimiter="---"):
# 自动创建输出目录
output_dir = os.path.dirname(output_path)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
# ... 后续逻辑不变 ...
```
改动极小,但能避免批量处理时因目录不存在而失败。
---
## 🟢 8. 缺少处理进度展示
**现状:** `splitter.py` 只打印简单的文字信息,批量处理时无法直观看到整体进度。
**优化方案:** 在 `batch.py` 中已包含 `[i/total]` 格式的进度展示(见第 2 项)。对于单文件处理,可以在 `splitter.py` 中增加更详细的阶段提示:
```python
# splitter.py
def process(self, input_path: str, output_path: str) -> None:
file_name = os.path.basename(input_path)
file_size = os.path.getsize(input_path)
size_str = f"{file_size / 1024:.1f}KB" if file_size < 1024 * 1024 else f"{file_size / 1024 / 1024:.1f}MB"
print(f"[1/4] 解析文件: {file_name} ({size_str})")
text = parser.parse(input_path)
print(f" 提取文本: {len(text)} 字符")
print(f"[2/4] AI 语义分块中...")
chunks = self._chunker.chunk(text, on_progress=progress_callback)
print(f"[3/4] 写入输出: {output_path}")
self._writer.write(chunks, output_path, source_file, self._delimiter)
print(f"[4/4] 完成! 共 {len(chunks)} 个分块")
```
---
## 🟢 9. 没有断点续传 / 跳过已处理文件
**现状:** 重新运行会重复处理所有文件,包括重复调用 API 产生费用。
**优化方案:** 在 `main.py` 中增加 `--skip-existing` 参数(已在第 2 项的 `batch.py` 中实现)。
```python
# main.py
parser.add_argument(
"--skip-existing",
action="store_true",
default=False,
help="跳过已存在的输出文件(避免重复处理和 API 费用)",
)
```
**单文件模式也应支持:**
```python
# main.py 的 main() 函数中
if args.skip_existing and os.path.exists(output_path):
print(f"输出文件已存在,跳过: {output_path}")
return
```
---
## 🟢 10. `_retry` 没有捕获网络异常
**现状:** `api_client.py` 的重试逻辑只处理 `openai.RateLimitError``openai.APIError`,不处理网络层异常。
**影响:** 批量处理 80+ 张图片时,网络超时、连接中断等问题很常见,当前会直接抛异常中断。
**优化方案:** 在 `_retry` 中增加对网络异常的重试:
```python
# api_client.py
def _retry(self, call: Callable[[], str]) -> str:
"""执行带指数退避重试的 API 调用,对速率限制和网络异常重试"""
for attempt in range(self.MAX_RETRIES + 1):
try:
return call()
except openai.RateLimitError:
if attempt < self.MAX_RETRIES:
self._sleep(self.RETRY_DELAYS[attempt])
else:
raise ApiError("速率限制重试耗尽", status_code=429)
except openai.APIConnectionError:
# 网络连接失败,重试
if attempt < self.MAX_RETRIES:
self._sleep(self.RETRY_DELAYS[attempt])
else:
raise ApiError("网络连接失败,重试耗尽")
except openai.APITimeoutError:
# 请求超时,重试
if attempt < self.MAX_RETRIES:
self._sleep(self.RETRY_DELAYS[attempt])
else:
raise ApiError("API 请求超时,重试耗尽")
except openai.APIError as e:
raise ApiError(str(e), status_code=getattr(e, "status_code", None))
```
---
## 🟢 11. 中文分块的 `PRE_SPLIT_SIZE` 可能偏小
**现状:** `chunker.py``PRE_SPLIT_SIZE = 8000` 字符。中文一个字就是一个字符8000 字符 ≈ 8000 字。
**影响:** 像 `PM产品117个问与答.doc` 这种长文档,问答对可能被切到不同段中,破坏语义完整性。
**优化方案:** 将 `PRE_SPLIT_SIZE` 改为可配置参数,并适当增大默认值:
```python
# chunker.py
class AIChunker:
DEFAULT_PRE_SPLIT_SIZE = 12000 # 增大到 12000更适合中文文档
def __init__(self, api_client: ApiClient, delimiter: str = "---",
pre_split_size: int = None):
self._api_client = api_client
self._delimiter = delimiter
self.PRE_SPLIT_SIZE = pre_split_size or self.DEFAULT_PRE_SPLIT_SIZE
```
```python
# main.py 新增参数
parser.add_argument(
"--chunk-size",
type=int,
default=None,
help="预切分大小(字符数),默认 12000。中文文档建议 10000-15000",
)
```
---
## 🟢 12. 缺少输出格式选项
**现状:** 只能输出 Markdown 格式。
**影响:** 知识库场景通常需要将分块结果导入向量数据库(如 Milvus、Pinecone、WeaviateJSON 格式更方便。
**优化方案:** 新增 `JsonWriter`,支持 `--format` 参数选择输出格式。
```python
# writer.py 新增 JsonWriter
import json
class JsonWriter:
"""将 Chunk 列表写入 JSON 文件,方便导入向量数据库"""
def write(
self,
chunks: List[Chunk],
output_path: str,
source_file: str,
delimiter: str = "---",
) -> None:
output_dir = os.path.dirname(output_path)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
# 将扩展名改为 .json
json_path = os.path.splitext(output_path)[0] + ".json"
data = {
"source_file": source_file,
"process_time": datetime.now().isoformat(),
"total_chunks": len(chunks),
"chunks": [
{
"index": i,
"title": chunk.title,
"content": chunk.content,
"char_count": len(chunk.content),
}
for i, chunk in enumerate(chunks)
],
}
if os.path.exists(json_path):
print(f"警告: 输出文件已存在,将覆盖: {json_path}")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
```
```python
# main.py 新增参数
parser.add_argument(
"-f", "--format",
choices=["markdown", "json"],
default="markdown",
help="输出格式(默认: markdown",
)
```
---
## 优化优先级总览
| 优先级 | 编号 | 问题 | 涉及文件 | 工作量 |
|--------|------|------|----------|--------|
| 🔴 P0 | 1 | `.doc` 格式不支持 | 新建 `legacy_doc_parser.py`,改 `splitter.py` | 中 |
| 🔴 P0 | 2 | 不支持批量处理 | 新建 `batch.py`,改 `main.py` | 中 |
| 🔴 P0 | 3 | 无容错机制 | `batch.py`(同上) | 小 |
| 🟡 P1 | 4 | 图片文件名未利用 | `parsers/image_parser.py` | 小 |
| 🟡 P1 | 5 | Vision prompt 太通用 | `parsers/image_parser.py` | 小 |
| 🟡 P1 | 6 | MIME 类型硬编码 | `api_client.py``parsers/image_parser.py` | 小 |
| 🟡 P1 | 7 | 输出目录不自动创建 | `writer.py` | 极小 |
| 🟢 P2 | 8 | 缺少进度展示 | `splitter.py` | 小 |
| 🟢 P2 | 9 | 无断点续传 | `main.py` | 小 |
| 🟢 P2 | 10 | 网络异常不重试 | `api_client.py` | 小 |
| 🟢 P2 | 11 | PRE_SPLIT_SIZE 偏小 | `chunker.py``main.py` | 小 |
| 🟢 P2 | 12 | 缺少 JSON 输出 | `writer.py``main.py` | 中 |
---
## 建议实施顺序
1. **第一阶段(核心功能):** #1 + #2 + #3 → 解决"能不能用"的问题
2. **第二阶段(质量提升):** #4 + #5 + #6 + #7 → 提升处理质量
3. **第三阶段(体验优化):** #8 + #9 + #10 + #11 + #12 → 提升使用体验