董哲
Published on 2025-07-29 / 71 Visits
0
0

异步任务处理平台化方案(基于 Redis Stream SDK)

目标

通过封装 Redis Stream 的任务发送与消费机制,构建一套通用异步任务的 SDK,使各微服务可:

● 简单地异步发送执行任务

● 自动消费与处理任务

● 保证消息的可靠投递与处理(ACK + 重试)

● 解耦系统中各服务间的强依赖关系

 

🧱 技术选型

技术组件

用途

Redis Stream

持久化队列,支持 ACK + 消费组

Spring Boot

构建任务服务和 SDK

注解机制

实现业务处理器的自动注册

定时任务

用于 PENDING 消息补偿

 

流程架构图

 以上图例(@TaskHandler调整为@MessageListener),请看下文的示例

SDK 核心能力说明

1. 任务发送(生产者)

ublic interface MessageSenderService {

    /**
     * 发布redis消息,指定stream key, 默认会将用户信息传递过去(用户、租户、Token)
     *
     * @param stream  stream key
     * @param payload 消息体
     */
    <T> void send(String stream, T payload);

    /**
     * 发布redis消息,指定stream key, 默认会将用户信息传递过去(用户、租户、Token)
     *
     * @param stream  stream key
     * @param payload 消息体
     * @param retryTimes 重试次数
     */
    <T> void send(String stream, T payload, int retryTimes);

    /**
     * 发布redis消息,指定stream key
     *
     * @param stream  stream key
     * @param payload 消息体
     * @param ignoreUserInfo 忽略用户信息(用户、租户、Token)
     */
    <T> void send(String stream, T payload, boolean ignoreUserInfo);

    /**
     * 发布redis消息,指定stream key
     *
     * @param stream  stream key
     * @param payload 消息体
     * @param retryTimes 重试次数
     * @param ignoreUserInfo 忽略用户信息(用户、租户、Token)
     */
    <T> void send(String stream, T payload, int retryTimes, boolean ignoreUserInfo);
}

将任务消息写入 Redis Stream

持久化+消费组消费

 

2. 任务消费(消费者)

● SDK 自动启动 Redis Stream 消费线程

● 拉取指定消费组的消息

● 根据 type 字段分发给对应的处理器

● 成功后 XACK,失败自动进入 PENDING

 

3. 自动重试与补偿机制

● SDK 定时扫描 XPENDING

● 对超过 N 秒未确认的消息重新执行(超过3次会通过retryFail方法通知

● 执行成功即 XACK,失败保留在 Pending 中直到重试3次为止

 

各业务服务接入方式

第一步:引入 framework

<dependency>
    <groupId>com.sie.mbm.mom</groupId>
    <artifactId>framework-mq</artifactId>
    <version>3.1.0-SNAPSHOT</version>
</dependency>

第二步:发送任务

注意:发送任务时

    @Resource
    private MessageSenderService messageSenderService;

    @RequestMapping("/send")
    public R<Object> send(@Valid @RequestBody DemoMessageDTO message) {
        messageSenderService.send(DemoConstants.DEMO_STREAM_TEST, message);
        return R.ok();
    }

第三步:注册任务处理器(所有的stream-key如果没有加stream:前缀则会自动加上stream:前缀)

@Slf4j
@Component
@MessageListener(value = DemoConstants.DEMO_STREAM_TEST)
public class MessageConsumeOneTest implements MessageHandlerInterface<DemoMessageDTO> {
    @Override
    public void handle(DemoMessageDTO payload) {
        log.info("MessageConsumeOneTest接收到消息: {}", payload);
        if (BooleanUtil.isTrue(payload.getThrowError())) {
            throw new RuntimeException("测试ThrowError");
        }
    }

    @Override
    public void retryFail(DemoMessageDTO payload) {

    }
}

第四步:启动项目自动创建并注册stream、消费组、消费者;

 

 


Comment