python实现kafaka消息的发送和消费代码示例
  0SnbOly3LC5t 2023年11月15日 45 0


以下是使用 Python 进行 Kafka 消息发送和消费的示例代码:

首先,确保已经安装了 kafka-python 库,可以使用以下命令进行安装:

pip install kafka-python
  1. Kafka 消息发送示例:
from kafka import KafkaProducer

# 创建 KafkaProducer 实例
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# 发送消息到指定 Topic
topic = 'my_topic'
message = 'Hello, Kafka!'

# 发送消息
producer.send(topic, message.encode())
producer.flush()  # 等待消息发送完成
print("消息发送成功")

# 关闭 KafkaProducer
producer.close()

在上述代码中,我们首先创建了一个 KafkaProducer 实例,指定了 Kafka 服务器的地址(bootstrap_servers)。然后,我们定义了要发送的消息内容和要发送到的 Topic。使用 producer.send() 方法将消息发送到指定的 Topic,通过调用 producer.flush() 来确保消息发送完成,并最后关闭 KafkaProducer。

  1. Kafka 消息消费示例:
from kafka import KafkaConsumer

# 创建 KafkaConsumer 实例
consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092')

# 消费消息
try:
    while True:
        messages = consumer.poll(timeout_ms=500)  # 每500毫秒拉取一次消息
        for topic_partition, message_list in messages.items():
            for message in message_list:
                print(f"收到消息: {message.value.decode()}")

except KeyboardInterrupt:
    pass

# 关闭 KafkaConsumer
consumer.close()

在上述代码中,我们创建了一个 KafkaConsumer 实例,并指定要消费的 Topic。通过迭代 KafkaConsumer 对象,我们可以遍历消费者接收到的消息并进行处理。在这个示例中,我们简单地将收到的消息打印出来。

请注意,以上示例中的 Kafka 服务器地址为 'localhost:9092',你可以根据实际情况修改为正确的地址。另外,确保主题(Topic)的名称和消费者订阅的主题名称一致。


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月15日 0

暂无评论

推荐阅读
  KmYlqcgEuC3l   9天前   19   0   0 Python
0SnbOly3LC5t