# Xinhuanet (新华网) crawler
from typing import List, Optional
from bs4 import Tag
from pydantic import InstanceOf
from core.ResultDomain import ResultDomain
from crawler.BaseCrawler import BaseCrawler, CrawlerConfig, NewsItem, UrlConfig
from loguru import logger
import re
import chardet
from datetime import datetime, timedelta
from bs4.element import NavigableString
from urllib.parse import urlparse, urlencode
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.chrome.service import Service
import time
import random
import os


class XhwCrawler(BaseCrawler):
    def __init__(self):
        """Initialize the Xinhuanet crawler."""
        config = CrawlerConfig(
            base_url="https://xhsz.news.cn/",
            urls={
                "search": UrlConfig(
                    url="https://xhsz.news.cn/s",
                    method="GET",
                    params={
                        "k": "",
                        "action": "",
                        "page": 1
                    },
                    headers={
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                        'Accept-Encoding': 'gzip, deflate, br',
                        'Connection': 'keep-alive',
                        'Upgrade-Insecure-Requests': '1',
                        'Sec-Fetch-Dest': 'document',
                        'Sec-Fetch-Mode': 'navigate',
                        'Sec-Fetch-Site': 'none',
                        'Cache-Control': 'max-age=0',
                        'Referer': 'https://xhsz.news.cn/',
                        'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
                        'sec-ch-ua-mobile': '?0',
                        'sec-ch-ua-platform': '"Windows"'
                    }
                ),
            },
        )
        super().__init__(config)
        self.search_action_map = {
            "全部": "index",
            "热点发布": "news"
        }
        # Create the WebDriver when the crawler is constructed
        self.driver = self._init_driver()

    def _init_driver(self):
        """Initialize and return a Chrome WebDriver instance."""
        chrome_options = Options()
        # Keep the browser visible: no --headless flag is added (a visible window is the Chrome default)
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
        # Make sure the browser window is visible
        chrome_options.add_argument('--start-maximized')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--disable-web-security')
        chrome_options.add_argument('--allow-running-insecure-content')
        chrome_options.add_argument('--disable-features=VizDisplayCompositor')
        chrome_options.add_argument('--remote-debugging-port=9222')  # expose a remote debugging port

        service = Service(executable_path=r"chromedriver.exe")
        try:
            self.driver = webdriver.Chrome(service=service, options=chrome_options)
            logger.info("Chrome浏览器初始化成功")
        except Exception as e:
            logger.error(f"Chrome浏览器初始化失败: {str(e)}")
            return None

        # Implicit wait for element lookups
        self.driver.implicitly_wait(10)

        # Visit the home page to obtain the initial cookies
        logger.info("访问主页获取初始Cookie")
        logger.info(f"准备访问URL: {self.config.base_url}")
        try:
            self.driver.get(self.config.base_url)
            logger.info(f"成功访问URL: {self.config.base_url}")
        except Exception as e:
            logger.error(f"访问URL失败: {self.config.base_url}, 错误: {str(e)}")
            return None
        time.sleep(random.uniform(2, 4))

        # Check whether a verification (captcha) page is shown
        page_source = self.driver.page_source
