Kafaka+Redis异步实现商品下单减库存【SpringCloud系列18】
  TEZNKK3IfmPf 2023年11月13日 25 0

SpringCloud 大型系列课程正在制作中,欢迎大家关注与提意见。


首先我这里的服务体系是:(网关认证这些不在此图中)

Kafaka+Redis异步实现商品下单减库存【SpringCloud系列18】

1 前端循环查询订单详情接口

异步下单,前端APP或者小程序调用下单接口,生成订单号
Kafaka+Redis异步实现商品下单减库存【SpringCloud系列18】

然后前端使用这个订单号,跳转到订单详情页面 轮循这个订单详情接口,下单成功显示订单详情。
Kafaka+Redis异步实现商品下单减库存【SpringCloud系列18】
轮循订单详情接口,下单失败,显示错误弹框,然后点击确认退出页面。
Kafaka+Redis异步实现商品下单减库存【SpringCloud系列18】
小程序的下单流程如下

Kafaka+Redis异步实现商品下单减库存【SpringCloud系列18】
那首先是根据 sn 来查询订单详情

  • 查询 Redis 中是否有订单号标识
  • 查询 对应订单号是否下单成功
  • 根据订单号 查询 Redis 缓存中的订单详情
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
     
       
    @Autowired
    private RedisService redisService;
    @Override
    public OrderInfo getOrderInfoBySn(String sn) {
     
       
        Boolean aBoolean = redisService.hasKey(Constant.ORDER_SN + sn);
        if (!aBoolean) {
     
       
            throw new RuntimeException("下单失败 库存不足");
        }
        //1正在处理中
        int statues = Integer.parseInt(redisService.get(Constant.ORDER_SN + sn).toString());
        if (statues == 1) {
     
       
            return null;
        }
        if (statues == 3) {
     
       
            throw new RuntimeException("下单失败 库存不足");
        }
        //缓存订单信息
        OrderInfo orderInfo = (OrderInfo) redisService.get(Constant.ORDER_SN_NORMAL + sn);
        return orderInfo;
    }
}

预下单时会生成订单号,然后将订单号标识保存到Redis中,并设置有效期为1分钟,标识状态如下:

  • 1 队列正在处理中
  • 2 下单成功
  • 3 下单失败

2 订单预下单生成订单号

我这里是在 mini-service 服务中预下单,mini-service 中要集成 kafaka

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

然后添加配置如下:

spring:

  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: Qgchun// 设置默认的生产者消费者所属组id
    template:
      default-topic: ORDER-CREATE-TOPIC

需要注意的是,开发环境要安装kafaka ,然后在预下单的时候发送下单的消息如下:

@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
     
       
    @Autowired
    private SnowFlakeCompone snowFlakeCompone;
    @Autowired
    private RedisService redisService;
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;
    @Value("${spring.kafka.template.default-topic}")
    private String kafkaTopic;

    @Override
    public String asyncCreatOrder(Long userId, Long goodsId) {
     
       
        //判断库存
        Boolean aBoolean = redisService.hasKey(Constant.GOOOD_STOCK + goodsId);
        if (!aBoolean) {
     
       
            throw new RuntimeException("库存不足");
        }
        //雪花算法订单号
        String nextId = snowFlakeCompone.getInstance().nextId() + "";
        //Redis保存预订单号状态 1 为正在处理中 2为下单完成 3为下单失败
        redisService.set(Constant.ORDER_SN + nextId, 1);
        //设置过期时间1分钟
        redisService.expire(Constant.ORDER_SN + nextId, 60);

        OrderMessage message = new OrderMessage();
        message.setSn(nextId);
        message.setUserId(userId);
        message.setGoodsId(goodsId);
        message.setNum(1);//下单商品数量
        //kafka下单消息发送
        kafkaTemplate.send(kafkaTopic, JSON.toJSONString(message));//使用send方法发送消息,需要传入topic名称
        log.info("消息发送至 Kafka 成功");

        return nextId;
    }
}

3 订单服务定义 kafaka 消费者

创建一个消费者,使用 @KafkaListener 来监听对应的通道

@Slf4j
@Component
public class OrderConsumerListen {
     
       


    @Resource
    private OrderKafakaService orderKafakaService;

    @KafkaListener(topics = "ORDER-CREATE-TOPIC")
    public void listen(ConsumerRecord<String, String> record) throws Exception {
     
       
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        // Object -> String
        String message = (String) kafkaMessage.get();
        // 反序列化
        OrderMessage orderMessage = JSON.parseObject(message,OrderMessage.class);
        // 创建订单
        orderKafakaService.consumerTopicToCreateOrderWithKafka(orderMessage);
    }
}

