Files
ztb/processors/pipeline.py
ztb-system d823936436 feat: 添加招标估算金额字段并修复项目名称显示
- 移除台州招标文件公示相关配置
- 添加浙江招标公告招标估算金额字段
- 修复项目名称匹配规则,优先使用处理后的项目名称
- 更新简道云字段映射
- 添加测试文件
2026-02-24 19:55:56 +08:00

203 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
处理管道 - 将爬虫结果经 DeepSeek AI 处理后上传简道云
"""
import json
import logging
import os
import re
import time
from datetime import datetime

from config import REGION_CONFIGS, PROCESSING_CONFIG

from .content_fetcher import ContentFetcher
from .deepseek import DeepSeekProcessor
from .jiandaoyun import JiandaoyunUploader
logger = logging.getLogger("ztb")
class ProcessingPipeline:
    """处理管道:爬虫结果 → 内容获取 → AI提取 → 上传简道云

    Pipeline stages: map crawler fields onto the processing schema, fetch
    the announcement full text, extract configured fields with DeepSeek,
    merge results (AI wins, crawler values are the fallback), then save to
    JSON and optionally upload to Jiandaoyun.
    """

    # Matches a "发布时间:" line in the fetched page text; group(1) is the
    # timestamp, which may be more precise (hh:mm:ss) than the crawler's
    # date-only field. Compiled once instead of per loop iteration.
    _PUBLISH_TIME_RE = re.compile(r'发布时间:\s*(.*?)\n')

    # Crawler-provided values for these fields are authoritative and must
    # never be overwritten by AI-extracted values.
    _PROTECTED_FIELDS = ("项目名称", "项目批准文号", "批准文号")

    # Sentinel DeepSeek returns when the document does not mention a field.
    _NOT_MENTIONED = "文档未提及"

    def __init__(self):
        temp_dir = PROCESSING_CONFIG.get("temp_dir", "temp_files")
        self.fetcher = ContentFetcher(temp_dir=temp_dir)
        self.deepseek = DeepSeekProcessor()
        self.uploader = JiandaoyunUploader()
        # Seconds to sleep between DeepSeek calls (API rate limiting).
        self.api_delay = PROCESSING_CONFIG.get("api_delay", 1)
        # Directory for the JSON result dumps written by _save_results.
        self.output_dir = PROCESSING_CONFIG.get("output_dir", "data")

    def process_results(self, results: list, site: str,
                        notice_type: str, upload: bool = False) -> list:
        """
        处理爬虫结果:字段映射 → 内容获取 → AI提取 → 可选上传

        Args:
            results: 爬虫返回的结果列表
            site: 站点标识 ("zhejiang" / "taizhou")
            notice_type: 公告类型 (如 "招标文件公示")
            upload: 是否上传简道云
        Returns:
            处理后的记录列表 (falls back to *results* unchanged when no
            region config matches site:notice_type)
        """
        # Region config keyed by "site:notice_type"; without one there is
        # nothing to tell the AI which fields to extract.
        region_key = f"{site}:{notice_type}"
        region_cfg = REGION_CONFIGS.get(region_key)
        if not region_cfg:
            logger.warning(f"未找到区域配置: {region_key},跳过AI处理")
            return results

        region_name = region_cfg["region_name"]
        link_field = region_cfg["link_field"]
        ai_fields = region_cfg["ai_fields"]
        logger.info(f"开始AI处理: {region_name}, {len(results)} 条记录")
        logger.info(f" 需要提取的字段: {ai_fields}")

        processed = []
        success_count = 0
        fail_count = 0
        total = len(results)
        for i, item in enumerate(results, start=1):
            # 1. Map crawler fields onto the processing schema.
            record = self._map_fields(item, link_field, notice_type)
            # Guard with `or`: the crawler may yield None/"" for the name,
            # and slicing None would raise TypeError.
            name = (record.get("名称") or "未知")[:35]
            logger.info(f" [{i}/{total}] {name}")

            # 2. Fetch the full announcement text; records without a link
            # or with too little content are kept but counted as failures.
            url = record.get(link_field, "")
            if not url:
                logger.warning(f" 无详情链接,跳过")
                processed.append(record)
                fail_count += 1
                continue
            content = self.fetcher.get_full_content(url)
            if not content or len(content) < 200:
                logger.warning(
                    f" 内容过少({len(content) if content else 0}字符),跳过")
                processed.append(record)
                fail_count += 1
                continue
            logger.info(f" 获取到 {len(content)} 字符内容")

            # 3. DeepSeek extraction of the configured fields.
            extracted = self.deepseek.extract_fields(
                content, ai_fields, region_name)

            # 4. Prefer the (possibly more precise) publish time found in
            # the page text over the crawler's date.
            self._apply_publish_time(record, content)

            # 5. Merge AI values into the record (AI wins, crawler values
            # are the fallback, protected fields untouched).
            self._merge_ai_fields(record, extracted, ai_fields)

            # 6. Keep the two price fields mutually consistent.
            self._normalize_max_price(record)

            processed.append(record)
            success_count += 1
            # Rate-limit consecutive DeepSeek API calls.
            time.sleep(self.api_delay)

        logger.info(f"AI处理完成: 成功 {success_count}, 失败 {fail_count}")
        # Always persist the processed batch, even when nothing succeeded.
        self._save_results(processed, region_name)
        if upload:
            logger.info(f"开始上传 {region_name} 到简道云...")
            self.uploader.upload_records(region_name, processed)
        return processed

    # ---------- 发布时间 ----------
    def _apply_publish_time(self, record: dict, content: str) -> None:
        """Overwrite 发布时间/项目发布时间 with the timestamp found in *content*.

        No-op when the page text contains no "发布时间:" line or the
        captured value is blank.
        """
        match = self._PUBLISH_TIME_RE.search(content)
        if not match:
            return
        publish_time = match.group(1).strip()
        if publish_time:
            record["发布时间"] = publish_time
            # Mirror into 项目发布时间 so both fields stay consistent
            # (台州招标计划 JDY uses that field name).
            record["项目发布时间"] = publish_time
            logger.info(f" ✓ 发布时间: {publish_time}")

    # ---------- AI字段合并 ----------
    def _merge_ai_fields(self, record: dict, extracted: dict,
                         ai_fields: list) -> None:
        """Merge AI-extracted values into *record* in place.

        AI values win; an existing record value is only kept when the AI
        returned nothing usable. Protected fields with a crawler value are
        never overwritten.
        """
        for field in ai_fields:
            if field in self._PROTECTED_FIELDS and record.get(field):
                logger.debug(f" 保留原始 {field}: {record[field][:50]}")
                continue
            ai_val = extracted.get(field, "")
            if ai_val and ai_val != self._NOT_MENTIONED:
                record[field] = ai_val
                logger.info(f"{field}: {ai_val[:50]}")
            elif not record.get(field):
                # Field is empty on both sides: store the sentinel so the
                # downstream upload always sees a value.
                record[field] = ai_val or self._NOT_MENTIONED
                logger.debug(f"{field}: {record[field]}")

    # ---------- 限价归一 ----------
    @staticmethod
    def _normalize_max_price(record: dict) -> None:
        """Prefer 最高投标限价, fall back to 最高限价, mirror into both fields."""
        max_price = record.get("最高投标限价", "") or record.get("最高限价", "")
        if max_price:
            record["最高投标限价"] = max_price
            record["最高限价"] = max_price

    # ---------- 字段映射 ----------
    @staticmethod
    def _map_fields(item: dict, link_field: str,
                    notice_type: str) -> dict:
        """将爬虫输出字段映射为处理所需字段

        Builds a fresh record dict from a raw crawler item: base fields,
        the type-specific link field, any extra crawler-extracted fields,
        and alias fields expected by the Jiandaoyun mapping.
        """
        record = {}
        # Base mapping: prefer 项目名称 (name with the approval number
        # already stripped) over the raw 标题.
        record["名称"] = item.get("项目名称", item.get("标题", ""))
        pub_date = item.get("发布日期", item.get("项目发布时间", ""))
        record["发布时间"] = pub_date
        # Same value in both fields so the formats agree
        # (台州招标计划 JDY uses 项目发布时间).
        record["项目发布时间"] = pub_date
        record["地区"] = item.get("地区", "")
        record["招标阶段"] = item.get("公告类型", notice_type)
        record["来源"] = item.get("来源", "")
        # Link field name varies with the notice type (region config).
        record[link_field] = item.get("链接", "")
        # Carry over any extra fields the crawler already extracted
        # (only when truthy, so empty strings don't shadow AI output).
        extra_fields = [
            "项目名称", "项目代码", "招标人", "招标代理",
            "项目批准文号", "项目类型", "预估合同金额(万元)",
            "计划招标时间", "联系电话", "招标估算金额",
        ]
        for f in extra_fields:
            if f in item and item[f]:
                record[f] = item[f]
        # Alias fields for the Jiandaoyun mapping; setdefault keeps any
        # value that somehow already exists.
        if "项目批准文号" in record:
            record.setdefault("批准文号", record["项目批准文号"])
        if "项目类型" in record:
            record.setdefault("类型", record["项目类型"])
        if "预估合同金额(万元)" in record:
            val = record["预估合同金额(万元)"]
            record.setdefault("预估金额", f"{val}万元" if val else "")
        if "计划招标时间" in record:
            record.setdefault("招标时间", record["计划招标时间"])
        return record

    # ---------- 结果保存 ----------
    def _save_results(self, records: list, region_name: str):
        """保存 AI 处理结果为 JSON

        Writes a timestamped UTF-8 JSON file under self.output_dir with a
        small envelope (处理时间 / 总记录数 / data).
        """
        os.makedirs(self.output_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filepath = os.path.join(
            self.output_dir, f"{region_name}_AI处理_{timestamp}.json")
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump({
                "处理时间": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "总记录数": len(records),
                "data": records,
            }, f, ensure_ascii=False, indent=2)
        logger.info(f"AI处理结果已保存: {filepath}")