# Xuexi Qiangguo (xuexi.cn) crawler
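"""
Crawls xuexi.cn with a visible, selenium-wire-driven Chrome instance: it runs a
keyword search on the static search page, walks further result pages via the
``page`` query parameter, parses each article (title, source, publish time and
body rows of text / images / video) and dumps everything to
``Xxqg_news_list.json``.
"""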
import json
import platform
import random
import re
import time
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

from loguru import logger
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from seleniumwire import webdriver  # note: selenium-wire, not plain selenium

from core.ResultDomain import ResultDomain
from crawler.BaseCrawler import BaseCrawler, CrawlerConfig, NewsItem, UrlConfig


class XxqgCrawler(BaseCrawler):
def __init__(self):
"""初始化学习强国爬虫"""
config = CrawlerConfig(
base_url="https://www.xuexi.cn/",
urls={
"search": UrlConfig(
url="https://static.xuexi.cn/search/online/index.html",
apiurl="https://search.xuexi.cn/api/search",
method="GET",
params={
"query": ""
},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0',
'Referer': 'https://www.xuexi.cn/',
'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
),
"important": UrlConfig(
url="https://www.xuexi.cn/98d5ae483720f701144e4dabf99a4a34/5957f69bffab66811b99940516ec8784.html",
method="GET",
params={},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0',
'Referer': 'https://www.xuexi.cn/',
'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
)
},
)
super().__init__(config)
        # Create the shared WebDriver once at construction time
self.driver = self._init_driver()
def _init_driver(self):
"""初始化并返回Chrome WebDriver实例"""
chrome_options = Options()
        # Keep the browser visible (headless mode is deliberately not enabled)
        # so that any slider / captcha verification can be solved manually.
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
        # Start maximized so the window is clearly visible
chrome_options.add_argument('--start-maximized')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--disable-web-security')
chrome_options.add_argument('--allow-running-insecure-content')
chrome_options.add_argument('--disable-features=VizDisplayCompositor')
        # Pick the chromedriver binary (relative path) that matches the current OS
chrome_driver_path = 'win/chromedriver.exe'
if platform.system() == 'Linux':
chrome_driver_path = 'linux/chromedriver'
service = Service(executable_path=chrome_driver_path)
driver = None
try:
driver = webdriver.Chrome(service=service, options=chrome_options)
logger.info("Chrome浏览器初始化成功")
except Exception as e:
logger.error(f"Chrome浏览器初始化失败: {str(e)}")
return driver
        # Implicit waits are left disabled; explicit WebDriverWait calls are used instead
        # driver.implicitly_wait(10)
        # Visit the home page first to pick up the initial cookies
logger.info("访问主页获取初始Cookie")
try:
driver.get(self.config.base_url)
except Exception as e:
logger.error(f"访问URL失败: {self.config.base_url}, 错误: {str(e)}")
return driver
time.sleep(random.uniform(2, 4))
        # Check whether an anti-bot verification page was served
page_source = driver.page_source
if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source:
logger.warning("检测到验证页面,尝试手动处理验证")
            # Give the user a chance to solve the challenge manually
logger.info("请在30秒内手动完成验证...")
time.sleep(30)
            # Reload and check whether the verification passed
driver.refresh()
time.sleep(random.uniform(2, 4))
            # Check the verification state again
page_source = driver.page_source
if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source:
logger.error("验证未完成,无法继续爬取")
# self.driver.quit()
# self.driver = None
return driver
return driver
def _normalize_url(self, url: str) -> str:
"""
规范化 URL,补全协议和域名
Args:
url: 原始 URL
Returns:
完整的 URL
"""
if not url:
return url
        # Already an absolute URL
if url.startswith("http://") or url.startswith("https://"):
return url
        # Protocol-relative URL: add the https scheme
if url.startswith("//"):
return "https:" + url
        # Relative path: prefix the site base URL
return self.config.base_url + url
def search(self, keyword, total=10) -> ResultDomain:
"""搜索新闻"""
search_config = self.config.urls.get("search")
if not self.driver:
logger.error("WebDriver未初始化,无法继续爬取")
return ResultDomain(code=1, message="WebDriver未初始化,无法继续爬取", success=False)
count = 0
url_base_map = {}
url_list = []
news_list = []
resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list)
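        # Overall flow: (1) open the search page and submit the query once through
        # the UI, (2) walk further result pages by rewriting the ``page`` query
        # parameter, (3) open every collected URL and parse the article details.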
def get_search_url():
"""从当前页面提取URL数据"""
nonlocal count
try:
                # Wait for the result list to render
# assert self.driver is not None, "WebDriver未初始化"
if self.driver is None:
logger.error("WebDriver未初始化")
return
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div.search-result")))
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div.c-card:not(.c-sc)")))
                # Parse the rendered search result cards
home = self.driver.find_element(By.CSS_SELECTOR, "div.home")
search_content = home.find_element(By.CSS_SELECTOR, "div.search-content")
search_result_div = search_content.find_element(By.CSS_SELECTOR, "div.search-result")
item_s = search_result_div.find_elements(By.CSS_SELECTOR, "div.c-card:not(.c-sc)")
for item in item_s:
if count >= total:
break
try:
                        # URL from the <a> tag
link = item.find_element(By.CSS_SELECTOR, "a[href]")
url = link.get_attribute("href")
                        # Title from h3 > span.title
title = item.find_element(By.CSS_SELECTOR, "h3 span.title").text
                        # Source and publish time from div.time
time_element = item.find_element(By.CSS_SELECTOR, "div.time")
time_text = time_element.text.strip()
                        # The text is either newline-separated or space-separated
if '\n' in time_text:
time_lines = time_text.split('\n')
source = time_lines[0].strip() if len(time_lines) > 0 else ''
publish_time = time_lines[1].strip() if len(time_lines) > 1 else ''
else:
                            # Space-separated: pull out the yyyy-mm-dd date with a regex
date_match = re.search(r'\d{4}-\d{2}-\d{2}', time_text)
if date_match:
publish_time = date_match.group()
source = time_text[:date_match.start()].strip()
else:
source = ''
publish_time = time_text
url_base_map[url] = {
'title': title,
'source': source,
'publishTime': publish_time
}
url_list.append(url)
count += 1
except Exception as e:
logger.warning(f"解析某个搜索结果失败: {str(e)}")
continue
logger.info(f"本页提取到 {len(item_s)} 条搜索结果")
except Exception as e:
logger.exception(f"提取URL过程出错: {str(e)}")
        # Step 1: open the search page and trigger the first search through the UI
logger.info("访问搜索页面并手动点击搜索")
self.driver.get(search_config.url)
time.sleep(2)
home = self.driver.find_element(By.CSS_SELECTOR, "div.home")
logger.info(home)
        input_wrapper_div = self.driver.find_element(By.CSS_SELECTOR, 'div.search-input-wrapper')
        input_box = input_wrapper_div.find_element(By.CSS_SELECTOR, 'input.search-type-input-compact')
        input_box.send_keys(keyword)
        search_btn = input_wrapper_div.find_element(By.CSS_SELECTOR, 'button[type="submit"]')
search_btn.click()
time.sleep(2)
        # Collect the first page of results
get_search_url()
        # Step 2: fetch subsequent result pages directly by URL
while count < total:
            # Remember how many results we had before this page
count_before = count
            # Build the next-page URL
current_url = self.driver.current_url
qs = urlparse(current_url)
param = parse_qs(qs.query)
current_page = int(param.get('page', ['1'])[0])
param['page'] = [str(current_page + 1)]
new_url = urlunparse((qs.scheme, qs.netloc, qs.path, qs.params, urlencode(param, doseq=True), qs.fragment))
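            # e.g. a current URL ending in "...&page=1" becomes "...&page=2"
            # (this assumes the results page keeps the page number in a plain query parameter)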
logger.info(f"翻页到第 {current_page + 1} 页")
            # Load the next page directly
self.driver.get(new_url)
time.sleep(2)
            # Collect results from this page
get_search_url()
            # No new results on this page: assume we have reached the end
if count == count_before:
logger.info("本页没有提取到新数据,结束翻页")
break
logger.info(f"共提取 {len(url_list)} 条URL")
        # Parse the article details for every collected URL
for url in url_list:
try:
news_item = self.parse_news_detail(url)
if news_item:
                    # Backfill missing fields from the data scraped off the result card
if news_item.title is None or news_item.title.strip() == "":
news_item.title = url_base_map[url].get("title", "")
if news_item.publishTime is None or news_item.publishTime.strip() == "":
news_item.publishTime = url_base_map[url].get("publishTime", "")
if news_item.source is None or news_item.source.strip() == "":
news_item.source = url_base_map[url].get("source", "")
news_list.append(news_item)
except Exception as e:
logger.warning(f"解析文章详情失败: {str(e)}")
continue
resultDomain.dataList = news_list
with open("Xxqg_news_list.json", "w", encoding="utf-8") as f:
json.dump([item.model_dump() for item in resultDomain.dataList] if resultDomain.dataList else [], f, ensure_ascii=False, indent=4)
return resultDomain
def parse_news_detail(self, url: str) -> NewsItem:
news_item = NewsItem(title='', contentRows=[], url=url)
if self.driver is None:
return news_item
try:
self.driver.get(url)
article_area_div = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'div.render-detail-article'))
)
except Exception as e:
logger.warning(f"访问文章页失败或未找到文章区域: {url}, {e}")
return news_item
        # Basic metadata
try:
title_div = article_area_div.find_element(By.CSS_SELECTOR, "div.render-detail-title")
news_item.title = title_div.text.strip()
except Exception as e:
logger.warning(f"提取标题失败: {e}")
try:
time_div = article_area_div.find_element(By.CSS_SELECTOR, "div.render-detail-time")
news_item.publishTime = time_div.text.strip()
except Exception as e:
logger.warning(f"提取发布时间失败: {e}")
try:
source_div = article_area_div.find_element(By.CSS_SELECTOR, "div.render-detail-source")
            news_item.source = re.split(r"[:：]", source_div.text.strip(), maxsplit=1)[-1].strip()  # handle both half- and full-width colons
except Exception as e:
logger.warning(f"提取来源失败: {e}")
        # Locate the article body
try:
article_content_div = article_area_div.find_element(By.CSS_SELECTOR, "div.render-detail-article-content")
except Exception as e:
logger.warning(f"未找到文章内容区域: {e}")
return news_item
        # Helper: does the article use pagination?
def is_page():
try:
page_div = article_content_div.find_element(By.CSS_SELECTOR, "div.detail-pagination-wrap")
return page_div is not None and page_div.is_displayed()
            except Exception:
return False
def get_content_rows():
"""提取文章内容行"""
try:
content_div = article_content_div.find_element(By.CSS_SELECTOR, "div.render-detail-content")
except Exception as e:
logger.warning(f"未找到内容区域: {str(e)}")
return
            # Iterate over the direct children of the content div
children = content_div.find_elements(By.XPATH, "./*")
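            # Each child is mapped to one content row, based on its class attribute:
            #   contains "article-img"   -> {"type": "img", ...}
            #   contains "article-video" -> {"type": "video", ...}
            #   anything else with text  -> {"type": "text", ...}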
for child in children:
try:
                    # Classify the element by its class attribute
class_name = child.get_attribute("class") or ""
                    # Image block
if "article-img" in class_name:
try:
img = child.find_element(By.TAG_NAME, "img")
img_src = img.get_attribute("src")
if img_src:
                                # Normalize the image URL
img_src = self._normalize_url(img_src)
                                # Store it as an <img> row
news_item.contentRows.append({
"type": "img",
"content": f'
'
})
logger.debug(f"提取图片: {img_src}")
continue
except Exception as e:
logger.warning(f"提取图片失败: {str(e)}")
                    # Video block
if "article-video" in class_name:
try:
video = child.find_element(By.TAG_NAME, "video")
video_src = video.get_attribute("src")
if video_src:
                                # Normalize the video URL
video_src = self._normalize_url(video_src)
                                # Store it as a <video> row
news_item.contentRows.append({
"type": "video",
"content": f''
})
logger.debug(f"提取视频: {video_src}")
continue
except Exception as e:
logger.warning(f"提取视频失败: {str(e)}")
                    # Plain-text fallback for everything else
text_content = child.text.strip()
                    # Skip empty text
if text_content:
news_item.contentRows.append({
"type": "text",
"content": text_content
})
logger.debug(f"提取文字: {text_content[:50]}...")
except Exception as e:
logger.warning(f"处理内容元素失败: {str(e)}")
continue
get_content_rows()
if is_page():
            # TODO: multi-page articles are detected here but the extra pages are not fetched yet
            pass
logger.info(f"解析文章详情完成: {news_item.model_dump()}")
return news_item
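

if __name__ == "__main__":
    # Minimal usage sketch, not part of the crawler itself. It assumes that
    # ResultDomain exposes ``dataList`` and NewsItem exposes ``title`` / ``url``
    # exactly as they are used above; the search keyword is an arbitrary example.
    crawler = XxqgCrawler()
    try:
        result = crawler.search("数字经济", total=5)
        for item in (result.dataList or []):
            print(item.title, item.url)
    finally:
        # Always release the browser, even if the search failed partway through
        if crawler.driver:
            crawler.driver.quit()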