SpringBoot整合Kafka集群
  vxNQtvtQlfbi 2023年11月02日 47 0

前言

SpringBoot中集成Kafka,主要目的干啥呢,当然消息推送啦。不同系统之间,自身系统不同组件之间消息通信的一种方式,也可以是使用MQ。

使用消息系统的目的主要就是为了解耦、异步通信、消峰处理。

消息系统三大优点

解耦

怎么理解呢,比如我是A系统,我要现在要给B、C两个系统发送消息,如果不用消息系统,直接调用,就相当于A系统跟B、C系统强耦合到一起了,如果后面还有D、E......等系统怎么办呢,我总不能挨着挨着一个一个写吧,这样代码耦合太高了,而且我还得考虑别人收到没有,处理成功失败等等情况。那我使用消息系统不就解决这个问题了嘛,我直管向Kafka推送消息,我才不管你谁来消费呢,你想消费你就去消费Kafka消息。这不就解耦了嘛。

异步

系统A调用其他系统接口的时候,需要一直等待其他系统处理完成它的业务逻辑后,返回处理结果,我才能继续处理我的业务逻辑,但是它的业务逻辑我其实不关心,我没必要等啊。

那用异步不就好了吗,我将消息发给Kafka,你其他系统去消费就行了,我监听你返回的处理结果就行了,我发送完后就可以继续做其他操作了,不用一直等着。

消峰

比如电商在秒杀的时候,用户量暴增,但是它又只是一小段时间的爆发量。如果不处理,那服务器不得直接挂了。怎么办呢?那我们用Kafka啊,来了请求我就推送消息到Kafka,然后呢,消费设置一秒消费多少就好,然后等高峰过去,用户量回归正常,积累的消息也会慢慢的被消费完。这样咱不是又能愉快的玩耍了嘛。

Kafka术语

  • Producer:生产者,消息的生产者,负责推送消息到Kafka队列

  • Consumer:消费者,负责消费Kafka队列中的消息

  • Consumer group:用来实现消息广播(发给多个Consumer)或单播(发给单个Consumer)的手段

  • Offset:kafka存储消息的偏移量,可以理解为下标用来控制消息的消费位置

  • Broker:Kafka服务节点,一个kafka服务器就是一个Broker,一个集群由多个Broker组成,一个 Broker下可以有多个Topic

  • Topic:消息的类别、标题,可以理解为是一个消息的队列

  • Partition:属于Topic的子集,消息分区,一个Topic可以有多个partition,一个partition在物理上对 应了一个文件夹;partition中的所有消息都会分配一个offset

Kafka消息模式

点对点消息传递模式:

一个消息推送出去,只能被消费一次,任何一个消费者消费了该消息后,其他消费者都不能继续消费该消息

发布-订阅模式:

消息发布到队列,所有订阅了topic的的消费者都可以消费topic里的所有消息。

一、SpringBoot整合Kafka

1.1 pom文件中添加maven引用

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.7</version>
</dependency>

1.2 yml文件中增加kafka生产者的配置

spring:
  kafka:
    # 指定kafka server的地址,集群配多个,中间,逗号隔开
    bootstrap-servers:
      - 127.0.0.1:9092
      - 127.0.0.1:9093
      - 127.0.0.1:9094
    # kafka生产者配置
    producer:
      # 写入失败时,重试次数。当leader失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
      # 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
      retries: 0
      # 每次批量发送消息的数量,produce积累到一定数据,一次发数据量
      # 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。
      # 这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置(16K)
      batch-size: 16384
      # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
      # #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
      buffer-memory: 33554432
      #默认情况下消息是不压缩的,此参数可指定采用何种算法压缩消息,可取值:none,snappy,gzip,lz4。snappy压缩算法由Google研发,
      #这种算法在性能和压缩比取得比较好的平衡;相比之下,gzip消耗更多的CPU资源,但是压缩效果也是最好的。通过使用压缩,我们可以节省网络带宽和Kafka存储成本。
      #如果不开启压缩,可设置为none(默认值),比较大的消息可开启
      compressionType: none
      #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
      #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,
      #    无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
      #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后
      #    立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
      #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,
      #    这相当于acks = -1的设置。
      #可以设置的值为:all, -1, 0, 1
      acks: all
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # 连接超时时间
      properties:
        request.timeout.ms: 30000
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093

###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0

# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1

# 批量大小
spring.kafka.producer.batch-size=16384

# 提交延时
spring.kafka.producer.properties.linger.ms=0

# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了

# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432

# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup

# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true

# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000

#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
#spring.kafka.consumer.auto-commit-interval=100

# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest

# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000

# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000

# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false

# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50

1.3 配置生产者Producer

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import static com.alibaba.fastjson.JSON.toJSONString;

/**
 * @Author: huangyibo
 * @Date: 2022/7/2 17:28
 * @Description: kafka生产者类
 */

@Component
@Slf4j
@SuppressWarnings({"unused"})
public class ProducerUtils {

    private static final String PUSH_MSG_LOG = "准备发送消息为:{}";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private KafkaAdminClient kafkaAdminClient;

    /**
     * 如果没有topic,则创建一个
     * @param topicName :
     * @param partitionNum :
     * @param replicaNum :
     * @return org.apache.kafka.clients.admin.CreateTopicsResult
     */
    public Boolean createTopic(String topicName, int partitionNum, int replicaNum){
        KafkaFuture<Set<String>> topics = kafkaAdminClient.listTopics().names();
        try {
            if (topics.get().contains(topicName)) {
                return true;
            }
            NewTopic newTopic = new NewTopic(topicName, partitionNum, (short) replicaNum);
            kafkaAdminClient.createTopics(Collections.singleton(newTopic));
            return true;
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * 传入topic名称,json格式字符串的消息,生产者进行发送
     * @param topicName : topic名称
     * @param jsonStr : 消息json字符串
     * @return boolean : 推送是否成功
     */
    public boolean sendMessage(String topicName, String jsonStr) {
        createTopic(topicName, 5, 5);
        log.info(PUSH_MSG_LOG, jsonStr);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                jsonStr));

        return dealSendResult(future);
    }

    /**
     * 传入topic名称,json格式字符串数组的消息,生产者进行发送
     * @param topicName : topic名称
     * @param jsonStrs : 消息json字符串数组
     * @return boolean : 推送是否成功
     */
    public Boolean[] sendMessage(String topicName, String[] jsonStrs) {
        createTopic(topicName, 5, 5);
        int msgLength = jsonStrs.length;
        Boolean[] success = new Boolean[msgLength];
        for (int i = 0; i < msgLength; i++) {
            String jsonStr = jsonStrs[i];
            log.info(PUSH_MSG_LOG, jsonStr);
            //发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                    jsonStr));
            success[i] = dealSendResult(future);
        }
        return success;
    }

