# People's Daily (人民日报) crawler
from typing import List, Optional

from core import ResultDomain
from crawler.BaseCrawler import BaseCrawler, CrawlerConfig, NewsItem, UrlConfig
from loguru import logger
import re
import chardet
from datetime import datetime, timedelta


class RmrbCrawler(BaseCrawler):
    """People's Daily news crawler"""

    def __init__(self):
        """Initialize the People's Daily crawler"""
        config = CrawlerConfig(
            base_url="http://www.people.com.cn",
            urls={
                "search": UrlConfig(
                    url="http://search.people.cn/search-platform/front/search",
                    method="POST",
                    params={
                        "key": "",
                        "page": 1,
                        "limit": 10,
                        "hasTitle": True,
                        "hasContent": True,
                        "isFuzzy": True,
                        "type": 0,      # 0 all, 1 news, 2 interactive, 3 newspaper, 4 images, 5 video
                        "sortType": 2,  # 1 by relevance, 2 by time
                        "startTime": 0,
                        "endTime": 0
                    },
                    headers={
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
                        'Accept': 'application/json, text/plain, */*',
                        'Accept-Language': 'zh-CN,zh;q=0.9',
                        'Content-Type': 'application/json;charset=UTF-8'
                    }
                ),
                "hot_point_rank": UrlConfig(
                    url="http://search.people.cn/search-platform/front/searchRank",
                    method="GET",
                    params={},
                    headers={
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
                        'Accept': 'application/json, text/plain, */*',
                        'Accept-Language': 'zh-CN,zh;q=0.9',
                        'Content-Type': 'application/json;charset=UTF-8'
                    }
                ),
                "one_day_trending_news": UrlConfig(
                    url=lambda date: f"http://www.people.com.cn/GB/59476/review/{date}.html",  # date: YYYYMMDD
                    method="GET",
                    params={},
                    headers={
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
                        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                        'Accept-Language': 'zh-CN,zh;q=0.9',
                    }
                )
            },
        )
        super().__init__(config)

    def search(self, key: str, total: int = 10, news_type: int = 0) -> ResultDomain:
        """
        Search People's Daily news.

        Args:
            key: search keyword
            total: maximum number of items to collect
            news_type: news type (0=all, 1=news, 2=interactive, 3=newspaper, 4=images, 5=video)

        Returns:
            ResultDomain whose dataList holds the collected news items
        """
        resultDomain = ResultDomain()
        news_list = []
        resultDomain.dataList = news_list
        try:
            # Get the search URL config
            search_config = self.config.urls.get("search")
            if not search_config:
                logger.error("Search URL config not found")
                resultDomain.code = 0
                resultDomain.message = "Search URL config not found"
                resultDomain.success = False
                return resultDomain

            page = 1
            limit = 10

            # Prepare the search parameters
            search_data = search_config.params.copy()
            search_data["key"] = key
            search_data["limit"] = limit
            search_data["type"] = news_type

            while len(news_list) < total:
                search_data["page"] = page
                response = self.fetch(search_config.url,
                                      method=search_config.method,
                                      data=search_data,
                                      headers=search_config.headers)
                response_json = response.json()
                if response_json.get("code") == 0:
                    records = response_json.get("data", {}).get("records", [])
                    if not records:
                        # No more results; stop paging
                        break
                    for record in records:
                        news = self.parse_news_detail(record.get("url"))
                        if news:
                            news_list.append(news)
                        if len(news_list) >= total:
                            break
                else:
                    resultDomain.code = response_json.get("code")
                    resultDomain.message = "Failed to fetch search results: " + str(response_json.get("message"))
                    resultDomain.success = False
                    return resultDomain
                page += 1

            resultDomain.success = True
            resultDomain.code = 0
            logger.info(f"Found {len(news_list)} news items")
            return resultDomain
        except Exception as e:
            logger.error(f"Failed to search People's Daily news: {str(e)}")
            resultDomain.code = 0
            resultDomain.message = "Failed to search People's Daily news: " + str(e)
            resultDomain.success = False
            return resultDomain
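    # Minimal usage sketch for search(); illustrative only, not part of the original
    # module. It assumes BaseCrawler wires up the HTTP session used by fetch()/parse_html()
    # above; "keyword" is a placeholder value.
    #
    #     crawler = RmrbCrawler()
    #     result = crawler.search("keyword", total=20, news_type=1)
    #     if result.success:
    #         for item in result.dataList:
    #             print(item)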
    def hotPointRank(self) -> ResultDomain:
        """Get the People's Daily hot-topic ranking."""
        resultDomain = ResultDomain()
        news_list = []
        resultDomain.dataList = news_list
        try:
            hot_point_rank_config = self.config.urls.get("hot_point_rank")
            if not hot_point_rank_config:
                logger.error("Hot-topic ranking URL config not found")
                resultDomain.code = 0
                resultDomain.message = "Hot-topic ranking URL config not found"
                resultDomain.success = False
                return resultDomain

            response = self.fetch(hot_point_rank_config.url,
                                  method=hot_point_rank_config.method,
                                  headers=hot_point_rank_config.headers)
            response_json = response.json()
            if response_json.get("code") == 0:
                records = response_json.get("data", [])
                for record in records:
                    news = self.parse_news_detail(record.get("url"))
                    if news:
                        news_list.append(news)
            else:
                resultDomain.code = response_json.get("code")
                resultDomain.message = "Failed to fetch the hot-topic ranking: " + str(response_json.get("message"))
                resultDomain.success = False
                return resultDomain

            resultDomain.success = True
            resultDomain.code = 0
            logger.info("Fetched the People's Daily hot-topic ranking")
            return resultDomain
        except Exception as e:
            logger.error(f"Failed to fetch the People's Daily hot-topic ranking: {str(e)}")
            resultDomain.code = 0
            resultDomain.message = "Failed to fetch the People's Daily hot-topic ranking: " + str(e)
            resultDomain.success = False
            return resultDomain

    def getOneDayTrendingNews(self, date: datetime) -> ResultDomain:
        """Get the People's Daily trending news for a single day."""
        resultDomain = ResultDomain()
        news_list = []
        resultDomain.dataList = news_list
        try:
            date_str = date.strftime("%Y%m%d")
            one_day_trending_news_config = self.config.urls.get("one_day_trending_news")
            # The configured url is a factory (lambda); build the concrete URL locally
            # instead of overwriting the config, so later calls still work.
            url = one_day_trending_news_config.url(date_str)

            response = self.fetch(url,
                                  method=one_day_trending_news_config.method,
                                  headers=one_day_trending_news_config.headers)
            if not response:
                logger.error(f"Failed to fetch response: {url}")
                resultDomain.code = 0
                resultDomain.message = "Failed to fetch response: " + url
                resultDomain.success = False
                return resultDomain

            soup = self.parse_html(response.content)
            if not soup:
                logger.error(f"Failed to parse HTML: {url}")
                resultDomain.code = 0
                resultDomain.message = "Failed to parse HTML: " + url
                resultDomain.success = False
                return resultDomain

            all_doc_urls = []

            # Collect all <a> tags under the td.bg01 block (td.indexfont13)
            bg01 = soup.find('td', class_="bg01")
            indexfont13 = bg01.find('td', class_='indexfont13')
            a_tags = indexfont13.find_all('a')
            all_doc_urls.extend(a_tags)

            # Collect all <a> tags under the td.bg02 block (td.p6)
            bg02 = soup.find('td', class_="bg02")
            p6 = bg02.find('td', class_='p6')
            a_tags_p6 = p6.find_all('a')
            all_doc_urls.extend(a_tags_p6)

            for a_tag in all_doc_urls:
                news = self.parse_news_detail(a_tag.get('href'))
                if news:
                    news_list.append(news)

            resultDomain.success = True
            resultDomain.code = 0
            logger.info("Fetched the People's Daily trending news for one day")
            return resultDomain
        except Exception as e:
            logger.error(f"Failed to fetch the People's Daily trending news for one day: {str(e)}")
            resultDomain.code = 0
            resultDomain.message = "Failed to fetch the People's Daily trending news for one day: " + str(e)
            resultDomain.success = False
            return resultDomain

    def getDaysTrendingNews(self, start_date: datetime, end_date: datetime) -> ResultDomain:
        """Get the People's Daily trending news over a range of days."""
        resultDomain = ResultDomain()
        news_list = []
        resultDomain.dataList = news_list
        try:
            resultDomain.success = True
            resultDomain.code = 0
            resultDomain.message = "Fetched the People's Daily trending news for multiple days"

            # range() cannot iterate over datetimes; step one day at a time instead
            date = start_date
            while date < end_date:
                day_result = self.getOneDayTrendingNews(date)
                date += timedelta(days=1)
                if not day_result.success:
                    continue
                news_list.extend(day_result.dataList)

            logger.info("Fetched the People's Daily trending news for multiple days")
            return resultDomain
        except Exception as e:
            logger.error(f"Failed to fetch the People's Daily trending news for multiple days: {str(e)}")
            resultDomain.code = 0
            resultDomain.message = "Failed to fetch the People's Daily trending news for multiple days: " + str(e)
            resultDomain.success = False
            return resultDomain

    def parse_news_detail(self, url: str) -> Optional[NewsItem]:
        """
        Parse a People's Daily news detail page.

        Args:
            url: news detail page URL

        Returns:
            the parsed news item, or None on failure
        """
        try:
            response = self.fetch(url)
            if not response:
                logger.error(f"Failed to fetch response: {url}")
                return None

            # BeautifulSoup detects and decodes the encoding automatically, so the raw
            # bytes can be passed in directly; it reads the charset from the HTML
            # <meta> tag or the response headers.
            soup = self.parse_html(response.content)
            if not soup:
                logger.error("Failed to parse HTML")
                return None

            # Extract the main content area
            main_div = soup.find("div", class_="layout rm_txt cf")
            if not main_div:
                logger.error("Main content area not found")
                return None

            # Extract the article area
            article_div = main_div.find("div", class_="col col-1")
            if not article_div:
                logger.error("Article area not found")
                return None

            # Extract the title
            title_tag = article_div.select_one("h1")
            title = title_tag.get_text(strip=True) if title_tag else ""

            # Extract the author
            author_tag = article_div.select_one("div.author.cf")
            author = author_tag.get_text(strip=True) if author_tag else ""

            # Extract the publish time and source
            channel_div = article_div.select_one("div.channel.cf")
            publish_time = ""
            source = ""
            if channel_div:
                channel_info = channel_div.select_one("div.col-1-1")
                if channel_info:
                    channel_text = channel_info.get_text(strip=True)
                    # Parse text like: 2025年11月10日05:51 | 来源:人民网-人民日报
                    if '|' in channel_text:
                        parts = channel_text.split('|')
                        publish_time = parts[0].strip() if parts else ""
                        # Prefer the source from the <a> tag
                        source_tag = channel_info.select_one("a")
                        if source_tag:
                            source = source_tag.get_text(strip=True)
                        elif len(parts) > 1 and '来源' in parts[1]:
                            # No <a> tag; fall back to the plain text
                            source = parts[1].replace('来源:', '').strip()

            # Extract the body content
            content_div = article_div.select_one('div.rm_txt_con.cf')
            contents = []  # build a rich-text (Quill) representation
            pList = content_div.find_all("p")  # all <p> tags
            # Convert each <p> tag into a Quill rich-text fragment
            for p in pList:
                # Read the inline style of the <p> tag
                p_style = p.get("style")
                content = ""
                tag = "p"
                if p.find('img'):
                    tag = "img"
                    src = p.find('img').get('src')
                    if src and not src.startswith("http"):
                        src = self.config.base_url + src
                    content = f'<img src="{src}">'
                elif p.find('video'):
                    tag = "video"
                    src = p.find('video').get('src')
                    if src and not src.startswith("http"):
                        src = self.config.base_url + src
                    content = f"