与Spring集成
pom文件
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
配置文件中增加命名空间
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
连接相关配置:
<!-- rabbitMQ配置 -->
<bean id="rabbitConnectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="127.0.0.1"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="8"/>
<property name="port" value="5672"></property>
</bean>
<!--Spring的rabbitmq admin-->
<rabbit:admin connection-factory="rabbitConnectionFactory"/>
生产者端
RabbitTemplate
<!-- 创建rabbitTemplate 消息模板类 -->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="rabbitConnectionFactory"/>
</bean>
或下面这种声明方式也是可以的。
队列和交换器
可以在生产者配置文件中增加队列和交换器
代码
发送消息时,使用rabbitTemplate即可。同时还可以给消息配置属性MessageProperties。
@Controller
@RequestMapping("/rabbitmq")
public class RabbitMqController {
private Logger logger = LoggerFactory.getLogger(RabbitMqController.class);
@Autowired
RabbitTemplate rabbitTemplate;
@ResponseBody
@RequestMapping("/fanoutSender")
public String fanoutSender(@RequestParam("message")String message){
String opt="";
try {
for(int i=0;i<3;i++){
String str = "Fanout,the message_"+i+" is : "+message;
logger.info("**************************Send Message:["+str+"]");
rabbitTemplate.send("fanout-exchange","",
new Message(str.getBytes(),new MessageProperties()));
}
opt = "suc";
} catch (Exception e) {
opt = e.getCause().toString();
}
return opt;
}
@ResponseBody
@RequestMapping("/topicSender")
public String topicSender(@RequestParam("message")String message){
String opt="";
try {
String[] severities={"error","info","warning"};
String[] modules={"email","order","user"};
for(int i=0;i<severities.length;i++){
for(int j=0;j<modules.length;j++){
String routeKey = severities[i]+"."+modules[j];
String str = "Topic,the message_["+i+","+j+"] is [rk:"+routeKey+"][msg:"+message+"]";
logger.info("**************************Send Message:["+str+"]");
MessageProperties messageProperties = new MessageProperties();
rabbitTemplate.send("topic_exchange",
routeKey,
new Message(str.getBytes(), messageProperties));
}
}
opt = "suc";
} catch (Exception e) {
opt = e.getCause().toString();
}
return opt;
}
}
消费者端
队列和交换器、
消费者中也可配置队列和交换器,以及指定队列和交换器绑定的路由键
消费者bean
或
都可以
监听容器
将消费者bean和队列联系起来
代码
消费者实现MessageListener接口即可。
实战-应用解耦
场景:
用户下订单买商品,订单处理成功后,去扣减库存,在这个场景里,订单系统是生产者,库存系统是消费者。
库存是必须扣减的,在业务上来说,有库存直接扣减即可,没库存或者低于某个阈值,可以扣减成功,不过要通知其他系统(如通知采购系统尽快采购,通知用户订单系统我们会尽快调货)。
RPC实现
通过RPC的实现,可以看到RPC会造成耦合。一旦库存系统失败,订单系统也会跟着失败。我们希望库存系统本身的失败,不影响订单系统的继续执行,在业务流程上,进行订单系统和库存系统的解耦。
消息中间件的实现
对于我们消息模式的实现,为保证库存必须有扣减,我们要考虑几个问题:
1、订单系统发给Mq服务器的扣减库存的消息必须要被Mq服务器接收到,意味着需要使用发送者确认。
2、Mq服务器在扣减库存的消息被库存服务正确处理前必须一直保存,那么需要消息进行持久化。
3、某个库存服务器出了问题,扣减库存的消息要能够被其他正常的库存服务处理,需要我们自行对消费进行确认,意味着不能使用消费者自动确认,而应该使用手动确认。
所以生产者订单系统这边需要 ,配置文件中队列和交换器进行持久化,消息发送时的持久化,发送者确认的相关配置和代码。
所以消费者库存系统这边要进行手动确认。
总结:与Spring集成时的更多配置
发送者确认
设置发送消息时的mandatory以及接收RabbitMQ中间件的应答
交换器持久化配置
消息持久化
队列参数(包括持久化)配置
消费者手动确认消息
处理库存类要实现ChannelAwareMessageListener。
和对消息确认或拒绝