temp定时任务修改

This commit is contained in:
2025-11-10 16:03:50 +08:00
parent e8b76278e9
commit 9adbd6d365
38 changed files with 2710 additions and 2032 deletions

View File

@@ -0,0 +1,652 @@
# Java调用Python并获取返回结果详解
## 一、核心原理
Java通过 `ProcessBuilder` 或 `Runtime.exec()` 创建操作系统进程来执行Python脚本,然后通过进程的标准输入/输出流进行通信。
## 二、当前实现详解
### 1. 构建命令
```java
// Step 1: build the command list
List<String> command = new ArrayList<>();
// Step 2: handle the Windows / Linux difference.
// Lower-case with Locale.ROOT so the check does not depend on the JVM's default locale
// (a Turkish locale turns "WINDOWS" into "wındows"), and match the "windows" prefix
// instead of the substring "win", which also matches "darwin".
String os = System.getProperty("os.name").toLowerCase(Locale.ROOT);
if (os.startsWith("windows")) {
    // On Windows the command is run through cmd
    command.add("cmd"); // command interpreter
    command.add("/c"); // /c: run the command, then exit
    command.add(pythonPath); // python or python3
} else {
    // On Linux/Mac the interpreter is executed directly
    command.add(pythonPath);
}
// Step 3: append the Python script and its arguments
command.add("main.py"); // Python script
command.add(category); // argument 1: category
command.add(limit); // argument 2: number of items
command.add(outputFile); // argument 3: output file
```
**命令示例:**
- Windows: `cmd /c python main.py politics 20 output/news.json`
- Linux: `python3 main.py politics 20 output/news.json`
### 2. 创建进程
```java
// 创建进程构建器
ProcessBuilder processBuilder = new ProcessBuilder(command);
// 设置工作目录Python脚本所在目录
processBuilder.directory(scriptDir.toFile());
// 合并标准输出和错误输出(便于统一读取)
processBuilder.redirectErrorStream(true);
// 启动进程
Process process = processBuilder.start();
```
**关键点:**
- `directory()`: 设置工作目录确保Python脚本能找到相对路径的资源
- `redirectErrorStream(true)`: 将stderr合并到stdout方便统一读取
- `start()`: 异步启动进程,不会阻塞
### 3. 读取输出流
```java
// 读取标准输出Python的print输出
StringBuilder output = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), "UTF-8"))) {
String line;
while ((line = reader.readLine()) != null) {
output.append(line).append("\n");
logger.debug("Python输出: {}", line);
}
}
```
**重要说明:**
- `process.getInputStream()`: 获取Python进程的标准输出
- 必须读取输出流,否则缓冲区满会导致进程阻塞
- 使用UTF-8编码避免中文乱码
### 4. 等待进程结束
```java
// 方式1: 带超时的等待(推荐)
boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);
if (!finished) {
// 超时后强制终止进程
process.destroy(); // 或 process.destroyForcibly() 强制终止
throw new RuntimeException("任务超时");
}
// 方式2: 无限等待(不推荐,可能导致死锁)
int exitCode = process.waitFor();
```
**退出码说明:**
- `0`: 执行成功
- `非0`: 执行失败(通常是错误码)
### 5. 获取返回结果
当前实现通过**文件传递**方式获取结果:
```java
// Python脚本将结果写入JSON文件
Path outputPath = scriptDir.resolve(outputFile);
// Java读取文件内容
String jsonContent = Files.readString(outputPath);
// 解析JSON
ObjectMapper mapper = new ObjectMapper();
List<Map<String, Object>> newsList = mapper.readValue(
jsonContent,
List.class
);
```
## 三、三种数据传递方式对比
### 方式1: 文件传递(当前实现)
**优点:**
- ✅ 适合大数据量
- ✅ 数据持久化,便于调试
- ✅ 实现简单
**缺点:**
- ⚠️ 需要文件I/O操作
- ⚠️ 需要管理临时文件
- ⚠️ 可能有并发问题(文件名冲突)
**实现示例:**
```java
// Java端
String outputFile = "output/result_" + System.currentTimeMillis() + ".json";
command.add(outputFile);
// Python端
import json
import sys
result = {"status": "success", "data": [...]}
with open(sys.argv[1], 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False)
```
### 方式2: 标准输出传递(适合小数据)
**优点:**
- ✅ 实时传输,无需文件
- ✅ 适合小数据量(< 1MB)
- ✅ 无文件管理开销
**缺点:**
- ⚠️ 大数据量可能阻塞
- ⚠️ 不能传递二进制数据
- ⚠️ 需要与日志输出区分
**实现示例:**
```java
// Java端读取标准输出
StringBuilder result = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), "UTF-8"))) {
String line;
while ((line = reader.readLine()) != null) {
// 约定:以特定标记区分结果和日志
if (line.startsWith("RESULT:")) {
result.append(line.substring(7)); // 去掉"RESULT:"前缀
} else {
logger.info("Python日志: {}", line);
}
}
}
// 解析JSON结果
String jsonResult = result.toString();
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> data = mapper.readValue(jsonResult, Map.class);
```
```python
# Python端输出结果
import json
import sys
# 日志输出到stderr
print("开始爬取...", file=sys.stderr)
# 结果输出到stdout带标记
result = {"status": "success", "data": [...]}
print("RESULT:" + json.dumps(result, ensure_ascii=False))
```
### 方式3: 标准输入传递参数(双向通信)
**优点:**
- ✅ 可以传递复杂参数
- ✅ 支持交互式通信
**缺点:**
- ⚠️ 实现复杂
- ⚠️ 需要处理流关闭时机
**实现示例:**
```java
// Java端通过标准输入传递参数
ProcessBuilder pb = new ProcessBuilder("python", "script.py");
Process process = pb.start();
// 写入参数到标准输入
try (BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(process.getOutputStream(), "UTF-8"))) {
String params = "{\"category\":\"politics\",\"limit\":20}";
writer.write(params);
writer.newLine();
writer.flush();
}
// 关闭输入流告诉Python输入结束
process.getOutputStream().close();
// 读取输出
// ... 同方式2
```
```python
# Python端从标准输入读取参数
import json
import sys
# 读取参数
params_json = sys.stdin.readline().strip()
params = json.loads(params_json)
category = params.get("category", "politics")
limit = params.get("limit", 20)
# 执行爬取
result = crawl_news(category, limit)
# 输出结果
print(json.dumps(result, ensure_ascii=False))
```
## 四、完整优化实现
### 改进版实现(支持多种方式)
```java
package org.xyzh.crontab.task.newsTask;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.*;
/**
 * Runs the Python news crawler as a child process and returns its result as a list of maps.
 *
 * <p>Three ways of exchanging data with the script are offered: through a result file
 * ({@link #executeByFile}), through the script's standard output ({@link #executeByStdout}),
 * and by sending the parameters on the script's standard input ({@link #executeByStdin}).
 * All three share one process runner that drains the child's output on a separate thread,
 * enforces the configured timeout, and kills the child (and its descendants) if it is
 * still alive when the runner returns.
 */
@Component("newsCrewerTask")
public class NewsCrewerTask {

    private static final Logger logger = LoggerFactory.getLogger(NewsCrewerTask.class);

    /** Seconds to wait for the output reader to finish once the process itself has exited. */
    private static final long OUTPUT_DRAIN_SECONDS = 5;

    private final ObjectMapper objectMapper = new ObjectMapper();

    /** Python interpreter to launch; defaults to {@code python}. */
    @Value("${crewer.python.path:python}")
    private String pythonPath;

    /** Directory containing the Python scripts; used as the child's working directory. */
    @Value("${crewer.script.path:../schoolNewsCrewer}")
    private String scriptPath;

    /** Maximum run time of the child process, in seconds. */
    @Value("${crewer.timeout:300}")
    private int timeout;

    /**
     * Mode 1: the script writes its result to a JSON file which is then read back.
     *
     * @param category crawl category, passed to {@code main.py} and embedded in the file name
     * @param limit    maximum number of items, passed to {@code main.py}
     * @return the parsed content of the result file
     * @throws RuntimeException if the process fails, times out, produces no file, or the
     *                          file cannot be parsed
     */
    public List<Map<String, Object>> executeByFile(String category, int limit) {
        logger.info("执行爬虫任务 - 文件方式");
        try {
            List<String> command = buildCommand("main.py", category, String.valueOf(limit));

            // The output path is relative to the script directory (the child's working dir).
            String timestamp = String.valueOf(System.currentTimeMillis());
            String outputFile = String.format("output/news_%s_%s.json", category, timestamp);
            command.add(outputFile);

            ProcessResult result = executeProcess(command);
            if (result.getExitCode() != 0) {
                throw new RuntimeException("Python执行失败: " + result.getOutput());
            }

            Path outputPath = Paths.get(scriptPath).resolve(outputFile);
            if (!Files.exists(outputPath)) {
                throw new RuntimeException("输出文件不存在: " + outputFile);
            }
            // The result file is not deleted here; it stays on disk after the call.
            return parseNewsList(Files.readString(outputPath, StandardCharsets.UTF_8));
        } catch (Exception e) {
            throw failure(e);
        }
    }

    /**
     * Mode 2: the script prints its result to standard output.
     *
     * <p>stderr is merged into stdout, so the result is taken to be the last output line
     * that starts with <code>{</code> or {@code [}.
     *
     * @param category crawl category, passed to {@code main_stdout.py}
     * @param limit    maximum number of items, passed to {@code main_stdout.py}
     * @return the parsed JSON result
     * @throws RuntimeException if the process fails, times out, prints no JSON line, or the
     *                          line cannot be parsed
     */
    public List<Map<String, Object>> executeByStdout(String category, int limit) {
        logger.info("执行爬虫任务 - 标准输出方式");
        try {
            List<String> command = buildCommand("main_stdout.py", category, String.valueOf(limit));

            ProcessResult result = executeProcess(command);
            if (result.getExitCode() != 0) {
                throw new RuntimeException("Python执行失败: " + result.getOutput());
            }

            String jsonLine = lastJsonLine(result.getOutput());
            if (jsonLine == null) {
                throw new RuntimeException("未找到JSON结果");
            }
            return parseNewsList(jsonLine);
        } catch (Exception e) {
            throw failure(e);
        }
    }

    /**
     * Mode 3: the parameters are sent to the script as one JSON line on its standard input,
     * and the script's whole (trimmed) output is parsed as the JSON result.
     *
     * @param category crawl category, sent as the {@code category} field
     * @param limit    maximum number of items, sent as the {@code limit} field
     * @return the parsed JSON result
     * @throws RuntimeException if the process fails, times out, or its output cannot be parsed
     */
    public List<Map<String, Object>> executeByStdin(String category, int limit) {
        logger.info("执行爬虫任务 - 标准输入方式");
        try {
            List<String> command = buildCommand("main_stdin.py");
            Map<String, Object> params = Map.of(
                    "category", category,
                    "limit", limit);
            String paramsJson = objectMapper.writeValueAsString(params);

            // The shared runner reads the output on another thread, so the timeout is
            // honoured even when the script never closes its stdout.
            ProcessResult result = executeProcess(command, paramsJson);
            int exitCode = result.getExitCode();
            if (exitCode != 0) {
                throw new RuntimeException("Python执行失败退出码: " + exitCode);
            }
            return parseNewsList(result.getOutput().trim());
        } catch (Exception e) {
            throw failure(e);
        }
    }

    /**
     * Runs {@code command} without sending anything on the child's standard input.
     *
     * @see #executeProcess(List, String)
     */
    private ProcessResult executeProcess(List<String> command) throws IOException, InterruptedException {
        return executeProcess(command, null);
    }

    /**
     * Starts {@code command} in the script directory, optionally sends one line on its
     * standard input, and waits up to {@code timeout} seconds for it to exit.
     *
     * @param command   full command line, one element per argument
     * @param stdinLine line to write to the child's stdin (the stream is closed afterwards),
     *                  or {@code null} to leave stdin untouched
     * @return exit code, merged stdout/stderr text, and elapsed time in milliseconds
     * @throws IOException          if the process cannot be started, stdin cannot be written,
     *                              or the output reader does not complete
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws RuntimeException     if the process does not exit within the timeout
     */
    private ProcessResult executeProcess(List<String> command, String stdinLine)
            throws IOException, InterruptedException {
        long startTime = System.currentTimeMillis();

        ProcessBuilder pb = new ProcessBuilder(command);
        pb.directory(Paths.get(scriptPath).toFile());
        pb.redirectErrorStream(true);
        logger.info("执行命令: {}", String.join(" ", command));

        Process process = pb.start();
        try {
            // Drain the output concurrently: a child blocks once the pipe buffer is full,
            // and a blocking read on this thread would defeat the timeout below.
            CompletableFuture<String> outputFuture =
                    CompletableFuture.supplyAsync(() -> readOutput(process));

            if (stdinLine != null) {
                // Closing the writer closes the child's stdin, signalling end of input.
                try (BufferedWriter writer = new BufferedWriter(
                        new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8))) {
                    writer.write(stdinLine);
                    writer.newLine();
                }
            }

            if (!process.waitFor(timeout, TimeUnit.SECONDS)) {
                throw new RuntimeException("任务超时(超过" + timeout + "秒)");
            }

            String outputStr;
            try {
                outputStr = outputFuture.get(OUTPUT_DRAIN_SECONDS, TimeUnit.SECONDS);
            } catch (ExecutionException | TimeoutException e) {
                // Both are checked exceptions; surface them through the declared IOException.
                throw new IOException("读取进程输出失败", e);
            }

            int exitCode = process.exitValue();
            long duration = System.currentTimeMillis() - startTime;
            logger.info("进程执行完成 - 退出码: {}, 耗时: {}ms", exitCode, duration);
            return new ProcessResult(exitCode, outputStr, duration);
        } finally {
            // Reached with a live process on timeout, interruption or an I/O failure.
            if (process.isAlive()) {
                killProcessTree(process);
            }
        }
    }

    /**
     * Reads the child's (merged) output line by line until end of stream.
     *
     * @return the collected text with {@code \n} after every line, or an empty string if
     *         reading fails
     */
    private static String readOutput(Process process) {
        StringBuilder collected = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                collected.append(line).append("\n");
                logger.debug("Python输出: {}", line);
            }
            return collected.toString();
        } catch (IOException e) {
            logger.error("读取输出失败", e);
            return "";
        }
    }

    /**
     * Forcibly terminates the process and every descendant. Descendants matter when the
     * interpreter is launched through {@code cmd /c}: killing only {@code cmd} would leave
     * the Python process running and holding the output pipe open.
     */
    private static void killProcessTree(Process process) {
        process.descendants().forEach(ProcessHandle::destroyForcibly);
        process.destroyForcibly();
    }

    /** Returns the last line of {@code output} starting with <code>{</code> or {@code [}, or {@code null}. */
    private static String lastJsonLine(String output) {
        String[] lines = output.split("\n");
        for (int i = lines.length - 1; i >= 0; i--) {
            String candidate = lines[i].trim();
            if (candidate.startsWith("{") || candidate.startsWith("[")) {
                return candidate;
            }
        }
        return null;
    }

    /** Parses a JSON array of objects into a list of maps. */
    private List<Map<String, Object>> parseNewsList(String json) throws IOException {
        return objectMapper.readValue(
                json,
                objectMapper.getTypeFactory().constructCollectionType(List.class, Map.class));
    }

    /**
     * Logs {@code e} and wraps it in the exception the public methods throw. If the cause is
     * an {@link InterruptedException}, the thread's interrupt flag is restored first so the
     * scheduler running this task can still observe the interruption.
     */
    private RuntimeException failure(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        logger.error("执行失败", e);
        return new RuntimeException("爬虫任务执行失败", e);
    }

    /**
     * Builds the command line: the interpreter (behind {@code cmd /c} on Windows) followed
     * by {@code args}.
     *
     * @param args script name and script arguments, in order
     * @return a new mutable list; callers may append further arguments
     */
    private List<String> buildCommand(String... args) {
        List<String> command = new ArrayList<>();
        // Locale.ROOT keeps the lower-casing independent of the default locale, and the
        // prefix test avoids matching "darwin" the way contains("win") does.
        String os = System.getProperty("os.name", "").toLowerCase(Locale.ROOT);
        if (os.startsWith("windows")) {
            // NOTE(review): arguments placed after "cmd /c" are interpreted by cmd, so
            // characters such as & or | in an argument are treated as shell syntax.
            command.add("cmd");
            command.add("/c");
        }
        command.add(pythonPath);
        for (String arg : args) {
            command.add(arg);
        }
        return command;
    }

    /** Immutable outcome of one child-process run. */
    private static class ProcessResult {
        private final int exitCode;
        private final String output;
        private final long duration;

        /**
         * @param exitCode process exit code
         * @param output   merged stdout/stderr text
         * @param duration elapsed time in milliseconds
         */
        public ProcessResult(int exitCode, String output, long duration) {
            this.exitCode = exitCode;
            this.output = output;
            this.duration = duration;
        }

        public int getExitCode() {
            return exitCode;
        }

        public String getOutput() {
            return output;
        }

        public long getDuration() {
            return duration;
        }
    }
}
```
## 五、关键注意事项
### 1. 必须读取输出流
**错误示例:**
```java
Process process = pb.start();
int exitCode = process.waitFor(); // 可能永远阻塞!
```
**原因:** 如果输出缓冲区满了,Python进程会阻塞,等待Java端读取。
**正确做法:**
```java
Process process = pb.start();
// 必须读取输出流
Thread outputThread = new Thread(() -> {
try (BufferedReader reader = ...) {
// 读取输出
}
});
outputThread.start();
process.waitFor();
```
### 2. 处理编码问题
```java
// 指定UTF-8编码避免中文乱码
new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8)
new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8)
```
### 3. 超时控制
```java
// 使用带超时的waitFor
boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);
if (!finished) {
process.destroyForcibly(); // 强制终止
}
```
### 4. 资源清理
```java
try {
    // main logic
} finally {
    // Everything below is guarded by the null check: if start() failed, process is
    // still null and touching its streams would throw a NullPointerException here.
    if (process != null) {
        if (process.isAlive()) {
            process.destroyForcibly();
        }
        // Close the streams
        process.getInputStream().close();
        process.getOutputStream().close();
        process.getErrorStream().close();
    }
}
```
### 5. 错误处理
```java
// 检查退出码
if (exitCode != 0) {
// 读取错误输出
String error = readErrorStream(process);
throw new RuntimeException("执行失败: " + error);
}
```
## 六、性能优化建议
1. **使用线程池**:如果频繁调用,使用线程池管理进程
2. **连接复用**:考虑Python服务模式(HTTP/gRPC)
3. **异步执行**:使用CompletableFuture异步执行
4. **缓存结果**:对相同参数的请求缓存结果
## 七、总结
- **文件传递**:适合大数据量(当前实现方式)
- **标准输出**:适合小数据量,实时传输
- **标准输入**:适合复杂参数,双向通信
根据实际需求选择合适的方式,当前的文件传递方式已经足够好。