# ztb/processors/pipeline.py
# -*- coding: utf-8 -*-
"""
处理管道 - 将爬虫结果经 DeepSeek AI 处理后上传简道云
"""
import json
import logging
import os
import re
import time
from datetime import datetime

from config import REGION_CONFIGS, PROCESSING_CONFIG

from .content_fetcher import ContentFetcher
from .deepseek import DeepSeekProcessor
from .jiandaoyun import JiandaoyunUploader
# Package-wide logger: uses the fixed channel name "ztb" (not __name__),
# so every caller of logging.getLogger("ztb") shares this same logger.
logger = logging.getLogger("ztb")
class ProcessingPipeline:
    """Processing pipeline: crawler results -> full-text fetch -> DeepSeek AI
    field extraction -> optional upload to Jiandaoyun.

    The public entry point is :meth:`process_results`.  Records that cannot
    be enriched (missing detail link, too little page content) are passed
    through unchanged and counted as failures.
    """

    # Compiled once at class creation.  The original code re-ran
    # ``import re`` and ``re.search`` pattern construction inside the
    # per-record loop; hoisting is the idiomatic (and cheaper) form.
    _PUBLISH_TIME_RE = re.compile(r'发布时间:\s*(.*?)\n')

    # Fields the AI must never overwrite when the crawler already
    # supplied a non-empty value.
    _PROTECTED_FIELDS = ("项目名称", "项目批准文号", "批准文号")

    def __init__(self):
        """Wire up the fetcher / DeepSeek / uploader collaborators from config."""
        temp_dir = PROCESSING_CONFIG.get("temp_dir", "temp_files")
        self.fetcher = ContentFetcher(temp_dir=temp_dir)
        self.deepseek = DeepSeekProcessor()
        self.uploader = JiandaoyunUploader()
        # Seconds to sleep between DeepSeek calls (simple rate limiting).
        self.api_delay = PROCESSING_CONFIG.get("api_delay", 1)
        self.output_dir = PROCESSING_CONFIG.get("output_dir", "data")

    def process_results(self, results: list, site: str,
                        notice_type: str, upload: bool = False) -> list:
        """Process crawler results: field mapping, content fetch, AI
        extraction, and optional upload.

        Args:
            results: raw crawler result dicts.
            site: site key ("zhejiang" / "taizhou").
            notice_type: notice category (e.g. "招标文件公示").
            upload: when True, also push the processed records to Jiandaoyun.

        Returns:
            The processed record list.  If no region config matches
            ``site:notice_type`` the input list is returned untouched.
        """
        # Look up the region configuration for this site/notice-type pair.
        region_key = f"{site}:{notice_type}"
        region_cfg = REGION_CONFIGS.get(region_key)
        if not region_cfg:
            logger.warning(f"未找到区域配置: {region_key}跳过AI处理")
            return results

        region_name = region_cfg["region_name"]
        link_field = region_cfg["link_field"]
        ai_fields = region_cfg["ai_fields"]
        logger.info(f"开始AI处理: {region_name}, {len(results)} 条记录")
        logger.info(f" 需要提取的字段: {ai_fields}")

        processed = []
        success_count = 0
        fail_count = 0
        for i, item in enumerate(results):
            # 1. Map crawler fields onto the processing schema.
            record = self._map_fields(item, link_field, notice_type)
            name = record.get("名称", "未知")[:35]
            logger.info(f" [{i+1}/{len(results)}] {name}")

            # 2. Fetch the full detail-page content; skip unusable records.
            url = record.get(link_field, "")
            if not url:
                logger.warning(f" 无详情链接,跳过")
                processed.append(record)
                fail_count += 1
                continue
            content = self.fetcher.get_full_content(url)
            if not content or len(content) < 200:
                logger.warning(
                    f" 内容过少({len(content) if content else 0}字符),跳过")
                processed.append(record)
                fail_count += 1
                continue
            logger.info(f" 获取到 {len(content)} 字符内容")

            # 3. DeepSeek field extraction.
            extracted = self.deepseek.extract_fields(
                content, ai_fields, region_name)

            # 4. Prefer a detailed publish time found in the page text.
            self._apply_publish_time(record, content)

            # 5. Merge AI output (AI wins; existing values are the fallback).
            self._merge_ai_fields(record, extracted, ai_fields)

            # 6. Keep the two "max price" alias fields in sync.
            self._sync_max_price(record)

            processed.append(record)
            success_count += 1
            # API rate limiting between DeepSeek calls.
            time.sleep(self.api_delay)

        logger.info(f"AI处理完成: 成功 {success_count}, 失败 {fail_count}")
        # Persist the AI-processed records.
        self._save_results(processed, region_name)
        # Optional upload to Jiandaoyun.
        if upload:
            logger.info(f"开始上传 {region_name} 到简道云...")
            self.uploader.upload_records(region_name, processed)
        return processed

    # ---------- merge helpers ----------
    @classmethod
    def _apply_publish_time(cls, record: dict, content: str) -> None:
        """If the page text carries a '发布时间: ...' line, copy its value
        into both publish-time fields so they stay consistent."""
        match = cls._PUBLISH_TIME_RE.search(content)
        if not match:
            return
        publish_time = match.group(1).strip()
        if publish_time:
            record["发布时间"] = publish_time
            record["项目发布时间"] = publish_time
            logger.info(f" ✓ 发布时间: {publish_time}")

    @classmethod
    def _merge_ai_fields(cls, record: dict, extracted: dict,
                         ai_fields: list) -> None:
        """Merge AI-extracted values into *record* in place.

        AI values win except for the protected name / approval-number
        fields; still-empty slots fall back to the sentinel "文档未提及".
        """
        for field in ai_fields:
            # Never let the AI clobber a crawler-provided protected field.
            if field in cls._PROTECTED_FIELDS and record.get(field):
                logger.debug(f" 保留原始 {field}: {record[field][:50]}")
                continue
            ai_val = extracted.get(field, "")
            if ai_val and ai_val != "文档未提及":
                record[field] = ai_val
                logger.info(f"{field}: {ai_val[:50]}")
            elif not record.get(field):
                record[field] = ai_val or "文档未提及"
                logger.debug(f"{field}: {record[field]}")

    @staticmethod
    def _sync_max_price(record: dict) -> None:
        """Prefer "最高投标限价" over "最高限价"; when either is present,
        write the chosen value back to both fields."""
        max_price = record.get("最高投标限价", "") or record.get("最高限价", "")
        if max_price:
            record["最高投标限价"] = max_price
            record["最高限价"] = max_price

    # ---------- 字段映射 ----------
    @staticmethod
    def _map_fields(item: dict, link_field: str,
                    notice_type: str) -> dict:
        """Map raw crawler output onto the processing field schema.

        Args:
            item: one crawler record.
            link_field: name of the detail-link field for this region.
            notice_type: fallback value for the "招标阶段" field.

        Returns:
            A new dict with normalized field names plus alias fields.
        """
        record = {}
        # Prefer "项目名称" (title with the approval number stripped out).
        record["名称"] = item.get("项目名称", item.get("标题", ""))
        # Prefer the detail-page publish time (includes HH:MM:SS).
        pub_time = item.get("详情页发布时间", "")
        record["发布时间"] = pub_time
        record["项目发布时间"] = pub_time
        record["地区"] = item.get("地区", "")
        record["招标阶段"] = item.get("公告类型", notice_type)
        record["来源"] = item.get("来源", "")
        # The link field's name depends on the notice type.
        record[link_field] = item.get("链接", "")
        # Carry over extra fields the crawler already extracted (non-empty only).
        extra_fields = [
            "项目名称", "项目代码", "招标人", "招标代理",
            "项目批准文号", "项目类型", "预估合同金额(万元)",
            "计划招标时间", "联系电话", "招标估算金额",
        ]
        for f in extra_fields:
            if f in item and item[f]:
                record[f] = item[f]
        # Alias fields — only filled when not already present.
        if "项目批准文号" in record:
            record.setdefault("批准文号", record["项目批准文号"])
        if "项目类型" in record:
            record.setdefault("类型", record["项目类型"])
        if "预估合同金额(万元)" in record:
            val = record["预估合同金额(万元)"]
            record.setdefault("预估金额", f"{val}万元" if val else "")
        if "计划招标时间" in record:
            record.setdefault("招标时间", record["计划招标时间"])
        return record

    # ---------- 结果保存 ----------
    def _save_results(self, records: list, region_name: str):
        """Dump the processed records to a timestamped JSON file under
        ``self.output_dir`` (UTF-8, human-readable)."""
        os.makedirs(self.output_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filepath = os.path.join(
            self.output_dir, f"{region_name}_AI处理_{timestamp}.json")
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump({
                "处理时间": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "总记录数": len(records),
                "data": records,
            }, f, ensure_ascii=False, indent=2)
        logger.info(f"AI处理结果已保存: {filepath}")