Files
schoolNews/schoolNewsServ/crontab/Java调用Python详解.md
2025-11-10 16:03:50 +08:00

19 KiB
Raw Blame History

Java调用Python并获取返回结果详解

一、核心原理

Java通过 ProcessBuilderRuntime.exec() 创建操作系统进程来执行Python脚本然后通过进程的标准输入/输出流进行通信。

二、当前实现详解

1. 构建命令

// 步骤1: 构建命令列表
List<String> command = new ArrayList<>();

// 步骤2: 处理Windows/Linux系统差异
String os = System.getProperty("os.name").toLowerCase();
if (os.contains("win")) {
    // Windows系统需要通过cmd执行
    command.add("cmd");      // 命令解释器
    command.add("/c");       // /c表示执行后关闭
    command.add(pythonPath); // python或python3
} else {
    // Linux/Mac系统直接执行
    command.add(pythonPath);
}

// 步骤3: 添加Python脚本和参数
command.add("main.py");      // Python脚本
command.add(category);       // 参数1: 分类
command.add(limit);          // 参数2: 数量
command.add(outputFile);     // 参数3: 输出文件

命令示例:

  • Windows: cmd /c python main.py politics 20 output/news.json
  • Linux: python3 main.py politics 20 output/news.json

2. 创建进程

// 创建进程构建器
ProcessBuilder processBuilder = new ProcessBuilder(command);

// 设置工作目录Python脚本所在目录
processBuilder.directory(scriptDir.toFile());

// 合并标准输出和错误输出(便于统一读取)
processBuilder.redirectErrorStream(true);

// 启动进程
Process process = processBuilder.start();

关键点:

  • directory(): 设置工作目录确保Python脚本能找到相对路径的资源
  • redirectErrorStream(true): 将stderr合并到stdout方便统一读取
  • start(): 异步启动进程,不会阻塞

3. 读取输出流

// 读取标准输出Python的print输出
StringBuilder output = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), "UTF-8"))) {
    String line;
    while ((line = reader.readLine()) != null) {
        output.append(line).append("\n");
        logger.debug("Python输出: {}", line);
    }
}

重要说明:

  • process.getInputStream(): 获取Python进程的标准输出
  • 必须读取输出流,否则缓冲区满会导致进程阻塞
  • 使用UTF-8编码避免中文乱码

4. 等待进程结束

// 方式1: 带超时的等待(推荐)
boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);

if (!finished) {
    // 超时后强制终止进程
    process.destroy(); // 或 process.destroyForcibly() 强制终止
    throw new RuntimeException("任务超时");
}

// 方式2: 无限等待(不推荐,可能导致死锁)
int exitCode = process.waitFor();

退出码说明:

  • 0: 执行成功
  • 非0: 执行失败(通常是错误码)

5. 获取返回结果

当前实现通过文件传递方式获取结果:

// Python脚本将结果写入JSON文件
Path outputPath = scriptDir.resolve(outputFile);

// Java读取文件内容
String jsonContent = Files.readString(outputPath);

// 解析JSON
ObjectMapper mapper = new ObjectMapper();
List<Map<String, Object>> newsList = mapper.readValue(
    jsonContent, 
    List.class
);

三、三种数据传递方式对比

方式1: 文件传递(当前实现)

优点:

  • 适合大数据量
  • 数据持久化,便于调试
  • 实现简单

缺点:

  • ⚠️ 需要文件I/O操作
  • ⚠️ 需要管理临时文件
  • ⚠️ 可能有并发问题(文件名冲突)

实现示例:

// Java端
String outputFile = "output/result_" + System.currentTimeMillis() + ".json";
command.add(outputFile);

// Python端
import json
import sys

result = {"status": "success", "data": [...]}
with open(sys.argv[1], 'w', encoding='utf-8') as f:
    json.dump(result, f, ensure_ascii=False)

方式2: 标准输出传递(适合小数据)

