Important news

2025-11-21 14:55:50 +08:00
parent 0e7cee3070
commit 7ccec2b624
4 changed files with 2018 additions and 219 deletions

@@ -78,6 +78,27 @@ class XxqgCrawler(BaseCrawler):
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
),
"home": UrlConfig(
url="https://www.xuexi.cn/",
method="GET",
params={},
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0',
'Referer': 'https://www.xuexi.cn/',
'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"'
}
)
},
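The new "home" entry follows the same UrlConfig shape as the existing entries (url, method, params, headers). As a reading aid only, here is a minimal sketch of how such an entry could be exercised with the requests library, assuming UrlConfig exposes those four fields as attributes; fetch_with_config is a hypothetical helper, not part of this commit (the crawler itself drives these URLs through Selenium):

import requests

# Hypothetical helper, not part of the commit: issue the request described by a UrlConfig entry.
def fetch_with_config(cfg) -> str:
    resp = requests.request(cfg.method, cfg.url, params=cfg.params, headers=cfg.headers, timeout=10)
    resp.raise_for_status()
    return resp.text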
@@ -179,6 +200,127 @@ class XxqgCrawler(BaseCrawler):
# 相对路径,补全域名
return self.config.base_url + url
def parse_news_detail(self, url: str) -> NewsItem:
news_item = NewsItem(title='', contentRows=[], url=url)
if self.driver is None:
return news_item
try:
self.driver.get(url)
article_area_div = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'div.render-detail-article'))
)
except Exception as e:
logger.warning(f"访问文章页失败或未找到文章区域: {url}, {e}")
return news_item
# 基础信息获取
try:
title_div = article_area_div.find_element(By.CSS_SELECTOR, "div.render-detail-title")
news_item.title = title_div.text.strip()
except Exception as e:
logger.warning(f"提取标题失败: {e}")
try:
time_div = article_area_div.find_element(By.CSS_SELECTOR, "span.render-detail-time")
news_item.publishTime = time_div.text.strip()
except Exception as e:
logger.warning(f"提取发布时间失败: {e}")
try:
source_div = article_area_div.find_element(By.CSS_SELECTOR, "span.render-detail-source")
news_item.source = source_div.text.strip().split("：")[1]  # 文本形如"来源：xxx",取冒号后的部分
except Exception as e:
logger.warning(f"提取来源失败: {e}")
# 获取文章内容区域
try:
article_content_div = article_area_div.find_element(By.CSS_SELECTOR, "div.render-detail-article-content")
except Exception as e:
logger.warning(f"未找到文章内容区域: {e}")
return news_item
# 检查是否有分页
def is_page():
try:
page_div = article_content_div.find_element(By.CSS_SELECTOR, "div.detail-pagination-wrap")
return page_div is not None and page_div.is_displayed()
except:
return False
def get_content_rows():
"""提取文章内容行"""
try:
content_div = article_content_div.find_element(By.CSS_SELECTOR, "div.render-detail-content")
except Exception as e:
logger.warning(f"未找到内容区域: {str(e)}")
return
# 获取所有直接子元素
children = content_div.find_elements(By.XPATH, "./*")
for child in children:
try:
# 获取元素的class属性
class_name = child.get_attribute("class") or ""
# 图片元素
if "article-img" in class_name:
try:
img = child.find_element(By.TAG_NAME, "img")
img_src = img.get_attribute("src")
if img_src:
# 规范化URL
img_src = self._normalize_url(img_src)
# 添加图片标签
news_item.contentRows.append({
"type": "img",
"content": f'<img src="{img_src}" />'
})
logger.debug(f"提取图片: {img_src}")
continue
except Exception as e:
logger.warning(f"提取图片失败: {str(e)}")
# 视频元素
if "article-video" in class_name:
try:
video = child.find_element(By.TAG_NAME, "video")
video_src = video.get_attribute("src")
if video_src:
# 规范化URL
video_src = self._normalize_url(video_src)
# 添加视频标签
news_item.contentRows.append({
"type": "video",
"content": f'<video src="{video_src}" controls></video>'
})
logger.debug(f"提取视频: {video_src}")
continue
except Exception as e:
logger.warning(f"提取视频失败: {str(e)}")
# 文字元素(作为最后的兜底)
text_content = child.text.strip()
# 过滤空内容
if text_content:
news_item.contentRows.append({
"type": "text",
"content": text_content
})
logger.debug(f"提取文字: {text_content[:50]}...")
except Exception as e:
logger.warning(f"处理内容元素失败: {str(e)}")
continue
get_content_rows()
if is_page():
pass  # TODO: 分页文章的后续页暂未处理,当前只提取第一页内容
logger.info(f"解析文章详情完成: {news_item.model_dump()}")
return news_item
def search(self, keyword, total=10) -> ResultDomain:
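For reference, parse_news_detail above fills contentRows with dicts of the form {"type": "img" | "video" | "text", "content": ...}, where img/video rows already contain HTML markup and text rows hold plain text. A minimal sketch of flattening such rows into one HTML string follows; render_content_rows is a hypothetical helper, not part of this commit:

# Hypothetical helper, not part of the commit: flatten contentRows into a single HTML string.
def render_content_rows(content_rows: list) -> str:
    parts = []
    for row in content_rows:
        if row.get("type") == "text":
            # Text rows hold plain text; wrap them in a paragraph tag.
            parts.append(f"<p>{row.get('content', '')}</p>")
        else:
            # img/video rows already carry their <img>/<video> markup.
            parts.append(row.get("content", ""))
    return "\n".join(parts)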
@@ -333,124 +475,178 @@ class XxqgCrawler(BaseCrawler):
with open("Xxqg_news_list.json", "w", encoding="utf-8") as f:
json.dump([item.model_dump() for item in resultDomain.dataList] if resultDomain.dataList else [], f, ensure_ascii=False, indent=4)
return resultDomain
def crawl_important(self, total=10) -> ResultDomain:
"""
爬取重要新闻栏目
参考旧版myQiangguo爬虫方式使用requests获取文章列表然后用Selenium解析详情
Args:
total: 最多爬取的文章数量默认10
Returns:
ResultDomain: 包含新闻列表的结果对象
"""
news_list = []
resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list)
if self.driver is None:
logger.error("WebDriver未初始化无法继续爬取")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = "WebDriver未初始化"
return resultDomain
# 获取important配置
important_config = self.config.urls.get("important")
if not important_config:
logger.error("未找到important配置")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = "未找到important配置"
return resultDomain
self.driver.get(important_config.url)
try:
if self.driver is None:
resultDomain.message="driver未初始化"
return resultDomain
left_div = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'div#page-main'))
)
except Exception as e:
logger.warning(f"访问文章页失败或未找到文章区域: {url}, {e}")
return news_item
logger.exception(f"访问页失败: {str(e)}")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = f"访问首页失败: {str(e)}"
return resultDomain
# 从selenium-wire捕获的请求中筛选包含JSON数据的请求
time.sleep(3) # 等待所有请求完成
request_list = self.driver.requests
json_request = []
target_path = "98d5ae483720f701144e4dabf99a4a34/5957f69bffab66811b99940516ec8784"
target_request = None
logger.info(f"开始查找目标JSON请求共有 {len(request_list)} 个请求")
# 首先查找包含完整路径的JSON请求
for request in request_list:
if ".json" in request.url:
json_request.append(request)
if target_path in request.url:
target_request = request
if target_request is None:
logger.error("未找到目标JSON请求")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = "未找到目标JSON请求"
return resultDomain
# 解析meta请求响应获取channelId
try:
meta_data = json.loads(target_request.response.body)
logger.info(f"Meta响应数据: {meta_data}")
# 提取channelId
if 'pageData' in meta_data and 'channel' in meta_data['pageData']:
meta_id = meta_data['pageData']['channel']['channelId']
logger.info(f"成功获取channelId: {meta_id}")
else:
logger.error(f"Meta数据结构异常无法找到channelId。数据结构: {meta_data.keys()}")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = "无法从meta请求中提取channelId"
return resultDomain
except Exception as e:
logger.exception(f"解析meta请求失败: {str(e)}")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = f"解析meta请求失败: {str(e)}"
return resultDomain
# 使用channelId查找文章数据请求
data_request = None
for json_item in json_request:
if meta_id in json_item.url:
data_request = json_item
break
if data_request is None:
logger.error("未找到目标JSON请求")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = "未找到目标JSON请求"
return resultDomain
# 解析文章数据请求响应,可能是gzip压缩的
try:
response_body = data_request.response.body
# 检查是否是gzip压缩
if response_body[:2] == b'\x1f\x8b': # gzip magic number
import gzip
response_body = gzip.decompress(response_body)
logger.info("检测到gzip压缩已解压")
# 解码为字符串
if isinstance(response_body, bytes):
response_body = response_body.decode('utf-8')
article_data = json.loads(response_body)
logger.info(f"成功解析文章数据,共 {len(article_data)}")
except Exception as e:
logger.exception(f"解析文章数据失败: {str(e)}")
resultDomain.code = 1
resultDomain.success = False
resultDomain.message = f"解析文章数据失败: {str(e)}"
return resultDomain
for article in article_data:
# 判断是昨天的新闻 "publishTime": "2025-11-21 10:04:20",
publish_date = article['publishTime'].split(" ")[0]
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
day_before_yesterday = (datetime.now() - timedelta(days=2)).strftime('%Y-%m-%d')
if publish_date == yesterday:
news_item = self.parse_news_detail(article['url'])
news_item.title = article['title']
news_item.publishTime = article['publishTime']
news_item.source = article['source'].split("_")[1]
news_item.url = article['url']
news_list.append(news_item)
logger.info(f"添加昨日新闻: {news_item.title}")
elif publish_date == day_before_yesterday:
# 遇到前天的新闻就停止,因为数据是按时间倒序排列的
logger.info("已到达前天的新闻,停止遍历")
break
resultDomain.dataList = news_list
# with open("Xxqg_important_news_list.json", "w", encoding="utf-8") as f:
# json.dump([item.model_dump() for item in resultDomain.dataList] if resultDomain.dataList else [], f, ensure_ascii=False, indent=4)
return resultDomain
def home(self, type="") -> ResultDomain:
"""获取首页数据"""
count = 0
url_base_map = {}
url_list = []
news_list = []
resultDomain = ResultDomain(code=0, message="", success=True, dataList=news_list)
home_config = self.config.urls.get("home")
try:
if self.driver is None:
resultDomain.message="driver未初始化"
return resultDomain
self.driver.get(home_config.url)
home_div = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'div.page-main > div.grid-cell > div.grid-cell'))
)
except Exception as e:
resultDomain.message=f"获取首页失败: {str(e)}"
return resultDomain
section_divs = home_div.find_elements(By.CSS_SELECTOR, 'section')  # TODO: 各栏目section的解析尚未实现
return resultDomain
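For reference, the response-decoding step in crawl_important (a selenium-wire response body may arrive gzip-compressed, which the code detects via the 0x1f 0x8b magic bytes before JSON parsing) reads as a standalone helper roughly like the sketch below; decode_json_body is a hypothetical name, not part of this commit:

import gzip
import json

# Hypothetical standalone version of the decoding step used in crawl_important.
def decode_json_body(body: bytes):
    # gzip streams start with the magic bytes 0x1f 0x8b.
    if body[:2] == b"\x1f\x8b":
        body = gzip.decompress(body)
    # selenium-wire bodies are bytes; decode to text before json.loads.
    if isinstance(body, bytes):
        body = body.decode("utf-8")
    return json.loads(body)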