# 人民日报爬虫 from typing import List, Optional from core.ResultDomain import ResultDomain from crawler.BaseCrawler import BaseCrawler, CrawlerConfig, NewsItem, UrlConfig from loguru import logger import re import chardet from datetime import datetime, timedelta from bs4 import NavigableString from urllib.parse import urlparse import json class RmrbCrawler(BaseCrawler): """人民日报新闻爬虫""" def __init__(self): """初始化人民日报爬虫""" config = CrawlerConfig( base_url="http://www.people.com.cn", urls={ "search": UrlConfig( url="http://search.people.cn/search-platform/front/search", method="POST", params={ "key": "", "page": 1, "limit": 10, "hasTitle": True, "hasContent": True, "isFuzzy": True, "type": 0, # 0 所有,1 新闻,2 互动,3 报刊,4 图片,5 视频 "sortType": 2, # 1 按相关度,2 按时间 "startTime": 0, "endTime": 0 }, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Content-Type': 'application/json;charset=UTF-8' } ), "hot_point_rank": UrlConfig( url="http://search.people.cn/search-platform/front/searchRank", method="GET", params={}, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36', 'Accept': 'application/json, text/plain, */*', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Content-Type': 'application/json;charset=UTF-8' } ), "one_day_trending_news": UrlConfig( url= "http://www.people.com.cn/GB/59476/review/{date}.html", # date:YYYYMMdd method="GET", params={}, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'Accept-Language': 'zh-CN,zh;q=0.9', } ) }, ) super().__init__(config) self.detail_map = { "gba": self.parse_base_news_detail, "politics": self.parse_base_news_detail, "finance": self.parse_base_news_detail, "cpc": self.parse_cpc_news_detail, } def search(self, key: str, total: int, news_type: int = 0) -> ResultDomain: """ 搜索人民日报新闻 Args: key: 搜索关键词 total: 总数量 news_type: 新闻类型 (0=全部, 1=新闻, 2=互动, 3=报刊, 4=图片, 5=视频) Returns: 新闻列表 """ try: resultDomain = ResultDomain(code=0, message="", success=True) news_list = [] resultDomain.dataList = news_list # 获取搜索配置 search_config = self.config.urls.get("search") if not search_config: logger.error("未找到搜索URL配置") resultDomain.code = 0 resultDomain.message = "未找到搜索URL配置" resultDomain.success = False return resultDomain page = 1 limit = 10 # 准备搜索参数 search_data = search_config.params.copy() search_data["key"] = key search_data["limit"] = limit search_data["type"] = news_type while len(news_list) < total: search_data["page"] = page response = self.fetch(search_config.url, method=search_config.method, json=search_data, headers=search_config.headers) response_json = response.json() if response_json.get("code") == '0': records = response_json.get("data", {}).get("records", []) for record in records: news = self.parse_news_detail(record.get("url")) if news.title == '': news.title = record.get("title") if news.contentRows == []: # 如果contentOriginal是字符串,转换为列表格式 content_original = record.get("contentOriginal") if isinstance(content_original, str): news.contentRows = [{"type": "text", "content": content_original}] elif isinstance(content_original, list): news.contentRows = content_original if not news.contentRows: news.executeStatus= 1 news.executeMessage = "直接从接口响应获取" if news.publishTime == '': news.publishTime = str(datetime.fromtimestamp(record.get("displayTime", 0) / 1000).date()) if news.author == '': news.author = record.get("author") if news.source == '': news.source = record.get("originName") news_list.append(news) else: resultDomain.code = response_json.get("code") resultDomain.message = f"获取搜索结果失败{response_json.get('message') or ''}" resultDomain.success = False return resultDomain page += 1 resultDomain.success = True resultDomain.code = 0 logger.info(f"搜索到 {len(news_list)} 条新闻") return resultDomain except Exception as e: logger.error(f"搜索人民日报新闻失败: {str(e)}") resultDomain.code = 0 resultDomain.message = "搜索人民日报新闻失败" + str(e) resultDomain.success = False return resultDomain def hotPointRank(self) -> ResultDomain: """ 获取人民日报热点排行 """ try: hot_point_rank_config = self.config.urls.get("hot_point_rank") resultDomain = ResultDomain(code=0, message="", success=True) news_list = [] resultDomain.dataList = news_list if not hot_point_rank_config: logger.error("未找到热点排行URL配置") resultDomain.code = 0 resultDomain.message = "未找到热点排行URL配置" resultDomain.success = False return resultDomain response = self.fetch(hot_point_rank_config.url, method=hot_point_rank_config.method, headers=hot_point_rank_config.headers) response_json = response.json() if response_json.get("code") == '0': records = response_json.get("data", []) for record in records: news = self.parse_news_detail(record.get("url")) news_list.append(news) else: resultDomain.code = response_json.get("code") resultDomain.message = f"获取人民日报热点排行失败{response_json.get('message') or ''}" resultDomain.success = False return resultDomain resultDomain.success = True resultDomain.code = 0 logger.info(f"获取人民日报热点排行成功") return resultDomain except Exception as e: logger.error(f"获取人民日报热点排行失败: {str(e)}") resultDomain.code = 0 resultDomain.message = f"获取人民日报热点排行失败{str(e)}" resultDomain.success = False return resultDomain def getOneDayTrendingNews(self, date: datetime) -> ResultDomain: """ 获取人民日报一天内的热点新闻 """ try: resultDomain = ResultDomain(code=0, message="", success=True) news_list = [] resultDomain.dataList = news_list resultDomain.success = True resultDomain.code = 0 logger.info(f"获取人民日报一天内的热点新闻成功") date_str = date.strftime("%Y%m%d") one_day_trending_news_config = self.config.urls.get("one_day_trending_news") one_day_trending_news_config.url = one_day_trending_news_config.url.format(date=date_str) response = self.fetch(one_day_trending_news_config.url, method=one_day_trending_news_config.method, headers=one_day_trending_news_config.headers) if not response: logger.error(f"获取响应失败: {one_day_trending_news_config.url}") resultDomain.code = 0 resultDomain.message = f"获取响应失败{one_day_trending_news_config.url or ''}" resultDomain.success = False return resultDomain soup = self.parse_html(response.content) if not soup: logger.error(f"解析HTML失败: {one_day_trending_news_config.url}") resultDomain.code = 0 resultDomain.message = f"解析HTML失败{one_day_trending_news_config.url or ''}" resultDomain.success = False return resultDomain all_doc_urls = [] bg01 = soup.find('td', class_="bg01") indexfont13 = bg01.find('td', class_='indexfont13') # 获取该 td 下的所有 a 标签 a_tags = indexfont13.find_all('a') all_doc_urls.extend(a_tags) bg02 = soup.find('td', class_="bg02") p6 = bg02.find('td', class_='p6') a_tags_p6 = p6.find_all('a') all_doc_urls.extend(a_tags_p6) for a_tag in all_doc_urls: news = self.parse_news_detail(a_tag.get('href')) news_list.append(news) return resultDomain except Exception as e: logger.error(f"获取人民日报一天内的热点新闻失败: {str(e)}") resultDomain.code = 0 resultDomain.message = f"获取人民日报一天内的热点新闻失败{str(e)}" resultDomain.success = False return resultDomain def getDaysTrendingNews(self, start_date: datetime, end_date: datetime) -> ResultDomain: """ 获取人民日报多天内的热点新闻 """ try: resultDomain = ResultDomain(code=0,message="", success=True) news_list = [] resultDomain.dataList = news_list resultDomain.success = True resultDomain.code = 0 resultDomain.message = "获取人民日报多天内的热点新闻成功" current_date = start_date while current_date <= end_date: day_result = self.getOneDayTrendingNews(current_date) if day_result.success and day_result.dataList: news_list.extend(day_result.dataList) current_date += timedelta(days=1) logger.info(f"获取人民日报多天内的热点新闻成功,共 {len(news_list)} 条") return resultDomain except Exception as e: logger.error(f"获取人民日报多天内的热点新闻失败: {str(e)}") resultDomain.code = 0 resultDomain.message = f"获取人民日报多天内的热点新闻失败{str(e)}" resultDomain.success = False return resultDomain def parse_news_detail(self, url: str) -> Optional[NewsItem]: # 从 URL 中提取 category netloc = urlparse(url).netloc category = "gba" if netloc.endswith('.people.com.cn'): category = netloc.split('.')[0] # 从 detail_map 中获取对应的解析函数 print(category) parser_func = self.detail_map.get(category) if parser_func is None: logger.error(f"未找到对应解析器,category={category}, url={url}") return NewsItem( url=url, executeStatus=0, executeMessage=f"不支持的新闻类型: {category}" ) # 调用对应的解析方法(注意:这些方法是实例方法,需通过 self 调用) return parser_func(url) def parse_base_news_detail(self, url: str) -> Optional[NewsItem]: """ 解析人民日报新闻详情 Args: url: 新闻详情页URL Returns: 新闻对象 """ try: response = self.fetch(url) news = NewsItem( title="", contentRows=[], # 修复:使用 contents 而不是 content url=url, publishTime="", author="", source="人民网", category="", executeStatus=1, executeMessage="成功解析新闻" ) if not response: logger.error(f"获取响应失败: {url}") news.executeStatus = 0 news.executeMessage = f"获取响应失败: {url}" return news # BeautifulSoup 可以自动检测并解码编码,直接传入字节数据即可 # 它会从 HTML 的 标签或响应头自动检测编码 soup = self.parse_html(response.content) if not soup: logger.error("解析HTML失败") news.executeStatus = 0 news.executeMessage = f"解析HTML失败" return news # 提取主内容区域 main_div = soup.select_one("div.layout.rm_txt.cf") if not main_div: logger.error("未找到主内容区域") news.executeStatus = 0 news.executeMessage = f"未找到主内容区域" return news # 提取文章区域 article_div = main_div.select_one("div.col.col-1") if not article_div: logger.error("未找到文章区域") news.executeStatus = 0 news.executeMessage = f"未找到文章区域" return news # 提取标题 title_tag = article_div.select_one("h1") title = title_tag.get_text(strip=True) if title_tag else "" # 提取作者 author_tag = article_div.select_one("div.author.cf") author = author_tag.get_text(strip=True) if author_tag else "" # 提取发布时间和来源 channel_div = article_div.select_one("div.channel.cf") publish_time = "" source = "" if channel_div: channel_info = channel_div.select_one("div.col-1-1") if channel_info: channel_text = channel_info.get_text(strip=True) # 解析时间格式:2025年11月10日05:51 | 来源:人民网-人民日报 if '|' in channel_text: parts = channel_text.split('|') publish_time = parts[0].strip() if parts else "" # 从 标签中提取来源 source_tag = channel_info.select_one("a") if source_tag: source = source_tag.get_text(strip=True) elif len(parts) > 1 and '来源' in parts[1]: # 如果没有 标签,从文本中提取 source = parts[1].replace('来源:', '').strip() # 提取内容 content_div = article_div.select_one('div.rm_txt_con.cf') contents = [] # 构建一个富文本内容 pList = content_div.find_all("p") # 所有p标签 # 解析p标签 变为quill富文本 for p in pList: # 获取p的class内容 p_style = p.get("style") content = "" tag = "p" if p.find('img'): tag = "img" src = p.find('img').get('src') if src: src = str(src) # 转换为字符串 if not src.startswith("http"): src = self.config.base_url + src content = f"" elif p.find('video'): tag = "video" src = p.find('video').get('src') if src: src = str(src) # 转换为字符串 if not src.startswith("http"): src = self.config.base_url + src content = f"