    /**
     * 传入topic名称,消息对象,生产者进行发送
     * @param topicName : topic名称
     * @param obj : 消息对象
     * @return boolean : 推送是否成功
     */
    public boolean sendMessage(String topicName, Object obj) {
        createTopic(topicName, 5, 5);
        String jsonStr = toJSONString(obj);
        log.info(PUSH_MSG_LOG, jsonStr);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                jsonStr));

        return dealSendResult(future);
    }

    /**
     * 传入topic名称,消息对象数组,生产者进行发送
     * @param topicName : topic名称
     * @param list : 消息对象数组
     * @return boolean : 推送是否成功
     */
    public Boolean[] sendMessage(String topicName, List<Object> list) {
        createTopic(topicName, 5, 5);
        Boolean[] success = new Boolean[list.size()];
        for (int i = 0; i < list.size(); i++) {
            Object obj = list.get(i);
            String jsonStr = toJSONString(obj);
            log.info(PUSH_MSG_LOG, jsonStr);
            //发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                    jsonStr));
            success[i] = dealSendResult(future);
        }
        return success;
    }

    /**
     * 传入topic名称,json格式字符串的消息,生产者进行发送
     * @param topicName : topic名称
     * @param key : 消息key
     * @param jsonStr : 消息json字符串
     * @return boolean : 推送是否成功
     */
    public boolean sendMessage(String topicName, String key, String jsonStr) {
        createTopic(topicName, 5, 5);
        log.info(PUSH_MSG_LOG, jsonStr);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                key, jsonStr));

        return dealSendResult(future);
    }

    /**
     * 传入topic名称,json格式字符串数组的消息,生产者进行发送
     * @param topicName : topic名称
     * @param key : 消息key
     * @param jsonStrs : 消息json字符串数组
     * @return boolean : 推送是否成功
     */
    public Boolean[] sendMessage(String topicName, String key, String[] jsonStrs) {
        createTopic(topicName, 5, 5);
        int msgLength = jsonStrs.length;
        Boolean[] success = new Boolean[msgLength];
        for (int i = 0; i < msgLength; i++) {
            String jsonStr = jsonStrs[i];
            log.info(PUSH_MSG_LOG, jsonStr);
            //发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                    key, jsonStr));
            success[i] = dealSendResult(future);
        }
        return success;
    }

    /**
     * 传入topic名称,消息对象,生产者进行发送
     * @param topicName : topic名称
     * @param key : 消息key
     * @param obj : 消息对象
     * @return boolean : 推送是否成功
     */
    public boolean sendMessage(String topicName, String key, Object obj) {
        createTopic(topicName, 5, 5);
        String jsonStr = toJSONString(obj);
        log.info(PUSH_MSG_LOG, jsonStr);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                key, jsonStr));

        return dealSendResult(future);
    }

    /**
     * 传入topic名称,消息对象数组,生产者进行发送
     * @param topicName : topic名称
     * @param key : 消息key
     * @param list : 消息对象数组
     * @return boolean : 推送是否成功
     */
    public Boolean[] sendMessage(String topicName, String key, List<Object> list) {
        createTopic(topicName, 5, 5);
        Boolean[] success = new Boolean[list.size()];
        for (int i = 0; i < list.size(); i++) {
            Object obj = list.get(i);
            String jsonStr = toJSONString(obj);
            log.info(PUSH_MSG_LOG, jsonStr);
            //发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                    key, jsonStr));
            success[i] = dealSendResult(future);
        }
        return success;
    }

    /**
     * 传入topic名称,json格式字符串的消息,生产者进行发送
     * @param topicName : topic名称
     * @param partition : 消息发送分区
     * @param key : 消息key
     * @param jsonStr : 消息json字符串
     * @return boolean : 推送是否成功
     */
    public boolean sendMessage(String topicName, int partition, String key, String jsonStr) {
        createTopic(topicName, 5, 5);
        log.info(PUSH_MSG_LOG, jsonStr);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                partition, key, jsonStr));

        return dealSendResult(future);
    }

    /**
     * 传入topic名称,json格式字符串数组的消息,生产者进行发送
     * @param topicName : topic名称
     * @param partition : 消息发送分区
     * @param key : 消息key
     * @param jsonStrs : 消息json字符串数组
     * @return boolean : 推送是否成功
     */
    public Boolean[] sendMessage(String topicName, int partition, String key, String[] jsonStrs) {
        createTopic(topicName, 5, 5);
        int msgLength = jsonStrs.length;
        Boolean[] success = new Boolean[msgLength];
        for (int i = 0; i < msgLength; i++) {
            String jsonStr = jsonStrs[i];
            log.info(PUSH_MSG_LOG, jsonStr);
            //发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                    partition, key, jsonStr));
            success[i] = dealSendResult(future);
        }
        return success;
    }

    /**
     * 传入topic名称,消息对象,生产者进行发送
     * @param topicName : topic名称
     * @param partition : 消息发送分区
     * @param key : 消息key
     * @param obj : 消息对象
     * @return boolean : 推送是否成功
     */
    public boolean sendMessage(String topicName, int partition, String key, Object obj) {
        createTopic(topicName, 5, 5);
        String jsonStr = toJSONString(obj);
        log.info(PUSH_MSG_LOG, jsonStr);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
                partition, key, jsonStr));

        return dealSendResult(future);
    }

    /**
     * 传入topic名称,消息对象数组,生产者进行发送
     * @param topicName : topic名称
     * @param partition : 消息发送分区
     * @param key : 消息key
     * @param list : 消息对象数组
     * @return boolean : 推送是否成功
     */
    public Boolean[] sendMessage(String topicName, int partition, String key, List<Object> list) {
        createTopic(topicName, 5, 5);
        Boolean[] success = new Boolean[list.size()];
        for (int i = 0; i < list.size(); i++) {
            Object obj = list.get(i);
            String jsonStr = toJSONString(obj);
            log.info(PUSH_MSG_LOG, jsonStr);
            //发送消息
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(
                    topicName, partition, key, jsonStr));
            success[i] = dealSendResult(future);
        }
        return success;
    }

    /**
     * 处理消息推送结果
     * kafkaTemplate提供了一个回调方法addCallback,
     * 我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理
     * @param future :
     * @return boolean
     */
    private boolean dealSendResult(ListenableFuture<SendResult<String, Object>> future) {
        final boolean[] success = {false};
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //发送失败的处理
                log.info("生产者 发送消息失败 exMessage:{}", throwable.getMessage());
                success[0] = false;
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                //成功的处理
                log.info("生产者 发送消息成功, topic:{}, partition:{}, offset:{}",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
                success[0] = true;
            }
        });
        return success[0];
    }
}

