RabbitMQ教程:AMQP模型详解(python发送消息、消费消息)
  0SnbOly3LC5t 2023年12月06日 27 0


重要概念

  • 消息
  • 生产者
  • 交换机
  • 队列
  • 绑定
  • 消费者
  • 路由键
  • ACK

在 RabbitMQ 中,AMQP(Advanced Message Queuing Protocol)模型涉及到以下几个重要的概念和角色:

  1. 消息(Message):消息是 AMQP 的基本单元,它是应用程序之间进行通信的数据包。消息通常包含要传递的有效负载数据以及关联的元数据(如标签、优先级等)。
  2. 生产者(Producer):生产者是发送消息到 RabbitMQ 的应用程序。它将消息发布到交换机(Exchange),并指定一个路由键(Routing Key)来描述消息的目的地。
  3. 交换机(Exchange):交换机是消息的接收和分发中心。生产者将消息发布到交换机,然后交换机根据特定的规则将消息路由到一个或多个队列(Queue)。
  4. 队列(Queue):队列是存储消息的缓冲区。消费者从队列中获取消息并进行处理。队列可以绑定到一个或多个交换机,接收交换机路由的消息。
  5. 绑定(Binding):绑定是交换机和队列之间的关联。它定义了交换机将消息路由到哪些队列上。绑定通常包括交换机名称、队列名称和路由键。
  6. 消费者(Consumer):消费者是从队列中接收消息并进行处理的应用程序。它订阅队列并等待消息的到达。一旦消息可用,消费者会将其从队列中取出,并执行相应的处理逻辑。
  7. 路由键(Routing Key):路由键是生产者在发布消息时指定的关键字。交换机根据路由键的值将消息路由到一个或多个队列。
  8. ACK(Acknowledgement):ACK 是消费者向 RabbitMQ 确认已成功处理消息的机制。一旦消费者处理完消息,它会向 RabbitMQ 发送 ACK,以告知 RabbitMQ 可以将该消息标记为已消费。

AMQP 模型的工作流程如下:

  1. 生产者将消息发布到交换机,并指定一个路由键。
  2. 交换机根据路由键将消息路由到一个或多个绑定的队列。
  3. 消费者订阅队列,并等待消息的到达。
  4. 一旦消息可用,消费者从队列中取出消息,并进行处理。
  5. 消费者处理完消息后,发送 ACK 给 RabbitMQ,表示消息已成功处理。

通过 AMQP 模型,应用程序可以实现解耦和异步通信,提高系统的可靠性和灵活性。RabbitMQ 提供了丰富的功能和配置选项,使得消息的路由、排队和处理变得更加灵活和可靠。

带标签和优先级的消息发送代码示例

下面是一个使用 RabbitMQ 发送带标签和优先级的消息的 Python 代码示例:

import pika

# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 定义交换机和队列
exchange_name = 'my_exchange'
queue_name = 'my_queue'
channel.exchange_declare(exchange=exchange_name, exchange_type='direct')
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=queue_name)

# 定义消息的属性
message_properties = pika.BasicProperties(
    delivery_mode=2,  # 设置消息持久化,确保消息在 RabbitMQ 服务器重启后不会丢失
    priority=1       # 设置消息优先级,值越高表示优先级越高
)

message_body = 'Hello, RabbitMQ!'
routing_key = queue_name

# 发送带标签和优先级的消息
channel.basic_publish(
    exchange=exchange_name,
    routing_key=routing_key,
    body=message_body,
    properties=message_properties
)

print("消息已发送")

# 关闭与 RabbitMQ 服务器的连接
connection.close()

在上述代码中,我们首先建立了与 RabbitMQ 服务器的连接,然后声明了一个交换机和队列,并将它们进行绑定。接下来,我们创建了一个 pika.BasicProperties 对象,设置了消息的属性,包括消息持久化和优先级。然后,使用 channel.basic_publish() 方法发送消息,指定交换机、路由键、消息内容和属性。

请注意,该示例中的 RabbitMQ 服务器地址为 'localhost',你可以根据实际情况修改为正确的地址。另外,该示例仅展示了消息的发送部分,接收消息的代码并未包含在内。

通过设置消息的标签和优先级,你可以在 RabbitMQ 中控制消息的路由和处理顺序,确保高优先级的消息能够更快地被消费者处理。

消费消息

以下是使用 RabbitMQ 消费带标签和优先级的消息的 Python 代码示例:

import pika

# 回调函数,处理接收到的消息
def callback(ch, method, properties, body):
    print("收到消息:", body.decode())
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 发送确认 ACK

# 建立与 RabbitMQ 服务器的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 定义队列
queue_name = 'my_queue'
channel.queue_declare(queue=queue_name)

# 指定消费者从队列中获取消息,并设置手动确认模式
channel.basic_consume(queue=queue_name, on_message_callback=callback)

# 开始消费消息
print("等待消息...")
channel.start_consuming()

在上述代码中,我们首先建立了与 RabbitMQ 服务器的连接,并声明了一个队列。然后,定义了一个回调函数 callback,用于处理接收到的消息。在回调函数中,我们打印出收到的消息,并发送确认 ACK 给 RabbitMQ,表示消息已成功处理。

接下来,使用 channel.basic_consume() 方法指定消费者从队列中获取消息,并将回调函数 callback 作为参数传入。最后,调用 channel.start_consuming() 开始消费消息,此时消费者会一直等待并处理接收到的消息。

请注意,该示例中的 RabbitMQ 服务器地址为 'localhost',你可以根据实际情况修改为正确的地址。另外,该示例仅展示了消息的消费部分,发送消息的代码并未包含在内。

通过以上代码,你可以编写一个消费者程序,从队列中接收带标签和优先级的消息,并根据实际需求进行相应的处理操作。


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年12月06日 0

暂无评论

推荐阅读
0SnbOly3LC5t