kafka复习:(6)生产者拦截器
  Ta2cNb9VdLMk 2023年11月02日 65 0


一、定义生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class MyProducerInterceptor implements ProducerInterceptor<String,String> {
    private volatile long sendSuccess = 0;
    private volatile long sendFail = 0;
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String newValue = "prefix:"+record.value();
        ProducerRecord<String,String> recordFinal = new ProducerRecord<>(record.topic(), record.partition(),record.timestamp(),
                record.key(),newValue,record.headers());
        return recordFinal;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if(exception == null){
            sendSuccess++;
        }
        else{
            sendFail++;
        }

    }

    @Override
    public void close() {
        System.out.println("fail and success:");
        System.out.println(sendFail);
        System.out.println(sendSuccess);

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

二、使用生产者拦截器

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ProducerInterceptorTest {
    public static void main(String[] args) {
        System.out.println(StringSerializer.class.getName());
        Properties properties= new Properties();

        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");
        properties.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,MyProducerInterceptor.class.getName());
        KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);

        for(int i=0;i<10;i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("study2", "hello ,my sister 8");
            Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            int partition = 0;
            try {
                partition = future.get().partition();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            //System.out.println(partition);
        }

        kafkaProducer.close();
    }
}


【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
Ta2cNb9VdLMk