优点:

  • 实时传输,无需文件
  • 适合小数据量(< 1MB
  • 无文件管理开销

缺点:

  • ⚠️ 大数据量可能阻塞
  • ⚠️ 不能传递二进制数据
  • ⚠️ 需要与日志输出区分

实现示例:

// Java端读取标准输出
StringBuilder result = new StringBuilder();
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(process.getInputStream(), "UTF-8"))) {
    String line;
    while ((line = reader.readLine()) != null) {
        // 约定:以特定标记区分结果和日志
        if (line.startsWith("RESULT:")) {
            result.append(line.substring(7)); // 去掉"RESULT:"前缀
        } else {
            logger.info("Python日志: {}", line);
        }
    }
}

// 解析JSON结果
String jsonResult = result.toString();
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> data = mapper.readValue(jsonResult, Map.class);
# Python端输出结果
import json
import sys

# 日志输出到stderr
print("开始爬取...", file=sys.stderr)

# 结果输出到stdout带标记
result = {"status": "success", "data": [...]}
print("RESULT:" + json.dumps(result, ensure_ascii=False))

方式3: 标准输入传递参数(双向通信)

优点:

  • 可以传递复杂参数
  • 支持交互式通信

缺点:

  • ⚠️ 实现复杂
  • ⚠️ 需要处理流关闭时机

实现示例:

// Java端通过标准输入传递参数
ProcessBuilder pb = new ProcessBuilder("python", "script.py");
Process process = pb.start();

// 写入参数到标准输入
try (BufferedWriter writer = new BufferedWriter(
        new OutputStreamWriter(process.getOutputStream(), "UTF-8"))) {
    String params = "{\"category\":\"politics\",\"limit\":20}";
    writer.write(params);
    writer.newLine();
    writer.flush();
}

// 关闭输入流告诉Python输入结束
process.getOutputStream().close();

// 读取输出
// ... 同方式2
# Python端从标准输入读取参数
import json
import sys

# 读取参数
params_json = sys.stdin.readline().strip()
params = json.loads(params_json)

category = params.get("category", "politics")
limit = params.get("limit", 20)

# 执行爬取
result = crawl_news(category, limit)

# 输出结果
print(json.dumps(result, ensure_ascii=False))

四、完整优化实现

改进版实现(支持多种方式)

package org.xyzh.crontab.task.newsTask;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Java调用Python的完整实现
 */
@Component("newsCrewerTask")
public class NewsCrewerTask {

    private static final Logger logger = LoggerFactory.getLogger(NewsCrewerTask.class);
    private final ObjectMapper objectMapper = new ObjectMapper();
    
    @Value("${crewer.python.path:python}")
    private String pythonPath;

    @Value("${crewer.script.path:../schoolNewsCrewer}")
    private String scriptPath;

    @Value("${crewer.timeout:300}")
    private int timeout;

    /**
     * 方式1: 通过文件传递结果(当前实现,适合大数据)
     */
    public List<Map<String, Object>> executeByFile(String category, int limit) {
        logger.info("执行爬虫任务 - 文件方式");
        
        try {
            // 1. 构建命令
            List<String> command = buildCommand("main.py", category, String.valueOf(limit));
            
            // 2. 生成输出文件
            String timestamp = String.valueOf(System.currentTimeMillis());
            String outputFile = String.format("output/news_%s_%s.json", category, timestamp);
            command.add(outputFile);
            
            // 3. 执行进程
            ProcessResult result = executeProcess(command);
            
            if (result.getExitCode() != 0) {
                throw new RuntimeException("Python执行失败: " + result.getOutput());
            }
            
            // 4. 读取结果文件
            Path outputPath = Paths.get(scriptPath).resolve(outputFile);
            if (!Files.exists(outputPath)) {
                throw new RuntimeException("输出文件不存在: " + outputFile);
            }
            
            String jsonContent = Files.readString(outputPath, StandardCharsets.UTF_8);
            List<Map<String, Object>> newsList = objectMapper.readValue(
                jsonContent, 
                objectMapper.getTypeFactory().constructCollectionType(List.class, Map.class)
            );
            
            // 5. 清理临时文件(可选)
            // Files.deleteIfExists(outputPath);
            
            return newsList;
            
        } catch (Exception e) {
            logger.error("执行失败", e);
            throw new RuntimeException("爬虫任务执行失败", e);
        }
    }

