会话总结工作流接入、前后端处理

This commit is contained in:
2026-01-01 15:12:29 +08:00
parent 4e373e6d2c
commit eb15706ccc
22 changed files with 1738 additions and 43 deletions

View File

@@ -572,6 +572,45 @@ public class DifyApiClient {
}
}
// ===================== 工作流 API =====================
/**
* 执行工作流(阻塞模式)
*/
public WorkflowRunResponse runWorkflowBlocking(WorkflowRunRequest request, String apiKey) {
String url = difyConfig.getFullApiUrl("/workflows/run");
try {
// 设置为阻塞模式
request.setResponseMode("blocking");
String jsonBody = JSON.toJSONString(request);
logger.info("调用Dify工作流接口: url={}, request={}", url, jsonBody);
Request httpRequest = new Request.Builder()
.url(url)
.header("Authorization", "Bearer " + getApiKey(apiKey))
.header("Content-Type", "application/json")
.post(RequestBody.create(jsonBody, MediaType.parse("application/json")))
.build();
try (Response response = httpClient.newCall(httpRequest).execute()) {
String responseBody = response.body() != null ? response.body().string() : "";
if (!response.isSuccessful()) {
logger.error("工作流执行失败: {} - {}", response.code(), responseBody);
throw new DifyException("工作流执行失败: " + responseBody);
}
logger.info("工作流执行成功: {}", responseBody);
return JSON.parseObject(responseBody, WorkflowRunResponse.class);
}
} catch (IOException e) {
logger.error("工作流执行异常", e);
throw new DifyException("工作流执行异常: " + e.getMessage(), e);
}
}
/**
* 停止对话生成
*/

View File

@@ -0,0 +1,34 @@
package org.xyzh.ai.client.dto;
import com.alibaba.fastjson2.annotation.JSONField;
import lombok.Data;
import java.util.Map;
/**
* @description 工作流执行请求
* @filename WorkflowRunRequest.java
* @author system
* @copyright xyzh
* @since 2026-01-01
*/
@Data
public class WorkflowRunRequest {
/**
* 输入变量Dify 工作流 API 必需字段)
*/
@JSONField(serializeFeatures = com.alibaba.fastjson2.JSONWriter.Feature.WriteMapNullValue)
private Map<String, Object> inputs = new java.util.HashMap<>();
/**
* 响应模式streaming流式、blocking阻塞
*/
@JSONField(name = "response_mode")
private String responseMode = "blocking";
/**
* 用户标识
*/
private String user;
}

View File

@@ -0,0 +1,86 @@
package org.xyzh.ai.client.dto;
import com.alibaba.fastjson2.annotation.JSONField;
import lombok.Data;
import java.util.Map;
/**
* @description 工作流执行响应(阻塞模式)
* @filename WorkflowRunResponse.java
* @author system
* @copyright xyzh
* @since 2026-01-01
*/
@Data
public class WorkflowRunResponse {
/**
* 工作流执行ID
*/
@JSONField(name = "workflow_run_id")
private String workflowRunId;
/**
* 任务ID
*/
@JSONField(name = "task_id")
private String taskId;
/**
* 工作流执行数据
*/
private WorkflowData data;
@Data
public static class WorkflowData {
/**
* 执行ID
*/
private String id;
/**
* 工作流ID
*/
@JSONField(name = "workflow_id")
private String workflowId;
/**
* 执行状态running、succeeded、failed、stopped
*/
private String status;
/**
* 工作流输出结果
*/
private Map<String, Object> outputs;
/**
* 错误信息
*/
private String error;
/**
* 执行耗时(秒)
*/
@JSONField(name = "elapsed_time")
private Double elapsedTime;
/**
* 总Token数
*/
@JSONField(name = "total_tokens")
private Integer totalTokens;
/**
* 创建时间
*/
@JSONField(name = "created_at")
private Long createdAt;
/**
* 完成时间
*/
@JSONField(name = "finished_at")
private Long finishedAt;
}
}

View File

