消息中间件 -RabbitMQ 集成和实战 (三十)
  mS9tsoCLUtbF 2023年11月02日 43 0


与Spring集成

pom文件

<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>

配置文件中增加命名空间

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">

连接相关配置:

<!-- rabbitMQ配置 -->
<bean id="rabbitConnectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="127.0.0.1"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="8"/>
<property name="port" value="5672"></property>
</bean>
<!--Spring的rabbitmq admin-->
<rabbit:admin connection-factory="rabbitConnectionFactory"/>

生产者端

RabbitTemplate

<!-- 创建rabbitTemplate 消息模板类 -->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="rabbitConnectionFactory"/>
</bean>

或下面这种声明方式也是可以的。

消息中间件 -RabbitMQ 集成和实战 (三十)_订单系统

队列和交换器

可以在生产者配置文件中增加队列和交换器

消息中间件 -RabbitMQ 集成和实战 (三十)_spring_02

代码

发送消息时,使用rabbitTemplate即可。同时还可以给消息配置属性MessageProperties。

@Controller
@RequestMapping("/rabbitmq")
public class RabbitMqController {

private Logger logger = LoggerFactory.getLogger(RabbitMqController.class);

@Autowired
RabbitTemplate rabbitTemplate;

@ResponseBody
@RequestMapping("/fanoutSender")
public String fanoutSender(@RequestParam("message")String message){
String opt="";
try {
for(int i=0;i<3;i++){
String str = "Fanout,the message_"+i+" is : "+message;
logger.info("**************************Send Message:["+str+"]");
rabbitTemplate.send("fanout-exchange","",
new Message(str.getBytes(),new MessageProperties()));
}
opt = "suc";
} catch (Exception e) {
opt = e.getCause().toString();
}
return opt;
}

@ResponseBody
@RequestMapping("/topicSender")
public String topicSender(@RequestParam("message")String message){
String opt="";
try {
String[] severities={"error","info","warning"};
String[] modules={"email","order","user"};
for(int i=0;i<severities.length;i++){
for(int j=0;j<modules.length;j++){
String routeKey = severities[i]+"."+modules[j];
String str = "Topic,the message_["+i+","+j+"] is [rk:"+routeKey+"][msg:"+message+"]";
logger.info("**************************Send Message:["+str+"]");
MessageProperties messageProperties = new MessageProperties();
rabbitTemplate.send("topic_exchange",
routeKey,
new Message(str.getBytes(), messageProperties));
}
}
opt = "suc";
} catch (Exception e) {
opt = e.getCause().toString();
}
return opt;
}

}

消费者端

队列和交换器、

消费者中也可配置队列和交换器,以及指定队列和交换器绑定的路由键

消息中间件 -RabbitMQ 集成和实战 (三十)_订单系统_03

消息中间件 -RabbitMQ 集成和实战 (三十)_订单系统_04

消费者bean

消息中间件 -RabbitMQ 集成和实战 (三十)_持久化_05

消息中间件 -RabbitMQ 集成和实战 (三十)_订单系统_06

都可以

监听容器

将消费者bean和队列联系起来

消息中间件 -RabbitMQ 集成和实战 (三十)_订单系统_07

代码

消费者实现MessageListener接口即可。

实战-应用解耦

场景:

用户下订单买商品,订单处理成功后,去扣减库存,在这个场景里,订单系统是生产者,库存系统是消费者。

库存是必须扣减的,在业务上来说,有库存直接扣减即可,没库存或者低于某个阈值,可以扣减成功,不过要通知其他系统(如通知采购系统尽快采购,通知用户订单系统我们会尽快调货)。

RPC实现

通过RPC的实现,可以看到RPC会造成耦合。一旦库存系统失败,订单系统也会跟着失败。我们希望库存系统本身的失败,不影响订单系统的继续执行,在业务流程上,进行订单系统和库存系统的解耦。

消息中间件的实现

对于我们消息模式的实现,为保证库存必须有扣减,我们要考虑几个问题:

1、订单系统发给Mq服务器的扣减库存的消息必须要被Mq服务器接收到,意味着需要使用发送者确认。

2、Mq服务器在扣减库存的消息被库存服务正确处理前必须一直保存,那么需要消息进行持久化。

3、某个库存服务器出了问题,扣减库存的消息要能够被其他正常的库存服务处理,需要我们自行对消费进行确认,意味着不能使用消费者自动确认,而应该使用手动确认。

所以生产者订单系统这边需要 ,配置文件中队列和交换器进行持久化,消息发送时的持久化,发送者确认的相关配置和代码。

所以消费者库存系统这边要进行手动确认。

总结:与Spring集成时的更多配置

发送者确认

消息中间件 -RabbitMQ 集成和实战 (三十)_持久化_08

设置发送消息时的mandatory以及接收RabbitMQ中间件的应答

消息中间件 -RabbitMQ 集成和实战 (三十)_spring_09

交换器持久化配置

消息中间件 -RabbitMQ 集成和实战 (三十)_spring_10

消息持久化

消息中间件 -RabbitMQ 集成和实战 (三十)_spring_11

队列参数(包括持久化)配置

消息中间件 -RabbitMQ 集成和实战 (三十)_订单系统_12

消费者手动确认消息

消息中间件 -RabbitMQ 集成和实战 (三十)_订单系统_13

处理库存类要实现ChannelAwareMessageListener。

消息中间件 -RabbitMQ 集成和实战 (三十)_持久化_14

和对消息确认或拒绝

消息中间件 -RabbitMQ 集成和实战 (三十)_订单系统_15

 

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  40IdLO25mCaU   2023年11月02日   16   0   0 spring敏感词java
mS9tsoCLUtbF