# 人民日报爬虫
from typing import List, Optional
from core.ResultDomain import ResultDomain
from crawler.BaseCrawler import BaseCrawler, CrawlerConfig, NewsItem, UrlConfig
from loguru import logger
import re
import chardet
from datetime import datetime, timedelta
from bs4.element import NavigableString, Tag
from urllib.parse import urlparse
import json
class RmrbCrawler(BaseCrawler):
"""人民日报新闻爬虫"""
def __init__(self):
"""初始化人民日报爬虫"""
config = CrawlerConfig(
base_url="http://www.people.com.cn",
urls={
"search": UrlConfig(
url="http://search.people.cn/search-platform/front/search",
method="POST",
params={
"key": "",
"page": 1,
"limit": 10,
"hasTitle": True,
"hasContent": True,
"isFuzzy": True,
"type": 0, # 0 所有,1 新闻,2 互动,3 报刊,4 图片,5 视频
"sortType": 2, # 1 按相关度,2 按时间
"startTime": 0,
"endTime": 0
},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Content-Type': 'application/json;charset=UTF-8'
}
),
"hot_point_rank": UrlConfig(
url="http://search.people.cn/search-platform/front/searchRank",
method="GET",
params={},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Content-Type': 'application/json;charset=UTF-8'
}
),
"one_day_trending_news": UrlConfig(
url= "http://www.people.com.cn/GB/59476/review/{date}.html", # date:YYYYMMdd
method="GET",
params={},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9',
}
)
},
)
super().__init__(config)
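        # 子域名(category)到详情页解析函数的映射;未登记的子域名由 parse_news_detail 回退到通用解析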
self.detail_map = {
"gba": self.parse_base_news_detail,
"politics": self.parse_base_news_detail,
"finance": self.parse_base_news_detail,
"cpc": self.parse_cpc_news_detail,
"theory": self.parse_cpc_news_detail,
}
def search(self, key: str, total: int, news_type: int = 0) -> ResultDomain:
"""
搜索人民日报新闻
Args:
key: 搜索关键词
total: 总数量
news_type: 新闻类型 (0=全部, 1=新闻, 2=互动, 3=报刊, 4=图片, 5=视频)
Returns:
新闻列表
"""
try:
resultDomain = ResultDomain(code=0, message="", success=True)
news_list = []
resultDomain.dataList = news_list
# 获取搜索配置
search_config = self.config.urls.get("search")
if not search_config:
logger.error("未找到搜索URL配置")
resultDomain.code = 0
resultDomain.message = "未找到搜索URL配置"
resultDomain.success = False
return resultDomain
page = 1
limit = 10
# 准备搜索参数
search_data = search_config.params.copy()
search_data["key"] = key
search_data["limit"] = limit
search_data["type"] = news_type
while len(news_list) < total:
search_data["page"] = page
response = self.fetch(search_config.url, method=search_config.method, json=search_data, headers=search_config.headers)
response_json = response.json()
if response_json.get("code") == '0':
records = response_json.get("data", {}).get("records", [])
for record in records:
news = self.parse_news_detail(record.get("url"))
if news.title == '':
news.title = record.get("title")
                        if news.contentRows == []:
                            # 如果 contentOriginal 是字符串,转换为列表格式
                            content_original = record.get("contentOriginal")
                            if isinstance(content_original, str):
                                news.contentRows = [{"type": "text", "content": content_original}]
                            elif isinstance(content_original, list):
                                news.contentRows = content_original
                            if news.contentRows:
                                news.executeStatus = 1
                                news.executeMessage = "正文直接从搜索接口响应获取"
if news.publishTime == '':
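                            # displayTime 为毫秒时间戳,除以 1000 后再转成日期字符串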
news.publishTime = str(datetime.fromtimestamp(record.get("displayTime", 0) / 1000).date())
if news.author == '':
news.author = record.get("author")
if news.source == '':
news.source = record.get("originName")
news_list.append(news)
else:
resultDomain.code = response_json.get("code")
resultDomain.message = f"获取搜索结果失败{response_json.get('message') or ''}"
resultDomain.success = False
return resultDomain
page += 1
resultDomain.success = True
resultDomain.code = 0
logger.info(f"搜索到 {len(news_list)} 条新闻")
return resultDomain
except Exception as e:
logger.error(f"搜索人民日报新闻失败: {str(e)}")
resultDomain.code = 0
resultDomain.message = "搜索人民日报新闻失败" + str(e)
resultDomain.success = False
return resultDomain
def hotPointRank(self) -> ResultDomain:
"""
获取人民日报热点排行
"""
try:
hot_point_rank_config = self.config.urls.get("hot_point_rank")
resultDomain = ResultDomain(code=0, message="", success=True)
news_list = []
resultDomain.dataList = news_list
if not hot_point_rank_config:
logger.error("未找到热点排行URL配置")
resultDomain.code = 0
resultDomain.message = "未找到热点排行URL配置"
resultDomain.success = False
return resultDomain
response = self.fetch(hot_point_rank_config.url, method=hot_point_rank_config.method, headers=hot_point_rank_config.headers)
response_json = response.json()
if response_json.get("code") == '0':
records = response_json.get("data", [])
for record in records:
news = self.parse_news_detail(record.get("url"))
news_list.append(news)
else:
resultDomain.code = response_json.get("code")
resultDomain.message = f"获取人民日报热点排行失败{response_json.get('message') or ''}"
resultDomain.success = False
return resultDomain
resultDomain.success = True
resultDomain.code = 0
logger.info(f"获取人民日报热点排行成功")
return resultDomain
except Exception as e:
logger.error(f"获取人民日报热点排行失败: {str(e)}")
resultDomain.code = 0
resultDomain.message = f"获取人民日报热点排行失败{str(e)}"
resultDomain.success = False
return resultDomain
def getOneDayTrendingNews(self, date: datetime) -> ResultDomain:
"""
获取人民日报一天内的热点新闻
"""
try:
resultDomain = ResultDomain(code=0, message="", success=True)
news_list = []
resultDomain.dataList = news_list
            date_str = date.strftime("%Y%m%d")
            one_day_trending_news_config = self.config.urls.get("one_day_trending_news")
            if not one_day_trending_news_config:
                logger.error("未找到每日热点URL配置")
                resultDomain.code = 0
                resultDomain.message = "未找到每日热点URL配置"
                resultDomain.success = False
                return resultDomain
            # 不在原配置对象上做 format,避免第二次调用时 URL 模板已被替换而失效
            request_url = one_day_trending_news_config.url.format(date=date_str)
            response = self.fetch(request_url, method=one_day_trending_news_config.method, headers=one_day_trending_news_config.headers)
            if not response:
                logger.error(f"获取响应失败: {request_url}")
                resultDomain.code = 0
                resultDomain.message = f"获取响应失败: {request_url}"
                resultDomain.success = False
                return resultDomain
            soup = self.parse_html(response.content)
            if not soup:
                logger.error(f"解析HTML失败: {request_url}")
                resultDomain.code = 0
                resultDomain.message = f"解析HTML失败: {request_url}"
                resultDomain.success = False
                return resultDomain
all_doc_urls = []
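            # 该回顾页是老式表格布局:这里假定 bg01 表格放当日要闻、bg02 表格放排行区,其中的 a 标签即新闻链接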
            bg01 = soup.find('td', class_="bg01")
            if bg01:
                indexfont13 = bg01.find('td', class_='indexfont13')
                if indexfont13:
                    # 获取该 td 下的所有 a 标签
                    all_doc_urls.extend(indexfont13.find_all('a'))
            bg02 = soup.find('td', class_="bg02")
            if bg02:
                p6 = bg02.find('td', class_='p6')
                if p6:
                    all_doc_urls.extend(p6.find_all('a'))
            for a_tag in all_doc_urls:
                news = self.parse_news_detail(a_tag.get('href'))
                news_list.append(news)
            logger.info(f"获取人民日报 {date_str} 的热点新闻成功,共 {len(news_list)} 条")
            return resultDomain
except Exception as e:
logger.error(f"获取人民日报一天内的热点新闻失败: {str(e)}")
resultDomain.code = 0
resultDomain.message = f"获取人民日报一天内的热点新闻失败{str(e)}"
resultDomain.success = False
return resultDomain
def getDaysTrendingNews(self, start_date: datetime, end_date: datetime) -> ResultDomain:
"""
获取人民日报多天内的热点新闻
"""
try:
resultDomain = ResultDomain(code=0,message="", success=True)
news_list = []
resultDomain.dataList = news_list
resultDomain.success = True
resultDomain.code = 0
resultDomain.message = "获取人民日报多天内的热点新闻成功"
current_date = start_date
while current_date <= end_date:
day_result = self.getOneDayTrendingNews(current_date)
if day_result.success and day_result.dataList:
news_list.extend(day_result.dataList)
current_date += timedelta(days=1)
logger.info(f"获取人民日报多天内的热点新闻成功,共 {len(news_list)} 条")
return resultDomain
except Exception as e:
logger.error(f"获取人民日报多天内的热点新闻失败: {str(e)}")
resultDomain.code = 0
resultDomain.message = f"获取人民日报多天内的热点新闻失败{str(e)}"
resultDomain.success = False
return resultDomain
def parse_news_detail(self, url: str) -> Optional[NewsItem]:
# 从 URL 中提取 category
netloc = urlparse(url).netloc
category = "gba"
if netloc.endswith('.people.com.cn'):
category = netloc.split('.')[0]
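        # 例如 http://politics.people.com.cn/... -> category = "politics"(示例 URL,仅作说明)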
# 从 detail_map 中获取对应的解析函数
        logger.debug(f"根据子域名解析出的新闻类别: {category}")
parser_func = self.detail_map.get(category)
        if parser_func is None:
            logger.warning(f"未找到对应解析器,回退到通用解析, category={category}, url={url}")
            parser_func = self.parse_base_news_detail
        # 调用对应的解析方法(detail_map 中保存的是已绑定 self 的实例方法)
        return parser_func(url)
def parse_base_news_detail(self, url: str) -> Optional[NewsItem]:
"""
解析人民日报新闻详情
Args:
url: 新闻详情页URL
Returns:
新闻对象
"""
        # 先构造 NewsItem,保证异常分支里 news 一定已定义
        news = NewsItem(
            title="",
            contentRows=[],
            url=url,
            publishTime="",
            author="",
            source="人民网",
            category="",
            executeStatus=1,
            executeMessage="成功解析新闻"
        )
        try:
            response = self.fetch(url)
if not response:
logger.error(f"获取响应失败: {url}")
news.executeStatus = 0
news.executeMessage = f"获取响应失败: {url}"
return news
# BeautifulSoup 可以自动检测并解码编码,直接传入字节数据即可
            # 它会从 HTML 的 <meta> 标签或响应头自动检测编码
soup = self.parse_html(response.content)
if not soup:
logger.error("解析HTML失败")
news.executeStatus = 0
news.executeMessage = f"解析HTML失败"
return news
# 提取主内容区域
main_div = soup.select_one("div.layout.rm_txt.cf")
if not main_div:
logger.error("未找到主内容区域")
news.executeStatus = 0
news.executeMessage = f"未找到主内容区域"
return news
# 提取文章区域
article_div = main_div.select_one("div.col.col-1")
if not article_div:
logger.error("未找到文章区域")
news.executeStatus = 0
news.executeMessage = f"未找到文章区域"
return news
# 提取标题
title_tag = article_div.select_one("h1")
title = title_tag.get_text(strip=True) if title_tag else ""
# 提取作者
author_tag = article_div.select_one("div.author.cf")
author = author_tag.get_text(strip=True) if author_tag else ""
# 提取发布时间和来源
channel_div = article_div.select_one("div.channel.cf")
publish_time = ""
source = ""
if channel_div:
channel_info = channel_div.select_one("div.col-1-1")
if channel_info:
channel_text = channel_info.get_text(strip=True)
# 解析时间格式:2025年11月10日05:51 | 来源:人民网-人民日报
if '|' in channel_text:
parts = channel_text.split('|')
publish_time = parts[0].strip() if parts else ""
                        # 从 <a> 标签中提取来源
source_tag = channel_info.select_one("a")
if source_tag:
source = source_tag.get_text(strip=True)
elif len(parts) > 1 and '来源' in parts[1]:
                            # 如果没有 <a> 标签,从文本中提取
source = parts[1].replace('来源:', '').strip()
# 提取内容
content_div = article_div.select_one('div.rm_txt_con.cf')
contents = [] # 构建一个富文本内容
pList = content_div.find_all("p") # 所有p标签
# 解析p标签 变为quill富文本
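            # 每一行形如 {"tag": "p" / "img" / "video", "content": HTML 字符串}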
            for p in pList:
                # 获取 p 的 style 属性
                p_style = p.get("style") or ""
                content = ""
                tag = "p"
                if p.find('img'):
                    tag = "img"
                    src = p.find('img').get('src')
                    if src:
                        src = str(src)  # 转换为字符串
                        if not src.startswith("http"):
                            src = self.config.base_url + src
                        # 原始拼接模板在此处缺失,这里按 src 与 style 重新拼出最简 <img> 标签(假设写法)
                        content = f'<img src="{src}" style="{p_style}">'
                elif p.find('video'):
                    tag = "video"
                    src = p.find('video').get('src')
                    if src:
                        src = str(src)  # 转换为字符串
                        if not src.startswith("http"):
                            src = self.config.base_url + src
                        # 原始拼接模板在此处缺失,这里按 src 与 style 重新拼出最简 <video> 标签(假设写法)
                        content = f'<video src="{src}" style="{p_style}" controls></video>'
else:
content = str(p)
contents.append({
"tag": tag,
"content": content
})
            news.title = title
            news.contentRows = contents
            news.url = url
            news.publishTime = publish_time
            news.author = author
            news.source = source or "人民网"
            news.category = ""
logger.info(f"成功解析新闻: {title}")
return news
except Exception as e:
logger.error(f"解析新闻详情失败 [{url}]: {str(e)}")
news.executeStatus = 0
news.executeMessage = f"解析新闻详情失败: {str(e)}"
return news
def parse_cpc_news_detail(self, url: str) -> Optional[NewsItem]:
"""
解析人民日报新闻详情
"""
        # 先构造 NewsItem,保证异常分支里 news 一定已定义
        news = NewsItem(
            title="",
            contentRows=[],
            url=url,
            publishTime="",
            author="",
            source="人民网",
            category="",
            executeStatus=1,
            executeMessage="成功解析新闻"
        )
        try:
            response = self.fetch(url)
if not response:
logger.error(f"获取响应失败: {url}")
news.executeStatus = 0
news.executeMessage = f"获取响应失败: {url}"
return news
# BeautifulSoup 可以自动检测并解码编码,直接传入字节数据即可
            # 它会从 HTML 的 <meta> 标签或响应头自动检测编码
soup = self.parse_html(response.content)
if not soup:
logger.error("解析HTML失败")
news.executeStatus = 0
news.executeMessage = f"解析HTML失败"
return news
# 提取主内容区域
main_div = soup.select_one("div.text_con.text_con01")
if not main_div:
logger.error("未找到主内容区域")
news.executeStatus = 0
news.executeMessage = f"未找到主内容区域"
return news
# 提取文章区域
article_div = main_div.select_one("div.text_c")
if not article_div:
logger.error("未找到文章区域")
news.executeStatus = 0
news.executeMessage = f"未找到文章区域"
return news
# 提取标题
title_tag = article_div.select_one("h1")
title = title_tag.get_text(strip=True) if title_tag else ""
# 提取作者
author_tag = article_div.select_one("div.author.cf")
author = author_tag.get_text(strip=True) if author_tag else ""
# 提取发布时间和来源
channel_div = article_div.select_one("div.sou")
publish_time = ""
source = ""
if channel_div:
# 提取时间:取第一个非空文本节点
for child in channel_div.children:
if isinstance(child, str) and child.strip():
publish_time = child.strip().split("来源:")[0].strip()
break
# 提取来源
a_tag = channel_div.find("a")
source = a_tag.get_text(strip=True) if a_tag else ""
            # 清理不可见空格(\xa0 与全角空格)
            publish_time = publish_time.replace("\xa0", " ").replace("\u3000", " ").strip()
# 提取内容
            content_div = article_div.select_one('div.show_text')
            if not content_div:
                logger.error("未找到正文区域")
                news.executeStatus = 0
                news.executeMessage = "未找到正文区域"
                return news
            contents = []  # 构建一个 quill 富文本内容行列表
# 遍历 show_text 下的所有直接子节点(保持顺序)
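            # 不只收集 <p>:图片、视频可能包在其他容器里,逐子节点遍历能保留它们及原始顺序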
for child in content_div.children:
# 只处理 Tag 类型的节点,跳过文本节点、注释等
if not isinstance(child, Tag):
continue
tag_name = child.name
# 情况1:检测是否是视频容器(根据 id 特征或内部结构)
video_tag = child.find('video') if tag_name != 'video' else child
if video_tag and video_tag.get('src'):
src = str(video_tag['src'])
p_style = video_tag.get("style", "")
if not src.startswith("http"):
src = self.config.base_url + src
contents.append({
"tag": "video",
"content": f""
})
continue
img_tag = child.find('img') if tag_name != 'img' else child
if img_tag and img_tag.get('src'):
src = str(img_tag['src'])
p_style = child.get("style", "")
if not src.startswith("http"):
src = self.config.base_url + src
contents.append({
"tag": "img",
"content": f"
"
})
continue
if tag_name == 'p':
p_style = child.get("style", "")
img_tag = child.find('img')
video_tag = child.find('video')
                # 情况1:存在 <img> 或 <video> 标签