此处生产者是异步发送消息,不用等待消息发送完成。

Producer是一个接口,声明了同步send和异步send两个重要方法。

ProducerRecord 消息实体类,每条消息由(topic,key,value,timestamp)四元组封装。一条消息key可以为空和timestamp可以设置当前时间为默认值。

1.3.1 带回调的生产者

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法,

@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
        // 消息发送到的topic
        String topic = success.getRecordMetadata().topic();
        // 消息发送到的分区
        int partition = success.getRecordMetadata().partition();
        // 消息在分区内的offset
        long offset = success.getRecordMetadata().offset();
        System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
    }, failure -> {
        System.out.println("发送消息失败:" + failure.getMessage());
    });
}
@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("发送消息失败:"+ex.getMessage());
        }
 
        @Override
        public void onSuccess(SendResult<String, Object> result) {
            System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                    + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
        }
    });
}

1.3.2、自定义分区器

我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

  • ① 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
  • ② 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;
  • ③ patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;

※ 我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区。

public class CustomizePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 自定义分区规则(这里假设全部发到0号分区)
        // ......
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

在application.propertise中配置自定义分区器,配置的值就是分区器类的全路径名

# 自定义分区器
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

1.3.3、kafka事务提交

如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务。

@GetMapping("/kafka/transaction")
public void sendMessage7(){
    // 声明事务:后面报错消息不会发出去
    kafkaTemplate.executeInTransaction(operations -> {
        operations.send("topic1","test executeInTransaction");
        throw new RuntimeException("fail");
    });

    // 不声明事务:后面报错但前面消息已经发送成功了
   kafkaTemplate.send("topic1","test executeInTransaction");
   throw new RuntimeException("fail");
}

1.4 消费者

1.4.1、指定topic、partition、offset消费

前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供。

/**
 * @Title 指定topic、partition、offset消费
 * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
 * @Param [record]
 * @return void
 **/
@KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = { "0" }),
        @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
public void onMessage2(ConsumerRecord<?, ?> record) {
    System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
}

属性解释:

  • ① id:消费者ID;
  • ② groupId:消费组ID;
  • ③ topics:监听的topic,可监听多个;
  • ④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。

上面onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息。

注意:topics和topicPartitions不能同时使用;

1.4.2、批量消费

设置application.prpertise开启批量消费即可,

# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50

接收消息时用List来接收,监听代码如下,

@KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
public void onMessage3(List<ConsumerRecord<?, ?>> records) {
    System.out.println(">>>批量消费一次,records.size()="+records.size());
    for (ConsumerRecord<?, ?> record : records) {
        System.out.println(record.value());
    }
}

1.4.3、ConsumerAwareListenerErrorHandler 异常处理器

通过异常处理器,我们可以处理consumer在消费时发生的异常。

新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器。

// 新建一个异常处理器,用@Bean注入
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
    return (message, exception, consumer) -> {
        System.out.println("消费异常:"+message.getPayload());
        return null;
    };
}

// 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
@KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
    throw new Exception("简单消费-模拟异常");
}

// 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
@KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
    System.out.println("批量消费一次...");
    throw new Exception("批量消费-模拟异常");
}

执行看一下效果

1.4.4、消息过滤器

消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。

@Component
public class KafkaConsumer {
    @Autowired
    ConsumerFactory consumerFactory;

    // 消息过滤器
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // 被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        // 消息过滤策略
        factory.setRecordFilterStrategy(consumerRecord -> {
            if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                return false;
            }
            //返回true消息则被过滤
            return true;
        });
        return factory;
    }

    // 消息过滤监听
    @KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
    public void onMessage6(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}

上面实现了一个"过滤奇数、接收偶数"的过滤策略,我们向topic1发送0-99总共100条消息,看一下监听器的消费情况,可以看到监听器只消费了偶数

1.4.5、消息转发

在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。

在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下:

/**
 * @Title 消息转发
 * @Description 从topic1接收到的消息经过处理后转发到topic2
 * @Param [record]
 * @return void
 **/