OrderKafakaService 就是实现下单的处理:

  • 校验库存,从 Redis 中获取库存信息
  • 乐观锁更新数据库库存
  • 更新Redis缓存中的库存
  • 创建订单
  • 更新Redis中订单号的 标识,并缓存订单信息
@Service
@Slf4j
public class OrderKafakaServiceImpl extends ServiceImpl<OrderMapper, OrderInfo> implements OrderKafakaService {
     
       

    @Resource
    private GoodsInfoService goodsInfoService;

    @Resource
    RedisService redisService;

    @Override
    @Transactional
    public void consumerTopicToCreateOrderWithKafka(OrderMessage orderMessage) throws Exception {
     
       
        Long goodsId = orderMessage.getGoodsId();
        Long userId = orderMessage.getUserId();
        String sn = orderMessage.getSn();
        Integer num = orderMessage.getNum();
        // 第一步 校验库存,从 Redis 中获取
        GoodsInfo goodsInfo = checkStockWithRedis(goodsId);
        if(goodsInfo==null){
     
       
            //更新下单标识下单失败
            redisService.set(Constant.ORDER_SN+sn,3);
            return;
        }
        // 第二步 乐观锁更新库存和Redis
        boolean update = saleStockOptimsticWithRedis(goodsInfo);
        if(!update){
     
       
            //更新下单标识下单失败
            redisService.set(Constant.ORDER_SN+sn,3);
            return;
        }
        //第三步 创建订单
        GoodsInfo goodsInfo1 = (GoodsInfo) redisService.get(Constant.GOODS_INFO + goodsId);

        OrderInfo order = new OrderInfo();

        order.setSn(sn);
        order.setGoodsId(goodsId);
        order.setUserId(userId);
        order.setNum(num);
        order.setPrice(goodsInfo1.getGoodsPrice().multiply(new BigDecimal(num)));
        order.setName(goodsInfo1.getGoodsName());
        order.setCreateTime(new Date());
        order.setOrderStatues(0);
        boolean res = this.save(order);
        if (!res) {
     
       
            //更新下单标识下单失败
            redisService.set(Constant.ORDER_SN+sn,3);
            return;

        }

        //第四步 更新预下订单标识
        redisService.set(Constant.ORDER_SN+sn,2);
        //未支付取消订单 设置过期时间30分钟
        redisService.expire(Constant.ORDER_SN+sn,60*30);
        //缓存订单信息
        redisService.set(Constant.ORDER_SN_NORMAL+sn,order);

        log.info("Kafka 消费 Topic 创建订单成功");


    }

    // Redis 中校验库存
    private GoodsInfo checkStockWithRedis(Long goodsId) throws Exception {
     
       
        int count = Integer.parseInt(redisService.get(Constant.GOOOD_STOCK + goodsId).toString());
        int sale = Integer.parseInt(redisService.get(Constant.GOOOD_SALE + goodsId).toString());
        int version = Integer.parseInt(redisService.get(Constant.GOOOD_VERSION + goodsId).toString());
        String goodsName = redisService.get(Constant.GOODS_NAME + goodsId).toString();
        if (count < 1) {
     
       
            log.info("库存不足");
            return null;
        }
        GoodsInfo stock = new GoodsInfo();
        stock.setId(goodsId);
        stock.setGoodsStock(count);
        stock.setGoodsSale(sale);
        stock.setVersion(version);
        stock.setGoodsName(goodsName);

        return stock;
    }

    // 更新 DB 和 Redis
    private boolean saleStockOptimsticWithRedis(GoodsInfo goodsInfo) throws Exception {
     
       
        boolean res = goodsInfoService.updateStockByOptimistic(goodsInfo);
        if (!res) {
     
       
           return false;
        }
        // 更新 Redis
        redisService.updateStockWithRedis(goodsInfo);
        return true;
    }

}
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月13日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2024年05月31日   29   0   0 redis用户
  TEZNKK3IfmPf   2024年05月31日   30   0   0 dataredis
  TEZNKK3IfmPf   2024年04月26日   34   0   0 javaRocketMQspringcloud
  TEZNKK3IfmPf   2024年05月31日   27   0   0 awkredis
  TEZNKK3IfmPf   2024年04月19日   34   0   0 springcloud
  TEZNKK3IfmPf   2024年04月19日   39   0   0 javarediskey
TEZNKK3IfmPf