pika
  WgxE7ox3LqRL 2023年11月05日 28 0

生产者代码

# -*- coding: utf-8 -*-
# pylint: disable=C0111,C0103,R0205

import json
import pika
from pika.exchange_type import ExchangeType

print('pika version: %s' % pika.__version__)

def main():
    credentials = pika.PlainCredentials('root', 'root')
    parameters = pika.ConnectionParameters('192.168.133.11', credentials=credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.exchange_declare(  # 创建交换机
        exchange='test_exchange',
        exchange_type=ExchangeType.direct,
        passive=False,
        durable=True,
        auto_delete=False)
    channel.queue_declare(queue='standard')  # 创建队列
    channel.queue_bind(  # 队列绑定
        queue='standard', exchange='test_exchange', routing_key='standard_key')
    # 发布消息
    channel.basic_publish(
        exchange='test_exchange',
        routing_key='standard_key',
        body=json.dumps("hello world"),
        properties=pika.BasicProperties(content_type='application/json'))

    connection.close()

if __name__ == '__main__':
    main()

消费者

import functools
import logging
import time

import pika
from pika.exchange_type import ExchangeType

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
              '-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)

logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)

MAX_RETRY_COUNT = 3
retry_count = 0

def on_message(chan, method_frame, header_frame, body, userdata=None):
    """Called when a message is received. Log message and ack it."""
    LOGGER.info('Delivery properties: %s, message metadata: %s', method_frame, header_frame)
    LOGGER.info('Userdata: %s, message body: %s', userdata, json.loads(body))
    LOGGER.info("chan>>>>>>>>>>>>>>>>>>{0}, type:{1}".format(chan, type(chan))) # BlockingChannel
    # 1.关闭自动回执,当auto_ack为false(默认就为false),需要自己手动回执即调用chan.basic_ack()方法
    # chan.basic_ack(delivery_tag=method_frame.delivery_tag)
    # 2.自动回执,当auto_ack为true时,自动回执,不需要再调用chan.basic_ack()方法

    # 设置nack回执,即拒绝消费,然后重入到队列,requeue=True为默认值, 会一直重试,requeue=False时,丢弃消息
    # chan.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=True)

    # 本地重试机制,最大3次
    try:
        # 模拟错误
        print(1/0)
    except Exception as e:
        global retry_count
        if retry_count < MAX_RETRY_COUNT:
            # 处理失败,重新放回队列
            chan.basic_recover(requeue=True)
            retry_count += 1
            print(f"重试次数:{retry_count}")
            time.sleep(5) # 模拟延迟
        else:
            # 超过最大重试次数,不再消费
            print("超过最大重试次数")
            # 发送拒绝消费回执,同时设置requeue为False,将消息丢弃
            chan.basic_nack(delivery_tag=method_frame.delivery_tag, requeue=False)
            # 将该消息发送到其他队列处理
            chan.basic_publish(exchange='test_exchange',
                routing_key='handle_err',
                body=body,
                properties=pika.BasicProperties(content_type='application/json'))
            chan.stop_consuming()  # 停止消费


def main():
    """Main method."""
    credentials = pika.PlainCredentials('root', 'root')
    parameters = pika.ConnectionParameters('192.168.133.11', credentials=credentials)
    connection = pika.BlockingConnection(parameters)

    channel = connection.channel() # 创建信道
    channel.exchange_declare(  # 创建交换机
        exchange='test_exchange',
        exchange_type=ExchangeType.direct,
        passive=False,
        durable=True,
        auto_delete=False)
    channel.queue_declare(queue='dur_standard', durable=True) # 创建队列
    channel.queue_bind(  # 队列绑定
        queue='dur_standard', exchange='test_exchange', routing_key='standard_key')
    channel.queue_bind(  # 队列绑定
        queue='handle_err_queue', exchange='test_exchange', routing_key='handle_err')
    channel.basic_qos(prefetch_count=1)

    on_message_callback = functools.partial(
        on_message, userdata='on_message_userdata')
    # 消费者消费消息后的回调函数on_message_callback
    # basic_consume中有auto_ack字段,默认为false,即不自动ack。
    channel.basic_consume('dur_standard', on_message_callback)

    try:
        # 开始消费
        channel.start_consuming()
    except KeyboardInterrupt:
        # 停止消费
        channel.stop_consuming()

    connection.close()


if __name__ == '__main__':
    main()

-------------------------------------------

个性签名:代码过万,键盘敲烂!!!

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
WgxE7ox3LqRL
作者其他文章 更多

2023-12-26

2023-12-26

2023-12-26

2023-12-26

2023-12-26

2023-12-26

2023-12-26

2023-12-12

2023-11-19