# -*- coding: utf-8 -*-
"""
Spider for the Zhejiang Provincial Public Resource Trading Center
(浙江省公共资源交易中心), based on the site's API + requests.
"""
import html
import logging
import os
import re
from datetime import datetime, timedelta

from bs4 import BeautifulSoup

from .base import BaseSpider
from utils.attachment import AttachmentHandler

logger = logging.getLogger("ztb")


class ZhejiangSpider(BaseSpider):
    """Spider for the Zhejiang Provincial Public Resource Trading Center."""

    # ---------- API list ----------

    def _build_payload(self, page_index: int, page_size: int, category_code: str,
                       notice_code: str, start_date: str, end_date: str) -> dict:
        """Build the request body for the Zhejiang API."""
        condition = []
        if notice_code:
            condition.append({
                "fieldName": "categorynum",
                "isLike": True,
                "likeType": 2,
                "equal": notice_code,
            })
        elif category_code:
            condition.append({
                "fieldName": "categorynum",
                "isLike": True,
                "likeType": 2,
                "equal": category_code,
            })

        time_cond = []
        if start_date and end_date:
            time_cond.append({
                "fieldName": "webdate",
                "startTime": f"{start_date} 00:00:00",
                "endTime": f"{end_date} 23:59:59",
            })

        return {
            "token": "",
            "pn": page_index * page_size,  # record offset
            "rn": page_size,               # records per page
            "sdt": "",
            "edt": "",
            "wd": "",
            "inc_wd": "",
            "exc_wd": "",
            "fields": "title",
            "cnum": "001",
            "sort": '{"webdate":"0"}',
            "ssort": "title",
            "cl": 5000,
            "terminal": "",
            "condition": condition or None,
            "time": time_cond or None,
            "highlights": "",
            "statistics": None,
            "unionCondition": None,
            "accuracy": "",
            "noParticiple": "0",
            "searchRange": None,
            "isBusiness": "1",
        }

    def fetch_list_page(self, page_index: int, page_size: int, category_code: str,
                        notice_code: str, start_date: str, end_date: str) -> list:
        """Fetch one page of list records via the API."""
        payload = self._build_payload(
            page_index, page_size, category_code, notice_code, start_date, end_date,
        )
        resp = self.fetch(
            self.config["api_url"],
            method="POST",
            json=payload,
            headers={"Referer": self.config["base_url"] + "/jyxxgk/list.html"},
        )
        if resp is None:
            return []
        try:
            data = resp.json()
            return data.get("result", {}).get("records", [])
        except Exception as e:
            logger.error(f"Failed to parse API response: {e}")
            return []

    # ---------- Record parsing ----------

    def _parse_record(self, record: dict, source: str) -> dict:
        """Convert a raw API record into a result dict."""
        title = record.get("title", "").strip()
        link = record.get("linkurl", "")
        if link and not link.startswith("http"):
            link = "https://ggzy.zj.gov.cn" + link
        date_str = record.get("webdate", "")
        date_short = date_str.split(" ")[0] if date_str else ""

        item = {
            "标题": title,
            "发布日期": date_short,
            "地区": record.get("infod", ""),
            "公告类型": record.get("categoryname", ""),
            "链接": link,
            "来源": source,
        }
        # Parse the title: extract project name and approval number (shared rule)
        item.update(self._parse_title(title))
        return item

    @staticmethod
    def _parse_content_fields(content: str) -> dict:
        """Extract structured fields from the API's 'content' field."""
        if not content:
            return {}
        # Unescape HTML entities, strip tags, and normalize whitespace.
        # Tags and longer whitespace runs become a double space so that the
        # \s{2,} field boundaries in the patterns below can still match.
        text = html.unescape(content)
        text = re.sub(r"<[^>]+>", "  ", text)
        text = re.sub(r"\s{2,}", "  ", text)
        text = re.sub(r"\s", " ", text).strip()

        fields = {}
        patterns = {
            "项目名称": r"项目名称[::]\s*(.+?)\s{2,}",
            "项目代码": r"项目代码[::]\s*(.+?)\s{2,}",
            "招标人": r"招标人[::].*?名称[::]\s*(.+?)\s{2,}",
            "招标代理": r"代理机构[::].*?名称[::]\s*(.+?)\s{2,}",
            "联系电话": r"电\s*话[::]\s*([\d\-]+)",
            "招标估算金额": r"招标估算金额[::]\s*([\d,\.]+\s*元)",
        }
        for key, pat in patterns.items():
            m = re.search(pat, text)
            if m:
                fields[key] = m.group(1).strip()
        return fields

    # ---------- Detail-page enrichment ----------

    def parse_detail(self, url: str) -> dict:
        """Fetch the detail page and extract structured fields such as the project name and approval number."""
        resp = self.fetch(url)
        if resp is None:
            return {}
        detail = {}
        soup = BeautifulSoup(resp.text, "html.parser")

        # Map table labels to output field names
        field_map = {
            "项目名称": "项目名称",
            "项目批准文件及文号": "项目批准文号",
            "项目批准文号": "项目批准文号",
            "批准文号": "项目批准文号",
            # half-width and full-width paren variants of the same label
            "建设单位(招标人)": "招标人",
            "建设单位(招标人)": "招标人",
"招标人": "招标人", "项目类型": "项目类型", "招标方式": "招标方式", "联系人": "联系人", "联系方式": "联系方式", } for row in soup.select("table tr"): cells = row.select("td") if len(cells) >= 2: key = cells[0].get_text(strip=True) value = cells[1].get_text(strip=True) if key in field_map and value: detail[field_map[key]] = value if len(cells) >= 4: key2 = cells[2].get_text(strip=True) value2 = cells[3].get_text(strip=True) if key2 in field_map and value2: detail[field_map[key2]] = value2 # 招标项目表(计划招标时间 / 预估合同金额) for table in soup.select("table"): headers = [th.get_text(strip=True) for th in table.select("th")] if "计划招标时间" in headers: data_rows = table.select("tbody tr") or [ r for r in table.select("tr") if r.select("td") ] if data_rows: cells = data_rows[0].select("td") for i, h in enumerate(headers): if i < len(cells): val = cells[i].get_text(strip=True) if h == "计划招标时间" and val: detail["计划招标时间"] = val elif "预估合同金额" in h and val: detail["预估合同金额(万元)"] = val break return detail # ---------- 附件 ---------- def _extract_attachments_from_detail(self, url: str) -> list: """访问详情页,提取附件链接""" resp = self.fetch(url) if resp is None: return [] attachments = [] # PDF for href in re.findall(r'href=["\']([^"\']*\.pdf[^"\']*)', resp.text): if not href.startswith("http"): href = self.config["base_url"] + href name = href.split("/")[-1] attachments.append({"name": name, "url": href}) # Word for href in re.findall(r'href=["\']([^"\']*\.docx?[^"\']*)', resp.text): if not href.startswith("http"): href = self.config["base_url"] + href name = href.split("/")[-1] attachments.append({"name": name, "url": href}) return attachments # ---------- 主流程 ---------- def crawl(self, max_pages: int = None, category: str = None, notice_type: str = None, date_filter: str = None, download_attachment: bool = False, **kwargs): """ 执行爬取 Args: max_pages: 最大爬取页数 category: 交易领域(如 "工程建设") notice_type: 公告类型(如 "招标公告") date_filter: 日期过滤("yesterday" 或 "2026-02-03") download_attachment: 是否下载附件 """ if max_pages is None: max_pages = self.spider_config.get("max_pages", 10) page_size = self.spider_config.get("page_size", 20) # 日期范围 if date_filter == "yesterday": d = datetime.now() - timedelta(days=1) start_date = end_date = d.strftime("%Y-%m-%d") logger.info(f"过滤日期: {start_date}(昨天)") elif date_filter: start_date = end_date = date_filter logger.info(f"过滤日期: {start_date}") else: # 默认近一个月 end_date = datetime.now().strftime("%Y-%m-%d") start_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d") category_code = self.config.get("categories", {}).get(category, "") notice_code = self.config.get("notice_types", {}).get(notice_type, "") # 附件处理器 attachment_handler = None if download_attachment: attachment_dir = os.path.join(self.data_dir, "attachments") attachment_handler = AttachmentHandler(attachment_dir) logger.info(f"启用附件下载,保存到: {attachment_dir}") logger.info(f"开始爬取: {self.config['name']}") if category: logger.info(f"交易领域: {category}") if notice_type: logger.info(f"公告类型: {notice_type}") for page_idx in range(max_pages): if self._check_limits(): break logger.info(f"正在爬取第 {page_idx + 1} 页...") records = self.fetch_list_page( page_idx, page_size, category_code, notice_code, start_date, end_date, ) if not records: logger.info("没有更多数据") break count = 0 for rec in records: link = rec.get("linkurl", "") if link and not link.startswith("http"): link = self.config["base_url"] + link if self.is_duplicate(link): continue item = self._parse_record(rec, self.config["name"]) # 从 content 提取详情字段 detail = self._parse_content_fields(rec.get("content", "")) item.update(detail) # 
                # Detail-page enrichment: project name, approval number, etc.
                self.detail_delay()
                page_detail = self.parse_detail(link)
                # Detail-page fields only fill gaps; never overwrite existing values
                for k, v in page_detail.items():
                    if not item.get(k):
                        item[k] = v

                # Attachments
                if download_attachment and attachment_handler:
                    atts = self._extract_attachments_from_detail(link)
                    if atts:
                        item["附件数量"] = len(atts)
                        att_names = []
                        for att in atts:
                            att_names.append(att["name"])
                            result = attachment_handler.download_and_extract(att["url"])
                            if result["success"] and result["text"]:
                                # keeps the text of the last successfully extracted attachment
                                item["附件内容摘要"] = result["text"][:2000]
                        item["附件名称"] = " | ".join(att_names)

                self.results.append(item)
                count += 1

            logger.info(f"  Got {count} new records")
            if count == 0:
                logger.info("No new records on this page, stopping pagination")
                break
            self.delay()

        self.print_stats()
        logger.info(f"Crawl finished with {len(self.results)} records in total")
        return self.results
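

# Usage sketch (illustrative only). The surrounding framework is not shown in this
# module, so the constructor arguments below (a site config dict plus a spider
# config dict) are assumptions, not the actual BaseSpider signature — adjust to
# whatever .base defines. The crawl() keyword arguments match the method above.
#
#   spider = ZhejiangSpider(config=ZHEJIANG_CONFIG, spider_config=SPIDER_CONFIG)
#   results = spider.crawl(
#       max_pages=5,
#       category="工程建设",
#       notice_type="招标公告",
#       date_filter="yesterday",
#       download_attachment=True,
#   )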