    /**
     * 方式2: 通过标准输出传递结果(适合小数据)
     */
    public List<Map<String, Object>> executeByStdout(String category, int limit) {
        logger.info("执行爬虫任务 - 标准输出方式");
        
        try {
            // 1. 构建命令使用特殊脚本输出JSON到stdout
            List<String> command = buildCommand("main_stdout.py", category, String.valueOf(limit));
            
            // 2. 执行进程
            ProcessResult result = executeProcess(command);
            
            if (result.getExitCode() != 0) {
                throw new RuntimeException("Python执行失败: " + result.getOutput());
            }
            
            // 3. 从输出中提取JSON约定最后一行是JSON结果
            String output = result.getOutput();
            String[] lines = output.split("\n");
            
            // 查找JSON行以{或[开头)
            String jsonLine = null;
            for (int i = lines.length - 1; i >= 0; i--) {
                String line = lines[i].trim();
                if (line.startsWith("{") || line.startsWith("[")) {
                    jsonLine = line;
                    break;
                }
            }
            
            if (jsonLine == null) {
                throw new RuntimeException("未找到JSON结果");
            }
            
            // 4. 解析JSON
            List<Map<String, Object>> newsList = objectMapper.readValue(
                jsonLine,
                objectMapper.getTypeFactory().constructCollectionType(List.class, Map.class)
            );
            
            return newsList;
            
        } catch (Exception e) {
            logger.error("执行失败", e);
            throw new RuntimeException("爬虫任务执行失败", e);
        }
    }

    /**
     * 方式3: 通过标准输入传递参数(双向通信)
     */
    public List<Map<String, Object>> executeByStdin(String category, int limit) {
        logger.info("执行爬虫任务 - 标准输入方式");
        
        Process process = null;
        try {
            // 1. 构建命令
            List<String> command = buildCommand("main_stdin.py");
            ProcessBuilder pb = new ProcessBuilder(command);
            pb.directory(Paths.get(scriptPath).toFile());
            pb.redirectErrorStream(true);
            
            // 2. 启动进程
            process = pb.start();
            
            // 3. 写入参数到标准输入
            try (BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8))) {
                
                Map<String, Object> params = Map.of(
                    "category", category,
                    "limit", limit
                );
                
                String paramsJson = objectMapper.writeValueAsString(params);
                writer.write(paramsJson);
                writer.newLine();
                writer.flush();
            }
            
            // 4. 关闭输入流(重要!)
            process.getOutputStream().close();
            
