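# Calling Python from Java and Getting the Result: A Detailed Guide

## I. Core Principle

Java starts the Python script as a separate operating-system process using `ProcessBuilder` (or the older `Runtime.exec()`). It then communicates with that process through its standard input, standard output, and standard error streams.
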
## II. The Current Implementation in Detail

### 1. Building the Command

```java
// Step 1: build the command list
List<String> command = new ArrayList<>();

// Step 2: handle the Windows vs. Linux/macOS difference
String os = System.getProperty("os.name").toLowerCase();
if (os.contains("win")) {
    // Windows: go through cmd.exe. ProcessBuilder can usually start python.exe directly,
    // but cmd /c also lets the shell resolve .bat/.cmd launchers found on PATH
    command.add("cmd");      // command interpreter
    command.add("/c");       // run the following command, then exit
    command.add(pythonPath); // e.g. "python"
} else {
    // Linux/macOS: run the interpreter directly
    command.add(pythonPath); // e.g. "python3"
}

// Step 3: add the Python script and its arguments
command.add("main.py");    // the Python script
command.add(category);     // arg 1: category
command.add(limit);        // arg 2: count (must already be a String)
command.add(outputFile);   // arg 3: output file
```

**Example commands:**

- Windows: `cmd /c python main.py politics 20 output/news.json`
- Linux: `python3 main.py politics 20 output/news.json`

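The OS branching above can be pulled into a pure function so both branches are testable without a Windows machine. This is a minimal sketch, not the project's code: the class `CommandBuilder` is a hypothetical name, and it uses a stricter `startsWith("windows")` prefix check instead of `contains("win")`, which would match any OS name that merely contains those letters. Called with `"Windows 11"` or `"Linux"` it produces the two example commands above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical helper class for illustration
public class CommandBuilder {
    /**
     * Builds the ProcessBuilder argument list. osName would normally be
     * System.getProperty("os.name"); it is a parameter here so both branches are testable.
     */
    public static List<String> build(String osName, String pythonPath, String... args) {
        List<String> command = new ArrayList<>();
        // Windows os.name values look like "Windows 10", "Windows 11", "Windows Server 2019"
        if (osName.toLowerCase(Locale.ROOT).startsWith("windows")) {
            command.add("cmd");   // command interpreter
            command.add("/c");    // run the rest of the command, then exit
        }
        command.add(pythonPath);
        command.addAll(Arrays.asList(args));
        return command;
    }
}
```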
### 2. Creating the Process

```java
// Create the process builder
ProcessBuilder processBuilder = new ProcessBuilder(command);

// Set the working directory (the directory containing the Python script)
processBuilder.directory(scriptDir.toFile());

// Merge stderr into stdout so both can be read from a single stream
processBuilder.redirectErrorStream(true);

// Start the process
Process process = processBuilder.start();
```

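**Key points:**

- `directory()`: sets the working directory, so the Python script can resolve relative paths to its resources.
- `redirectErrorStream(true)`: merges stderr into stdout, so one reader sees everything and stderr cannot fill up unread.
- `start()`: launches the process and returns immediately; it does not wait for the script to finish.
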
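To see what `redirectErrorStream(true)` does in practice, here is a small sketch that starts any command in a given working directory and returns everything it printed. `MergedOutputRunner` is a hypothetical name. Running it with `java -version`, which writes its banner to stderr, shows that stderr text arrives on `getInputStream()` once the streams are merged.

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical helper class for illustration
public class MergedOutputRunner {
    /** Starts the command in workDir with stderr merged into stdout and returns all of its output. */
    public static String run(List<String> command, File workDir) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.directory(workDir);          // relative paths in the child resolve against this directory
        pb.redirectErrorStream(true);   // stderr lines now arrive on getInputStream()
        Process process = pb.start();   // returns as soon as the child has been launched

        StringBuilder out = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                out.append(line).append('\n');
            }
        }
        process.waitFor();              // stream hit EOF, so the child is exiting
        return out.toString();
    }
}
```

If you would rather keep Python's logs out of the captured output, `pb.redirectError(ProcessBuilder.Redirect.INHERIT)` sends them to the JVM's own stderr instead of merging them.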
### 3. Reading the Output Stream

```java
// Read standard output (whatever Python prints)
StringBuilder output = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
    String line;
    while ((line = reader.readLine()) != null) {
        output.append(line).append("\n");
        logger.debug("Python output: {}", line);
    }
}
```

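**Important notes:**

- `process.getInputStream()` returns the Python process's standard output (the name is from Java's point of view: it is an input stream for Java).
- The stream must be consumed. If nobody reads it, the OS pipe buffer fills up, Python blocks on its next write, and the process never exits.
- Decode with UTF-8 to avoid garbled Chinese text. This only works if Python actually writes UTF-8.
- The loop above blocks the calling thread until Python closes its stdout, which normally happens when it exits.
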
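Because the loop above blocks the calling thread until Python closes stdout, a common pattern is to drain the stream on a background thread and collect the text as a future. This is a minimal sketch with a hypothetical helper class `StreamDrainer`; pass it `process.getInputStream()`, and `process.getErrorStream()` as well if stderr is not merged.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper class for illustration
public class StreamDrainer {
    /**
     * Reads the stream line by line until EOF on a background thread and returns the text.
     * Line endings are normalized to a single newline character.
     */
    public static CompletableFuture<String> drainAsync(InputStream in, Charset charset) {
        return CompletableFuture.supplyAsync(() -> {
            StringBuilder sb = new StringBuilder();
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    sb.append(line).append('\n');
                }
            } catch (IOException e) {
                // Surfaces as a CompletionException when the future is joined
                throw new UncheckedIOException(e);
            }
            return sb.toString();
        });
    }
}
```

Typical use: start the drain right after `start()`, call `waitFor(...)`, then `join()` the future. `supplyAsync` without an executor runs on the common fork-join pool; with many concurrent processes, passing a dedicated `Executor` is a safer choice for blocking reads.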
### 4. Waiting for the Process to Finish

```java
// Option 1: wait with a timeout (recommended)
boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);

if (!finished) {
    // Timed out: terminate the process
    process.destroy(); // or process.destroyForcibly() to kill it immediately
    throw new RuntimeException("Task timed out");
}

// Option 2: wait indefinitely (not recommended: blocks forever if the script hangs
// or if its output is never drained)
int exitCode = process.waitFor();
```

**Exit codes:**

- `0`: success
- non-zero: failure. The value is whatever the script passed to `sys.exit()`; an uncaught Python exception exits with `1`.

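The timeout branch above can be made a little more graceful: ask the process to stop with `destroy()`, allow a short grace period, and only then fall back to `destroyForcibly()`. This is a sketch with a hypothetical `ProcessWaiter` class; it assumes the output is being drained elsewhere, otherwise the wait itself can stall on a full pipe.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class for illustration
public class ProcessWaiter {
    /**
     * Waits up to timeoutSeconds and returns the exit code. On timeout, requests termination
     * with destroy(), waits up to graceSeconds, then kills with destroyForcibly() and throws.
     */
    public static int waitOrKill(Process process, long timeoutSeconds, long graceSeconds)
            throws InterruptedException, TimeoutException {
        if (process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
            return process.exitValue();                 // 0 = success, non-zero = failure
        }
        process.destroy();                              // polite request (SIGTERM on Unix-like systems)
        if (!process.waitFor(graceSeconds, TimeUnit.SECONDS)) {
            process.destroyForcibly();                  // cannot be caught (SIGKILL on Unix-like systems)
            process.waitFor(graceSeconds, TimeUnit.SECONDS);
        }
        throw new TimeoutException("Process did not finish within " + timeoutSeconds + "s");
    }
}
```

A script can catch SIGTERM (for example with Python's `signal` module) to clean up before exiting; SIGKILL gives it no such chance, which is why it is the last resort here.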
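### 5. Getting the Result

The current implementation retrieves the result by **passing it through a file**. Because the process was started with `scriptDir` as its working directory, the relative `outputFile` path the script writes to is resolved against the same directory on the Java side:

```java
// The Python script has written its result to a JSON file
Path outputPath = scriptDir.resolve(outputFile);

// Read the file content on the Java side
String jsonContent = Files.readString(outputPath, StandardCharsets.UTF_8);

// Parse the JSON into a typed list
// (TypeReference is com.fasterxml.jackson.core.type.TypeReference)
ObjectMapper mapper = new ObjectMapper();
List<Map<String, Object>> newsList = mapper.readValue(
        jsonContent,
        new TypeReference<List<Map<String, Object>>>() {}
);
```
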
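## III. Comparing Three Ways to Pass Data

### Method 1: File Passing (Current Implementation)

**Pros:**

- ✅ Suits large amounts of data
- ✅ The data is persisted, which makes debugging easier
- ✅ Simple to implement

**Cons:**

- ⚠️ Requires file I/O
- ⚠️ Temporary files have to be managed and cleaned up
- ⚠️ Possible concurrency problems (file-name collisions between runs)
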
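**Example:**

```java
// Java side: generate a per-run file name and pass it to the script as an argument
String outputFile = "output/result_" + System.currentTimeMillis() + ".json";
command.add(outputFile);
```

```python
# Python side: write the result to the path passed as the last argument
import json
import os
import sys

output_path = sys.argv[-1]
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)  # make sure output/ exists

result = {"status": "success", "data": []}  # fill "data" with the real results
with open(output_path, 'w', encoding='utf-8') as f:
    json.dump(result, f, ensure_ascii=False)
```
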
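One way to avoid the file-name collisions listed under the cons (two runs that start in the same millisecond get the same `System.currentTimeMillis()` name) is to let the file system choose a unique name. This is a hedged sketch; `ResultFiles` and its methods are hypothetical names. The Python side simply opens the path it receives with `'w'`, which overwrites the empty placeholder.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper class for illustration
public class ResultFiles {
    /** Creates an empty file in outputDir whose name starts with prefix and ends with .json. */
    public static Path newResultFile(Path outputDir, String prefix) throws IOException {
        Files.createDirectories(outputDir);   // no-op if the directory already exists
        // The JDK picks a name that does not exist yet and creates the file atomically,
        // so concurrent runs cannot end up with the same path
        return Files.createTempFile(outputDir, prefix, ".json");
    }

    /** Reads the JSON the Python script wrote, then deletes the file either way. */
    public static String readAndDelete(Path file) throws IOException {
        try {
            return Files.readString(file, StandardCharsets.UTF_8);
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

Pass `file.toAbsolutePath().toString()` as the script argument so the path does not depend on the child's working directory.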
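### Method 2: Standard Output (Suits Small Data)

**Pros:**

- ✅ No file needed; output arrives while the script runs
- ✅ Suits small amounts of data (roughly < 1 MB)
- ✅ No file-management overhead

**Cons:**

- ⚠️ Large output can stall the script if Java does not drain the stream while the process runs
- ⚠️ Text only: binary data has to be encoded first (for example as Base64)
- ⚠️ Results must be distinguishable from log output
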
**Example:**

```java
// Java side: read stdout. This assumes stderr is merged (redirectErrorStream(true)) or
// drained elsewhere; otherwise a chatty stderr can fill its pipe and stall Python
StringBuilder result = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
    String line;
    while ((line = reader.readLine()) != null) {
        // Convention: lines starting with "RESULT:" carry the result, everything else is log output
        if (line.startsWith("RESULT:")) {
            result.append(line.substring("RESULT:".length())); // strip the "RESULT:" prefix
        } else {
            logger.info("Python log: {}", line);
        }
    }
}

// Parse the JSON result
String jsonResult = result.toString();
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> data = mapper.readValue(jsonResult, new TypeReference<Map<String, Object>>() {});
```

```python
# Python side: print the result
import json
import sys

# Log messages go to stderr
print("Starting crawl...", file=sys.stderr)

# The result goes to stdout, with a marker prefix
result = {"status": "success", "data": []}  # fill "data" with the real results
print("RESULT:" + json.dumps(result, ensure_ascii=False))
```

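Instead of concatenating every `RESULT:` line, which produces invalid JSON if the script ever prints more than one, the Java side can keep only the last marked line. This is a minimal sketch with a hypothetical `ResultLineParser` class; it also derives the prefix length from the constant instead of hard-coding `7`.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper class for illustration
public class ResultLineParser {
    static final String PREFIX = "RESULT:";

    /** Returns the payload of the last line that starts with RESULT:, if there is one. */
    public static Optional<String> lastResult(List<String> lines) {
        for (int i = lines.size() - 1; i >= 0; i--) {
            String line = lines.get(i);
            if (line.startsWith(PREFIX)) {
                return Optional.of(line.substring(PREFIX.length()));
            }
        }
        return Optional.empty();   // the script never printed a result line
    }
}
```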
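### Method 3: Passing Parameters via Standard Input (Two-Way Communication)

**Pros:**

- ✅ Can pass complex, structured parameters (such as nested JSON) without command-line quoting issues
- ✅ Supports interactive communication

**Cons:**

- ⚠️ More complex to implement
- ⚠️ Stream closing must be timed correctly: Python only sees end-of-input once Java closes stdin

**Example:**
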
```java
// Java side: pass parameters through standard input
ProcessBuilder pb = new ProcessBuilder("python", "script.py");
Process process = pb.start();

// Write the parameters to the child's stdin
try (BufferedWriter writer = new BufferedWriter(
        new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8))) {
    String params = "{\"category\":\"politics\",\"limit\":20}";
    writer.write(params);
    writer.newLine();
} // closing the writer flushes it and closes stdin, which tells Python the input is complete

// Read the output
// ... same as Method 2
```

```python
# Python side: read parameters from standard input
import json
import sys

# Read the parameters (one line of JSON)
params_json = sys.stdin.readline().strip()
params = json.loads(params_json)

category = params.get("category", "politics")
limit = params.get("limit", 20)

# Run the crawl (crawl_news is the project's own crawler function, not shown here)
result = crawl_news(category, limit)

# Print the result
print(json.dumps(result, ensure_ascii=False))
```

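The stdin write can be isolated into a method that accepts any `OutputStream`, which makes the close-to-signal-EOF step explicit and lets it be tested against an in-memory stream. This is a sketch; `StdinParams` is a hypothetical name, and in real use you would pass `process.getOutputStream()`.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class for illustration
public class StdinParams {
    /** Writes one line of JSON to the child's stdin, then closes it so Python sees EOF. */
    public static void send(OutputStream stdin, String json) throws IOException {
        try (BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(stdin, StandardCharsets.UTF_8))) {
            writer.write(json);
            writer.newLine();
        } // close() flushes the buffer and closes the underlying stream; no separate close needed
    }
}
```

`newLine()` writes the platform line separator (`\r\n` on Windows); the Python side's `readline().strip()` removes either form.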
## IV. Complete Optimized Implementation

### Improved Implementation (Supports All Three Methods)

```java
package org.xyzh.crontab.task.newsTask;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Complete implementation of calling Python from Java
 */
@Component("newsCrewerTask")
public class NewsCrewerTask {

    private static final Logger logger = LoggerFactory.getLogger(NewsCrewerTask.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Value("${crewer.python.path:python}")
    private String pythonPath;

    @Value("${crewer.script.path:../schoolNewsCrewer}")
    private String scriptPath;

    @Value("${crewer.timeout:300}")
    private int timeout;

    /**
     * Method 1: return the result through a file (current implementation, suits large data)
     */
    public List<Map<String, Object>> executeByFile(String category, int limit) {
        logger.info("Running crawler task - file mode");

        try {
            // 1. Build the command
            List<String> command = buildCommand("main.py", category, String.valueOf(limit));

            // 2. Generate the output file name
            String timestamp = String.valueOf(System.currentTimeMillis());
            String outputFile = String.format("output/news_%s_%s.json", category, timestamp);
            command.add(outputFile);

            // 3. Run the process
            ProcessResult result = executeProcess(command);

            if (result.getExitCode() != 0) {
                throw new RuntimeException("Python execution failed: " + result.getOutput());
            }

            // 4. Read the result file (relative to the script's working directory)
            Path outputPath = Paths.get(scriptPath).resolve(outputFile);
            if (!Files.exists(outputPath)) {
                throw new RuntimeException("Output file not found: " + outputFile);
            }

            String jsonContent = Files.readString(outputPath, StandardCharsets.UTF_8);
            List<Map<String, Object>> newsList = objectMapper.readValue(
                    jsonContent,
                    objectMapper.getTypeFactory().constructCollectionType(List.class, Map.class)
            );

            // 5. Delete the temporary file (optional)
            // Files.deleteIfExists(outputPath);

            return newsList;

        } catch (Exception e) {
            logger.error("Execution failed", e);
            throw new RuntimeException("Crawler task failed", e);
        }
    }

    /**
     * Method 2: return the result through standard output (suits small data)
     */
    public List<Map<String, Object>> executeByStdout(String category, int limit) {
        logger.info("Running crawler task - stdout mode");

        try {
            // 1. Build the command (a dedicated script that prints JSON to stdout)
            List<String> command = buildCommand("main_stdout.py", category, String.valueOf(limit));

            // 2. Run the process
            ProcessResult result = executeProcess(command);

            if (result.getExitCode() != 0) {
                throw new RuntimeException("Python execution failed: " + result.getOutput());
            }

            // 3. Extract the JSON (convention: the last line starting with { or [ is the result)
            String[] lines = result.getOutput().split("\n");
            String jsonLine = null;
            for (int i = lines.length - 1; i >= 0; i--) {
                String line = lines[i].trim();
                if (line.startsWith("{") || line.startsWith("[")) {
                    jsonLine = line;
                    break;
                }
            }

            if (jsonLine == null) {
                throw new RuntimeException("No JSON result found in the output");
            }

            // 4. Parse the JSON
            List<Map<String, Object>> newsList = objectMapper.readValue(
                    jsonLine,
                    objectMapper.getTypeFactory().constructCollectionType(List.class, Map.class)
            );

            return newsList;

        } catch (Exception e) {
            logger.error("Execution failed", e);
            throw new RuntimeException("Crawler task failed", e);
        }
    }

    /**
     * Method 3: pass parameters through standard input (two-way communication)
     */
    public List<Map<String, Object>> executeByStdin(String category, int limit) {
        logger.info("Running crawler task - stdin mode");

        Process process = null;
        try {
            // 1. Build the command
            List<String> command = buildCommand("main_stdin.py");
            ProcessBuilder pb = new ProcessBuilder(command);
            pb.directory(Paths.get(scriptPath).toFile());
            // stdout is parsed as pure JSON, so do not merge stderr into it;
            // send Python's log output to this JVM's stderr instead
            pb.redirectError(ProcessBuilder.Redirect.INHERIT);

            // 2. Start the process
            process = pb.start();
            final Process running = process;

            // 3. Read stdout on a background thread so the timeout below actually applies
            CompletableFuture<String> outputFuture =
                    CompletableFuture.supplyAsync(() -> readAll(running.getInputStream()));

            // 4. Write the parameters; closing the writer closes stdin (important: Python waits for EOF)
            try (BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8))) {
                Map<String, Object> params = Map.of(
                        "category", category,
                        "limit", limit
                );
                writer.write(objectMapper.writeValueAsString(params));
                writer.newLine();
            }

            // 5. Wait for the process to finish
            boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);
            if (!finished) {
                process.destroyForcibly();
                throw new RuntimeException("Task timed out");
            }

            int exitCode = process.exitValue();
            if (exitCode != 0) {
                throw new RuntimeException("Python execution failed, exit code: " + exitCode);
            }

            // 6. Parse the result
            String jsonResult = outputFuture.get(5, TimeUnit.SECONDS).trim();
            List<Map<String, Object>> newsList = objectMapper.readValue(
                    jsonResult,
                    objectMapper.getTypeFactory().constructCollectionType(List.class, Map.class)
            );

            return newsList;

        } catch (Exception e) {
            logger.error("Execution failed", e);
            throw new RuntimeException("Crawler task failed", e);
        } finally {
            if (process != null && process.isAlive()) {
                process.destroyForcibly();
            }
        }
    }

    /**
     * Generic process runner
     */
    private ProcessResult executeProcess(List<String> command)
            throws IOException, InterruptedException, ExecutionException, TimeoutException {
        long startTime = System.currentTimeMillis();

        // Create the process builder
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.directory(Paths.get(scriptPath).toFile());
        pb.redirectErrorStream(true);

        logger.info("Executing command: {}", String.join(" ", command));

        // Start the process
        Process process = pb.start();

        // Read the output on a separate thread so waitFor() below cannot deadlock
        CompletableFuture<String> outputFuture =
                CompletableFuture.supplyAsync(() -> readAll(process.getInputStream()));

        // Wait for the process to finish (with timeout)
        boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);

        if (!finished) {
            process.destroyForcibly();
            throw new RuntimeException("Task timed out (exceeded " + timeout + " seconds)");
        }

        // Collect the output; the reader finishes once the process has closed its stdout
        String outputStr = outputFuture.get(5, TimeUnit.SECONDS);

        int exitCode = process.exitValue();
        long duration = System.currentTimeMillis() - startTime;

        logger.info("Process finished - exit code: {}, duration: {}ms", exitCode, duration);

        return new ProcessResult(exitCode, outputStr, duration);
    }

    /**
     * Reads a stream to EOF as UTF-8, logging each line
     */
    private static String readAll(InputStream in) {
        StringBuilder output = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                output.append(line).append("\n");
                logger.debug("Python output: {}", line);
            }
        } catch (IOException e) {
            logger.error("Failed to read process output", e);
        }
        return output.toString();
    }

    /**
     * Builds the command list
     */
    private List<String> buildCommand(String... args) {
        List<String> command = new ArrayList<>();

        String os = System.getProperty("os.name").toLowerCase();
        if (os.contains("win")) {
            // Windows: run through cmd /c
            command.add("cmd");
            command.add("/c");
        }
        command.add(pythonPath);
        command.addAll(List.of(args));

        return command;
    }

    /**
     * Result of a process execution
     */
    private static class ProcessResult {
        private final int exitCode;
        private final String output;
        private final long duration;

        public ProcessResult(int exitCode, String output, long duration) {
            this.exitCode = exitCode;
            this.output = output;
            this.duration = duration;
        }

        public int getExitCode() {
            return exitCode;
        }

        public String getOutput() {
            return output;
        }

        public long getDuration() {
            return duration;
        }
    }
}
```

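## V. Key Considerations

### 1. Always Read the Output Stream

**Wrong:**

```java
Process process = pb.start();
int exitCode = process.waitFor(); // may block forever!
```

**Why:** The child's output goes into an OS pipe with a limited buffer (64 KiB by default on Linux). Once that buffer is full, Python blocks on its next write, while Java is blocked in `waitFor()` waiting for an exit that never comes: a deadlock.
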
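**Correct:**

```java
Process process = pb.start();

// Drain the output on a separate thread
StringBuilder output = new StringBuilder();
Thread outputThread = new Thread(() -> {
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
            output.append(line).append('\n');
        }
    } catch (IOException e) {
        logger.error("Failed to read output", e);
    }
});
outputThread.start();

int exitCode = process.waitFor();
outputThread.join(); // make sure all output has been read before using it
```
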
### 2. Handling Encoding

```java
// Specify UTF-8 explicitly to avoid garbled Chinese text
Reader stdout = new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8);
Writer stdin = new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8);
```

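Decoding as UTF-8 on the Java side only works if Python actually writes UTF-8. When stdout is a pipe, Python typically encodes it with the system locale encoding, which on a Chinese Windows installation is often GBK. One hedged fix is to set the `PYTHONIOENCODING` environment variable for the child process; `Utf8ProcessBuilder` is a hypothetical helper name.

```java
import java.util.List;

// Hypothetical helper class for illustration
public class Utf8ProcessBuilder {
    /** Returns a ProcessBuilder whose Python child uses UTF-8 for stdin/stdout/stderr. */
    public static ProcessBuilder create(List<String> command) {
        ProcessBuilder pb = new ProcessBuilder(command);
        // The child inherits the JVM's environment plus this override
        pb.environment().put("PYTHONIOENCODING", "utf-8");
        return pb;
    }
}
```

On Python 3.7+, setting `PYTHONUTF8=1` (UTF-8 mode) is an alternative that also changes the default encoding used by `open()`.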
### 3. Timeout Control

```java
// Use waitFor with a timeout
boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);
if (!finished) {
    process.destroyForcibly(); // kill the process
    throw new RuntimeException("Task timed out");
}
```

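### 4. Resource Cleanup

```java
Process process = null;
try {
    process = pb.start();
    // execution logic
} finally {
    if (process != null) {
        if (process.isAlive()) {
            process.destroyForcibly();
        }
        // Close all three pipes (java.io.Closeable); close() can throw IOException,
        // so close each one quietly
        for (Closeable stream : List.<Closeable>of(process.getInputStream(),
                                                   process.getOutputStream(),
                                                   process.getErrorStream())) {
            try {
                stream.close();
            } catch (IOException ignored) {
                // nothing useful to do during cleanup
            }
        }
    }
}
```
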
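### 5. Error Handling

```java
// Check the exit code
if (exitCode != 0) {
    // With redirectErrorStream(true), stderr is already part of the captured output;
    // without it, read process.getErrorStream() (on its own thread, like stdout)
    throw new RuntimeException("Execution failed, exit code " + exitCode + ": " + output);
}
```
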
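A Python traceback ends with the line that names the actual exception, so when the captured output is long it can help to put only its tail into the exception message. This is a minimal sketch; `ErrorTail` is a hypothetical helper, and the number of lines to keep is an arbitrary choice.

```java
import java.util.Arrays;

// Hypothetical helper class for illustration
public class ErrorTail {
    /**
     * Returns at most the last maxLines lines of the captured output, joined with newlines.
     * Python tracebacks end with the line naming the exception, so the tail is usually what matters.
     */
    public static String tail(String output, int maxLines) {
        String[] lines = output.split("\\R");   // \R matches any line break: \n, \r\n or \r
        int from = Math.max(0, lines.length - maxLines);
        return String.join("\n", Arrays.copyOfRange(lines, from, lines.length));
    }
}
```

Usage: `throw new RuntimeException("Execution failed, exit code " + exitCode + ":\n" + ErrorTail.tail(output, 20));`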
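## VI. Performance Tips

1. **Bound concurrency with a thread pool**: if the task runs often, submit it to a fixed-size pool so only a limited number of Python processes (and their reader threads) exist at once.
2. **Reuse the interpreter**: spawning a process per call pays Python's startup and import cost every time; for frequent calls, consider running Python as a long-lived service (HTTP/gRPC).
3. **Run asynchronously**: use `CompletableFuture` so callers are not blocked while the script runs.
4. **Cache results**: cache the results of requests with identical parameters.
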
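Points 1, 3 and 4 above can be combined in a small wrapper: a fixed-size pool bounds how many Python processes run at once, `CompletableFuture` keeps callers non-blocking, and a map of futures serves repeat requests. This is a hedged sketch; `BoundedCachedRunner` and the key format are hypothetical, and the `Supplier` stands in for a call such as `executeByFile(category, limit)`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical helper class for illustration
public class BoundedCachedRunner<T> implements AutoCloseable {
    private final ExecutorService pool;
    private final Map<String, CompletableFuture<T>> cache = new ConcurrentHashMap<>();

    /** maxConcurrent caps how many tasks (and thus Python processes) run at the same time. */
    public BoundedCachedRunner(int maxConcurrent) {
        this.pool = Executors.newFixedThreadPool(maxConcurrent);
    }

    /**
     * Runs the task on the pool; later calls with the same key get the first call's future
     * instead of starting another process.
     */
    public CompletableFuture<T> submit(String key, Supplier<T> task) {
        return cache.computeIfAbsent(key, k -> CompletableFuture.supplyAsync(task, pool));
    }

    @Override
    public void close() {
        pool.shutdown(); // lets running tasks finish and accepts no new ones
    }
}
```

As written, a failed future stays in the cache; a production version would evict failures (and probably expire entries) so that a transient error is not replayed forever.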
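## VII. Summary

- **File passing**: suits large amounts of data; this is the current implementation.
- **Standard output**: suits small amounts of data; no intermediate file is needed.
- **Standard input**: suits complex parameters; enables two-way communication.

Choose the approach that fits your requirements; for the current use case, the file-based approach is good enough.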