# -*- coding: utf-8 -*-
"""Jiandaoyun data upload module."""
import logging
import re
from decimal import Decimal

import requests
import urllib3

from config import JDY_CONFIG

# Requests are sent with certificate verification disabled by default (see
# JiandaoyunUploader.verify_ssl), so silence the warning urllib3 would
# otherwise emit on every call.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

logger = logging.getLogger("ztb")

# Patterns used by JiandaoyunUploader._parse_price, compiled once.
# Both ASCII and full-width punctuation are accepted, since either form can
# appear in Chinese-language source text.
_APPROX_PREFIX_RE = re.compile(r'^[约≈大概]*')
_PAREN_NOTE_RE = re.compile(r'[(\uff08].*?[)\uff09]')
_CURRENCY_NOISE_RE = re.compile(r'[元人民币\u00a5\uffe5\s]')


class JiandaoyunUploader:
    """Uploader that creates records in Jiandaoyun forms through its HTTP API."""

    BASE_URL = "https://api.jiandaoyun.com/api/v5"

    # Local fields whose values must be converted to numbers before upload.
    NUMERIC_FIELDS = {"最高限价", "最高投标限价", "预估金额", "招标估算金额"}

    # Placeholder texts that mean "no value"; such fields are not uploaded.
    # Kept as a tuple (not a set) so unhashable values can be tested with `in`.
    PLACEHOLDER_VALUES = ("文档未提及", "详见公告")

    # Class-level defaults so the attributes exist even when a subclass
    # overrides __init__ without calling super().
    timeout = 30
    # NOTE(security): certificate verification is off by default to preserve
    # the existing behaviour, which means the bearer token is sent without
    # validating the server certificate. Pass verify_ssl=True where possible.
    verify_ssl = False

    def __init__(self, api_key: str | None = None, *,
                 timeout: float = 30, verify_ssl: bool = False):
        """
        Build the request headers for the Jiandaoyun API.

        Args:
            api_key: API key; falls back to JDY_CONFIG["api_key"] when empty.
            timeout: Per-request timeout in seconds.
            verify_ssl: Whether to verify the server's TLS certificate.
        """
        self.api_key = api_key or JDY_CONFIG["api_key"]
        self.timeout = timeout
        self.verify_ssl = verify_ssl
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

    def upload_records(self, region_name: str, records: list) -> dict:
        """
        Upload records to the Jiandaoyun form configured for a region.

        Each record is converted and uploaded independently; a failure on one
        record is logged and counted but never aborts the rest of the batch.

        Args:
            region_name: Region name used as the key into JDY_CONFIG["forms"]
                (e.g. "浙江招标文件公示").
            records: List of record dicts.

        Returns:
            {"total": N, "success": N, "failed": N}
        """
        total = len(records)
        form_config = JDY_CONFIG["forms"].get(region_name)
        if not form_config:
            logger.warning(f"{region_name}: 未找到简道云表单配置,跳过上传")
            return {"total": total, "success": 0, "failed": total}

        app_id = form_config["app_id"]
        entry_id = form_config["entry_id"]
        field_mapping = form_config.get("field_mapping", {})

        success = 0
        failed = 0
        for pos, record in enumerate(records, start=1):
            # Fallback label, replaced below once the record is known to be
            # readable. Always a str so the log slicing cannot raise.
            name = f"记录{pos}"
            try:
                # A missing, empty or non-string name must not break logging.
                name = str(record.get("名称") or name)

                jdy_data = self._convert(record, field_mapping)
                if not jdy_data:
                    logger.debug(f"[{pos}/{total}] {name}: 无有效数据")
                    failed += 1
                    continue

                result = self._create_record(app_id, entry_id, jdy_data)
            except Exception as e:
                failed += 1
                logger.error(f" [{pos}/{total}] {name[:25]}: 异常 - {e}")
                continue

            if result and result.get("success"):
                success += 1
                # Progress is reported every 10th record and on the last one.
                if pos % 10 == 0 or pos == total:
                    logger.info(
                        f" [{pos}/{total}] 上传进度: "
                        f"成功{success} 失败{failed}")
            else:
                failed += 1
                err = result.get("error", "未知") if result else "无返回"
                logger.warning(
                    f" [{pos}/{total}] {name[:25]}: "
                    f"上传失败 - {err}")

        logger.info(f" {region_name} 上传完成: 成功 {success}, 失败 {failed}")
        return {"total": total, "success": success, "failed": failed}

    # ---------- Internal methods ----------

    def _create_record(self, app_id: str, entry_id: str, data: dict) -> dict:
        """
        Call the Jiandaoyun API to create a single record.

        Never raises: every failure is reported through the returned dict.

        Args:
            app_id: Target application id.
            entry_id: Target form (entry) id.
            data: Field payload as produced by _convert.

        Returns:
            {"success": True, "data_id": <id>} when the response has status
            200 and carries data._id; otherwise
            {"success": False, "error": <message>}.
        """
        url = f"{self.BASE_URL}/app/entry/data/create"
        payload = {"app_id": app_id, "entry_id": entry_id, "data": data}
        try:
            resp = requests.post(url, headers=self.headers, json=payload,
                                 timeout=self.timeout, verify=self.verify_ssl)
            if not resp.text:
                return {"success": False,
                        "error": f"空响应, status={resp.status_code}"}

            try:
                result = resp.json()
            except ValueError:
                # Body is not JSON (e.g. a gateway error page): report the
                # status code rather than a raw decoder message.
                return {"success": False,
                        "error": f"非JSON响应, status={resp.status_code}"}

            if not isinstance(result, dict):
                return {"success": False, "error": str(result)}

            # "data" may be absent, null or a non-object; treat all as no id.
            created = result.get("data")
            data_id = created.get("_id") if isinstance(created, dict) else None
            if resp.status_code == 200 and data_id:
                return {"success": True, "data_id": data_id}
            return {"success": False, "error": result.get("msg", str(result))}
        except Exception as e:
            # Boundary catch-all: callers rely on a result dict, not exceptions.
            return {"success": False, "error": str(e)}

    def _convert(self, record: dict, field_mapping: dict) -> dict:
        """
        Convert a local record into the Jiandaoyun API data format.

        Args:
            record: Local record dict keyed by local field name.
            field_mapping: Mapping of local field name -> Jiandaoyun field name.

        Returns:
            {jdy_field: {"value": ...}} for every mapped field with a usable
            value. Falsy values, placeholder texts and numeric fields that
            cannot be parsed are omitted.
        """
        jdy_data = {}
        for local_field, jdy_field in field_mapping.items():
            value = record.get(local_field)
            if not value or value in self.PLACEHOLDER_VALUES:
                continue
            if local_field in self.NUMERIC_FIELDS:
                value = self._parse_price(value)
                if value is None:
                    continue
            jdy_data[jdy_field] = {"value": value}
        return jdy_data

    @staticmethod
    def _parse_price(price_str) -> int | None:
        """
        Convert a price string to an integer amount in yuan.

        Strips a leading approximation marker, parenthesised notes, currency
        words/symbols, whitespace and thousands separators, then applies the
        "亿" (x 100,000,000) or "万" (x 10,000) unit if present. The result is
        truncated toward zero.

        Args:
            price_str: Price text (or a number) to parse.

        Returns:
            The amount as an int, or None when the input is empty, a
            placeholder, or not parseable as a number.
        """
        if not price_str or price_str in JiandaoyunUploader.PLACEHOLDER_VALUES:
            return None

        s = str(price_str).strip()
        s = _APPROX_PREFIX_RE.sub('', s)
        s = _PAREN_NOTE_RE.sub('', s)
        s = _CURRENCY_NOISE_RE.sub('', s)
        # Drop thousands separators before unit detection so that values such
        # as "1,200万" parse too.
        s = s.replace(",", "").replace("\uff0c", "")

        if "亿" in s:
            multiplier = 100_000_000
            s = s.replace("亿", "")
        elif "万" in s:
            multiplier = 10_000
            s = s.replace("万", "")
        else:
            multiplier = 1

        try:
            # Decimal keeps e.g. "1.15万" exact (11500); binary floats would
            # yield 11499.999... and truncate to 11499.
            return int(Decimal(s) * multiplier)
        except (ArithmeticError, ValueError, TypeError):
            # Covers malformed text (decimal.InvalidOperation), NaN and
            # infinities.
            return None