# Source listing header (from web code-viewer export):
#   Files: ztb/scheduler.py — 178 lines, 5.0 KiB, Python
#   (Raw / Permalink / Normal View / History)

# -*- coding: utf-8 -*-
r"""
Scheduled-crawl entry point: collects the previous day's data once per day.

Usage:
1. Run a one-off collection of yesterday's data:
       python scheduler.py
2. Windows scheduled task, runs automatically every day at 08:00:
       schtasks /create /tn "ZTB_Spider" /tr "python <project-path>\scheduler.py" /sc daily /st 08:00
3. Linux cron, every day at 08:00:
       0 8 * * * cd /path/to/ztb && python scheduler.py >> logs/cron.log 2>&1
"""
import glob
import logging
import sys
import os
import traceback
from datetime import datetime, timedelta
# 确保项目根目录在 sys.path 中
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from config import ZHEJIANG_CONFIG, TAIZHOU_CONFIG, SPIDER_CONFIG, DATA_DIR
from spiders import ZhejiangSpider, TaizhouSpider
from spiders.base import setup_logging
logger = logging.getLogger("ztb")
# ============ Crawl task configuration ============
# Define here which tasks run every day.
# Each task dict supports: "site" (spider key), "max_pages" (crawl page limit),
# "category" / "notice_type" (site-side filters; the Chinese values are the
# literal filter strings the target sites use — do not translate),
# "process" (run the AI ProcessingPipeline), "upload" (push results upstream).
DAILY_TASKS = [
    # Zhejiang province - construction projects - tender document publicity
    {
        "site": "zhejiang",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "招标文件公示",
        "process": True,
        "upload": True,
    },
    # Zhejiang province - construction projects - tender announcement
    {
        "site": "zhejiang",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "招标公告",
        "process": True,
        "upload": True,
    },
    # Zhejiang province - construction projects - clarification / amendment
    {
        "site": "zhejiang",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "澄清修改",
        "process": True,
        "upload": True,
    },
    # Taizhou - construction projects - tender plan publicity
    {
        "site": "taizhou",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "招标计划公示",
        "process": True,
        "upload": True,
    },
]
# Number of days to keep data files before they are cleaned up.
KEEP_DAYS = 30


def cleanup_old_files(directory: str, keep_days: int = KEEP_DAYS) -> None:
    """Delete CSV and JSON files in *directory* older than *keep_days* days.

    A missing directory is a no-op.  Cleanup is best-effort: files that
    vanish or cannot be stat'ed/removed concurrently are skipped silently.

    Args:
        directory: Directory whose ``*.csv`` / ``*.json`` files are pruned.
        keep_days: Age threshold in days (defaults to ``KEEP_DAYS``).
    """
    if not os.path.isdir(directory):
        return
    # Hoist the cutoff timestamp out of the loop instead of comparing
    # against a datetime per file.
    cutoff_ts = (datetime.now() - timedelta(days=keep_days)).timestamp()
    removed = 0
    for pattern in ("*.csv", "*.json"):
        for filepath in glob.glob(os.path.join(directory, pattern)):
            try:
                # getmtime can also raise OSError if the file disappeared
                # after glob() listed it, so it must be inside the guard too.
                if os.path.getmtime(filepath) < cutoff_ts:
                    os.remove(filepath)
                    removed += 1
            except OSError:
                pass
    if removed:
        # Separator added between path and count (message was garbled:
        # "{directory}{removed}" printed them fused together).
        logger.info(f"清理 {directory} 下 {removed} 个超过 {keep_days} 天的文件")
def run_task(task: dict, date_filter: str = "yesterday") -> int:
    """Execute one crawl task and return the number of records collected.

    Args:
        task: Task description; supports the keys "site" (required),
            "max_pages", "category", "notice_type", "process", "upload".
        date_filter: Date window passed through to ``spider.crawl``.

    Returns:
        Number of results the spider collected (0 for an unknown site).
    """
    site = task["site"]
    # Map each supported site to its (site config, spider class) pair.
    registry = {
        "zhejiang": (ZHEJIANG_CONFIG, ZhejiangSpider),
        "taizhou": (TAIZHOU_CONFIG, TaizhouSpider),
    }
    if site not in registry:
        logger.error(f"未知站点: {site}")
        return 0
    site_config, spider_cls = registry[site]
    spider = spider_cls(site_config, SPIDER_CONFIG, DATA_DIR)

    notice_type = task.get("notice_type")
    spider.crawl(
        max_pages=task.get("max_pages", 10),
        category=task.get("category"),
        notice_type=notice_type,
        date_filter=date_filter,
    )
    spider.save_to_csv()

    # AI processing + Jiandaoyun upload, only when enabled and data exists.
    if task.get("process") and spider.results and notice_type:
        from processors import ProcessingPipeline

        ProcessingPipeline().process_results(
            spider.results,
            site=site,
            notice_type=notice_type,
            upload=task.get("upload", False),
        )
    return len(spider.results)
def run_daily():
    """Run every task in DAILY_TASKS once, then prune old data files.

    Returns:
        A ``(total, errors)`` tuple: total records collected across all
        tasks, and a list of descriptions of the tasks that failed.
    """
    setup_logging()
    start = datetime.now()
    logger.info("=" * 40)
    logger.info(f"定时任务启动: {start.strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info(f"共 {len(DAILY_TASKS)} 个任务")
    logger.info("=" * 40)
    total = 0
    errors = []
    for i, task in enumerate(DAILY_TASKS, 1):
        # Human-readable task label, reused in failure reporting below.
        desc = f"{task['site']} / {task.get('category', '全部')}"
        if task.get("notice_type"):
            desc += f" / {task['notice_type']}"
        logger.info(f"[{i}/{len(DAILY_TASKS)}] {desc}")
        try:
            count = run_task(task)
            total += count
            logger.info(f"[{i}/{len(DAILY_TASKS)}] 完成,共 {count} 条")
        except Exception as e:
            # One failed task must not abort the remaining tasks; record it
            # and keep the full traceback at DEBUG level.
            logger.error(f"[{i}/{len(DAILY_TASKS)}] 失败: {e}")
            logger.debug(traceback.format_exc())
            errors.append(desc)
    # Clean up data files older than KEEP_DAYS.
    cleanup_old_files(DATA_DIR)
    elapsed = (datetime.now() - start).total_seconds()
    logger.info("=" * 40)
    logger.info(f"定时任务完成: 共 {total} 条, 耗时 {elapsed:.0f}s")
    if errors:
        logger.error(f"失败任务: {', '.join(errors)}")
    logger.info("=" * 40)
    return total, errors
if __name__ == "__main__":
    # Script entry point: run one full daily collection cycle.
    run_daily()