schoolNews/schoolNewsServ/crontab/Java调用Python详解.md
2025-11-10 16:03:50 +08:00

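# Calling Python from Java and Retrieving the Result: A Detailed Guide
## I. Core Principle
Java creates an operating-system process with `ProcessBuilder` or `Runtime.exec()` to run the Python script, and then communicates with it through the process's standard input/output streams.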
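To make the mechanism concrete, here is a minimal sketch of the whole round trip (start the process, drain its output, wait with a timeout, read the exit code). So that it runs without Python, the usage check launches the current JVM with `-version` as a stand-in child process. `ProcessDemo` and `runAndCapture` are hypothetical names, not part of the project.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class ProcessDemo {

    /** Exit code plus the merged stdout/stderr text of one child-process run. */
    static final class Result {
        final int exitCode;
        final String output;

        Result(int exitCode, String output) {
            this.exitCode = exitCode;
            this.output = output;
        }
    }

    static Result runAndCapture(List<String> command, long timeoutSeconds) throws Exception {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true);   // stderr is merged into the stream returned by getInputStream()
        Process process = pb.start();   // returns immediately; the child runs concurrently

        // Drain the child's output on another thread so a full pipe can never stall it
        CompletableFuture<String> output = CompletableFuture.supplyAsync(() -> {
            StringBuilder sb = new StringBuilder();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    sb.append(line).append('\n');
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return sb.toString();
        });

        if (!process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
            process.destroyForcibly();
            throw new IllegalStateException("child process timed out");
        }
        // The reader hits end-of-stream once the child closes its end of the pipe (normally on exit)
        return new Result(process.exitValue(), output.get(5, TimeUnit.SECONDS));
    }
}
```

Calling `runAndCapture(List.of("python", "main.py", "politics", "20", "output/news.json"), 300)` would run the crawler the same way, assuming `python` is on the `PATH`.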
## II. The Current Implementation in Detail
### 1. Building the Command
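```java
// Step 1: build the command list
List<String> command = new ArrayList<>();
// Step 2: handle the Windows/Linux difference
String os = System.getProperty("os.name").toLowerCase();
if (os.contains("win")) {
    // On Windows this implementation runs Python through cmd
    // (strictly needed only if pythonPath is a .bat/.cmd wrapper; python.exe on PATH can be started directly)
    command.add("cmd");       // the command interpreter
    command.add("/c");        // /c: run the command, then exit
    command.add(pythonPath);  // python or python3
} else {
    // Linux/macOS: run the interpreter directly
    command.add(pythonPath);
}
// Step 3: add the Python script and its arguments
command.add("main.py");               // the Python script
command.add(category);                // argument 1: category
command.add(String.valueOf(limit));   // argument 2: item count (arguments must be strings)
command.add(outputFile);              // argument 3: output file
```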
**Example commands:**
- Windows: `cmd /c python main.py politics 20 output/news.json`
- Linux: `python3 main.py politics 20 output/news.json`
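### 2. Creating the Process
```java
// Create the process builder
ProcessBuilder processBuilder = new ProcessBuilder(command);
// Set the working directory to the directory containing the Python script
processBuilder.directory(scriptDir.toFile());
// Merge stderr into stdout (so there is a single stream to read)
processBuilder.redirectErrorStream(true);
// Start the process
Process process = processBuilder.start();
```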
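**Key points:**
- `directory()`: sets the working directory so the Python script can find resources referenced by relative paths
- `redirectErrorStream(true)`: merges stderr into stdout, so both can be read from one stream (and only one stream has to be drained)
- `start()`: launches the process and returns immediately; it does not wait for the script to finish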
### 3. Reading the Output Stream
```java
// Read standard output (what Python prints)
StringBuilder output = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
    String line;
    while ((line = reader.readLine()) != null) {
        output.append(line).append("\n");
        logger.debug("Python output: {}", line);
    }
}
```
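**Important:**
- `process.getInputStream()`: returns the Python process's standard output (it is an *input* stream from Java's point of view)
- The output must be read; otherwise the pipe buffer fills up and the Python process blocks on its next write
- Decode with UTF-8 to avoid garbled Chinese text (this assumes Python actually writes UTF-8)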
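The reading loop above can be factored into a helper that accepts any `InputStream`, which makes the charset point easy to check with an in-memory stream instead of a live process. `StreamUtil.readAll` is a hypothetical name; the loop itself mirrors the snippet above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;

final class StreamUtil {
    /** Reads every line with the given charset; \n, \r\n and \r line endings all become '\n'. */
    static String readAll(InputStream in, Charset charset) throws IOException {
        StringBuilder sb = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset))) {
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line).append('\n');
            }
        }
        return sb.toString();
    }
}
```

Passing the charset explicitly matters because the no-argument `InputStreamReader` uses the platform default, which depends on the OS locale on JDKs before 18.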
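### 4. Waiting for the Process to Finish
```java
// Option 1: wait with a timeout (recommended)
boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);
if (!finished) {
    // On timeout, terminate the process
    process.destroy(); // or process.destroyForcibly() to kill it outright
    throw new RuntimeException("Task timed out");
}
int exitCode = process.exitValue();
// Option 2: wait indefinitely (not recommended: hangs forever if the script never exits)
// int exitCode = process.waitFor();
```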
**Exit codes:**
- `0`: success
- non-zero: failure (usually an error code, e.g. from `sys.exit(1)`; an uncaught Python exception exits with 1)
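### 5. Retrieving the Result
The current implementation retrieves the result by **passing it through a file**:
```java
// The Python script writes its result to a JSON file
Path outputPath = scriptDir.resolve(outputFile);
// Java reads the file contents (Files.readString decodes UTF-8 by default)
String jsonContent = Files.readString(outputPath);
// Parse the JSON; a TypeReference keeps the element type instead of a raw List
ObjectMapper mapper = new ObjectMapper();
List<Map<String, Object>> newsList = mapper.readValue(
        jsonContent,
        new TypeReference<List<Map<String, Object>>>() {}
);
```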
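## III. Comparing Three Ways to Pass Data
### Option 1: Passing Data via a File (Current Implementation)
**Pros:**
- ✅ Suitable for large amounts of data
- ✅ The data persists on disk, which makes debugging easier
- ✅ Simple to implement
**Cons:**
- ⚠️ Requires file I/O
- ⚠️ Temporary files have to be managed
- ⚠️ Possible concurrency problems (file-name collisions)
**Example:**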
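```java
// Java side: pass the output file path as an argument
String outputFile = "output/result_" + System.currentTimeMillis() + ".json";
command.add(outputFile);
```
```python
# Python side: write the result to the path given as the first argument
import json
import sys

result = {"status": "success", "data": []}  # "data" holds the crawled items
with open(sys.argv[1], 'w', encoding='utf-8') as f:
    json.dump(result, f, ensure_ascii=False)
```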
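The file-name collision risk listed above comes from `System.currentTimeMillis()`: two tasks started in the same millisecond get the same name. A sketch, assuming an output directory such as `output/` under the script directory, that uses a random `UUID` instead, creates the directory up front (Python's `open(path, 'w')` does not create missing directories) and always deletes the file after reading it. `OutputFiles` and its methods are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

final class OutputFiles {
    /** Returns a fresh, collision-free JSON path inside outputDir, creating the directory if needed. */
    static Path newOutputFile(Path outputDir, String prefix) throws IOException {
        Files.createDirectories(outputDir); // no-op if it already exists
        return outputDir.resolve(prefix + "_" + UUID.randomUUID() + ".json");
    }

    /** Reads the result file as UTF-8 and deletes it afterwards, even if reading fails. */
    static String readAndDelete(Path file) throws IOException {
        try {
            return Files.readString(file);
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

Since the Python process runs with the script directory as its working directory, passing `file.toAbsolutePath().toString()` as the argument avoids any ambiguity about where a relative path resolves.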
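### Option 2: Passing Data via Standard Output (for Small Data)
**Pros:**
- ✅ No file needed; output arrives as it is produced (provided Python flushes its output)
- ✅ Suitable for small amounts of data (< 1 MB)
- ✅ No file-management overhead
**Cons:**
- ⚠️ With large output, the whole result must be drained and held in memory, and if Java reads too slowly the script blocks
- ⚠️ Cannot carry binary data directly (it would need an encoding such as Base64)
- ⚠️ Results must be distinguished from log output
**Example:**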
```java
// Java side: read standard output
StringBuilder result = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
    String line;
    while ((line = reader.readLine()) != null) {
        // Convention: a marker prefix distinguishes the result from log lines
        if (line.startsWith("RESULT:")) {
            result.append(line.substring("RESULT:".length())); // strip the "RESULT:" prefix
        } else {
            logger.info("Python log: {}", line);
        }
    }
}
// Parse the JSON result
String jsonResult = result.toString();
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> data = mapper.readValue(jsonResult, new TypeReference<Map<String, Object>>() {});
```
```python
# Python side: print the result
import json
import sys

# Logs go to stderr
print("Start crawling...", file=sys.stderr)
# The result goes to stdout, with the marker prefix
result = {"status": "success", "data": []}  # "data" holds the crawled items
print("RESULT:" + json.dumps(result, ensure_ascii=False))
```
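The `RESULT:` convention can be isolated in a small pure function, which is easier to unit-test than the inline loop and makes the "no result" case explicit. Unlike the loop above, which concatenates every `RESULT:` line (and would produce invalid JSON if the script printed two), this sketch takes the last one. `ResultParser` is a hypothetical name.

```java
import java.util.List;
import java.util.Optional;

final class ResultParser {
    static final String PREFIX = "RESULT:";

    /** Returns the payload of the last line starting with RESULT:, or empty if the script printed none. */
    static Optional<String> extract(List<String> lines) {
        for (int i = lines.size() - 1; i >= 0; i--) {
            String line = lines.get(i);
            if (line.startsWith(PREFIX)) {
                return Optional.of(line.substring(PREFIX.length()));
            }
        }
        return Optional.empty();
    }
}
```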
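### Option 3: Passing Parameters via Standard Input (Two-Way Communication)
**Pros:**
- ✅ Can pass complex, structured parameters
- ✅ Supports interactive communication
**Cons:**
- ⚠️ More complex to implement
- ⚠️ The timing of closing the stream must be handled (Python only sees end-of-input once Java closes stdin)
**Example:**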
```java
// Java side: pass parameters through standard input
ProcessBuilder pb = new ProcessBuilder("python", "script.py");
Process process = pb.start();
// Write the parameters to the process's standard input
try (BufferedWriter writer = new BufferedWriter(
        new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8))) {
    String params = "{\"category\":\"politics\",\"limit\":20}";
    writer.write(params);
    writer.newLine();
    writer.flush();
}
// Leaving the try block closes the writer and with it the process's stdin,
// which tells Python the input has ended; no separate close() is needed
// Read the output
// ... same as Option 2
```
```python
# Python side: read the parameters from standard input
import json
import sys

# Read the parameters
params_json = sys.stdin.readline().strip()
params = json.loads(params_json)
category = params.get("category", "politics")
limit = params.get("limit", 20)
# Run the crawler (crawl_news is assumed to be the project's crawler function)
result = crawl_news(category, limit)
# Print the result
print(json.dumps(result, ensure_ascii=False))
```
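One ordering detail matters with stdin: if Java writes all of stdin before reading anything, and the child writes a lot of output before it finishes reading its input, both sides can block on full pipes. A sketch that starts draining stdout first, then writes and closes stdin, then waits. Python's stderr is sent to the JVM's own stderr so stdout carries only the result. `StdinRunner` is a hypothetical name; the usage check uses a tiny Java program run via the JDK 11+ single-file source launcher (`java Echo.java`) as a stand-in child that echoes its input.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class StdinRunner {
    /** Sends input on stdin, closes it (EOF for the child) and returns everything the child printed. */
    static String run(List<String> command, String input, long timeoutSeconds) throws Exception {
        Process process = new ProcessBuilder(command)
                .redirectError(ProcessBuilder.Redirect.INHERIT) // child's stderr goes to our stderr
                .start();

        // 1. Start draining stdout before writing, so neither side can block on a full pipe
        CompletableFuture<String> output = CompletableFuture.supplyAsync(() -> {
            try (InputStream in = process.getInputStream()) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

        // 2. Write the parameters; closing the stream is what signals end-of-input to the child
        try (OutputStream stdin = process.getOutputStream()) {
            stdin.write(input.getBytes(StandardCharsets.UTF_8));
        }

        // 3. Wait with a timeout, then collect the output
        if (!process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
            process.destroyForcibly();
            throw new IllegalStateException("child process timed out");
        }
        return output.get(5, TimeUnit.SECONDS);
    }
}
```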
## IV. Complete, Improved Implementation
### Improved Implementation (Supporting All Three Approaches)
```java
package org.xyzh.crontab.task.newsTask;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Complete implementation of calling Python from Java
 */
@Component("newsCrewerTask")
public class NewsCrewerTask {

    private static final Logger logger = LoggerFactory.getLogger(NewsCrewerTask.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Value("${crewer.python.path:python}")
    private String pythonPath;

    @Value("${crewer.script.path:../schoolNewsCrewer}")
    private String scriptPath;

    @Value("${crewer.timeout:300}")
    private int timeout;

    /**
     * Option 1: return the result via a file (current implementation, suited to large data)
     */
    public List<Map<String, Object>> executeByFile(String category, int limit) {
        logger.info("Running crawler task - file mode");
        try {
            // 1. Build the command
            List<String> command = buildCommand("main.py", category, String.valueOf(limit));
            // 2. Generate the output file name
            String timestamp = String.valueOf(System.currentTimeMillis());
            String outputFile = String.format("output/news_%s_%s.json", category, timestamp);
            command.add(outputFile);
            // 3. Run the process
            ProcessResult result = executeProcess(command);
            if (result.getExitCode() != 0) {
                throw new RuntimeException("Python execution failed: " + result.getOutput());
            }
            // 4. Read the result file (relative to the script directory, the process's working directory)
            Path outputPath = Paths.get(scriptPath).resolve(outputFile);
            if (!Files.exists(outputPath)) {
                throw new RuntimeException("Output file does not exist: " + outputFile);
            }
            String jsonContent = Files.readString(outputPath, StandardCharsets.UTF_8);
            List<Map<String, Object>> newsList = objectMapper.readValue(
                    jsonContent,
                    objectMapper.getTypeFactory().constructCollectionType(List.class, Map.class)
            );
            // 5. Clean up the temporary file (optional)
            // Files.deleteIfExists(outputPath);
            return newsList;
        } catch (Exception e) {
            logger.error("Execution failed", e);
            throw new RuntimeException("Crawler task failed", e);
        }
    }

    /**
     * Option 2: return the result via standard output (suited to small data)
     */
    public List<Map<String, Object>> executeByStdout(String category, int limit) {
        logger.info("Running crawler task - stdout mode");
        try {
            // 1. Build the command (uses a dedicated script that prints JSON to stdout)
            List<String> command = buildCommand("main_stdout.py", category, String.valueOf(limit));
            // 2. Run the process
            ProcessResult result = executeProcess(command);
            if (result.getExitCode() != 0) {
                throw new RuntimeException("Python execution failed: " + result.getOutput());
            }
            // 3. Extract the JSON from the output (convention: the last line starting with { or [ is the result)
            String output = result.getOutput();
            String[] lines = output.split("\n");
            // Search backwards for the JSON line
            String jsonLine = null;
            for (int i = lines.length - 1; i >= 0; i--) {
                String line = lines[i].trim();
                if (line.startsWith("{") || line.startsWith("[")) {
                    jsonLine = line;
                    break;
                }
            }
            if (jsonLine == null) {
                throw new RuntimeException("No JSON result found");
            }
            // 4. Parse the JSON
            List<Map<String, Object>> newsList = objectMapper.readValue(
                    jsonLine,
                    objectMapper.getTypeFactory().constructCollectionType(List.class, Map.class)
            );
            return newsList;
        } catch (Exception e) {
            logger.error("Execution failed", e);
            throw new RuntimeException("Crawler task failed", e);
        }
    }

    /**
     * Option 3: pass parameters via standard input (two-way communication)
     */
    public List<Map<String, Object>> executeByStdin(String category, int limit) {
        logger.info("Running crawler task - stdin mode");
        Process process = null;
        try {
            // 1. Build the command
            List<String> command = buildCommand("main_stdin.py");
            ProcessBuilder pb = new ProcessBuilder(command);
            pb.directory(Paths.get(scriptPath).toFile());
            // Send Python's stderr (logs) to the JVM's stderr so stdout carries only the JSON result
            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
            // 2. Start the process
            process = pb.start();
            final Process proc = process; // effectively final copy for the lambda
            // 3. Drain stdout on another thread, so the timeout below also covers a hung script
            CompletableFuture<String> outputFuture = CompletableFuture.supplyAsync(() -> {
                StringBuilder output = new StringBuilder();
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        output.append(line).append("\n");
                    }
                } catch (IOException e) {
                    logger.error("Failed to read Python output", e);
                }
                return output.toString();
            });
            // 4. Write the parameters to stdin; closing the writer closes stdin (important!),
            //    which tells Python the input has ended
            try (BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8))) {
                Map<String, Object> params = Map.of(
                        "category", category,
                        "limit", limit
                );
                writer.write(objectMapper.writeValueAsString(params));
                writer.newLine();
            }
            // 5. Wait for the process to finish
            boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);
            if (!finished) {
                process.destroyForcibly();
                throw new RuntimeException("Task timed out");
            }
            int exitCode = process.exitValue();
            if (exitCode != 0) {
                throw new RuntimeException("Python execution failed, exit code: " + exitCode);
            }
            // 6. Parse the result
            String jsonResult = outputFuture.get(5, TimeUnit.SECONDS).trim();
            List<Map<String, Object>> newsList = objectMapper.readValue(
                    jsonResult,
                    objectMapper.getTypeFactory().constructCollectionType(List.class, Map.class)
            );
            return newsList;
        } catch (Exception e) {
            logger.error("Execution failed", e);
            throw new RuntimeException("Crawler task failed", e);
        } finally {
            if (process != null && process.isAlive()) {
                process.destroyForcibly();
            }
        }
    }

    /**
     * Generic process-execution helper
     */
    private ProcessResult executeProcess(List<String> command)
            throws IOException, InterruptedException, ExecutionException, TimeoutException {
        long startTime = System.currentTimeMillis();
        // Create the process builder
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.directory(Paths.get(scriptPath).toFile());
        pb.redirectErrorStream(true);
        logger.info("Running command: {}", String.join(" ", command));
        // Start the process
        Process process = pb.start();
        // Read the output on a separate thread (CompletableFuture, common pool by default), so a full
        // pipe cannot block Python and the waitFor timeout below stays effective
        StringBuilder output = new StringBuilder();
        CompletableFuture<String> outputFuture = CompletableFuture.supplyAsync(() -> {
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    output.append(line).append("\n");
                    logger.debug("Python output: {}", line);
                }
                return output.toString();
            } catch (IOException e) {
                logger.error("Failed to read output", e);
                return "";
            }
        });
        // Wait for the process to finish (with a timeout)
        boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);
        if (!finished) {
            process.destroyForcibly();
            throw new RuntimeException("Task timed out (exceeded " + timeout + " seconds)");
        }
        // Collect the output; the reader reaches end-of-stream once the process has exited
        String outputStr = outputFuture.get(5, TimeUnit.SECONDS);
        int exitCode = process.exitValue();
        long duration = System.currentTimeMillis() - startTime;
        logger.info("Process finished - exit code: {}, duration: {}ms", exitCode, duration);
        return new ProcessResult(exitCode, outputStr, duration);
    }

    /**
     * Builds the command list
     */
    private List<String> buildCommand(String... args) {
        List<String> command = new ArrayList<>();
        String os = System.getProperty("os.name").toLowerCase();
        if (os.contains("win")) {
            command.add("cmd");
            command.add("/c");
            command.add(pythonPath);
        } else {
            command.add(pythonPath);
        }
        for (String arg : args) {
            command.add(arg);
        }
        return command;
    }

    /**
     * Result of a process execution
     */
    private static class ProcessResult {
        private final int exitCode;
        private final String output;
        private final long duration;

        public ProcessResult(int exitCode, String output, long duration) {
            this.exitCode = exitCode;
            this.output = output;
            this.duration = duration;
        }

        public int getExitCode() {
            return exitCode;
        }

        public String getOutput() {
            return output;
        }

        public long getDuration() {
            return duration;
        }
    }
}
```
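## V. Key Points to Watch
### 1. Always Read the Output Stream
**Wrong:**
```java
Process process = pb.start();
int exitCode = process.waitFor(); // may block forever!
```
**Why:** the child's stdout (and its stderr, unless merged) is an OS pipe with a limited buffer (64 KB by default on Linux). If Java never reads it, the buffer fills, Python blocks on its next write, never exits, and `waitFor()` never returns.
**Right:**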
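```java
Process process = pb.start();
// The output must be read: drain it on a separate thread
Thread outputThread = new Thread(() -> {
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
            logger.debug("Python output: {}", line); // consume (and log) each line
        }
    } catch (IOException e) {
        logger.error("Failed to read Python output", e);
    }
});
outputThread.start();
int exitCode = process.waitFor();
outputThread.join(); // make sure all output has been consumed
```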
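When the Java side does not need to parse the output at all, another way to keep the pipe from filling is not to use a pipe: `ProcessBuilder.redirectOutput` can send the child's output straight to a file (handy as a crawler log), and `ProcessBuilder.Redirect.DISCARD` (Java 9+) drops it. A sketch; `LogRedirect` and the log-file location are assumptions, and the usage check runs the JVM's `-version` in place of the Python script.

```java
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

final class LogRedirect {
    /** Runs the command with stdout and stderr appended to logFile; nothing needs draining in Java. */
    static int runToLog(List<String> command, File logFile, long timeoutSeconds)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true);                                 // stderr follows stdout
        pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile)); // the OS writes directly to the file
        Process process = pb.start();
        if (!process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
            process.destroyForcibly();
            throw new IllegalStateException("child process timed out");
        }
        return process.exitValue();
    }
}
```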
### 2. Handling Encoding
```java
// Specify UTF-8 explicitly to avoid garbled Chinese text
new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8);
new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8);
```
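Choosing UTF-8 on the Java side only fixes half of the problem: Python picks the encoding of its own standard streams, and when stdout is a pipe on Windows it typically uses the system code page (for example GBK/cp936 on a Chinese-locale machine) rather than UTF-8. One way to line both sides up is to set Python's `PYTHONIOENCODING` environment variable for the child, plus `PYTHONUNBUFFERED` (equivalent to `python -u`) so lines reach Java as they are printed. A sketch; `PythonEnv.configure` is a hypothetical helper.

```java
import java.util.List;
import java.util.Map;

final class PythonEnv {
    /** Returns a ProcessBuilder whose Python child writes UTF-8, unbuffered, to its standard streams. */
    static ProcessBuilder configure(List<String> command) {
        ProcessBuilder pb = new ProcessBuilder(command);
        Map<String, String> env = pb.environment(); // copy of the JVM's environment, editable per child
        env.put("PYTHONIOENCODING", "utf-8");       // encoding for sys.stdin/stdout/stderr
        env.put("PYTHONUNBUFFERED", "1");           // disable stdout/stderr buffering
        return pb;
    }
}
```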
### 3. Timeout Control
```java
// Use the timed waitFor
boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);
if (!finished) {
    process.destroyForcibly(); // kill the process
}
```
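`destroy()` requests normal termination (SIGTERM on Unix), while `destroyForcibly()` kills the process outright (SIGKILL on Unix); `Process.supportsNormalTermination()` reports whether `destroy()` is actually a graceful request on the current platform (on Windows it is not). A sketch of trying the polite request first and escalating after a grace period; `Termination.waitOrTerminate` is a hypothetical helper, and the usage check uses JVM child processes instead of Python.

```java
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;

final class Termination {
    /**
     * Waits up to timeoutSeconds. On timeout, calls destroy(), allows graceSeconds for the
     * process to exit, then calls destroyForcibly(). Returns the exit code, or empty if stopped.
     */
    static OptionalInt waitOrTerminate(Process process, long timeoutSeconds, long graceSeconds)
            throws InterruptedException {
        if (process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
            return OptionalInt.of(process.exitValue()); // finished in time
        }
        process.destroy();                               // polite request first
        if (!process.waitFor(graceSeconds, TimeUnit.SECONDS)) {
            // Still running after the grace period: kill it and wait briefly for it to go away
            process.destroyForcibly().waitFor(graceSeconds, TimeUnit.SECONDS);
        }
        return OptionalInt.empty();
    }
}
```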
### 4. Cleaning Up Resources
```java
Process process = null;
try {
    process = pb.start();
    // ... execution logic
} finally {
    if (process != null) {
        if (process.isAlive()) {
            process.destroyForcibly();
        }
        // Close the streams
        process.getInputStream().close();
        process.getOutputStream().close();
        process.getErrorStream().close();
    }
}
```
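Destroying the `Process` object only targets the direct child. With the `cmd /c python ...` launch used on Windows, the direct child is `cmd.exe`, so killing it can leave `python.exe` running; the same applies if the Python script starts its own subprocesses. Since Java 9, `Process.descendants()` returns a snapshot of the child's descendant processes, which can be terminated as well. A sketch; `ProcessCleanup.destroyTree` is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;

final class ProcessCleanup {
    /** Forcibly terminates the process and every descendant it started, then waits briefly for it. */
    static void destroyTree(Process process) throws InterruptedException {
        // Take the descendants first: once the parent dies, its children are re-parented and no longer listed
        process.descendants().forEach(ProcessHandle::destroyForcibly);
        process.destroyForcibly();
        process.waitFor(5, TimeUnit.SECONDS);
    }
}
```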
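### 5. Error Handling
```java
// Check the exit code
if (exitCode != 0) {
    // With redirectErrorStream(true), Python's error output (e.g. a traceback) is already part of
    // the captured output string; without it, read process.getErrorStream() instead
    throw new RuntimeException("Execution failed (exit code " + exitCode + "): " + output);
}
```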
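A long crawler log makes an unwieldy exception message. Since an uncaught Python exception prints its traceback last, keeping only the tail of the captured output usually preserves the useful part. A sketch; `ErrorMessages` and the 20-line limit are assumptions.

```java
import java.util.Arrays;

final class ErrorMessages {
    /** Returns at most maxLines trailing lines of output, where a Python traceback usually ends up. */
    static String tail(String output, int maxLines) {
        String[] lines = output.split("\\R");   // \R matches \n, \r\n and \r
        int from = Math.max(0, lines.length - maxLines);
        return String.join("\n", Arrays.copyOfRange(lines, from, lines.length));
    }

    /** Builds the exception thrown for a non-zero exit code. */
    static RuntimeException failure(int exitCode, String output) {
        return new RuntimeException("Python exited with code " + exitCode + ":\n" + tail(output, 20));
    }
}
```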
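## VI. Performance Tips
1. **Use a thread pool**: if the script is called frequently, run the calls on a thread pool, which also caps how many Python processes exist at once
2. **Reuse a long-running process**: consider running Python as a service (HTTP/gRPC) instead of starting a new process for every call
3. **Run asynchronously**: use `CompletableFuture` to execute calls asynchronously
4. **Cache results**: cache the results of requests with identical parameters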
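Items 1, 3 and 4 can be combined: run crawler calls on a fixed-size pool and cache the `CompletableFuture` per parameter set, so identical requests share one run, even while it is still in flight. A sketch; `CrawlerGateway`, the `category:limit` key and the `loader` function (which would wrap something like `executeByFile`) are assumptions. Failed runs are evicted so the next call retries, but successful entries never expire, which a real deployment would need to address. Item 2 is a different architecture and is not covered here.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class CrawlerGateway implements AutoCloseable {
    private final ExecutorService pool;                 // bounds concurrent runs
    private final Function<String, String> loader;      // runs the crawler for a cache key
    private final Map<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

    CrawlerGateway(int maxConcurrentRuns, Function<String, String> loader) {
        this.pool = Executors.newFixedThreadPool(maxConcurrentRuns);
        this.loader = loader;
    }

    /** Returns the cached future for (category, limit), starting a run on the pool if none exists. */
    CompletableFuture<String> fetch(String category, int limit) {
        String key = category + ":" + limit;
        CompletableFuture<String> future = cache.computeIfAbsent(
                key, k -> CompletableFuture.supplyAsync(() -> loader.apply(k), pool));
        // Do not keep failures: evict them so the next call retries
        future.whenComplete((result, error) -> {
            if (error != null) {
                cache.remove(key, future);
            }
        });
        return future;
    }

    @Override
    public void close() {
        pool.shutdown();
    }
}
```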
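## VII. Summary
- **File passing**: suited to large data (the current implementation)
- **Standard output**: suited to small data and output that arrives while the script runs
- **Standard input**: suited to complex parameters and two-way communication
Choose the approach that fits your needs; the current file-passing approach is already good enough.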