            // 5. 读取输出
            StringBuilder output = new StringBuilder();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    output.append(line).append("\n");
                }
            }
            
            // 6. 等待进程结束
            boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);
            if (!finished) {
                process.destroyForcibly();
                throw new RuntimeException("任务超时");
            }
            
            int exitCode = process.exitValue();
            if (exitCode != 0) {
                throw new RuntimeException("Python执行失败退出码: " + exitCode);
            }
            
            // 7. 解析结果
            String jsonResult = output.toString().trim();
            List<Map<String, Object>> newsList = objectMapper.readValue(
                jsonResult,
                objectMapper.getTypeFactory().constructCollectionType(List.class, Map.class)
            );
            
            return newsList;
            
        } catch (Exception e) {
            logger.error("执行失败", e);
            throw new RuntimeException("爬虫任务执行失败", e);
        } finally {
            if (process != null && process.isAlive()) {
                process.destroyForcibly();
            }
        }
    }

    /**
     * 通用进程执行方法
     */
    private ProcessResult executeProcess(List<String> command) throws IOException, InterruptedException {
        long startTime = System.currentTimeMillis();
        
        // 创建进程构建器
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.directory(Paths.get(scriptPath).toFile());
        pb.redirectErrorStream(true);
        
        logger.info("执行命令: {}", String.join(" ", command));
        
        // 启动进程
        Process process = pb.start();
        
        // 读取输出(必须在单独线程中,避免阻塞)
        StringBuilder output = new StringBuilder();
        StringBuilder error = new StringBuilder();
        
        // 使用CompletableFuture异步读取避免死锁
        CompletableFuture<String> outputFuture = CompletableFuture.supplyAsync(() -> {
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    output.append(line).append("\n");
                    logger.debug("Python输出: {}", line);
                }
                return output.toString();
            } catch (IOException e) {
                logger.error("读取输出失败", e);
                return "";
            }
        });
        
        // 等待进程结束(带超时)
        boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);
        
        if (!finished) {
            process.destroyForcibly();
            throw new RuntimeException("任务超时(超过" + timeout + "秒)");
        }
        
        // 获取输出
        String outputStr = outputFuture.get(5, TimeUnit.SECONDS);
        
        int exitCode = process.exitValue();
        long duration = System.currentTimeMillis() - startTime;
        
        logger.info("进程执行完成 - 退出码: {}, 耗时: {}ms", exitCode, duration);
        
        return new ProcessResult(exitCode, outputStr, duration);
    }

    /**
     * 构建命令列表
     */
    private List<String> buildCommand(String... args) {
        List<String> command = new ArrayList<>();
        
        String os = System.getProperty("os.name").toLowerCase();
        if (os.contains("win")) {
            command.add("cmd");
            command.add("/c");
            command.add(pythonPath);
        } else {
            command.add(pythonPath);
        }
        
        for (String arg : args) {
            command.add(arg);
        }
        
        return command;
    }

    /**
     * 进程执行结果
     */
    private static class ProcessResult {
        private final int exitCode;
        private final String output;
        private final long duration;

        public ProcessResult(int exitCode, String output, long duration) {
            this.exitCode = exitCode;
            this.output = output;
            this.duration = duration;
        }

        public int getExitCode() {
            return exitCode;
        }

        public String getOutput() {
            return output;
        }

        public long getDuration() {
            return duration;
        }
    }
}

五、关键注意事项

1. 必须读取输出流

错误示例:

Process process = pb.start();
int exitCode = process.waitFor(); // 可能永远阻塞!

原因: 如果输出缓冲区满了Python进程会阻塞等待读取。

正确做法:

Process process = pb.start();

// 必须读取输出流
Thread outputThread = new Thread(() -> {
    try (BufferedReader reader = ...) {
        // 读取输出
    }
});
outputThread.start();

process.waitFor();

2. 处理编码问题

// 指定UTF-8编码避免中文乱码
new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8)
new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8)

3. 超时控制

// 使用带超时的waitFor
boolean finished = process.waitFor(timeout, TimeUnit.SECONDS);
if (!finished) {
    process.destroyForcibly(); // 强制终止
}

4. 资源清理

try {
    // 执行逻辑
} finally {
    if (process != null && process.isAlive()) {
        process.destroyForcibly();
    }
    // 关闭流
    process.getInputStream().close();
    process.getOutputStream().close();
    process.getErrorStream().close();
}

5. 错误处理

// 检查退出码
if (exitCode != 0) {
    // 读取错误输出
    String error = readErrorStream(process);
    throw new RuntimeException("执行失败: " + error);
}

六、性能优化建议

  1. 使用线程池:如果频繁调用,使用线程池管理进程
  2. 连接复用考虑Python服务模式HTTP/GRPC
  3. 异步执行使用CompletableFuture异步执行
  4. 缓存结果:对相同参数的请求缓存结果

七、总结

  • 文件传递:适合大数据量,当前实现方式
  • 标准输出:适合小数据量,实时传输
  • 标准输入:适合复杂参数,双向通信

根据实际需求选择合适的方式,当前的文件传递方式已经足够好!