以下是使用 Python 进行 Kafka 消息发送和消费的示例代码:
首先,确保已经安装了 kafka-python
库,可以使用以下命令进行安装:
pip install kafka-python
- Kafka 消息发送示例:
from kafka import KafkaProducer
# 创建 KafkaProducer 实例
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 发送消息到指定 Topic
topic = 'my_topic'
message = 'Hello, Kafka!'
# 发送消息
producer.send(topic, message.encode())
producer.flush() # 等待消息发送完成
print("消息发送成功")
# 关闭 KafkaProducer
producer.close()
在上述代码中,我们首先创建了一个 KafkaProducer 实例,指定了 Kafka 服务器的地址(bootstrap_servers)。然后,我们定义了要发送的消息内容和要发送到的 Topic。使用 producer.send()
方法将消息发送到指定的 Topic,通过调用 producer.flush()
来确保消息发送完成,并最后关闭 KafkaProducer。
- Kafka 消息消费示例:
from kafka import KafkaConsumer
# 创建 KafkaConsumer 实例
consumer = KafkaConsumer('my_topic', bootstrap_servers='localhost:9092')
# 消费消息
try:
while True:
messages = consumer.poll(timeout_ms=500) # 每500毫秒拉取一次消息
for topic_partition, message_list in messages.items():
for message in message_list:
print(f"收到消息: {message.value.decode()}")
except KeyboardInterrupt:
pass
# 关闭 KafkaConsumer
consumer.close()
在上述代码中,我们创建了一个 KafkaConsumer 实例,并指定要消费的 Topic。通过迭代 KafkaConsumer 对象,我们可以遍历消费者接收到的消息并进行处理。在这个示例中,我们简单地将收到的消息打印出来。
请注意,以上示例中的 Kafka 服务器地址为 'localhost:9092'
,你可以根据实际情况修改为正确的地址。另外,确保主题(Topic)的名称和消费者订阅的主题名称一致。