目标
通过封装 Redis Stream 的任务发送与消费机制,构建一套通用异步任务的 SDK,使各微服务可:
● 简单地异步发送执行任务
● 自动消费与处理任务
● 保证消息的可靠投递与处理(ACK + 重试)
● 解耦系统中各服务间的强依赖关系
🧱 技术选型
流程架构图
以上图例(@TaskHandler调整为@MessageListener),请看下文的示例
SDK 核心能力说明
1. 任务发送(生产者)
ublic interface MessageSenderService {
/**
* 发布redis消息,指定stream key, 默认会将用户信息传递过去(用户、租户、Token)
*
* @param stream stream key
* @param payload 消息体
*/
<T> void send(String stream, T payload);
/**
* 发布redis消息,指定stream key, 默认会将用户信息传递过去(用户、租户、Token)
*
* @param stream stream key
* @param payload 消息体
* @param retryTimes 重试次数
*/
<T> void send(String stream, T payload, int retryTimes);
/**
* 发布redis消息,指定stream key
*
* @param stream stream key
* @param payload 消息体
* @param ignoreUserInfo 忽略用户信息(用户、租户、Token)
*/
<T> void send(String stream, T payload, boolean ignoreUserInfo);
/**
* 发布redis消息,指定stream key
*
* @param stream stream key
* @param payload 消息体
* @param retryTimes 重试次数
* @param ignoreUserInfo 忽略用户信息(用户、租户、Token)
*/
<T> void send(String stream, T payload, int retryTimes, boolean ignoreUserInfo);
}将任务消息写入 Redis Stream
持久化+消费组消费
2. 任务消费(消费者)
● SDK 自动启动 Redis Stream 消费线程
● 拉取指定消费组的消息
● 根据 type 字段分发给对应的处理器
● 成功后 XACK,失败自动进入 PENDING
3. 自动重试与补偿机制
● SDK 定时扫描 XPENDING
● 对超过 N 秒未确认的消息重新执行(超过3次会通过retryFail方法通知)
● 执行成功即 XACK,失败保留在 Pending 中直到重试3次为止
各业务服务接入方式
第一步:引入 framework
<dependency>
<groupId>com.sie.mbm.mom</groupId>
<artifactId>framework-mq</artifactId>
<version>3.1.0-SNAPSHOT</version>
</dependency>
第二步:发送任务
注意:发送任务时
@Resource
private MessageSenderService messageSenderService;
@RequestMapping("/send")
public R<Object> send(@Valid @RequestBody DemoMessageDTO message) {
messageSenderService.send(DemoConstants.DEMO_STREAM_TEST, message);
return R.ok();
}第三步:注册任务处理器(所有的stream-key如果没有加stream:前缀则会自动加上stream:前缀)
@Slf4j
@Component
@MessageListener(value = DemoConstants.DEMO_STREAM_TEST)
public class MessageConsumeOneTest implements MessageHandlerInterface<DemoMessageDTO> {
@Override
public void handle(DemoMessageDTO payload) {
log.info("MessageConsumeOneTest接收到消息: {}", payload);
if (BooleanUtil.isTrue(payload.getThrowError())) {
throw new RuntimeException("测试ThrowError");
}
}
@Override
public void retryFail(DemoMessageDTO payload) {
}
}第四步:启动项目自动创建并注册stream、消费组、消费者;