1.二进制部署带sasl 认证的kafka集群
1.1 下载kafka
3台机器都要执行
wget -c https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar zxvf kafka_2.13-3.4.0.tgz
mv kafka_2.13-3.4.0 /data/kafka
sed -i 's/-Xmx1G -Xms1G/-Xmx8G -Xms8G/' /data/kafka/bin/kafka-server-start.sh
配置环境变量
cat /data/kafka
cat <<'EOF'> /etc/profile.d/kafka.sh
export KAFKA_HOME=/data/kafka
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile.d/kafka.sh
重启脚本
cat <<EOF> /data/kafka/restart.sh
#!/bin/bash
kafka-server-stop.sh
nohup kafka-server-start.sh config/server.properties >> /data/kafka/nohup.out 2>&1 &
EOF
chmod +x /data/kafka/restart.sh
source /etc/profile.d/kafka.sh
用于监控的配置,修改 bin/kafka-server-start.sh,增加 JMX_PORT,可以获取更多指标。
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx8G -Xms8G"
export JMX_PORT="9099"
fi
1.2 配置zookeeper
修改Zookeeper配置文件
cp /data/kafka/config/zookeeper.properties /data/kafka/config/zookeeper.properties.bak
cat >/data/kafka/config/zookeeper.properties <<EOF
dataDir=/data/zookeeper
clientPort=2181
maxClientCnxns=60
# 为zk的基本时间单元,毫秒
tickTime=2000
# Leader-Follower初始通信时限 tickTime*10
initLimit=10
# Leader-Follower同步通信时限 tickTime*5
syncLimit=5
server.1=10.15.61.1:2888:3888
server.2=10.15.61.2:2888:3888
server.3=10.15.61.3:2888:3888
admin.enableServer=false
EOF
创建Zookeeper集群myid
Zookeeper集群模式下需要配置myid文件,该需要放在dataDir根目录下。文件填写一个数字,数字就是A的值。
#10.15.61.1创建myid,方法如下:mkdir -p /data/zookeeper/
echo "1" > /data/zookeeper/myid
#10.15.61.3创建myid,方法如下:mkdir -p /data/zookeeper/
echo "2" > /data/zookeeper/myid
#10.15.61.3创建myid,方法如下:mkdir -p /data/zookeeper/
echo "3" > /data/zookeeper/myid
3台机器启动zookeeper
zookeeper-server-start.sh -daemon config/zookeeper.properties
1.3 配置kafka
修改Kafka配置文件
修改3台服务器Kafka配置文件server.properties的内容如下:
10.15.61.1
cat >/data/kafka/config/server.properties <<EOF
broker.id=0
# sasl
listeners=SASL_PLAINTEXT://10.15.61.1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
# sasl
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.15.61.1:2181,10.15.61.2:2181,10.15.61.3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
10.15.61.2
cat >/data/kafka/config/server.properties <<EOF
broker.id=1
listeners=SASL_PLAINTEXT://10.15.61.2:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.15.61.1:2181,10.15.61.2:2181,10.15.61.3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
10.15.61.3
cat >/data/kafka/config/server.properties <<EOF
broker.id=2
listeners=SASL_PLAINTEXT://10.15.61.3:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.15.61.1:2181,10.15.61.2:2181,10.15.61.3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
config 目录下新建 2 个文件
cat <<EOF> /data/kafka/config/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin@123"
user_admin="admin@123"
user_alice="admin@123";
};
EOF
cat <<EOF> /data/kafka/config/kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin@123";
};
EOF
修改启动参数
sed -i '$i\export KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka/config/kafka_server_jaas.conf"' /data/kafka/bin/kafka-server-start.sh
sed -i '$i\export KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka/config/kafka_client_jaas.conf"' /data/kafka/bin/kafka-console-consumer.sh
sed -i '$i\export KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka/config/kafka_client_jaas.conf"' /data/kafka/bin/kafka-console-producer.sh
在config文件夹新建文件sasl.conf,用来认证
cat >/data/kafka/config/sasl.conf <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
EOF
创建topic
# 设置环境变量指定jaas文件
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka/config/kafka_client_jaas.conf"
# 创建topic
bin/kafka-topics.sh --create --topic demo-topic --bootstrap-server 10.15.61.1:9092 --command-config config/sasl.conf
罗列目前的topic
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka/config/kafka_client_jaas.conf"
bin/kafka-topics.sh --bootstrap-server 10.15.61.1:9092 --command-config config/sasl.conf --list
往topic写消息,可以使用Ctrl-C取消
设置环境变量指定jaas文件
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka/config/kafka_client_jaas.conf"
bin/kafka-console-producer.sh --topic demo-topic --bootstrap-server 10.15.61.1:9092 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
消费topic消息
另开一个终端会话消费消息,使用Ctrl-C取消
# 设置环境变量指定jaas文件
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/kafka/config/kafka_client_jaas.conf"
bin/kafka-console-consumer.sh --topic demo-topic --from-beginning --bootstrap-server 10.15.61.1:9092 --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
2.创建kafka-exporter服务
kafka-exporter-a.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-exporter
namespace: test
labels:
app: kafka-exporter
spec:
selector:
matchLabels:
app: kafka-exporter
template:
metadata:
labels:
app: kafka-exporter
spec:
containers:
- name: kafka-exporter
image: danielqsj/kafka-exporter
#args: ["--kafka.server=kafka-headless:9092"] #kafka单机监控
args: ["--kafka.server=10.15.61.1:9092","--kafka.server=10.15.61.2:9092","--kafka.server=10.15.61.3:9092","--sasl.enabled","--sasl.mechanism=plain","--sasl.username=admin","--sasl.password=admin@123"]
ports:
- containerPort: 9308
name: metrics
resources:
limits:
cpu: 250m
memory: 300Mi
requests:
cpu: 50m
memory: 10Mi
---
apiVersion: v1
kind: Service
metadata:
name: kafka-exporter
labels:
app: kafka-exporter
ns: test
namespace: test
spec:
type: ClusterIP
#clusterIP: None
ports:
- name: metrics
protocol: TCP
port: 9308
targetPort: 9308
selector:
app: kafka-exporter
3.创建servicemonitor
servicemonitor.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: kafka-exporter
labels:
prometheus: kube-prometheus
app: kafka-exporter
namespace: monitoring
spec:
endpoints:
- port: metrics
interval: 30s
scrapeTimeout: 30s
path: /metrics
namespaceSelector:
any: true
#namespaceSelector:
# matchNames:
# - kafka
selector:
matchLabels:
app: kafka-exporter
4.创建完毕之后,查看kafka的metrics
curl http://10.244.37.217:9308/metrics
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 5.703e-05
go_gc_duration_seconds{quantile="0.25"} 7.2151e-05
go_gc_duration_seconds{quantile="0.5"} 8.855e-05
go_gc_duration_seconds{quantile="0.75"} 0.00012492
go_gc_duration_seconds{quantile="1"} 0.00025913
go_gc_duration_seconds_sum 0.326197401
go_gc_duration_seconds_count 3052
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 10
# HELP go_info Information about the Go environment.
# TYPE go_info gauge
go_info{version="go1.20.4"} 1
# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.
# TYPE go_memstats_alloc_bytes gauge
go_memstats_alloc_bytes 3.795744e+06
# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed.
# TYPE go_memstats_alloc_bytes_total counter
go_memstats_alloc_bytes_total 8.21988516e+09
# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table.
# TYPE go_memstats_buck_hash_sys_bytes gauge
go_memstats_buck_hash_sys_bytes 1.533884e+06
# HELP go_memstats_frees_total Total number of frees.
# TYPE go_memstats_frees_total counter
go_memstats_frees_total 1.8042409e+07
# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata.
# TYPE go_memstats_gc_sys_bytes gauge
go_memstats_gc_sys_bytes 4.23324e+06
# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use.
# TYPE go_memstats_heap_alloc_bytes gauge
go_memstats_heap_alloc_bytes 3.795744e+06
# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used.
# TYPE go_memstats_heap_idle_bytes gauge
go_memstats_heap_idle_bytes 1.3770752e+07
# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use.
# TYPE go_memstats_heap_inuse_bytes gauge
go_memstats_heap_inuse_bytes 5.85728e+06
# HELP go_memstats_heap_objects Number of allocated objects.
# TYPE go_memstats_heap_objects gauge
go_memstats_heap_objects 5194
# HELP go_memstats_heap_released_bytes Number of heap bytes released to OS.
# TYPE go_memstats_heap_released_bytes gauge
go_memstats_heap_released_bytes 1.0469376e+07
# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system.
# TYPE go_memstats_heap_sys_bytes gauge
go_memstats_heap_sys_bytes 1.9628032e+07
# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection.
# TYPE go_memstats_last_gc_time_seconds gauge
5.在prometheus中查看kafka-exporter Targets
6.grafana配置kafka监控图表
导入模板
https://grafana.com/grafana/dashboards/7589
效果如下