kafka——Producer API
  vxNQtvtQlfbi 2023年11月02日 348 0

一、Kafka 核心 API

下图是官方文档中的一个图,形象的描述了能与 Kafka集成的客户端类型

Kafka的五类客户端API类型如下:

  • AdminClient API:允许管理和检测Topic、broker以及其他Kafka实例,与Kafka自带的脚本命令作用类似。
  • Producer API:发布消息到1个或多个Topic,也就是生产者或者说发布方需要用到的API。
  • Consumer API:订阅1个或多个Topic,并处理产生的消息,也就是消费者或者说订阅方需要用到的API。
  • Stream API:高效地将输入流转换到输出流,通常应用在一些流处理场景。
  • Connector API:从一些源系统或应用程序拉取数据到Kafka,如上图中的DB。

本文中,我们将主要介绍 Producer API。

二、生产者客户端的基本架构图

由上图可以看出:KafkaProducer有两个基本线程。

 

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了 两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。

 

main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。

  • 主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中(这里可以看出拦截器确实在序列化和分区之前执行)。

    • 消息收集器主要的作用是缓存消息,让发送线程可以批量发送,减少网络传输资源消耗提升性能,缓存大小可以通过buffer.memory配置,默认值为32MB,如果生产者发送消息的速度超过发送到服务器的速度,则send()方法要么被阻塞,要么抛出异常,取决于参数max.block.ms,默认值为60000ms
    • 主线程发送的消息被追加到消息累加器的一个双端队列中,消息收集器RecoderAccumulator为每个分区都维护了一个 Deque<ProducerBatch> 类型的双端队列,队列中是ProducerBatch,包含多个ProducerRecord。
    • ProducerBatch 可以暂时理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低网络影响。
    • 由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用;该缓存池只针对特定大小( batch.size 指定)的 ByteBuffer进行管理,对于消息过大的缓存,不能做到重复利用。
    • 每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个ProducerBatch,判断该消息大小是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建立新的ProducerBatch,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的 ProducerBatch ,缺点就是该内存不能被复用了。
  • Sender线程

    • 该线程从消息收集器获取缓存的消息,将其处理为 <Node, List<ProducerBatch> 的形式, Node 表示集群的broker节点。
    • 进一步将<Node, List<ProducerBatch>转化为<Node, Request>形式,此时才可以向服务端发送数据。
    • 在发送之前,Sender线程将消息以 Map<NodeId, Deque<Request>> 的形式保存到 InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个,以实现消息的尽快发出。

相关参数:

  • batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。
  • linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。

三、Producer API

3.1、导入相关依赖

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.6.0</version>
</dependency>

3.2、Producer异步发送演示

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer异步发送演示
 */
public static void producerSend(){
	Properties properties = new Properties();
	//kafka集群
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
	//ack应答级别
	properties.put(ProducerConfig.ACKS_CONFIG,"all");
	//重试次数
	properties.put(ProducerConfig.RETRIES_CONFIG,"3");
	//批次大小
	properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
	//等待时间
	properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
	//RecoderAccumulator缓冲区大小
	properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
	//key,value的序列化类
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	//创建生产者对象
	Producer<String,String> producer = new KafkaProducer<>(properties);
	//消息对象 ProducerRecoder
	for (int i = 0; i < 10; i++) {
		ProducerRecord<String,String> record =
				new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
		producer.send(record);
	}
	producer.close();
}

3.3、Producer异步发送演示

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

 

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer异步发送带回调函数演示
 */
public static void producerSendWithCallback(){
	Properties properties = new Properties();
	//kafka集群
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
	//ack应答级别
	properties.put(ProducerConfig.ACKS_CONFIG,"all");
	//重试次数
	properties.put(ProducerConfig.RETRIES_CONFIG,"0");
	//批次大小
	properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
	//等待时间
	properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
	//RecoderAccumulator缓冲区大小
	properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
	//key,value的序列化类
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	//创建生产者对象
	Producer<String,String> producer = new KafkaProducer<>(properties);
	//消息对象 ProducerRecoder
	ProducerRecord<String,String> record =
			new ProducerRecord<>(TOPIC_NAME,"key-hello","value-hello world");
	producer.send(record, new Callback() {
		//回调函数,该方法会在 Producer 收到 ack 时调用,为异步调用
		@Override
		public void onCompletion(RecordMetadata recordMetadata, Exception e) {
			if (e == null) {
				log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
			} else {
				log.error("exception",e);
			}
		}
	});
	producer.close();
}

3.4、Producer异步发送带回调函数和Partition负载均衡

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer异步发送带回调函数和Partition负载均衡
 */
public static void producerSendWithCallbackAndPartition(){
	Properties properties = new Properties();
	//kafka集群
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
	//ack应答级别
	properties.put(ProducerConfig.ACKS_CONFIG,"all");
	//重试次数
	properties.put(ProducerConfig.RETRIES_CONFIG,"0");
	//批次大小
	properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
	//等待时间
	properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
	//RecoderAccumulator缓冲区大小
	properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
	//key,value的序列化类
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	//Partition负载均衡
	properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.yibo.kafka.producer.SamplePartition");
	//创建生产者对象
	Producer<String,String> producer = new KafkaProducer<>(properties);
	//消息对象 ProducerRecoder
	ProducerRecord<String,String> record =
			new ProducerRecord<>(TOPIC_NAME,"key-hello","value-hello world");
	producer.send(record, new Callback() {
		//回调函数,该方法会在 Producer 收到 ack 时调用,为异步调用
		@Override
		public void onCompletion(RecordMetadata recordMetadata, Exception e) {
			if (e == null) {
				log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
			} else {
				log.error("exception",e);
			}
		}
	});
	producer.close();
}

