CentOS7安装配置Kafka3.2.0(含SpringBoot连接测试)
  Bd1YdjdJDskC 2023年11月02日 70 0


目录

​​一、安装jdk(已安装可以不用管)​​

​​二、下载Kafka​​

​​三、配置Kafka​​

​​四、启动Kafka​​

​​五、SpringBoot连接测试​​

一、安装jdk(已安装可以不用管)

Kafka需要依赖jdk,这里先安装jdk 

【1】卸载jdk

有时候会因为各种原因我们需要卸载jdk,这里也记录一下

 # 查看安装的jdk  rpm -qa|grep -i jdk

CentOS7安装配置Kafka3.2.0(含SpringBoot连接测试)_java

 # 卸载jdk(根据自己的包名来进行卸载)  rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.332.b09-1.el7_9.x86_64  rpm -e --nodeps java-1.8.0-openjdk-1.8.0.332.b09-1.el7_9.x86_64  rpm -e --nodeps java-1.8.0-openjdk-devel-1.8.0.332.b09-1.el7_9.x86_64 # 卸载后查看jdk版本,发现找不到,卸载成功  java -version

【2】安装jdk

官网下载:​​Java Downloads | Oracle​​ 找到 tar.gz 结尾的进行下载,上传到服务器进行安装配置

 # 解压(我的解压目录为:/software/jdk/jdk1.8.0_333)  tar -zxvf jdk-8u333-linux-x64.tar.gz  ​  # 打开配置文件进行配置  vim /etc/profile  ​  # 添加如下配置(根据自己的路径来)  export JAVA_HOME=/software/jdk/jdk1.8.0_333  export JRE_HOME=/$JAVA_HOME/jre  export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar  export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin  ​  # 使配置文件生效  source /etc/profile  ​  # 查看是否安装成功  java -version

二、下载Kafka

官网:​​Apache Downloads​

CentOS7安装配置Kafka3.2.0(含SpringBoot连接测试)_zookeeper_02

根据对应的版本,使用wget下载,如果下载速度太慢,可以在Windows上通过下载器下载,然后上传到服务器上

 wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz --no-check-certificate

上传后进行解压

 tar -axvf kafka_2.13-3.2.0.tgz

注,解压路径不宜过长,否则后面启动的时候可能会报错,所以这里我将kafka_2.13-3.2.0重命名为kafka

 mv kafka_2.13-3.2.0 kafka

解压后目录结构如下:

CentOS7安装配置Kafka3.2.0(含SpringBoot连接测试)_spring boot_03

三、配置Kafka

这里只是单机kafka的配置,并不是集群的配置

【1】配置kafka的log路径

打开config目录下的server.properties文件修改配置

 # 打开配置文件  vim config/server.properties  ​  # 配置日志存放位置(启动的时候会自己在相应的目录下创建kafka-logs文件夹)  log.dirs=/software/kafka/kafka-logs

【2】配置外网访问

还是修改server.properties文件,找到下面配置进行修改,位置大概在34行左右

 # 放开注解  listeners=PLAINTEXT://:9092  ​  # 修改注解,改成自己服务器ip  advertised.listeners=PLAINTEXT://42.104.249.49:9092

CentOS7安装配置Kafka3.2.0(含SpringBoot连接测试)_java_04

【3】配置zookeeper数据路径

先在解压目录下创建zookeeper文件夹用来存放数据,再打开config目录下的zookeeper.properties文件修改配置

 # 打开配置文件  vim config/zookeeper.properties  ​  # 配置数据存放位置(启动的时候会自己在相应的目录下创建zookeeper文件夹)  dataDir=/software/kafka/zookeeper

CentOS7安装配置Kafka3.2.0(含SpringBoot连接测试)_zookeeper_05

【4】放通端口

 # 放通9092端口  firewall-cmd --zone=public --add-port=9092/tcp --permanent  ​  # 重启防火墙  firewall-cmd --reload

注意这里除了放通centos的端口,还要将安全组的端口也放通,比如我这里用的是腾讯云,就要上腾讯云的控制台开放相应的安全组端口

四、启动Kafka

 # 启动zookeeper  bin/zookeeper-server-start.sh config/zookeeper.properties  ​  # 启动kafka  bin/kafka-server-start.sh config/server.properties  ​  # 生产消息(创建名为testTopic的主题)  bin/kafka-console-producer.sh --topic testTopic --bootstrap-server localhost:9092  ​  # 监听消息(重开一个终端监听testTopic主题的消息,在生产消息的终端发消息,此终端收消息)  bin/kafka-console-consumer.sh --topic testTopic --from-beginning --bootstrap-server localhost:9092

在监听消息的终端能够收到消息,则说明安装配置成功

五、SpringBoot连接测试

目录结构如下:

CentOS7安装配置Kafka3.2.0(含SpringBoot连接测试)_spring boot_06

【1】引入pom依赖

 <dependency>      <groupId>org.apache.kafka</groupId>      <artifactId>kafka-clients</artifactId>      <version>3.2.0</version>  </dependency>  <dependency>      <groupId>org.projectlombok</groupId>      <artifactId>lombok</artifactId>  </dependency>

【2】配置application.yml

bootstrap-servers配置为自己服务器的IP

 spring:   kafka:      # 消费者     consumer:       group-id: foo       auto-offset-reset: earliest       bootstrap-servers: 42.104.249.49:9092      # 生产者     producer:       bootstrap-servers: 42.104.249.49:9092       key-serializer: org.apache.kafka.common.serialization.StringSerializer       value-serializer: org.apache.kafka.common.serialization.StringSerializer

【3】生产消息

生产消息通过controller访问接口来进行测试

@RestController  public class KafkaProducerController {            @Autowired      private KafkaTemplate kafkaTemplate;            @GetMapping("/send")      public void send() {          // testTopic为主题,和上面终端测试的保持一致,方便测试,onestar为要发送的消息          kafkaTemplate.send("testTopic","onestar");     }  }

【4】监听消费消息

@Component  @Slf4j  public class KafkaConsumer {      /**       * kafka的监听器,topic为"testTopic",消费者组默认为配置文件里面的       */      @KafkaListener(topics = {"testTopic"})      public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {          Optional<?> optional = Optional.ofNullable(consumerRecord.value());          if (optional.isPresent()) {              Object msg = optional.get();              log.info("message:{}", msg);         }     }  }

【5】测试

为了方便查看,我们可以将上面的监听消息终端开着,然后运行代码,通过访问 ​​http://localhost:8080/send​​ 进行测试

CentOS7安装配置Kafka3.2.0(含SpringBoot连接测试)_kafka_07

从打印的日志可以看到消息已经消费了,并且在监听消息的终端也打印出消息

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  4crWjjQBqFOy   2023年11月13日   28   0   0 javamavenandroid
  wpWn7yzs0oKF   2023年11月13日   32   0   0 javaapacheHDFS
Bd1YdjdJDskC