SpringBoot+RocketMQ实现异步延时队列案例
  zWz0rlp5WGLR 2023年12月19日 37 0


MQ常量类

public interface MQConstants {

    /**
     * 主题
     */
    interface Topic{
        //延时扣款
        String DELAY_AGAIN_DEDUCTION = "delay_again_deduction";

    }

    /**
     * 延时等级与对应时间
     */
    interface DelayLevel {
        int level_1 = 1;//延时1秒
        int level_2 = 2;//延时5秒
        int level_3 = 3;//延时10秒
        int level_4 = 4;//延时30秒
        int level_5 = 5;//延时1分钟
        int level_6 = 6;//延时2分钟
        int level_7 = 7;//延时3分钟
        int level_8 = 8;//延时4分钟
        int level_9 = 9;//延时5分钟
        int level_10 = 10;//延时6分钟
        int level_11 = 11;//延时7分钟
        int level_12 = 12;//延时8分钟
        int level_13 = 13;//延时9分钟
        int level_14 = 14;//延时10分钟
        int level_15 = 15;//延时20分钟
        int level_16 = 16;//延时30分钟
        int level_17 = 17;//延时1小时
        int level_18 = 18;//延时2小时
    }

}

请求DTO

@Data
@AllArgsConstructor
@NoArgsConstructor
public class BaseMqDTO<T> implements Serializable {

    private static final long serialVersionUID = -1L;

    private T data;     //消息体

    private String messageId;
}

异步延时接口

public interface RemoteSaasMqService {

    /**
     * 【异步】消费处理业务
     * @param topic 主题
     * @param data 数据
     */
    void convertAndSend(String topic, BaseMqDTO<?> data);

    /**
     * 【异步】延时消费处理业务
     * @param topic 主题
     * @param data 数据
     * @param delayLevel 延时等级
     */
    void sendDelayed(String topic, BaseMqDTO<?> data, int delayLevel);
}

异步实现类

@Slf4j
@DubboService
class RocketMQServiceImpl implements RemoteSaasMqService {

    @Lazy
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void convertAndSend(String topic, BaseMqDTO<?> data) {

        rocketMQTemplate.asyncSend(topic, data, new SendCallback() {
            @Override
            public void onSuccess(SendResult res) {
                log.info("convertAndSend - onSuccess - topic:{} data:{} sendResult:{}",topic, JSON.toJSONString(data),JSON.toJSONString(res));
            }
            @Override
            public void onException(Throwable e) {
                log.error("convertAndSend - onException - topic:{} data:{} e:",topic,JSON.toJSONString(data),e);
            }
        });
    }

    @Override
    public void sendDelayed(String topic, BaseMqDTO<?> data, int delayLevel) {
        rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(data).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult res) {
                log.info("sendDelayed - onSuccess - topic:{} data:{} sendResult:{}",topic,JSON.toJSONString(data),JSON.toJSONString(res));
            }
            @Override
            public void onException(Throwable e) {
                log.error("sendDelayed - onException - topic:{} data:{} e:",topic,JSON.toJSONString(data),e);
            }
        },5000,delayLevel);
    }
}

异步发送请求

remoteSaasMqService.sendDelayed(MQConstants.Topic.DELAY_AGAIN_DEDUCTION,
                        new BaseMqDTO<>(传入你要消费的数据体, UUID.randomUUID().toString()),
                        MQConstants.DelayLevel.level_3);

MQ监听器接受数据

@Slf4j
@Component
@RocketMQMessageListener(topic = MQConstants.Topic.DELAY_AGAIN_DEDUCTION,consumerGroup = "${rocketmq.producer.group}" + "_" + MQConstants.Topic.DELAY_AGAIN_DEDUCTION)
public class DelayDeductionListener implements RocketMQListener<BaseMqDTO<DelayDeductionDTO>> {

    @Override
    public void onMessage(BaseMqDTO<DelayDeductionDTO> data) {
        log.info("onMessage - data:{}", JSON.toJSONString(data));
        //TODO 处理延时递减扣款
        circleDelay(data.getData());
    }
}


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年12月19日 0

暂无评论

推荐阅读
zWz0rlp5WGLR