@@ -13,6 +13,9 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.xyzh.ai.client.DifyApiClient;
import org.xyzh.ai.client.callback.StreamCallback;
import org.xyzh.ai.client.dto.ChatRequest;
import org.xyzh.ai.client.dto.ChatResponse;
import org.xyzh.ai.client.dto.WorkflowRunRequest;
import org.xyzh.ai.client.dto.WorkflowRunResponse;
import org.xyzh.ai.config.DifyConfig;
import org.xyzh.ai.mapper.TbChatMapper;
import org.xyzh.ai.mapper.TbChatMessageMapper;
@@ -60,6 +63,7 @@ public class AgentChatServiceImpl implements AgentChatService {
private static final Logger logger = LoggerFactory.getLogger(AgentChatServiceImpl.class);
private static final String CHAT_SESSION_PREFIX = "ai:chat:session:";
private static final String WORKFLOW_SESSION_PREFIX = "ai:workflow:session:";
private static final long SESSION_TTL = 5 * 60;
@Autowired
@@ -325,11 +329,15 @@ public class AgentChatServiceImpl implements AgentChatService {
sessionData.put("service", prepareData.getService());
sessionData.put("isGuest", isGuest);
sessionData.put("inputsMap", inputsMap); // 存储处理好的 inputs
sessionData.put("appType", prepareData.getAppType()); // 存储应用类型
String cacheKey = CHAT_SESSION_PREFIX + sessionId;
// 根据应用类型选择不同的Redis key前缀
String appType = prepareData.getAppType();
String prefix = "workflow".equals(appType) ? WORKFLOW_SESSION_PREFIX : CHAT_SESSION_PREFIX;
String cacheKey = prefix + sessionId;
redisService.set(cacheKey, sessionData, SESSION_TTL, TimeUnit.SECONDS);
logger.info("准备对话会话: sessionId={}, agentId={}", sessionId, agentId);
logger.info("准备{}会话: sessionId={}, agentId={}", appType, sessionId, agentId);
return ResultDomain.success("准备成功", sessionId);
}
@@ -479,6 +487,109 @@ public class AgentChatServiceImpl implements AgentChatService {
return emitter;
}
@Override
public ResultDomain<String> blockingChatMessageWithSession(String sessionId) {
try {
// 1. 从Redis获取会话数据
String cacheKey = CHAT_SESSION_PREFIX + sessionId;
@SuppressWarnings("unchecked")
Map<String, Object> sessionData = redisService.get(cacheKey, Map.class);
if (sessionData == null) {
return ResultDomain.failure("会话已过期");
}
// 2. 解析会话数据
String agentId = (String) sessionData.get("agentId");
String chatId = (String) sessionData.get("chatId");
String query = (String) sessionData.get("query");
String userId = (String) sessionData.get("userId");
String apiKey = (String) sessionData.get("apiKey");
@SuppressWarnings("unchecked")
Map<String, Object> inputsMap = (Map<String, Object>) sessionData.get("inputsMap");
@SuppressWarnings("unchecked")
List<DifyFileInfo> filesData = (List<DifyFileInfo>) sessionData.get("filesData");
// 3. 删除已使用的会话数据
redisService.delete(cacheKey);
// 4. 保存用户消息(如果有 chatId 的话)
if (StringUtils.hasText(chatId)) {
String userMessageId = IdUtil.getSnowflakeId();
TbChatMessage userMessage = new TbChatMessage();
userMessage.setOptsn(IdUtil.getOptsn());
userMessage.setMessageId(userMessageId);
userMessage.setChatId(chatId);
userMessage.setRole("user");
userMessage.setContent(query);
// 提取系统文件ID列表保存到消息中
if (filesData != null && !filesData.isEmpty()) {
List<String> sysFileIds = filesData.stream()
.map(DifyFileInfo::getSysFileId)
.filter(StringUtils::hasText)
.collect(java.util.stream.Collectors.toList());
if (!sysFileIds.isEmpty()) {
userMessage.setFiles(sysFileIds);
}
}
chatMessageMapper.insertChatMessage(userMessage);
}
// 5. 构建Dify请求
ChatRequest chatRequest = new ChatRequest();
chatRequest.setQuery(query);
chatRequest.setUser(userId);
chatRequest.setResponseMode("blocking");
// 使用从Redis获取的inputsMap如果为空则创建新的
if (inputsMap == null) {
inputsMap = new HashMap<>();
}
chatRequest.setInputs(inputsMap); // Dify API 要求 inputs 必传
if (filesData != null && !filesData.isEmpty()) {
chatRequest.setFiles(filesData);
}
// 6. 调用Dify阻塞式接口
logger.info("调用Dify阻塞式接口: agentId={}, userId={}", agentId, userId);
ChatResponse chatResponse = difyApiClient.blockingChat(chatRequest, apiKey);
if (chatResponse == null || chatResponse.getAnswer() == null) {
return ResultDomain.failure("工作流返回结果为空");
}
String answer = chatResponse.getAnswer();
// 7. 保存AI回复消息如果有 chatId 的话)
if (StringUtils.hasText(chatId)) {
String aiMessageId = IdUtil.getSnowflakeId();
TbChatMessage aiMessage = new TbChatMessage();
aiMessage.setOptsn(IdUtil.getOptsn());
aiMessage.setMessageId(aiMessageId);
aiMessage.setDifyMessageId(chatResponse.getMessageId());
aiMessage.setChatId(chatId);
aiMessage.setRole("ai");
aiMessage.setContent(answer);
chatMessageMapper.insertChatMessage(aiMessage);
logger.info("阻塞式对话完成: chatId={}, aiMessageId={}", chatId, aiMessageId);
} else {
logger.info("阻塞式对话完成无chatId: userId={}", userId);
}
return ResultDomain.success("对话成功", answer);
} catch (Exception e) {
logger.error("阻塞式对话异常: sessionId={}", sessionId, e);
return ResultDomain.failure("对话失败: " + e.getMessage());
}
}
@Override
public ResultDomain<Boolean> stopChatMessageByTaskId(TbChat filter, String taskId) {
// 1. 获取智能体
@@ -547,4 +658,116 @@ public class AgentChatServiceImpl implements AgentChatService {
return ResultDomain.failure("评价失败");
}
@Override
public ResultDomain<String> runWorkflowWithSession(String sessionId) {
try {
// 1. 从Redis获取会话数据使用workflow前缀
String cacheKey = WORKFLOW_SESSION_PREFIX + sessionId;
Map<String, Object> sessionData = redisService.get(cacheKey, Map.class);
if (sessionData == null) {
return ResultDomain.failure("会话已过期");
}
// 2. 解析会话数据
String agentId = (String) sessionData.get("agentId");
String userId = (String) sessionData.get("userId");
String apiKey = (String) sessionData.get("apiKey");
Map<String, Object> inputsMap = (Map<String, Object>) sessionData.get("inputsMap");
// 3. 删除已使用的会话数据
redisService.delete(cacheKey);
// 4. 构建工作流请求
WorkflowRunRequest workflowRequest = new WorkflowRunRequest();
workflowRequest.setInputs(inputsMap != null ? inputsMap : new HashMap<>());
workflowRequest.setResponseMode("blocking");
workflowRequest.setUser(userId);
logger.info("执行工作流: agentId={}, userId={}, sessionId={}", agentId, userId, sessionId);
// 5. 调用Dify工作流接口
WorkflowRunResponse workflowResponse = difyApiClient.runWorkflowBlocking(workflowRequest, apiKey);
if (workflowResponse == null || workflowResponse.getData() == null) {
return ResultDomain.failure("工作流执行失败:返回结果为空");
}
// 6. 检查工作流执行状态
String status = workflowResponse.getData().getStatus();
if (!"succeeded".equals(status)) {
String error = workflowResponse.getData().getError();
logger.error("工作流执行失败: status={}, error={}", status, error);
return ResultDomain.failure("工作流执行失败: " + (error != null ? error : status));
}
// 7. 提取outputs
Map<String, Object> outputs = workflowResponse.getData().getOutputs();
if (outputs == null) {
return ResultDomain.failure("工作流执行失败outputs为空");
}
// 8. 将outputs转为JSON字符串返回
String outputsJson = JSON.toJSONString(outputs);
logger.info("工作流执行成功: agentId={}, workflowRunId={}", agentId, workflowResponse.getWorkflowRunId());
return ResultDomain.success("工作流执行成功", outputsJson);
} catch (Exception e) {
logger.error("工作流执行异常: sessionId={}", sessionId, e);
return ResultDomain.failure("工作流执行异常: " + e.getMessage());
}
}
@Override
public ResultDomain<String> runWorkflowBlocking(String agentId, Map<String, Object> inputs, String userId) {
try {
// 1. 获取智能体信息
ResultDomain<TbAgent> agentResult = agentService.selectAgentById(agentId);
if (!agentResult.getSuccess() || agentResult.getData() == null) {
return ResultDomain.failure("智能体不存在");
}
TbAgent agent = agentResult.getData();
// 2. 构建工作流请求
WorkflowRunRequest workflowRequest = new WorkflowRunRequest();
workflowRequest.setInputs(inputs);
workflowRequest.setResponseMode("blocking");
workflowRequest.setUser(userId);
logger.info("执行工作流: agentId={}, userId={}, inputs={}", agentId, userId, JSON.toJSONString(inputs));
// 3. 调用Dify工作流接口
WorkflowRunResponse workflowResponse = difyApiClient.runWorkflowBlocking(workflowRequest, agent.getApiKey());
if (workflowResponse == null || workflowResponse.getData() == null) {
return ResultDomain.failure("工作流执行失败:返回结果为空");
}
// 4. 检查工作流执行状态
String status = workflowResponse.getData().getStatus();
if (!"succeeded".equals(status)) {
String error = workflowResponse.getData().getError();
logger.error("工作流执行失败: status={}, error={}", status, error);
return ResultDomain.failure("工作流执行失败: " + (error != null ? error : status));
}
// 5. 提取outputs
Map<String, Object> outputs = workflowResponse.getData().getOutputs();
if (outputs == null) {
return ResultDomain.failure("工作流执行失败outputs为空");
}
// 6. 将outputs转为JSON字符串返回
String outputsJson = JSON.toJSONString(outputs);
logger.info("工作流执行成功: agentId={}, workflowRunId={}", agentId, workflowResponse.getWorkflowRunId());
return ResultDomain.success("工作流执行成功", outputsJson);
} catch (Exception e) {
logger.error("工作流执行异常: agentId={}, userId={}", agentId, userId, e);
return ResultDomain.failure("工作流执行异常: " + e.getMessage());
}
}
}