Files
urbanLifeline/difyPlugin/pdf/tools/pdf_toc.py
2026-03-15 13:00:30 +08:00

313 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import re
from collections import OrderedDict
from collections.abc import Generator
from typing import Any
import fitz
from dify_plugin import Tool
from dify_plugin.entities.model.llm import LLMModelConfig
from dify_plugin.entities.model.message import SystemPromptMessage, UserPromptMessage
from dify_plugin.entities.tool import ToolInvokeMessage
# System prompt (Chinese) sent to the LLM together with the extracted PDF text.
# It asks the model to extract the document's TOC / chapter structure:
#   1. identify every first- and second-level heading with its page number;
#   2. reply with a bare JSON array only (no markdown fence, no explanation);
#   3. use the shape [{"title": <heading>, "page": <page number>}];
#   4. report the page numbers as they are printed in the document;
#   5. reply with [] when no TOC can be identified.
_TOC_SYSTEM_PROMPT = """你是专业的PDF目录解析助手。请从以下PDF文本中提取文档的目录/章节结构。
要求:
1. 识别所有一级和二级标题及其对应的页码
2. 只返回纯JSON数组不要markdown代码块不要任何解释
3. 格式: [{"title": "章节标题", "page": 页码数字}]
4. 页码必须是文档中标注的实际页码数字
5. 如果无法识别目录,返回空数组 []"""
class PdfTocTool(Tool):
    """Dify tool that extracts a PDF's table of contents as a JSON catalog.

    Three strategies are tried in order until one yields entries:

    1. the outline stored in the PDF metadata (``doc.get_toc()``);
    2. an LLM, when a ``model`` tool parameter is supplied;
    3. regex parsing of the printed table-of-contents pages.

    Every catalog entry is a dict with the keys ``title``, ``start``, ``end``
    (page numbers) and ``page_start_index``, ``page_end_index`` (0-based page
    indexes into the document).
    """

    # Regexes, matched case-insensitively, that mark a page as a TOC page.
    # "[目\u2f6c]\s*录" covers "目录" with optional whitespace between the two
    # characters (including U+3000). U+2F6C is KANGXI RADICAL EYE, a look-alike
    # of "目" -- presumably emitted by some PDF text layers; harmless otherwise.
    # The first character is required on purpose: a bare "录" would match
    # ordinary words such as "记录" or "附录" on almost any page.
    _TOC_PATTERNS = [
        r"[目\u2f6c]\s*录",
        r"Table of Contents",
        r"Contents",
        r"目次",
    ]

    # Only the first pages of a document are scanned for a printed TOC.
    _TOC_SCAN_PAGES = 30
    # Number of leading pages sent to the LLM when no TOC page is found.
    _LLM_SAMPLE_PAGES = 15
    # Maximum number of characters of PDF text sent to the LLM.
    _LLM_MAX_CHARS = 15000

    # Section headings that end the main TOC (lists of figures / tables).
    _TOC_END_MARKER_RE = re.compile(
        r"^(List\s+of\s+Figures|List\s+of\s+Tables|图目录|表目录)",
        re.IGNORECASE | re.MULTILINE,
    )
    # One TOC line: a title, then dot leaders or whitespace, then a page number.
    _TOC_LINE_RE = re.compile(
        r"^\s*(?P<title>.+?)\s*(?:\.{2,}|\s)\s*(?P<page>\d{1,5})\s*$"
    )

    def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessage]:
        """Run the tool and yield a single text message.

        Args:
            tool_parameters: Must contain ``file`` (an object exposing the PDF
                bytes as ``.blob``); may contain ``model`` (an LLM model
                configuration dict) to enable LLM-based extraction.

        Yields:
            One text message holding the catalog as a JSON array (``[]`` when
            nothing could be extracted), or an ``Error: ...`` message when the
            file is missing or cannot be opened.
        """
        file = tool_parameters.get("file")
        if not file:
            yield self.create_text_message("Error: file is required")
            return

        model_config = tool_parameters.get("model")

        # Report an unreadable PDF the same way as a missing file instead of
        # letting the exception escape the tool.
        try:
            doc = fitz.open(stream=file.blob, filetype="pdf")
        except Exception as exc:  # boundary: any open failure becomes a message
            yield self.create_text_message(f"Error: failed to open PDF: {exc}")
            return

        try:
            num_pages = len(doc)

            # 1) Prefer the outline stored in the PDF metadata.
            catalog = self._catalog_from_metadata(doc.get_toc(), num_pages)

            # 2) No outline: let the LLM parse the document, if configured.
            if not catalog and model_config:
                catalog = self._extract_toc_with_llm(doc, num_pages, model_config)

            # 3) Still nothing: fall back to regex parsing of the TOC pages.
            if not catalog:
                catalog = self._extract_toc_with_regex(doc, num_pages)

            yield self.create_text_message(json.dumps(catalog or [], ensure_ascii=False))
        finally:
            doc.close()

    @staticmethod
    def _pages_text(doc: fitz.Document, first_index: int, last_index: int) -> str:
        """Return the text of pages ``first_index..last_index`` joined by newlines."""
        return "\n".join(
            doc[index].get_text() or "" for index in range(first_index, last_index + 1)
        )

    def _extract_toc_with_regex(
        self, doc: fitz.Document, num_pages: int
    ) -> list[dict[str, int | str]]:
        """Locate the printed TOC pages and parse them line by line.

        Returns an empty list when no TOC page is found or no line parses.
        """
        toc_start, toc_end = self._find_toc_pages(doc, num_pages)
        if toc_start is None or toc_end is None:
            return []
        printed_catalog = self._parse_toc_lines(self._pages_text(doc, toc_start, toc_end))
        return self._attach_page_indexes(printed_catalog, toc_end, num_pages)

    def _extract_toc_with_llm(
        self, doc: fitz.Document, num_pages: int, model_config: dict[str, Any]
    ) -> list[dict[str, int | str]]:
        """Ask the configured LLM to extract the chapter structure.

        When TOC pages are found, only their text is sent. Otherwise the text
        of the first ``_LLM_SAMPLE_PAGES`` pages is sent, each page preceded by
        a marker line carrying its 1-based page number.

        Returns:
            The catalog, or ``[]`` when there is no text to send, the model
            reply cannot be parsed, or the model call fails.
        """
        toc_start, toc_end = self._find_toc_pages(doc, num_pages)
        content_offset: int | None
        if toc_start is not None and toc_end is not None:
            toc_text = self._pages_text(doc, toc_start, toc_end)
            content_offset = toc_end
        else:
            # Blank pages are skipped so that a document without any text
            # really yields an empty prompt (and therefore an early return).
            sections: list[str] = []
            for index in range(min(num_pages, self._LLM_SAMPLE_PAGES)):
                page_text = (doc[index].get_text() or "").strip()
                if page_text:
                    sections.append(f"--- 第{index + 1}页 ---\n{page_text}")
            toc_text = "\n\n".join(sections)
            content_offset = None

        if not toc_text.strip():
            return []

        # Truncate overly long text before sending it to the model.
        if len(toc_text) > self._LLM_MAX_CHARS:
            toc_text = toc_text[: self._LLM_MAX_CHARS] + "\n...[截断]"

        try:
            response = self.session.model.llm.invoke(
                model_config=LLMModelConfig(**model_config),
                prompt_messages=[
                    SystemPromptMessage(content=_TOC_SYSTEM_PROMPT),
                    UserPromptMessage(content=toc_text),
                ],
                stream=False,
            )
            llm_text = self._get_response_text(response)
            if not llm_text:
                return []
            raw_catalog = self._parse_llm_json(llm_text)
            if not raw_catalog:
                return []
            # Convert the model's simple {title, page} items into full entries.
            return self._build_catalog_from_llm(raw_catalog, content_offset, num_pages)
        except Exception:
            # Deliberately best-effort: any LLM failure yields an empty result
            # so that the caller can fall back to regex parsing.
            return []

    def _build_catalog_from_llm(
        self, raw: list[dict], content_offset: int | None, num_pages: int
    ) -> list[dict[str, int | str]]:
        """Turn the LLM's ``{"title", "page"}`` items into catalog entries.

        Args:
            raw: Items parsed from the model reply. Items that are not dicts,
                or that lack a title or an integer page, are skipped.
            content_offset: 0-based index of the last TOC page, or ``None``
                when no TOC page was found. With a TOC page, the first entry
                is assumed to start on the page right after it (the same rule
                as ``_attach_page_indexes``); without one, page numbers are
                taken as 1-based physical page numbers.
            num_pages: Number of pages in the document.

        Returns:
            Catalog entries in the order given by the model. The last entry
            always extends to the final page of the document.
        """
        entries: list[tuple[str, int]] = []
        for item in raw:
            if not isinstance(item, dict):
                continue
            title = str(item.get("title") or "").strip()
            page = self._to_int(item.get("page"), None)
            if not title or page is None:
                continue
            entries.append((title, page))
        if not entries:
            return []

        # index_shift converts a reported page number into a 0-based index.
        if content_offset is None:
            index_shift = -1
        else:
            index_shift = (content_offset + 1) - entries[0][1]

        last_index = num_pages - 1
        result: list[dict[str, int | str]] = []
        for position, (title, page) in enumerate(entries):
            is_last = position == len(entries) - 1
            next_page = page if is_last else entries[position + 1][1]
            page_start_index = max(0, min(page + index_shift, last_index))
            if is_last:
                page_end_index = last_index
            else:
                # An entry ends on the page before the next entry starts.
                page_end_index = max(
                    page_start_index, min(next_page + index_shift - 1, last_index)
                )
            result.append({
                "title": title,
                "start": page,
                "end": page if is_last else max(page, next_page - 1),
                "page_start_index": page_start_index,
                "page_end_index": page_end_index,
            })
        return result

    @staticmethod
    def _get_response_text(response: Any) -> str:
        """Return the plain text of an LLM response.

        Handles string content and list content (items contribute their
        ``data`` attribute when present, otherwise ``str(item)``); anything
        else is passed through ``str``. ``<think>...</think>`` blocks and
        ``<|...|>`` tokens are removed. Returns ``""`` when the response has
        no message.
        """
        if not hasattr(response, "message") or not response.message:
            return ""
        content = response.message.content
        if isinstance(content, str):
            text = content
        elif isinstance(content, list):
            text = "".join(
                item.data if hasattr(item, "data") else str(item) for item in content
            )
        else:
            text = str(content)
        # Strip reasoning ("think") blocks and special tokens.
        text = re.sub(r"<think>[\s\S]*?</think>", "", text, flags=re.IGNORECASE)
        text = re.sub(r"<\|[^>]+\|>", "", text)
        return text.strip()

    @staticmethod
    def _parse_llm_json(text: str) -> list[dict]:
        """Extract a JSON array of objects from the model's reply.

        The content of a fenced code block is preferred when present; within
        the remaining text the span from the first ``[`` to the last ``]`` is
        parsed. Array elements that are not JSON objects are dropped. Returns
        ``[]`` when no JSON array can be parsed.
        """
        code_match = re.search(r"```(?:json)?\s*([\s\S]*?)```", text)
        if code_match:
            text = code_match.group(1).strip()
        bracket_match = re.search(r"\[[\s\S]*\]", text)
        if bracket_match:
            text = bracket_match.group(0)
        try:
            result = json.loads(text)
        except ValueError:  # json.JSONDecodeError is a ValueError
            return []
        if not isinstance(result, list):
            return []
        return [item for item in result if isinstance(item, dict)]

    def _catalog_from_metadata(self, toc: list, num_pages: int) -> list[dict[str, int | str]]:
        """Build the catalog from the outline returned by ``doc.get_toc()``.

        Only entries of level 1 and 2 with a page number >= 1 are used. Each
        entry ends on the page before the next entry starts; the last one
        ends on the final page of the document.
        """
        top = [(title, max(0, page - 1)) for level, title, page in toc if level <= 2 and page >= 1]
        if not top:
            return []
        result: list[dict[str, int | str]] = []
        for index, (title, start_index) in enumerate(top):
            end_index = top[index + 1][1] - 1 if index + 1 < len(top) else num_pages - 1
            # Entries sharing a page would otherwise end before they start.
            end_index = max(start_index, end_index)
            result.append({
                "title": title,
                "start": start_index + 1,
                "end": end_index + 1,
                "page_start_index": start_index,
                "page_end_index": end_index,
            })
        return result

    def _find_toc_pages(self, doc: fitz.Document, num_pages: int) -> tuple[int | None, int | None]:
        """Find the first run of consecutive pages matching ``_TOC_PATTERNS``.

        Only the first ``_TOC_SCAN_PAGES`` pages are examined.

        Returns:
            ``(first_index, last_index)`` of the run (0-based, inclusive), or
            ``(None, None)`` when no page matches.
        """
        toc_start = None
        toc_end = None
        for page_number in range(min(num_pages, self._TOC_SCAN_PAGES)):
            text = doc[page_number].get_text() or ""
            if any(re.search(pattern, text, re.IGNORECASE) for pattern in self._TOC_PATTERNS):
                if toc_start is None:
                    toc_start = page_number
                toc_end = page_number
            elif toc_start is not None:
                # The run of TOC pages has ended.
                break
        return toc_start, toc_end

    def _parse_toc_lines(self, text: str) -> list[dict[str, int | str]]:
        """Parse printed TOC text into ``{"title", "start", "end"}`` entries.

        Text from a "List of Figures" / "List of Tables" heading onwards is
        ignored. A line is accepted when it is a title followed by dot leaders
        or whitespace and a 1-5 digit page number. Duplicate titles keep their
        first page. ``start`` / ``end`` are printed page numbers; an entry
        ends on the page before the next entry starts.
        """
        marker = self._TOC_END_MARKER_RE.search(text)
        if marker:
            text = text[: marker.start()]

        entries: list[tuple[str, int]] = []
        for raw in text.splitlines():
            line = raw.strip()
            if not line or len(line) < 3 or re.fullmatch(r"\d+", line):
                continue
            match = self._TOC_LINE_RE.match(line)
            if not match:
                continue
            # Collapse whitespace, then drop decorative characters and the
            # spaces they leave behind at either end of the title.
            title = re.sub(r"\s+", " ", match.group("title")).strip("-_: ")
            page = self._to_int(match.group("page"), None)
            if not title or page is None or len(title) <= 1:
                continue
            if title.lower() in {"page", "pages", "目录", "contents"}:
                continue
            entries.append((title, page))
        if not entries:
            return []

        dedup: OrderedDict[str, int] = OrderedDict()
        for title, page in entries:
            dedup.setdefault(title, page)

        titles = list(dedup)
        pages = list(dedup.values())
        result: list[dict[str, int | str]] = []
        for index, title in enumerate(titles):
            start = pages[index]
            end = max(start, pages[index + 1] - 1) if index + 1 < len(pages) else start
            result.append({"title": title, "start": start, "end": end})
        return result

    def _attach_page_indexes(
        self, catalog: list[dict[str, int | str]], toc_end: int, num_pages: int
    ) -> list[dict[str, int | str]]:
        """Add 0-based page indexes to entries carrying printed page numbers.

        The smallest printed ``start`` in ``catalog`` is assumed to be the page
        right after the last TOC page (index ``toc_end + 1``); all other pages
        are shifted by the same amount and clamped to the document.

        Returns:
            A new list of entries; entries without an integer ``start`` are
            dropped. ``[]`` when no entry has a usable ``start``.
        """
        if not catalog:
            return []

        starts = [self._to_int(item.get("start"), None) for item in catalog]
        known_starts = [start for start in starts if start is not None]
        if not known_starts:
            return []
        offset = (toc_end + 1) - min(known_starts)

        last_index = num_pages - 1
        result: list[dict[str, int | str]] = []
        for item, start in zip(catalog, starts):
            if start is None:
                continue
            end = self._to_int(item.get("end"), start)
            page_start_index = max(0, min(start + offset, last_index))
            page_end_index = max(page_start_index, min(end + offset, last_index))
            result.append({
                "title": str(item.get("title") or "Untitled"),
                "start": start,
                "end": max(start, end),
                "page_start_index": page_start_index,
                "page_end_index": page_end_index,
            })
        return result

    @staticmethod
    def _to_int(value: Any, default: int | None) -> int | None:
        """Convert ``value`` to ``int``, returning ``default`` on failure.

        ``None``, the empty string and values ``int()`` rejects all yield
        ``default``.
        """
        if value is None or value == "":
            return default
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return default