# -*- coding: utf-8 -*-
"""Processing pipeline - run crawler results through DeepSeek AI, then upload to Jiandaoyun."""
import json
import logging
import os
import re
import time
from datetime import datetime

from config import REGION_CONFIGS, PROCESSING_CONFIG

from .content_fetcher import ContentFetcher
from .deepseek import DeepSeekProcessor
from .jiandaoyun import JiandaoyunUploader

logger = logging.getLogger("ztb")


class ProcessingPipeline:
    """Processing pipeline: crawler results → content fetching → AI extraction → upload to Jiandaoyun."""

    def __init__(self):
        temp_dir = PROCESSING_CONFIG.get("temp_dir", "temp_files")
        self.fetcher = ContentFetcher(temp_dir=temp_dir)
        self.deepseek = DeepSeekProcessor()
        self.uploader = JiandaoyunUploader()
        self.api_delay = PROCESSING_CONFIG.get("api_delay", 1)
        self.output_dir = PROCESSING_CONFIG.get("output_dir", "data")

    def process_results(self, results: list, site: str, notice_type: str,
                        upload: bool = False) -> list:
        """
        Process crawler results: field mapping → content fetching → AI extraction → optional upload.

        Args:
            results: list of result dicts returned by the crawler
            site: site identifier ("zhejiang" / "taizhou")
            notice_type: notice type (e.g. "招标文件公示")
            upload: whether to upload the records to Jiandaoyun

        Returns:
            list of processed records
        """
        # Look up the region configuration
        region_key = f"{site}:{notice_type}"
        region_cfg = REGION_CONFIGS.get(region_key)
        if not region_cfg:
            logger.warning(f"Region config not found: {region_key}, skipping AI processing")
            return results

        region_name = region_cfg["region_name"]
        link_field = region_cfg["link_field"]
        ai_fields = region_cfg["ai_fields"]

        logger.info(f"Starting AI processing: {region_name}, {len(results)} records")
        logger.info(f"  Fields to extract: {ai_fields}")

        processed = []
        success_count = 0
        fail_count = 0

        for i, item in enumerate(results):
            # 1. Field mapping: crawler fields → processing fields
            record = self._map_fields(item, link_field, notice_type)
            name = record.get("名称", "未知")[:35]
            logger.info(f"  [{i + 1}/{len(results)}] {name}")

            # 2. Fetch the full detail-page content
            url = record.get(link_field, "")
            if not url:
                logger.warning("    No detail link, skipping")
                processed.append(record)
                fail_count += 1
                continue

            content = self.fetcher.get_full_content(url)
            if not content or len(content) < 200:
                logger.warning(
                    f"    Content too short ({len(content) if content else 0} chars), skipping")
                processed.append(record)
                fail_count += 1
                continue

            logger.info(f"    Fetched {len(content)} chars of content")

            # 3. DeepSeek field extraction
            extracted = self.deepseek.extract_fields(content, ai_fields, region_name)

            # 4. Extract the publish time from the fetched content
            publish_time_match = re.search(r'发布时间:\s*(.*?)\n', content)
            if publish_time_match:
                extracted_publish_time = publish_time_match.group(1).strip()
                # If a more detailed publish time (with hh:mm:ss) was found, update the record
                if extracted_publish_time:
                    record["发布时间"] = extracted_publish_time
                    # Keep 项目发布时间 in sync for consistency
                    record["项目发布时间"] = extracted_publish_time
                    logger.info(f"    ✓ 发布时间: {extracted_publish_time}")
            # 5. Merge results (AI values take precedence, existing values as fallback)
            for field in ai_fields:
                # Keep the original 项目名称 / 项目批准文号 / 批准文号; never let AI output overwrite them
                if field in ["项目名称", "项目批准文号", "批准文号"] and record.get(field):
                    logger.debug(f"    keeping original {field}: {record[field][:50]}")
                    continue
                ai_val = extracted.get(field, "")
                if ai_val and ai_val != "文档未提及":
                    record[field] = ai_val
                    logger.info(f"    ✓ {field}: {ai_val[:50]}")
                elif not record.get(field):
                    record[field] = ai_val or "文档未提及"
                    logger.debug(f"    ○ {field}: {record[field]}")

            # Normalize the price-cap fields: prefer 最高投标限价, fall back to 最高限价 when empty
            max_price = record.get("最高投标限价", "")
            if not max_price:
                max_price = record.get("最高限价", "")
            if max_price:
                record["最高投标限价"] = max_price
                record["最高限价"] = max_price

            processed.append(record)
            success_count += 1

            # API rate limiting
            time.sleep(self.api_delay)

        logger.info(f"AI processing done: {success_count} succeeded, {fail_count} failed")

        # Save the AI-processed results
        self._save_results(processed, region_name)

        # Upload to Jiandaoyun
        if upload:
            logger.info(f"Uploading {region_name} to Jiandaoyun...")
            self.uploader.upload_records(region_name, processed)

        return processed

    # ---------- Field mapping ----------
    @staticmethod
    def _map_fields(item: dict, link_field: str, notice_type: str) -> dict:
        """Map crawler output fields to the fields needed for processing."""
        record = {}

        # Basic field mapping: prefer 项目名称 (the title with the approval number already stripped)
        record["名称"] = item.get("项目名称", item.get("标题", ""))

        # Prefer the publish time the crawler extracted from the detail page (with hh:mm:ss)
        pub_time = item.get("详情页发布时间", "")
        record["发布时间"] = pub_time
        record["项目发布时间"] = pub_time

        record["地区"] = item.get("地区", "")
        record["招标阶段"] = item.get("公告类型", notice_type)
        record["来源"] = item.get("来源", "")

        # Link field: the target key depends on the notice type
        record[link_field] = item.get("链接", "")

        # Keep extra fields the crawler already extracted
        extra_fields = [
            "项目名称", "项目代码", "招标人", "招标代理", "项目批准文号",
            "项目类型", "预估合同金额(万元)", "计划招标时间", "联系电话",
            "招标估算金额",
        ]
        for f in extra_fields:
            if f in item and item[f]:
                record[f] = item[f]

        # Alias mapping
        if "项目批准文号" in record:
            record.setdefault("批准文号", record["项目批准文号"])
        if "项目类型" in record:
            record.setdefault("类型", record["项目类型"])
        if "预估合同金额(万元)" in record:
            val = record["预估合同金额(万元)"]
            record.setdefault("预估金额", f"{val}万元" if val else "")
        if "计划招标时间" in record:
            record.setdefault("招标时间", record["计划招标时间"])

        return record

    # ---------- Result saving ----------
    def _save_results(self, records: list, region_name: str):
        """Save AI processing results as a JSON file."""
        os.makedirs(self.output_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filepath = os.path.join(self.output_dir, f"{region_name}_AI处理_{timestamp}.json")
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump({
                "处理时间": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "总记录数": len(records),
                "data": records,
            }, f, ensure_ascii=False, indent=2)
        logger.info(f"AI processing results saved: {filepath}")
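

# ---------- Usage sketch ----------
# Illustrative only: this sketch assumes a REGION_CONFIGS entry exists for
# "zhejiang:招标文件公示" and that the crawler emits dicts with the keys
# referenced in _map_fields above ("标题", "链接", "地区", "公告类型", "来源").
# The sample record and URL are hypothetical placeholders, not real data.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)s %(message)s")

    sample_results = [{
        "标题": "示例项目施工招标文件公示",       # hypothetical title
        "链接": "https://example.com/notice/1",   # hypothetical detail-page URL
        "地区": "杭州市",
        "公告类型": "招标文件公示",
        "来源": "示例来源",
    }]

    pipeline = ProcessingPipeline()
    processed = pipeline.process_results(
        sample_results,
        site="zhejiang",
        notice_type="招标文件公示",
        upload=False,  # set True to also push the records to Jiandaoyun
    )
    print(f"Processed {len(processed)} record(s)")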