Spring Boot 整合 kafka(环境搭建+演示)
  HptQjPcX5vAL 2023年12月12日 53 0

在这里插入图片描述


🏆本文收录于《Spring Boot从入门到精通》,专门攻坚指数提升,2023 年国内最系统+最强(更新中)。

本专栏致力打造最硬核Spring Boot 系列教程,从零基础到进阶系列学习内容,🚀均为全网独家首发,打造精品专栏,专栏持续更新中…欢迎大家订阅持续学习。

环境说明:Windows10 + Idea2021.3.2 + Jdk1.8 + SpringBoot 2.3.1.RELEASE

前言

随着互联网的发展,数据量的急剧增加,各种大数据技术也应运而生。Kafka是由Apache开发的分布式流处理平台,使用Kafka可以在系统之间实现高效、可靠地数据传输,并且可以处理大量的实时数据。

Spring Boot是一个基于Spring框架的快速开发脚手架,使用Spring Boot可以快速搭建Web应用、批处理应用等等。Spring Boot提供了丰富的功能和简化的配置,非常适合用于构建分布式系统。本文将介绍如何在Spring Boot中整合Kafka,并演示一个简单的案例。

摘要

本文将介绍如何在Spring Boot中整合Kafka,并演示一个简单的生产者、消费者应用程序。首先介绍如何搭建Kafka环境,然后介绍如何创建Kafka生产者和消费者。最后,我们将使用一个简单的案例来演示如何使用Spring Boot和Kafka实现消息的生产和消费。

简介

什么是Kafka

Kafka是由Apache开发的分布式流处理平台,是一种高吞吐量、分布式、可扩展的消息队列。Kafka的设计目标是:快速、可靠地处理实时数据流,例如日志数据和消息。Kafka具有以下特点:

  • 高吞吐量:Kafka能够处理每秒数百万条消息。
  • 可扩展性好:Kafka采用分布式的方式来处理数据,可以很方便地扩展。
  • 持久性好:Kafka将消息存储在磁盘上,不会因为系统宕机而丢失数据。
  • 高可靠性:Kafka采用副本机制,能够保证数据的可靠性。

什么是Spring Boot

Spring Boot是一个基于Spring框架的快速开发脚手架,使用Spring Boot可以快速搭建Web应用、批处理应用等等,Spring Boot提供了丰富的功能和简化的配置,非常适合用于构建分布式系统。

为什么要使用Kafka和Spring Boot

现在的Web应用越来越需要处理大量数据,Kafka是优秀的处理实时数据的工具,而Spring Boot是一个快速开发应用的框架,将两者结合起来,可以方便快捷地构建分布式系统。

环境搭建

安装Kafka

在开始之前,需要先下载并安装Kafka。Kafka可以在官网下载:https://kafka.apache.org/downloads

下载完成后,解压缩文件,在conf目录下找到server.properties文件,修改以下参数:

# 监听的端口号
listeners=PLAINTEXT://localhost:9092

# ZooKeeper的地址
zookeeper.connect=localhost:2181

创建Topic

在开始使用Kafka之前,需要先创建一个Topic。Topic类似于消息队列,是Kafka中用于存储消息的容器,一个Topic可以有多个Partition,每个Partition中存储着一部分数据。

创建Topic的命令如下:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

其中,replication-factor是副本数,partitions是分区数,可以根据需要进行修改。test是Topic名称,可以自定义。

测试Kafka

在创建Topic之后,可以使用以下命令启动Kafka:

bin/kafka-server-start.sh config/server.properties

启动Kafka后,可以使用以下命令向Topic发送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

在发送消息的命令行中输入一些消息,例如:

hello world

然后使用以下命令来读取消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

可以看到刚才发送的消息被读取出来:

hello world

至此,Kafka环境已经搭建完成。

Spring Boot整合Kafka

在开始整合Kafka之前,需要添加Kafka的依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.1</version>
</dependency>

创建Kafka Producer

首先,需要创建一个KafkaProducerConfig类,用于配置Kafka连接信息:

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

在上面的代码中,我们配置了Kafka连接信息,并创建了一个生产者工厂和KafkaTemplate。

然后,可以创建一个Kafka Producer类,用于向指定Topic发送消息:

@Service
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        LOGGER.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}

在上面的代码中,我们注入了KafkaTemplate,并在sendMessage方法中使用它向指定的Topic发送消息。

