Files
ztb/processors/jiandaoyun.py

167 lines
6.1 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
"""
简道云数据上传模块
"""
import logging
import re
import requests
import urllib3
from config import JDY_CONFIG
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger("ztb")
class JiandaoyunUploader:
"""简道云 API 上传器"""
BASE_URL = "https://api.jiandaoyun.com/api/v5"
# 需要转换为数字的字段
NUMERIC_FIELDS = {"最高限价", "最高投标限价", "预估金额", "招标估算金额"}
# 需要转换为日期的字段
DATE_FIELDS = {"发布时间", "项目发布时间", "投标截止日", "招标时间"}
def __init__(self, api_key: str = None):
self.api_key = api_key or JDY_CONFIG["api_key"]
self.headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
def upload_records(self, region_name: str, records: list) -> dict:
"""
上传记录到对应的简道云表单
Args:
region_name: 区域名称 "浙江招标文件公示"
records: 数据记录列表
Returns:
{"total": N, "success": N, "failed": N}
"""
form_config = JDY_CONFIG["forms"].get(region_name)
if not form_config:
logger.warning(f"{region_name}: 未找到简道云表单配置,跳过上传")
return {"total": len(records), "success": 0, "failed": len(records)}
app_id = form_config["app_id"]
entry_id = form_config["entry_id"]
field_mapping = form_config.get("field_mapping", {})
success = 0
failed = 0
for i, record in enumerate(records):
name = record.get("名称", f"记录{i+1}")
try:
jdy_data = self._convert(record, field_mapping)
if not jdy_data:
logger.debug(f"[{i+1}/{len(records)}] {name}: 无有效数据")
failed += 1
continue
result = self._create_record(app_id, entry_id, jdy_data)
if result and result.get("success"):
success += 1
if (i + 1) % 10 == 0 or (i + 1) == len(records):
logger.info(
f" [{i+1}/{len(records)}] 上传进度: "
f"成功{success} 失败{failed}")
else:
failed += 1
err = result.get("error", "未知") if result else "无返回"
logger.warning(
f" [{i+1}/{len(records)}] {name[:25]}: "
f"上传失败 - {err}")
except Exception as e:
failed += 1
logger.error(f" [{i+1}/{len(records)}] {name[:25]}: 异常 - {e}")
logger.info(f" {region_name} 上传完成: 成功 {success}, 失败 {failed}")
return {"total": len(records), "success": success, "failed": failed}
# ---------- 内部方法 ----------
def _create_record(self, app_id: str, entry_id: str, data: dict) -> dict:
"""调用简道云 API 创建单条记录"""
url = f"{self.BASE_URL}/app/entry/data/create"
payload = {"app_id": app_id, "entry_id": entry_id, "data": data}
try:
resp = requests.post(url, headers=self.headers,
json=payload, timeout=30, verify=False)
if not resp.text:
return {"success": False,
"error": f"空响应, status={resp.status_code}"}
result = resp.json()
if resp.status_code == 200 and result.get("data", {}).get("_id"):
return {"success": True,
"data_id": result["data"]["_id"]}
return {"success": False,
"error": result.get("msg", str(result))}
except Exception as e:
return {"success": False, "error": str(e)}
def _convert(self, record: dict, field_mapping: dict) -> dict:
"""将记录转换为简道云 API 格式"""
jdy_data = {}
for local_field, jdy_field in field_mapping.items():
value = record.get(local_field)
if not value or value in ("文档未提及", "详见公告"):
continue
if local_field in self.NUMERIC_FIELDS:
num = self._parse_price(value)
if num is not None:
jdy_data[jdy_field] = {"value": num}
elif local_field in self.DATE_FIELDS:
iso_date = self._to_iso8601(value)
if iso_date:
jdy_data[jdy_field] = {"value": iso_date}
else:
jdy_data[jdy_field] = {"value": value}
return jdy_data
@staticmethod
def _to_iso8601(date_str: str) -> str:
"""将日期字符串转为 ISO 8601 格式(北京时间 UTC+8"""
if not date_str or date_str in ("文档未提及", "详见公告"):
return ""
s = str(date_str).strip()
from datetime import datetime as _dt
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"):
try:
dt = _dt.strptime(s, fmt)
return dt.strftime("%Y-%m-%dT%H:%M:%S") + "+08:00"
except ValueError:
continue
# 无法解析,原样返回
return s
@staticmethod
def _parse_price(price_str) -> int | None:
"""将价格字符串转为纯数字(元)"""
if not price_str or price_str in ("文档未提及", "详见公告"):
return None
s = str(price_str).strip()
s = re.sub(r'^[约≈大概]*', '', s)
s = re.sub(r'[(].*?[)]', '', s)
s = re.sub(r'[元人民币¥¥\s]', '', s)
try:
if "亿" in s:
return int(float(s.replace("亿", "")) * 100_000_000)
elif "" in s:
return int(float(s.replace("", "")) * 10_000)
else:
s = s.replace(",", "").replace("", "")
return int(float(s))
except (ValueError, TypeError):
return None