feat(compensation): 实现MQ补偿机制(Outbox Pattern) + 安全审计修复
- 新增 compensation_tasks 表 + CompensationTask 实体 + Repository - 新增 CompensationService 补偿任务写入服务 - 新增 CompensationScheduler 定时扫描(CAS抢占+指数退避+失败告警+清理) - 改造 OrderServiceImpl/AdminServiceImpl 4处 afterCommit catch → 写补偿表 - 移除 OrderServiceImpl 未使用的 transactionTemplate - PointsServiceImpl 添加缺失的 @Slf4j - MapperScan 添加 compensation 包扫描 - 审计修复: Class.forName白名单校验、markSuccess/markRetryOrFailed添加status前置条件、CAS后重查防stale snapshot - 更新待实现功能清单
This commit is contained in:
@@ -1,9 +1,13 @@
|
||||
package com.openclaw;
|
||||
|
||||
import org.mybatis.spring.annotation.MapperScan;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableScheduling
|
||||
@MapperScan({"com.openclaw.module.**.repository", "com.openclaw.common.leaf", "com.openclaw.common.compensation"})
|
||||
public class OpenclawApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
@@ -0,0 +1,126 @@
|
||||
package com.openclaw.common.compensation;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class CompensationScheduler {
|
||||
|
||||
private final CompensationTaskRepository taskRepo;
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
/** 允许反序列化的事件类白名单(防止DB被入侵时的任意类实例化) */
|
||||
private static final Set<String> ALLOWED_EVENT_CLASSES = Set.of(
|
||||
"com.openclaw.common.event.OrderPaidEvent",
|
||||
"com.openclaw.common.event.OrderTimeoutEvent",
|
||||
"com.openclaw.common.event.RefundApprovedEvent",
|
||||
"java.lang.String"
|
||||
);
|
||||
|
||||
/** 指数退避间隔(分钟):1, 5, 30, 120, 720 */
|
||||
private static final int[] BACKOFF_MINUTES = {1, 5, 30, 120, 720};
|
||||
|
||||
/** 每5分钟扫描补偿任务 */
|
||||
@Scheduled(cron = "0 */5 * * * ?")
|
||||
public void scanAndRetry() {
|
||||
List<CompensationTask> tasks = taskRepo.findPendingTasks(LocalDateTime.now());
|
||||
if (tasks.isEmpty()) return;
|
||||
|
||||
log.info("[Compensation] 扫描到 {} 条待补偿任务", tasks.size());
|
||||
for (CompensationTask task : tasks) {
|
||||
// CAS抢占:防止多实例重复执行
|
||||
int claimed = taskRepo.casClaimTask(task.getId());
|
||||
if (claimed == 0) continue;
|
||||
|
||||
// 重新查询最新状态,防止stale snapshot导致retryCount/maxRetries不准确
|
||||
CompensationTask freshTask = taskRepo.selectById(task.getId());
|
||||
if (freshTask == null) continue;
|
||||
|
||||
try {
|
||||
executeTask(freshTask);
|
||||
taskRepo.markSuccess(freshTask.getId());
|
||||
log.info("[Compensation] 补偿成功: id={}, type={}, bizKey={}", freshTask.getId(), freshTask.getTaskType(), freshTask.getBizKey());
|
||||
} catch (Exception e) {
|
||||
handleFailure(freshTask, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void executeTask(CompensationTask task) throws Exception {
|
||||
String type = task.getTaskType();
|
||||
if (type != null && type.startsWith("mq_")) {
|
||||
executeMqTask(task);
|
||||
} else {
|
||||
throw new UnsupportedOperationException("未知补偿任务类型: " + type);
|
||||
}
|
||||
}
|
||||
|
||||
private void executeMqTask(CompensationTask task) throws Exception {
|
||||
JsonNode root = objectMapper.readTree(task.getPayload());
|
||||
String exchange = root.get("exchange").asText();
|
||||
String routingKey = root.get("routingKey").asText();
|
||||
String eventJson = root.get("event").asText();
|
||||
String eventClass = root.get("eventClass").asText();
|
||||
|
||||
if (!ALLOWED_EVENT_CLASSES.contains(eventClass)) {
|
||||
throw new SecurityException("不允许的事件类型: " + eventClass);
|
||||
}
|
||||
Object event = objectMapper.readValue(eventJson, Class.forName(eventClass));
|
||||
rabbitTemplate.convertAndSend(exchange, routingKey, event);
|
||||
}
|
||||
|
||||
private void handleFailure(CompensationTask task, Exception e) {
|
||||
int nextRetry = task.getRetryCount() + 1;
|
||||
String errorMsg = e.getMessage();
|
||||
if (errorMsg != null && errorMsg.length() > 500) {
|
||||
errorMsg = errorMsg.substring(0, 500);
|
||||
}
|
||||
|
||||
if (nextRetry >= task.getMaxRetries()) {
|
||||
// 重试耗尽 → 标记failed + 告警日志
|
||||
taskRepo.markRetryOrFailed(task.getId(), "failed", LocalDateTime.now(), errorMsg);
|
||||
log.error("[Compensation] 补偿任务重试耗尽,需人工介入! id={}, type={}, bizKey={}, retries={}",
|
||||
task.getId(), task.getTaskType(), task.getBizKey(), nextRetry);
|
||||
} else {
|
||||
// 指数退避重试
|
||||
int backoffIdx = Math.min(nextRetry, BACKOFF_MINUTES.length - 1);
|
||||
LocalDateTime nextRetryAt = LocalDateTime.now().plusMinutes(BACKOFF_MINUTES[backoffIdx]);
|
||||
taskRepo.markRetryOrFailed(task.getId(), "pending", nextRetryAt, errorMsg);
|
||||
log.warn("[Compensation] 补偿任务第{}次失败,将于{}重试: id={}, bizKey={}",
|
||||
nextRetry, nextRetryAt, task.getId(), task.getBizKey());
|
||||
}
|
||||
}
|
||||
|
||||
/** 每天凌晨3点清理30天前的成功记录 */
|
||||
@Scheduled(cron = "0 0 3 * * ?")
|
||||
public void cleanupOldTasks() {
|
||||
// 使用MyBatis-Plus的条件删除即可
|
||||
LocalDateTime threshold = LocalDateTime.now().minusDays(30);
|
||||
try {
|
||||
// 多实例下可能重复执行DELETE,但DELETE是幂等操作,无需分布式锁
|
||||
LambdaQueryWrapper<CompensationTask> wrapper = new LambdaQueryWrapper<>();
|
||||
wrapper.eq(CompensationTask::getStatus, "success")
|
||||
.lt(CompensationTask::getUpdatedAt, threshold);
|
||||
int deleted = taskRepo.delete(wrapper);
|
||||
if (deleted > 0) {
|
||||
log.info("[Compensation] 清理 {} 条30天前的成功补偿记录", deleted);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[Compensation] 清理旧补偿记录失败", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
package com.openclaw.common.compensation;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Map;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class CompensationService {
|
||||
|
||||
private final CompensationTaskRepository taskRepo;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
/**
|
||||
* 创建MQ补偿任务(MQ发送失败时调用)
|
||||
* @param taskType 任务类型,如 mq_refund_approved
|
||||
* @param bizKey 业务幂等键,如 refund_123
|
||||
* @param exchange MQ交换机
|
||||
* @param routingKey MQ路由键
|
||||
* @param eventPayload 事件对象(会被序列化为JSON)
|
||||
*/
|
||||
public void createMqTask(String taskType, String bizKey, String exchange, String routingKey, Object eventPayload) {
|
||||
try {
|
||||
String payload = objectMapper.writeValueAsString(Map.of(
|
||||
"exchange", exchange,
|
||||
"routingKey", routingKey,
|
||||
"event", objectMapper.writeValueAsString(eventPayload),
|
||||
"eventClass", eventPayload.getClass().getName() // 存储完整类名用于反序列化,CompensationScheduler有白名单校验
|
||||
));
|
||||
|
||||
CompensationTask task = new CompensationTask();
|
||||
task.setTaskType(taskType);
|
||||
task.setBizKey(bizKey);
|
||||
task.setPayload(payload);
|
||||
task.setStatus("pending");
|
||||
task.setRetryCount(0);
|
||||
task.setMaxRetries(5);
|
||||
task.setNextRetryAt(LocalDateTime.now().plusMinutes(1));
|
||||
|
||||
taskRepo.insert(task);
|
||||
log.info("[Compensation] 补偿任务已创建: type={}, bizKey={}", taskType, bizKey);
|
||||
} catch (Exception e) {
|
||||
// 补偿表写入也失败了,只能记日志(最后兜底)
|
||||
log.error("[Compensation] 补偿任务创建失败,需人工介入: type={}, bizKey={}", taskType, bizKey, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.openclaw.common.compensation;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.*;
|
||||
import lombok.Data;
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Data
|
||||
@TableName("compensation_tasks")
|
||||
public class CompensationTask {
|
||||
@TableId(type = IdType.AUTO)
|
||||
private Long id;
|
||||
private String taskType;
|
||||
private String bizKey;
|
||||
private String payload;
|
||||
private String status;
|
||||
private Integer retryCount;
|
||||
private Integer maxRetries;
|
||||
private LocalDateTime nextRetryAt;
|
||||
private String errorMsg;
|
||||
private LocalDateTime createdAt;
|
||||
private LocalDateTime updatedAt;
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.openclaw.common.compensation;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
import org.apache.ibatis.annotations.Select;
|
||||
import org.apache.ibatis.annotations.Update;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
public interface CompensationTaskRepository extends BaseMapper<CompensationTask> {
|
||||
|
||||
@Select("SELECT * FROM compensation_tasks WHERE status = 'pending' AND next_retry_at <= #{now} ORDER BY created_at ASC LIMIT 100")
|
||||
List<CompensationTask> findPendingTasks(@Param("now") LocalDateTime now);
|
||||
|
||||
@Update("UPDATE compensation_tasks SET status = 'processing', updated_at = NOW() WHERE id = #{id} AND status = 'pending'")
|
||||
int casClaimTask(@Param("id") Long id);
|
||||
|
||||
@Update("UPDATE compensation_tasks SET status = 'success', updated_at = NOW() WHERE id = #{id} AND status = 'processing'")
|
||||
int markSuccess(@Param("id") Long id);
|
||||
|
||||
@Update("UPDATE compensation_tasks SET status = #{status}, retry_count = retry_count + 1, next_retry_at = #{nextRetryAt}, error_msg = #{errorMsg}, updated_at = NOW() WHERE id = #{id} AND status = 'processing'")
|
||||
int markRetryOrFailed(@Param("id") Long id, @Param("status") String status, @Param("nextRetryAt") LocalDateTime nextRetryAt, @Param("errorMsg") String errorMsg);
|
||||
}
|
||||
@@ -0,0 +1,591 @@
|
||||
package com.openclaw.module.admin.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.openclaw.constant.ErrorCode;
|
||||
import com.openclaw.exception.BusinessException;
|
||||
import com.openclaw.module.admin.dto.AdminLoginDTO;
|
||||
import com.openclaw.module.admin.dto.AdminSkillCreateDTO;
|
||||
import com.openclaw.module.admin.service.AdminService;
|
||||
import com.openclaw.module.admin.vo.*;
|
||||
import com.openclaw.common.event.RefundApprovedEvent;
|
||||
import com.openclaw.common.mq.MQConstants;
|
||||
import com.openclaw.module.order.entity.Order;
|
||||
import com.openclaw.module.order.entity.OrderItem;
|
||||
import com.openclaw.module.order.entity.OrderRefund;
|
||||
import com.openclaw.module.order.repository.OrderItemRepository;
|
||||
import com.openclaw.module.order.repository.OrderRefundRepository;
|
||||
import com.openclaw.module.order.repository.OrderRepository;
|
||||
import com.openclaw.module.order.vo.OrderItemVO;
|
||||
import com.openclaw.module.points.entity.PointsRecord;
|
||||
import com.openclaw.module.points.entity.UserPoints;
|
||||
import com.openclaw.module.points.repository.PointsRecordRepository;
|
||||
import com.openclaw.module.points.repository.UserPointsRepository;
|
||||
import com.openclaw.module.points.service.PointsService;
|
||||
import com.openclaw.module.skill.entity.Skill;
|
||||
import com.openclaw.module.skill.entity.SkillCategory;
|
||||
import com.openclaw.module.skill.entity.SkillReview;
|
||||
import com.openclaw.module.skill.repository.SkillCategoryRepository;
|
||||
import com.openclaw.module.skill.repository.SkillRepository;
|
||||
import com.openclaw.module.skill.repository.SkillReviewRepository;
|
||||
import com.openclaw.module.user.entity.User;
|
||||
import com.openclaw.module.user.repository.UserRepository;
|
||||
import com.openclaw.util.JwtUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
|
||||
import org.springframework.security.crypto.password.PasswordEncoder;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.transaction.support.TransactionSynchronization;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
import com.openclaw.common.compensation.CompensationService;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.LocalTime;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class AdminServiceImpl implements AdminService {
|
||||
|
||||
private final UserRepository userRepo;
|
||||
private final SkillRepository skillRepo;
|
||||
private final SkillCategoryRepository categoryRepo;
|
||||
private final SkillReviewRepository reviewRepo;
|
||||
private final OrderRepository orderRepo;
|
||||
private final OrderItemRepository orderItemRepo;
|
||||
private final OrderRefundRepository refundRepo;
|
||||
private final PointsRecordRepository pointsRecordRepo;
|
||||
private final UserPointsRepository userPointsRepo;
|
||||
private final PointsService pointsService;
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
private final JwtUtil jwtUtil;
|
||||
private final PasswordEncoder passwordEncoder;
|
||||
private final CompensationService compensationService;
|
||||
|
||||
@Override
|
||||
public AdminLoginVO login(AdminLoginDTO dto) {
|
||||
User user = userRepo.findByPhone(dto.getUsername())
|
||||
.orElseThrow(() -> new BusinessException(401, "用户名或密码错误"));
|
||||
if (!passwordEncoder.matches(dto.getPassword(), user.getPasswordHash())) {
|
||||
throw new BusinessException(401, "用户名或密码错误");
|
||||
}
|
||||
String role = user.getRole();
|
||||
if (!"admin".equals(role) && !"super_admin".equals(role)) {
|
||||
throw new BusinessException(403, "无管理员权限");
|
||||
}
|
||||
if ("banned".equals(user.getStatus())) {
|
||||
throw new BusinessException(403, "账号已被封禁");
|
||||
}
|
||||
AdminLoginVO vo = new AdminLoginVO();
|
||||
vo.setToken(jwtUtil.generate(user.getId(), role));
|
||||
vo.setUsername(user.getNickname() != null ? user.getNickname() : user.getPhone());
|
||||
vo.setRole(role);
|
||||
return vo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DashboardStatsVO getDashboardStats() {
|
||||
DashboardStatsVO vo = new DashboardStatsVO();
|
||||
LocalDateTime todayStart = LocalDateTime.of(LocalDate.now(), LocalTime.MIN);
|
||||
|
||||
vo.setTotalUsers(userRepo.selectCount(null));
|
||||
vo.setActiveUsers(userRepo.selectCount(
|
||||
new LambdaQueryWrapper<User>().eq(User::getStatus, "active")));
|
||||
vo.setTotalSkills(skillRepo.selectCount(null));
|
||||
vo.setActiveSkills(skillRepo.selectCount(
|
||||
new LambdaQueryWrapper<Skill>().eq(Skill::getStatus, "approved")));
|
||||
vo.setTotalOrders(orderRepo.selectCount(null));
|
||||
vo.setCompletedOrders(orderRepo.selectCount(
|
||||
new LambdaQueryWrapper<Order>().in(Order::getStatus, "paid", "completed")));
|
||||
|
||||
// 今日数据
|
||||
vo.setTodayNewUsers(userRepo.selectCount(
|
||||
new LambdaQueryWrapper<User>().ge(User::getCreatedAt, todayStart)));
|
||||
vo.setTodayOrders(orderRepo.selectCount(
|
||||
new LambdaQueryWrapper<Order>().ge(Order::getCreatedAt, todayStart)));
|
||||
|
||||
// 积分汇总
|
||||
vo.setTotalPointsIssued(userPointsRepo.sumTotalEarned());
|
||||
vo.setTotalPointsConsumed(userPointsRepo.sumTotalConsumed());
|
||||
vo.setTotalRevenue(orderRepo.sumTotalRevenue());
|
||||
|
||||
return vo;
|
||||
}
|
||||
|
||||
// ==================== 用户管理 ====================
|
||||
|
||||
@Override
|
||||
public IPage<AdminUserVO> listUsers(String keyword, String status, String role, int pageNum, int pageSize) {
|
||||
LambdaQueryWrapper<User> wrapper = new LambdaQueryWrapper<>();
|
||||
if (keyword != null && !keyword.isBlank()) {
|
||||
wrapper.and(w -> w.like(User::getNickname, keyword).or().like(User::getPhone, keyword));
|
||||
}
|
||||
if (status != null && !status.isBlank()) {
|
||||
wrapper.eq(User::getStatus, status);
|
||||
}
|
||||
if (role != null && !role.isBlank()) {
|
||||
wrapper.eq(User::getRole, role);
|
||||
}
|
||||
wrapper.orderByDesc(User::getCreatedAt);
|
||||
|
||||
IPage<User> page = userRepo.selectPage(new Page<>(pageNum, pageSize), wrapper);
|
||||
return page.convert(this::toAdminUserVO);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminUserVO getUserDetail(Long userId) {
|
||||
User user = userRepo.selectById(userId);
|
||||
if (user == null) throw new BusinessException(ErrorCode.USER_NOT_FOUND);
|
||||
return toAdminUserVO(user);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void banUser(Long userId, String reason) {
|
||||
User user = userRepo.selectById(userId);
|
||||
if (user == null) throw new BusinessException(ErrorCode.USER_NOT_FOUND);
|
||||
user.setStatus("banned");
|
||||
user.setBanReason(reason);
|
||||
userRepo.updateById(user);
|
||||
log.info("[Admin] 封禁用户: userId={}, reason={}", userId, reason);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void unbanUser(Long userId) {
|
||||
User user = userRepo.selectById(userId);
|
||||
if (user == null) throw new BusinessException(ErrorCode.USER_NOT_FOUND);
|
||||
user.setStatus("active");
|
||||
user.setBanReason(null);
|
||||
userRepo.updateById(user);
|
||||
log.info("[Admin] 解封用户: userId={}", userId);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void changeUserRole(Long userId, String role) {
|
||||
User user = userRepo.selectById(userId);
|
||||
if (user == null) throw new BusinessException(ErrorCode.USER_NOT_FOUND);
|
||||
user.setRole(role);
|
||||
userRepo.updateById(user);
|
||||
log.info("[Admin] 修改用户角色: userId={}, newRole={}", userId, role);
|
||||
}
|
||||
|
||||
// ==================== Skill管理 ====================
|
||||
|
||||
@Override
|
||||
public IPage<AdminSkillVO> listSkills(String keyword, String status, Integer categoryId, int pageNum, int pageSize) {
|
||||
LambdaQueryWrapper<Skill> wrapper = new LambdaQueryWrapper<>();
|
||||
if (keyword != null && !keyword.isBlank()) {
|
||||
wrapper.like(Skill::getName, keyword);
|
||||
}
|
||||
if (status != null && !status.isBlank()) {
|
||||
wrapper.eq(Skill::getStatus, status);
|
||||
}
|
||||
if (categoryId != null) {
|
||||
wrapper.eq(Skill::getCategoryId, categoryId);
|
||||
}
|
||||
wrapper.orderByDesc(Skill::getCreatedAt);
|
||||
|
||||
IPage<Skill> page = skillRepo.selectPage(new Page<>(pageNum, pageSize), wrapper);
|
||||
return page.convert(this::toAdminSkillVO);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminSkillVO getSkillDetail(Long skillId) {
|
||||
Skill skill = skillRepo.selectById(skillId);
|
||||
if (skill == null) throw new BusinessException(ErrorCode.SKILL_NOT_FOUND);
|
||||
return toAdminSkillVO(skill);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void auditSkill(Long skillId, String action, String rejectReason) {
|
||||
Skill skill = skillRepo.selectById(skillId);
|
||||
if (skill == null) throw new BusinessException(ErrorCode.SKILL_NOT_FOUND);
|
||||
|
||||
if ("approve".equals(action)) {
|
||||
skill.setStatus("approved");
|
||||
skill.setRejectReason(null);
|
||||
} else if ("reject".equals(action)) {
|
||||
skill.setStatus("rejected");
|
||||
skill.setRejectReason(rejectReason);
|
||||
} else {
|
||||
throw new BusinessException(400, "无效的审核操作");
|
||||
}
|
||||
skillRepo.updateById(skill);
|
||||
log.info("[Admin] 审核Skill: skillId={}, action={}", skillId, action);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void offlineSkill(Long skillId) {
|
||||
Skill skill = skillRepo.selectById(skillId);
|
||||
if (skill == null) throw new BusinessException(ErrorCode.SKILL_NOT_FOUND);
|
||||
skill.setStatus("offline");
|
||||
skillRepo.updateById(skill);
|
||||
log.info("[Admin] 下架Skill: skillId={}", skillId);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void toggleFeatured(Long skillId) {
|
||||
Skill skill = skillRepo.selectById(skillId);
|
||||
if (skill == null) throw new BusinessException(ErrorCode.SKILL_NOT_FOUND);
|
||||
Boolean current = skill.getIsFeatured();
|
||||
skill.setIsFeatured(current != null && current ? false : true);
|
||||
skillRepo.updateById(skill);
|
||||
log.info("[Admin] 切换推荐状态: skillId={}, isFeatured={}", skillId, skill.getIsFeatured());
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public AdminSkillVO createSkill(Long adminUserId, AdminSkillCreateDTO dto) {
|
||||
Skill skill = new Skill();
|
||||
skill.setCreatorId(adminUserId);
|
||||
skill.setName(dto.getName());
|
||||
skill.setDescription(dto.getDescription());
|
||||
skill.setCoverImageUrl(dto.getCoverImageUrl());
|
||||
skill.setCategoryId(dto.getCategoryId());
|
||||
skill.setPrice(dto.getPrice());
|
||||
skill.setIsFree(dto.getIsFree());
|
||||
skill.setVersion(dto.getVersion());
|
||||
skill.setFileUrl(dto.getFileUrl());
|
||||
skill.setFileSize(dto.getFileSize());
|
||||
skill.setStatus("approved"); // 管理员上传直接通过审核
|
||||
skill.setDownloadCount(0);
|
||||
skill.setAuditorId(adminUserId);
|
||||
skill.setAuditedAt(LocalDateTime.now());
|
||||
skillRepo.insert(skill);
|
||||
log.info("[Admin] 管理员上传Skill: skillId={}, name={}", skill.getId(), skill.getName());
|
||||
return toAdminSkillVO(skill);
|
||||
}
|
||||
|
||||
// ==================== 订单管理 ====================
|
||||
|
||||
@Override
|
||||
public IPage<AdminOrderVO> listOrders(String keyword, String status, int pageNum, int pageSize) {
|
||||
LambdaQueryWrapper<Order> wrapper = new LambdaQueryWrapper<>();
|
||||
if (keyword != null && !keyword.isBlank()) {
|
||||
wrapper.like(Order::getOrderNo, keyword);
|
||||
}
|
||||
if (status != null && !status.isBlank()) {
|
||||
wrapper.eq(Order::getStatus, status);
|
||||
}
|
||||
wrapper.orderByDesc(Order::getCreatedAt);
|
||||
|
||||
IPage<Order> page = orderRepo.selectPage(new Page<>(pageNum, pageSize), wrapper);
|
||||
return page.convert(this::toAdminOrderVO);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminOrderVO getOrderDetail(Long orderId) {
|
||||
Order order = orderRepo.selectById(orderId);
|
||||
if (order == null) throw new BusinessException(ErrorCode.ORDER_NOT_FOUND);
|
||||
return toAdminOrderVO(order);
|
||||
}
|
||||
|
||||
// ==================== 退款管理 ====================
|
||||
|
||||
@Override
|
||||
public IPage<AdminRefundVO> listRefunds(String keyword, String status, int pageNum, int pageSize) {
|
||||
LambdaQueryWrapper<OrderRefund> wrapper = new LambdaQueryWrapper<>();
|
||||
if (keyword != null && !keyword.isBlank()) {
|
||||
wrapper.like(OrderRefund::getRefundNo, keyword);
|
||||
}
|
||||
if (status != null && !status.isBlank()) {
|
||||
wrapper.eq(OrderRefund::getStatus, status);
|
||||
}
|
||||
wrapper.orderByDesc(OrderRefund::getCreatedAt);
|
||||
|
||||
IPage<OrderRefund> page = refundRepo.selectPage(new Page<>(pageNum, pageSize), wrapper);
|
||||
return page.convert(this::toAdminRefundVO);
|
||||
}
|
||||
|
||||
// ==================== 评论管理 ====================
|
||||
|
||||
@Override
|
||||
public IPage<AdminCommentVO> listComments(String keyword, Long skillId, int pageNum, int pageSize) {
|
||||
LambdaQueryWrapper<SkillReview> wrapper = new LambdaQueryWrapper<>();
|
||||
if (keyword != null && !keyword.isBlank()) {
|
||||
wrapper.like(SkillReview::getContent, keyword);
|
||||
}
|
||||
if (skillId != null) {
|
||||
wrapper.eq(SkillReview::getSkillId, skillId);
|
||||
}
|
||||
wrapper.orderByDesc(SkillReview::getCreatedAt);
|
||||
|
||||
IPage<SkillReview> page = reviewRepo.selectPage(new Page<>(pageNum, pageSize), wrapper);
|
||||
return page.convert(this::toAdminCommentVO);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void deleteComment(Long commentId) {
|
||||
SkillReview review = reviewRepo.selectById(commentId);
|
||||
if (review == null) throw new BusinessException(400, "评论不存在");
|
||||
reviewRepo.deleteById(commentId);
|
||||
log.info("[Admin] 删除评论: commentId={}", commentId);
|
||||
}
|
||||
|
||||
// ==================== 积分管理 ====================
|
||||
|
||||
@Override
|
||||
public IPage<AdminPointsRecordVO> listPointsRecords(Long userId, String pointsType, int pageNum, int pageSize) {
|
||||
LambdaQueryWrapper<PointsRecord> wrapper = new LambdaQueryWrapper<>();
|
||||
if (userId != null) {
|
||||
wrapper.eq(PointsRecord::getUserId, userId);
|
||||
}
|
||||
if (pointsType != null && !pointsType.isBlank()) {
|
||||
wrapper.eq(PointsRecord::getPointsType, pointsType);
|
||||
}
|
||||
wrapper.orderByDesc(PointsRecord::getCreatedAt);
|
||||
|
||||
IPage<PointsRecord> page = pointsRecordRepo.selectPage(new Page<>(pageNum, pageSize), wrapper);
|
||||
return page.convert(this::toAdminPointsRecordVO);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void adjustPoints(Long userId, int amount, String reason) {
|
||||
User user = userRepo.selectById(userId);
|
||||
if (user == null) throw new BusinessException(ErrorCode.USER_NOT_FOUND);
|
||||
|
||||
pointsService.adjustByAdmin(userId, amount, reason);
|
||||
log.info("[Admin] 调整积分: userId={}, amount={}, reason={}", userId, amount, reason);
|
||||
}
|
||||
|
||||
// ==================== VO转换 ====================
|
||||
|
||||
private AdminUserVO toAdminUserVO(User user) {
|
||||
AdminUserVO vo = new AdminUserVO();
|
||||
vo.setId(user.getId());
|
||||
vo.setPhone(user.getPhone());
|
||||
vo.setNickname(user.getNickname());
|
||||
vo.setAvatarUrl(user.getAvatarUrl());
|
||||
vo.setRole(user.getRole());
|
||||
vo.setStatus(user.getStatus());
|
||||
vo.setMemberLevel(user.getMemberLevel());
|
||||
vo.setGrowthValue(user.getGrowthValue());
|
||||
vo.setBanReason(user.getBanReason());
|
||||
vo.setCreatedAt(user.getCreatedAt());
|
||||
vo.setUpdatedAt(user.getUpdatedAt());
|
||||
|
||||
UserPoints points = userPointsRepo.findByUserId(user.getId());
|
||||
if (points != null) {
|
||||
vo.setAvailablePoints(points.getAvailablePoints());
|
||||
}
|
||||
return vo;
|
||||
}
|
||||
|
||||
private AdminSkillVO toAdminSkillVO(Skill skill) {
|
||||
AdminSkillVO vo = new AdminSkillVO();
|
||||
vo.setId(skill.getId());
|
||||
vo.setName(skill.getName());
|
||||
vo.setDescription(skill.getDescription());
|
||||
vo.setCoverImageUrl(skill.getCoverImageUrl());
|
||||
vo.setCategoryId(skill.getCategoryId());
|
||||
vo.setPrice(skill.getPrice());
|
||||
vo.setIsFree(skill.getIsFree());
|
||||
vo.setStatus(skill.getStatus());
|
||||
vo.setRejectReason(skill.getRejectReason());
|
||||
vo.setDownloadCount(skill.getDownloadCount());
|
||||
vo.setRating(skill.getRating());
|
||||
vo.setRatingCount(skill.getRatingCount());
|
||||
vo.setVersion(skill.getVersion());
|
||||
vo.setCreatorId(skill.getCreatorId());
|
||||
vo.setIsFeatured(skill.getIsFeatured());
|
||||
vo.setCreatedAt(skill.getCreatedAt());
|
||||
|
||||
if (skill.getCategoryId() != null) {
|
||||
SkillCategory cat = categoryRepo.selectById(skill.getCategoryId());
|
||||
if (cat != null) vo.setCategoryName(cat.getName());
|
||||
}
|
||||
if (skill.getCreatorId() != null) {
|
||||
User creator = userRepo.selectById(skill.getCreatorId());
|
||||
if (creator != null) vo.setCreatorNickname(creator.getNickname());
|
||||
}
|
||||
return vo;
|
||||
}
|
||||
|
||||
private AdminOrderVO toAdminOrderVO(Order order) {
|
||||
AdminOrderVO vo = new AdminOrderVO();
|
||||
vo.setId(order.getId());
|
||||
vo.setOrderNo(order.getOrderNo());
|
||||
vo.setUserId(order.getUserId());
|
||||
vo.setTotalAmount(order.getTotalAmount());
|
||||
vo.setCashAmount(order.getCashAmount());
|
||||
vo.setPointsUsed(order.getPointsUsed());
|
||||
vo.setStatus(order.getStatus());
|
||||
vo.setStatusLabel(getStatusLabel(order.getStatus()));
|
||||
vo.setPaymentMethod(order.getPaymentMethod());
|
||||
vo.setCreatedAt(order.getCreatedAt());
|
||||
vo.setPaidAt(order.getPaidAt());
|
||||
|
||||
User user = userRepo.selectById(order.getUserId());
|
||||
if (user != null) vo.setUserNickname(user.getNickname());
|
||||
|
||||
List<OrderItem> items = orderItemRepo.selectList(
|
||||
new LambdaQueryWrapper<OrderItem>().eq(OrderItem::getOrderId, order.getId()));
|
||||
vo.setItems(items.stream().map(item -> {
|
||||
OrderItemVO itemVO = new OrderItemVO();
|
||||
itemVO.setSkillId(item.getSkillId());
|
||||
itemVO.setSkillName(item.getSkillName());
|
||||
itemVO.setSkillCover(item.getSkillCover());
|
||||
itemVO.setUnitPrice(item.getUnitPrice());
|
||||
itemVO.setQuantity(item.getQuantity());
|
||||
itemVO.setTotalPrice(item.getTotalPrice());
|
||||
return itemVO;
|
||||
}).collect(Collectors.toList()));
|
||||
return vo;
|
||||
}
|
||||
|
||||
private AdminCommentVO toAdminCommentVO(SkillReview review) {
|
||||
AdminCommentVO vo = new AdminCommentVO();
|
||||
vo.setId(review.getId());
|
||||
vo.setSkillId(review.getSkillId());
|
||||
vo.setUserId(review.getUserId());
|
||||
vo.setRating(review.getRating());
|
||||
vo.setContent(review.getContent());
|
||||
vo.setImages(review.getImages());
|
||||
vo.setHelpfulCount(review.getHelpfulCount());
|
||||
vo.setCreatedAt(review.getCreatedAt());
|
||||
|
||||
Skill skill = skillRepo.selectById(review.getSkillId());
|
||||
if (skill != null) vo.setSkillName(skill.getName());
|
||||
|
||||
User user = userRepo.selectById(review.getUserId());
|
||||
if (user != null) vo.setUserNickname(user.getNickname());
|
||||
return vo;
|
||||
}
|
||||
|
||||
private AdminPointsRecordVO toAdminPointsRecordVO(PointsRecord record) {
|
||||
AdminPointsRecordVO vo = new AdminPointsRecordVO();
|
||||
vo.setId(record.getId());
|
||||
vo.setUserId(record.getUserId());
|
||||
vo.setPointsType(record.getPointsType());
|
||||
vo.setSource(record.getSource());
|
||||
vo.setAmount(record.getAmount());
|
||||
vo.setBalance(record.getBalance());
|
||||
vo.setDescription(record.getDescription());
|
||||
vo.setCreatedAt(record.getCreatedAt());
|
||||
|
||||
User user = userRepo.selectById(record.getUserId());
|
||||
if (user != null) vo.setUserNickname(user.getNickname());
|
||||
return vo;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void approveRefund(Long refundId, Long operatorId) {
|
||||
OrderRefund refund = refundRepo.selectById(refundId);
|
||||
if (refund == null) throw new BusinessException(ErrorCode.PARAM_ERROR);
|
||||
if (!"pending".equals(refund.getStatus())) {
|
||||
throw new BusinessException(ErrorCode.ORDER_STATUS_ERROR);
|
||||
}
|
||||
|
||||
refund.setStatus("approved");
|
||||
refund.setOperatorId(operatorId);
|
||||
refund.setProcessedAt(LocalDateTime.now());
|
||||
refundRepo.updateById(refund);
|
||||
|
||||
Order order = orderRepo.selectById(refund.getOrderId());
|
||||
if (order != null) {
|
||||
order.setStatus("refunded");
|
||||
orderRepo.updateById(order);
|
||||
}
|
||||
|
||||
// 事务提交后再发MQ,防止事务回滚但消息已发出的不一致问题
|
||||
RefundApprovedEvent event = new RefundApprovedEvent(
|
||||
refund.getId(), refund.getOrderId(),
|
||||
order != null ? order.getUserId() : null,
|
||||
refund.getRefundAmount(), refund.getRefundPoints());
|
||||
final Long logRefundId = refundId;
|
||||
final Long logOrderId = refund.getOrderId();
|
||||
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
try {
|
||||
rabbitTemplate.convertAndSend(MQConstants.EXCHANGE_TOPIC, MQConstants.RK_REFUND_APPROVED, event);
|
||||
log.info("[Admin] 退款审批通过,MQ已发送: refundId={}, orderId={}", logRefundId, logOrderId);
|
||||
} catch (Exception e) {
|
||||
log.error("[Admin] 退款审批MQ发送失败,写入补偿表: refundId={}", logRefundId, e);
|
||||
compensationService.createMqTask("mq_refund_approved", "refund_" + logRefundId,
|
||||
MQConstants.EXCHANGE_TOPIC, MQConstants.RK_REFUND_APPROVED, event);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void rejectRefund(Long refundId, String rejectReason, Long operatorId) {
|
||||
OrderRefund refund = refundRepo.selectById(refundId);
|
||||
if (refund == null) throw new BusinessException(ErrorCode.PARAM_ERROR);
|
||||
if (!"pending".equals(refund.getStatus())) {
|
||||
throw new BusinessException(ErrorCode.ORDER_STATUS_ERROR);
|
||||
}
|
||||
|
||||
refund.setStatus("rejected");
|
||||
refund.setRejectReason(rejectReason);
|
||||
refund.setOperatorId(operatorId);
|
||||
refund.setProcessedAt(LocalDateTime.now());
|
||||
refundRepo.updateById(refund);
|
||||
|
||||
// 恢复订单到退款前的原始状态
|
||||
Order order = orderRepo.selectById(refund.getOrderId());
|
||||
if (order != null && "refunding".equals(order.getStatus())) {
|
||||
String restoreStatus = refund.getPreviousOrderStatus();
|
||||
if (restoreStatus == null || restoreStatus.isEmpty()) {
|
||||
restoreStatus = "completed";
|
||||
}
|
||||
order.setStatus(restoreStatus);
|
||||
orderRepo.updateById(order);
|
||||
}
|
||||
log.info("[Admin] 退款已拒绝: refundId={}, reason={}, operatorId={}", refundId, rejectReason, operatorId);
|
||||
}
|
||||
|
||||
private AdminRefundVO toAdminRefundVO(OrderRefund refund) {
|
||||
AdminRefundVO vo = new AdminRefundVO();
|
||||
vo.setId(refund.getId());
|
||||
vo.setOrderId(refund.getOrderId());
|
||||
vo.setRefundNo(refund.getRefundNo());
|
||||
vo.setRefundAmount(refund.getRefundAmount());
|
||||
vo.setRefundPoints(refund.getRefundPoints());
|
||||
vo.setReason(refund.getReason());
|
||||
vo.setStatus(refund.getStatus());
|
||||
vo.setRejectReason(refund.getRejectReason());
|
||||
vo.setOperatorId(refund.getOperatorId());
|
||||
vo.setProcessedAt(refund.getProcessedAt());
|
||||
vo.setCreatedAt(refund.getCreatedAt());
|
||||
|
||||
Order order = orderRepo.selectById(refund.getOrderId());
|
||||
if (order != null) {
|
||||
vo.setOrderNo(order.getOrderNo());
|
||||
vo.setUserId(order.getUserId());
|
||||
User user = userRepo.selectById(order.getUserId());
|
||||
if (user != null) vo.setUserNickname(user.getNickname());
|
||||
}
|
||||
return vo;
|
||||
}
|
||||
|
||||
private String getStatusLabel(String status) {
|
||||
return switch (status) {
|
||||
case "pending" -> "待支付";
|
||||
case "paid" -> "已支付";
|
||||
case "completed" -> "已完成";
|
||||
case "cancelled" -> "已取消";
|
||||
case "refunding" -> "退款中";
|
||||
case "refunded" -> "已退款";
|
||||
default -> status;
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -18,11 +18,16 @@ import com.openclaw.module.order.vo.*;
|
||||
import com.openclaw.common.event.OrderPaidEvent;
|
||||
import com.openclaw.common.event.OrderTimeoutEvent;
|
||||
import com.openclaw.common.mq.MQConstants;
|
||||
import com.openclaw.common.compensation.CompensationService;
|
||||
import com.openclaw.module.coupon.service.CouponService;
|
||||
import com.openclaw.module.coupon.vo.CouponCalcResultVO;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.transaction.support.TransactionSynchronization;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.RoundingMode;
|
||||
@@ -46,9 +51,16 @@ public class OrderServiceImpl implements OrderService {
|
||||
private final SkillService skillService;
|
||||
private final IdGenerator idGenerator;
|
||||
private final RabbitTemplate rabbitTemplate;
|
||||
private final CompensationService compensationService;
|
||||
private final CouponService couponService;
|
||||
|
||||
@Override
|
||||
public OrderPreviewVO previewOrder(Long userId, List<Long> skillIds, Integer pointsToUse) {
|
||||
return previewOrder(userId, skillIds, pointsToUse, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public OrderPreviewVO previewOrder(Long userId, List<Long> skillIds, Integer pointsToUse, Long couponId) {
|
||||
// 1. 查询 Skill 价格
|
||||
List<Skill> skills = skillRepo.selectBatchIds(skillIds);
|
||||
if (skills.isEmpty()) throw new BusinessException(ErrorCode.SKILL_NOT_FOUND);
|
||||
@@ -73,6 +85,20 @@ public class OrderServiceImpl implements OrderService {
|
||||
.divide(BigDecimal.valueOf(POINTS_RATE), 2, RoundingMode.DOWN);
|
||||
BigDecimal cash = totalAmount.subtract(deduct).max(BigDecimal.ZERO);
|
||||
|
||||
// 5b. 优惠券抵扣
|
||||
BigDecimal couponDeduct = BigDecimal.ZERO;
|
||||
String couponName = null;
|
||||
Long appliedCouponId = null;
|
||||
if (couponId != null) {
|
||||
CouponCalcResultVO calcResult = couponService.calcDiscount(userId, couponId, cash);
|
||||
if (Boolean.TRUE.equals(calcResult.getApplicable())) {
|
||||
couponDeduct = calcResult.getCouponDeductAmount();
|
||||
couponName = calcResult.getCouponName();
|
||||
appliedCouponId = couponId;
|
||||
cash = cash.subtract(couponDeduct).max(BigDecimal.ZERO);
|
||||
}
|
||||
}
|
||||
|
||||
// 6. 组装返回
|
||||
OrderPreviewVO vo = new OrderPreviewVO();
|
||||
vo.setItems(skills.stream().map(s -> {
|
||||
@@ -91,6 +117,9 @@ public class OrderServiceImpl implements OrderService {
|
||||
vo.setCashAmount(cash);
|
||||
vo.setAvailablePoints(availablePoints);
|
||||
vo.setMaxPointsCanUse(maxPoints);
|
||||
vo.setCouponId(appliedCouponId);
|
||||
vo.setCouponName(couponName);
|
||||
vo.setCouponDeductAmount(couponDeduct);
|
||||
vo.setPointsRate(POINTS_RATE);
|
||||
return vo;
|
||||
}
|
||||
@@ -131,6 +160,19 @@ public class OrderServiceImpl implements OrderService {
|
||||
.divide(BigDecimal.valueOf(POINTS_RATE), 2, RoundingMode.DOWN);
|
||||
BigDecimal cashAmount = totalAmount.subtract(pointsDeductAmount).max(BigDecimal.ZERO);
|
||||
|
||||
// 4b. 优惠券抵扣
|
||||
BigDecimal couponDeductAmount = BigDecimal.ZERO;
|
||||
Long couponId = dto.getCouponId();
|
||||
if (couponId != null) {
|
||||
CouponCalcResultVO calcResult = couponService.calcDiscount(userId, couponId, cashAmount);
|
||||
if (Boolean.TRUE.equals(calcResult.getApplicable())) {
|
||||
couponDeductAmount = calcResult.getCouponDeductAmount();
|
||||
cashAmount = cashAmount.subtract(couponDeductAmount).max(BigDecimal.ZERO);
|
||||
} else {
|
||||
throw new BusinessException(ErrorCode.COUPON_NOT_USABLE);
|
||||
}
|
||||
}
|
||||
|
||||
// 4.1 自动判定支付方式
|
||||
String paymentMethod = dto.getPaymentMethod();
|
||||
if (cashAmount.compareTo(BigDecimal.ZERO) == 0 && pointsToUse > 0) {
|
||||
@@ -147,6 +189,8 @@ public class OrderServiceImpl implements OrderService {
|
||||
order.setCashAmount(cashAmount);
|
||||
order.setPointsUsed(pointsToUse);
|
||||
order.setPointsDeductAmount(pointsDeductAmount);
|
||||
order.setCouponId(couponId);
|
||||
order.setCouponDeductAmount(couponDeductAmount);
|
||||
order.setStatus("pending");
|
||||
order.setPaymentMethod(paymentMethod);
|
||||
order.setExpiredAt(LocalDateTime.now().plusHours(1));
|
||||
@@ -170,28 +214,45 @@ public class OrderServiceImpl implements OrderService {
|
||||
pointsService.freezePoints(userId, pointsToUse, order.getId());
|
||||
}
|
||||
|
||||
// 8. 纯积分支付:直接扣减冻结积分并完成订单
|
||||
if (cashAmount.compareTo(BigDecimal.ZERO) == 0 && pointsToUse > 0) {
|
||||
pointsService.consumeFrozenPoints(userId, pointsToUse, order.getId());
|
||||
// 7b. 核销优惠券
|
||||
if (couponId != null) {
|
||||
couponService.useCoupon(userId, couponId, order.getId());
|
||||
}
|
||||
|
||||
// 8. 免现金支付(纯积分 或 优惠券全额抵扣):直接完成订单
|
||||
if (cashAmount.compareTo(BigDecimal.ZERO) == 0) {
|
||||
if (pointsToUse > 0) {
|
||||
pointsService.consumeFrozenPoints(userId, pointsToUse, order.getId());
|
||||
}
|
||||
order.setStatus("completed");
|
||||
order.setPaidAt(LocalDateTime.now());
|
||||
orderRepo.updateById(order);
|
||||
// 发放 Skill 访问权限
|
||||
String grantSource = pointsToUse > 0 ? "points" : "coupon";
|
||||
for (Skill skill : skills) {
|
||||
skillService.grantAccess(userId, skill.getId(), order.getId(), "points");
|
||||
skillService.grantAccess(userId, skill.getId(), order.getId(), grantSource);
|
||||
}
|
||||
log.info("纯积分订单直接完成: orderId={}, points={}", order.getId(), pointsToUse);
|
||||
log.info("免现金订单直接完成: orderId={}, points={}, couponId={}", order.getId(), pointsToUse, couponId);
|
||||
return toVO(order, skills);
|
||||
}
|
||||
|
||||
// 9. 非纯积分:发送订单超时延迟消息(1小时后自动取消)
|
||||
try {
|
||||
OrderTimeoutEvent timeoutEvent = new OrderTimeoutEvent(order.getId(), userId, order.getOrderNo());
|
||||
rabbitTemplate.convertAndSend(MQConstants.EXCHANGE_TOPIC, "delay.order.create", timeoutEvent);
|
||||
log.info("[MQ] 发送订单超时延迟消息: orderId={}, orderNo={}", order.getId(), order.getOrderNo());
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ] 发送订单超时延迟消息失败: orderId={}", order.getId(), e);
|
||||
}
|
||||
// 9. 非纯积分:事务提交后发送订单超时延迟消息(1小时后自动取消)
|
||||
final Long finalOrderId = order.getId();
|
||||
final String finalOrderNo = order.getOrderNo();
|
||||
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
OrderTimeoutEvent timeoutEvent = new OrderTimeoutEvent(finalOrderId, userId, finalOrderNo);
|
||||
try {
|
||||
rabbitTemplate.convertAndSend(MQConstants.EXCHANGE_TOPIC, "delay.order.create", timeoutEvent);
|
||||
log.info("[MQ] 发送订单超时延迟消息: orderId={}, orderNo={}", finalOrderId, finalOrderNo);
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ] 发送订单超时延迟消息失败,写入补偿表: orderId={}", finalOrderId, e);
|
||||
compensationService.createMqTask("mq_order_timeout", "order_timeout_" + finalOrderId,
|
||||
MQConstants.EXCHANGE_TOPIC, "delay.order.create", timeoutEvent);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return toVO(order, skills);
|
||||
}
|
||||
@@ -252,26 +313,24 @@ public class OrderServiceImpl implements OrderService {
|
||||
order.setStatus("paid");
|
||||
order.setPaidAt(now);
|
||||
|
||||
// 发布订单支付成功事件(异步发放Skill访问权限)
|
||||
try {
|
||||
OrderPaidEvent event = new OrderPaidEvent(order.getId(), userId, order.getOrderNo(), paymentNo);
|
||||
rabbitTemplate.convertAndSend(MQConstants.EXCHANGE_TOPIC, MQConstants.RK_ORDER_PAID, event);
|
||||
log.info("[MQ] 发布订单支付事件: orderId={}, orderNo={}", order.getId(), order.getOrderNo());
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ] 发布订单支付事件失败,降级同步处理: orderId={}", order.getId(), e);
|
||||
List<OrderItem> items = orderItemRepo.selectList(
|
||||
new LambdaQueryWrapper<OrderItem>().eq(OrderItem::getOrderId, orderId));
|
||||
for (OrderItem item : items) {
|
||||
skillService.grantAccess(userId, item.getSkillId(), orderId, "paid");
|
||||
// 事务提交后发布订单支付成功事件(异步发放Skill访问权限)
|
||||
final Long payOrderId = orderId;
|
||||
final Long payUserId = userId;
|
||||
final String payOrderNo = order.getOrderNo();
|
||||
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
OrderPaidEvent event = new OrderPaidEvent(payOrderId, payUserId, payOrderNo, paymentNo);
|
||||
try {
|
||||
rabbitTemplate.convertAndSend(MQConstants.EXCHANGE_TOPIC, MQConstants.RK_ORDER_PAID, event);
|
||||
log.info("[MQ] 发布订单支付事件: orderId={}, orderNo={}", payOrderId, payOrderNo);
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ] 发布订单支付事件失败,写入补偿表: orderId={}", payOrderId, e);
|
||||
compensationService.createMqTask("mq_order_paid", "order_paid_" + payOrderId,
|
||||
MQConstants.EXCHANGE_TOPIC, MQConstants.RK_ORDER_PAID, event);
|
||||
}
|
||||
}
|
||||
// MQ 失败降级:同步消费冻结积分
|
||||
if (order.getPointsUsed() != null && order.getPointsUsed() > 0) {
|
||||
pointsService.consumeFrozenPoints(userId, order.getPointsUsed(), orderId);
|
||||
}
|
||||
// MQ 失败降级:同步完成订单状态转换
|
||||
order.setStatus("completed");
|
||||
orderRepo.updateById(order);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -293,13 +352,27 @@ public class OrderServiceImpl implements OrderService {
|
||||
pointsService.unfreezePoints(userId, order.getPointsUsed(), orderId);
|
||||
}
|
||||
|
||||
// 发布订单取消事件
|
||||
try {
|
||||
rabbitTemplate.convertAndSend(MQConstants.EXCHANGE_TOPIC, MQConstants.RK_ORDER_CANCELLED, order.getOrderNo());
|
||||
log.info("[MQ] 发布订单取消事件: orderId={}, orderNo={}", orderId, order.getOrderNo());
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ] 发布订单取消事件失败: orderId={}", orderId, e);
|
||||
// 退还优惠券
|
||||
if (order.getCouponId() != null) {
|
||||
couponService.returnCoupon(order.getCouponId());
|
||||
}
|
||||
|
||||
// 事务提交后发布订单取消事件
|
||||
final String cancelOrderNo = order.getOrderNo();
|
||||
final Long cancelOrderId = orderId;
|
||||
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
try {
|
||||
rabbitTemplate.convertAndSend(MQConstants.EXCHANGE_TOPIC, MQConstants.RK_ORDER_CANCELLED, cancelOrderNo);
|
||||
log.info("[MQ] 发布订单取消事件: orderId={}, orderNo={}", cancelOrderId, cancelOrderNo);
|
||||
} catch (Exception e) {
|
||||
log.error("[MQ] 发布订单取消事件失败,写入补偿表: orderId={}", cancelOrderId, e);
|
||||
compensationService.createMqTask("mq_order_cancelled", "order_cancel_" + cancelOrderId,
|
||||
MQConstants.EXCHANGE_TOPIC, MQConstants.RK_ORDER_CANCELLED, cancelOrderNo);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -309,9 +382,17 @@ public class OrderServiceImpl implements OrderService {
|
||||
if (order == null || !order.getUserId().equals(userId)) {
|
||||
throw new BusinessException(ErrorCode.ORDER_NOT_FOUND);
|
||||
}
|
||||
if (!"paid".equals(order.getStatus())) {
|
||||
if (!"paid".equals(order.getStatus()) && !"completed".equals(order.getStatus())) {
|
||||
throw new BusinessException(ErrorCode.ORDER_STATUS_ERROR);
|
||||
}
|
||||
Long refundCount = refundRepo.selectCount(
|
||||
new LambdaQueryWrapper<OrderRefund>()
|
||||
.eq(OrderRefund::getOrderId, orderId)
|
||||
.in(OrderRefund::getStatus, "pending", "approved", "completed")
|
||||
);
|
||||
if (refundCount != null && refundCount > 0) {
|
||||
throw new BusinessException(409, "该订单已有退款申请,请勿重复提交");
|
||||
}
|
||||
|
||||
OrderRefund refund = new OrderRefund();
|
||||
refund.setOrderId(orderId);
|
||||
@@ -320,9 +401,14 @@ public class OrderServiceImpl implements OrderService {
|
||||
refund.setRefundPoints(order.getPointsUsed());
|
||||
refund.setReason(dto.getReason());
|
||||
if (dto.getImages() != null) {
|
||||
refund.setImages(dto.getImages().toString());
|
||||
try {
|
||||
refund.setImages(new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(dto.getImages()));
|
||||
} catch (Exception e) {
|
||||
refund.setImages(dto.getImages().toString());
|
||||
}
|
||||
}
|
||||
refund.setStatus("pending");
|
||||
refund.setPreviousOrderStatus(order.getStatus());
|
||||
refundRepo.insert(refund);
|
||||
|
||||
order.setStatus("refunding");
|
||||
@@ -337,6 +423,8 @@ public class OrderServiceImpl implements OrderService {
|
||||
vo.setCashAmount(order.getCashAmount());
|
||||
vo.setPointsUsed(order.getPointsUsed());
|
||||
vo.setPointsDeductAmount(order.getPointsDeductAmount());
|
||||
vo.setCouponId(order.getCouponId());
|
||||
vo.setCouponDeductAmount(order.getCouponDeductAmount());
|
||||
vo.setStatus(order.getStatus());
|
||||
vo.setStatusLabel(getStatusLabel(order.getStatus()));
|
||||
vo.setPaymentMethod(order.getPaymentMethod());
|
||||
@@ -364,6 +452,8 @@ public class OrderServiceImpl implements OrderService {
|
||||
vo.setCashAmount(order.getCashAmount());
|
||||
vo.setPointsUsed(order.getPointsUsed());
|
||||
vo.setPointsDeductAmount(order.getPointsDeductAmount());
|
||||
vo.setCouponId(order.getCouponId());
|
||||
vo.setCouponDeductAmount(order.getCouponDeductAmount());
|
||||
vo.setStatus(order.getStatus());
|
||||
vo.setStatusLabel(getStatusLabel(order.getStatus()));
|
||||
vo.setPaymentMethod(order.getPaymentMethod());
|
||||
|
||||
@@ -6,15 +6,21 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.openclaw.constant.ErrorCode;
|
||||
import com.openclaw.module.points.entity.*;
|
||||
import com.openclaw.exception.BusinessException;
|
||||
import com.openclaw.module.member.service.MemberService;
|
||||
import com.openclaw.module.points.repository.*;
|
||||
import com.openclaw.module.points.service.PointsService;
|
||||
import com.openclaw.module.points.vo.*;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class PointsServiceImpl implements PointsService {
|
||||
@@ -22,6 +28,13 @@ public class PointsServiceImpl implements PointsService {
|
||||
private final UserPointsRepository userPointsRepo;
|
||||
private final PointsRecordRepository recordRepo;
|
||||
private final PointsRuleRepository ruleRepo;
|
||||
private final PointsBatchRepository batchRepo;
|
||||
private final @Lazy MemberService memberService;
|
||||
|
||||
/** 默认积分有效期(天) */
|
||||
private static final int DEFAULT_EXPIRE_DAYS = 365;
|
||||
/** 永不过期占位时间 */
|
||||
private static final LocalDateTime NEVER_EXPIRE = LocalDateTime.of(2099, 12, 31, 23, 59, 59);
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
@@ -40,7 +53,16 @@ public class PointsServiceImpl implements PointsService {
|
||||
public PointsBalanceVO getBalance(Long userId) {
|
||||
UserPoints up = userPointsRepo.findByUserId(userId);
|
||||
PointsBalanceVO vo = new PointsBalanceVO();
|
||||
if (up == null) return vo;
|
||||
if (up == null) {
|
||||
vo.setAvailablePoints(0);
|
||||
vo.setFrozenPoints(0);
|
||||
vo.setTotalEarned(0);
|
||||
vo.setTotalConsumed(0);
|
||||
vo.setSignInStreak(0);
|
||||
vo.setSignedInToday(false);
|
||||
vo.setExpiringPoints(0);
|
||||
return vo;
|
||||
}
|
||||
vo.setAvailablePoints(up.getAvailablePoints());
|
||||
vo.setFrozenPoints(up.getFrozenPoints());
|
||||
vo.setTotalEarned(up.getTotalEarned());
|
||||
@@ -48,6 +70,10 @@ public class PointsServiceImpl implements PointsService {
|
||||
vo.setLastSignInDate(up.getLastSignInDate());
|
||||
vo.setSignInStreak(up.getSignInStreak());
|
||||
vo.setSignedInToday(LocalDate.now().equals(up.getLastSignInDate()));
|
||||
// 查询7天内即将过期的积分
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
int expiringPoints = batchRepo.sumExpiringPoints(userId, now, now.plusDays(7));
|
||||
vo.setExpiringPoints(expiringPoints);
|
||||
return vo;
|
||||
}
|
||||
|
||||
@@ -65,6 +91,10 @@ public class PointsServiceImpl implements PointsService {
|
||||
@Transactional
|
||||
public int signIn(Long userId) {
|
||||
UserPoints up = userPointsRepo.findByUserId(userId);
|
||||
if (up == null) {
|
||||
initUserPoints(userId);
|
||||
up = userPointsRepo.findByUserId(userId);
|
||||
}
|
||||
LocalDate today = LocalDate.now();
|
||||
|
||||
// 今日已签到
|
||||
@@ -77,14 +107,19 @@ public class PointsServiceImpl implements PointsService {
|
||||
today.minusDays(1).equals(up.getLastSignInDate());
|
||||
int streak = consecutive ? up.getSignInStreak() + 1 : 1;
|
||||
|
||||
// 签到积分:连续签到递增,最高20分
|
||||
int points = Math.min(5 + (streak - 1) * 1, 20);
|
||||
// 签到积分:连续签到递增,最高20分,乘以会员倍率
|
||||
int basePoints = Math.min(5 + (streak - 1) * 1, 20);
|
||||
java.math.BigDecimal multiplier = memberService.getSignInMultiplier(userId);
|
||||
int points = (int) Math.round(basePoints * multiplier.doubleValue());
|
||||
|
||||
up.setLastSignInDate(today);
|
||||
up.setSignInStreak(streak);
|
||||
userPointsRepo.updateById(up);
|
||||
// 只更新签到字段,避免 updateById 覆盖 availablePoints
|
||||
userPointsRepo.updateSignIn(userId, today, streak);
|
||||
|
||||
addPoints(userId, "earn", "sign_in", points, points, "每日签到", null, null);
|
||||
int newBalance = up.getAvailablePoints() + points;
|
||||
addPoints(userId, "earn", "sign_in", points, newBalance, "每日签到", null, null);
|
||||
createBatch(userId, "sign_in", points, null, null);
|
||||
// 签到获得成长值+1
|
||||
memberService.addGrowth(userId, 1, "sign_in", null, "每日签到");
|
||||
return points;
|
||||
}
|
||||
|
||||
@@ -95,25 +130,37 @@ public class PointsServiceImpl implements PointsService {
|
||||
if (rule == null || !rule.getEnabled()) return;
|
||||
|
||||
UserPoints up = userPointsRepo.findByUserId(userId);
|
||||
if (up == null) {
|
||||
initUserPoints(userId);
|
||||
up = userPointsRepo.findByUserId(userId);
|
||||
}
|
||||
int newBalance = up.getAvailablePoints() + rule.getPointsAmount();
|
||||
addPoints(userId, "earn", source, rule.getPointsAmount(), newBalance,
|
||||
rule.getRuleName(), relatedId, relatedType);
|
||||
createBatch(userId, source, rule.getPointsAmount(), relatedId, relatedType);
|
||||
// 赚取积分同步增加成长值
|
||||
int growth = getGrowthForSource(source);
|
||||
if (growth > 0) {
|
||||
memberService.addGrowth(userId, growth, source, relatedId, rule.getRuleName());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void consumePoints(Long userId, int amount, Long relatedId, String relatedType) {
|
||||
UserPoints up = userPointsRepo.findByUserId(userId);
|
||||
if (up.getAvailablePoints() < amount) throw new BusinessException(ErrorCode.POINTS_NOT_ENOUGH);
|
||||
if (up == null || up.getAvailablePoints() < amount) throw new BusinessException(ErrorCode.POINTS_NOT_ENOUGH);
|
||||
int newBalance = up.getAvailablePoints() - amount;
|
||||
addPoints(userId, "consume", "skill_purchase", -amount, newBalance,
|
||||
"兑换Skill", relatedId, relatedType);
|
||||
consumeBatchesFIFO(userId, amount);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void freezePoints(Long userId, int amount, Long orderId) {
|
||||
userPointsRepo.freezePoints(userId, amount);
|
||||
int rows = userPointsRepo.freezePoints(userId, amount);
|
||||
if (rows == 0) throw new BusinessException(ErrorCode.POINTS_NOT_ENOUGH);
|
||||
addPoints(userId, "freeze", "skill_purchase", -amount,
|
||||
userPointsRepo.findByUserId(userId).getAvailablePoints(),
|
||||
"积分冻结-订单" + orderId, orderId, "order");
|
||||
@@ -122,36 +169,176 @@ public class PointsServiceImpl implements PointsService {
|
||||
@Override
|
||||
@Transactional
|
||||
public void unfreezePoints(Long userId, int amount, Long orderId) {
|
||||
userPointsRepo.unfreezePoints(userId, amount);
|
||||
int rows = userPointsRepo.unfreezePoints(userId, amount);
|
||||
if (rows == 0) throw new BusinessException(ErrorCode.POINTS_NOT_ENOUGH);
|
||||
addPoints(userId, "unfreeze", "skill_purchase", amount,
|
||||
userPointsRepo.findByUserId(userId).getAvailablePoints(),
|
||||
"积分解冻-订单取消" + orderId, orderId, "order");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void consumeFrozenPoints(Long userId, int amount, Long orderId) {
|
||||
int rows = userPointsRepo.consumeFrozenPoints(userId, amount);
|
||||
if (rows == 0) throw new BusinessException(ErrorCode.POINTS_NOT_ENOUGH);
|
||||
UserPoints up = userPointsRepo.findByUserId(userId);
|
||||
PointsRecord r = new PointsRecord();
|
||||
r.setUserId(userId);
|
||||
r.setPointsType("consume");
|
||||
r.setSource("skill_purchase");
|
||||
r.setAmount(-amount);
|
||||
r.setBalance(up.getAvailablePoints());
|
||||
r.setDescription("积分消费-订单" + orderId);
|
||||
r.setRelatedId(orderId);
|
||||
r.setRelatedType("order");
|
||||
recordRepo.insert(r);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasEnoughPoints(Long userId, int required) {
|
||||
UserPoints up = userPointsRepo.findByUserId(userId);
|
||||
return up != null && up.getAvailablePoints() >= required;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void adjustByAdmin(Long userId, int amount, String reason) {
|
||||
UserPoints up = userPointsRepo.findByUserId(userId);
|
||||
if (up == null) throw new BusinessException(ErrorCode.USER_NOT_FOUND);
|
||||
int newBalance = up.getAvailablePoints() + amount;
|
||||
if (newBalance < 0) throw new BusinessException(ErrorCode.POINTS_NOT_ENOUGH);
|
||||
String type = amount >= 0 ? "earn" : "consume";
|
||||
String source = amount >= 0 ? "admin_add" : "admin_deduct";
|
||||
String desc = reason != null && !reason.isBlank() ? reason : "管理员调整";
|
||||
addPoints(userId, type, source, amount, newBalance, desc, null, "admin_adjust");
|
||||
if (amount > 0) {
|
||||
createBatch(userId, "admin_add", amount, null, "admin_adjust");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void addRechargePoints(Long userId, int totalPoints, Long rechargeOrderId) {
|
||||
UserPoints up = userPointsRepo.findByUserId(userId);
|
||||
if (up == null) {
|
||||
initUserPoints(userId);
|
||||
up = userPointsRepo.findByUserId(userId);
|
||||
}
|
||||
int newBalance = up.getAvailablePoints() + totalPoints;
|
||||
addPoints(userId, "earn", "recharge", totalPoints, newBalance,
|
||||
"充值赠送积分", rechargeOrderId, "recharge_order");
|
||||
createBatch(userId, "recharge", totalPoints, rechargeOrderId, "recharge_order");
|
||||
// 充值获得成长值 = 积分/10,最少1
|
||||
int growth = Math.max(1, totalPoints / 10);
|
||||
memberService.addGrowth(userId, growth, "recharge", rechargeOrderId, "充值赠送");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void refundPoints(Long userId, int amount, Long orderId) {
|
||||
UserPoints up = userPointsRepo.findByUserId(userId);
|
||||
if (up == null) return;
|
||||
int newBalance = up.getAvailablePoints() + amount;
|
||||
addPoints(userId, "earn", "refund", amount, newBalance,
|
||||
"退款退还积分", orderId, "order");
|
||||
createBatch(userId, "refund", amount, orderId, "order");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void freezeForActivity(Long userId, int amount, Long activityId, String activityTitle) {
|
||||
int rows = userPointsRepo.freezePoints(userId, amount);
|
||||
if (rows == 0) throw new BusinessException(ErrorCode.POINTS_NOT_ENOUGH);
|
||||
addPoints(userId, "freeze", "activity_freeze", -amount,
|
||||
userPointsRepo.findByUserId(userId).getAvailablePoints(),
|
||||
"活动冻结-" + activityTitle, activityId, "activity");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void unfreezeForActivity(Long userId, int amount, Long activityId, String activityTitle) {
|
||||
int rows = userPointsRepo.unfreezePoints(userId, amount);
|
||||
if (rows == 0) throw new BusinessException(ErrorCode.POINTS_NOT_ENOUGH);
|
||||
addPoints(userId, "unfreeze", "activity_unfreeze", amount,
|
||||
userPointsRepo.findByUserId(userId).getAvailablePoints(),
|
||||
"活动解冻-" + activityTitle, activityId, "activity");
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void consumeFrozenForActivity(Long userId, int amount, Long activityId, String activityTitle) {
|
||||
int rows = userPointsRepo.consumeFrozenPoints(userId, amount);
|
||||
if (rows == 0) throw new BusinessException(ErrorCode.POINTS_NOT_ENOUGH);
|
||||
UserPoints up = userPointsRepo.findByUserId(userId);
|
||||
PointsRecord r = new PointsRecord();
|
||||
r.setUserId(userId);
|
||||
r.setPointsType("consume");
|
||||
r.setSource("activity");
|
||||
r.setAmount(-amount);
|
||||
r.setBalance(up.getAvailablePoints());
|
||||
r.setDescription("活动消费-" + activityTitle);
|
||||
r.setRelatedId(activityId);
|
||||
r.setRelatedType("activity");
|
||||
recordRepo.insert(r);
|
||||
consumeBatchesFIFO(userId, amount);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public int expirePoints() {
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
List<PointsBatch> expiredBatches = batchRepo.findExpiredBatches(now);
|
||||
int totalExpired = 0;
|
||||
|
||||
for (PointsBatch batch : expiredBatches) {
|
||||
int expireAmount = batch.getRemainAmount();
|
||||
if (expireAmount <= 0) continue;
|
||||
|
||||
int rows = userPointsRepo.addAvailablePoints(batch.getUserId(), -expireAmount);
|
||||
if (rows == 0) continue;
|
||||
|
||||
batchRepo.markExpired(batch.getId());
|
||||
|
||||
PointsRecord record = new PointsRecord();
|
||||
record.setUserId(batch.getUserId());
|
||||
record.setPointsType("expire");
|
||||
record.setSource("expire");
|
||||
record.setAmount(-expireAmount);
|
||||
record.setBalance(userPointsRepo.findByUserId(batch.getUserId()).getAvailablePoints());
|
||||
record.setDescription("积分过期: 来源=" + batch.getSource() + ", 获得时间=" + batch.getEarnedAt().toLocalDate());
|
||||
record.setRelatedId(batch.getId());
|
||||
record.setRelatedType("points_batch");
|
||||
recordRepo.insert(record);
|
||||
|
||||
totalExpired += expireAmount;
|
||||
}
|
||||
return totalExpired;
|
||||
}
|
||||
|
||||
private void addPoints(Long userId, String type, String source, int amount,
|
||||
int balance, String desc, Long relatedId, String relatedType) {
|
||||
// 更新账户
|
||||
// 更新账户(乐观锁:WHERE available_points + amount >= 0)
|
||||
if ("earn".equals(type)) {
|
||||
userPointsRepo.addAvailablePoints(userId, amount);
|
||||
int rows = userPointsRepo.addAvailablePoints(userId, amount);
|
||||
if (rows == 0) throw new BusinessException(ErrorCode.POINTS_NOT_ENOUGH);
|
||||
userPointsRepo.addTotalEarned(userId, amount);
|
||||
} else if ("consume".equals(type)) {
|
||||
userPointsRepo.addAvailablePoints(userId, amount); // amount为负数
|
||||
int rows = userPointsRepo.addAvailablePoints(userId, amount); // amount为负数
|
||||
if (rows == 0) throw new BusinessException(ErrorCode.POINTS_NOT_ENOUGH);
|
||||
userPointsRepo.addTotalConsumed(userId, -amount);
|
||||
}
|
||||
|
||||
// 原子更新后重新读取实际余额,避免并发下 balance 不准确
|
||||
UserPoints updatedPoints = userPointsRepo.findByUserId(userId);
|
||||
int actualBalance = updatedPoints != null ? updatedPoints.getAvailablePoints() : balance;
|
||||
|
||||
// 记录流水
|
||||
PointsRecord r = new PointsRecord();
|
||||
r.setUserId(userId);
|
||||
r.setPointsType(type);
|
||||
r.setSource(source);
|
||||
r.setAmount(amount);
|
||||
r.setBalance(balance);
|
||||
r.setBalance(actualBalance);
|
||||
r.setDescription(desc);
|
||||
r.setRelatedId(relatedId);
|
||||
r.setRelatedType(relatedType);
|
||||
@@ -171,6 +358,71 @@ public class PointsServiceImpl implements PointsService {
|
||||
return vo;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void createPointsBatch(Long userId, String source, int amount, Long relatedId, String relatedType) {
|
||||
if (amount <= 0) {
|
||||
log.warn("[Points] createPointsBatch amount非法,跳过: userId={}, source={}, amount={}", userId, source, amount);
|
||||
return;
|
||||
}
|
||||
createBatch(userId, source, amount, relatedId, relatedType);
|
||||
}
|
||||
|
||||
/** 创建积分批次(用于过期追踪),根据 source 从 points_rules 读取差异化有效期 */
|
||||
private void createBatch(Long userId, String source, int amount, Long relatedId, String relatedType) {
|
||||
int expireDays = getExpireDaysBySource(source);
|
||||
LocalDateTime expireAt = expireDays > 0
|
||||
? LocalDateTime.now().plusDays(expireDays)
|
||||
: NEVER_EXPIRE; // 0=永不过期
|
||||
PointsBatch batch = new PointsBatch();
|
||||
batch.setUserId(userId);
|
||||
batch.setSource(source);
|
||||
batch.setOriginalAmount(amount);
|
||||
batch.setRemainAmount(amount);
|
||||
batch.setEarnedAt(LocalDateTime.now());
|
||||
batch.setExpireAt(expireAt);
|
||||
batch.setStatus("active");
|
||||
batch.setRelatedId(relatedId);
|
||||
batch.setRelatedType(relatedType);
|
||||
batchRepo.insert(batch);
|
||||
}
|
||||
|
||||
/** 根据积分来源从 points_rules 表获取有效期天数,null则用默认值,0=永不过期 */
|
||||
private int getExpireDaysBySource(String source) {
|
||||
PointsRule rule = ruleRepo.findBySource(source);
|
||||
if (rule != null && rule.getExpireDays() != null) {
|
||||
return rule.getExpireDays();
|
||||
}
|
||||
return DEFAULT_EXPIRE_DAYS;
|
||||
}
|
||||
|
||||
/** FIFO消费批次积分 */
|
||||
private void consumeBatchesFIFO(Long userId, int amount) {
|
||||
List<PointsBatch> batches = batchRepo.findActiveBatchesByUserId(userId);
|
||||
int remaining = amount;
|
||||
for (PointsBatch batch : batches) {
|
||||
if (remaining <= 0) break;
|
||||
int deduct = Math.min(remaining, batch.getRemainAmount());
|
||||
batchRepo.deductBatch(batch.getId(), deduct);
|
||||
remaining -= deduct;
|
||||
if (batch.getRemainAmount() - deduct == 0) {
|
||||
batchRepo.markConsumed(batch.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** 根据积分来源返回对应的成长值 */
|
||||
private int getGrowthForSource(String source) {
|
||||
return switch (source) {
|
||||
case "register" -> 5;
|
||||
case "invite" -> 10;
|
||||
case "invited" -> 5;
|
||||
case "join_community" -> 5;
|
||||
case "review" -> 3;
|
||||
default -> 0;
|
||||
};
|
||||
}
|
||||
|
||||
private String getSourceLabel(String source) {
|
||||
return switch (source) {
|
||||
case "register" -> "新用户注册";
|
||||
@@ -182,7 +434,14 @@ public class PointsServiceImpl implements PointsService {
|
||||
case "review" -> "发表评价";
|
||||
case "activity" -> "活动奖励";
|
||||
case "admin_adjust" -> "管理员调整";
|
||||
default -> source;
|
||||
case "admin_add" -> "管理员增加";
|
||||
case "admin_deduct" -> "管理员扣减";
|
||||
case "refund" -> "退款退还";
|
||||
case "invited" -> "接受邀请";
|
||||
case "expire" -> "积分过期";
|
||||
case "activity_freeze" -> "活动冻结";
|
||||
case "activity_unfreeze" -> "活动解冻";
|
||||
default -> source;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
-- 通用补偿任务表(Outbox Pattern)
|
||||
-- 用于MQ发送失败、异步操作失败的重试补偿
|
||||
CREATE TABLE compensation_tasks (
|
||||
id BIGINT AUTO_INCREMENT PRIMARY KEY,
|
||||
task_type VARCHAR(50) NOT NULL COMMENT '任务类型: mq_refund_approved / mq_order_timeout / mq_order_paid / mq_order_cancelled',
|
||||
biz_key VARCHAR(100) NOT NULL COMMENT '业务幂等键: 如 refund_{id}, order_timeout_{id}',
|
||||
payload TEXT NOT NULL COMMENT 'JSON格式任务参数(exchange, routingKey, eventJson)',
|
||||
status VARCHAR(20) NOT NULL DEFAULT 'pending' COMMENT 'pending/processing/success/failed',
|
||||
retry_count INT NOT NULL DEFAULT 0 COMMENT '已重试次数',
|
||||
max_retries INT NOT NULL DEFAULT 5 COMMENT '最大重试次数',
|
||||
next_retry_at DATETIME NOT NULL COMMENT '下次重试时间',
|
||||
error_msg TEXT COMMENT '最近一次失败原因',
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||||
UNIQUE KEY uk_biz_key (biz_key),
|
||||
INDEX idx_status_retry (status, next_retry_at)
|
||||
) COMMENT '通用补偿任务表';
|
||||
Reference in New Issue
Block a user