# ztb/scheduler.py
# -*- coding: utf-8 -*-
r"""
Scheduled crawl entry point: collects the previous day's data automatically.

Usage:
1. Run directly (one-off collection of yesterday's data):
   python scheduler.py
2. Windows scheduled task (runs every day at 08:00):
   schtasks /create /tn "ZTB_Spider" /tr "python <project path>\scheduler.py" /sc daily /st 08:00
3. Linux cron (every day at 08:00):
   0 8 * * * cd /path/to/ztb && python scheduler.py >> logs/cron.log 2>&1
"""
import glob
import logging
import sys
import os
import traceback
from datetime import datetime, timedelta
# Make sure the project root is on sys.path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from config import ZHEJIANG_CONFIG, TAIZHOU_CONFIG, SPIDER_CONFIG, DATA_DIR
from spiders import ZhejiangSpider, TaizhouSpider
from spiders.base import setup_logging
logger = logging.getLogger("ztb")
# ============ Crawl task configuration ============
# Define here which tasks run every day (see the schema notes after the list)
DAILY_TASKS = [
    # Zhejiang - construction projects (工程建设) - tender document publicity (招标文件公示)
    {
        "site": "zhejiang",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "招标文件公示",
        "process": True,
        "upload": True,
    },
    # Zhejiang - construction projects (工程建设) - tender announcement (招标公告)
    {
        "site": "zhejiang",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "招标公告",
        "process": True,
        "upload": True,
    },
    # Zhejiang - construction projects (工程建设) - clarifications and amendments (澄清修改)
    {
        "site": "zhejiang",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "澄清修改",
        "process": True,
        "upload": True,
    },
    # Taizhou - construction projects (工程建设) - tender plan publicity (招标计划公示)
    {
        "site": "taizhou",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "招标计划公示",
        "process": True,
        "upload": True,
    },
]
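# Task schema, as consumed by run_task() below: "site" selects the spider
# ("zhejiang" or "taizhou"), "max_pages" caps pagination (run_task defaults
# to 10 when omitted), "category" / "notice_type" narrow the listing (their
# values are the sites' own Chinese labels and must stay as-is), and
# "process" / "upload" toggle the AI pipeline and the upload step.
# A minimal sketch of an extra entry (the notice type here is hypothetical,
# for illustration only; use a type the target site actually lists):
#
# DAILY_TASKS.append({
#     "site": "taizhou",
#     "max_pages": 50,
#     "category": "工程建设",
#     "notice_type": "中标公告",   # hypothetical notice type
#     "process": True,
#     "upload": False,             # process, but skip the upload step
# })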
# How many days to keep data files (see cleanup_old_files below)
KEEP_DAYS = 30
def cleanup_old_files(directory: str, keep_days: int = KEEP_DAYS):
    """Delete CSV and JSON files older than keep_days days."""
    if not os.path.isdir(directory):
        return
    cutoff = datetime.now() - timedelta(days=keep_days)
    removed = 0
    for pattern in ("*.csv", "*.json"):
        for filepath in glob.glob(os.path.join(directory, pattern)):
            try:
                if os.path.getmtime(filepath) < cutoff.timestamp():
                    os.remove(filepath)
                    removed += 1
            except OSError:
                # file may have disappeared between glob and stat/remove; ignore
                pass
    if removed:
        logger.info(f"Removed {removed} files older than {keep_days} days from {directory}")
def run_task(task: dict, date_filter: str = "yesterday") -> int:
    """Run a single crawl task and return the number of items collected."""
    site = task["site"]
    max_pages = task.get("max_pages", 10)
    category = task.get("category")
    notice_type = task.get("notice_type")
    if site == "zhejiang":
        config = ZHEJIANG_CONFIG
        spider = ZhejiangSpider(config, SPIDER_CONFIG, DATA_DIR)
    elif site == "taizhou":
        config = TAIZHOU_CONFIG
        spider = TaizhouSpider(config, SPIDER_CONFIG, DATA_DIR)
    else:
        logger.error(f"Unknown site: {site}")
        return 0
    spider.crawl(
        max_pages=max_pages,
        category=category,
        notice_type=notice_type,
        date_filter=date_filter,
    )
    spider.save_to_csv()
    # AI processing + Jiandaoyun upload
    if task.get("process") and spider.results and notice_type:
        from processors import ProcessingPipeline  # imported lazily to keep startup light
        pipeline = ProcessingPipeline()
        pipeline.process_results(
            spider.results,
            site=site,
            notice_type=notice_type,
            upload=task.get("upload", False),
        )
    return len(spider.results)
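# A minimal usage sketch: running one task ad hoc from a Python shell, e.g.
# to re-collect yesterday's Taizhou tender plans without the full schedule.
# ("yesterday" is the only date_filter value this module itself passes; other
# values depend on what the spiders' crawl() accepts.)
#
#     from spiders.base import setup_logging
#     from scheduler import run_task
#
#     setup_logging()
#     count = run_task(
#         {"site": "taizhou", "max_pages": 5,
#          "category": "工程建设", "notice_type": "招标计划公示",
#          "process": False},            # skip AI processing for a dry run
#         date_filter="yesterday",
#     )
#     print(f"collected {count} items")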
def run_daily():
    """Run the daily scheduled tasks."""
    setup_logging()
    start = datetime.now()
    logger.info("=" * 40)
    logger.info(f"Scheduled run started: {start.strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info(f"{len(DAILY_TASKS)} task(s) to run")
    logger.info("=" * 40)
    total = 0
    errors = []
    for i, task in enumerate(DAILY_TASKS, 1):
        desc = f"{task['site']} / {task.get('category', 'all')}"
        if task.get("notice_type"):
            desc += f" / {task['notice_type']}"
        logger.info(f"[{i}/{len(DAILY_TASKS)}] {desc}")
        try:
            count = run_task(task)
            total += count
            logger.info(f"[{i}/{len(DAILY_TASKS)}] done, {count} items")
        except Exception as e:
            logger.error(f"[{i}/{len(DAILY_TASKS)}] failed: {e}")
            logger.debug(traceback.format_exc())
            errors.append(desc)
    # Remove expired data files
    cleanup_old_files(DATA_DIR)
    elapsed = (datetime.now() - start).total_seconds()
    logger.info("=" * 40)
    logger.info(f"Scheduled run finished: {total} items total, {elapsed:.0f}s elapsed")
    if errors:
        logger.error(f"Failed tasks: {', '.join(errors)}")
    logger.info("=" * 40)
    return total, errors
if __name__ == "__main__":
    run_daily()