Files
schoolNews/schoolNewsCrawler/crawler/rmrb/RmrbCrawler.py
2025-11-28 17:16:17 +08:00

657 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 人民日报爬虫
from typing import List, Optional
from core.ResultDomain import ResultDomain
from crawler.BaseCrawler import BaseCrawler, CrawlerConfig, NewsItem, UrlConfig
from loguru import logger
import re
import chardet
from datetime import datetime, timedelta
from bs4.element import NavigableString, Tag
from urllib.parse import urlparse
import json
class RmrbCrawler(BaseCrawler):
"""人民日报新闻爬虫"""
def __init__(self):
"""初始化人民日报爬虫"""
config = CrawlerConfig(
base_url="http://www.people.com.cn",
urls={
"search": UrlConfig(
url="http://search.people.cn/search-platform/front/search",
method="POST",
params={
"key": "",
"page": 1,
"limit": 10,
"hasTitle": True,
"hasContent": True,
"isFuzzy": True,
"type": 0, # 0 所有1 新闻2 互动3 报刊4 图片5 视频
"sortType": 2, # 1 按相关度2 按时间
"startTime": 0,
"endTime": 0
},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Content-Type': 'application/json;charset=UTF-8'
}
),
"hot_point_rank": UrlConfig(
url="http://search.people.cn/search-platform/front/searchRank",
method="GET",
params={},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Content-Type': 'application/json;charset=UTF-8'
}
),
"one_day_trending_news": UrlConfig(
url= "http://www.people.com.cn/GB/59476/review/{date}.html", # date:YYYYMMdd
method="GET",
params={},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9',
}
)
},
)
super().__init__(config)
self.detail_map = {
"gba": self.parse_base_news_detail,
"politics": self.parse_base_news_detail,
"finance": self.parse_base_news_detail,
"cpc": self.parse_cpc_news_detail,
"theory": self.parse_cpc_news_detail,
}
def search(self, key: str, total: int, news_type: int = 0) -> ResultDomain:
"""
搜索人民日报新闻
Args:
key: 搜索关键词
total: 总数量
news_type: 新闻类型 (0=全部, 1=新闻, 2=互动, 3=报刊, 4=图片, 5=视频)
Returns:
新闻列表
"""
try:
resultDomain = ResultDomain(code=0, message="", success=True)
news_list = []
resultDomain.dataList = news_list
# 获取搜索配置
search_config = self.config.urls.get("search")
if not search_config:
logger.error("未找到搜索URL配置")
resultDomain.code = 0
resultDomain.message = "未找到搜索URL配置"
resultDomain.success = False
return resultDomain
page = 1
limit = 10
# 准备搜索参数
search_data = search_config.params.copy()
search_data["key"] = key
search_data["limit"] = limit
search_data["type"] = news_type
while len(news_list) < total:
search_data["page"] = page
response = self.fetch(search_config.url, method=search_config.method, json=search_data, headers=search_config.headers)
response_json = response.json()
if response_json.get("code") == '0':
records = response_json.get("data", {}).get("records", [])
for record in records:
news = self.parse_news_detail(record.get("url"))
if news.title == '':
news.title = record.get("title")
if news.contentRows == []:
# 如果contentOriginal是字符串转换为列表格式
content_original = record.get("contentOriginal")
if isinstance(content_original, str):
news.contentRows = [{"type": "text", "content": content_original}]
elif isinstance(content_original, list):
news.contentRows = content_original
if not news.contentRows:
news.executeStatus= 1
news.executeMessage = "直接从接口响应获取"
if news.publishTime == '':
news.publishTime = str(datetime.fromtimestamp(record.get("displayTime", 0) / 1000).date())
if news.author == '':
news.author = record.get("author")
if news.source == '':
news.source = record.get("originName")
news_list.append(news)
else:
resultDomain.code = response_json.get("code")
resultDomain.message = f"获取搜索结果失败{response_json.get('message') or ''}"
resultDomain.success = False
return resultDomain
page += 1
resultDomain.success = True
resultDomain.code = 0
logger.info(f"搜索到 {len(news_list)} 条新闻")
return resultDomain
except Exception as e:
logger.error(f"搜索人民日报新闻失败: {str(e)}")
resultDomain.code = 0
resultDomain.message = "搜索人民日报新闻失败" + str(e)
resultDomain.success = False
return resultDomain
def hotPointRank(self) -> ResultDomain:
"""
获取人民日报热点排行
"""
try:
hot_point_rank_config = self.config.urls.get("hot_point_rank")
resultDomain = ResultDomain(code=0, message="", success=True)
news_list = []
resultDomain.dataList = news_list
if not hot_point_rank_config:
logger.error("未找到热点排行URL配置")
resultDomain.code = 0
resultDomain.message = "未找到热点排行URL配置"
resultDomain.success = False
return resultDomain
response = self.fetch(hot_point_rank_config.url, method=hot_point_rank_config.method, headers=hot_point_rank_config.headers)
response_json = response.json()
if response_json.get("code") == '0':
records = response_json.get("data", [])
for record in records:
news = self.parse_news_detail(record.get("url"))
news_list.append(news)
else:
resultDomain.code = response_json.get("code")
resultDomain.message = f"获取人民日报热点排行失败{response_json.get('message') or ''}"
resultDomain.success = False
return resultDomain
resultDomain.success = True
resultDomain.code = 0
logger.info(f"获取人民日报热点排行成功")
return resultDomain
except Exception as e:
logger.error(f"获取人民日报热点排行失败: {str(e)}")
resultDomain.code = 0
resultDomain.message = f"获取人民日报热点排行失败{str(e)}"
resultDomain.success = False
return resultDomain
def getOneDayTrendingNews(self, date: datetime) -> ResultDomain:
"""
获取人民日报一天内的热点新闻
"""
try:
resultDomain = ResultDomain(code=0, message="", success=True)
news_list = []
resultDomain.dataList = news_list
resultDomain.success = True
resultDomain.code = 0
logger.info(f"获取人民日报一天内的热点新闻成功")
date_str = date.strftime("%Y%m%d")
one_day_trending_news_config = self.config.urls.get("one_day_trending_news")
one_day_trending_news_config.url = one_day_trending_news_config.url.format(date=date_str)
response = self.fetch(one_day_trending_news_config.url, method=one_day_trending_news_config.method, headers=one_day_trending_news_config.headers)
if not response:
logger.error(f"获取响应失败: {one_day_trending_news_config.url}")
resultDomain.code = 0
resultDomain.message = f"获取响应失败{one_day_trending_news_config.url or ''}"
resultDomain.success = False
return resultDomain
soup = self.parse_html(response.content)
if not soup:
logger.error(f"解析HTML失败: {one_day_trending_news_config.url}")
resultDomain.code = 0
resultDomain.message = f"解析HTML失败{one_day_trending_news_config.url or ''}"
resultDomain.success = False
return resultDomain
all_doc_urls = []
bg01 = soup.find('td', class_="bg01")
indexfont13 = bg01.find('td', class_='indexfont13')
# 获取该 td 下的所有 a 标签
a_tags = indexfont13.find_all('a')
all_doc_urls.extend(a_tags)
bg02 = soup.find('td', class_="bg02")
p6 = bg02.find('td', class_='p6')
a_tags_p6 = p6.find_all('a')
all_doc_urls.extend(a_tags_p6)
for a_tag in all_doc_urls:
news = self.parse_news_detail(a_tag.get('href'))
news_list.append(news)
return resultDomain
except Exception as e:
logger.error(f"获取人民日报一天内的热点新闻失败: {str(e)}")
resultDomain.code = 0
resultDomain.message = f"获取人民日报一天内的热点新闻失败{str(e)}"
resultDomain.success = False
return resultDomain
def getDaysTrendingNews(self, start_date: datetime, end_date: datetime) -> ResultDomain:
"""
获取人民日报多天内的热点新闻
"""
try:
resultDomain = ResultDomain(code=0,message="", success=True)
news_list = []
resultDomain.dataList = news_list
resultDomain.success = True
resultDomain.code = 0
resultDomain.message = "获取人民日报多天内的热点新闻成功"
current_date = start_date
while current_date <= end_date:
day_result = self.getOneDayTrendingNews(current_date)
if day_result.success and day_result.dataList:
news_list.extend(day_result.dataList)
current_date += timedelta(days=1)
logger.info(f"获取人民日报多天内的热点新闻成功,共 {len(news_list)}")
return resultDomain
except Exception as e:
logger.error(f"获取人民日报多天内的热点新闻失败: {str(e)}")
resultDomain.code = 0
resultDomain.message = f"获取人民日报多天内的热点新闻失败{str(e)}"
resultDomain.success = False
return resultDomain
def parse_news_detail(self, url: str) -> Optional[NewsItem]:
# 从 URL 中提取 category
netloc = urlparse(url).netloc
category = "gba"
if netloc.endswith('.people.com.cn'):
category = netloc.split('.')[0]
# 从 detail_map 中获取对应的解析函数
print(category)
parser_func = self.detail_map.get(category)
if parser_func is None:
logger.error(f"未找到对应解析器category={category}, url={url}")
parser_func = self.parse_base_news_detail
# return NewsItem(
# url=url,
# contentRows=[],
# title="",
# executeStatus=0,
# executeMessage=f"不支持的新闻类型: {category}"
# )
# 调用对应的解析方法(注意:这些方法是实例方法,需通过 self 调用)
return parser_func(url)
def parse_base_news_detail(self, url: str) -> Optional[NewsItem]:
"""
解析人民日报新闻详情
Args:
url: 新闻详情页URL
Returns:
新闻对象
"""
try:
response = self.fetch(url)
news = NewsItem(
title="",
contentRows=[], # 修复:使用 contents 而不是 content
url=url,
publishTime="",
author="",
source="人民网",
category="",
executeStatus=1,
executeMessage="成功解析新闻"
)
if not response:
logger.error(f"获取响应失败: {url}")
news.executeStatus = 0
news.executeMessage = f"获取响应失败: {url}"
return news
# BeautifulSoup 可以自动检测并解码编码,直接传入字节数据即可
# 它会从 HTML 的 <meta charset> 标签或响应头自动检测编码
soup = self.parse_html(response.content)
if not soup:
logger.error("解析HTML失败")
news.executeStatus = 0
news.executeMessage = f"解析HTML失败"
return news
# 提取主内容区域
main_div = soup.select_one("div.layout.rm_txt.cf")
if not main_div:
logger.error("未找到主内容区域")
news.executeStatus = 0
news.executeMessage = f"未找到主内容区域"
return news
# 提取文章区域
article_div = main_div.select_one("div.col.col-1")
if not article_div:
logger.error("未找到文章区域")
news.executeStatus = 0
news.executeMessage = f"未找到文章区域"
return news
# 提取标题
title_tag = article_div.select_one("h1")
title = title_tag.get_text(strip=True) if title_tag else ""
# 提取作者
author_tag = article_div.select_one("div.author.cf")
author = author_tag.get_text(strip=True) if author_tag else ""
# 提取发布时间和来源
channel_div = article_div.select_one("div.channel.cf")
publish_time = ""
source = ""
if channel_div:
channel_info = channel_div.select_one("div.col-1-1")
if channel_info:
channel_text = channel_info.get_text(strip=True)
# 解析时间格式2025年11月10日05:51 | 来源:人民网-人民日报
if '|' in channel_text:
parts = channel_text.split('|')
publish_time = parts[0].strip() if parts else ""
# 从 <a> 标签中提取来源
source_tag = channel_info.select_one("a")
if source_tag:
source = source_tag.get_text(strip=True)
elif len(parts) > 1 and '来源' in parts[1]:
# 如果没有 <a> 标签,从文本中提取
source = parts[1].replace('来源:', '').strip()
# 提取内容
content_div = article_div.select_one('div.rm_txt_con.cf')
contents = [] # 构建一个富文本内容
pList = content_div.find_all("p") # 所有p标签
# 解析p标签 变为quill富文本
for p in pList:
# 获取p的class内容
p_style = p.get("style")
content = ""
tag = "p"
if p.find('img'):
tag = "img"
src = p.find('img').get('src')
if src:
src = str(src) # 转换为字符串
if not src.startswith("http"):
src = self.config.base_url + src
if src=="http://www.people.com.cn/img/2020wbc/imgs/share.png": #分享图片跳过
continue
content = f"<img style='{p_style}' src='{src}' />"
elif p.find('video'):
tag = "video"
src = p.find('video').get('src')
if src:
src = str(src) # 转换为字符串
if not src.startswith("http"):
src = self.config.base_url + src
content = f"<video style='align-items: center;' src='{src}' />"
else:
content = str(p)
contents.append({
"tag": tag,
"content": content
})
news.title=title
news.contentRows=contents # 修复:使用 contents 而不是 content
news.url=url
news.publishTime=publish_time
news.author=author
news.source=source or "人民网"
news.category=""
logger.info(f"成功解析新闻: {title}")
return news
except Exception as e:
logger.error(f"解析新闻详情失败 [{url}]: {str(e)}")
news.executeStatus = 0
news.executeMessage = f"解析新闻详情失败: {str(e)}"
return news
def parse_cpc_news_detail(self, url: str) -> Optional[NewsItem]:
"""
解析人民日报新闻详情
"""
try:
response = self.fetch(url)
news = NewsItem(
title="",
contentRows=[], # 修复:使用 contents 而不是 content
url=url,
publishTime="",
author="",
source="人民网",
category="",
executeStatus=1,
executeMessage="成功解析新闻"
)
if not response:
logger.error(f"获取响应失败: {url}")
news.executeStatus = 0
news.executeMessage = f"获取响应失败: {url}"
return news
# BeautifulSoup 可以自动检测并解码编码,直接传入字节数据即可
# 它会从 HTML 的 <meta charset> 标签或响应头自动检测编码
soup = self.parse_html(response.content)
if not soup:
logger.error("解析HTML失败")
news.executeStatus = 0
news.executeMessage = f"解析HTML失败"
return news
# 提取主内容区域
main_div = soup.select_one("div.text_con.text_con01")
if not main_div:
logger.error("未找到主内容区域")
news.executeStatus = 0
news.executeMessage = f"未找到主内容区域"
return news
# 提取文章区域
article_div = main_div.select_one("div.text_c")
if not article_div:
logger.error("未找到文章区域")
news.executeStatus = 0
news.executeMessage = f"未找到文章区域"
return news
# 提取标题
title_tag = article_div.select_one("h1")
title = title_tag.get_text(strip=True) if title_tag else ""
# 提取作者
author_tag = article_div.select_one("div.author.cf")
author = author_tag.get_text(strip=True) if author_tag else ""
# 提取发布时间和来源
channel_div = article_div.select_one("div.sou")
publish_time = ""
source = ""
if channel_div:
# 提取时间:取第一个非空文本节点
for child in channel_div.children:
if isinstance(child, str) and child.strip():
publish_time = child.strip().split("来源:")[0].strip()
break
# 提取来源
a_tag = channel_div.find("a")
source = a_tag.get_text(strip=True) if a_tag else ""
# 清理不可见空格
publish_time = publish_time.replace("\xa0", " ").replace(" ", " ").strip()
# 提取内容
content_div = article_div.select_one('div.show_text')
contents = [] # 构建一个富文本内容
pList = content_div.find_all("p") # 所有p标签
# 解析p标签 变为quill富文本
# 遍历 show_text 下的所有直接子节点(保持顺序)
for child in content_div.children:
# 只处理 Tag 类型的节点,跳过文本节点、注释等
if not isinstance(child, Tag):
continue
tag_name = child.name
# 情况1检测是否是视频容器根据 id 特征或内部结构)
video_tag = child.find('video') if tag_name != 'video' else child
if video_tag and video_tag.get('src'):
src = str(video_tag['src'])
p_style = video_tag.get("style", "")
if not src.startswith("http"):
src = self.config.base_url + src
contents.append({
"tag": "video",
"content": f"<video style='{p_style}' src='{src}'></video>"
})
continue
img_tag = child.find('img') if tag_name != 'img' else child
if img_tag and img_tag.get('src'):
src = str(img_tag['src'])
p_style = child.get("style", "")
if not src.startswith("http"):
src = self.config.base_url + src
contents.append({
"tag": "img",
"content": f"<img style='{p_style}' src='{src}' />"
})
continue
if tag_name == 'p':
p_style = child.get("style", "")
img_tag = child.find('img')
video_tag = child.find('video')
# 情况1存在 <img> 或 <video> 标签(静态资源)
if img_tag or video_tag:
src = img_tag.get('src') if img_tag else video_tag.get('src')
if src:
src = str(src)
if not src.startswith(('http://', 'https://')):
src = self.config.base_url.rstrip('/') + '/' + src.lstrip('/')
tag_type = "img" if img_tag else "video"
if img_tag:
content_html = f"<img style='{p_style}' src='{src}' />"
else:
content_html = f"<video style='{p_style}' src='{src}' controls></video>"
contents.append({
"tag": tag_type,
"content": content_html
})
else:
# 无 src当作普通段落
contents.append({"tag": "p", "content": str(child)})
continue
# 情况2检查是否包含人民网的 showPlayer 脚本(动态视频)
script_tags = child.find_all('script')
video_src = None
poster_url = None
for script in script_tags:
script_text = script.string or ""
if "showPlayer" not in script_text:
continue
# 使用正则精准提取 src 和 posterUrl支持空格、换行
src_match = re.search(r"src\s*:\s*'([^']*)'", script_text)
poster_match = re.search(r"posterUrl\s*:\s*'([^']*)'", script_text)
if src_match:
video_src = src_match.group(1)
if poster_match:
poster_url = poster_match.group(1)
if video_src:
break # 找到视频源即可退出
if video_src:
# 补全 URL确保是绝对路径
if not video_src.startswith(('http://', 'https://')):
video_src = self.config.base_url.rstrip('/') + '/' + video_src.lstrip('/')
if poster_url and not poster_url.startswith(('http://', 'https://')):
poster_url = self.config.base_url.rstrip('/') + '/' + poster_url.lstrip('/')
# 构造 video 标签属性
attrs_parts = []
if p_style:
attrs_parts.append(f"style='{p_style}'")
if poster_url:
attrs_parts.append(f"poster='{poster_url}'")
attrs_parts.append("controls")
attrs = " ".join(attrs_parts)
contents.append({
"tag": "video",
"content": f"<video {attrs} src='{video_src}'></video>"
})
else:
# 普通段落文本
contents.append({
"tag": "p",
"content": str(child)
})
continue
news.title=title
news.contentRows=contents # 修复:使用 contents 而不是 content
news.url=url
news.publishTime=publish_time
news.author=author
news.source=source or "人民网"
news.category=""
logger.info(f"成功解析新闻: {title}")
return news
except Exception as e:
logger.error(f"解析新闻详情失败 [{url}]: {str(e)}")
return NewsItem(
title="",
contentRows=[],
url=url,
publishTime="",
author="",
source="人民网",
category="",
executeStatus=0,
executeMessage=f"解析新闻详情失败: {str(e)}"
)