# 学习强国爬虫
from typing import Optional
from core.ResultDomain import ResultDomain
from crawler.BaseCrawler import BaseCrawler, CrawlerConfig, NewsItem, UrlConfig
from loguru import logger
import re
import json
import platform
import time
import random
import os
from datetime import datetime, timedelta
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
from seleniumwire import webdriver  # 注意不是 selenium
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service


class XxqgCrawler(BaseCrawler):
def __init__(self):
"""初始化学习强国爬虫"""
config = CrawlerConfig(
base_url="https://www.xuexi.cn/",
urls={
"search": UrlConfig(
url="https://static.xuexi.cn/search/online/index.html",
apiurl="https://search.xuexi.cn/api/search",
method="GET",
params={
"query": ""
},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0',
'Referer': 'https://www.xuexi.cn/',
'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
),
"important": UrlConfig(
url="https://www.xuexi.cn/98d5ae483720f701144e4dabf99a4a34/5957f69bffab66811b99940516ec8784.html",
method="GET",
params={
"path": "98d5ae483720f701144e4dabf99a4a34/5957f69bffab66811b99940516ec8784"
},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0',
'Referer': 'https://www.xuexi.cn/',
'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
),
"xuexishiping":UrlConfig(
url="https://www.xuexi.cn/d05cad69216e688d304bb91ef3aac4c6/9a3668c13f6e303932b5e0e100fc248b.html",
method="GET",
params={
"path": "d05cad69216e688d304bb91ef3aac4c6/9a3668c13f6e303932b5e0e100fc248b"
},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0',
'Referer': 'https://www.xuexi.cn/',
'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
),
"zonghexinwen": UrlConfig(
url="https://www.xuexi.cn/7097477a9643eacffe4cc101e4906fdb/9a3668c13f6e303932b5e0e100fc248b.html",
method="GET",
params={
"path": "7097477a9643eacffe4cc101e4906fdb/9a3668c13f6e303932b5e0e100fc248b"
},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0',
'Referer': 'https://www.xuexi.cn/',
'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
),
"zhongxuanbu": UrlConfig(
url="https://www.xuexi.cn/105c2fa2843fa9e6d17440e172115c92/9a3668c13f6e303932b5e0e100fc248b.html",
method="GET",
params={
"path": "105c2fa2843fa9e6d17440e172115c92/9a3668c13f6e303932b5e0e100fc248b"
},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0',
'Referer': 'https://www.xuexi.cn/',
'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
),
},
)
super().__init__(config)
# 初始化时创建driver
self.driver = self._init_driver()
def _init_driver(self):
"""初始化并返回Chrome WebDriver实例(无头模式)"""
chrome_options = Options()
# 无头模式配置
chrome_options.add_argument('--headless=new') # 使用新的headless模式
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')
chrome_options.add_argument('--disable-gpu')
# 设置窗口大小(headless模式必需)
chrome_options.add_argument('--window-size=1920,1080')
# 反检测配置
chrome_options.add_argument('--disable-blink-features=AutomationControlled')
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
# 其他安全配置
chrome_options.add_argument('--disable-web-security')
chrome_options.add_argument('--allow-running-insecure-content')
chrome_options.add_argument('--disable-features=VizDisplayCompositor')
# 根据系统选择chromedriver路径和chrome二进制文件路径
chrome_driver_path = 'win/chromedriver.exe'
chrome_binary_path = 'win/chrome-headless/chrome-headless-shell-win64/chrome-headless-shell.exe'
if platform.system() == 'Linux':
chrome_driver_path = 'linux/chromedriver'
            chrome_binary_path = 'linux/chrome-headless/chrome-headless/chrome-headless-shell'  # Linux同样使用随项目打包的chrome-headless-shell
# 指定Chrome二进制文件路径(用于chrome-headless-shell)
if chrome_binary_path and os.path.exists(chrome_binary_path):
chrome_options.binary_location = chrome_binary_path
logger.info(f"使用Chrome二进制: {chrome_binary_path}")
service = Service(executable_path=chrome_driver_path)
driver = None
try:
driver = webdriver.Chrome(service=service, options=chrome_options)
logger.info("Chrome浏览器初始化成功")
except Exception as e:
logger.error(f"Chrome浏览器初始化失败: {str(e)}")
return driver
# 设置隐式等待时间
# driver.implicitly_wait(10)
# 访问主页获取初始Cookie
logger.info("访问主页获取初始Cookie")
try:
driver.get(self.config.base_url)
except Exception as e:
logger.error(f"访问URL失败: {self.config.base_url}, 错误: {str(e)}")
return driver
time.sleep(random.uniform(2, 4))
# 检查是否有验证页面
page_source = driver.page_source
if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source:
logger.warning("检测到验证页面,尝试手动处理验证")
# 尝试等待用户手动处理验证
logger.info("请在30秒内手动完成验证...")
time.sleep(30)
# 刷新页面,检查验证是否完成
driver.refresh()
time.sleep(random.uniform(2, 4))
# 再次检查验证状态
page_source = driver.page_source
if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source:
logger.error("验证未完成,无法继续爬取")
# self.driver.quit()
# self.driver = None
return driver
return driver
def _normalize_url(self, url: str) -> str:
"""
规范化 URL,补全协议和域名
Args:
url: 原始 URL
Returns:
完整的 URL
"""
if not url:
return url
# 已经是完整 URL
if url.startswith("http://") or url.startswith("https://"):
return url
# 协议相对 URL,补充 https:
if url.startswith("//"):
return "https:" + url
# 相对路径,补全域名
return self.config.base_url + url
def _extract_inline_style(self, element) -> str:
"""
提取元素的计算样式并转换为inline style
Args:
element: Selenium WebElement
Returns:
inline style 字符串
"""
# 需要提取的CSS属性列表
css_properties = [
'text-align',
'text-indent',
'margin',
'margin-top',
'margin-bottom',
'margin-left',
'margin-right',
'padding',
'padding-top',
'padding-bottom',
'padding-left',
'padding-right',
'font-size',
'font-weight',
'font-style',
'color',
'background-color',
'line-height',
'letter-spacing',
'word-spacing'
]
styles = []
for prop in css_properties:
try:
value = element.value_of_css_property(prop)
# 过滤默认值和空值
if value and value not in ['none', 'normal', 'auto', '0px', 'rgba(0, 0, 0, 0)', 'transparent']:
# 对于 margin/padding,如果都是 0px 就跳过
if 'margin' in prop or 'padding' in prop:
if value == '0px' or value == '0':
continue
styles.append(f"{prop}: {value}")
except:
continue
return "; ".join(styles) if styles else ""
def parse_news_detail(self, url: str) -> NewsItem:
news_item = NewsItem(title='', contentRows=[], url=url)
if self.driver is None:
return news_item
try:
self.driver.get(url)
article_area_div = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'div.render-detail-article'))
)
except Exception as e:
logger.warning(f"访问文章页失败或未找到文章区域: {url}, {e}")
return news_item
# 基础信息获取
try:
title_div = article_area_div.find_element(By.CSS_SELECTOR, "div.render-detail-title")
news_item.title = title_div.text.strip()
except Exception as e:
logger.warning(f"提取标题失败: {e}")
try:
time_div = article_area_div.find_element(By.CSS_SELECTOR, "span.render-detail-time")
news_item.publishTime = time_div.text.strip()
except Exception as e:
logger.warning(f"提取发布时间失败: {e}")
        try:
            source_div = article_area_div.find_element(By.CSS_SELECTOR, "span.render-detail-resource")
            source_text = source_div.text.strip()
            # 页面一般为“来源:xxx”格式,无冒号时直接使用原文,避免下标越界
            news_item.source = source_text.split(":", 1)[1] if ":" in source_text else source_text
        except Exception as e:
            logger.warning(f"提取来源失败: {e}")
# 获取文章内容区域
try:
article_content_div = article_area_div.find_element(By.CSS_SELECTOR, "div.render-detail-article-content")
except Exception as e:
logger.warning(f"未找到文章内容区域: {e}")
return news_item
# 检查是否有分页
def is_page():
try:
page_div = article_content_div.find_element(By.CSS_SELECTOR, "div.detail-pagination-wrap")
return page_div is not None and page_div.is_displayed()
except:
return False
def get_content_rows():
"""提取文章内容行"""
try:
content_div = article_content_div.find_element(By.CSS_SELECTOR, "div.render-detail-content")
except Exception as e:
logger.warning(f"未找到内容区域: {str(e)}")
return
# 获取所有直接子元素
children = content_div.find_elements(By.XPATH, "./*")
for child in children:
try:
# 获取元素的class属性
class_name = child.get_attribute("class") or ""
# 图片元素
if "article-img" in class_name:
try:
img = child.find_element(By.TAG_NAME, "img")
img_src = img.get_attribute("src")
if img_src:
# 规范化URL
img_src = self._normalize_url(img_src)
# 添加图片标签
news_item.contentRows.append({
"type": "img",
"content": f'
'
})
logger.debug(f"提取图片: {img_src}")
continue
except Exception as e:
logger.warning(f"提取图片失败: {str(e)}")
# 视频元素
if "article-video" in class_name:
try:
video = child.find_element(By.TAG_NAME, "video")
video_src = video.get_attribute("src")
if video_src:
# 规范化URL
video_src = self._normalize_url(video_src)
# 添加视频标签
news_item.contentRows.append({
"type": "video",
"content": f''
})
logger.debug(f"提取视频: {video_src}")
continue
except Exception as e:
logger.warning(f"提取视频失败: {str(e)}")
# 文字元素(作为最后的兜底)
text_content = child.text.strip()
# 过滤空内容
if text_content:
# 提取计算样式并转换为inline style
inline_style = self._extract_inline_style(child)
tag_name = child.tag_name
# 构建新的HTML标签(用inline style替代class)
if inline_style:
                        content_html = f'<{tag_name} style="{inline_style}">{child.get_attribute("innerHTML")}</{tag_name}>'
else:
                        content_html = f'<{tag_name}>{child.get_attribute("innerHTML")}</{tag_name}>'
news_item.contentRows.append({
"type": "text",
"content": content_html
})
logger.debug(f"提取文字(转换样式): {text_content[:50]}...")
except Exception as e:
logger.warning(f"处理内容元素失败: {str(e)}")
continue
get_content_rows()
        if is_page():
            # 分页文章的后续页暂未处理,目前仅抓取第一页内容
            pass
logger.info(f"解析文章详情完成: {news_item.url}")
if news_item.contentRows:
news_item.executeStatus = 1
return news_item
def search(self, keyword, total=10) -> ResultDomain:
"""搜索新闻"""
        search_config = self.config.urls.get("search")
        if search_config is None:
            logger.error("未找到search配置")
            return ResultDomain(code=1, message="未找到search配置", success=False)
        if not self.driver:
            logger.error("WebDriver未初始化,无法继续爬取")
            return ResultDomain(code=1, message="WebDriver未初始化,无法继续爬取", success=False)
count = 0
url_base_map = {}
url_list = []
news_list = []
resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list)
def get_search_url():
"""从当前页面提取URL数据"""
nonlocal count
try:
# 等待页面加载完成
# assert self.driver is not None, "WebDriver未初始化"
if self.driver is None:
logger.error("WebDriver未初始化")
return
wait = WebDriverWait(self.driver, 10)
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div.search-result")))
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div.c-card:not(.c-sc)")))
# 解析HTML搜索结果
home = self.driver.find_element(By.CSS_SELECTOR, "div.home")
search_content = home.find_element(By.CSS_SELECTOR, "div.search-content")
search_result_div = search_content.find_element(By.CSS_SELECTOR, "div.search-result")
item_s = search_result_div.find_elements(By.CSS_SELECTOR, "div.c-card:not(.c-sc)")
for item in item_s:
if count >= total:
break
try:
# 从 a 标签获取 URL
link = item.find_element(By.CSS_SELECTOR, "a[href]")
url = link.get_attribute("href")
# 从 h3 > span.title 获取标题
title = item.find_element(By.CSS_SELECTOR, "h3 span.title").text
# 从 div.time 获取来源和时间
time_element = item.find_element(By.CSS_SELECTOR, "div.time")
time_text = time_element.text.strip()
# 判断是换行符分隔还是空格分隔
if '\n' in time_text:
time_lines = time_text.split('\n')
source = time_lines[0].strip() if len(time_lines) > 0 else ''
publish_time = time_lines[1].strip() if len(time_lines) > 1 else ''
else:
# 空格分隔,使用正则提取日期格式
date_match = re.search(r'\d{4}-\d{2}-\d{2}', time_text)
if date_match:
publish_time = date_match.group()
source = time_text[:date_match.start()].strip()
else:
source = ''
publish_time = time_text
url_base_map[url] = {
'title': title,
'source': source,
'publishTime': publish_time
}
url_list.append(url)
count += 1
except Exception as e:
logger.warning(f"解析某个搜索结果失败: {str(e)}")
continue
logger.info(f"本页提取到 {len(item_s)} 条搜索结果")
except Exception as e:
logger.exception(f"提取URL过程出错: {str(e)}")
# 方式1:初次手动点击按钮进入
logger.info("访问搜索页面并手动点击搜索")
self.driver.get(search_config.url)
time.sleep(2)
        home = self.driver.find_element(By.CSS_SELECTOR, "div.home")
        logger.info("搜索页主容器已加载")
        input_wrapper_div = self.driver.find_element(By.CSS_SELECTOR, 'div.search-input-wrapper')
        input_div = input_wrapper_div.find_element(By.CSS_SELECTOR, 'input.search-type-input-compact')
        input_div.send_keys(keyword)
        search_btn = input_wrapper_div.find_element(By.CSS_SELECTOR, 'button[type="submit"]')
search_btn.click()
time.sleep(2)
# 提取第一页数据
get_search_url()
# 方式2:后续页直接通过URL进入
while count < total:
# 记录提取前的数量
count_before = count
# 构建下一页URL
current_url = self.driver.current_url
qs = urlparse(current_url)
param = parse_qs(qs.query)
current_page = int(param.get('page', ['1'])[0])
param['page'] = [str(current_page + 1)]
new_url = urlunparse((qs.scheme, qs.netloc, qs.path, qs.params, urlencode(param, doseq=True), qs.fragment))
logger.info(f"翻页到第 {current_page + 1} 页")
# 直接访问新页面
self.driver.get(new_url)
time.sleep(2)
# 提取数据
get_search_url()
# 如果本页没有提取到新数据,说明没有更多结果
if count == count_before:
logger.info("本页没有提取到新数据,结束翻页")
break
logger.info(f"共提取 {len(url_list)} 条URL")
# 解析文章详情
for url in url_list:
try:
news_item = self.parse_news_detail(url)
if news_item:
# 如果某些为空,根据url_base_map补齐
if news_item.title is None or news_item.title.strip() == "":
news_item.title = url_base_map[url].get("title", "")
if news_item.publishTime is None or news_item.publishTime.strip() == "":
news_item.publishTime = url_base_map[url].get("publishTime", "")
if news_item.source is None or news_item.source.strip() == "":
news_item.source = url_base_map[url].get("source", "")
news_list.append(news_item)
except Exception as e:
logger.warning(f"解析文章详情失败: {str(e)}")
continue
resultDomain.dataList = news_list
# with open("Xxqg_news_list.json", "w", encoding="utf-8") as f:
# json.dump([item.model_dump() for item in resultDomain.dataList] if resultDomain.dataList else [], f, ensure_ascii=False, indent=4)
return resultDomain
    def crawl_base(self, config: UrlConfig, yesterday=True, start: Optional[str] = None, end: Optional[str] = None) -> ResultDomain:
        """
        栏目爬取的通用流程:加载栏目页,从selenium-wire捕获的JSON请求中解析文章列表,再逐篇解析详情
        Args:
            config: 栏目对应的UrlConfig
            yesterday: 为True时只抓取昨天发布的文章
            start: 自定义起始日期(YYYY-MM-DD),yesterday为False时生效
            end: 自定义结束日期(YYYY-MM-DD),yesterday为False时生效
        Returns:
            ResultDomain: 包含新闻列表的结果对象
        """
news_list = []
resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list)
if self.driver is None:
logger.error("WebDriver未初始化,无法继续爬取")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = "WebDriver未初始化"
return resultDomain
        self.driver.get(config.url)
        try:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, 'div#page-main'))
            )
except Exception as e:
logger.exception(f"访问首页失败: {str(e)}")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = f"访问首页失败: {str(e)}"
return resultDomain
# 从selenium-wire捕获的请求中筛选包含JSON数据的请求
time.sleep(3) # 等待所有请求完成
request_list = self.driver.requests
json_request = []
target_path = config.params.get("path", "")
target_request = None
logger.info(f"开始查找目标JSON请求,共有 {len(request_list)} 个请求")
# 首先查找包含完整路径的JSON请求
for request in request_list:
if ".json" in request.url:
json_request.append(request)
if target_path in request.url:
target_request = request
if target_request is None:
logger.error("未找到目标JSON请求")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = "未找到目标JSON请求"
return resultDomain
# 解析meta请求响应获取channelId
try:
meta_data = json.loads(target_request.response.body)
# logger.info(f"Meta响应数据: {meta_data}")
# 提取channelId
if 'pageData' in meta_data and 'channel' in meta_data['pageData']:
meta_id = meta_data['pageData']['channel']['channelId']
logger.info(f"成功获取channelId: {meta_id}")
else:
logger.error(f"Meta数据结构异常,无法找到channelId。数据结构: {meta_data.keys()}")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = "无法从meta请求中提取channelId"
return resultDomain
except Exception as e:
logger.exception(f"解析meta请求失败: {str(e)}")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = f"解析meta请求失败: {str(e)}"
return resultDomain
# 使用channelId查找文章数据请求
data_request = None
for json_item in json_request:
if meta_id in json_item.url:
data_request = json_item
break
if data_request is None:
logger.error("未找到目标JSON请求")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = "未找到目标JSON请求"
return resultDomain
# 解析文章数据请求响应(可能是gzip压缩的)
try:
response_body = data_request.response.body
# 检查是否是gzip压缩
if response_body[:2] == b'\x1f\x8b': # gzip magic number
import gzip
response_body = gzip.decompress(response_body)
logger.info("检测到gzip压缩,已解压")
# 解码为字符串
if isinstance(response_body, bytes):
response_body = response_body.decode('utf-8')
article_data = json.loads(response_body)
logger.info(f"成功解析文章数据,共 {len(article_data)} 条")
except Exception as e:
logger.exception(f"解析文章数据失败: {str(e)}")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = f"解析文章数据失败: {str(e)}"
return resultDomain
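        # article_data 条目示意(仅列出下方实际取用的字段,非完整结构):
        # {"title": "...", "url": "https://www.xuexi.cn/...", "publishTime": "2025-11-21 10:04:20", "source": "..."}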
# 确定时间筛选范围(在循环外计算,避免重复)
if not yesterday and start and end:
# 自定义时间范围
start_date = start
end_date = end
logger.info(f"使用自定义时间范围: {start_date} 到 {end_date}")
else:
# 默认昨天
yesterday_str = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
start_date = yesterday_str
end_date = yesterday_str
logger.info(f"使用默认时间范围(昨天): {yesterday_str}")
# 计算起始日期的前一天,用于提前终止循环(优化性能)
day_before_start = (datetime.strptime(start_date, '%Y-%m-%d') - timedelta(days=1)).strftime('%Y-%m-%d')
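        # 例如 start_date 为 "2025-11-20" 时,day_before_start 为 "2025-11-19"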
for article in article_data:
# 提取发布日期 "publishTime": "2025-11-21 10:04:20"
publish_date = article['publishTime'].split(" ")[0]
# 跳过未来的新闻(如果有)
if publish_date > end_date:
continue
# 在时间范围内的新闻
if publish_date >= start_date and publish_date <= end_date:
try:
# 提取来源,安全处理
source = article['source'].split("_")[1] if "_" in article.get('source', '') else article.get('source', '')
news_item = self.parse_news_detail(article['url'])
news_item.title = article['title']
news_item.publishTime = article['publishTime']
news_item.source = source
news_item.url = article['url']
news_list.append(news_item)
logger.info(f"添加新闻: {news_item.title} ({publish_date})")
except Exception as e:
logger.warning(f"解析文章详情失败: {article.get('title', 'unknown')} - {str(e)}")
continue
# 如果遇到比起始日期还早的新闻,提前终止(数据按时间倒序)
elif publish_date < day_before_start:
logger.info(f"已到达时间范围之前的新闻({publish_date}),停止遍历")
break
resultDomain.dataList = news_list
# with open("Xxqg_important_news_list.json", "w", encoding="utf-8") as f:
# json.dump([item.model_dump() for item in resultDomain.dataList] if resultDomain.dataList else [], f, ensure_ascii=False, indent=4)
return resultDomain
def crawl_important(self, total=10) -> ResultDomain:
"""
爬取重要新闻栏目
        通过crawl_base加载栏目页,从selenium-wire捕获的JSON请求中获取文章列表,再用Selenium解析详情
        Args:
            total: 最多爬取的文章数量,默认10(当前未传入crawl_base,暂未生效)
Returns:
ResultDomain: 包含新闻列表的结果对象
"""
news_list = []
resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list)
if self.driver is None:
logger.error("WebDriver未初始化,无法继续爬取")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = "WebDriver未初始化"
return resultDomain
# 获取important配置
important_config = self.config.urls.get("important")
if not important_config:
logger.error("未找到important配置")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = "未找到important配置"
return resultDomain
resultDomain = self.crawl_base(important_config)
return resultDomain
def crawl_xuexishiping(self, total=10) -> ResultDomain:
"""
爬取学习时评栏目
Args:
            total: 最多爬取的文章数量,默认10(当前未传入crawl_base,暂未生效)
Returns:
ResultDomain: 包含新闻列表的结果对象
"""
news_list = []
resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list)
if self.driver is None:
logger.error("WebDriver未初始化,无法继续爬取")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = "WebDriver未初始化"
return resultDomain
        # 获取xuexishiping配置
        xuexishiping_config = self.config.urls.get("xuexishiping")
        if not xuexishiping_config:
            logger.error("未找到xuexishiping配置")
            resultDomain.code = 1
            resultDomain.success = False
            resultDomain.message = "未找到xuexishiping配置"
return resultDomain
resultDomain = self.crawl_base(xuexishiping_config)
return resultDomain
    def home(self, type="") -> ResultDomain:
        """获取首页数据(解析逻辑尚未实现,目前仅定位到首页栏目区域)"""
        news_list = []
        resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list)
        if self.driver is None:
            resultDomain.message = "driver未初始化"
            return resultDomain
        # 注意:当前config.urls中尚未配置"home",取不到配置时直接返回
        home_config = self.config.urls.get("home")
        if home_config is None:
            resultDomain.message = "未找到home配置"
            return resultDomain
        self.driver.get(home_config.url)
        try:
            home_div = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, 'div.page-main > div.grid-cell > div.grid-cell'))
            )
        except Exception as e:
            resultDomain.message = f"获取首页失败: {str(e)}"
            return resultDomain
        # TODO: 在此解析section_divs中的各栏目数据
        section_divs = home_div.find_elements(By.CSS_SELECTOR, 'section')
        return resultDomain
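

# 使用示意:以下为最小化的调用示例(假设项目依赖与Chrome/chromedriver路径均已就绪;
# 搜索关键词仅作演示),正式入口以项目实际调度代码为准
if __name__ == "__main__":
    crawler = XxqgCrawler()
    try:
        result = crawler.search("学习", total=5)  # 也可改用 crawler.crawl_important() 等栏目方法
        if result.success and result.dataList:
            for item in result.dataList:
                print(item.title, item.publishTime, item.source, item.url)
        else:
            print(f"爬取失败或无数据: {result.message}")
    finally:
        if crawler.driver is not None:
            crawler.driver.quit()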