# Xinhua Net (xhsz.news.cn) news crawler
from itertools import count
from typing import List, Optional
from bs4 import Tag
from pydantic import InstanceOf
from core.ResultDomain import ResultDomain
from crawler.BaseCrawler import BaseCrawler, CrawlerConfig, NewsItem, UrlConfig
from loguru import logger
import re
import chardet
from datetime import datetime, timedelta
from bs4.element import NavigableString
from urllib.parse import urlparse, urlencode
import json
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.chrome.service import Service
import time
import random
import os
class XhwCrawler(BaseCrawler):
def __init__(self):
"""初始化人民日报爬虫"""
config = CrawlerConfig(
base_url="https://xhsz.news.cn/",
urls={
"search": UrlConfig(
url="https://xhsz.news.cn/s",
method="GET",
params={
"k": "",
"action": "",
"page": 1
},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0',
'Referer': 'https://xhsz.news.cn/',
'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
),
},
)
super().__init__(config)
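# Map search-category display names to the "action" query parameter values used by the site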
self.search_action_map = {
"全部": "index",
"热点发布": "news"
}
# Create the Selenium WebDriver when the crawler is constructed
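# Note: self.driver will be None if Chrome fails to start or the site's verification cannot be completed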
self.driver = self._init_driver()
def _init_driver(self):
"""初始化并返回Chrome WebDriver实例"""
chrome_options = Options()
# Run with a visible browser window; headless mode is intentionally not enabled
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
# Keep the browser window visible and maximized
chrome_options.add_argument('--start-maximized')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--disable-web-security')
chrome_options.add_argument('--allow-running-insecure-content')
chrome_options.add_argument('--disable-features=VizDisplayCompositor')
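# chromedriver.exe is expected in the current working directory (relative path below)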
service = Service(executable_path=r"chromedriver.exe")
driver = None
try:
driver = webdriver.Chrome(service=service, options=chrome_options)
logger.info("Chrome浏览器初始化成功")
except Exception as e:
logger.error(f"Chrome浏览器初始化失败: {str(e)}")
return None
# Implicit wait time (currently disabled)
# driver.implicitly_wait(10)
# Visit the home page to obtain initial cookies
logger.info("访问主页获取初始Cookie")
logger.info(f"准备访问URL: {self.config.base_url}")
try:
driver.get(self.config.base_url)
logger.info(f"成功访问URL: {self.config.base_url}")
except Exception as e:
logger.error(f"访问URL失败: {self.config.base_url}, 错误: {str(e)}")
return None
time.sleep(random.uniform(2, 4))
# Check for a verification / captcha page
page_source = driver.page_source
if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source:
logger.warning("检测到验证页面,尝试手动处理验证")
# Wait for the user to complete the verification manually
logger.info("请在30秒内手动完成验证...")
time.sleep(30)
# Refresh the page and check whether the verification is complete
driver.refresh()
time.sleep(random.uniform(2, 4))
# Re-check the verification status
page_source = driver.page_source
if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source:
logger.error("验证未完成,无法继续爬取")
# self.driver.quit()
# self.driver = None
return None
return driver
def parse_news_detail(self, url: str) -> Optional[NewsItem]:
return self.parse_xhsz_news_detail(url)
def parse_xhsz_news_detail(self, url: str) -> NewsItem:
"""
使用Selenium解析新华网新闻详情页
异常局部捕获保证返回 NewsItem 对象即使部分内容解析失败
"""
news_item = NewsItem(title="", contentRows=[], url=url)
if not self.driver:
logger.error("WebDriver未初始化无法获取新闻详情")
return news_item
try:
self.driver.get(url)
time.sleep(2)
except Exception as e:
logger.warning(f"访问新闻详情页失败: {url}, {e}")
return news_item
# Handle the slider verification, if present
try:
sliders = self.driver.find_elements(By.CSS_SELECTOR, ".handler.handler_bg")
if sliders:
slider = sliders[0]
action_chain = ActionChains(self.driver)
action_chain.click_and_hold(slider).perform()
distance = 1000
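# Drag the slider in several segments to mimic a human movement rather than a single jump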
tracks = [distance*0.2, distance*0.3, distance*0.25, distance*0.25]
for track in tracks:
action_chain.move_by_offset(int(track), 0).pause(1)
action_chain.perform()
action_chain.release().perform()
time.sleep(2)
except Exception as e:
logger.info(f"滑块验证处理失败或未出现: {e}")
final_url = self.driver.current_url
if final_url != url:
news_item = self.parse_xh_news_detail(final_url)
news_item.url = url
return news_item
# Main news container
try:
main_div = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "div.page-news-detail"))
)
except Exception as e:
logger.warning(f"未找到新闻主体: {url}, {e}")
return news_item
try:
article_div = main_div.find_element(By.CSS_SELECTOR, "div.page-news-l")
except:
logger.warning(f"未找到文章主体: {url}")
return news_item
# Title
try:
title_div = article_div.find_element(By.CSS_SELECTOR, "div.page-news-detail-title")
news_item.title = title_div.text.strip()
except:
pass
# News metadata (source, publish time, view count)
try:
channal_div = article_div.find_element(By.CSS_SELECTOR, "div.page-news-detail-note")
channal_items = channal_div.find_elements(By.CSS_SELECTOR, "div.item")
for item in channal_items:
text = item.text.strip()
if "来源" in text:
news_item.source = text.split("：", 1)[-1].strip()
elif "发布时间" in text:
news_item.publishTime = text.split("：", 1)[-1].strip()
elif "浏览人数" in text:
try:
news_item.viewCount = int(text.split("：", 1)[-1].strip())
except:
pass
except:
pass
# Body content
try:
content_div = article_div.find_element(By.CSS_SELECTOR, "div.page-news-detail-content")
children = content_div.find_elements(By.XPATH, "./*")
for child in children:
try:
tag_name = child.tag_name.lower()
if tag_name == "p":
text = child.text.strip().replace("\xa0", "")
if not text and len(child.find_elements(By.TAG_NAME, "img")) == 0:
continue
# Image
try:
img = child.find_element(By.TAG_NAME, "img")
src = img.get_attribute("src")
if src and not src.startswith("http"):
src = self.config.base_url + src
news_item.contentRows.append({"tag": "img", "content": f"<img src='{src}' />"})
continue
except:
pass
# Video
try:
video = child.find_element(By.TAG_NAME, "video")
src = video.get_attribute("src")
if src and not src.startswith("http"):
src = self.config.base_url + src
news_item.contentRows.append({"tag": "video", "content": f"<video src='{src}' />"})
continue
except:
pass
# Plain paragraph
news_item.contentRows.append({"tag": "p", "content": child.get_attribute("outerHTML")})
elif tag_name in ["img", "video"]:
news_item.contentRows.append({"tag": tag_name, "content": child.get_attribute("outerHTML")})
except Exception as e:
logger.warning(f"解析段落失败: {e}")
continue
except:
logger.warning(f"新闻内容解析失败: {url}")
return news_item
def parse_xh_news_detail(self, url: str) -> NewsItem:
"""
使用Selenium解析新华网新闻详情页
异常局部捕获保证返回 NewsItem 对象即使部分内容解析失败
"""
news_item = NewsItem(title="", contentRows=[], url=url)
if not self.driver:
logger.error("WebDriver未初始化无法获取新闻详情")
return news_item
try:
self.driver.get(url)
time.sleep(2)
except Exception as e:
logger.warning(f"访问新闻详情页失败: {url}, {e}")
return news_item
# Handle the slider verification, if present
try:
sliders = self.driver.find_elements(By.CSS_SELECTOR, ".handler.handler_bg")
if sliders:
slider = sliders[0]
action_chain = ActionChains(self.driver)
action_chain.click_and_hold(slider).perform()
distance = 1000
tracks = [distance*0.2, distance*0.3, distance*0.25, distance*0.25]
for track in tracks:
action_chain.move_by_offset(int(track), 0).pause(1)
action_chain.perform()
action_chain.release().perform()
time.sleep(2)
except Exception as e:
logger.info(f"滑块验证处理失败或未出现: {e}")
# Header: publish time, source and title
head_div = self.driver.find_element(By.CSS_SELECTOR, "div.header.domPC")
time_div = head_div.find_element(By.CSS_SELECTOR, "div.header-time.left")
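# The year/day/time spans are expected to combine into a "YYYY/MM/DD HH:MM:SS" string for strptime below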
datetimes = time_div.find_element(By.CSS_SELECTOR, "span.year").text+"/"+time_div.find_element(By.CSS_SELECTOR, "span.day").text+" "+time_div.find_element(By.CSS_SELECTOR, "span.time").text
news_item.publishTime = str(datetime.strptime(datetimes, "%Y/%m/%d %H:%M:%S"))
source = head_div.find_element(By.CSS_SELECTOR, "div.source").text.split("：")[1]
news_item.source = source
title = head_div.find_element(By.CSS_SELECTOR, "h1").text
news_item.title = title
# Body content
try:
article_div = self.driver.find_element(By.CSS_SELECTOR, "div.main.clearfix")
content_div = article_div.find_element(By.CSS_SELECTOR, "div#detail span#detailContent")
children = content_div.find_elements(By.XPATH, "./*")
for child in children:
try:
tag_name = child.tag_name.lower()
if tag_name == "p":
text = child.text.strip().replace("\xa0", "")
if not text and len(child.find_elements(By.TAG_NAME, "img")) == 0:
continue
# Image
try:
img = child.find_element(By.TAG_NAME, "img")
src = img.get_attribute("src")
if src and not src.startswith("http"):
src = self.config.base_url + src
news_item.contentRows.append({"tag": "img", "content": f"<img src='{src}' />"})
continue
except:
pass
# Video
try:
video = child.find_element(By.TAG_NAME, "video")
src = video.get_attribute("src")
if src and not src.startswith("http"):
src = self.config.base_url + src
news_item.contentRows.append({"tag": "video", "content": f"<video src='{src}' />"})
continue
except:
pass
# Plain paragraph
news_item.contentRows.append({"tag": "p", "content": child.get_attribute("outerHTML")})
elif tag_name in ["img", "video"]:
news_item.contentRows.append({"tag": tag_name, "content": child.get_attribute("outerHTML")})
except Exception as e:
logger.warning(f"解析段落失败: {e}")
continue
except:
logger.warning(f"新闻内容解析失败: {url}")
return news_item
def _normalize_url(self, url: str) -> str:
"""
规范化 URL补全协议和域名
Args:
url: 原始 URL
Returns:
完整的 URL
"""
if not url:
return url
# Already a full URL
if url.startswith("http://") or url.startswith("https://"):
return url
# Protocol-relative URL: prepend "https:"
if url.startswith("//"):
return "https:" + url
# Relative path: prepend the base URL
return self.config.base_url + url
def search(self, key: str, total=10, action="news") -> ResultDomain:
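"""Search xhsz.news.cn for the given keyword and return up to `total` parsed news items in a ResultDomain."""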
# Make sure the WebDriver was initialized
if not self.driver:
logger.error("WebDriver未初始化无法继续爬取")
return ResultDomain(code=1, message="WebDriver未初始化无法继续爬取", success=False)
news_urls = []
news_list = []
resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list)
# Load the search URL configuration
search_config = self.config.urls.get("search")
if not search_config:
logger.error("未找到搜索URL配置")
resultDomain.code = 1
resultDomain.message = "未找到搜索URL配置"
resultDomain.success = False
return resultDomain
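# Assumed number of results per page on the site's search listing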
pagesize = 10
search_data = search_config.params.copy()
search_data["k"] = key
search_data["action"] = action
try:
# Collect news URLs from the search result pages
url_base_map = {}
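# url_base_map keeps the listing-page title/date for each URL as a fallback if the detail parse misses them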
total_pages = (total + pagesize - 1) // pagesize
for page in range(1, total_pages + 1):
search_data["page"] = page
pageHtml = search_config.url + "?" + urlencode(search_data)
logger.info(f"请求URL: {pageHtml}")
# Load the page with Selenium
try:
self.driver.get(pageHtml)
time.sleep(2)
except Exception as e:
logger.warning(f"访问搜索页失败: {pageHtml}, {e}")
continue
# Handle the slider verification, if present
try:
sliders = self.driver.find_elements(By.CSS_SELECTOR, ".handler.handler_bg")
if sliders:
slider = sliders[0]
action_chain = ActionChains(self.driver)
action_chain.click_and_hold(slider).perform()
distance = 1000
tracks = [distance*0.2, distance*0.3, distance*0.25, distance*0.25]
for track in tracks:
action_chain.move_by_offset(int(track), 0).pause(1)
action_chain.release().perform()
time.sleep(5)
except Exception as e:
logger.info(f"滑动验证处理失败或未出现: {e}")
# Extract the news list
try:
search_main = self.driver.find_element(By.CSS_SELECTOR, "div.page-search-main")
search_group = search_main.find_element(By.CSS_SELECTOR, "div.page-search-group")
news_div = search_group.find_element(By.CSS_SELECTOR, "div.page-search-news")
news_items = news_div.find_elements(By.CSS_SELECTOR, "div.group")
for news in news_items:
try:
head = news.find_element(By.CSS_SELECTOR, "div.head")
title_div = head.find_element(By.CSS_SELECTOR, "div.title")
date_div = head.find_element(By.CSS_SELECTOR, "div.date")
a_tag = title_div.find_element(By.TAG_NAME, "a")
news_url = a_tag.get_attribute("href")
news_title = a_tag.text.strip()
news_date = date_div.text.strip()
url_base_map[news_url] = {"title": news_title, "date": news_date}
news_urls.append(news_url)
except Exception as e:
logger.warning(f"提取单条新闻URL失败: {e}")
except Exception as e:
logger.warning(f"提取新闻列表失败: {e}")
continue
# Fetch and parse the detail page for each collected news URL
count = 0
for news_url in news_urls:
try:
news = self.parse_news_detail(news_url)
if news:
news.title = url_base_map.get(news_url, {}).get("title") or news.title
news.publishTime = url_base_map.get(news_url, {}).get("date") or news.publishTime
news_list.append(news)
count += 1
if count >= total:
break
except Exception as e:
logger.warning(f"解析新闻失败: {news_url}, {e}")
continue
except Exception as e:
logger.error(f"搜索过程整体异常: {e}")
resultDomain.success = False
resultDomain.code = 1
resultDomain.message = "爬取失败"
# Always return whatever items were collected in dataList
resultDomain.dataList = news_list
resultDomain.success = bool(news_list)
return resultDomain
def close(self):
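"""Quit the Chrome WebDriver, if any, and release its resources."""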
if hasattr(self, 'driver') and self.driver:
try:
self.driver.quit()
logger.info("浏览器已关闭")
except Exception as e:
logger.warning(f"关闭浏览器失败: {str(e)}")
self.driver = None
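# --- Usage sketch (illustrative addition, not part of the original module) ---
# A minimal example of driving this crawler, assuming chromedriver.exe is available
# in the working directory as configured above; the search keyword is only a sample value.
if __name__ == "__main__":
    crawler = XhwCrawler()
    try:
        result = crawler.search("校园", total=5, action="news")
        if result.success:
            for item in result.dataList:
                print(item.title, item.publishTime, item.url)
        else:
            print("search failed:", result.message)
    finally:
        # Always close the browser, even if the search raised or failed
        crawler.close()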