# -*- coding: utf-8 -*-
r"""
Scheduled crawl entry point -- automatically collects the previous day's data.

Usage:

1. Run directly (one-off crawl of yesterday's data):
   python scheduler.py

2. Windows scheduled task (runs every morning at 08:00):
   schtasks /create /tn "ZTB_Spider" /tr "python <project path>\scheduler.py" /sc daily /st 08:00

3. Linux cron (every morning at 08:00):
   0 8 * * * cd /path/to/ztb && python scheduler.py >> logs/cron.log 2>&1
"""
import glob
import logging
import os
import sys
import traceback
from datetime import datetime, timedelta

# Make sure the project root is on sys.path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from config import ZHEJIANG_CONFIG, TAIZHOU_CONFIG, SPIDER_CONFIG, DATA_DIR
from spiders import ZhejiangSpider, TaizhouSpider
from spiders.base import setup_logging

logger = logging.getLogger("ztb")


# ============ Crawl task configuration ============
# Define here which tasks to run every day.

DAILY_TASKS = [
    # Zhejiang province - construction projects - tender document publicity
    {
        "site": "zhejiang",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "招标文件公示",
        "process": True,
        "upload": True,
    },
    # Zhejiang province - construction projects - tender announcements
    {
        "site": "zhejiang",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "招标公告",
        "process": True,
        "upload": True,
    },
    # Zhejiang province - construction projects - clarifications and modifications
    {
        "site": "zhejiang",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "澄清修改",
        "process": True,
        "upload": True,
    },
    # Taizhou - construction projects - tender plan publicity
    {
        "site": "taizhou",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "招标计划公示",
        "process": True,
        "upload": True,
    },
]
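
# Template for adding a task entry (these are exactly the keys that
# run_task() below reads; "category" and "notice_type" are optional):
# {
#     "site": "zhejiang" or "taizhou",  # selects the spider class
#     "max_pages": 100,                 # page cap per crawl (defaults to 10)
#     "category": "...",                # optional listing category
#     "notice_type": "...",             # optional notice type
#     "process": True,                  # run the AI processing pipeline
#     "upload": True,                   # upload processed records to Jiandaoyun
# }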

# Number of days to keep data files before cleanup
KEEP_DAYS = 30


def cleanup_old_files(directory: str, keep_days: int = KEEP_DAYS):
    """Delete CSV and JSON files older than keep_days days."""
    if not os.path.isdir(directory):
        return
    cutoff = datetime.now() - timedelta(days=keep_days)
    removed = 0
    for pattern in ("*.csv", "*.json"):
        for filepath in glob.glob(os.path.join(directory, pattern)):
            if os.path.getmtime(filepath) < cutoff.timestamp():
                try:
                    os.remove(filepath)
                    removed += 1
                except OSError:
                    pass
    if removed:
        logger.info(f"Removed {removed} file(s) older than {keep_days} days from {directory}")
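
# Example: a stricter one-off purge with the same function, shorter retention:
#   cleanup_old_files(DATA_DIR, keep_days=7)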


def run_task(task: dict, date_filter: str = "yesterday") -> int:
    """Run a single crawl task; returns the number of records collected."""
    site = task["site"]
    max_pages = task.get("max_pages", 10)
    category = task.get("category")
    notice_type = task.get("notice_type")

    if site == "zhejiang":
        config = ZHEJIANG_CONFIG
        spider = ZhejiangSpider(config, SPIDER_CONFIG, DATA_DIR)
    elif site == "taizhou":
        config = TAIZHOU_CONFIG
        spider = TaizhouSpider(config, SPIDER_CONFIG, DATA_DIR)
    else:
        logger.error(f"Unknown site: {site}")
        return 0

    spider.crawl(
        max_pages=max_pages,
        category=category,
        notice_type=notice_type,
        date_filter=date_filter,
    )
    spider.save_to_csv()

    # AI processing + Jiandaoyun upload
    if task.get("process") and spider.results and notice_type:
        from processors import ProcessingPipeline
        pipeline = ProcessingPipeline()
        pipeline.process_results(
            spider.results,
            site=site,
            notice_type=notice_type,
            upload=task.get("upload", False),
        )

    return len(spider.results)
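
# Example: re-run a single configured task by hand (e.g. from a REPL).
# date_filter is passed straight through to spider.crawl(); only "yesterday"
# is used in this file, so any other accepted values depend on the spiders:
#   run_task(DAILY_TASKS[0], date_filter="yesterday")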


def run_daily():
    """Run the daily scheduled tasks."""
    setup_logging()
    start = datetime.now()
    logger.info("=" * 40)
    logger.info(f"Scheduled run started: {start.strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info(f"{len(DAILY_TASKS)} task(s) to run")
    logger.info("=" * 40)

    total = 0
    errors = []

    for i, task in enumerate(DAILY_TASKS, 1):
        desc = f"{task['site']} / {task.get('category', 'all')}"
        if task.get("notice_type"):
            desc += f" / {task['notice_type']}"

        logger.info(f"[{i}/{len(DAILY_TASKS)}] {desc}")
        try:
            count = run_task(task)
            total += count
            logger.info(f"[{i}/{len(DAILY_TASKS)}] done, {count} record(s)")
        except Exception as e:
            logger.error(f"[{i}/{len(DAILY_TASKS)}] failed: {e}")
            logger.debug(traceback.format_exc())
            errors.append(desc)

    # Clean up expired data files
    cleanup_old_files(DATA_DIR)

    elapsed = (datetime.now() - start).total_seconds()
    logger.info("=" * 40)
    logger.info(f"Scheduled run finished: {total} record(s) in {elapsed:.0f}s")
    if errors:
        logger.error(f"Failed tasks: {', '.join(errors)}")
    logger.info("=" * 40)

    return total, errors


if __name__ == "__main__":
    run_daily()
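
# Note: run_daily() returns (total, errors). If cron or Task Scheduler should
# see failures in the process exit code, one option (an assumption, not the
# script's current behavior) is:
#     _, errors = run_daily()
#     sys.exit(1 if errors else 0)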