@KafkaListener(topics = {"topic1"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {
    return record.value()+"-forward message";
}

1.4.6、定时启动、停止监听器

默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用KafkaListenerEndpointRegistry,下面我们就来实现:

  • ① 禁止监听器自启动;
  • ② 创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;

新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在SpringIO中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动。

@EnableScheduling
@Component
public class CronTimer {

    /**
     * @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
     * 而是会被注册在KafkaListenerEndpointRegistry中,
     * 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
     **/
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private ConsumerFactory consumerFactory;

    // 监听器容器工厂(设置禁止KafkaListener自启动)
    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止KafkaListener自启动
        container.setAutoStartup(false);
        return container;
    }

    // 监听器
    @KafkaListener(id="timingConsumer",topics = "topic1",containerFactory = "delayContainerFactory")
    public void onMessage1(ConsumerRecord<?, ?> record){
        System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }

    // 定时启动监听器
    @Scheduled(cron = "0 42 11 * * ? ")
    public void startListener() {
        System.out.println("启动监听器...");
        // "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
        if (!registry.getListenerContainer("timingConsumer").isRunning()) {
            registry.getListenerContainer("timingConsumer").start();
        }
        //registry.getListenerContainer("timingConsumer").resume();
    }

    // 定时停止监听器
    @Scheduled(cron = "0 45 11 * * ? ")
    public void shutDownListener() {
        System.out.println("关闭监听器...");
        registry.getListenerContainer("timingConsumer").pause();
    }
}

启动项目,触发生产者向topic1发送消息,可以看到consumer没有消费,因为这时监听器还没有开始工作

11:42分监听器启动开始工作,消费消息

11:45分监听器停止工作,

1.4.7 配置消费者Consumer

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.Optional;

/**
 * @Author: huangyibo
 * @Date: 2022/7/2 17:32
 * @Description: kafka消费者类
 */

@Component
@Slf4j
public class ConsumerUtils {

    @Autowired
    private RedisUtils redisUtils;

    @Bean
    public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){
        ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(5);
        factory.getContainerProperties().setPollTimeout(1000);
        factory.setBatchListener(true);//设置为批量消费,每个批次数量在Kafka配置参数中设置
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动提交ackMode
        return factory;
    }

    /**
     * 单条的消费kafka消息
     * @param record : 消息记录
     * @param ack : ack回调确认
     * @return void :
     */
    @KafkaListener(topics = KafkaConstants.TOPIC_TEST, topicPartitions = {
            @TopicPartition(topic = KafkaConstants.TOPIC_TEST, partitions = {"0" ,"2" ,"4"}),
    }, groupId = KafkaConstants.TOPIC_GROUP1)
    public void topicTest(ConsumerRecord<String, String> record, Acknowledgment ack) {

        Optional<String> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic_test 消费了: Topic:" + record.topic() + ",key:" + record.key() + ",Message:" + msg);
            ack.acknowledge();//手动提交offset
        }
    }

    /**
     * 批量的消费kafka消息,要配合containerFactory使用,配置的bean见batchFactory
     * @param records : 消息记录列表
     * @param ack : ack回调确认
     * @return void :
     */
    @Transactional(rollbackOn = Exception.class)
    @KafkaListener(topics = KafkaConstants.TOPIC_TEST, topicPartitions = {
            @TopicPartition(topic = KafkaConstants.TOPIC_TEST, partitions = {"1", "3"}),
    }, groupId = KafkaConstants.TOPIC_GROUP2, containerFactory="batchFactory")
    public void topicTest2(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        try {
            for (ConsumerRecord<String, String> record : records) {
                //取到消息后,先查询缓存中是否已经存在,存在表示不需要再次处理
                //如果消息不存在,业务处理完成后将消息存入redis;如果消息存在直接跳过;这样可防止重复消费
                boolean isExists = redisUtils.hasKey(record.topic() + record.partition() + record.key());
                if (!isExists) {
                    Optional<String> message = Optional.ofNullable(record.value());
                    if (message.isPresent()) {
                        Object msg = message.get();
                        log.info("topic_test1 消费了: Topic:" + record.topic() + ",key:" + record.key() + ",Message:" + msg);
                    }
                    redisUtils.set(record.topic() + record.partition() + record.key(), record.value());
                }
            }
            ack.acknowledge();//手动提交offset
        }catch (Exception e){
            log.error(e.getMessage());
            throw e;
        }
    }
}

消费者,写了两种模式,批量获取消息与单条获取消息,获取消息的时候,指定topic,partition。通过KafkaListener监听来消费消息。

采用Kafka提供的StringSerializer和StringDeserializer进行序列化和反序列化,因为此种序列化方式无法序列化实体类。

1.5 使用

接下来当然就是使用了,直接定义一个接口,然后访问接口,在接口中调用Producer发送消息,就可在Consumer消费者中监听获取到消息。

