Files
ztb/processors/content_fetcher.py
2026-02-13 18:15:20 +08:00

294 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
内容获取器 - 获取详情页文本 + 附件内容
"""
import hashlib
import logging
import os
import random
import re
import time
from urllib.parse import urljoin

import pdfplumber
import requests
import urllib3
from bs4 import BeautifulSoup
from docx import Document
# Requests below are issued with verify=False (many government tender portals
# use self-signed certs), so silence the per-request InsecureRequestWarning.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Shared project logger; presumably configured by the application entry
# point — this module only emits records. NOTE(review): confirm handler setup.
logger = logging.getLogger("ztb")
class ContentFetcher:
    """Detail-page content + attachment fetcher.

    Fetches a detail page once, extracts its plain text and publish time,
    then discovers PDF/Word attachment links in the same HTML, downloads
    them (size-capped) and appends their parsed text. All outbound HTTP
    requests go through a shared RPM limiter with random inter-request
    delays.
    """

    # Rate-control parameters
    RPM_LIMIT = 12        # max requests per minute
    DELAY_MIN = 1.5       # minimum delay between requests (seconds)
    DELAY_MAX = 3.0       # maximum delay between requests (seconds)
    MAX_DOWNLOAD_MB = 50  # maximum size of a single attachment (MB)

    def __init__(self, temp_dir: str = "temp_files"):
        """
        Args:
            temp_dir: directory for temporary attachment downloads;
                created if it does not exist.
        """
        self.temp_dir = temp_dir
        os.makedirs(temp_dir, exist_ok=True)
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/120.0.0.0 Safari/537.36",
        }
        self._req_timestamps = []  # sliding 60s window used for RPM limiting

    # ---------- public API ----------
    def get_full_content(self, url: str, max_attachments: int = 2) -> str:
        """
        Fetch the page text plus parsed attachment text and return them
        merged (the page HTML is requested only once).

        Args:
            url: detail-page URL
            max_attachments: maximum number of attachments to process

        Returns:
            Merged full text; empty string if the page could not be fetched.
        """
        # 1. Fetch page HTML (single request).
        html = self._fetch_html(url)
        if not html:
            return ""
        # 2. Extract plain text from the page.
        soup = BeautifulSoup(html, "html.parser")
        page_content = soup.get_text(separator="\n", strip=True)
        # 3. Extract the publish time and prepend it when found.
        publish_time = self._extract_publish_time(soup, page_content)
        if publish_time:
            page_content = f"发布时间: {publish_time}\n\n" + page_content
        # 4. Discover and parse attachments from the same HTML.
        attachments = self._find_attachments(soup, url)
        attachment_content = ""
        for att in attachments[:max_attachments]:
            att_text = self._download_and_parse(att["url"], att["name"])
            if att_text:
                attachment_content += f"\n\n=== 附件: {att['name']} ===\n{att_text}"
        full_content = page_content
        if attachment_content:
            full_content += attachment_content
        return full_content

    @staticmethod
    def _extract_publish_time(soup: BeautifulSoup, page_content: str) -> str:
        """
        Extract the publish time from the page.

        Args:
            soup: parsed BeautifulSoup document
            page_content: page plain text

        Returns:
            Publish-time string such as "2026-02-13 16:12:28", or "".
        """
        # 1. Try labelled patterns in the page text. [::] accepts both the
        #    fullwidth colon (used by Chinese portals, cf. _parse_word's
        #    character set) and the ASCII colon.
        patterns = [
            r'信息发布时间[::]\s*([\d-]+\s[\d:]+)',
            r'发布时间[::]\s*([\d-]+\s[\d:]+)',
            r'发布日期[::]\s*([\d-]+\s[\d:]+)',
            r'发布时间[::]\s*([\d-]+)',
            r'发布日期[::]\s*([\d-]+)',
        ]
        for pattern in patterns:
            match = re.search(pattern, page_content)
            if match:
                return match.group(1).strip()
        # 2. Fall back to HTML tags whose class hints at a timestamp.
        time_tags = soup.find_all(['time', 'span', 'div'], class_=re.compile(r'time|date|publish', re.I))
        for tag in time_tags:
            text = tag.get_text(strip=True)
            match = re.search(r'([\d-]+\s[\d:]+)', text)
            if match:
                return match.group(1).strip()
        return ""

    # ---------- rate control ----------
    def _throttle(self):
        """Pre-request throttle: RPM cap plus a random delay."""
        now = time.time()
        # Drop timestamps older than the 60s window.
        self._req_timestamps = [
            t for t in self._req_timestamps if now - t < 60]
        if len(self._req_timestamps) >= self.RPM_LIMIT:
            # Wait until the oldest request leaves the window, plus jitter.
            wait = 60 - (now - self._req_timestamps[0]) + random.uniform(1, 3)
            if wait > 0:
                logger.debug(f"ContentFetcher 限速等待 {wait:.0f}s")
                time.sleep(wait)
        self._req_timestamps.append(time.time())
        time.sleep(random.uniform(self.DELAY_MIN, self.DELAY_MAX))

    # ---------- page fetching ----------
    def _fetch_html(self, url: str, max_retries: int = 3) -> str:
        """Fetch the raw HTML of a page, retrying on failure.

        Returns "" after exhausting retries or on a non-200 response.
        """
        self._throttle()
        for retry in range(max_retries):
            try:
                # verify=False: portals frequently present invalid certs.
                resp = requests.get(url, headers=self.headers,
                                    timeout=45, verify=False)
                resp.encoding = "utf-8"
                if resp.status_code != 200:
                    logger.warning(f"页面返回 {resp.status_code}: {url[:60]}")
                    if retry < max_retries - 1:
                        time.sleep(3)
                        continue
                    return ""
                logger.debug(f"页面获取成功 {len(resp.text)} 字符: {url[:60]}")
                return resp.text
            except Exception as e:
                logger.warning(f"获取页面失败 ({retry+1}/{max_retries}): {e}")
                if retry < max_retries - 1:
                    time.sleep(3)
        return ""

    # ---------- attachment discovery ----------
    @staticmethod
    def _find_attachments(soup: BeautifulSoup, base_url: str) -> list:
        """Find attachment links (.pdf/.doc/.docx) in the parsed HTML.

        Returns:
            List of {"name": ..., "url": ...} dicts with URLs resolved to
            absolute form against base_url.
        """
        attachments = []
        for link in soup.find_all("a"):
            href = link.get("href", "")
            text = link.get_text(strip=True)
            if any(ext in href.lower() for ext in (".pdf", ".doc", ".docx")):
                # urljoin resolves absolute, root-relative and relative hrefs
                # (including "../" segments) correctly, unlike manual joining.
                absolute = urljoin(base_url, href)
                attachments.append({
                    "name": text or absolute.split("/")[-1],
                    "url": absolute,
                })
        return attachments

    # ---------- attachment download & parse ----------
    def _download_and_parse(self, url: str, filename: str,
                            max_retries: int = 3) -> str:
        """Download an attachment and parse it to text.

        Args:
            url: absolute attachment URL
            filename: display name used in log messages
            max_retries: download/parse attempts before giving up

        Returns:
            Extracted text, or "" on failure / oversize / unknown type.
        """
        self._throttle()
        file_type = self._get_file_type(url)
        max_bytes = self.MAX_DOWNLOAD_MB * 1024 * 1024
        # Stable temp name: hash() of a str is randomized per process and may
        # be negative, so it must not be used for file names.
        digest = hashlib.md5(url.encode("utf-8")).hexdigest()
        temp_path = os.path.join(self.temp_dir, f"temp_{digest}.{file_type}")
        for retry in range(max_retries):
            try:
                logger.debug(f"下载附件: {filename}")
                # `with` closes the streamed connection even on early break.
                with requests.get(url, headers=self.headers,
                                  timeout=90, verify=False, stream=True) as resp:
                    resp.raise_for_status()
                    total = 0
                    with open(temp_path, "wb") as f:
                        for chunk in resp.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                                total += len(chunk)
                                if total > max_bytes:
                                    logger.warning(
                                        f"附件超过 {self.MAX_DOWNLOAD_MB}MB 限制,跳过: {filename}")
                                    break
                if total > max_bytes:
                    # Oversize: discard the partial file and give up.
                    try:
                        os.remove(temp_path)
                    except OSError:
                        pass
                    return ""
                logger.debug(f"附件已下载 {total/1024:.1f}KB: {filename}")
                try:
                    if file_type == "pdf":
                        return self._parse_pdf(temp_path)
                    elif file_type in ("doc", "docx"):
                        return self._parse_word(temp_path)
                    return ""
                finally:
                    # Always clean up the temp file after parsing.
                    try:
                        os.remove(temp_path)
                    except OSError:
                        pass
            except Exception as e:
                logger.warning(f"附件处理失败 ({retry+1}/{max_retries}): {e}")
                if retry < max_retries - 1:
                    time.sleep(4)
        return ""

    # ---------- file parsing ----------
    @staticmethod
    def _parse_pdf(file_path: str) -> str:
        """Parse a PDF file to text (page by page); "" on failure."""
        try:
            text = ""
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + "\n"
            return text
        except Exception as e:
            logger.warning(f"PDF解析失败: {e}")
            return ""

    @staticmethod
    def _parse_word(file_path: str) -> str:
        """Parse a Word file (supports .doc and .docx); "" on failure.

        Results shorter than 500 chars are treated as extraction failures
        and trigger the fallback / empty return.
        """
        # Try python-docx first (works for .docx).
        try:
            doc = Document(file_path)
            text = "\n".join(p.text for p in doc.paragraphs)
            if len(text) > 500:
                return text
        except Exception:
            pass
        # Fallback: decode as UTF-16LE (legacy .doc binaries often embed
        # readable UTF-16 runs) and keep only plausible document characters.
        try:
            with open(file_path, "rb") as f:
                content = f.read()
            raw = content.decode("utf-16le", errors="ignore")
            readable = []
            for c in raw:
                if "\u4e00" <= c <= "\u9fff" or c in ",。;:“”‘’《》【】、0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz%.×+- \n□☑":
                    readable.append(c)
                elif readable and readable[-1] != " ":
                    readable.append(" ")
            text = re.sub(r" +", " ", "".join(readable))
            if len(text) > 500:
                return text
        except Exception:
            pass
        return ""

    @staticmethod
    def _get_file_type(filename: str) -> str:
        """Classify a file name/URL as "pdf", "docx", "doc" or "unknown".

        ".docx" is tested before ".doc" because the latter is a substring
        of the former.
        """
        low = filename.lower()
        if ".pdf" in low:
            return "pdf"
        if ".docx" in low:
            return "docx"
        if ".doc" in low:
            return "doc"
        return "unknown"