Files
ztb/test_random_data_extract.py
2026-02-13 18:15:20 +08:00

164 lines
5.2 KiB
Python

# -*- coding: utf-8 -*-
"""
Randomly sample raw data rows and run extraction tests on them.
Pays special attention to the project name and the project approval number.
"""
import logging
import sys
import os
import csv
import random
# Add the script's own directory to the module search path so that the
# sibling packages (config, processors) can be imported when run directly.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Import configuration and processors (project-local modules).
from config import REGION_CONFIGS
from processors.content_fetcher import ContentFetcher
from processors.deepseek import DeepSeekProcessor
# Configure logging for the whole script.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Path of the raw input data file (CSV exported from a prior crawl).
CSV_FILE = "data/浙江省公共资源交易中心_20260213_142920.csv"
# Path of the Markdown report this script writes.
OUTPUT_MD = "随机提取分析报告.md"
def read_csv_data(file_path):
    """Read all data rows from a CSV file.

    Args:
        file_path: Path to a UTF-8 encoded CSV file with a header row.

    Returns:
        A list of dicts, one per data row, keyed by the header fields.
    """
    # newline='' is required by the csv module so that newlines embedded
    # in quoted fields are parsed correctly instead of splitting rows.
    with open(file_path, 'r', encoding='utf-8', newline='') as f:
        return list(csv.DictReader(f))
def extract_data_from_url(url, title):
    """Fetch the page behind *url* and run AI field extraction on it.

    Args:
        url: Announcement page URL to fetch.
        title: Project name to attach to the extracted record.

    Returns:
        A dict of extracted fields (including "项目名称" set to *title*),
        or None when the content cannot be fetched, the region config is
        missing, or extraction raises.
    """
    try:
        # Fetch the full page content (may download attachments to temp_files).
        fetcher = ContentFetcher(temp_dir="temp_files")
        content = fetcher.get_full_content(url)
        if not content:
            logger.warning(f"无法获取内容: {url}")
            return None
        logger.info(f"获取到内容长度: {len(content)} 字符")

        # Look up the AI field list for Zhejiang tender-document notices.
        processor = DeepSeekProcessor()
        config_key = "zhejiang:招标文件公示"
        if config_key not in REGION_CONFIGS:
            logger.error(f"未找到配置: {config_key}")
            return None
        ai_fields = REGION_CONFIGS[config_key]["ai_fields"]
        logger.info(f"需要提取的字段: {ai_fields}")

        # Run the extraction and attach the caller-supplied project name.
        extracted = processor.extract_fields(content, ai_fields, "浙江")
        extracted["项目名称"] = title
        return extracted
    except Exception:
        # logger.exception records the full traceback, which logger.error
        # with just the message text would silently discard.
        logger.exception("提取失败")
        return None
def main():
    """Randomly sample rows from the source CSV, re-extract each via AI,
    and write a Markdown report comparing the results.

    Side effects: reads CSV_FILE, may download pages via the network,
    and writes the report to OUTPUT_MD.
    """
    logger.info("开始随机提取测试")

    if not os.path.exists(CSV_FILE):
        logger.error(f"CSV文件不存在: {CSV_FILE}")
        return
    data = read_csv_data(CSV_FILE)
    logger.info(f"总数据量: {len(data)}")

    # Randomly pick at most 5 rows to test.
    selected_data = random.sample(data, 5) if len(data) > 5 else data
    logger.info(f"随机选择了 {len(selected_data)} 条数据")

    results = []
    for i, item in enumerate(selected_data, 1):
        title = item.get("标题", "")
        url = item.get("链接", "")
        project_name = item.get("项目名称", "")
        approval_number = item.get("项目批准文号", "")
        logger.info(f"\n{'-'*50}")
        logger.info(f"测试 {i}: {title}")
        logger.info(f"URL: {url}")
        logger.info(f"项目名称: {project_name}")
        logger.info(f"项目批准文号: {approval_number}")
        if not url:
            logger.warning("无链接,跳过")
            continue

        extracted = extract_data_from_url(url, project_name)
        if extracted:
            # BUG FIX: the original assigned extracted["批准文号"] before
            # checking for None, crashing with TypeError on failed fetches.
            # Carry the approval number over from the CSV only on success.
            if approval_number:
                extracted["批准文号"] = approval_number
            results.append(extracted)
            logger.info("提取成功!")
        else:
            logger.warning("提取失败")

    # Max-price field: prefer 最高投标限价; fall back to 最高限价,
    # then to the explicit "not mentioned" placeholder.
    for result in results:
        max_price = result.get("最高投标限价", "") or result.get("最高限价", "")
        result["最高投标限价"] = max_price or "文档未提及"

    # Generate the Markdown report.
    import datetime
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(OUTPUT_MD, 'w', encoding='utf-8') as f:
        f.write("# 随机提取分析报告\n\n")
        f.write(f"生成时间: {current_time}\n\n")
        f.write(f"共提取了 {len(results)} 条数据\n\n")
        for i, result in enumerate(results, 1):
            f.write(f"## 提取结果 {i}\n\n")
            f.write(f"### 项目名称\n{result.get('项目名称', '文档未提及')}\n\n")
            f.write(f"### 项目批准文号\n{result.get('批准文号', '文档未提及')}\n\n")
            f.write("### 其他关键信息\n")
            f.write("| 字段 | 值 |\n")
            f.write("|------|------|\n")
            # Only the fields worth eyeballing in the report table.
            important_fields = ["类型", "地区", "投标截止日", "最高投标限价", "资质要求", "业绩要求", "评标办法", "有无答辩", "招标人"]
            for field in important_fields:
                value = result.get(field, "文档未提及")
                f.write(f"| {field} | {value} |\n")
            f.write("\n")
    logger.info(f"报告生成完成: {OUTPUT_MD}")

if __name__ == "__main__":
    main()