RabbitMQ-直连模式
  TEZNKK3IfmPf 2023年11月14日 41 0

RabbitMQ-直连模式

 

  • 在上图的模型中,有以下概念
  • ​P​​:生产者,也就是要发送消息的程序
  • ​C​​:消费者,消息的接受者,会一直等待消息的到来
  • ​Queue​​:消息队列,图中蓝色部分
  • 类似一个邮箱,可以缓存消息
  • 生产者向其中投递消息,消费者从其中取出消息

创建生产者生产消息

  • 代码如下:
/**
* @author: BNTang
**/
public class Producer {

@Test
public void sendMessage() throws Exception {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();

// 2.设置连接相关信息的参数配置
// RabbitMQ 服务器地址
connectionFactory.setHost("192.168.0.130");
// RabbitMQ 服务器数据交互端口号
connectionFactory.setPort(5672);
// RabbitMQ 服务器用户名
connectionFactory.setUsername("user");
// RabbitMQ 服务器密码
connectionFactory.setPassword("123456");
// RabbitMQ 服务器虚拟主机
connectionFactory.setVirtualHost("v-it6666");

// 3.从连接工厂里面创建一个连接
Connection connection = connectionFactory.newConnection();

// 4.创建通道
Channel channel = connection.createChannel();

// 5.绑定队列
channel.queueDeclare("hello", true, false, false, null);

// 6.发送消息
channel.basicPublish("", "hello", null, "Hello RabbitMQ".getBytes());

// 7.释放资源,关闭通道和连接
channel.close();
connection.close();

System.out.println("message send success");
}
}
  • 运行测试结果如下:

RabbitMQ-直连模式

  • 在看看 RabbitMQ 队列中怎么样
  • 已经有了,说明成功了

RabbitMQ-直连模式

参数详解

  • ​queueDeclare​​ 方法
  • 参数1:队列名,如果发送消息时,队列在 RabbitMQ 中不存在,它会自动创建一个
  • 参数2:是否持久化,如果为​​false​​ 当 RabbitMQ 重启时,消息会丢失
  • 参数3:是否独享,​​true​​​ 代表只有当前的​​connection​​ 可以访问这个队列
  • 参数4:是否自动删除,是否用完之后就删除
  • 参数5:其他属性
  • ​basicPublish​​ 方法
  • 参数1:交换机名称,暂时用不到,因为现在是直连,所以不用经过交换机
  • 参数2:队列名
  • 参数3:基础参数,是否持久化
  • 参数4:消息的具体内容
  • 如果只设置了​​队列​​ 的持久化,消息默认是不会持久化的

创建消费者消费消息

  • 代码如下:
/**
* @author: BNTang
**/
public class Consumer {

@Test
public void receiveMessage() throws Exception {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();

// 2.设置连接相关信息的参数配置
// RabbitMQ 服务器地址
connectionFactory.setHost("192.168.0.130");
// RabbitMQ 服务器数据交互端口号
connectionFactory.setPort(5672);
// RabbitMQ 服务器用户名
connectionFactory.setUsername("user");
// RabbitMQ 服务器密码
connectionFactory.setPassword("123456");
// RabbitMQ 服务器虚拟主机
connectionFactory.setVirtualHost("v-it6666");

// 3.从连接工厂里面创建一个连接
Connection connection = connectionFactory.newConnection();

// 4.创建通道
Channel channel = connection.createChannel();

// 5.绑定队列
channel.queueDeclare("hello", true, false, false, null);

// 6.接收消息
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("get message success " + new String(body));
}
});

// 7.不能让程序结束
System.in.read();

// 8.释放资源,关闭通道和连接
channel.close();
connection.close();
}

}

 

  • 这种就是一个​​点对点​​ 的发送和消费
  • 一个生产者,一个消费者,可以用于登陆发送短信验证码等功能

抽取工具类

  • 抽取之后的工具类代码如下:
/**
* @author: BNTang
*/
public class RabbitMQUtil {

private static ConnectionFactory connectionFactory;

static {
connectionFactory = new ConnectionFactory();

// RabbitMQ 服务器地址
connectionFactory.setHost("192.168.0.130");
// RabbitMQ 服务器数据交互端口号
connectionFactory.setPort(5672);
// RabbitMQ 服务器用户名
connectionFactory.setUsername("user");
// RabbitMQ 服务器密码
connectionFactory.setPassword("123456");
// RabbitMQ 服务器虚拟主机
connectionFactory.setVirtualHost("v-it6666");
}

/**
* 提供一个获取连接的方法
*
* @return
*/
public static Connection getConnection() {
try {
// 从连接工厂里面创建一个连接
Connection connection = connectionFactory.newConnection();
return connection;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

/**
* 提供一个可以关闭通道和关闭连接的方法
*
* @param channel
* @param connection
*/
public static void closeChannelAndConnection(Channel channel, Connection connection) {
try {
if (null != channel) channel.close();
if (null != connection) connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

修改生产者

  • 修改之后的代码如下:
/**
* @author: BNTang
**/
public class Producer {

@Test
public void sendMessage() throws Exception {
// 1.创建连接工厂
// 2.设置连接相关信息的参数配置
// 3.从连接工厂里面创建一个连接
Connection connection = RabbitMQUtil.getConnection();

// 4.创建通道
Channel channel = connection.createChannel();

// 5.绑定队列
channel.queueDeclare("hello", true, false, false, null);

// 6.发送消息
channel.basicPublish("", "hello", null, "Hello RabbitMQ".getBytes());

// 7.释放相关资源
RabbitMQUtil.closeChannelAndConnection(channel, connection);

System.out.println("message send success");
}
}

修改消费者

  • 修改之后的代码如下:
/**
* @author: BNTang
**/
public class Consumer {

@Test
public void receiveMessage() throws Exception {
// 1.创建连接工厂
// 2.设置连接相关信息的参数配置
// 3.从连接工厂里面创建一个连接
Connection connection = RabbitMQUtil.getConnection();

// 4.创建通道
Channel channel = connection.createChannel();

// 5.绑定队列
channel.queueDeclare("hello", true, false, false, null);

// 6.接收消息
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("get message success " + new String(body));
}
});

// 7.不能让程序结束
System.in.read();

// 8.释放相关资源
RabbitMQUtil.closeChannelAndConnection(channel, connection);
}

}

如果只是设置了队列的持久化,消息默认的是不会持久化的

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月14日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2024年05月31日   30   0   0 服务器
  TEZNKK3IfmPf   2024年05月17日   41   0   0 linux服务器
  TEZNKK3IfmPf   2024年05月31日   51   0   0 linux服务器
  TEZNKK3IfmPf   2024年05月31日   29   0   0 linux服务器centos
  TEZNKK3IfmPf   2024年05月31日   42   0   0 服务器java
  TEZNKK3IfmPf   2024年05月31日   37   0   0 服务器http
TEZNKK3IfmPf