一、定义生产者拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Producer interceptor that prepends {@code "prefix:"} to the value of every outgoing record
 * and counts how many sends were acknowledged successfully versus failed.
 *
 * <p>The counters are {@link AtomicLong}s rather than {@code volatile long}s: {@code volatile}
 * only guarantees visibility, while {@code counter++} is a read-modify-write that can lose
 * updates if interceptor callbacks are invoked from more than one thread.
 */
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    /** Text prepended to every non-null record value. */
    private static final String VALUE_PREFIX = "prefix:";

    /** Number of acknowledgements received without an exception. */
    private final AtomicLong sendSuccess = new AtomicLong();

    /** Number of acknowledgements received with an exception. */
    private final AtomicLong sendFail = new AtomicLong();

    /**
     * Builds a copy of the record whose value is prefixed with {@code "prefix:"}; topic,
     * partition, timestamp, key and headers are carried over unchanged.
     *
     * @param record the record handed to the producer
     * @return the prefixed copy, or {@code record} itself when its value is {@code null}
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String value = record.value();
        if (value == null) {
            // A null value must stay null; concatenation would turn it into the
            // literal string "prefix:null".
            return record;
        }
        return new ProducerRecord<>(
                record.topic(),
                record.partition(),
                record.timestamp(),
                record.key(),
                VALUE_PREFIX + value,
                record.headers());
    }

    /**
     * Counts the outcome of a send: success when {@code exception} is {@code null},
     * failure otherwise.
     *
     * @param metadata  metadata of the sent record (not used here)
     * @param exception the error reported for the send, or {@code null} if there was none
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            sendSuccess.incrementAndGet();
        } else {
            sendFail.incrementAndGet();
        }
    }

    /** Prints the failure count followed by the success count to standard output. */
    @Override
    public void close() {
        System.out.println("fail and success:");
        System.out.println(sendFail.get());
        System.out.println(sendSuccess.get());
    }

    /**
     * No configuration is needed by this interceptor.
     *
     * @param configs the producer configuration (ignored)
     */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
二、使用生产者拦截器
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Small driver that sends ten records to the {@code study2} topic through a producer
 * configured with {@code MyProducerInterceptor}, waiting for each send to complete.
 */
public class ProducerInterceptorTest {

    /**
     * Configures a String/String producer with the interceptor, sends ten records
     * synchronously and closes the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        System.out.println(StringSerializer.class.getName());

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
        properties.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());

        boolean interrupted = false;
        // try-with-resources closes the producer even if send() throws a runtime exception.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>("study2", "hello ,my sister 8");
                Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
                try {
                    // Block until the send has completed (or failed) before sending the next one.
                    future.get();
                } catch (InterruptedException e) {
                    // Stop sending once interrupted; the flag is restored below.
                    e.printStackTrace();
                    interrupted = true;
                    break;
                } catch (ExecutionException e) {
                    // A failed send is reported and the loop moves on to the next record.
                    e.printStackTrace();
                }
            }
        } finally {
            if (interrupted) {
                // Restored only after the producer has been closed, so the pending
                // interrupt does not disturb the close() call itself.
                Thread.currentThread().interrupt();
            }
        }
    }
}