/**
 * @Description: Partitioner分区接口,以实现自定义的消息分区
 *
 * 默认分区器DefaultPartitioner org.apache.kafka.clients.producer.internals.DefaultPartitioner
 *
 * 如果消息的key为null,此时producer会使用默认的partitioner分区器将消息随机分布到topic的可用partition中。
 * 如果key不为null,并且使用了默认的分区器,kafka会使用自己的hash算法对key取hash值,
 * 使用hash值与partition数量取模,从而确定发送到哪个分区。
 * 注意:此时key相同的消息会发送到相同的分区(只要partition的数量不变化)
 */
public class SamplePartition implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        /**
         *由于我们按key分区,在这里我们规定:key值不允许为null。在实际项目中,key为null的消息*,可以发送到同一个分区。
         */
        if(keyBytes == null) {
            throw new InvalidRecordException("key cannot be null");
        }
        if(((String)key).equals("1")) {
            return 1;
        }
        System.out.println("key: " + key);
        //如果消息的key值不为1,那么使用hash值取模,确定分区。
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

3.5、Producer异步阻塞发送演示

由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer异步阻塞发送演示
 */
public static void producerSyncSend() throws Exception {
	Properties properties = new Properties();
	//kafka集群
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
	//ack应答级别
	properties.put(ProducerConfig.ACKS_CONFIG,"all");
	//重试次数
	properties.put(ProducerConfig.RETRIES_CONFIG,"0");
	//批次大小
	properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
	//等待时间
	properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
	//RecoderAccumulator缓冲区大小
	properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
	//key,value的序列化类
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
	//创建生产者对象
	Producer<String,String> producer = new KafkaProducer<>(properties);
	//消息对象 ProducerRecoder
	ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,"hello","hello world");
	Future<RecordMetadata> send = producer.send(record);
	RecordMetadata recordMetadata = send.get();
	log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
	producer.close();
}

四、自定义 Interceptor

4.1、拦截器原理

Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。

 

对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

  • 1、configure(configs): 获取配置信息和初始化数据时调用。

  • 2、onSend(ProducerRecord): 该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。

  • 3、onAcknowledgement(RecordMetadata, Exception): 该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程 中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer 的消息发送效率。

  • 4、close: 关闭 interceptor,主要用于执行一些资源清理工作。

如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保 线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

4.2、 拦截器案例

  • 1、需求: 实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间 戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。

  • 2、案例实操

  • 1)增加时间戳拦截器

public class TimeInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public void configure(Map<String, ?> map) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // 创建一个新的 record,把时间戳写入消息体的最前部
        return new ProducerRecord(producerRecord.topic(),
                producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(),
                System.currentTimeMillis() + "," + producerRecord.value().toString());

    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    @Override
    public void close() {

    }
}
  • 2)统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计数器
public class CounterInterceptor implements ProducerInterceptor<String, String> {

    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public void configure(Map<String, ?> map) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return null;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        // 统计成功和失败的次数
        if (e == null) {
            successCounter++;
        } else {
            errorCounter++;
        }

    }

    @Override
    public void close() {
        // 保存结果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}
  • 3)producer 主程序
public class InterceptorProducer {

    private static final String TOPIC_NAME = "yibo_topic";

    public static void main(String[] args) {
        // 1 设置配置信息
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2 构建拦截链
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.yibo.kafka.producer.TimeInterceptor");
        interceptors.add("com.yibo.kafka.producer.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        Producer<String, String> producer = new KafkaProducer<>(props);

        // 3 发送消息
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "message" + i);
            producer.send(record);
        }

        // 4 一定要关闭 producer,这样才会调用 interceptor 的 close 方法
        producer.close();
    }
}

五、SpringBoot 集成 Kafka

5.1、添加maven依赖

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>2.6.1</version>
</dependency>

5.2、配置 application.properties

# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=192.168.174.128:9092

#=============== provider  =======================
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
spring.kafka.producer.retries=0
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432

#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
spring.kafka.producer.acks=1

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=testGroup
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=false
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#=============== listener  =======================
# 在侦听器容器中运行的线程数。
spring.kafka.listener.concurrency=5
#listner负责ack,每调用一次,就立即commit
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.missing-topics-fatal=false

5.3、新建Producer

@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    private static final String TOPIC_NAME = "yibo_topic";

    public void send(Object obj) {
        String obj2String = JSONObject.toJSONString(obj);
        log.info("准备发送消息为:{}", obj2String);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_NAME, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //发送失败的处理
                log.info(TOPIC_NAME + " - 生产者 发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的处理
                log.info(TOPIC_NAME + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }
}

参考: https://www.cnblogs.com/L-Test/p/13443178.html

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
vxNQtvtQlfbi