1、创建topic
# replication-factor表示该topic需要在不同的broker中保存几份 # replication-factor必须小于等于 broker 的个数
例如:
kafka-topics.sh --zookeeper node3:2181/kafka0.11 --create --topic mykafka1 --partitions 3 --replication-factor 2
# 列出全部的topic kafka-topics.sh --zookeeper node3:2181/kafka0.11 --list # 查看topic的状态 kafka-topics.sh --zookeeper node3:2181/kafka0.11 --describe --topic mykafka1
备注:--zookeeper node3:2181/kafka0.11 必须指明 ISR(In-Sync Replicas)副本同步队列
2、创建生产者
kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9093 --topic mykafka1
并输入一些信息:xxx yyy zzz
3、创建消费者
kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9093 --topic mykafka1
# 旧版本的命令(高版本中将被废弃)
kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka0.11 --topic mykafka1
4、查询 topic 的所有记录数
kafka-run-class.sh kafka.tools.GetOffsetShell --topic mykafka1 --broker-list node1:9092,node2:9092,node3:9092
## 查询 topic 在分区上的记录数 kafka-run-class.sh kafka.tools.GetOffsetShell --topic mykafka1 --broker-list node1:9092,node2:9092,node3:9092 --partitions 0
5、查询 topic 的消费者情况
kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group 1 zkCli.sh -server node3:12181 # 查看zookeeper中的内容
问题:如何删除已经存在的topic
kafka-topics.sh --zookeeper node3:12181/kafka0.11 --delete --topic mykafka1
- 命令执行后有如下返回信息:
Topic mykafka1 is marked for deletion.
#主题mykafka1被标记为删除
Note: This will have no impact if delete.topic.enable is not set to true.
#注意:如果删除.topic,这将没有影响。未将enable设置为true。