ztb/spiders/taizhou.py

# -*- coding: utf-8 -*-
"""
台州公共资源交易中心爬虫 基于 API + requests
"""
import logging
import os
import re
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
from .base import BaseSpider
from utils.attachment import AttachmentHandler
logger = logging.getLogger("ztb")


class TaizhouSpider(BaseSpider):
    """Spider for the Taizhou Public Resource Trading Center."""

    # ---------- List data ----------

    def _build_list_url(self, category_code: str, notice_code: str, page_num: int) -> str:
        """Build the URL of a server-rendered (SSR) list page (pages 1-7)."""
        base = self.config["base_url"]
        if notice_code:
            if category_code:
                path = f"/jyxx/{category_code}/{notice_code}"
            else:
                # With only a notice_code, use /jyxx/{notice_code} directly
                path = f"/jyxx/{notice_code}"
        elif category_code:
            path = f"/jyxx/{category_code}"
        else:
            path = "/jyxx"
        if page_num <= 1:
            return f"{base}{path}/trade_infor.html"
        else:
            return f"{base}{path}/{page_num}.html"

    def fetch_list_via_api(self, page_index: int, page_size: int,
                           category_num: str, start_date: str = "",
                           end_date: str = "") -> list:
        """Fetch list records through the JSON API (used for page 8 and later)."""
        resp = self.fetch(
            self.config["api_url"],
            method="POST",
            data={
                "siteGuid": self.config["site_guid"],
                "categoryNum": category_num,
                "content": "",
                "pageIndex": page_index,
                "pageSize": page_size,
                "YZM": "",
                "ImgGuid": "",
                "startdate": start_date,
                "enddate": end_date,
                "xiaqucode": "",
                "projectjiaoyitype": "",
                "jytype": "",
                "zhuanzai": "",
            },
        )
        if resp is None:
            return []
        try:
            data = resp.json()
            return data.get("custom", {}).get("infodata", [])
        except Exception as e:
            logger.error(f"解析 API 响应失败: {e}")
            return []

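    # Assumed shape of the API response consumed above, inferred from the keys
    # accessed in this file rather than from official documentation:
    #   {"custom": {"infodata": [ {record}, {record}, ... ]}}
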
    def parse_html_list(self, html: str) -> list:
        """Parse an SSR list page (HTML)."""
        soup = BeautifulSoup(html, "html.parser")
        items = []
        for a in soup.select("a.public-list-item"):
            title = a.get("title", "").strip()
            href = a.get("href", "")
            if href and not href.startswith("http"):
                href = self.config["base_url"] + href
            date_el = a.select_one("span.date")
            date = date_el.text.strip() if date_el else ""
            region_el = a.select_one("span.xiaquclass")
            region = region_el.text.strip().strip("【】") if region_el else ""
            item = {
                "标题": title,
                "发布日期": date,
                "地区": region,
                "链接": href,
                "来源": self.config["name"],
            }
            # Parse the title: extract the project name and approval number (shared rule)
            item.update(self._parse_title(title))
            if title and href:
                items.append(item)
        return items

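    # Illustrative list-item markup that the selectors above expect (reconstructed
    # from the selectors, not copied from the live site):
    #   <a class="public-list-item" href="/jyxx/002/002001/xxx.html" title="某某项目招标公告">
    #       <span class="xiaquclass">【某某区】</span>
    #       <span class="date">2024-01-01</span>
    #   </a>
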
    def parse_api_list(self, records: list) -> list:
        """Parse the list records returned by the API."""
        items = []
        for rec in records:
            title = rec.get("title2") or rec.get("title", "")
            href = rec.get("infourl", "")
            if href and not href.startswith("http"):
                href = self.config["base_url"] + href
            item = {
                "标题": title.strip(),
                "发布日期": rec.get("infodate", ""),
                "地区": rec.get("xiaquname", "").strip("【】"),
                "链接": href,
                "来源": self.config["name"],
            }
            # Parse the title: extract the project name and approval number (shared rule)
            item.update(self._parse_title(title))
            items.append(item)
        return items

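    # Example mapping of one hypothetical API record onto an output item:
    #   {"title2": "某某项目招标公告", "infourl": "/jyxx/002/002001/xxx.html",
    #    "infodate": "2024-01-01", "xiaquname": "【某某区】"}
    #   → {"标题": "某某项目招标公告", "发布日期": "2024-01-01", "地区": "某某区",
    #      "链接": base_url + "/jyxx/002/002001/xxx.html", "来源": config["name"]}
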
    # ---------- Detail page ----------

    def parse_detail(self, url: str) -> dict:
        """Parse a detail page."""
        resp = self.fetch(url)
        if resp is None:
            return {}
        detail = {}
        soup = BeautifulSoup(resp.text, "html.parser")
        # Extract the publish time from the page content (includes hh:mm:ss)
        page_text = soup.get_text(separator="\n", strip=True)
        publish_time = self._extract_publish_time(soup, page_text)
        if publish_time:
            detail["详情页发布时间"] = publish_time
        # Parse table fields (site label → output field)
        field_map = {
            "项目名称": "项目名称",
            "联系人": "联系人",
            "联系方式": "联系方式",
            "建设单位(招标人)": "招标人",
            "项目批准文件及文号": "项目批准文号",
            "项目类型": "项目类型",
            "招标方式": "招标方式",
            "主要建设内容": "主要建设内容",
        }
        for row in soup.select("table tr"):
            cells = row.select("td")
            if len(cells) >= 2:
                key = cells[0].get_text(strip=True)
                value = cells[1].get_text(strip=True)
                if key in field_map and value:
                    detail[field_map[key]] = value
            if len(cells) >= 4:
                key2 = cells[2].get_text(strip=True)
                value2 = cells[3].get_text(strip=True)
                if key2 == "联系方式" and value2:
                    detail["联系方式"] = value2
        # Tender project table (planned tender date / estimated contract amount)
        for table in soup.select("table"):
            headers = [th.get_text(strip=True) for th in table.select("th")]
            if "计划招标时间" in headers:
                data_rows = table.select("tbody tr") or [
                    r for r in table.select("tr") if r.select("td")
                ]
                if data_rows:
                    cells = data_rows[0].select("td")
                    for i, h in enumerate(headers):
                        if i < len(cells):
                            val = cells[i].get_text(strip=True)
                            if h == "计划招标时间" and val:
                                detail["计划招标时间"] = val
                            elif "预估合同金额" in h and val:
                                detail["预估合同金额(万元)"] = val
                break
        return detail

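    # Assumed detail-page structure handled above (reconstructed from the parsing
    # logic, not from the live site): key/value cells in two- or four-cell rows, e.g.
    #   <tr><td>项目名称</td><td>某某工程</td><td>联系方式</td><td>0576-0000000</td></tr>
    # plus a tender-plan table whose <th> headers include 计划招标时间 and
    # 预估合同金额(万元), with the values taken from its first data row.
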
    # ---------- Attachments ----------

    def _extract_attachments(self, url: str) -> list:
        """Extract attachment links from a detail page."""
        resp = self.fetch(url)
        if resp is None:
            return []
        attachments = []
        for href in re.findall(r'href=["\']([^"\']*\.pdf[^"\']*)', resp.text):
            if not href.startswith("http"):
                href = self.config["base_url"] + href
            attachments.append({"name": href.split("/")[-1], "url": href})
        for href in re.findall(r'href=["\']([^"\']*\.docx?[^"\']*)', resp.text):
            if not href.startswith("http"):
                href = self.config["base_url"] + href
            attachments.append({"name": href.split("/")[-1], "url": href})
        return attachments

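    # The two regexes above pick up href values pointing at .pdf and .doc/.docx files.
    # For an illustrative (hypothetical) link href="/attach/招标文件.pdf" the result is
    #   {"name": "招标文件.pdf", "url": base_url + "/attach/招标文件.pdf"}
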
    # ---------- Main flow ----------

    def crawl(self, max_pages: int = None, category: str = None,
              notice_type: str = None, date_filter: str = None,
              download_attachment: bool = False, **kwargs):
        """
        Run the crawl.

        Args:
            max_pages: maximum number of pages to crawl
            category: trading category
            notice_type: notice type
            date_filter: date filter ("yesterday" or a specific date)
            download_attachment: whether to download attachments
        """
        if max_pages is None:
            max_pages = self.spider_config.get("max_pages", 10)
        page_size = 10  # the Taizhou site serves a fixed 10 records per page
        # Date filtering
        target_date = None
        start_date = end_date = ""
        if date_filter == "yesterday":
            d = datetime.now() - timedelta(days=1)
            target_date = d.strftime("%Y-%m-%d")
            start_date = target_date + " 00:00:00"
            end_date = target_date + " 23:59:59"
            logger.info(f"过滤日期: {target_date}(昨天)")
        elif date_filter:
            target_date = date_filter
            start_date = target_date + " 00:00:00"
            end_date = target_date + " 23:59:59"
            logger.info(f"过滤日期: {target_date}")
        category_code = self.config.get("categories", {}).get(category, "")
        notice_code = self.config.get("notice_types", {}).get(notice_type, "")
        category_num = notice_code or category_code or "002"
        # Attachments
        attachment_handler = None
        if download_attachment:
            attachment_dir = os.path.join(self.data_dir, "attachments")
            attachment_handler = AttachmentHandler(attachment_dir)
            logger.info(f"启用附件下载,保存到: {attachment_dir}")
        logger.info(f"开始爬取: {self.config['name']}")
        if category:
            logger.info(f"交易领域: {category}")
        if notice_type:
            logger.info(f"公告类型: {notice_type}")
        for page_num in range(1, max_pages + 1):
            if self._check_limits():
                break
            logger.info(f"正在爬取第 {page_num} 页...")
            # Pages 1-7 are server-rendered (SSR); page 8 and later go through the API
            if page_num <= 7:
                url = self._build_list_url(category_code, notice_code, page_num)
                resp = self.fetch(url)
                if resp is None:
                    break
                page_items = self.parse_html_list(resp.text)
            else:
                records = self.fetch_list_via_api(
                    page_num - 1, page_size, category_num,
                    start_date, end_date,
                )
                if not records:
                    logger.info("没有更多数据")
                    break
                page_items = self.parse_api_list(records)
            if not page_items:
                logger.info("没有更多数据")
                break
            # Date filtering + de-duplication
            count = 0
            has_older = False  # whether any record is older than the target date
            for item in page_items:
                if target_date and item["发布日期"] != target_date:
                    if item["发布日期"] < target_date:
                        has_older = True
                    continue
                if self.is_duplicate(item["链接"]):
                    continue
                # Detail page
                self.detail_delay()
                detail = self.parse_detail(item["链接"])
                item.update(detail)
                # Attachments
                if download_attachment and attachment_handler:
                    atts = self._extract_attachments(item["链接"])
                    if atts:
                        item["附件数量"] = len(atts)
                        att_names = []
                        for att in atts:
                            att_names.append(att["name"])
                            result = attachment_handler.download_and_extract(att["url"])
                            if result["success"] and result["text"]:
                                item["附件内容摘要"] = result["text"][:2000]
                        item["附件名称"] = " | ".join(att_names)
                self.results.append(item)
                count += 1
            logger.info(f" 获取 {count} 条数据")
            if count == 0:
                if not target_date or has_older:
                    # No date filter, or an older record already appeared → stop paging
                    logger.info("当前页无新数据,停止翻页")
                    break
                else:
                    # Every record on this page is newer than the target date → keep paging
                    logger.info(" 当前页均为更新日期的数据,继续翻页")
                    self.delay()
                    continue
            self.delay()
        self.print_stats()
        logger.info(f"爬取完成,共 {len(self.results)} 条数据")
        return self.results
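

# Minimal usage sketch (hypothetical: how TaizhouSpider is constructed and
# configured is defined in BaseSpider / the project config, which is not shown here):
#
#   spider = TaizhouSpider()
#   results = spider.crawl(max_pages=3, date_filter="yesterday",
#                          download_attachment=False)
#   print(f"fetched {len(results)} records")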