一、前置环境准备
准备三台Linux
主机名 | IP |
---|---|
node1 | 192.168.100.100 |
node2 | 192.168.100.101 |
node3 | 192.168.100.102 |
配置HOSTS (3台都配置)
echo '
192.168.100.100 node1
192.168.100.101 node2
192.168.100.102 node3' >> /etc/hosts
关闭防火墙 (3台都配置)
sudo systemctl stop firewalld
sudo systemctl disable firewalld
SSH免密码访问(3台相互配置)
yum install ssh #安装ssh
ssh-keygen -t rsa # 执行
ssh-copy-id node1 # node2、node3执行
ssh-copy-id node2 # node1、node3执行
ssh-copy-id node3 # node1、node2执行
安装JDK17
# 创建安装路径
mkdir /opt/jdk17
# 下载jdk17最新版本
wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz -P /opt/jdk17/
# 解压
tar xf /opt/jdk17/jdk-17_linux-x64_bin.tar.gz -C /opt/jdk17/
# 查看jdk17盘本
ls /opt/jdk17
# 修改环境变量 路径和版本需确认
echo '
export JAVA_HOME=/opt/jdk17/jdk-17.0.8
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$PATH' >> /etc/profile
# 使环境变量生效
source /etc/profile
# 查看java版本 验证是否安装成功
java -version
二、Kafka安装配置
下载kafka
# 创建kafka安装路径
mkdir /opt/kafka
# 获取kafka
wget https://dlcdn.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz -P /opt/kafka --no-check-certificate
# 解压
tar xzf /opt/kafka/kafka_2.13-3.5.1.tgz -C /opt/kafka/
修改kafka配置(无zookeeper版本)
# 进入kafka_2.13-3.5.1
cd /opt/kafka/kafka_2.13-3.5.1/
# 修改配置(无zookeeper版本)
vim config/kraft/server.properties
# 修改如下图 :set nu 显示行号
格式化存储目录
# node1节点执行
cd /opt/kafka/kafka_2.13-3.5.1/
# 生成集群唯一ID node1节点执行
./bin/kafka-storage.sh random-uuid
# 格式化目录 所有节点执行
./bin/kafka-storage.sh format \
-t oPW30vNpQPWSZYaj_TogKA \
-c ./config/kraft/server.properties
启动集群
# 启动集群
cd /opt/kafka/kafka_2.13-3.5.1/
./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
# 查询集群情况 失败可能情况 防火墙没关
jps
三、验证安装情况
创建topic
# node1 执行 创建 topic
./bin/kafka-topics.sh --create --topic hello-world --partitions 3 --replication-factor 3 --bootstrap-server node1:9092
# node2、node3 执行查看
./bin/kafka-topics.sh --list --bootstrap-server
./bin/kafka-topics.sh
:这是Kafka主题管理脚本的路径。--create
:这个参数表示你想要创建一个新的主题。--topic hello-world
:这里的hello-world
创建的主题的名称。--partitions 3
:这个参数用于设置主题的分区数,这里设置为3。--replication-factor 3
:这个参数用于设置每个分区的副本数,这里设置为3。副本可以在分区数据丢失时提供数据恢复。--bootstrap-server node1:9092
:Kafka服务器的地址和端口。
生产者、消费者
# 开启推送 node1
./bin/kafka-console-producer.sh --broker-list node1:9092 --topic hello-world
# 开启消费 node2、node3
./bin/kafka-console-consumer.sh --bootstrap-server node2:9092 --topic hello-world
./bin/kafka-console-consumer.sh --bootstrap-server node3:9092 --topic hello-world
四、集群启动、关闭脚本
编写脚本
echo '
#!/bin/bash
#set -x
brokers="node1 node2 node3"
kafka_home="/opt/kafka/kafka_2.13-3.5.1"
start() {
for i in $brokers
do
echo "Starting kafka on ${i} ... "
ssh root@$i "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-start.sh ${kafka_home}/config/kraft/server.properties > /dev/null 2>&1 &"
if [[ $? -ne 0 ]]; then
echo "Start kafka on ${i} is OK !"
fi
done
echo kafka kafka are started !
}
stop() {
for i in $brokers
do
echo "Stopping kafka on ${i} ..."
ssh root@$i "source /etc/profile;bash ${kafka_home}/bin/kafka-server-stop.sh"
if [[ $? -ne 0 ]]; then
echo "Stopping ${kafka_home} on ${i} is down"
fi
done
echo all kafka are stopped !
}
case "$1" in
start)
start
;;
stop)
stop
;;
*)
echo "Usage: start|stop"
;;
esac' >> kafka.sh
执行、停止kafka集群
# 赋执行权限
chmod +x kafka.sh
# 停止kfka
./kafka.sh stop
# 开启集群kafka
./kafka.sh start