Files
AIGC/demo/TASK_QUEUE_README.md

229 lines
5.1 KiB
Markdown
Raw Normal View History

# 任务队列系统
## 概述
任务队列系统用于管理用户的视频生成任务,实现以下功能:
- **任务限制**: 每个用户最多同时有3个待处理任务
- **定时检查**: 每2分钟自动检查一次任务状态
- **优先级管理**: 支持任务优先级排序
- **状态跟踪**: 完整的任务状态生命周期管理
- **自动清理**: 定期清理过期任务
## 系统架构
### 核心组件
1. **TaskQueue 实体类**: 任务队列数据模型
2. **TaskQueueRepository**: 数据访问层
3. **TaskQueueService**: 业务逻辑层
4. **TaskQueueScheduler**: 定时任务调度器
5. **TaskQueueApiController**: API控制器
### 任务状态
- `PENDING`: 等待处理
- `PROCESSING`: 正在处理
- `COMPLETED`: 已完成
- `FAILED`: 失败
- `CANCELLED`: 已取消
- `TIMEOUT`: 超时
## 使用方法
### 1. 添加任务到队列
```java
// 文生视频任务
taskQueueService.addTextToVideoTask(username, taskId);
// 图生视频任务
taskQueueService.addImageToVideoTask(username, taskId);
```
### 2. 获取用户任务队列
```java
List<TaskQueue> userTasks = taskQueueService.getUserTaskQueue(username);
```
### 3. 取消任务
```java
boolean cancelled = taskQueueService.cancelTask(taskId, username);
```
### 4. 获取统计信息
```java
long totalCount = taskQueueService.getUserTaskCount(username);
```
## API接口
### 获取用户任务队列
```
GET /api/task-queue/user-tasks
Authorization: Bearer <token>
```
### 取消任务
```
POST /api/task-queue/cancel/{taskId}
Authorization: Bearer <token>
```
### 获取队列统计
```
GET /api/task-queue/stats
Authorization: Bearer <token>
```
### 手动处理任务(管理员)
```
POST /api/task-queue/process-pending
Authorization: Bearer <token>
```
### 手动检查状态(管理员)
```
POST /api/task-queue/check-statuses
Authorization: Bearer <token>
```
## 定时任务
### 1. 处理待处理任务
- **频率**: 每30秒
- **功能**: 处理队列中的待处理任务提交到外部API
### 2. 检查任务状态
- **频率**: 每2分钟
- **功能**: 检查正在处理的任务状态,更新任务状态
### 3. 清理过期任务
- **频率**: 每天凌晨2点
- **功能**: 清理超过7天的任务记录
### 4. 队列状态监控
- **频率**: 每5分钟
- **功能**: 记录队列运行状态
## 配置参数
### 任务限制
- 每个用户最多3个待处理任务
- 可通过修改 `MAX_TASKS_PER_USER` 常量调整
### 超时设置
- 默认最大检查次数30次
- 检查间隔2分钟
- 总超时时间60分钟
- 可通过修改 `maxCheckCount` 字段调整
### 清理策略
- 过期时间7天
- 可通过修改 `cleanupExpiredTasks` 方法调整
## 数据库表结构
```sql
CREATE TABLE task_queue (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(100) NOT NULL,
task_id VARCHAR(50) NOT NULL UNIQUE,
task_type ENUM('TEXT_TO_VIDEO', 'IMAGE_TO_VIDEO') NOT NULL,
status ENUM('PENDING', 'PROCESSING', 'COMPLETED', 'FAILED', 'CANCELLED', 'TIMEOUT') NOT NULL DEFAULT 'PENDING',
priority INT NOT NULL DEFAULT 0,
real_task_id VARCHAR(100),
last_check_time DATETIME,
check_count INT NOT NULL DEFAULT 0,
max_check_count INT NOT NULL DEFAULT 30,
error_message TEXT,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
completed_at DATETIME
);
```
## 集成说明
### 现有服务修改
1. **TextToVideoService**:
- 移除了原有的异步处理逻辑
- 改为添加任务到队列
2. **ImageToVideoService**:
- 移除了原有的异步处理逻辑
- 改为添加任务到队列
### 工作流程
1. 用户创建任务 → 保存到数据库 → 添加到队列
2. 定时任务处理队列 → 提交到外部API → 更新状态为PROCESSING
3. 定时任务检查状态 → 查询外部API → 更新任务状态
4. 任务完成/失败 → 更新原始任务状态 → 从队列中移除
## 监控和日志
### 关键日志
- 任务添加到队列
- 任务状态变更
- API调用结果
- 错误和异常
### 监控指标
- 队列中任务数量
- 各状态任务分布
- 处理成功率
- 平均处理时间
## 故障处理
### 常见问题
1. **任务卡在PROCESSING状态**
- 检查外部API是否正常
- 查看错误日志
- 手动触发状态检查
2. **队列积压**
- 检查外部API响应时间
- 调整检查频率
- 增加处理线程
3. **任务超时**
- 检查网络连接
- 调整超时设置
- 优化API调用
### 恢复策略
1. **自动恢复**: 系统会自动重试失败的任务
2. **手动干预**: 通过API接口手动处理
3. **数据修复**: 清理异常状态的任务
## 性能优化
### 数据库优化
- 添加适当的索引
- 定期清理过期数据
- 使用分页查询
### 系统优化
- 异步处理任务
- 批量操作
- 缓存热点数据
## 扩展功能
### 未来可扩展的功能
1. **任务优先级**: 支持动态调整优先级
2. **负载均衡**: 多实例部署时的任务分配
3. **任务依赖**: 支持任务间的依赖关系
4. **通知系统**: 任务完成后的消息通知
5. **统计分析**: 详细的性能统计和分析