diff --git a/schoolNewsCrawler/crawler/xhw/XhwCrawler.py b/schoolNewsCrawler/crawler/xhw/XhwCrawler.py
index 86e600b..629fcf3 100644
--- a/schoolNewsCrawler/crawler/xhw/XhwCrawler.py
+++ b/schoolNewsCrawler/crawler/xhw/XhwCrawler.py
@@ -1,4 +1,5 @@
 # Xinhua Net crawler
+from itertools import count
 from typing import List, Optional
 from bs4 import Tag
@@ -84,24 +85,24 @@ class XhwCrawler(BaseCrawler):
         chrome_options.add_argument('--disable-web-security')
         chrome_options.add_argument('--allow-running-insecure-content')
         chrome_options.add_argument('--disable-features=VizDisplayCompositor')
-        chrome_options.add_argument('--remote-debugging-port=9222')  # add a remote debugging port
         service = Service(executable_path=r"chromedriver.exe")
+        driver = None
         try:
-            self.driver = webdriver.Chrome(service=service, options=chrome_options)
+            driver = webdriver.Chrome(service=service, options=chrome_options)
             logger.info("Chrome browser initialized successfully")
         except Exception as e:
             logger.error(f"Chrome browser initialization failed: {str(e)}")
             return None
 
         # Set the implicit wait time
-        self.driver.implicitly_wait(10)
+        # driver.implicitly_wait(10)
 
         # Visit the home page to obtain initial cookies
         logger.info("Visiting home page to obtain initial cookies")
         logger.info(f"About to visit URL: {self.config.base_url}")
         try:
-            self.driver.get(self.config.base_url)
+            driver.get(self.config.base_url)
             logger.info(f"Successfully visited URL: {self.config.base_url}")
         except Exception as e:
             logger.error(f"Failed to visit URL: {self.config.base_url}, error: {str(e)}")
@@ -109,7 +110,7 @@ class XhwCrawler(BaseCrawler):
         time.sleep(random.uniform(2, 4))
 
         # Check for a verification page
-        page_source = self.driver.page_source
+        page_source = driver.page_source
         if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source:
             logger.warning("Verification page detected, trying to handle it manually")
 
@@ -118,318 +119,244 @@ class XhwCrawler(BaseCrawler):
             time.sleep(30)
 
             # Refresh the page and check whether verification is complete
-            self.driver.refresh()
+            driver.refresh()
             time.sleep(random.uniform(2, 4))
 
             # Check the verification status again
-            page_source = self.driver.page_source
+            page_source = driver.page_source
             if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source:
                 logger.error("Verification not completed, cannot continue crawling")
                 # self.driver.quit()
                 # self.driver = None
                 return None
 
-        return self.driver
+        return driver
 
-    def __del__(self):
-        """Destructor: make sure the browser is closed"""
-        if hasattr(self, 'driver') and self.driver:
-            self.driver.quit()
-            logger.info("Browser closed")
-
-    def search(self, key: str, total=10, action="news") -> ResultDomain:
-        # Check that the driver has been initialized
-        if not self.driver:
-            logger.error("WebDriver not initialized, cannot continue crawling")
-            resultDomain = ResultDomain(code=1, message="WebDriver not initialized, cannot continue crawling", success=False)
-            return resultDomain
-
-        # Use self.driver directly
-        news_urls = []
-        news_list = []
-        resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list)
-        # Get the search configuration
-        search_config = self.config.urls.get("search")
-        if not search_config:
-            logger.error("Search URL configuration not found")
-            resultDomain.code = 0
-            resultDomain.message = "Search URL configuration not found"
-            resultDomain.success = False
-            return resultDomain
-        pagesize = 10
-        # Prepare the search parameters
-        search_data = search_config.params.copy()
-        search_data["k"] = key
-        search_data["action"] = action
-
-        # Collect the news URLs
-        url_base_map = {}
-        # Compute the number of pages needed, rounding up
-        total_pages = (total + pagesize - 1) // pagesize
-        for page in range(1, total_pages + 1):
-            search_data["page"] = page
-            pageHtml = search_config.url + "?" + urlencode(search_data)
-            # HTML of this results page
-            logger.info(f"Requesting URL: {pageHtml}")
-
-            # Visit the page with Selenium
-            self.driver.get(pageHtml)
-            time.sleep(random.uniform(2, 4))
-
-            # Check for a verification page
-            if not self.driver:
-                logger.error("WebDriver is no longer valid, cannot continue crawling")
-                resultDomain = ResultDomain(code=1, message="WebDriver is no longer valid, cannot continue crawling", success=False)
-                return resultDomain
-
-            page_source = self.driver.page_source
-            if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source:
-                logger.warning("Verification page detected, trying to handle it manually")
-                logger.info("Please complete the verification manually within 30 seconds...")
-                time.sleep(30)
-
-                # Check that the driver is still valid
-                if not self.driver:
-                    logger.error("WebDriver is no longer valid, cannot continue crawling")
-                    resultDomain = ResultDomain(code=1, message="WebDriver is no longer valid, cannot continue crawling", success=False)
-                    return resultDomain
-
-                self.driver.refresh()
-                time.sleep(random.uniform(2, 4))
-
-                # Check the verification status again
-                if not self.driver:
-                    logger.error("WebDriver is no longer valid, cannot continue crawling")
-                    resultDomain = ResultDomain(code=1, message="WebDriver is no longer valid, cannot continue crawling", success=False)
-                    return resultDomain
-
-                page_source = self.driver.page_source
-                if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source:
-                    logger.error("Verification not completed, cannot continue crawling")
-                    resultDomain = ResultDomain(code=1, message="Verification not completed, cannot continue crawling", success=False)
-                    return resultDomain
-
-            # Parse the page content
-            pageSoup = self.parse_html(page_source)
-            logger.info(f"Parsed HTML content: {str(pageSoup)[:500]}...")  # only log the first 500 characters
-            # Extract the news URLs from the results page
-            searchMainDiv = pageSoup.find("div", class_="page-search-main")
-            if not searchMainDiv:
-                logger.error("Search main section not found")
-                resultDomain.code = 0
-                resultDomain.message = "Search main section not found"
-                resultDomain.success = False
-                return resultDomain
-            searchGroupDiv = searchMainDiv.find("div", class_="page-search-group")
-            if not searchGroupDiv:
-                logger.error("Search group not found")
-                resultDomain.code = 0
-                resultDomain.message = "Search group not found"
-                resultDomain.success = False
-                return resultDomain
-            newsDiv = searchGroupDiv.find("div", class_="page-search-news")
-            if not newsDiv:
-                logger.error("News list not found")
-                resultDomain.code = 0
-                resultDomain.message = "News list not found"
-                resultDomain.success = False
-                return resultDomain
-            newsList = newsDiv.find_all("div", class_="group")
-            for news in newsList:
-                news_info = news.find("div.head")
-                news_title = news_info.find("div.title")
-                news_date = news_info.find("div.date").text.strip()
-                url = news_title.find("a").get("href")
-                url_base_map[url] = {"title": news_title.get_text(strip=True), "date": news_date}
-                news_urls.append(url)
-        # Temporarily save the URLs to url.json
-        with open("url.json", "w", encoding="utf-8") as f:
-            json.dump(url_base_map, f, ensure_ascii=False, indent=4)
-        # Fetch the details for each news URL
-        for news_url in news_urls:
-            news = self.parse_news_detail(news_url)
-            news.title = url_base_map.get(news_url, {}).get("title")
-            news.publishTime = url_base_map.get(news_url, {}).get("date")
-            news_list.append(news)
-
-        # Temporarily save the news items to news.json
-        with open("news.json", "w", encoding="utf-8") as f:
-            json.dump(news_list, f, ensure_ascii=False, indent=4)
-
-        # Close the browser
-        if self.driver:
-            self.driver.quit()
-            logger.info("Browser closed")
-
-        return resultDomain
 
     def parse_news_detail(self, url: str) -> Optional[NewsItem]:
-        return self.parse_xhsz_news_detail_selenium(url)
+        return self.parse_xhsz_news_detail(url)
+
+    def parse_xhsz_news_detail(self, url: str) -> NewsItem:
+        """
+        Parse a Xinhua Net news detail page with Selenium.
+        Exceptions are caught locally so that a NewsItem object is always
+        returned, even if parts of the content fail to parse.
+        """
+        news_item = NewsItem(title="", contentRows=[], url=url)
 
-    def parse_xhsz_news_detail_selenium(self, url: str) -> Optional[NewsItem]:
-        # Check that the driver has been initialized
         if not self.driver:
             logger.error("WebDriver not initialized, cannot fetch news details")
-            return None
-
-        newsItem = NewsItem(title="", contentRows=[], url=url)
+            return news_item
 
-        # Visit the news detail page with Selenium
-        self.driver.get(url)
-        time.sleep(random.uniform(2, 4))
+        try:
+            self.driver.get(url)
+            time.sleep(2)
+        except Exception as e:
+            logger.warning(f"Failed to open news detail page: {url}, {e}")
+            return news_item
+
+        # Handle the slider captcha if one is shown
+        try:
+            sliders = self.driver.find_elements(By.CSS_SELECTOR, ".handler.handler_bg")
+            if sliders:
+                slider = sliders[0]
+                action_chain = ActionChains(self.driver)
+                action_chain.click_and_hold(slider).perform()
+                # drag in several uneven steps so the motion looks less robotic
+                distance = 1000
+                tracks = [distance * 0.2, distance * 0.3, distance * 0.25, distance * 0.25]
+                for track in tracks:
+                    action_chain.move_by_offset(int(track), 0).pause(1)
+                action_chain.perform()
+                action_chain.release().perform()
+                time.sleep(2)
+        except Exception as e:
+            logger.info(f"Slider verification failed or did not appear: {e}")
 
-        # Check for a verification page
-        if not self.driver:
-            logger.error("WebDriver is no longer valid, cannot fetch news details")
-            return None
-
-        page_source = self.driver.page_source
-        if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source:
-            logger.warning("Verification page detected, trying to handle it manually")
-            logger.info("Please complete the verification manually within 30 seconds...")
-            time.sleep(30)
-
-            # Check that the driver is still valid
-            if not self.driver:
-                logger.error("WebDriver is no longer valid, cannot fetch news details")
-                return None
-
-            self.driver.refresh()
-            time.sleep(random.uniform(2, 4))
+        # If the site redirected to a different article URL, parse the final page instead
+        final_url = self.driver.current_url
+        if final_url != url:
+            news_item = self.parse_xh_news_detail(final_url)
+            news_item.url = url
+            return news_item
 
-        # Check the verification status again
-        if not self.driver:
-            logger.error("WebDriver is no longer valid, cannot fetch news details")
-            return None
-
-        page_source = self.driver.page_source
-        if "验证" in page_source or "captcha" in page_source.lower() or "滑动验证" in page_source:
-            logger.error("Verification not completed, cannot fetch news details")
-            return None
+        # News body
+        try:
+            main_div = WebDriverWait(self.driver, 10).until(
+                EC.presence_of_element_located((By.CSS_SELECTOR, "div.page-news-detail"))
+            )
+        except Exception as e:
+            logger.warning(f"News body not found: {url}, {e}")
+            return news_item
 
-        # Parse the page content
-        newsDetailSoup = self.parse_html(page_source)
+        try:
+            article_div = main_div.find_element(By.CSS_SELECTOR, "div.page-news-l")
+        except Exception:
+            logger.warning(f"Article body not found: {url}")
+            return news_item
 
-        # Find the news body section
-        main_div = newsDetailSoup.find("div.page-news-detail")
-        if not main_div:
-            logger.error(f"News body section not found: {url}")
-            return None
+        # Title
+        try:
+            title_div = article_div.find_element(By.CSS_SELECTOR, "div.page-news-detail-title")
+            news_item.title = title_div.text.strip()
+        except Exception:
+            pass
 
-        article_div = main_div.find("div.page-news-l")
-        if not article_div:
-            logger.error(f"News article section not found: {url}")
-            return None
-
-        # Get the title
-        title_div = article_div.find("div.page-news-detail-title")
-        if title_div:
-            newsItem.title = title_div.text.strip()
-
-        # Get the news metadata
-        channal_div = article_div.find("div.page-news-detail-note")
-        if channal_div:
-            channal_items_div = channal_div.find_all("div.item")
-            for item in channal_items_div:
+        # News metadata
+        try:
+            channal_div = article_div.find_element(By.CSS_SELECTOR, "div.page-news-detail-note")
+            channal_items = channal_div.find_elements(By.CSS_SELECTOR, "div.item")
+            for item in channal_items:
                 text = item.text.strip()
                 if "来源" in text:
-                    parts = text.split(":", 1)
-                    if len(parts) > 1:
-                        newsItem.source = parts[1].strip()
+                    news_item.source = text.split(":", 1)[-1].strip()
                 elif "发布时间" in text:
-                    parts = text.split(":", 1)
-                    if len(parts) > 1:
-                        newsItem.publishTime = parts[1].strip()
+                    news_item.publishTime = text.split(":", 1)[-1].strip()
                 elif "浏览人数" in text:
-                    parts = text.split(":", 1)
-                    if len(parts) > 1:
-                        newsItem.viewCount = parts[1].strip()
+                    try:
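+                        # the view count may contain non-numeric text; ignore it if parsing fails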
+                        news_item.viewCount = int(text.split(":", 1)[-1].strip())
+                    except Exception:
+                        pass
+        except Exception:
+            pass
 
-        # Get the news content
-        content_div = article_div.find("div.page-news-detail-content")
-        if content_div:
-            # Iterate over all elements in the content area
-            for child in content_div.children:
-                if not isinstance(child, Tag):
+        # Content
+        try:
+            content_div = article_div.find_element(By.CSS_SELECTOR, "div.page-news-detail-content")
+            children = content_div.find_elements(By.XPATH, "./*")
+            for child in children:
+                try:
+                    tag_name = child.tag_name.lower()
+                    if tag_name == "p":
+                        text = child.text.strip().replace("\xa0", "")
+                        if not text and len(child.find_elements(By.TAG_NAME, "img")) == 0:
+                            continue
+
+                        # Image
+                        try:
+                            img = child.find_element(By.TAG_NAME, "img")
+                            src = img.get_attribute("src")
+                            if src and not src.startswith("http"):
+                                src = self.config.base_url + src
+                            news_item.contentRows.append({"tag": "img", "content": f"<img src='{src}'/>"})
+                            continue
+                        except Exception:
+                            pass
+
+                        # Video
+                        try:
+                            video = child.find_element(By.TAG_NAME, "video")
+                            src = video.get_attribute("src")
+                            if src and not src.startswith("http"):
+                                src = self.config.base_url + src
+                            news_item.contentRows.append({"tag": "video", "content": f"