# Xuexi Qiangguo (xuexi.cn) crawler
from typing import Optional

from core.ResultDomain import ResultDomain
from crawler.BaseCrawler import BaseCrawler, CrawlerConfig, NewsItem, UrlConfig
from loguru import logger
import re
from datetime import datetime, timedelta
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
import json
from seleniumwire import webdriver  # note: selenium-wire, not plain selenium
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
import platform
import time
import random

# Browser-like request headers shared by every page config below.
DEFAULT_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'Accept-Encoding': 'gzip, deflate, br',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
    'Sec-Fetch-Dest': 'document',
    'Sec-Fetch-Mode': 'navigate',
    'Sec-Fetch-Site': 'none',
    'Cache-Control': 'max-age=0',
    'Referer': 'https://www.xuexi.cn/',
    'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
}


class XxqgCrawler(BaseCrawler):
    def __init__(self):
        """Initialize the Xuexi Qiangguo crawler."""
        config = CrawlerConfig(
            base_url="https://www.xuexi.cn/",
            urls={
                "search": UrlConfig(
                    url="https://static.xuexi.cn/search/online/index.html",
                    apiurl="https://search.xuexi.cn/api/search",
                    method="GET",
                    params={"query": ""},
                    headers=dict(DEFAULT_HEADERS),
                ),
                "important": UrlConfig(
                    url="https://www.xuexi.cn/98d5ae483720f701144e4dabf99a4a34/5957f69bffab66811b99940516ec8784.html",
                    method="GET",
                    params={"path": "98d5ae483720f701144e4dabf99a4a34/5957f69bffab66811b99940516ec8784"},
                    headers=dict(DEFAULT_HEADERS),
                ),
                "xuexishiping": UrlConfig(
                    url="https://www.xuexi.cn/d05cad69216e688d304bb91ef3aac4c6/9a3668c13f6e303932b5e0e100fc248b.html",
                    method="GET",
                    params={"path": "d05cad69216e688d304bb91ef3aac4c6/9a3668c13f6e303932b5e0e100fc248b"},
                    headers=dict(DEFAULT_HEADERS),
                ),
                "zongheshiping": UrlConfig(
                    url="https://www.xuexi.cn/7097477a9643eacffe4cc101e4906fdb/9a3668c13f6e303932b5e0e100fc248b.html",
                    method="GET",
                    params={"path": "7097477a9643eacffe4cc101e4906fdb/9a3668c13f6e303932b5e0e100fc248b"},
                    headers=dict(DEFAULT_HEADERS),
                ),
                "zhongxuanbu": UrlConfig(
                    url="https://www.xuexi.cn/105c2fa2843fa9e6d17440e172115c92/9a3668c13f6e303932b5e0e100fc248b.html",
                    method="GET",
                    params={"path": "105c2fa2843fa9e6d17440e172115c92/9a3668c13f6e303932b5e0e100fc248b"},
                    headers=dict(DEFAULT_HEADERS),
                ),
            },
        )
        super().__init__(config)
        # Create the WebDriver as soon as the crawler is constructed
        self.driver = self._init_driver()

    def _init_driver(self):
        """Initialize and return a Chrome WebDriver instance."""
        chrome_options = Options()
        # Keep the browser visible: headless mode is intentionally not enabled,
        # so captchas can be handled manually if they appear.
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
        chrome_options.add_argument('--start-maximized')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--disable-web-security')
        chrome_options.add_argument('--allow-running-insecure-content')
        chrome_options.add_argument('--disable-features=VizDisplayCompositor')

        # Pick the chromedriver binary for the current OS
        chrome_driver_path = 'win/chromedriver.exe'
        if platform.system() == 'Linux':
            chrome_driver_path = 'linux/chromedriver'
        service = Service(executable_path=chrome_driver_path)

        driver = None
        try:
            driver = webdriver.Chrome(service=service, options=chrome_options)
logger.info("Chrome浏览器初始化成功") except Exception as e: logger.error(f"Chrome浏览器初始化失败: {str(e)}") return driver # 设置隐式等待时间 # driver.implicitly_wait(10) # 访问主页获取初始Cookie logger.info("访问主页获取初始Cookie") try: driver.get(self.config.base_url) except Exception as e: logger.error(f"访问URL失败: {self.config.base_url}, 错误: {str(e)}") return driver time.sleep(random.uniform(2, 4)) # 检查是否有验证页面 page_source = driver.page_source if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source: logger.warning("检测到验证页面,尝试手动处理验证") # 尝试等待用户手动处理验证 logger.info("请在30秒内手动完成验证...") time.sleep(30) # 刷新页面,检查验证是否完成 driver.refresh() time.sleep(random.uniform(2, 4)) # 再次检查验证状态 page_source = driver.page_source if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source: logger.error("验证未完成,无法继续爬取") # self.driver.quit() # self.driver = None return driver return driver def _normalize_url(self, url: str) -> str: """ 规范化 URL,补全协议和域名 Args: url: 原始 URL Returns: 完整的 URL """ if not url: return url # 已经是完整 URL if url.startswith("http://") or url.startswith("https://"): return url # 协议相对 URL,补充 https: if url.startswith("//"): return "https:" + url # 相对路径,补全域名 return self.config.base_url + url def parse_news_detail(self, url: str) -> NewsItem: news_item = NewsItem(title='', contentRows=[], url=url) if self.driver is None: return news_item try: self.driver.get(url) article_area_div = WebDriverWait(self.driver, 10).until( EC.presence_of_element_located((By.CSS_SELECTOR, 'div.render-detail-article')) ) except Exception as e: logger.warning(f"访问文章页失败或未找到文章区域: {url}, {e}") return news_item # 基础信息获取 try: title_div = article_area_div.find_element(By.CSS_SELECTOR, "div.render-detail-title") news_item.title = title_div.text.strip() except Exception as e: logger.warning(f"提取标题失败: {e}") try: time_div = article_area_div.find_element(By.CSS_SELECTOR, "span.render-detail-time") news_item.publishTime = time_div.text.strip() except Exception as e: logger.warning(f"提取发布时间失败: {e}") try: source_div = article_area_div.find_element(By.CSS_SELECTOR, "span.render-detail-resource") news_item.source = source_div.text.strip().split(":")[1] except Exception as e: logger.warning(f"提取来源失败: {e}") # 获取文章内容区域 try: article_content_div = article_area_div.find_element(By.CSS_SELECTOR, "div.render-detail-article-content") except Exception as e: logger.warning(f"未找到文章内容区域: {e}") return news_item # 检查是否有分页 def is_page(): try: page_div = article_content_div.find_element(By.CSS_SELECTOR, "div.detail-pagination-wrap") return page_div is not None and page_div.is_displayed() except: return False def get_content_rows(): """提取文章内容行""" try: content_div = article_content_div.find_element(By.CSS_SELECTOR, "div.render-detail-content") except Exception as e: logger.warning(f"未找到内容区域: {str(e)}") return # 获取所有直接子元素 children = content_div.find_elements(By.XPATH, "./*") for child in children: try: # 获取元素的class属性 class_name = child.get_attribute("class") or "" # 图片元素 if "article-img" in class_name: try: img = child.find_element(By.TAG_NAME, "img") img_src = img.get_attribute("src") if img_src: # 规范化URL img_src = self._normalize_url(img_src) # 添加图片标签 news_item.contentRows.append({ "type": "img", "content": f'' }) logger.debug(f"提取图片: {img_src}") continue except Exception as e: logger.warning(f"提取图片失败: {str(e)}") # 视频元素 if "article-video" in class_name: try: video = child.find_element(By.TAG_NAME, "video") video_src = video.get_attribute("src") if video_src: # 规范化URL video_src = self._normalize_url(video_src) # 添加视频标签 news_item.contentRows.append({ 
"type": "video", "content": f'' }) logger.debug(f"提取视频: {video_src}") continue except Exception as e: logger.warning(f"提取视频失败: {str(e)}") # 文字元素(作为最后的兜底) text_content = child.text.strip() # 过滤空内容 if text_content: news_item.contentRows.append({ "type": "text", "content": text_content }) logger.debug(f"提取文字: {text_content[:50]}...") except Exception as e: logger.warning(f"处理内容元素失败: {str(e)}") continue get_content_rows() if is_page(): pass logger.info(f"解析文章详情完成: {news_item.model_dump()}") return news_item def search(self, keyword, total=10) -> ResultDomain: """搜索新闻""" search_config = self.config.urls.get("search") if not self.driver: logger.error("WebDriver未初始化,无法继续爬取") return ResultDomain(code=1, message="WebDriver未初始化,无法继续爬取", success=False) count = 0 url_base_map = {} url_list = [] news_list = [] resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list) def get_search_url(): """从当前页面提取URL数据""" nonlocal count try: # 等待页面加载完成 # assert self.driver is not None, "WebDriver未初始化" if self.driver is None: logger.error("WebDriver未初始化") return wait = WebDriverWait(self.driver, 10) wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div.search-result"))) wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div.c-card:not(.c-sc)"))) # 解析HTML搜索结果 home = self.driver.find_element(By.CSS_SELECTOR, "div.home") search_content = home.find_element(By.CSS_SELECTOR, "div.search-content") search_result_div = search_content.find_element(By.CSS_SELECTOR, "div.search-result") item_s = search_result_div.find_elements(By.CSS_SELECTOR, "div.c-card:not(.c-sc)") for item in item_s: if count >= total: break try: # 从 a 标签获取 URL link = item.find_element(By.CSS_SELECTOR, "a[href]") url = link.get_attribute("href") # 从 h3 > span.title 获取标题 title = item.find_element(By.CSS_SELECTOR, "h3 span.title").text # 从 div.time 获取来源和时间 time_element = item.find_element(By.CSS_SELECTOR, "div.time") time_text = time_element.text.strip() # 判断是换行符分隔还是空格分隔 if '\n' in time_text: time_lines = time_text.split('\n') source = time_lines[0].strip() if len(time_lines) > 0 else '' publish_time = time_lines[1].strip() if len(time_lines) > 1 else '' else: # 空格分隔,使用正则提取日期格式 date_match = re.search(r'\d{4}-\d{2}-\d{2}', time_text) if date_match: publish_time = date_match.group() source = time_text[:date_match.start()].strip() else: source = '' publish_time = time_text url_base_map[url] = { 'title': title, 'source': source, 'publishTime': publish_time } url_list.append(url) count += 1 except Exception as e: logger.warning(f"解析某个搜索结果失败: {str(e)}") continue logger.info(f"本页提取到 {len(item_s)} 条搜索结果") except Exception as e: logger.exception(f"提取URL过程出错: {str(e)}") # 方式1:初次手动点击按钮进入 logger.info("访问搜索页面并手动点击搜索") self.driver.get(search_config.url) time.sleep(2) home = self.driver.find_element(By.CSS_SELECTOR, "div.home") logger.info(home) input_wapper_div = self.driver.find_element(By.CSS_SELECTOR, 'div.search-input-wrapper') input_div = input_wapper_div.find_element(By.CSS_SELECTOR, 'input.search-type-input-compact') input_div.send_keys(keyword) search_btn = input_wapper_div.find_element(By.CSS_SELECTOR, 'button[type="submit"]') search_btn.click() time.sleep(2) # 提取第一页数据 get_search_url() # 方式2:后续页直接通过URL进入 while count < total: # 记录提取前的数量 count_before = count # 构建下一页URL current_url = self.driver.current_url qs = urlparse(current_url) param = parse_qs(qs.query) current_page = int(param.get('page', ['1'])[0]) param['page'] = [str(current_page + 1)] new_url = urlunparse((qs.scheme, qs.netloc, qs.path, qs.params, 
                                  urlencode(param, doseq=True), qs.fragment))
            logger.info(f"Moving on to page {current_page + 1}")
            # Open the next page directly
            self.driver.get(new_url)
            time.sleep(2)
            # Extract this page
            get_search_url()
            # If this page yielded no new results, there are no more pages
            if count == count_before:
                logger.info("No new results on this page, stopping pagination")
                break

        logger.info(f"Extracted {len(url_list)} URLs in total")

        # Parse the article details
        for url in url_list:
            try:
                news_item = self.parse_news_detail(url)
                if news_item:
                    # Fill missing fields from the search-result metadata
                    if news_item.title is None or news_item.title.strip() == "":
                        news_item.title = url_base_map[url].get("title", "")
                    if news_item.publishTime is None or news_item.publishTime.strip() == "":
                        news_item.publishTime = url_base_map[url].get("publishTime", "")
                    if news_item.source is None or news_item.source.strip() == "":
                        news_item.source = url_base_map[url].get("source", "")
                    news_list.append(news_item)
            except Exception as e:
                logger.warning(f"Failed to parse article detail: {str(e)}")
                continue

        resultDomain.dataList = news_list
        # with open("Xxqg_news_list.json", "w", encoding="utf-8") as f:
        #     json.dump([item.model_dump() for item in resultDomain.dataList] if resultDomain.dataList else [], f, ensure_ascii=False, indent=4)
        return resultDomain

    def crawl_base(self, config: UrlConfig, yesterday=True, start: Optional[str] = None, end: Optional[str] = None) -> ResultDomain:
        news_list = []
        resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list)
        if self.driver is None:
            logger.error("WebDriver is not initialized, cannot continue crawling")
            resultDomain.code = 1
            resultDomain.success = False
            resultDomain.message = "WebDriver is not initialized"
            return resultDomain

        self.driver.get(config.url)
        try:
            left_div = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, 'div#page-main'))
            )
        except Exception as e:
            logger.exception(f"Failed to open the channel page: {str(e)}")
            resultDomain.code = 1
            resultDomain.success = False
            resultDomain.message = f"Failed to open the channel page: {str(e)}"
            return resultDomain

        # Filter the requests captured by selenium-wire for the ones that return JSON data
        time.sleep(3)  # wait for all requests to finish
        request_list = self.driver.requests
        json_request = []
        target_path = config.params.get("path")
        target_request = None
        logger.info(f"Looking for the target JSON request among {len(request_list)} captured requests")

        # First, find the JSON request whose URL contains the full channel path
        for request in request_list:
            if ".json" in request.url:
                json_request.append(request)
                if target_path in request.url:
                    target_request = request

        if target_request is None:
            logger.error("Target JSON request not found")
            resultDomain.code = 1
            resultDomain.success = False
            resultDomain.message = "Target JSON request not found"
            return resultDomain

        # Parse the meta response to get the channelId
        try:
            meta_data = json.loads(target_request.response.body)
            logger.info(f"Meta response data: {meta_data}")
            # Extract the channelId
            if 'pageData' in meta_data and 'channel' in meta_data['pageData']:
                meta_id = meta_data['pageData']['channel']['channelId']
                logger.info(f"Got channelId: {meta_id}")
            else:
                logger.error(f"Unexpected meta data structure, channelId not found. Keys: {meta_data.keys()}")
                resultDomain.code = 1
                resultDomain.success = False
                resultDomain.message = "Could not extract channelId from the meta request"
                return resultDomain
        except Exception as e:
            logger.exception(f"Failed to parse the meta request: {str(e)}")
            resultDomain.code = 1
            resultDomain.success = False
            resultDomain.message = f"Failed to parse the meta request: {str(e)}"
            return resultDomain

        # Use the channelId to find the article-list request
        data_request = None
        for json_item in json_request:
            if meta_id in json_item.url:
                data_request = json_item
                break
        if data_request is None:
            logger.error("Article data JSON request not found")
            resultDomain.code = 1
            resultDomain.success = False
            resultDomain.message = "Article data JSON request not found"
            return resultDomain

        # Parse the article-list response (it may be gzip compressed)
        try:
            response_body = data_request.response.body
            # Check for the gzip magic number
            if response_body[:2] == b'\x1f\x8b':
                import gzip
                response_body = gzip.decompress(response_body)
                logger.info("Detected gzip compression, response decompressed")
            # Decode to a string
            if isinstance(response_body, bytes):
                response_body = response_body.decode('utf-8')
            article_data = json.loads(response_body)
            logger.info(f"Parsed article data, {len(article_data)} entries")
        except Exception as e:
            logger.exception(f"Failed to parse the article data: {str(e)}")
            resultDomain.code = 1
            resultDomain.success = False
            resultDomain.message = f"Failed to parse the article data: {str(e)}"
            return resultDomain

        # Work out the date range once, outside the loop
        if not yesterday and start and end:
            # Custom date range
            start_date = start
            end_date = end
            logger.info(f"Using custom date range: {start_date} to {end_date}")
        else:
            # Default: yesterday only
            yesterday_str = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
            start_date = yesterday_str
            end_date = yesterday_str
            logger.info(f"Using default date range (yesterday): {yesterday_str}")

        # The day before the start date, used to stop the loop early (performance optimization)
        day_before_start = (datetime.strptime(start_date, '%Y-%m-%d') - timedelta(days=1)).strftime('%Y-%m-%d')

        for article in article_data:
            # publishTime looks like "2025-11-21 10:04:20"; keep only the date part
            publish_date = article['publishTime'].split(" ")[0]
            # Skip articles dated in the future (if any)
            if publish_date > end_date:
                continue
            # Articles inside the date range
            if start_date <= publish_date <= end_date:
                try:
                    # Extract the source defensively
                    source = article['source'].split("_")[1] if "_" in article.get('source', '') else article.get('source', '')
                    news_item = self.parse_news_detail(article['url'])
                    news_item.title = article['title']
                    news_item.publishTime = article['publishTime']
                    news_item.source = source
                    news_item.url = article['url']
                    news_list.append(news_item)
                    logger.info(f"Added news item: {news_item.title} ({publish_date})")
                except Exception as e:
                    logger.warning(f"Failed to parse article detail: {article.get('title', 'unknown')} - {str(e)}")
                    continue
            # Articles older than the date range: stop early (data is in reverse chronological order)
            elif publish_date < day_before_start:
                logger.info(f"Reached articles older than the date range ({publish_date}), stopping")
                break

        resultDomain.dataList = news_list
        # with open("Xxqg_important_news_list.json", "w", encoding="utf-8") as f:
        #     json.dump([item.model_dump() for item in resultDomain.dataList] if resultDomain.dataList else [], f, ensure_ascii=False, indent=4)
        return resultDomain

    def crawl_important(self, total=10) -> ResultDomain:
        """
        Crawl the "重要新闻" (important news) channel.

        Modeled on the older myQiangguo crawler: grab the channel's article list first,
        then parse each detail page with Selenium.

        Args:
            total: maximum number of articles to crawl, defaults to 10

        Returns:
            ResultDomain: result object containing the news list
        """
        news_list = []
        resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list)
        if self.driver is None:
            logger.error("WebDriver is not initialized, cannot continue crawling")
            resultDomain.code = 1
            resultDomain.success = False
            resultDomain.message = "WebDriver is not initialized"
            return resultDomain

        # Get the "important" channel config
        important_config = self.config.urls.get("important")
        if not important_config:
            logger.error("Config entry 'important' not found")
            resultDomain.code = 1
            resultDomain.success = False
            resultDomain.message = "Config entry 'important' not found"
            return resultDomain

        resultDomain = self.crawl_base(important_config)
        return resultDomain

    def crawl_xuexishiping(self, total=10) -> ResultDomain:
        """
        Crawl the "学习时评" (commentary) channel.

        Args:
            total: maximum number of articles to crawl, defaults to 10

        Returns:
            ResultDomain: result object containing the news list
        """
        news_list = []
        resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list)
        if self.driver is None:
            logger.error("WebDriver is not initialized, cannot continue crawling")
            resultDomain.code = 1
            resultDomain.success = False
            resultDomain.message = "WebDriver is not initialized"
            return resultDomain

        # Get the "xuexishiping" channel config
        xuexishiping_config = self.config.urls.get("xuexishiping")
        if not xuexishiping_config:
            logger.error("Config entry 'xuexishiping' not found")
            resultDomain.code = 1
            resultDomain.success = False
            resultDomain.message = "Config entry 'xuexishiping' not found"
            return resultDomain

        resultDomain = self.crawl_base(xuexishiping_config)
        return resultDomain

    def home(self, type="") -> ResultDomain:
        """Fetch the home page data.

        NOTE: the config currently has no "home" entry, so this method is still a stub.
        """
        count = 0
        url_base_map = {}
        url_list = []
        news_list = []
        resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list)

        if self.driver is None:
            resultDomain.message = "WebDriver is not initialized"
            return resultDomain
        home_config = self.config.urls.get("home")
        if home_config is None:
            resultDomain.message = "Config entry 'home' not found"
            return resultDomain

        self.driver.get(home_config.url)
        try:
            home_div = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, 'div.page-main > div.grid-cell > div.grid-cell'))
            )
        except Exception as e:
            resultDomain.message = f"Failed to fetch the home page: {str(e)}"
            return resultDomain

        section_divs = home_div.find_elements(By.CSS_SELECTOR, 'section')
        return resultDomain
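

# Minimal usage sketch (not part of the crawler itself). It assumes the project-level
# BaseCrawler / ResultDomain / NewsItem classes behave exactly as they are used above,
# and that a matching chromedriver binary exists at win/chromedriver.exe or
# linux/chromedriver. The keyword and the output file name are illustrative only.
if __name__ == "__main__":
    crawler = XxqgCrawler()
    try:
        # Keyword search, capped at 5 articles
        result = crawler.search("数字经济", total=5)
        if result.success and result.dataList:
            with open("xxqg_demo.json", "w", encoding="utf-8") as f:
                json.dump([item.model_dump() for item in result.dataList], f, ensure_ascii=False, indent=4)
        # Channel crawl for yesterday's "important news"
        crawler.crawl_important()
    finally:
        # Always release the browser, even if a crawl step fails
        if crawler.driver is not None:
            crawler.driver.quit()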