Search keyword crawler

2025-11-12 16:10:34 +08:00
parent 7be02fe396
commit 675e6da7d7
37 changed files with 3382 additions and 572 deletions

View File

@@ -66,7 +66,7 @@ class BaseCrawler(ABC):
self.session.headers.update(config.headers)
logger.info(f"初始化爬虫: {self.__class__.__name__}")
def fetch(self, url: str, method: str = "GET", data: Optional[Dict[str, Any]] = None, **kwargs) -> Optional[requests.Response]:
def fetch(self, url: str, method: str = "GET", data: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, **kwargs) -> Optional[requests.Response]:
"""
Send an HTTP request
@@ -74,6 +74,7 @@ class BaseCrawler(ABC):
url: request URL
method: HTTP method
data: request payload
headers: extra request headers, merged with the default headers (the extra headers take precedence)
**kwargs: additional request parameters
Returns:
@@ -82,11 +83,20 @@ class BaseCrawler(ABC):
for attempt in range(self.config.retry_times):
try:
logger.info(f"请求URL: {url} (尝试 {attempt + 1}/{self.config.retry_times})")
# Merge the default headers with the caller's headers; caller-supplied values override the defaults
request_headers = dict(self.config.headers or {})
if headers:
request_headers.update(headers)
# If kwargs unexpectedly contains headers, merge them in and remove the key to avoid passing the argument twice
extra_headers = kwargs.pop("headers", None)
if extra_headers:
request_headers.update(extra_headers)
response = self.session.request(
method=method,
url=url,
headers=self.config.headers,
headers=request_headers,
data=data,
timeout=self.config.timeout,
proxies={'http': self.config.proxy, 'https': self.config.proxy} if self.config.proxy else None,
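
A minimal, standalone sketch of the merge order `fetch()` now applies: config defaults first, then the explicit `headers` argument, then any `headers` that arrived via `**kwargs`, with later sources overriding earlier ones. The function and the dict values below are illustrative only, not part of the project:

```python
from typing import Any, Dict, Optional

def merge_headers(default: Optional[Dict[str, str]],
                  explicit: Optional[Dict[str, str]],
                  kwargs: Dict[str, Any]) -> Dict[str, str]:
    """Merge headers the way fetch() does: defaults < explicit < kwargs."""
    merged = dict(default or {})
    if explicit:
        merged.update(explicit)
    extra = kwargs.pop("headers", None)  # remove so it is not passed to request() twice
    if extra:
        merged.update(extra)
    return merged

# Example: the caller's User-Agent wins over the configured default.
print(merge_headers({"User-Agent": "default", "Accept": "*/*"},
                    {"User-Agent": "caller"},
                    {"headers": {"X-Trace": "1"}}))
# {'User-Agent': 'caller', 'Accept': '*/*', 'X-Trace': '1'}
```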

View File

@@ -100,14 +100,25 @@ class RmrbCrawler(BaseCrawler):
search_data["page"] = page
response = self.fetch(search_config.url, method=search_config.method, json=search_data, headers=search_config.headers)
response_json = response.json()
if response_json.get("code") == 0:
if response_json.get("code") == '0':
records = response_json.get("data", {}).get("records", [])
for record in records:
news = self.parse_news_detail(record.get("url"))
if news['title'] == '':
news['title'] = record.get("title")
if news['contentRows'] == []:
news['contentRows'] = record.get("contentOriginal")
if news['publishTime'] == '':
news['publishTime'] = datetime.datetime.fromtimestamp(record.get("displayTime") / 1000).date()
if news['author'] == '':
news['author'] = record.get("author")
if news['source'] == '':
news['source'] = record.get("originName")
news_list.append(news)
else:
resultDomain.code = response_json.get("code")
resultDomain.message = "获取搜索结果失败" + response_json.get("message")
resultDomain.message = f"获取搜索结果失败{response_json.get('message') or ''}"
resultDomain.success = False
return resultDomain
page += 1
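
The new fallback block only fills a field when the parsed detail page left it empty, taking the value from the search-API record instead (displayTime is a millisecond timestamp). A self-contained sketch of that pattern, using a plain dict in place of NewsItem and assuming the record keys shown above (title, contentOriginal, displayTime, author, originName):

```python
import datetime

def fill_missing_fields(news: dict, record: dict) -> dict:
    """Fill fields the detail page left empty from the search-API record (assumed keys)."""
    if not news.get("title"):
        news["title"] = record.get("title", "")
    if not news.get("contentRows"):
        news["contentRows"] = record.get("contentOriginal", [])
    if not news.get("publishTime") and record.get("displayTime"):
        # displayTime is a millisecond epoch timestamp in the API response
        news["publishTime"] = str(datetime.datetime.fromtimestamp(record["displayTime"] / 1000).date())
    if not news.get("author"):
        news["author"] = record.get("author", "")
    if not news.get("source"):
        news["source"] = record.get("originName", "")
    return news

record = {"title": "demo", "displayTime": 1736467200000, "originName": "人民网"}
empty = {"title": "", "contentRows": [], "publishTime": "", "author": "", "source": ""}
print(fill_missing_fields(empty, record))
```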
@@ -143,14 +154,14 @@ class RmrbCrawler(BaseCrawler):
response = self.fetch(hot_point_rank_config.url, method=hot_point_rank_config.method, headers=hot_point_rank_config.headers)
response_json = response.json()
if response_json.get("code") == 0:
if response_json.get("code") == '0':
records = response_json.get("data", [])
for record in records:
news = self.parse_news_detail(record.get("url"))
news_list.append(news)
else:
resultDomain.code = response_json.get("code")
resultDomain.message = "获取人民日报热点排行失败" + response_json.get("message")
resultDomain.message = f"获取人民日报热点排行失败{response_json.get('message') or ''}"
resultDomain.success = False
return resultDomain
resultDomain.success = True
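
The success check changed from the integer 0 to the string '0', which suggests the endpoint returns the code as a string. If the type is uncertain, a tolerant comparison covers both forms; this is an illustration only, not the project's code:

```python
def is_success(response_json: dict) -> bool:
    """Treat both 0 and '0' as a successful response code."""
    return str(response_json.get("code")) == "0"

assert is_success({"code": 0})
assert is_success({"code": "0"})
assert not is_success({"code": "500"})
```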
@@ -160,7 +171,7 @@ class RmrbCrawler(BaseCrawler):
except Exception as e:
logger.error(f"获取人民日报热点排行失败: {str(e)}")
resultDomain.code = 0
resultDomain.message = "获取人民日报热点排行失败" + str(e)
resultDomain.message = f"获取人民日报热点排行失败{str(e)}"
resultDomain.success = False
return resultDomain
@@ -178,19 +189,19 @@ class RmrbCrawler(BaseCrawler):
date_str = date.strftime("%Y%m%d")
one_day_trending_news_config = self.config.urls.get("one_day_trending_news")
one_day_trending_news_config.url = one_day_trending_news_config.url.format(date_str)
one_day_trending_news_config.url = one_day_trending_news_config.url.format(date=date_str)
response = self.fetch(one_day_trending_news_config.url, method=one_day_trending_news_config.method, headers=one_day_trending_news_config.headers)
if not response:
logger.error(f"获取响应失败: {one_day_trending_news_config.url}")
resultDomain.code = 0
resultDomain.message = "获取响应失败" + one_day_trending_news_config.url
resultDomain.message = f"获取响应失败{one_day_trending_news_config.url or ''}"
resultDomain.success = False
return resultDomain
soup = self.parse_html(response.content)
if not soup:
logger.error(f"解析HTML失败: {one_day_trending_news_config.url}")
resultDomain.code = 0
resultDomain.message = "解析HTML失败" + one_day_trending_news_config.url
resultDomain.message = f"解析HTML失败{one_day_trending_news_config.url or ''}"
resultDomain.success = False
return resultDomain
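
Passing `date=date_str` to `.format()` requires the configured URL template to contain a named `{date}` placeholder; feeding that template a positional argument raises `KeyError`. A quick illustration with a made-up template URL (the real one lives in the crawler config):

```python
# Hypothetical template; the real URL comes from the crawler configuration.
template = "http://www.people.com.cn/trending/{date}.html"

print(template.format(date="20250110"))   # named placeholder: OK
try:
    template.format("20250110")           # positional arg against a named placeholder
except KeyError as e:
    print(f"KeyError: {e}")               # KeyError: 'date'
```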
@@ -215,7 +226,7 @@ class RmrbCrawler(BaseCrawler):
except Exception as e:
logger.error(f"获取人民日报一天内的热点新闻失败: {str(e)}")
resultDomain.code = 0
resultDomain.message = "获取人民日报一天内的热点新闻失败" + str(e)
resultDomain.message = f"获取人民日报一天内的热点新闻失败{str(e)}"
resultDomain.success = False
return resultDomain
@@ -243,7 +254,7 @@ class RmrbCrawler(BaseCrawler):
except Exception as e:
logger.error(f"获取人民日报多天内的热点新闻失败: {str(e)}")
resultDomain.code = 0
resultDomain.message = "获取人民日报多天内的热点新闻失败" + str(e)
resultDomain.message = f"获取人民日报多天内的热点新闻失败{str(e)}"
resultDomain.success = False
return resultDomain
@@ -259,29 +270,37 @@ class RmrbCrawler(BaseCrawler):
"""
try:
response = self.fetch(url)
news = NewsItem(
title="",
contentRows=[],  # fix: use contentRows (a list of rows) rather than a single content field
url=url,
publishTime="",
author="",
source="人民网",
category=""
)
if not response:
logger.error(f"获取响应失败: {url}")
return None
return news
# BeautifulSoup can detect and decode the encoding itself, so the raw bytes can be passed in directly
# It picks up the encoding from the HTML <meta charset> tag or from the response headers
soup = self.parse_html(response.content)
if not soup:
logger.error("解析HTML失败")
return None
return news
# Extract the main content area
main_div = soup.find("div", class_="layout rm_txt cf")
if not main_div:
logger.error("未找到主内容区域")
return None
return news
# Extract the article area
article_div = main_div.find("div", class_="col col-1")
if not article_div:
logger.error("未找到文章区域")
return None
return news
# Extract the title
title_tag = article_div.select_one("h1")
@@ -347,15 +366,14 @@ class RmrbCrawler(BaseCrawler):
"content": content
})
news = NewsItem(
title=title,
contentRows=contents,  # fix: use contentRows (a list of rows) rather than a single content field
url=url,
publishTime=publish_time,
author=author,
source=source or "人民网",
category=""
)
news.title = title
news.contentRows = contents  # fix: use contentRows (a list of rows) rather than a single content field
news.url = url
news.publishTime = publish_time
news.author = author
news.source = source or "人民网"
news.category = ""
logger.info(f"成功解析新闻: {title}")
return news
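
parse_news_detail now returns a pre-initialized NewsItem on every failure path instead of None, so callers can read its fields without null checks. A minimal sketch of the pattern with a stand-in dataclass (the real NewsItem model lives elsewhere in the project; the field list below is assumed from the diff):

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class NewsItemSketch:
    """Stand-in for the project's NewsItem model (assumed fields)."""
    title: str = ""
    contentRows: List[dict] = field(default_factory=list)
    url: str = ""
    publishTime: str = ""
    author: str = ""
    source: str = "人民网"
    category: str = ""

def parse_detail_sketch(url: str, html: Optional[str]) -> NewsItemSketch:
    news = NewsItemSketch(url=url)      # defaults are safe to return as-is
    if not html:
        return news                     # failure path: empty item, never None
    news.title = "parsed title"         # on success, fill fields by assignment
    news.contentRows = [{"type": "text", "content": "..."}]
    return news

print(parse_detail_sketch("https://example.com/a.html", None).title)  # ""
```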

View File

@@ -25,20 +25,27 @@ def main():
epilog="""
示例:
python RmrbHotPoint.py
python RmrbHotPoint.py --output "output/hotpoint.json"
"""
)
# Add the output-file argument
parser.add_argument(
'--output', '-o',
type=str,
help='输出文件路径'
)
args = parser.parse_args()
output_file = args.output
logger.info("使用直接参数模式")
try:
# Create the crawler instance
logger.info("开始获取人民日报热点排行")
crawler = RmrbCrawler()
# Fetch the hot-point ranking
result = crawler.hotPointRank()
# Output the JSON result
output = {
"code": result.code,
"message": result.message,
@@ -47,12 +54,15 @@ def main():
"dataList": [item.dict() for item in result.dataList] if result.dataList else []
}
if output_file:
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(output, f, ensure_ascii=False, indent=2)
logger.info(f"结果已保存到: {output_file}")
print(json.dumps(output, ensure_ascii=False, indent=2))
# Close the crawler
crawler.close()
# Exit code: success=0, failure=1
sys.exit(0 if result.success else 1)
except Exception as e:
@@ -67,7 +77,6 @@ def main():
print(json.dumps(error_output, ensure_ascii=False, indent=2))
sys.exit(1)
" "
if __name__ == "__main__":
main()
main()
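
The --output handling added to this script (create the parent directory, dump UTF-8 JSON, echo to stdout, exit 0/1) is the same pattern the other entry points adopt. A condensed, self-contained sketch of just that part, with a dummy result in place of the crawler call:

```python
import argparse
import json
import sys
from pathlib import Path
from typing import Optional

def write_result(output: dict, output_file: Optional[str]) -> None:
    """Optionally persist the result, always echo it, then exit 0 on success / 1 on failure."""
    if output_file:
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)   # create missing directories
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(output, f, ensure_ascii=False, indent=2)  # keep Chinese text readable
    print(json.dumps(output, ensure_ascii=False, indent=2))
    sys.exit(0 if output.get("success") else 1)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="demo of the --output pattern")
    parser.add_argument("--output", "-o", type=str, help="output file path")
    args = parser.parse_args()
    write_result({"code": 200, "message": "ok", "success": True, "data": None, "dataList": []}, args.output)
```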

View File

@@ -25,7 +25,8 @@ def main():
epilog="""
示例:
python RmrbSearch.py --key "教育改革" --total 20
python RmrbSearch.py -k "科技创新" -t 15 -n 1
python RmrbSearch.py -k "科技创新" -t 15 --type 1
python RmrbSearch.py --key "AI" --total 5 --output "out.json"
新闻类型说明:
0 - 所有类型 (默认)
@@ -38,53 +39,72 @@ def main():
)
parser.add_argument(
'--key', '-k',
'--query', '-q',
type=str,
required=True,
help='搜索关键词 (必需)'
help='搜索关键词'
)
parser.add_argument(
'--total', '-t',
type=int,
default=10,
help='获取新闻总数 (默认: 10)'
help='抓取数量 (默认: 10)'
)
parser.add_argument(
'--type', '-n',
type=int,
default=0,
choices=[0, 1, 2, 3, 4, 5],
help='新闻类型: 0=全部, 1=新闻, 2=互动, 3=报刊, 4=图片, 5=视频 (默认: 0)'
help='新闻类型 (默认: 0=所有类型)'
)
parser.add_argument(
'--output', '-o',
type=str,
help='输出文件路径'
)
args = parser.parse_args()
# Read the arguments
key = args.query
total = args.total
news_type = args.type
output_file = args.output
logger.info("使用直接参数模式")
# Critical validation: the search keyword must be provided
if not key or not key.strip():
parser.error("搜索关键词不能为空!")
try:
# Create the crawler instance
logger.info(f"开始搜索: 关键词='{args.key}', 数量={args.total}, 类型={args.type}")
logger.info(f"开始搜索: 关键词='{key}', 数量={total}, 类型={news_type}")
crawler = RmrbCrawler()
# result = crawler.search(key=key.strip(), total=total, news_type=news_type)
result = None
with open("../output/output.json", "r", encoding="utf-8") as f:
result = json.load(f)
# Run the search
result = crawler.search(key=args.key, total=args.total, news_type=args.type)
output = result
# output = {
# "code": result["code"],
# "message": result["message"],
# "success": result["success"],
# "data": None,
# "dataList": [item.model_dump() for item in result["dataList"]] if result["dataList"] else []
# }
# Output the JSON result
output = {
"code": result.code,
"message": result.message,
"success": result.success,
"data": None,
"dataList": [item.dict() for item in result.dataList] if result.dataList else []
}
if output_file:
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(output, f, ensure_ascii=False, indent=2)
logger.info(f"结果已保存到: {output_file}")
print(json.dumps(output, ensure_ascii=False, indent=2))
# Close the crawler
crawler.close()
# Exit code: success=0, failure=1
sys.exit(0 if result.success else 1)
sys.exit(0 if result["success"] else 1)
except Exception as e:
logger.error(f"执行失败: {str(e)}")
@@ -100,4 +120,4 @@ def main():
if __name__ == "__main__":
main()
main()
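
With --key/-k renamed to --query/-q and required=True dropped, the explicit emptiness check is what now enforces the keyword; parser.error() prints the usage text to stderr and exits with status 2. A small standalone sketch of that validation, independent of the crawler:

```python
import argparse

parser = argparse.ArgumentParser(description="demo of validating the search keyword")
parser.add_argument("--query", "-q", type=str, help="search keyword")
args = parser.parse_args()

key = args.query
if not key or not key.strip():
    # parser.error prints usage plus the message to stderr and exits with code 2
    parser.error("搜索关键词不能为空!")

print(f"searching for: {key.strip()}")
```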

View File

@@ -10,7 +10,7 @@
import argparse
import json
import sys
from datetime import datetime
from datetime import datetime, timedelta
from pathlib import Path
# Add parent directory to path to import crawler
@@ -20,20 +20,29 @@ from crawler.RmrbCrawler import RmrbCrawler
from loguru import logger
def parse_date(date_str: str) -> datetime:
def parse_date(date_str) -> datetime:
"""
Parse a date string into a datetime object
Parse a date string or number into a datetime object (format: YYYYMMDD)
Args:
date_str: date string in the format YYYYMMDD
date_str: a string or an integer, e.g. "20250110" or 20250110
Returns:
a datetime object
Raises:
ValueError: if the format is invalid
"""
# Normalize to a string and strip whitespace
if date_str is None:
raise ValueError("日期不能为空")
date_str = str(date_str).strip()
if len(date_str) != 8 or not date_str.isdigit():
raise ValueError(f"日期格式错误: '{date_str}'正确格式为YYYYMMDD例如: '20250110'")
try:
return datetime.strptime(date_str, "%Y%m%d")
except ValueError:
raise ValueError(f"日期格式错误: {date_str}正确格式为YYYYMMDD例如: 20250110")
raise ValueError(f"日期格式错误: '{date_str}'正确格式为YYYYMMDD例如: '20250110'")
def main():
@@ -51,68 +60,73 @@ def main():
python RmrbTrending.py --start-date 20250101 --end-date 20250110
python RmrbTrending.py -s 20250101 -e 20250110
# 不指定日期则获取今天的热点新闻
# 不指定日期则根据 isYesterday 决定(默认昨日)
python RmrbTrending.py
"""
)
parser.add_argument(
'--date', '-d',
type=str,
help='指定日期 (格式: YYYYMMDD例如: 20250110)'
)
parser.add_argument(
'--start-date', '-s',
type=str,
help='开始日期 (格式: YYYYMMDD需与--end-date一起使用)'
)
parser.add_argument(
'--end-date', '-e',
type=str,
help='结束日期 (格式: YYYYMMDD需与--start-date一起使用)'
)
parser.add_argument('--date', '-d', type=str, help='指定日期 (格式: YYYYMMDD)')
parser.add_argument('--startDate', '-s', type=str, help='开始日期 (需与--end-date一起使用)')
parser.add_argument('--endDate', '-e', type=str, help='结束日期 (需与--start-date一起使用)')
parser.add_argument('--yesterday', '-y', action='store_true', help='查询昨日 (默认行为)')
parser.add_argument('--output', '-o', type=str, help='输出文件路径')
args = parser.parse_args()
# Initialize variables
output_file = args.output
date = args.date
start_date = args.startDate
end_date = args.endDate
is_yesterday = args.yesterday if args.yesterday else True  # default: query yesterday
logger.info("使用直接参数模式")
# Helper: treat blank strings as None
def clean(s):
return s.strip() if s and isinstance(s, str) and s.strip() else None
date = clean(date)
start_date = clean(start_date)
end_date = clean(end_date)
try:
# Create the crawler instance
crawler = RmrbCrawler()
# Decide which mode to use
if args.date:
# Single-day mode
if args.start_date or args.end_date:
raise ValueError("不能同时使用--date和--start-date/--end-date参数")
target_date = parse_date(args.date)
logger.info(f"获取单日热点新闻: {args.date}")
# Single-day mode
if date:
if start_date or end_date:
raise ValueError("不能同时使用 date 和 startDate/endDate 参数")
target_date = parse_date(date)
logger.info(f"获取单日热点新闻: {target_date.strftime('%Y-%m-%d')}")
result = crawler.getOneDayTrendingNews(target_date)
elif args.start_date and args.end_date:
# Date-range mode
start_date = parse_date(args.start_date)
end_date = parse_date(args.end_date)
if start_date > end_date:
# Date-range mode
elif start_date and end_date:
if date:
raise ValueError("不能同时使用 date 和 startDate/endDate 参数")
start_dt = parse_date(start_date)
end_dt = parse_date(end_date)
if start_dt > end_dt:
raise ValueError("开始日期不能晚于结束日期")
logger.info(f"获取日期范围热点新闻: {start_dt.strftime('%Y-%m-%d')}{end_dt.strftime('%Y-%m-%d')}")
result = crawler.getDaysTrendingNews(start_dt, end_dt)
logger.info(f"获取日期范围热点新闻: {args.start_date}{args.end_date}")
result = crawler.getDaysTrendingNews(start_date, end_date)
elif args.start_date or args.end_date:
# Only one of the two dates was given
raise ValueError("--start-date和--end-date必须同时使用")
# Only one bound was given
elif start_date or end_date:
raise ValueError("--start-date 和 --end-date 必须同时指定")
# Default mode
else:
# Default to today's date
today = datetime.now()
today_str = today.strftime("%Y%m%d")
logger.info(f"获取今日热点新闻: {today_str}")
result = crawler.getOneDayTrendingNews(today)
if is_yesterday:
target_date = datetime.now() - timedelta(days=1)
logger.info(f"获取昨日热点新闻: {target_date.strftime('%Y-%m-%d')}")
else:
target_date = datetime.now()
logger.info(f"获取今日热点新闻: {target_date.strftime('%Y-%m-%d')}")
result = crawler.getOneDayTrendingNews(target_date)
# Output the JSON result
# Build the output
output = {
"code": result.code,
"message": result.message,
@@ -121,12 +135,16 @@ def main():
"dataList": [item.dict() for item in result.dataList] if result.dataList else []
}
# Save to file
if output_file:
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(output, f, ensure_ascii=False, indent=2)
logger.info(f"结果已保存到: {output_file}")
print(json.dumps(output, ensure_ascii=False, indent=2))
# Close the crawler
crawler.close()
# Exit code: success=0, failure=1
sys.exit(0 if result.success else 1)
except ValueError as e:
@@ -155,4 +173,4 @@ def main():
if __name__ == "__main__":
main()
main()
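
In the default branch the target date is resolved from the yesterday flag before getOneDayTrendingNews is called. A small sketch of that date selection in isolation (just the datetime arithmetic, no crawler):

```python
from datetime import datetime, timedelta

def resolve_target_date(is_yesterday: bool = True) -> datetime:
    """Pick the date to crawl: yesterday by default, today otherwise."""
    if is_yesterday:
        return datetime.now() - timedelta(days=1)
    return datetime.now()

print(resolve_target_date().strftime("%Y-%m-%d"))        # yesterday
print(resolve_target_date(False).strftime("%Y-%m-%d"))   # today
```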