@ApiOperation(value = "测试1", notes = "test2")
@GetMapping(value = "/test2")
public ResponseVo test2(String value, String key, Integer partition){
	if(StringUtils.isBlank(value)){
		return new ResponseVo.Builder().error().message("请从传入发送的消息!").build();
	}
	Message message = new Message.Builder().id(UuidUtil.getUuid32()).msg(value).sendTime(DateUtils.nowDate()).build();
	String str = JSONObject.toJSONString(message);
	if(StringUtils.isNotBlank(key)){
		if(partition != null){
			producerUtils.sendMessage(KafkaConstants.TOPIC_TEST, partition, key, str);
		}else{
			producerUtils.sendMessage(KafkaConstants.TOPIC_TEST, key, str);
		}
	}else {
		producerUtils.sendMessage(KafkaConstants.TOPIC_TEST, str);
	}
	return null;
}

1.6 测试验证

打开swagger页面,找到接口,录入参数:

点击execute执行测试。结果如下图:

二、采用自定义序列化和反序列化器进行实体类的序列化和反序列化

和内置的StringSerializer字符串序列化一样,如果要自定义序列化方式,需要实现接口Serializer。假设每个字段按照下图所示的方式自定义序列化:

image

2.1、创建User实体类

public class User implements Serializable {

    private Long id;

    private String name;

    private Integer age;

    /**
     * transient 关键字修饰的字段不会被序列化
     */
    private transient String desc;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public String getDesc() {
        return desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", desc='" + desc + '\'' +
                '}';
    }
}

2.2、创建User序列化器

public class UserSerializable implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String topic, User user) {
        System.out.println("topic : " + topic + ", user : " + user);
        byte[] dataArray = null;
        ByteArrayOutputStream outputStream = null;
        ObjectOutputStream objectOutputStream = null;
        try {
            outputStream = new ByteArrayOutputStream();
            objectOutputStream = new ObjectOutputStream(outputStream);
            objectOutputStream.writeObject(user);
            dataArray = outputStream.toByteArray();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }finally {
            if(outputStream != null){
                try {
                    outputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(objectOutputStream != null){
                try {
                    objectOutputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return dataArray;
    }

    @Override
    public void close() {

    }
}

2.3、创建User反序列化器

public class UserDeserializer implements Deserializer<User> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public User deserialize(String topic, byte[] bytes) {
        User user = null;
        ByteArrayInputStream inputStream = null;
        ObjectInputStream objectInputStream = null;
        try {
            inputStream = new ByteArrayInputStream(bytes);
            objectInputStream = new ObjectInputStream(inputStream);
            user = (User)objectInputStream.readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }finally {
            if(inputStream != null){
                try {
                    inputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(objectInputStream != null){
                try {
                    objectInputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return user;
    }

    @Override
    public void close() {

    }
}

2.4、修改application-dev.properties配置

A、修改生产者配置的value-serializer

# 指定生产者消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.yibo.springbootkafkademo.Serializable.UserSerializable

B、修改消费者配置的value-deserializer

# 指定消费者消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.yibo.springbootkafkademo.Serializable.UserDeserializer

2.5、生产者向kafka发送消息

@RestController
public class KafkaController {

    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;

    @PostMapping("/user/save")
    public boolean saveUser(@RequestBody User user){
        kafkaTemplate.send("userTopic",user);
        return true;
    }
}

6、消费者监听topic=userTopic的消息

@Component
public class ConsumerListener {

    @KafkaListener(topics = "userTopic")
    public void onMessage(User user){
        //insertIntoDb(buffer);//这里为插入数据库代码
        System.out.println(user);
    }
}

总结

可以看到,自定义Serializer和Deserializer非常痛苦,还有很多类型不支持,非常脆弱。复杂类型的支持更是一件痛苦的事情,不同版本之间的兼容性问题更是一个极大的挑战。由于Serializer和Deserializer影响到上下游系统,导致牵一发而动全身。自定义序列化&反序列化实现不是能力的体现,而是逗比的体现。所以强烈不建议自定义实现序列化&反序列化,推荐直接使用StringSerializer和StringDeserializer,然后使用json作为标准的数据传输格式。站在巨人的肩膀上,事半功倍。

参考: https://blog.csdn.net/liu649983697/article/details/119456475

https://blog.csdn.net/yuanlong122716/article/details/105160545

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  UMG78UIvW0PY   2023年11月12日   22   0   0 服务端端口号json
vxNQtvtQlfbi