Files
ztb/spiders/taizhou.py

336 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
台州公共资源交易中心爬虫 —— 基于 API + requests
"""
import logging
import os
import re
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
from .base import BaseSpider
from utils.attachment import AttachmentHandler
logger = logging.getLogger("ztb")
class TaizhouSpider(BaseSpider):
    """Spider for the Taizhou public resource trading center (台州公共资源交易中心).

    List pages 1-7 are server-side rendered HTML; subsequent pages are
    fetched through a JSON POST API (see ``crawl``).
    """

    # ---------- List data ----------
def _build_list_url(self, category_code: str, notice_code: str, page_num: int) -> str:
"""构建列表页 URLSSR 页面,页 1-6"""
base = self.config["base_url"]
if notice_code:
if category_code:
path = f"/jyxx/{category_code}/{notice_code}"
else:
# 当只有notice_code时直接使用/jyxx/{notice_code}
path = f"/jyxx/{notice_code}"
elif category_code:
path = f"/jyxx/{category_code}"
else:
path = "/jyxx"
if page_num <= 1:
return f"{base}{path}/trade_infor.html"
else:
return f"{base}{path}/{page_num}.html"
def fetch_list_via_api(self, page_index: int, page_size: int,
                       category_num: str, start_date: str = "",
                       end_date: str = "") -> list:
    """Fetch list records through the site's JSON API (used from page 8 on).

    Args:
        page_index: page index as expected by the API.
        page_size: records per page.
        category_num: category/notice code to query.
        start_date: optional lower bound, "YYYY-MM-DD HH:MM:SS".
        end_date: optional upper bound, same format.

    Returns:
        The ``custom.infodata`` list from the response, or ``[]`` on
        fetch/parse failure.
    """
    payload = {
        "siteGuid": self.config["site_guid"],
        "categoryNum": category_num,
        "content": "",
        "pageIndex": page_index,
        "pageSize": page_size,
        "YZM": "",
        "ImgGuid": "",
        "startdate": start_date,
        "enddate": end_date,
        "xiaqucode": "",
        "projectjiaoyitype": "",
        "jytype": "",
        "zhuanzai": "",
    }
    resp = self.fetch(self.config["api_url"], method="POST", data=payload)
    if resp is None:
        return []
    try:
        return resp.json().get("custom", {}).get("infodata", [])
    except Exception as e:
        logger.error(f"解析 API 响应失败: {e}")
        return []
def parse_html_list(self, html: str) -> list:
    """Parse a server-side-rendered list page into item dicts.

    Args:
        html: raw HTML of a list page.

    Returns:
        List of dicts with the Chinese field keys used throughout the
        pipeline; entries lacking a title or a link are dropped.
    """
    soup = BeautifulSoup(html, "html.parser")
    results = []
    for anchor in soup.select("a.public-list-item"):
        heading = anchor.get("title", "").strip()
        link = anchor.get("href", "")
        if link and not link.startswith("http"):
            link = self.config["base_url"] + link
        date_node = anchor.select_one("span.date")
        region_node = anchor.select_one("span.xiaquclass")
        entry = {
            "标题": heading,
            "发布日期": date_node.text.strip() if date_node else "",
            "地区": region_node.text.strip().strip("【】") if region_node else "",
            "链接": link,
            "来源": self.config["name"],
        }
        # Derive project name / approval number from the title (shared rule).
        entry.update(self._parse_title(heading))
        if heading and link:
            results.append(entry)
    return results
def parse_api_list(self, records: list) -> list:
    """Convert raw API records into item dicts.

    Fields returned by the API can be JSON null; every ``rec.get(...)``
    is therefore coerced to ``""`` before string methods are applied
    (the previous version crashed with ``AttributeError`` when
    ``title``/``title2``, ``infourl`` or ``xiaquname`` came back null).

    Args:
        records: the ``custom.infodata`` list from the API response.

    Returns:
        List of item dicts keyed with the same Chinese field names as
        the SSR parser, so both code paths feed the pipeline uniformly.
    """
    items = []
    for rec in records:
        # title2 (the highlighted variant) takes precedence over title.
        title = rec.get("title2") or rec.get("title") or ""
        href = rec.get("infourl") or ""
        if href and not href.startswith("http"):
            href = self.config["base_url"] + href
        item = {
            "标题": title.strip(),
            "发布日期": rec.get("infodate") or "",
            "地区": (rec.get("xiaquname") or "").strip("【】"),
            "链接": href,
            "来源": self.config["name"],
        }
        # Derive project name / approval number from the title (shared rule).
        item.update(self._parse_title(title))
        items.append(item)
    return items
# ---------- Detail page ----------
def parse_detail(self, url: str) -> dict:
    """Parse a notice detail page into a field dict.

    Fetches *url*, extracts the publish timestamp and a set of labelled
    table fields (project name, contacts, tendering info, ...).

    Args:
        url: absolute URL of the detail page.

    Returns:
        Dict of extracted fields; empty when the page cannot be fetched.
    """
    resp = self.fetch(url)
    if resp is None:
        return {}
    detail = {}
    soup = BeautifulSoup(resp.text, "html.parser")
    # Publish time comes from the page body (includes HH:MM:SS).
    # NOTE(review): _extract_publish_time is defined elsewhere in this
    # class — its exact format guarantees are not visible here.
    page_text = soup.get_text(separator="\n", strip=True)
    publish_time = self._extract_publish_time(soup, page_text)
    if publish_time:
        detail["详情页发布时间"] = publish_time
    # Map table labels to output field names.  The two 建设单位(招标人)
    # keys are NOT duplicates: one uses fullwidth parentheses, the other
    # ASCII, so both page variants are recognised.
    field_map = {
        "项目名称": "项目名称",
        "联系人": "联系人",
        "联系方式": "联系方式",
        "建设单位(招标人)": "招标人",
        "建设单位(招标人)": "招标人",
        "项目批准文件及文号": "项目批准文号",
        "项目类型": "项目类型",
        "招标方式": "招标方式",
        "主要建设内容": "主要建设内容",
    }
    for row in soup.select("table tr"):
        cells = row.select("td")
        # Standard two-cell rows: label in cell 0, value in cell 1.
        if len(cells) >= 2:
            key = cells[0].get_text(strip=True)
            value = cells[1].get_text(strip=True)
            if key in field_map and value:
                detail[field_map[key]] = value
        # Four-cell rows carry a second label/value pair; only the
        # contact field is picked up from that position.
        if len(cells) >= 4:
            key2 = cells[2].get_text(strip=True)
            value2 = cells[3].get_text(strip=True)
            if key2 == "联系方式" and value2:
                detail["联系方式"] = value2
    # Tender-project table (planned tender date / estimated contract value).
    for table in soup.select("table"):
        headers = [th.get_text(strip=True) for th in table.select("th")]
        if "计划招标时间" in headers:
            # Prefer tbody rows; fall back to any <tr> that has <td> cells.
            data_rows = table.select("tbody tr") or [
                r for r in table.select("tr") if r.select("td")
            ]
            if data_rows:
                # Only the first data row is read, matched to headers by index.
                cells = data_rows[0].select("td")
                for i, h in enumerate(headers):
                    if i < len(cells):
                        val = cells[i].get_text(strip=True)
                        if h == "计划招标时间" and val:
                            detail["计划招标时间"] = val
                        elif "预估合同金额" in h and val:
                            detail["预估合同金额(万元)"] = val
            break  # first matching table only
    return detail
# ---------- 附件 ----------
def _extract_attachments(self, url: str) -> list:
"""从详情页提取附件链接"""
resp = self.fetch(url)
if resp is None:
return []
attachments = []
for href in re.findall(r'href=["\']([^"\']*\.pdf[^"\']*)', resp.text):
if not href.startswith("http"):
href = self.config["base_url"] + href
attachments.append({"name": href.split("/")[-1], "url": href})
for href in re.findall(r'href=["\']([^"\']*\.docx?[^"\']*)', resp.text):
if not href.startswith("http"):
href = self.config["base_url"] + href
attachments.append({"name": href.split("/")[-1], "url": href})
return attachments
# ---------- Main flow ----------
def crawl(self, max_pages: int = None, category: str = None,
          notice_type: str = None, date_filter: str = None,
          download_attachment: bool = False, **kwargs):
    """
    Run the crawl: paginate list pages (SSR first, then API), visit each
    new item's detail page, optionally download attachments, and append
    results to ``self.results``.

    Args:
        max_pages: maximum number of list pages to crawl (defaults to
            ``spider_config["max_pages"]`` or 10).
        category: trade-category name, looked up in ``config["categories"]``.
        notice_type: notice-type name, looked up in ``config["notice_types"]``.
        date_filter: "yesterday" or a concrete "YYYY-MM-DD" date; only
            items published on that date are kept.
        download_attachment: whether to download and summarise attachments.

    Returns:
        ``self.results`` — the accumulated list of item dicts.
    """
    if max_pages is None:
        max_pages = self.spider_config.get("max_pages", 10)
    page_size = 10  # this site serves a fixed 10 records per page
    # Resolve the date filter into a target day plus API-format bounds.
    target_date = None
    start_date = end_date = ""
    if date_filter == "yesterday":
        d = datetime.now() - timedelta(days=1)
        target_date = d.strftime("%Y-%m-%d")
        start_date = target_date + " 00:00:00"
        end_date = target_date + " 23:59:59"
        logger.info(f"过滤日期: {target_date}(昨天)")
    elif date_filter:
        target_date = date_filter
        start_date = target_date + " 00:00:00"
        end_date = target_date + " 23:59:59"
        logger.info(f"过滤日期: {target_date}")
    # Notice-type code wins over category code; "002" is the fallback
    # for the API query when neither is configured.
    category_code = self.config.get("categories", {}).get(category, "")
    notice_code = self.config.get("notice_types", {}).get(notice_type, "")
    category_num = notice_code or category_code or "002"
    # Attachment handling is opt-in.
    attachment_handler = None
    if download_attachment:
        attachment_dir = os.path.join(self.data_dir, "attachments")
        attachment_handler = AttachmentHandler(attachment_dir)
        logger.info(f"启用附件下载,保存到: {attachment_dir}")
    logger.info(f"开始爬取: {self.config['name']}")
    if category:
        logger.info(f"交易领域: {category}")
    if notice_type:
        logger.info(f"公告类型: {notice_type}")
    for page_num in range(1, max_pages + 1):
        if self._check_limits():
            break
        logger.info(f"正在爬取第 {page_num} 页...")
        # Pages 1-7 come from SSR HTML pages; page 8+ from the JSON API.
        if page_num <= 7:
            url = self._build_list_url(category_code, notice_code, page_num)
            resp = self.fetch(url)
            if resp is None:
                break
            page_items = self.parse_html_list(resp.text)
        else:
            # page_num - 1 suggests the API pageIndex is 0-based —
            # NOTE(review): confirm against the API; not visible here.
            records = self.fetch_list_via_api(
                page_num - 1, page_size, category_num,
                start_date, end_date,
            )
            if not records:
                logger.info("没有更多数据")
                break
            page_items = self.parse_api_list(records)
        if not page_items:
            logger.info("没有更多数据")
            break
        # Date filter + dedupe, then enrich each kept item.
        count = 0
        has_older = False  # saw a record published before target_date
        for item in page_items:
            if target_date and item["发布日期"] != target_date:
                # Lexicographic compare; assumes ISO "YYYY-MM-DD" dates
                # so string order matches chronological order.
                if target_date and item["发布日期"] < target_date:
                    has_older = True
                continue
            if self.is_duplicate(item["链接"]):
                continue
            # Detail page (rate-limited).
            self.detail_delay()
            detail = self.parse_detail(item["链接"])
            item.update(detail)
            # Attachments: record names/count and keep a text excerpt
            # of each successfully extracted file (last one wins).
            if download_attachment and attachment_handler:
                atts = self._extract_attachments(item["链接"])
                if atts:
                    item["附件数量"] = len(atts)
                    att_names = []
                    for att in atts:
                        att_names.append(att["name"])
                        result = attachment_handler.download_and_extract(att["url"])
                        if result["success"] and result["text"]:
                            item["附件内容摘要"] = result["text"][:2000]
                    item["附件名称"] = " | ".join(att_names)
            self.results.append(item)
            count += 1
        logger.info(f" 获取 {count} 条数据")
        if count == 0:
            if not target_date or has_older:
                # No date filter / already past the target date → stop.
                logger.info("当前页无新数据,停止翻页")
                break
            else:
                # Page held only newer-than-target records: keep paging.
                logger.info(" 当前页均为更新日期的数据,继续翻页")
                self.delay()
                continue
        self.delay()
    self.print_stats()
    logger.info(f"爬取完成,共 {len(self.results)} 条数据")
    return self.results