Files
AIGC/demo/TASK_QUEUE_README.md
AIGC Developer 8c55f9f376 feat: 完成代码逻辑错误修复和任务清理系统实现
主要更新:
- 修复了所有主要的代码逻辑错误
- 实现了完整的任务清理系统
- 添加了系统设置页面的任务清理管理功能
- 修复了API调用认证问题
- 优化了密码加密和验证机制
- 统一了错误处理模式
- 添加了详细的文档和测试工具

新增功能:
- 任务清理管理界面
- 任务归档和清理日志
- API监控和诊断工具
- 完整的测试套件

技术改进:
- 修复了Repository方法调用错误
- 统一了模型方法调用
- 改进了类型安全性
- 优化了代码结构和可维护性
2025-10-27 10:46:49 +08:00

5.1 KiB
Raw Blame History

任务队列系统

概述

任务队列系统用于管理用户的视频生成任务,实现以下功能:

  • 任务限制: 每个用户最多同时有3个待处理任务
  • 定时检查: 每2分钟自动检查一次任务状态
  • 优先级管理: 支持任务优先级排序
  • 状态跟踪: 完整的任务状态生命周期管理
  • 自动清理: 定期清理过期任务

系统架构

核心组件

  1. TaskQueue 实体类: 任务队列数据模型
  2. TaskQueueRepository: 数据访问层
  3. TaskQueueService: 业务逻辑层
  4. TaskQueueScheduler: 定时任务调度器
  5. TaskQueueApiController: API控制器

任务状态

  • PENDING: 等待处理
  • PROCESSING: 正在处理
  • COMPLETED: 已完成
  • FAILED: 失败
  • CANCELLED: 已取消
  • TIMEOUT: 超时

使用方法

1. 添加任务到队列

// 文生视频任务
taskQueueService.addTextToVideoTask(username, taskId);

// 图生视频任务
taskQueueService.addImageToVideoTask(username, taskId);

2. 获取用户任务队列

List<TaskQueue> userTasks = taskQueueService.getUserTaskQueue(username);

3. 取消任务

boolean cancelled = taskQueueService.cancelTask(taskId, username);

4. 获取统计信息

long totalCount = taskQueueService.getUserTaskCount(username);

API接口

获取用户任务队列

GET /api/task-queue/user-tasks
Authorization: Bearer <token>

取消任务

POST /api/task-queue/cancel/{taskId}
Authorization: Bearer <token>

获取队列统计

GET /api/task-queue/stats
Authorization: Bearer <token>

手动处理任务(管理员)

POST /api/task-queue/process-pending
Authorization: Bearer <token>

手动检查状态(管理员)

POST /api/task-queue/check-statuses
Authorization: Bearer <token>

定时任务

1. 处理待处理任务

  • 频率: 每30秒
  • 功能: 处理队列中的待处理任务提交到外部API

2. 检查任务状态

  • 频率: 每2分钟
  • 功能: 检查正在处理的任务状态,更新任务状态

3. 清理过期任务

  • 频率: 每天凌晨2点
  • 功能: 清理超过7天的任务记录

4. 队列状态监控

  • 频率: 每5分钟
  • 功能: 记录队列运行状态

配置参数

任务限制

  • 每个用户最多3个待处理任务
  • 可通过修改 MAX_TASKS_PER_USER 常量调整

超时设置

  • 默认最大检查次数30次
  • 检查间隔2分钟
  • 总超时时间60分钟
  • 可通过修改 maxCheckCount 字段调整

清理策略

  • 过期时间7天
  • 可通过修改 cleanupExpiredTasks 方法调整

数据库表结构

CREATE TABLE task_queue (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(100) NOT NULL,
    task_id VARCHAR(50) NOT NULL UNIQUE,
    task_type ENUM('TEXT_TO_VIDEO', 'IMAGE_TO_VIDEO') NOT NULL,
    status ENUM('PENDING', 'PROCESSING', 'COMPLETED', 'FAILED', 'CANCELLED', 'TIMEOUT') NOT NULL DEFAULT 'PENDING',
    priority INT NOT NULL DEFAULT 0,
    real_task_id VARCHAR(100),
    last_check_time DATETIME,
    check_count INT NOT NULL DEFAULT 0,
    max_check_count INT NOT NULL DEFAULT 30,
    error_message TEXT,
    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    completed_at DATETIME
);

集成说明

现有服务修改

  1. TextToVideoService:

    • 移除了原有的异步处理逻辑
    • 改为添加任务到队列
  2. ImageToVideoService:

    • 移除了原有的异步处理逻辑
    • 改为添加任务到队列

工作流程

  1. 用户创建任务 → 保存到数据库 → 添加到队列
  2. 定时任务处理队列 → 提交到外部API → 更新状态为PROCESSING
  3. 定时任务检查状态 → 查询外部API → 更新任务状态
  4. 任务完成/失败 → 更新原始任务状态 → 从队列中移除

监控和日志

关键日志

  • 任务添加到队列
  • 任务状态变更
  • API调用结果
  • 错误和异常

监控指标

  • 队列中任务数量
  • 各状态任务分布
  • 处理成功率
  • 平均处理时间

故障处理

常见问题

  1. 任务卡在PROCESSING状态

    • 检查外部API是否正常
    • 查看错误日志
    • 手动触发状态检查
  2. 队列积压

    • 检查外部API响应时间
    • 调整检查频率
    • 增加处理线程
  3. 任务超时

    • 检查网络连接
    • 调整超时设置
    • 优化API调用

恢复策略

  1. 自动恢复: 系统会自动重试失败的任务
  2. 手动干预: 通过API接口手动处理
  3. 数据修复: 清理异常状态的任务

性能优化

数据库优化

  • 添加适当的索引
  • 定期清理过期数据
  • 使用分页查询

系统优化

  • 异步处理任务
  • 批量操作
  • 缓存热点数据

扩展功能

未来可扩展的功能

  1. 任务优先级: 支持动态调整优先级
  2. 负载均衡: 多实例部署时的任务分配
  3. 任务依赖: 支持任务间的依赖关系
  4. 通知系统: 任务完成后的消息通知
  5. 统计分析: 详细的性能统计和分析