定时任务创建修改

This commit is contained in:
2025-11-10 19:04:16 +08:00
parent 9adbd6d365
commit 81ec0f0fc9
18 changed files with 636 additions and 554 deletions

View File

@@ -1,21 +1,16 @@
package org.xyzh.crontab.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.xyzh.crontab.task.newsTask.ScriptDomain;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import java.util.List;
import org.springframework.stereotype.Component;
// Configuration holder bound to the "crawler" property prefix and registered as a Spring component.
// NOTE(review): this diff view prints removed and added lines together without +/- markers, so some
// of the members below (in particular the @Value annotations and `scripts`) may belong only to the
// pre-change version of the file — confirm against the committed source.
@Data
@ConfigurationProperties(prefix = "crawler")
@Component
public class CrawlerProperties {
// NOTE(review): as displayed, ${crawler.base.path} is injected into pythonPath, which does not match
// the field name; presumably the annotation belonged to a removed line — verify.
@Value("${crawler.base.path}")
private String pythonPath;
// Read by PythonCommandTask through getBasePath() as the script base directory / working directory.
private String basePath;
// NOTE(review): the ScriptDomain type and the `crawler.script` YAML section are deleted elsewhere in
// this commit, so this field is presumably a removed line — confirm.
@Value("${crawler.script}")
private List<ScriptDomain> scripts;
}

View File

@@ -0,0 +1,21 @@
package org.xyzh.crontab.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.xyzh.crontab.pojo.CrontabItem;
import java.util.List;
/**
 * Binds the {@code crontab.*} configuration section and exposes it as a Spring bean.
 */
@Component
@ConfigurationProperties(prefix = "crontab")
@Data
public class CrontabProperties {

    /**
     * Crawler task templates offered to the front end for selection. Each entry only
     * describes a crawler capability and carries no scheduling information.
     */
    private List<CrontabItem> items;
}

View File

@@ -1,8 +0,0 @@
package org.xyzh.crontab.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "crontab")
public class CrontabPrpperties {
}

View File

