KafkaTemplate配置ProducerInterceptor|springboot整合kafka,配置自定义拦截器
  TEZNKK3IfmPf 2023年11月15日 22 0

实际开发中,可能需要多kafka发送的消息在发送前进行操作,比如按照某个规则过滤掉不符合要求的消息,或者屏蔽某些关键词,或者改变某些属性等等,都可以依托拦截器链来实现。

自定义拦截器需要实现ProducerInterceptor接口,根据需要重写相应的方法

如果希望改变发送前的数据值,需要重新onSend方法,从ProducerRecord中取得对应的值,进行自定义的修改,然后重新包装回ProducerRecord

如果希望对消息发送的结果进行处理,则需要重写onAcknowledgement方法,exception有值,说明消息发送失败,可以采取自定义的措施,比如重试等等,也可以用来统计消息发送的成功率

如果希望在Producer关闭时,做一些自己的业务,则可以重写close方法。

package com.hulang.kafkaboot.Interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
* 自定义拦截器
*/
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord onSend(ProducerRecord record) {
System.out.println("MyProducerInterceptor 拦截器执行");

String value = (String) record.value();
if ("hello".equals(value)) {
value = "你好";
}
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
record.topic(),
record.partition(),
record.timestamp(),
(String) record.key(),
value, // 修改后的 value值
record.headers());
return producerRecord;

}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

}

@Override
public void close() {

}


@Override
public void configure(Map<String, ?> configs) {

}
}

常规项目配置

常规项目中,KafkaProducer是自己管理的,只需要在properties添加自定义拦截器即可,如下

private static void sendCallBack() {
// 创建kafka生产者对象
Properties properties = new Properties();
//连接集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.163:9092,192.168.0.216:9092,192.168.0.164:9092");

// 指定对应的key和value序列号类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 自定义拦截器
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());



KafkaProducer kafkaProducer = new KafkaProducer<String, String>(properties);
// 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("first", "hello call back" + i), (recordMetadata, exception) -> {
System.out.println(recordMetadata.topic() + ": " + recordMetadata.partition());
});

}
// 关闭资源
kafkaProducer.close();
}

springboot项目配置方法1

server:
port: 8877
spring:
kafka:
bootstrap-servers: 192.168.0.163:9092,192.168.0.216:9092,192.168.0.164:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
# 拦截器配置
interceptor:
classes: com.hulang.kafkaboot.Interceptor.MyProducerInterceptor
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: 9527

有多个,用逗号隔开即可,注意,执行顺序取决于配置的先后顺序,在前面的先执行

server:
port: 8877
spring:
kafka:
bootstrap-servers: 192.168.0.163:9092,192.168.0.216:9092,192.168.0.164:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
# 拦截器配置,执行先后顺序取决于配置顺序
interceptor:
classes: com.hulang.kafkaboot.Interceptor.MyProducerInterceptor2,com.hulang.kafkaboot.Interceptor.MyProducerInterceptor
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: 9527

springboot项目配置方法2

package com.hulang.kafkaboot.config;

import com.hulang.kafkaboot.Interceptor.MyProducerInterceptor;
import com.hulang.kafkaboot.Interceptor.MyProducerInterceptor2;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Configuration
public class KafkaConfig {

@Value("")
private String BOOTSTRAP_SERVERS_CONFIG;

@Value("")
private String KEY_SERIALIZER_CLASS_CONFIG;


@Value("")
private String VALUE_SERIALIZER_CLASS_CONFIG;
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();

// kafka连接地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
// key序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER_CLASS_CONFIG);
// value序列化
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER_CLASS_CONFIG);


// 自定义拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add(MyProducerInterceptor.class.getName());
interceptors.add(MyProducerInterceptor2.class.getName());
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);

return props;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
}

添加多个拦截器小细节

细节1

如果你有多个拦截器,put自定义拦截器时,不能直接put两个连续的拦截器,错误示范如下:

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,MyProducerInterceptor.class.getName());
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,MyProducerInterceptor2.class.getName());

这样操作只会让最后一个拦截器生效,前面的拦截器被覆盖。

需要采用数组的形式,挨个添加,然后将数组作为参数传入。

// 自定义拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add(MyProducerInterceptor.class.getName());
interceptors.add(MyProducerInterceptor2.class.getName());
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);

细节2

同一个拦截器添加多次,会执行多次,如果没有这样的需要,应注意避免,浪费资源。


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月15日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   18天前   41   0   0 java
  TEZNKK3IfmPf   2024年05月31日   52   0   0 java
TEZNKK3IfmPf