2.消息队列kafka
  IS4yhiOomKTv 2023年11月02日 45 0

消息队列kafka

应用场景
削峰填谷 异步解耦 顺序收发 分布式事务一致性 大数据分析 分布式缓存同步 蓄流压测
kafka优点
高吞吐量
可扩展
永久存储
高可用性
多副本和分区
replication:副本
partition:分区,分区不要超过节点数

分区的优势:
1、实现存储空间的横向扩容,将多个kafka服务器的空间结合利用
2、提升性能,多服务器读写
3、实现高可用
kafka写入消息的流程
1、生产者先从集群获取分区的leader
2、生产者将消息发送给leader
3、leader将消息写入本地文件
4、follower从leader pull消息
5、follower将消息写入本地后向leader反馈写入成功
6、leader只要接收到一个follower的反馈之后就向生产者反馈写入成功

kafka集群部署

环境准备zookeeper

#在三个Ubuntu20.04节点提前部署zookeeper和kafka三个节点复用
10.0.0.100
10.0.0.101
10.0.0.102
#注意:生产中zookeeper和kafka一般是分开独立部署的,kafka安装前需要安装java环境

确保三个节点的zookeeper启动

[root@node0 ~]#zkServer.sh status
各节点部署 Kafka

kafka下载链接:

http://kafka.apache.org/downloads
Kafka 节点配置

配置文件说明

配置文件
vim /usr/local/kafka/config/server.properties
broker.id=1  #每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的 meta.properties文件
listeners=PLAINTEXT://10.0.0.100:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
log.dirs=/usr/local/kafka/data  #kakfa用于保存数据的目录,所有的消息都会存储在该目录当中
num.partitions=1   #设置创建新的topic时默认分区数量,建议和kafka的节点数量一致
default.replication.factor=2   #指定默认的副本数为3,可改为2,可以实现故障的自动转移
log.retention.hours=168        #设置kafka中消息保留时间,默认为168小时即7天
zookeeper.connect=10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181
#指定连接的zk的地址,zk中存储了broker的元数据信息
zookeeper.connection.timeout.ms=6000 #设置连接zookeeper的超时时间,单位为ms,默认6秒钟
部署范例:
第一步:所有节点
#在所有节点上执行安装java(如果和zookeeper复用,则不用再下载)
[root@node0 ~]#apt install openjdk-8-jdk -y