@@ -12,6 +12,8 @@ import org.xyzh.common.dto.crontab.TbCrontabTask;
import org.xyzh.common.dto.crontab.TbCrontabLog;
import org.xyzh.common.utils.IDUtils;
import org.xyzh.crontab.pojo.CrontabItem;
import org.xyzh.common.utils.spring.SpringContextUtil;
import org.xyzh.crontab.config.CrontabProperties;
import java.util.Date;
import org.springframework.web.bind.annotation.GetMapping;
@@ -39,8 +41,17 @@ public class CrontabController {
* @return
*/
@GetMapping("/getEnabledCrontabList")
// NOTE(review): two declarations appear back to back because this diff view prints the removed
// signature and its `return null;` stub ahead of the added lines — presumably only the second
// declaration exists in the committed file; confirm.
public ResultDomain<CrontabItem> getEnabledCrontabList(@RequestParam String param) {
return null;
// `param` is optional in this version and is not read anywhere in the visible body.
public ResultDomain<CrontabItem> getEnabledCrontabList(@RequestParam(required = false) String param) {
ResultDomain<CrontabItem> rd = new ResultDomain<>();
try {
// Return only the crawler capability metadata (the task template list); nothing scheduling-related.
// The properties bean is looked up through SpringContextUtil at call time.
CrontabProperties props =
SpringContextUtil.getBean(CrontabProperties.class);
rd.success("ok", props.getItems());
} catch (Exception e) {
// Any failure is reported to the caller as a failed result carrying the exception message.
rd.fail("获取可创建定时任务失败: " + e.getMessage());
}
return rd;
}
/**
@@ -50,7 +61,14 @@ public class CrontabController {
*/
@PostMapping("/crontabTask")
public ResultDomain<TbCrontabTask> createCrontab(@RequestBody TbCrontabTask crontabItem) {
// NOTE(review): the `return null;` below is presumably the removed stub that this diff view shows
// next to the added lines — confirm it is absent from the committed file.
return null;
ResultDomain<TbCrontabTask> rd = new ResultDomain<>();
try {
// Creation is delegated to crontabService; its result is returned unchanged.
return crontabService.createTask(crontabItem);
} catch (Exception e) {
// Log the failure and answer with a failed result that carries the exception message.
logger.error("创建定时任务失败", e);
rd.fail("创建定时任务失败: " + e.getMessage());
return rd;
}
}
/**
@@ -60,7 +78,14 @@ public class CrontabController {
*/
@PutMapping("/crontabTask")
public ResultDomain<TbCrontabTask> updateCrontab(@RequestBody TbCrontabTask crontabItem) {
// NOTE(review): the `return null;` below is presumably the removed stub that this diff view shows
// next to the added lines — confirm it is absent from the committed file.
return null;
ResultDomain<TbCrontabTask> rd = new ResultDomain<>();
try {
// The update is delegated to crontabService; its result is returned unchanged.
return crontabService.updateTask(crontabItem);
} catch (Exception e) {
// Log the failure and answer with a failed result that carries the exception message.
logger.error("更新定时任务失败", e);
rd.fail("更新定时任务失败: " + e.getMessage());
return rd;
}
}
/**
@@ -70,7 +95,18 @@ public class CrontabController {
*/
@DeleteMapping("/crontabTask")
public ResultDomain<TbCrontabTask> deleteCrontab(@RequestBody TbCrontabTask crontabItem) {
// NOTE(review): the `return null;` below is presumably the removed stub that this diff view shows
// next to the added lines — confirm it is absent from the committed file.
return null;
ResultDomain<TbCrontabTask> rd = new ResultDomain<>();
try {
// Reject the request early when no task ID was supplied.
if (crontabItem.getTaskId() == null || crontabItem.getTaskId().isEmpty()) {
rd.fail("任务ID不能为空");
return rd;
}
// Only the task ID from the request body is passed on to the service.
return crontabService.deleteTask(crontabItem.getTaskId());
} catch (Exception e) {
// Log the failure and answer with a failed result that carries the exception message.
logger.error("删除定时任务失败", e);
rd.fail("删除定时任务失败: " + e.getMessage());
return rd;
}
}
/**
@@ -80,7 +116,16 @@ public class CrontabController {
*/
@PostMapping("/crontabTaskPage")
public ResultDomain<TbCrontabTask> getCrontabTask(@RequestBody PageRequest<TbCrontabTask> pageRequest) {
// NOTE(review): the `return null;` below is presumably the removed stub that this diff view shows
// next to the added lines — confirm it is absent from the committed file.
return null;
ResultDomain<TbCrontabTask> rd = new ResultDomain<>();
try {
// Split the request into its filter object and paging parameters, then delegate to the service.
TbCrontabTask filter = pageRequest.getFilter();
PageParam pageParam = pageRequest.getPageParam();
return crontabService.getTaskPage(filter, pageParam);
} catch (Exception e) {
// Log the failure and answer with a failed result that carries the exception message.
logger.error("获取定时任务分页列表失败", e);
rd.fail("获取定时任务分页列表失败: " + e.getMessage());
return rd;
}
}
/**
@@ -90,7 +135,16 @@ public class CrontabController {
*/
@PostMapping("/crontabTaskLogPage")
public ResultDomain<TbCrontabLog> getCrontabTaskLog(@RequestBody PageRequest<TbCrontabLog> pageRequest) {
// NOTE(review): the `return null;` below is presumably the removed stub that this diff view shows
// next to the added lines — confirm it is absent from the committed file.
return null;
ResultDomain<TbCrontabLog> rd = new ResultDomain<>();
try {
// Split the request into its filter object and paging parameters, then delegate to the service.
TbCrontabLog filter = pageRequest.getFilter();
PageParam pageParam = pageRequest.getPageParam();
return crontabService.getLogPage(filter, pageParam);
} catch (Exception e) {
// Log the failure and answer with a failed result that carries the exception message.
logger.error("获取定时任务日志分页列表失败", e);
rd.fail("获取定时任务日志分页列表失败: " + e.getMessage());
return rd;
}
}

View File

@@ -31,32 +31,60 @@ public class DataCollectionItemController {
/**
* @description 查看一个任务日志对应创建的所有数据采集项
* @param taskLogId
* @return
* @param taskLogId 日志ID
* @return ResultDomain<DataCollectionItemVO>
*/
@GetMapping("/task/{taskLogId}")
public ResultDomain<DataCollectionItemVO> getTaskLogDataCollectionItemList(@PathVariable String taskLogId) {
// NOTE(review): the `return null;` below is presumably the removed stub that this diff view shows
// next to the added lines — confirm it is absent from the committed file.
return null;
// Build a filter that carries only the log ID from the path and let the service list the matches.
TbDataCollectionItem filter = new TbDataCollectionItem();
filter.setLogId(taskLogId);
return itemService.getItemList(filter);
}
/**
* @description 获取数据采集项分页列表
* @param pageRequest
* @return
* @param pageRequest 分页请求filter支持按logId、taskId、status等过滤
* @return ResultDomain<DataCollectionItemVO>
*/
@PostMapping("/page")
// NOTE(review): two declarations appear back to back because this diff view prints the removed
// signature (PageRequest<DataCollectionItemVO>) and its `return null;` stub ahead of the added
// lines — presumably only the second declaration exists in the committed file; confirm.
public ResultDomain<DataCollectionItemVO> getCollectionItemPage(@RequestBody PageRequest<DataCollectionItemVO> pageRequest) {
return null;
public ResultDomain<DataCollectionItemVO> getCollectionItemPage(@RequestBody PageRequest<TbDataCollectionItem> pageRequest) {
// Split the request into its filter object and paging parameters, then delegate to the service.
TbDataCollectionItem filter = pageRequest.getFilter();
PageParam pageParam = pageRequest.getPageParam();
return itemService.getItemPage(filter, pageParam);
}
/**
* @description 转换成文章
* @param dataCollectionItem
* @return
* @param request 转换请求itemId、tagId
* @return ResultDomain<DataCollectionItemVO>
*/
@PostMapping("/resource")
// NOTE(review): two declarations appear back to back because this diff view prints the removed
// signature and its `return null;` stub ahead of the added lines — presumably only the second
// declaration (returning ResultDomain<String>) exists in the committed file; confirm.
public ResultDomain<DataCollectionItemVO> convertToArticle(@RequestBody DataCollectionItemVO dataCollectionItem) {
return null;
public ResultDomain<String> convertToArticle(@RequestBody ConvertRequest request) {
// Forward the item ID and tag ID from the request body to the service's resource conversion.
return itemService.convertToResource(request.getItemId(), request.getTagId());
}
/**
 * @description Request body of the conversion endpoint; carries the two IDs that are
 *              forwarded to the item service.
 */
public static class ConvertRequest {

    /** ID of the data collection item to convert. */
    private String itemId;

    /** ID of the tag supplied with the conversion. */
    private String tagId;

    /** @param itemId new item ID */
    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    /** @return the item ID, or null when none was set */
    public String getItemId() {
        return this.itemId;
    }

    /** @param tagId new tag ID */
    public void setTagId(String tagId) {
        this.tagId = tagId;
    }

    /** @return the tag ID, or null when none was set */
    public String getTagId() {
        return this.tagId;
    }
}
}

View File

@@ -4,6 +4,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson2.annotation.JSONField;
@Data
@NoArgsConstructor
@@ -15,6 +16,7 @@ public class CrontabItem {
@Data
public class CrontabMethod {
private String name;
@JSONField(name = "class")
private String clazz;
private String path;
private Map<String, Object> params;

View File

@@ -0,0 +1,108 @@
package org.xyzh.crontab.pojo;
import com.alibaba.fastjson2.JSON;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
/**
 * @description Parameter envelope of a scheduled task: identifies the configured task
 *              template (group + method) and carries the actual parameter map.
 * @filename TaskParams.java
 * @author yslg
 * @copyright xyzh
 * @since 2025-11-10
 */
@Data
@NoArgsConstructor
public class TaskParams {

    /**
     * Task group; corresponds to item.name in the yml configuration.
     */
    private String taskGroup;

    /**
     * Method name; corresponds to method.name in the yml configuration.
     */
    private String methodName;

    /**
     * Actual parameters of the task. May be null until the first {@link #setParam} call.
     */
    private Map<String, Object> params;

    /**
     * Parses a JSON string into a TaskParams instance.
     *
     * @param json JSON text; may be null or blank
     * @return the parsed object, or null when the input is null/blank or cannot be parsed
     */
    public static TaskParams fromJson(String json) {
        if (json == null || json.trim().isEmpty()) {
            return null;
        }
        try {
            return JSON.parseObject(json, TaskParams.class);
        } catch (Exception ignored) {
            // Deliberately not propagated: callers treat a null result as "not a valid
            // TaskParams payload" and decide themselves whether that is an error.
            return null;
        }
    }

    /**
     * Serializes this object to a JSON string.
     *
     * @return JSON representation of this object
     */
    public String toJson() {
        return JSON.toJSONString(this);
    }

    /**
     * Looks up a raw parameter value.
     *
     * @param key parameter name
     * @return the stored value, or null when the key or the whole map is absent
     */
    public Object getParam(String key) {
        return params != null ? params.get(key) : null;
    }

    /**
     * Stores a parameter value, creating the parameter map on first use.
     * Previously the value was silently dropped when no map existed yet, so a value
     * written here could not be read back later.
     *
     * @param key   parameter name
     * @param value value to store
     */
    public void setParam(String key, Object value) {
        if (params == null) {
            params = new java.util.HashMap<>();
        }
        params.put(key, value);
    }

    /**
     * Returns a parameter as a string.
     *
     * @param key parameter name
     * @return {@code toString()} of the value, or null when absent
     */
    public String getParamAsString(String key) {
        Object value = getParam(key);
        return value != null ? value.toString() : null;
    }

    /**
     * Returns a parameter as an integer.
     *
     * @param key parameter name
     * @return the value as Integer, or null when absent or not a valid int
     */
    public Integer getParamAsInt(String key) {
        Object value = getParam(key);
        if (value == null) {
            return null;
        }
        if (value instanceof Integer) {
            return (Integer) value;
        }
        try {
            // Trim so that values such as " 20 " coming from form input still parse.
            return Integer.parseInt(value.toString().trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /**
     * Returns a parameter as a boolean.
     *
     * @param key parameter name
     * @return the Boolean value, the result of {@link Boolean#parseBoolean} on its string
     *         form, or null when absent
     */
    public Boolean getParamAsBoolean(String key) {
        Object value = getParam(key);
        if (value == null) {
            return null;
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        return Boolean.parseBoolean(value.toString());
    }
}

View File

@@ -9,9 +9,11 @@ import org.xyzh.common.dto.crontab.TbCrontabLog;
import org.xyzh.common.dto.crontab.TbCrontabTask;
import org.xyzh.common.utils.IDUtils;
import org.xyzh.crontab.mapper.CrontabLogMapper;
import org.xyzh.crontab.pojo.TaskParams;
import java.lang.reflect.Method;
import java.util.Date;
import java.util.HashMap;
/**
* @description 任务执行器
@@ -59,20 +61,16 @@ public class TaskExecutor {
// 获取Bean实例
Object bean = applicationContext.getBean(task.getBeanName());
// 获取方法
// 注入taskId和logId到参数中适用于所有BaseTask子类
String methodParams = injectTaskContext(bean, task, log);
// 获取并调用方法
Method method;
if (task.getMethodParams() != null && !task.getMethodParams().isEmpty()) {
// 如果有参数,需要解析参数类型
if (methodParams != null && !methodParams.isEmpty()) {
method = bean.getClass().getMethod(task.getMethodName(), String.class);
// 如果是newsCrewerTask将taskId添加到参数前面
String methodParams = task.getMethodParams();
if ("newsCrewerTask".equals(task.getBeanName()) && task.getTaskId() != null) {
methodParams = task.getTaskId() + "|" + methodParams;
}
method.invoke(bean, methodParams);
} else {
// 无参方法
method = bean.getClass().getMethod(task.getMethodName());
method.invoke(bean);
}
@@ -126,8 +124,40 @@ public class TaskExecutor {
if (stackTrace.length > limit) {
sb.append("\t... ").append(stackTrace.length - limit).append(" more\n");
}
return sb.toString();
}
/**
 * @description Adds the task context (taskId and logId) to the task's method parameters
 * @param bean bean instance the task method will be invoked on
 * @param task task whose method parameters are read
 * @param log log record whose ID is injected
 * @return the parameter string after injection, or the task's original parameters when
 *         nothing was injected
 */
private String injectTaskContext(Object bean, TbCrontabTask task, TbCrontabLog log) {
    String result = task.getMethodParams();

    // Only BaseTask subclasses receive the context; every other bean keeps its parameters untouched.
    if (!(bean instanceof org.xyzh.crontab.task.BaseTask)) {
        return result;
    }

    try {
        TaskParams parsed = TaskParams.fromJson(result);
        // A null result means the parameters were empty or not parseable; leave them as they are.
        if (parsed != null) {
            if (parsed.getParams() == null) {
                parsed.setParams(new HashMap<>());
            }
            // Inject taskId and logId into the parameter map.
            parsed.getParams().put("taskId", task.getTaskId());
            parsed.getParams().put("logId", log.getID());
            result = parsed.toJson();
            logger.debug("已注入任务上下文: taskId={}, logId={}", task.getTaskId(), log.getID());
        }
    } catch (Exception e) {
        // Best effort: on any failure fall back to whatever parameters we have so far.
        logger.warn("注入任务上下文失败,使用原始参数: {}", e.getMessage());
    }
    return result;
}
}

View File

@@ -0,0 +1,62 @@
package org.xyzh.crontab.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xyzh.crontab.pojo.TaskParams;
/**
 * @description Base class of scheduled tasks
 * @filename BaseTask.java
 * @author yslg
 * @copyright xyzh
 * @since 2025-11-10
 */
public abstract class BaseTask {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Unified entry point: parses the parameter string and runs the concrete task.
     * Every failure, including an unparseable parameter string, is logged and rethrown
     * wrapped in a RuntimeException.
     *
     * @param params parameter string in JSON format
     */
    public void execute(String params) {
        logger.info("开始执行定时任务,参数: {}", params);
        try {
            runParsed(parseParams(params));
        } catch (Exception e) {
            logger.error("定时任务执行异常: ", e);
            throw new RuntimeException("定时任务执行异常", e);
        }
    }

    /**
     * Validates the parsed parameters and hands them to {@link #doExecute}.
     *
     * @param taskParams parsed parameters, or null when parsing failed
     */
    private void runParsed(TaskParams taskParams) throws Exception {
        if (taskParams == null) {
            logger.error("参数解析失败,任务终止");
            throw new RuntimeException("参数解析失败");
        }

        logger.info("任务分组: {}, 方法名称: {}",
                taskParams.getTaskGroup(), taskParams.getMethodName());

        doExecute(taskParams);

        logger.info("定时任务执行完成");
    }

    /**
     * Parses the raw parameter string.
     *
     * @param params parameter string
     * @return TaskParams parsed parameter object (whatever TaskParams.fromJson yields)
     */
    protected TaskParams parseParams(String params) {
        return TaskParams.fromJson(params);
    }

    /**
     * Runs the concrete task (implemented by subclasses).
     *
     * @param taskParams task parameters
     */
    protected abstract void doExecute(TaskParams taskParams) throws Exception;
}

View File

@@ -0,0 +1,101 @@
package org.xyzh.crontab.task;
import org.xyzh.crontab.pojo.TaskParams;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * @description Abstract base of tasks that run an external command line
 * @filename CommandTask.java
 * @author yslg
 * @copyright xyzh
 * @since 2025-11-10
 */
public abstract class CommandTask extends BaseTask {

    /** Upper bound for waiting on the output reader once the process itself has exited. */
    private static final long OUTPUT_DRAIN_MILLIS = 5_000L;

    /**
     * Runs the command built by the subclass inside the subclass's working directory.
     * <p>
     * Output is drained on a separate thread so that the timeout applies to the process
     * itself. Draining the stream to EOF on the calling thread before waiting (the previous
     * approach) blocks until the process exits, which made the timeout ineffective.
     *
     * @param taskParams task parameters
     * @throws Exception when the working directory is missing, the command cannot be
     *                   started, times out, or exits with a non-zero code
     */
    @Override
    protected void doExecute(TaskParams taskParams) throws Exception {
        // Resolve the working directory; it must be an existing directory, not just any path.
        String workingDir = getWorkingDirectory(taskParams);
        Path workDir = Paths.get(workingDir);
        if (!Files.isDirectory(workDir)) {
            throw new RuntimeException("工作目录不存在: " + workingDir);
        }

        List<String> command = buildCommand(taskParams);
        logger.info("执行命令: {}", String.join(" ", command));

        ProcessBuilder processBuilder = new ProcessBuilder(command);
        processBuilder.directory(workDir.toFile());
        // Merge stderr into stdout so one reader sees everything.
        processBuilder.redirectErrorStream(true);

        Process process = processBuilder.start();
        // StringBuffer: written by the reader thread, read by this thread.
        StringBuffer output = new StringBuffer();
        Thread outputReader = startOutputReader(process, output);

        try {
            int timeout = getTimeout(taskParams);
            if (!process.waitFor(timeout, TimeUnit.SECONDS)) {
                throw new RuntimeException("命令执行超时(超过" + timeout + "秒)");
            }
            // Give the reader a bounded amount of time to consume what is left in the pipe.
            outputReader.join(OUTPUT_DRAIN_MILLIS);

            int exitCode = process.exitValue();
            if (exitCode == 0) {
                logger.info("命令执行成功");
                handleResult(taskParams, output.toString());
            } else {
                logger.error("命令执行失败,退出码: {}", exitCode);
                logger.error("输出内容:\n{}", output.toString());
                throw new RuntimeException("命令执行失败,退出码: " + exitCode);
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw e;
        } finally {
            // Timeout, interruption or any other failure must not leave the child running.
            if (process.isAlive()) {
                process.destroyForcibly();
            }
        }
    }

    /**
     * Starts a daemon thread that copies the process output line by line into {@code sink}.
     *
     * @param process running process
     * @param sink    buffer receiving the output, one "\n"-terminated line at a time
     * @return the started reader thread
     */
    private Thread startOutputReader(Process process, StringBuffer sink) {
        Thread reader = new Thread(() -> {
            try (BufferedReader in = new BufferedReader(new InputStreamReader(
                    process.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    sink.append(line).append("\n");
                    logger.debug("命令输出: {}", line);
                }
            } catch (java.io.IOException e) {
                // Expected when the process is killed while we are still reading.
                logger.debug("读取命令输出中断: {}", e.getMessage());
            }
        }, "command-task-output");
        reader.setDaemon(true);
        reader.start();
        return reader;
    }

    /**
     * Returns the working directory of the command (implemented by subclasses).
     */
    protected abstract String getWorkingDirectory(TaskParams taskParams);

    /**
     * Builds the command and its arguments (implemented by subclasses).
     */
    protected abstract List<String> buildCommand(TaskParams taskParams) throws Exception;

    /**
     * Returns the timeout in seconds.
     */
    protected int getTimeout(TaskParams taskParams) {
        return 300; // default: 5 minutes
    }

    /**
     * Processes the command output after a successful run (implemented by subclasses).
     */
    protected abstract void handleResult(TaskParams taskParams, String output) throws Exception;
}

View File

@@ -0,0 +1,76 @@
package org.xyzh.crontab.task;
import org.springframework.beans.factory.annotation.Autowired;
import org.xyzh.crontab.config.CrawlerProperties;
import org.xyzh.crontab.pojo.TaskParams;
import java.util.ArrayList;
import java.util.List;
/**
 * @description Abstract base of tasks that run a Python script as a command
 * @filename PythonCommandTask.java
 * @author yslg
 * @copyright xyzh
 * @since 2025-11-10
 */
public abstract class PythonCommandTask extends CommandTask {

    @Autowired
    protected CrawlerProperties crawlerProperties;

    /**
     * Returns the Python executable to launch.
     *
     * @return the configured path, or "python" when none is configured
     */
    protected String getPythonPath() {
        String configured = crawlerProperties.getPythonPath();
        return configured != null ? configured : "python";
    }

    /**
     * Returns the base directory of the scripts.
     *
     * @return the configured path, or "../schoolNewsCrawler" when none is configured
     */
    protected String getScriptBasePath() {
        String configured = crawlerProperties.getBasePath();
        return configured != null ? configured : "../schoolNewsCrawler";
    }

    /**
     * Returns the working directory, which is the script base path.
     */
    @Override
    protected String getWorkingDirectory(TaskParams taskParams) {
        return getScriptBasePath();
    }

    /**
     * Builds the full command: optional Windows shell prefix, the Python executable, then
     * the script name and arguments supplied by the subclass.
     *
     * @param taskParams task parameters
     * @return command and arguments, ready for ProcessBuilder
     */
    @Override
    protected List<String> buildCommand(TaskParams taskParams) throws Exception {
        List<String> command = new ArrayList<>();

        // Lower-case with a fixed locale: under e.g. a Turkish default locale "WINDOWS"
        // lower-cases to a dotless-i form and the "win" check would never match.
        String os = System.getProperty("os.name").toLowerCase(java.util.Locale.ROOT);
        if (os.contains("win")) {
            command.add("cmd");
            command.add("/c");
        }
        command.add(getPythonPath());

        // Script name and its arguments come from the subclass.
        command.addAll(buildPythonArgs(taskParams));
        return command;
    }

    /**
     * Builds the Python script arguments (implemented by subclasses).
     *
     * @param taskParams task parameters
     * @return script name followed by its arguments
     */
    protected abstract List<String> buildPythonArgs(TaskParams taskParams) throws Exception;
}

View File

@@ -1,16 +1,16 @@
package org.xyzh.crontab.task.newsTask;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.xyzh.api.crontab.DataCollectionItemService;
import org.xyzh.common.core.domain.ResultDomain;
import org.xyzh.common.dto.crontab.TbDataCollectionItem;
import org.xyzh.crontab.config.CrontabProperties;
import org.xyzh.crontab.pojo.TaskParams;
import org.xyzh.crontab.task.PythonCommandTask;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -18,235 +18,142 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @description 新闻爬虫定时任务
* @filename NewsCrewerTask.java
* @filename NewsCrawlerTask.java
* @author yslg
* @copyright xyzh
* @since 2025-11-08
*/
@Component("newsCrewerTask")
public class NewsCrawlerTask {
public class NewsCrawlerTask extends PythonCommandTask {
private static final Logger logger = LoggerFactory.getLogger(NewsCrawlerTask.class);
@Value("${crewer.python.path:python}")
private String pythonPath;
@Value("${crewer.script.path:../schoolNewsCrewer}")
private String scriptPath;
@Value("${crewer.timeout:300}")
private int timeout;
@Autowired
private CrontabProperties crontabProperties;
@Autowired
private DataCollectionItemService itemService;
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* @description 执行新闻爬虫任务默认爬取人民日报政治类新闻20条
* @author yslg
* @since 2025-11-08
* 构建Python脚本参数
*/
public void execute() {
execute("rmrb,politics,20");
@Override
protected List<String> buildPythonArgs(TaskParams taskParams) throws Exception {
    // Produces: main.py <category> <limit> <outputFile>. The chosen output file is also
    // stored under the "_outputFile" parameter so that handleResult can find it.
    String methodName = taskParams.getMethodName();
    String source = "rmrb";
    String category = "politics";
    String limit = "20";

    // Pick category/limit according to the configured method name.
    if ("关键字搜索爬取".equals(methodName)) {
        String query = taskParams.getParamAsString("query");
        Integer total = taskParams.getParamAsInt("total");
        category = query != null ? query : "politics";
        limit = total != null ? total.toString() : "20";
    } else if ("排行榜爬取".equals(methodName)) {
        category = "ranking";
    } else if ("往日精彩头条爬取".equals(methodName)) {
        category = "history";
        // TODO: the startDate / endDate / isYesterday parameters are not forwarded to the
        // Python script yet; read them from taskParams here once the script accepts them.
    }

    // The category may be a free-text keyword. Keep only letters, digits, '_' and '-' in
    // the file name so the keyword cannot introduce path separators or characters that
    // are illegal in file names.
    String fileSafeCategory = category.replaceAll("[^\\p{L}\\p{N}_-]", "_");
    String timestamp = String.valueOf(System.currentTimeMillis());
    String outputFile = String.format("output/news_%s_%s_%s.json", source, fileSafeCategory, timestamp);

    // Make sure a parameter map exists before storing the path; otherwise the value could
    // be lost and handleResult would have no file to read.
    if (taskParams.getParams() == null) {
        taskParams.setParams(new java.util.HashMap<>());
    }
    taskParams.setParam("_outputFile", outputFile);

    // NOTE(review): `category` is passed to the interpreter unmodified; on Windows the parent
    // class prefixes the command with "cmd /c", so shell metacharacters in a keyword would be
    // interpreted by cmd — confirm whether the keyword needs validating upstream.
    List<String> args = new ArrayList<>();
    args.add("main.py");
    args.add(category);
    args.add(limit);
    args.add(outputFile);

    logger.info("爬虫参数 - 来源: {}, 分类: {}, 数量: {}", source, category, limit);
    return args;
}
/**
* @description 执行新闻爬虫任务
* @param params 参数格式: "source,category,limit" 或 "taskId|source,category,limit"
* 如果包含taskId格式为: "taskId|source,category,limit"
* source: 新闻源rmrb-人民日报)
* category: 分类politics-政治, society-社会等)
* limit: 爬取数量
* @author yslg
* @since 2025-11-08
* 处理执行结果
*/
public void execute(String params) {
logger.info("开始执行新闻爬虫任务,参数: {}", params);
@Override
protected void handleResult(TaskParams taskParams, String output) throws Exception {
String outputFile = taskParams.getParamAsString("_outputFile");
Path outputPath = Paths.get(getScriptBasePath()).resolve(outputFile);
try {
// 解析参数支持taskId|source,category,limit格式
String taskId = null;
String actualParams = params;
if (params.contains("|")) {
String[] parts = params.split("\\|", 2);
taskId = parts[0];
actualParams = parts[1];
}
String[] paramArray = actualParams.split(",");
String source = paramArray.length > 0 ? paramArray[0] : "rmrb";
String category = paramArray.length > 1 ? paramArray[1] : "politics";
String limit = paramArray.length > 2 ? paramArray[2] : "20";
if (!Files.exists(outputPath)) {
logger.warn("输出文件不存在: {}", outputFile);
return;
}
logger.info("爬虫参数 - 来源: {}, 分类: {}, 数量: {}", source, category, limit);
// 读取并解析结果文件
String jsonContent = Files.readString(outputPath);
List<ArticleStruct> newsList = JSON.parseObject(
jsonContent,
new TypeReference<List<ArticleStruct>>() {}
);
// 验证Python和脚本路径
Path scriptDir = Paths.get(scriptPath);
if (!Files.exists(scriptDir)) {
throw new RuntimeException("爬虫脚本目录不存在: " + scriptPath);
}
logger.info("成功爬取 {} 条新闻", newsList.size());
// 构建Python命令
List<String> command = new ArrayList<>();
// 检查是否是Windows系统
String os = System.getProperty("os.name").toLowerCase();
if (os.contains("win")) {
command.add("cmd");
command.add("/c");
command.add(pythonPath);
} else {
command.add(pythonPath);
}
command.add("main.py");
command.add(category);
command.add(limit);
// 获取taskId和logId
String taskId = taskParams.getParamAsString("taskId");
String logId = taskParams.getParamAsString("logId");
// 生成输出文件名
String timestamp = String.valueOf(System.currentTimeMillis());
String outputFile = String.format("output/news_%s_%s_%s.json", source, category, timestamp);
command.add(outputFile);
logger.info("执行命令: {}", String.join(" ", command));
// 创建进程构建器
ProcessBuilder processBuilder = new ProcessBuilder(command);
processBuilder.directory(scriptDir.toFile());
processBuilder.redirectErrorStream(true);
// 启动进程
Process process = processBuilder.start();
// 读取输出
StringBuilder output = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), "UTF-8"))) {
String line;
while ((line = reader.readLine()) != null) {
output.append(line).append("\n");
logger.debug("Python输出: {}", line);
}
}
// 等待进程结束
boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);
if (!finished) {
process.destroy();
throw new RuntimeException("爬虫任务超时(超过" + timeout + "秒)");
}
int exitCode = process.exitValue();
if (exitCode == 0) {
logger.info("新闻爬虫任务执行成功");
// 读取并解析结果文件
Path outputPath = scriptDir.resolve(outputFile);
if (Files.exists(outputPath)) {
String jsonContent = Files.readString(outputPath);
ObjectMapper mapper = new ObjectMapper();
List<Map<String, Object>> newsList = mapper.readValue(
jsonContent,
List.class
);
logger.info("成功爬取 {} 条新闻", newsList.size());
// 保存新闻数据到数据库
if (taskId != null && !taskId.isEmpty()) {
saveNewsToDatabase(newsList, taskId, source, category);
} else {
logger.warn("未提供任务ID跳过数据保存");
}
} else {
logger.warn("输出文件不存在: {}", outputFile);
}
} else {
logger.error("新闻爬虫任务执行失败,退出码: {}", exitCode);
logger.error("输出内容:\n{}", output.toString());
throw new RuntimeException("爬虫任务执行失败,退出码: " + exitCode);
}
} catch (Exception e) {
logger.error("新闻爬虫任务执行异常: ", e);
throw new RuntimeException("新闻爬虫任务执行异常", e);
// 保存新闻数据到数据库
if (taskId != null && !taskId.isEmpty() && logId != null && !logId.isEmpty()) {
saveNewsToDatabase(newsList, taskId, logId);
} else {
logger.warn("未提供任务ID或日志ID跳过数据保存");
}
}
/**
* @description 测试Python环境
* @author yslg
* @since 2025-11-08
* 将新闻数据保存到数据库
*/
public void testPythonEnvironment() {
logger.info("测试Python环境...");
try {
ProcessBuilder pb = new ProcessBuilder(pythonPath, "--version");
Process process = pb.start();
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream())
);
String version = reader.readLine();
int exitCode = process.waitFor();
if (exitCode == 0) {
logger.info("Python环境正常: {}", version);
} else {
logger.error("Python环境异常");
}
} catch (Exception e) {
logger.error("测试Python环境失败: ", e);
}
}
private void saveNewsToDatabase(List<ArticleStruct> newsList, String taskId, String logId) {
logger.info("开始保存 {} 条新闻到数据库任务ID: {}日志ID: {}", newsList.size(), taskId, logId);
/**
* @description 将新闻数据保存到数据库
* @param newsList 新闻列表
* @param taskId 任务ID
* @param source 新闻来源
* @param category 分类
* @author yslg
* @since 2025-11-08
*/
private void saveNewsToDatabase(List<Map<String, Object>> newsList, String taskId, String source, String category) {
logger.info("开始保存 {} 条新闻到数据库任务ID: {}", newsList.size(), taskId);
try {
List<TbDataCollectionItem> itemList = new ArrayList<>();
Date now = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (Map<String, Object> news : newsList) {
for (ArticleStruct news : newsList) {
try {
TbDataCollectionItem item = new TbDataCollectionItem();
// 基本信息
item.setTaskId(taskId);
item.setTitle(getStringValue(news, "title"));
item.setContent(getStringValue(news, "content"));
item.setSummary(getStringValue(news, "summary"));
item.setSource(source.equals("rmrb") ? "人民日报" : source);
item.setSourceUrl(getStringValue(news, "url"));
item.setCategory(category);
item.setAuthor(getStringValue(news, "author"));
item.setLogId(logId);
item.setTitle(news.getTitle());
// 拼接HTML内容
if (news.getContentRows() != null && !news.getContentRows().isEmpty()) {
StringBuilder html = new StringBuilder();
for (ArticleStruct.RowStruct row : news.getContentRows()) {
if (row != null && row.getContent() != null) {
html.append(row.getContent());
}
}
item.setContent(html.toString());
}
item.setSource(news.getSource());
item.setSourceUrl(news.getUrl());
item.setAuthor(news.getAuthor());
// 发布时间
String publishTimeStr = getStringValue(news, "publish_time");
String publishTimeStr = news.getPublishTime();
if (publishTimeStr != null && !publishTimeStr.isEmpty()) {
try {
item.setPublishTime(dateFormat.parse(publishTimeStr));
@@ -257,41 +164,17 @@ public class NewsCrawlerTask {
} else {
item.setPublishTime(now);
}
// 封面图片
item.setCoverImage(getStringValue(news, "cover_image"));
// 图片列表JSON格式
Object imagesObj = news.get("images");
if (imagesObj != null) {
if (imagesObj instanceof List) {
item.setImages(objectMapper.writeValueAsString(imagesObj));
} else if (imagesObj instanceof String) {
item.setImages((String) imagesObj);
}
}
// 标签
Object tagsObj = news.get("tags");
if (tagsObj != null) {
if (tagsObj instanceof List) {
List<String> tags = (List<String>) tagsObj;
item.setTags(String.join(",", tags));
} else if (tagsObj instanceof String) {
item.setTags((String) tagsObj);
}
}
// 状态和时间
item.setStatus(0); // 未处理
item.setCrawlTime(now);
itemList.add(item);
} catch (Exception e) {
logger.error("转换新闻数据失败: ", e);
}
}
// 批量保存
if (!itemList.isEmpty()) {
ResultDomain<Integer> result = itemService.batchCreateItems(itemList);
@@ -303,26 +186,9 @@ public class NewsCrawlerTask {
} else {
logger.warn("没有有效的新闻数据需要保存");
}
} catch (Exception e) {
logger.error("保存新闻数据到数据库异常: ", e);
}
}
/**
* @description 从Map中安全获取字符串值
* @param map Map对象
* @param key 键
* @return String 值
* @author yslg
* @since 2025-11-08
*/
private String getStringValue(Map<String, Object> map, String key) {
Object value = map.get(key);
if (value == null) {
return null;
}
return value.toString();
}
}

View File

@@ -1,13 +0,0 @@
package org.xyzh.crontab.task.newsTask;
abstract public class NewsTask {
// 爬取网站目标
private String target;
// 爬取搜索
private String query;
}

View File

@@ -1,234 +0,0 @@
package org.xyzh.crontab.task.newsTask;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Java调用Python的简化示例
* 用于学习和理解核心原理
*/
public class PythonExecutorExample {
/**
* 示例1: 最简单的调用方式
*/
public static void example1_Simple() throws Exception {
// 1. 构建命令
ProcessBuilder pb = new ProcessBuilder("python", "script.py", "arg1", "arg2");
// 2. 启动进程
Process process = pb.start();
// 3. 读取输出
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8)
);
String line;
while ((line = reader.readLine()) != null) {
System.out.println("Python输出: " + line);
}
// 4. 等待结束
int exitCode = process.waitFor();
System.out.println("退出码: " + exitCode);
}
/**
* 示例2: 获取返回结果(通过标准输出)
*/
public static String example2_GetResult() throws Exception {
ProcessBuilder pb = new ProcessBuilder("python", "script.py");
Process process = pb.start();
// 读取所有输出
StringBuilder result = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
result.append(line);
}
}
process.waitFor();
return result.toString();
}
/**
* 示例3: 带超时控制
*/
public static void example3_WithTimeout() throws Exception {
ProcessBuilder pb = new ProcessBuilder("python", "script.py");
Process process = pb.start();
// 带超时的等待5秒
boolean finished = process.waitFor(5, TimeUnit.SECONDS);
if (!finished) {
// 超时,强制终止
process.destroyForcibly();
System.out.println("任务超时");
} else {
int exitCode = process.exitValue();
System.out.println("执行完成,退出码: " + exitCode);
}
}
/**
* 示例4: 传递参数(通过命令行)
*/
public static void example4_PassArgs() throws Exception {
// 方式1: 通过命令行参数
List<String> command = new ArrayList<>();
command.add("python");
command.add("script.py");
command.add("category=politics");
command.add("limit=20");
ProcessBuilder pb = new ProcessBuilder(command);
Process process = pb.start();
// ... 读取输出
process.waitFor();
}
/**
* 示例5: 传递参数(通过标准输入)
*/
public static void example5_PassArgsByStdin() throws Exception {
ProcessBuilder pb = new ProcessBuilder("python", "script.py");
Process process = pb.start();
// 写入参数到标准输入
try (BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8))) {
writer.write("{\"category\":\"politics\",\"limit\":20}");
writer.newLine();
writer.flush();
}
// 关闭输入流重要告诉Python输入结束
process.getOutputStream().close();
// 读取输出
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8)
);
String result = reader.readLine();
System.out.println("结果: " + result);
process.waitFor();
}
/**
* 示例6: 处理Windows/Linux差异
*/
public static void example6_CrossPlatform() throws Exception {
List<String> command = new ArrayList<>();
String os = System.getProperty("os.name").toLowerCase();
if (os.contains("win")) {
// Windows需要通过cmd执行
command.add("cmd");
command.add("/c");
command.add("python");
} else {
// Linux/Mac直接执行
command.add("python3");
}
command.add("script.py");
ProcessBuilder pb = new ProcessBuilder(command);
Process process = pb.start();
process.waitFor();
}
/**
* 示例7: 完整的错误处理
*/
public static void example7_Complete() throws Exception {
ProcessBuilder pb = new ProcessBuilder("python", "script.py");
// 合并标准输出和错误输出
pb.redirectErrorStream(true);
// 设置工作目录
pb.directory(new File("/path/to/script"));
Process process = null;
try {
process = pb.start();
// 读取输出
StringBuilder output = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
output.append(line).append("\n");
}
}
// 等待结束(带超时)
boolean finished = process.waitFor(30, TimeUnit.SECONDS);
if (!finished) {
process.destroyForcibly();
throw new RuntimeException("任务超时");
}
int exitCode = process.exitValue();
if (exitCode == 0) {
System.out.println("执行成功");
System.out.println("输出: " + output.toString());
} else {
System.err.println("执行失败,退出码: " + exitCode);
System.err.println("错误输出: " + output.toString());
throw new RuntimeException("Python执行失败");
}
} catch (Exception e) {
throw new RuntimeException("执行异常", e);
} finally {
// 清理资源
if (process != null && process.isAlive()) {
process.destroyForcibly();
}
}
}
/**
* 示例8: 异步执行(不阻塞)
*/
public static void example8_Async() {
new Thread(() -> {
try {
ProcessBuilder pb = new ProcessBuilder("python", "script.py");
Process process = pb.start();
// 在后台线程中读取输出
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8)
);
String line;
while ((line = reader.readLine()) != null) {
System.out.println("后台输出: " + line);
}
process.waitFor();
System.out.println("后台任务完成");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
System.out.println("主线程继续执行...");
}
}

View File

@@ -1,15 +0,0 @@
package org.xyzh.crontab.task.newsTask;
import lombok.Data;
@Data
public class ScriptDomain {
private String name;
private String path;
private String method;
private String param;
private String output;
}

View File

@@ -3,12 +3,6 @@ crawler:
path: C:/Python312/python.exe
base:
path: F:/Project/schoolNews/schoolNewsCrawler
script:
- name: xxx爬虫
path: crawler/xxx.py
method: xxx
param: xxx
output: xxx
crontab:
items: #可供前端选择的定时任务列表