任务系统使用说明
1. 系统概述
任务系统是一个异步任务处理框架,用于处理耗时较长的操作,如数据导入导出、批量处理等。系统采用异步执行方式,确保主线程不被阻塞,提高系统的响应速度和并发能力。
1.1 系统架构
┌─────────────┐ ┌─────────────┐ ┌──────────────┐
│ Task API │────>│ TaskService│────>│ TaskExecutor│
└─────────────┘ └─────────────┘ └──────────────┘
│
▼
┌─────────────┐ ┌─────────────┐ ┌────────────────────┐
│ TaskMapper │<────│ TaskHandler│<────│ TaskHandlerFactory│
└─────────────┘ └─────────────┘ └────────────────────┘
2. 核心组件
2.1 任务实体 (Task)
任务实体是任务系统的核心数据结构,包含以下主要字段:
id: 任务主键IDtaskName: 任务名称taskType: 任务类型,用于区分不同业务的任务taskStatus: 任务状态taskParam: 任务参数,存储任务执行所需的参数,建议使用JSON格式taskResult: 任务执行结果,存储任务成功后的返回数据errorMessage: 错误信息,任务执行失败时存储具体的异常信息startTime: 任务开始时间endTime: 任务结束时间duration: 任务执行时长(毫秒)
2.2 任务处理器 (ITaskHandler)
任务处理器是具体业务逻辑的实现者,需要实现以下方法:
getTaskType(): 返回任务类型,用于注册和识别execute(Object params): 执行具体任务逻辑,返回执行结果
2.3 任务处理器工厂 (TaskHandlerFactory)
任务处理器工厂负责管理和获取任务处理器,根据任务类型返回对应的处理器实例。在初始化时会检查重复的任务类型并记录警告日志。
2.4 任务执行器 (TaskExecutor)
任务执行器负责异步执行任务,处理任务的生命周期管理,包括:
- 任务状态管理
- 分布式锁处理(使用Redis实现)
- 异常处理
- 结果收集
- 锁自动续期
2.5 任务服务 (TaskService)
任务服务提供任务的创建、查询、状态更新等功能。
3. 快速开始
3.1 创建自定义任务处理器
- 实现
ITaskHandler接口 - 使用
@Component注解标记为Spring组件 - 实现
getTaskType()方法返回唯一的任务类型 - 实现
execute()方法处理具体业务逻辑
示例代码:
@Component
public class MyTaskHandler implements ITaskHandler {
@Override
public String getTaskType() {
return "MY_TASK";
}
@Override
public Object execute(Object params) throws Exception {
// 处理具体业务逻辑
System.out.println("执行任务,参数:" + params);
// 模拟处理时间
Thread.sleep(2000);
return "任务执行成功";
}
}
3.2 创建任务
通过 TaskService.createTask() 方法创建任务,或通过REST API创建。
示例代码:
TaskCreateDTO taskCreateDTO = new TaskCreateDTO();
taskCreateDTO.setTaskName("测试任务");
taskCreateDTO.setTaskType("MY_TASK");
taskCreateDTO.setTaskParam( );
Long taskId = taskService.createTask(taskCreateDTO);
3.3 查询任务状态
通过 TaskService.getTaskById() 方法查询任务状态,或通过REST API查询。
示例代码:
TaskViewDTO taskViewDTO = taskService.getTaskById(taskId);
System.out.println("任务状态:" + taskViewDTO.getTaskStatus());
3.4 重试失败任务
通过 TaskService.retryTask() 方法重试失败的任务,或通过REST API重试。
示例代码:
bolean success = taskService.retryTask(taskId);
System.out.println("重试任务结果:" + success);
4. API接口
4.1 创建任务
- URL:
/sys/v1/task - 方法: POST
- 参数:
TaskCreateDTO - 返回: 任务ID
示例请求:
{
"taskName": "测试任务",
"taskType": "MY_TASK",
"taskParam": "{\"userId\": 123, \"name\": \"测试\"}"
}
4.2 查询任务
- URL:
/sys/v1/task/get/{id} - 方法: GET
- 参数: 任务ID
- 返回: 任务详情
4.3 获取任务列表
- URL:
/sys/v1/task/list - 方法: GET
- 返回: 任务列表
4.4 分页查询任务
- URL:
/sys/v1/task/page - 方法: POST
- 参数:
TaskPageDTO - 返回: 分页结果
示例请求:
{
"pageNo": 1,
"pageSize": 10,
"taskType": "MY_TASK",
"taskStatus": "SUCCESS"
}
4.5 重试任务
- URL:
/sys/v1/task/retry/{id} - 方法: POST
- 参数: 任务ID
- 返回: 是否重试成功
4.6 获取任务结果
- URL:
/sys/v1/task/get/result/{id} - 方法: GET
- 参数: 任务ID
- 返回: 任务执行结果
5. 任务状态
任务状态枚举 (TaskStatusEnum) 包括:
PENDING: 待执行RUNNING: 执行中SUCCESS: 执行成功FAILED: 执行失败
6. 执行流程
- 创建任务: 调用
TaskService.createTask()创建任务,状态设置为PENDING - 任务调度: 系统自动调度任务执行
- 获取锁: 任务执行器尝试获取分布式锁,确保任务不被重复执行
- 状态更新: 获取锁成功后,将任务状态更新为
RUNNING - 执行任务: 调用对应任务处理器的
execute()方法执行具体业务逻辑 - 结果处理: 根据执行结果更新任务状态为
SUCCESS或FAILED - 释放锁: 执行完成后释放分布式锁
7. 最佳实践
7.1 任务处理器实现
- 任务类型唯一性: 确保每个任务处理器的
getTaskType()方法返回唯一的任务类型 - 异常处理: 合理处理任务执行过程中的异常,确保任务能够正确记录失败原因
- 参数格式: 使用JSON格式传递任务参数,便于序列化和反序列化
- 结果处理: 任务执行结果应简洁明了,便于前端展示
7.2 任务参数设计
- 参数结构: 建议使用JSON对象作为任务参数,包含所有必要的执行信息
- 参数验证: 在任务处理器中对参数进行验证,确保参数的有效性
- 参数大小: 避免传递过大的参数,建议不超过1MB
7.3 性能优化
- 任务拆分: 将大型任务拆分为多个小型任务,提高执行效率
- 并行执行: 对于独立的任务,考虑使用并行执行
- 资源管理: 合理管理任务执行过程中的资源,避免资源泄漏
- 超时处理: 为长时间运行的任务设置合理的超时时间
8. 示例场景
8.1 数据导入
@Component
public class ImportDataTaskHandler implements ITaskHandler {
@Override
public String getTaskType() {
return "IMPORT_DATA";
}
@Override
public Object execute(Object params) throws Exception {
// 解析导入参数
Map<String, Object> paramMap = (Map<String, Object>) params;
String filePath = (String) paramMap.get("filePath");
Long userId = (Long) paramMap.get("userId");
// 执行数据导入逻辑
int successCount = 0;
int failCount = 0;
// 模拟导入过程
Thread.sleep(5000);
successCount = 100;
failCount = 2;
// 返回导入结果
return String.format("导入成功,共导入 %d 条记录,失败 %d 条", successCount, failCount);
}
}
8.2 数据导出
@Component
public class ExportDataTaskHandler implements ITaskHandler {
@Override
public String getTaskType() {
return "EXPORT_DATA";
}
@Override
public Object execute(Object params) throws Exception {
// 解析导出参数
Map<String, Object> paramMap = (Map<String, Object>) params;
String exportType = (String) paramMap.get("exportType");
Long userId = (Long) paramMap.get("userId");
// 执行数据导出逻辑
// 生成文件
// 上传到OSS
// 模拟处理时间
Thread.sleep(3000);
// 返回文件下载链接
return "https://example.com/export/20260313.xlsx";
}
}
9. 监控与维护
- 任务列表: 通过
/sys/v1/task/list接口查看所有任务 - 任务详情: 通过
/sys/v1/task/get/{id}接口查看任务详情 - 失败任务: 通过分页查询接口,筛选状态为
FAILED的任务 - 任务重试: 对失败的任务,可以通过
/sys/v1/task/retry/{id}接口进行重试 - 执行日志: 查看系统日志了解任务执行情况
10. 扩展与定制
10.1 自定义任务处理器
按照 ITaskHandler 接口规范实现自定义任务处理器,注册到Spring容器即可。
10.2 任务参数扩展
可以根据业务需要扩展 TaskCreateDTO,添加更多字段。
10.3 任务结果处理
任务执行结果会存储在 taskResult 字段中,可以根据业务需要进行解析和处理。
10.4 分布式锁配置
可以通过修改 TaskConstant 中的锁过期时间和续期间隔来适应不同的任务执行场景。
11. 常见问题
11.1 任务执行失败
- 检查任务处理器的
execute方法是否抛出异常 - 检查任务参数是否正确
- 检查系统资源是否充足
- 检查分布式锁是否正常工作
11.2 任务重复执行
- 检查分布式锁配置是否正确
- 检查任务处理器的
getTaskType方法返回值是否唯一 - 检查系统是否存在多个实例同时执行任务
11.3 任务执行时间过长
- 考虑优化任务逻辑
- 调整锁过期时间和续期间隔
- 考虑将大任务拆分为多个小任务
- 检查系统资源使用情况
11.4 任务处理器未找到
- 检查任务处理器是否正确实现了
ITaskHandler接口 - 检查任务处理器是否使用了
@Component注解 - 检查任务类型是否正确
12. 总结
任务系统为应用提供了一个可靠的异步处理框架,适用于各种耗时操作。通过实现自定义任务处理器,可以轻松扩展系统功能,满足不同业务场景的需求。系统支持分布式环境,确保任务的可靠执行和状态管理。
13. 代码结构
task/
├── constant/ # 常量定义
│ └── TaskConstant.java
├── controller/ # 控制器
│ └── TaskController.java
├── convert/ # 转换器
│ └── TaskConvert.java
├── dto/ # 数据传输对象
│ ├── TaskCreateDTO.java
│ ├── TaskPageDTO.java
│ └── TaskViewDTO.java
├── entity/ # 实体类
│ └── Task.java
├── enums/ # 枚举类
│ └── TaskStatusEnum.java
├── executor/ # 执行器
│ └── TaskExecutor.java
├── handler/ # 任务处理器
│ ├── impl/ # 处理器实现
│ │ ├── ExportDataTaskHandler.java
│ │ └── ImportDataTaskHandler.java
│ ├── ITaskHandler.java
│ └── TaskHandlerFactory.java
├── init/ # 初始化
│ └── TaskInitializer.java
├── mapper/ # 数据访问
│ └── TaskMapper.java
├── service/ # 服务
│ ├── impl/ # 服务实现
│ │ └── TaskServiceImpl.java
│ └── TaskService.java
└── README.md # 文档