ztb/spiders/zhejiang.py

# -*- coding: utf-8 -*-
"""
浙江省公共资源交易中心爬虫 基于 API + requests
"""
import html
import json
import logging
import os
import re
from datetime import datetime, timedelta

from .base import BaseSpider
from utils.attachment import AttachmentHandler

logger = logging.getLogger("ztb")


class ZhejiangSpider(BaseSpider):
    """Spider for the Zhejiang Provincial Public Resources Trading Center."""

    # ---------- List API ----------
def _build_payload(self, page_index: int, page_size: int,
category_code: str, notice_code: str,
start_date: str, end_date: str) -> dict:
"""构建浙江省 API 请求体"""
        condition = []
        # A notice-type code takes priority over the broader category code.
        code = notice_code or category_code
        if code:
            condition.append({
                "fieldName": "categorynum",
                "isLike": True,
                "likeType": 2,
                "equal": code,
            })
time_cond = []
if start_date and end_date:
time_cond.append({
"fieldName": "webdate",
"startTime": f"{start_date} 00:00:00",
"endTime": f"{end_date} 23:59:59",
})
return {
"token": "",
"pn": page_index * page_size,
"rn": page_size,
"sdt": "", "edt": "",
"wd": "", "inc_wd": "", "exc_wd": "",
"fields": "title",
"cnum": "001",
"sort": '{"webdate":"0"}',
"ssort": "title",
"cl": 5000,
"terminal": "",
"condition": condition or None,
"time": time_cond or None,
"highlights": "",
"statistics": None,
"unionCondition": None,
"accuracy": "",
"noParticiple": "0",
"searchRange": None,
"isBusiness": "1",
}
def fetch_list_page(self, page_index: int, page_size: int,
category_code: str, notice_code: str,
start_date: str, end_date: str) -> list:
"""通过 API 获取一页列表数据"""
payload = self._build_payload(
page_index, page_size, category_code, notice_code,
start_date, end_date,
)
resp = self.fetch(
self.config["api_url"],
method="POST",
json=payload,
headers={"Referer": self.config["base_url"] + "/jyxxgk/list.html"},
)
if resp is None:
return []
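        # The endpoint is expected to return JSON shaped like {"result": {"records": [...]}}.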
try:
data = resp.json()
return data.get("result", {}).get("records", [])
except Exception as e:
logger.error(f"解析 API 响应失败: {e}")
return []

    # ---------- Record parsing ----------
    @staticmethod
    def _parse_record(record: dict, source: str) -> dict:
        """Convert a raw API record into the result dict used by the spider."""
title = record.get("title", "").strip()
link = record.get("linkurl", "")
if link and not link.startswith("http"):
link = "https://ggzy.zj.gov.cn" + link
date_str = record.get("webdate", "")
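        # "webdate" is a "YYYY-MM-DD HH:MM:SS"-style timestamp; keep only the date part.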
date_short = date_str.split(" ")[0] if date_str else ""
item = {
"标题": title,
"发布日期": date_short,
"地区": record.get("infod", ""),
"公告类型": record.get("categoryname", ""),
"链接": link,
"来源": source,
}
        # Parse titles of the form "[招标文件]<project name>[<approval number>]".
        title_pattern = r"\[(?:招标文件|招标公告)\]\s*(.*?)\s*\[([A-Z0-9]+)\]\s*$"
        # Boilerplate suffixes to strip from the end of the project name.
        suffixes = ["招标文件公示", "招标文件预公示", "招标公告", "招标预公告"]
        match = re.search(title_pattern, title)
        if match:
            project_name = match.group(1).strip()
            item["项目批准文号"] = match.group(2).strip()
        else:
            # The title does not match the bracketed form: fall back to the raw title
            # and try to pull a trailing approval number out of it.
            project_name = title
            number_match = re.search(r"\[([A-Z0-9]+)\]\s*$", project_name)
            if number_match:
                item["项目批准文号"] = number_match.group(1).strip()
                project_name = project_name[:number_match.start()].strip()
        for suffix in suffixes:
            if project_name.endswith(suffix):
                project_name = project_name[:-len(suffix)].strip()
        item["项目名称"] = project_name
        return item
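
    # Illustrative (hypothetical) example of _parse_record's title handling:
    #   "[招标公告]某道路工程招标公告[A3301000001]"
    #   -> 项目名称 "某道路工程", 项目批准文号 "A3301000001"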
@staticmethod
def _parse_content_fields(content: str) -> dict:
"""从 API content 字段提取结构化信息"""
if not content:
return {}
# 清理 HTML 实体
import html as html_mod
text = html_mod.unescape(content)
text = re.sub(r"<[^>]+>", "", text) # 去 HTML 标签
text = re.sub(r"\s+", " ", text).strip()
fields = {}
patterns = {
"项目名称": r"项目名称[::]\s*(.+?)\s{2,}",
"项目代码": r"项目代码[::]\s*(.+?)\s{2,}",
"招标人": r"招标人[::].*?名称[::]\s*(.+?)\s{2,}",
"招标代理": r"代理机构[::].*?名称[::]\s*(.+?)\s{2,}",
"联系电话": r"\s*话[::]\s*([\d\-]+)",
"招标估算金额": r"招标估算金额[::]\s*([\d,\.]+\s*元)",
}
for key, pat in patterns.items():
m = re.search(pat, text)
if m:
fields[key] = m.group(1).strip()
return fields
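
    # _parse_content_fields assumes the "content" HTML carries label/value pairs such as
    # "项目名称:..." and "招标估算金额:...元", separated by markup; only labels matched by
    # the patterns above are extracted, everything else is ignored.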

    # ---------- Attachments ----------
    def _extract_attachments_from_detail(self, url: str) -> list:
        """Fetch the detail page and extract attachment links (PDF and Word files)."""
resp = self.fetch(url)
if resp is None:
return []
attachments = []
# PDF
for href in re.findall(r'href=["\']([^"\']*\.pdf[^"\']*)', resp.text):
if not href.startswith("http"):
href = self.config["base_url"] + href
name = href.split("/")[-1]
attachments.append({"name": name, "url": href})
# Word
for href in re.findall(r'href=["\']([^"\']*\.docx?[^"\']*)', resp.text):
if not href.startswith("http"):
href = self.config["base_url"] + href
name = href.split("/")[-1]
attachments.append({"name": name, "url": href})
return attachments

    # ---------- Main flow ----------
    def crawl(self, max_pages: int = None, category: str = None,
              notice_type: str = None, date_filter: str = None,
              download_attachment: bool = False, **kwargs):
        """
        Run the crawl.

        Args:
            max_pages: maximum number of list pages to fetch
            category: trading area, e.g. "工程建设"
            notice_type: notice type, e.g. "招标公告"
            date_filter: date filter, "yesterday" or a concrete date such as "2026-02-03"
            download_attachment: whether to download and parse attachments
        """
if max_pages is None:
max_pages = self.spider_config.get("max_pages", 10)
page_size = self.spider_config.get("page_size", 20)
        # Date range
if date_filter == "yesterday":
d = datetime.now() - timedelta(days=1)
start_date = end_date = d.strftime("%Y-%m-%d")
logger.info(f"过滤日期: {start_date}(昨天)")
elif date_filter:
start_date = end_date = date_filter
logger.info(f"过滤日期: {start_date}")
else:
            # Default: roughly the last month
end_date = datetime.now().strftime("%Y-%m-%d")
start_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
category_code = self.config.get("categories", {}).get(category, "")
notice_code = self.config.get("notice_types", {}).get(notice_type, "")
        # Attachment handler
attachment_handler = None
if download_attachment:
attachment_dir = os.path.join(self.data_dir, "attachments")
attachment_handler = AttachmentHandler(attachment_dir)
logger.info(f"启用附件下载,保存到: {attachment_dir}")
logger.info(f"开始爬取: {self.config['name']}")
if category:
logger.info(f"交易领域: {category}")
if notice_type:
logger.info(f"公告类型: {notice_type}")
for page_idx in range(max_pages):
if self._check_limits():
break
logger.info(f"正在爬取第 {page_idx + 1} 页...")
records = self.fetch_list_page(
page_idx, page_size, category_code, notice_code,
start_date, end_date,
)
if not records:
logger.info("没有更多数据")
break
count = 0
for rec in records:
link = rec.get("linkurl", "")
if link and not link.startswith("http"):
link = self.config["base_url"] + link
if self.is_duplicate(link):
continue
item = self._parse_record(rec, self.config["name"])
                # Extract detail fields from the record's content
detail = self._parse_content_fields(rec.get("content", ""))
item.update(detail)
                # Attachments
if download_attachment and attachment_handler:
self.detail_delay()
atts = self._extract_attachments_from_detail(link)
                    if atts:
                        item["附件数量"] = len(atts)
                        att_names = []
                        att_texts = []
                        for att in atts:
                            att_names.append(att["name"])
                            result = attachment_handler.download_and_extract(att["url"])
                            if result["success"] and result["text"]:
                                att_texts.append(result["text"])
                        if att_texts:
                            # Join the extracted texts and keep a bounded summary,
                            # rather than only the last attachment's text.
                            item["附件内容摘要"] = "\n".join(att_texts)[:2000]
                        item["附件名称"] = " | ".join(att_names)
self.results.append(item)
count += 1
logger.info(f" 获取 {count} 条数据")
if count == 0:
logger.info("当前页无新数据,停止翻页")
break
self.delay()
self.print_stats()
logger.info(f"爬取完成,共 {len(self.results)} 条数据")
return self.results
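
# Minimal usage sketch (assumption: BaseSpider's constructor wires up self.config and
# self.spider_config from the project's configuration; the exact signature is not shown here):
#
#     spider = ZhejiangSpider()
#     results = spider.crawl(max_pages=2,
#                            category="工程建设",
#                            notice_type="招标公告",
#                            date_filter="yesterday",
#                            download_attachment=False)
#     print(f"fetched {len(results)} records")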