page_source or "captcha" in page_source.lower() or "滑动验证" in page_source: logger.warning("检测到验证页面,尝试手动处理验证") # 尝试等待用户手动处理验证 logger.info("请在30秒内手动完成验证...") time.sleep(30) # 刷新页面,检查验证是否完成 self.driver.refresh() time.sleep(random.uniform(2, 4)) # 再次检查验证状态 page_source = self.driver.page_source if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source: logger.error("验证未完成,无法继续爬取") # self.driver.quit() # self.driver = None return None return self.driver def __del__(self): """析构函数,确保关闭浏览器""" if hasattr(self, 'driver') and self.driver: self.driver.quit() logger.info("浏览器已关闭") def search(self, key:str, total=10, action="news") -> ResultDomain: # 检查driver是否已初始化 if not self.driver: logger.error("WebDriver未初始化,无法继续爬取") resultDomain = ResultDomain(code=1, message="WebDriver未初始化,无法继续爬取", success=False) return resultDomain # 直接使用self.driver news_urls = [] news_list = [] resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list) # 获取搜索配置 search_config = self.config.urls.get("search") if not search_config: logger.error("未找到搜索URL配置") resultDomain.code = 0 resultDomain.message = "未找到搜索URL配置" resultDomain.success = False return resultDomain pagesize = 10 # 准备搜索参数 search_data = search_config.params.copy() search_data["k"] = key search_data["action"] = action # 获取新闻url url_base_map = {} # 向上取整计算需要的页数 total_pages = (total + pagesize - 1) // pagesize for page in range(1, total_pages + 1): search_data["page"] = page pageHtml = search_config.url + "?" + urlencode(search_data) # 分页的html logger.info(f"请求URL: {pageHtml}") # 使用Selenium访问页面 self.driver.get(pageHtml) time.sleep(random.uniform(2, 4)) # 检查是否有验证页面 if not self.driver: logger.error("WebDriver已失效,无法继续爬取") resultDomain = ResultDomain(code=1, message="WebDriver已失效,无法继续爬取", success=False) return resultDomain page_source = self.driver.page_source if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source: logger.warning("检测到验证页面,尝试手动处理验证") logger.info("请在30秒内手动完成验证...") time.sleep(30) # 检查driver是否仍然有效 if not self.driver: logger.error("WebDriver已失效,无法继续爬取") resultDomain = ResultDomain(code=1, message="WebDriver已失效,无法继续爬取", success=False) return resultDomain self.driver.refresh() time.sleep(random.uniform(2, 4)) # 再次检查验证状态 if not self.driver: logger.error("WebDriver已失效,无法继续爬取") resultDomain = ResultDomain(code=1, message="WebDriver已失效,无法继续爬取", success=False) return resultDomain page_source = self.driver.page_source if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source: logger.error("验证未完成,无法继续爬取") resultDomain = ResultDomain(code=1, message="验证未完成,无法继续爬取", success=False) return resultDomain # 解析页面内容 pageSoup = self.parse_html(page_source) logger.info(f"解析后的HTML内容: {str(pageSoup)[:500]}...") # 只输出前500个字符 # 从分页中获取新闻url searchMainDiv = pageSoup.find("div", class_="page-search-main") if not searchMainDiv: logger.error("未找到搜索主体部分") resultDomain.code = 0 resultDomain.message = "未找到搜索主体部分" resultDomain.success = False return resultDomain searchGroupDiv = searchMainDiv.find("div", class_="page-search-group") if not searchGroupDiv: logger.error("未找到搜索组") resultDomain.code = 0 resultDomain.message = "未找到搜索组" resultDomain.success = False return resultDomain newsDiv = searchGroupDiv.find("div", class_="page-search-news") if not newsDiv: logger.error("未找到新闻列表") resultDomain.code = 0 resultDomain.message = "未找到新闻列表" resultDomain.success = False return resultDomain newsList = newsDiv.find_all("div", class_="group") for news in newsList: news_info = news.find("div.head") 
                news_title = news_info.find("div", class_="title")
                news_date = news_info.find("div", class_="date").text.strip()
                url = news_title.find("a").get("href")
                url_base_map[url] = {"title": news_title.get_text(strip=True), "date": news_date}
                news_urls.append(url)

        # Temporarily save the collected URLs to url.json
        with open("url.json", "w", encoding="utf-8") as f:
            json.dump(url_base_map, f, ensure_ascii=False, indent=4)

        # Fetch the detail page of every news URL
        for news_url in news_urls:
            news = self.parse_news_detail(news_url)
            if not news:
                # Skip URLs whose detail page could not be parsed
                continue
            news.title = url_base_map.get(news_url, {}).get("title")
            news.publishTime = url_base_map.get(news_url, {}).get("date")
            news_list.append(news)

        # Temporarily save the news items to news.json
        # (assumes NewsItem is a pydantic model; plain model instances are not JSON serializable)
        with open("news.json", "w", encoding="utf-8") as f:
            json.dump([item.dict() for item in news_list], f, ensure_ascii=False, indent=4)

        # Re-assign in case ResultDomain copied the list when it was constructed
        resultDomain.dataList = news_list

        # Close the browser
        if self.driver:
            self.driver.quit()
            self.driver = None  # avoid quitting the same driver again in __del__
            logger.info("浏览器已关闭")

        return resultDomain

    def parse_news_detail(self, url: str) -> Optional[NewsItem]:
        return self.parse_xhsz_news_detail_selenium(url)

    def parse_xhsz_news_detail_selenium(self, url: str) -> Optional[NewsItem]:
        # Make sure the driver has been initialized
        if not self.driver:
            logger.error("WebDriver未初始化,无法获取新闻详情")
            return None

        newsItem = NewsItem(title="", contentRows=[], url=url)

        # Open the news detail page with Selenium
        self.driver.get(url)
        time.sleep(random.uniform(2, 4))

        # Check whether a verification (captcha) page is shown
        if not self.driver:
            logger.error("WebDriver已失效,无法获取新闻详情")
            return None
        page_source = self.driver.page_source
        if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source:
            logger.warning("检测到验证页面,尝试手动处理验证")
            logger.info("请在30秒内手动完成验证...")
            time.sleep(30)
            # Check that the driver is still alive
            if not self.driver:
                logger.error("WebDriver已失效,无法获取新闻详情")
                return None
            self.driver.refresh()
            time.sleep(random.uniform(2, 4))
            # Re-check the verification status
            if not self.driver:
                logger.error("WebDriver已失效,无法获取新闻详情")
                return None
            page_source = self.driver.page_source
            if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source:
                logger.error("验证未完成,无法获取新闻详情")
                return None

        # Parse the page content
        newsDetailSoup = self.parse_html(page_source)

        # Locate the main news block
        main_div = newsDetailSoup.find("div", class_="page-news-detail")
        if not main_div:
            logger.error(f"未找到新闻主体部分: {url}")
            return None
        article_div = main_div.find("div", class_="page-news-l")
        if not article_div:
            logger.error(f"未找到新闻文章部分: {url}")
            return None

        # Title
        title_div = article_div.find("div", class_="page-news-detail-title")
        if title_div:
            newsItem.title = title_div.text.strip()

        # News metadata: source, publish time, view count
        channal_div = article_div.find("div", class_="page-news-detail-note")
        if channal_div:
            channal_items_div = channal_div.find_all("div", class_="item")
            for item in channal_items_div:
                text = item.text.strip()
                if "来源" in text:
                    parts = text.split(":", 1)
                    if len(parts) > 1:
                        newsItem.source = parts[1].strip()
                elif "发布时间" in text:
                    parts = text.split(":", 1)
                    if len(parts) > 1:
                        newsItem.publishTime = parts[1].strip()
                elif "浏览人数" in text:
                    parts = text.split(":", 1)
                    if len(parts) > 1:
                        newsItem.viewCount = parts[1].strip()

        # News body
        content_div = article_div.find("div", class_="page-news-detail-content")
        if content_div:
            # Iterate over every element in the content area
            for child in content_div.children:
                if not isinstance(child, Tag):
                    continue
                # Image paragraph
                if child.name == "p" and child.find("img"):
                    img_tag = child.find("img")
                    if img_tag:
                        src = str(img_tag.get("src"))
                        img_tag["src"] = self._normalize_url(src)
                        newsItem.contentRows.append({
                            "tag": "img",
                            "content": str(img_tag)
                        })
                # Video
                elif child.find("video"):
                    video_tag = child.find("video")
                    if video_tag:
                        src = str(video_tag.get("src"))
                        video_tag["src"] = self._normalize_url(src)
                        newsItem.contentRows.append({
                            "tag": "video",
                            "content": str(video_tag)
                        })
                # Plain text paragraph
                elif child.name == "p" and child.get_text(strip=True):
                    newsItem.contentRows.append({
                        "tag": "p",
"content": child.get_text(strip=True) }) return newsItem def parse_xhsz_news_detail(self, url: str) -> Optional[NewsItem]: newsItem = NewsItem(title="", contentRows=[], url=url) response = self.fetch(url) newsDetailSoup = self.parse_html(response.content) main_div = newsDetailSoup.find("div.page-news-detail") article_div = main_div.find("div.page-news-l") title_div = article_div.find("div.page-news-detail-title") channal_div = article_div.find("div.page-news-detail-note") content_div = article_div.find("div.page-news-detail-content") # 获取新闻标题 newsItem.title = title_div.text.strip() # 获取新闻来源、发布时间、浏览人数 channal_items_div = channal_div.find_all("div.item") if("来源" in channal_items_div[0].text): newsItem.source = channal_items_div[0].text.strip().split(":")[1] if("发布时间" in channal_items_div[1].text): newsItem.publishTime = channal_items_div[1].text.strip().split(":")[1] if("浏览人数" in channal_items_div[2].text): newsItem.viewCount = channal_items_div[2].text.strip().split(":")[1] for child in content_div.children: if not isinstance(child, Tag): continue img_tag = child.find("img") video_tag = child.find("video") tag = "p" content = str(child) if img_tag: # 是图片 tag = "img" src = str(img_tag.get("src")) if src: img_tag["src"] = self._normalize_url(src) content = str(img_tag) elif video_tag: # 是视频 tag = "video" src = str(video_tag.get("src")) if src: video_tag["src"] = self._normalize_url(src) content = str(video_tag) newsItem.contentRows.append({"tag": tag, "content": content}) return newsItem def _normalize_url(self, url: str) -> str: """ 规范化 URL,补全协议和域名 Args: url: 原始 URL Returns: 完整的 URL """ if not url: return url # 已经是完整 URL if url.startswith("http://") or url.startswith("https://"): return url # 协议相对 URL,补充 https: if url.startswith("//"): return "https:" + url # 相对路径,补全域名 return self.config.base_url + url