创建Kafka Consumer

创建消费者和创建生产者类似,同样需要先创建一个KafkaConsumerConfig类,用于配置Kafka连接信息:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.topic}")
    private String topic;

    @Value("${kafka.group-id}")
    private String groupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public KafkaConsumer kafkaConsumer() {
        return new KafkaConsumer();
    }
}

在上面的代码中,我们配置了Kafka连接信息,并创建了一个消费者工厂和ConcurrentKafkaListenerContainerFactory。

然后,可以创建一个Kafka Consumer类,用于从指定Topic接收消息:

@Service
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "${kafka.topic}")
    public void receiveMessage(String message) {
        LOGGER.info("received message='{}'", message);
    }
}

在上面的代码中,我们使用@KafkaListener注解指定从哪个Topic接收消息,并在receiveMessage方法中处理接收到的消息。

编写演示案例

接下来,我们将使用一个简单的案例来演示如何使用Spring Boot和Kafka实现消息的生产和消费。

首先,我们需要在配置文件application.properties中配置Kafka连接信息:

kafka.bootstrap-servers=localhost:9092
kafka.topic=test
kafka.group-id=my-group

然后,我们可以创建一个Controller类,用于向Kafka发送消息:

@RestController
public class KafkaController {

    private final KafkaProducer producer;

    @Autowired
    public KafkaController(KafkaProducer producer) {
        this.producer = producer;
    }

    @PostMapping("/send")
    public void sendMessage(@RequestParam("message") String message) {
        producer.sendMessage("test", message);
    }
}

在上面的代码中,我们注入了KafkaProducer,并在sendMessage方法中通过它向指定的Topic发送消息。

最后,我们创建一个Kafka Consumer类,用于从指定Topic接收消息:

@Service
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.group-id}")
    public void receiveMessage(String message) {
        LOGGER.info("received message='{}'", message);
    }
}

在上面的代码中,我们使用@KafkaListener注解指定从哪个Topic接收消息,并在receiveMessage方法中处理接收到的消息。

至此,整个演示案例已经完成,可以通过发送HTTP请求来向Kafka发送消息,并在控制台中查看接收到的消息。

总结

本文介绍了如何在Spring Boot中整合Kafka,并演示了一个简单的生产者、消费者应用程序。首先介绍了如何搭建Kafka环境,然后介绍了如何创建Kafka生产者和消费者。最后,我们使用一个简单的案例来演示如何使用Spring Boot和Kafka实现消息的生产和消费。

Kafka具有高吞吐量、可扩展性好、持久性好、高可靠性等特点,非常适合用于处理大量实时数据。而Spring Boot是一个快速开发应用的框架,可以方便快捷地构建分布式系统。将Kafka和Spring Boot结合起来,可以轻松地构建高效、可靠的分布式系统。

附录源码

  如上涉及所有源码均已上传同步在「GitHub」,提供给同学们一对一参考学习,辅助你更迅速的掌握。

☀️建议/推荐你


无论你是计算机专业的学生,还是对编程有兴趣的小伙伴,都建议直接毫无顾忌的学习此专栏「滚雪球学Spring Boot」,从入门到精通,凡是学习此专栏的同学,均能获取到所需的知识和技能,全网最快速入门SpringBoot,就像滚雪球一样,越滚越大,指数级提升。

  最后,如果这篇文章对你有所帮助,帮忙给作者来个一键三连,关注、点赞、收藏,您的支持就是我坚持写作最大的动力。

  同时欢迎大家关注公众号:「猿圈奇妙屋」 ,以便学习更多同类型的技术文章,免费白嫖最新BAT互联网公司面试题、4000G pdf电子书籍、简历模板、技术文章Markdown文档等海量资料。

📣关于我

我是bug菌,CSDN | 掘金 | InfoQ | 51CTO 等社区博客专家,历届博客之星Top30,掘金年度人气作者Top40,51CTO年度博主Top12,华为云 | 阿里云| 腾讯云等社区优质创作者,全网粉丝合计15w+ ;硬核微信公众号「猿圈奇妙屋」,欢迎你的加入!免费白嫖最新BAT互联网公司面试题、4000G pdf电子书籍、简历模板等海量资料。


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年12月12日 0

暂无评论

推荐阅读
HptQjPcX5vAL