#国内镜像下载
[root@node0 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.3.1/kafka_2.13-3.3.1.tgz

#解压缩设值软链接
[root@node0 ~]#tar xf kafka_2.13-2.7.0.tgz -C /usr/local/
[root@node0 ~]#ln -s /usr/local/kafka_2.13-2.7.0/ /usr/local/kafka

#配置PATH变量
[root@node0 ~]#echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
[root@node0 ~]#. /etc/profile.d/kafka.sh

#修改配置文件
[root@node0 ~]#vim /usr/local/kafka/config/server.properties
broker.id=1  #每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的 meta.properties文件
listeners=PLAINTEXT://10.0.0.100:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
log.dirs=/usr/local/kafka/data  #kakfa用于保存数据的目录,所有的消息都会存储在该目录当中
num.partitions=1   #设置创建新的topic时默认分区数量,建议和kafka的节点数量一致
default.replication.factor=2   #指定默认的副本数为3,可改为2,可以实现故障的自动转移
log.retention.hours=168        #设置kafka中消息保留时间,默认为168小时即7天
zookeeper.connect=10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181
#指定连接的zk的地址,zk中存储了broker的元数据信息
zookeeper.connection.timeout.ms=6000 #设置连接zookeeper的超时时间,单位为ms,默认6秒钟

#准备数据目录
[root@node0 ~]#mkdir /usr/local/kafka/data
第二步:修改第二个节点配置
[root@node1 ~]#vim /usr/local/kafka/config/server.properties
broker.id=2  #每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的 meta.properties文件
listeners=PLAINTEXT://10.0.0.101:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
第三步:修改第三个节点配置
[root@node2 ~]#vim /usr/local/kafka/config/server.properties
broker.id=3  #每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的 meta.properties文件
listeners=PLAINTEXT://10.0.0.103:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
第四步:启动服务
在所有kafka节点执行下面操作
vim /usr/local/kafka/bin/kafka-server-start.sh 
if[ " x$KAFKA_HEAP_OPTS"="x"] ; then
    export KAFKA_HEAP_OPTS=" -Xmx1G-Xms1G"  #可以调整内存
fi

调用脚本启动
kafka-server-start.sh -daemon 
/usr/local/kafka/config/server.properties
第五步:准备Kafka的service文件
cat /lib/systemd/system/kafka.service 
[unit] 
Description=Apache kafka 
After=network.target 

[service] Type=simple 
#Environment=JAVA_HOME=/data/server/java 
PIDFile=/usr/local/kafka/kafka.pid 
Execstart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server. properties 
Execstop=/bin/kill  -TERM ${MAINPID} 
Restart=always 
RestartSec=20 

[Install]
wantedBy=multi-user.target

systemctl daemon-load 
systemctl restart kafka.service
脚本一键部署kafka三台机器(包括zookeeper、java、kafak)
[root@ubuntu2004 ~]#cat install_kafka.sh 
#!/bin/bash
#

KAFKA_VERSION=3.3.1
#KAFKA_VERSION=3.2.0
SCALA_VERSION=2.13
#KAFKA_VERSION=-3.0.0
KAFKA_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
#KAFKA_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.1/kafka_2.13-2.8.1.tgz"
#KAFKA_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz"
ZK_VERSOIN=3.6.3
ZK_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/apache-zookeeper-${ZK_VERSOIN}-bin.tar.gz"

ZK_INSTALL_DIR=/usr/local/zookeeper
KAFKA_INSTALL_DIR=/usr/local/kafka

NODE1=10.0.0.100
NODE2=10.0.0.101
NODE3=10.0.0.102

HOST=`hostname -I|awk '{print $1}'`
.  /etc/os-release


color () {
    RES_COL=60
    MOVE_TO_COL="echo -en \\033[${RES_COL}G"
    SETCOLOR_SUCCESS="echo -en \\033[1;32m"
    SETCOLOR_FAILURE="echo -en \\033[1;31m"
    SETCOLOR_WARNING="echo -en \\033[1;33m"
    SETCOLOR_NORMAL="echo -en \E[0m"
    echo -n "$1" && $MOVE_TO_COL
    echo -n "["
    if [ $2 = "success" -o $2 = "0" ] ;then
        ${SETCOLOR_SUCCESS}
        echo -n $"  OK  "    
    elif [ $2 = "failure" -o $2 = "1"  ] ;then 
        ${SETCOLOR_FAILURE}
        echo -n $"FAILED"
    else
        ${SETCOLOR_WARNING}
        echo -n $"WARNING"
    fi
    ${SETCOLOR_NORMAL}
    echo -n "]"
    echo 
}


install_jdk() {
    if [ $ID = 'centos' -o  $ID = 'rocky' ];then
        yum -y install java-1.8.0-openjdk-devel || { color "安装JDK失败!" 1; exit 1; }
    else
        apt update
        apt install openjdk-8-jdk -y || { color "安装JDK失败!" 1; exit 1; } 
    fi
    java -version
}

zk_myid () {
read -p "请输入node编号(默认为 1): " MYID

if [ -z "$MYID" ] ;then
    MYID=1
elif [[ ! "$MYID" =~ ^[0-9]+$ ]];then
    color  "请输入正确的node编号!" 1
    exit
else
    true
fi
}

install_zookeeper() {
    wget -P /usr/local/src/ $ZK_URL || { color  "下载失败!" 1 ;exit ; }
    tar xf /usr/local/src/${ZK_URL##*/} -C `dirname ${ZK_INSTALL_DIR}`
    ln -s /usr/local/apache-zookeeper-*-bin/ ${ZK_INSTALL_DIR}
    echo 'PATH=${ZK_INSTALL_DIR}/bin:$PATH' >  /etc/profile.d/zookeeper.sh
    .  /etc/profile.d/zookeeper.sh
    mkdir -p ${ZK_INSTALL_DIR}/data 
    echo $MYID > ${ZK_INSTALL_DIR}/data/myid
    cat > ${ZK_INSTALL_DIR}/conf/zoo.cfg <<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=${ZK_INSTALL_DIR}/data
clientPort=2181
maxClientCnxns=128
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
server.1=${NODE1}:2888:3888
server.2=${NODE2}:2888:3888
server.3=${NODE3}:2888:3888
EOF
   
	cat > /lib/systemd/system/zookeeper.service <<EOF
[Unit]
Description=zookeeper.service
After=network.target

[Service]
Type=forking
#Environment=${ZK_INSTALL_DIR}
ExecStart=${ZK_INSTALL_DIR}/bin/zkServer.sh start
ExecStop=${ZK_INSTALL_DIR}/bin/zkServer.sh stop
ExecReload=${ZK_INSTALL_DIR}/bin/zkServer.sh restart

[Install]
WantedBy=multi-user.target
EOF
    systemctl daemon-reload
    systemctl enable --now  zookeeper.service
    systemctl is-active zookeeper.service
    if [ $? -eq 0 ] ;then 
        color "zookeeper 安装成功!" 0  
    else 
        color "zookeeper 安装失败!" 1
        exit 1
    fi  
}


install_kafka(){
    wget -P /usr/local/src  $KAFKA_URL  || { color  "下载失败!" 1 ;exit ; }
    tar xf /usr/local/src/${KAFKA_URL##*/}  -C /usr/local/
    ln -s ${KAFKA_INSTALL_DIR}_*/ ${KAFKA_INSTALL_DIR}
    echo PATH=${KAFKA_INSTALL_DIR}/bin:'$PATH' > /etc/profile.d/kafka.sh
    . /etc/profile.d/kafka.sh
   
    cat > ${KAFKA_INSTALL_DIR}/config/server.properties <<EOF
broker.id=$MYID
listeners=PLAINTEXT://${HOST}:9092
log.dirs=${KAFKA_INSTALL_DIR}/data
num.partitions=1
log.retention.hours=168
zookeeper.connect=${NODE1}:2181,${NODE2}:2181,${NODE3}:2181
zookeeper.connection.timeout.ms=6000
EOF
    mkdir ${KAFKA_INSTALL_DIR}/data
	 
    cat > /lib/systemd/system/kafka.service <<EOF
[Unit]                                                                          
Description=Apache kafka
After=network.target

[Service]
Type=simple
#Environment=JAVA_HOME=/data/server/java
#PIDFile=${KAFKA_INSTALL_DIR}/kafka.pid
ExecStart=${KAFKA_INSTALL_DIR}/bin/kafka-server-start.sh  ${KAFKA_INSTALL_DIR}/config/server.properties
ExecStop=/bin/kill  -TERM \${MAINPID}
Restart=always
RestartSec=20

[Install]
WantedBy=multi-user.target

EOF
	systemctl daemon-reload
	systemctl enable --now kafka.service
	#kafka-server-start.sh -daemon ${KAFKA_INSTALL_DIR}/config/server.properties 
	systemctl is-active kafka.service
	if [ $? -eq 0 ] ;then 
        color "kafka 安装成功!" 0  
    else 
        color "kafka 安装失败!" 1
        exit 1
    fi    
}



zk_myid

install_jdk

install_zookeeper

install_kafka
kafka分区、副本有什么作用
分区:负载均衡,个数建议和节点数量相同
副本:高可用 至少两个副本
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  jnZtF7Co41Wg   2023年11月22日   23   0   0 linuxApacheci
  jnZtF7Co41Wg   2023年11月19日   26   0   0 ubuntuApachecentos
  9JCEeX0Eg8g4   2023年12月10日   32   0   0 应用程序javaApache
  OGG2zAst6hx8   2023年11月26日   32   0   0 bootstrapServerkafka
  9JCEeX0Eg8g4   2023年11月13日   37   0   0 bootstrapzookeeperkafka
  KRsXEGSB49bk   2023年11月27日   30   0   0 javaApache
  jnZtF7Co41Wg   2023年11月24日   31   0   0 mysqlApachecentos
  ETWZF7L1nsXD   2023年11月13日   36   0   0 mavenDockerApache
  xwGmYGXf1w4S   2023年11月22日   44   0   0 tomcatjavaApache