# -*- coding: utf-8 -*-
r"""
Scheduled crawl entry point -- automatically collects the previous day's data.

Usage:
    1. Run directly (one-off collection of yesterday's data):
       python scheduler.py

    2. Windows scheduled task (runs automatically at 08:00 every day):
       schtasks /create /tn "ZTB_Spider" /tr "python <project_path>\scheduler.py" /sc daily /st 08:00

    3. Linux cron (08:00 every day):
       0 8 * * * cd /path/to/ztb && python scheduler.py >> logs/cron.log 2>&1
"""
import glob
import logging
import os
import sys
import traceback
from datetime import datetime, timedelta

# Make sure the project root is on sys.path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from config import ZHEJIANG_CONFIG, TAIZHOU_CONFIG, SPIDER_CONFIG, DATA_DIR
from spiders import ZhejiangSpider, TaizhouSpider
from spiders.base import setup_logging

logger = logging.getLogger("ztb")

# ============ Crawl task configuration ============
# Define here which tasks run every day.
DAILY_TASKS = [
    # Zhejiang province - construction projects - tender document publicity
    {
        "site": "zhejiang",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "招标文件公示",
        "process": True,
        "upload": True,
    },
    # Zhejiang province - construction projects - tender announcements
    {
        "site": "zhejiang",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "招标公告",
        "process": True,
        "upload": True,
    },
    # Zhejiang province - construction projects - clarifications/amendments
    {
        "site": "zhejiang",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "澄清修改",
        "process": True,
        "upload": True,
    },
    # Taizhou - construction projects - tender plan publicity
    {
        "site": "taizhou",
        "max_pages": 100,
        "category": "工程建设",
        "notice_type": "招标计划公示",
        "process": True,
        "upload": True,
    },
]

# How many days to keep data files
KEEP_DAYS = 30


def cleanup_old_files(directory: str, keep_days: int = KEEP_DAYS):
    """Delete CSV and JSON files older than keep_days days."""
    if not os.path.isdir(directory):
        return
    cutoff = datetime.now() - timedelta(days=keep_days)
    removed = 0
    for pattern in ("*.csv", "*.json"):
        for filepath in glob.glob(os.path.join(directory, pattern)):
            if os.path.getmtime(filepath) < cutoff.timestamp():
                try:
                    os.remove(filepath)
                    removed += 1
                except OSError:
                    pass
    if removed:
        logger.info(f"Removed {removed} files older than {keep_days} days from {directory}")


def run_task(task: dict, date_filter: str = "yesterday") -> int:
    """Run a single crawl task and return the number of records collected."""
    site = task["site"]
    max_pages = task.get("max_pages", 10)
    category = task.get("category")
    notice_type = task.get("notice_type")

    if site == "zhejiang":
        config = ZHEJIANG_CONFIG
        spider = ZhejiangSpider(config, SPIDER_CONFIG, DATA_DIR)
    elif site == "taizhou":
        config = TAIZHOU_CONFIG
        spider = TaizhouSpider(config, SPIDER_CONFIG, DATA_DIR)
    else:
        logger.error(f"Unknown site: {site}")
        return 0

    spider.crawl(
        max_pages=max_pages,
        category=category,
        notice_type=notice_type,
        date_filter=date_filter,
    )
    spider.save_to_csv()

    # AI processing + Jiandaoyun upload
    if task.get("process") and spider.results and notice_type:
        from processors import ProcessingPipeline

        pipeline = ProcessingPipeline()
        pipeline.process_results(
            spider.results,
            site=site,
            notice_type=notice_type,
            upload=task.get("upload", False),
        )
    return len(spider.results)


def run_daily():
    """Run the daily scheduled tasks."""
    setup_logging()
    start = datetime.now()
    logger.info("=" * 40)
    logger.info(f"Scheduled run started: {start.strftime('%Y-%m-%d %H:%M:%S')}")
    logger.info(f"{len(DAILY_TASKS)} tasks in total")
    logger.info("=" * 40)

    total = 0
    errors = []
    for i, task in enumerate(DAILY_TASKS, 1):
        desc = f"{task['site']} / {task.get('category', 'all')}"
        if task.get("notice_type"):
            desc += f" / {task['notice_type']}"
        logger.info(f"[{i}/{len(DAILY_TASKS)}] {desc}")
        try:
            count = run_task(task)
            total += count
            logger.info(f"[{i}/{len(DAILY_TASKS)}] done, {count} records")
        except Exception as e:
            logger.error(f"[{i}/{len(DAILY_TASKS)}] failed: {e}")
            logger.debug(traceback.format_exc())
            errors.append(desc)

    # Clean up expired data files
    cleanup_old_files(DATA_DIR)

    elapsed = (datetime.now() - start).total_seconds()
    logger.info("=" * 40)
    logger.info(f"Scheduled run finished: {total} records in total, {elapsed:.0f}s elapsed")
    if errors:
        logger.error(f"Failed tasks: {', '.join(errors)}")
    logger.info("=" * 40)
    return total, errors


if __name__ == "__main__":
    run_daily()
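
# ---------------------------------------------------------------------------
# Ad-hoc usage sketch (not part of the scheduled flow): run_task() can also be
# called directly, e.g. from a REPL to re-run a single task after a failed
# night run. Note that only the "yesterday" date_filter is exercised by this
# module; passing any other value would be an assumption about the spiders'
# API.
#
#     from scheduler import run_task, DAILY_TASKS
#     count = run_task(DAILY_TASKS[0], date_filter="yesterday")
#     print(f"collected {count} records")
# ---------------------------------------------------------------------------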